## 1 · Setup & Imports  
Installs Gymnasium 0.29, PyGame, Stable‑Baselines3 (+ QRDQN) and TensorBoard,  
then imports all libraries and defines global screen constants.

In [1]:
!pip install gymnasium==0.29.1 pygame stable-baselines3[extra] sb3-contrib tensorboard --quiet

import gymnasium as gym, numpy as np, random, pygame, cv2, torch, time, math
from gymnasium import spaces
from stable_baselines3.common.monitor import Monitor
from stable_baselines3.common.callbacks import BaseCallback
from sb3_contrib import QRDQN
from torch import nn
pygame.init()

SCREEN_W, SCREEN_H = 288, 512
BASEY              = int(SCREEN_H*0.79)

[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m953.9/953.9 kB[0m [31m50.1 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m92.8/92.8 kB[0m [31m9.5 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m184.5/184.5 kB[0m [31m15.8 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m363.4/363.4 MB[0m [31m3.3 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m13.8/13.8 MB[0m [31m106.8 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m24.6/24.6 MB[0m [31m85.1 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m883.7/883.7 kB[0m [31m44.2 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m664.8/664.8 MB[0m [31m2.0 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━

## 2 · Environment Definition  
* `FlappyCore` implements Flappy‑Bird physics and reward logic  
* `FlappyPixelEnv` wraps it as a Gymnasium Env, renders 84 × 84 grayscale  
  frames, and stacks 4 frames for temporal context (+0.2 living bonus).

In [2]:
def new_pipe(gap=100):
    gap_y = random.randint(120, 280)
    x = SCREEN_W + 10
    return [{'x': x, 'y': gap_y-320}, {'x': x, 'y': gap_y+gap}]

class FlappyCore:
    def __init__(self, gap=100): self.gap=gap; self.reset()
    def reset(self):
        self.bird_y, self.vel_y = SCREEN_H//2, -9
        self.pipes = list(new_pipe(self.gap)); self.frames=0
    def step(self, action):
        if action and self.bird_y>-24: self.vel_y = -9
        self.vel_y = min(self.vel_y+1, 10); self.bird_y += self.vel_y
        prev = self.pipes[0]['x']+52
        for p in self.pipes: p['x'] -= 4
        if self.pipes[-1]['x'] < SCREEN_W-150: self.pipes.extend(new_pipe(self.gap))
        if self.pipes[0]['x'] < -52: self.pipes=self.pipes[2:]
        curr = self.pipes[0]['x']+52; up, lo = self.pipes[0], self.pipes[1]
        term = self.bird_y+24>=BASEY or (up['x']<60<up['x']+52 and not(up['y']+320<self.bird_y<lo['y']-24))
        reward = -1. if term else 0.
        if not term and prev>=60>curr: reward = 1.
        self.frames += 1
        return reward, term

class FlappyPixelEnv(gym.Env):
    metadata = {}
    def __init__(self, frame_stack=4, gap=100):
        self.core = FlappyCore(gap); self.fs=frame_stack
        self.stack = np.zeros((frame_stack, 84, 84), np.uint8)
        self.canvas = pygame.Surface((SCREEN_W,SCREEN_H))
        self.action_space = spaces.Discrete(2)
        self.observation_space = spaces.Box(0,255,(frame_stack,84,84),np.uint8)
    def _render84(self):
        self.canvas.fill((135,206,250))
        for i in range(0,len(self.core.pipes),2):
            up, lo = self.core.pipes[i], self.core.pipes[i+1]
            pygame.draw.rect(self.canvas,(0,255,0),pygame.Rect(up['x'],up['y'],52,320))
            pygame.draw.rect(self.canvas,(0,255,0),pygame.Rect(lo['x'],lo['y'],52,320))
        pygame.draw.rect(self.canvas,(255,255,0),pygame.Rect(60,self.core.bird_y,34,24))
        img = pygame.surfarray.array3d(self.canvas).swapaxes(0,1)
        gray = cv2.cvtColor(img, cv2.COLOR_RGB2GRAY)
        return cv2.resize(gray,(84,84),cv2.INTER_AREA)
    def reset(self, *, seed=None, options=None):
        super().reset(seed=seed); self.core.reset()
        f = self._render84(); self.stack[:] = f
        return self.stack.copy(), {}
    def step(self, action):
        r, term = self.core.step(int(action))
        f = self._render84(); self.stack = np.roll(self.stack,-1,0); self.stack[-1]=f
        r += 0.2                     # living bonus
        return self.stack.copy(), r, term, False, {}

## 3 · CNN Feature Extractors  
* **AtariExtractor** – classic 3‑conv Atari architecture (baseline)  
* **InceptionExtractor** – replaces middle layers with a lightweight  
  Inception‑A block for multi‑scale feature learning.

In [3]:
import torch
from torch import nn
from stable_baselines3.common.torch_layers import BaseFeaturesExtractor

class AtariExtractor(BaseFeaturesExtractor):
    def __init__(self, obs_space, features_dim=512):
        super().__init__(obs_space, features_dim)
        n_ch = obs_space.shape[0]
        self.conv = nn.Sequential(
            nn.Conv2d(n_ch,32,8,4), nn.ReLU(),
            nn.Conv2d(32,64,4,2), nn.ReLU(),
            nn.Conv2d(64,64,3,1), nn.ReLU(),
            nn.Flatten())
        with torch.no_grad():
            n_flat = self.conv(torch.as_tensor(obs_space.sample()[None]).float()).shape[1]
        self.fc = nn.Sequential(nn.Linear(n_flat, features_dim), nn.ReLU())
        self._features_dim = features_dim
    def forward(self,x): return self.fc(self.conv(x/255.0))

class InceptionA(nn.Module):
    def __init__(self,in_c,out_c=96):
        super().__init__()
        k = out_c//3
        self.b1 = nn.Conv2d(in_c,k,1)
        self.b3 = nn.Sequential(nn.Conv2d(in_c,k,1),nn.ReLU(),
                                nn.Conv2d(k,k,3,1,1))
        self.b5 = nn.Sequential(nn.Conv2d(in_c,k,1),nn.ReLU(),
                                nn.Conv2d(k,k,5,1,2))
    def forward(self,x): return torch.cat((self.b1(x),self.b3(x),self.b5(x)),1)

class InceptionExtractor(BaseFeaturesExtractor):
    def __init__(self, obs_space, features_dim=512):
        super().__init__(obs_space, features_dim)
        n_ch = obs_space.shape[0]
        self.conv = nn.Sequential(
            nn.Conv2d(n_ch,32,8,4), nn.ReLU(),
            InceptionA(32,96), nn.ReLU(),
            nn.Conv2d(96,64,3,1), nn.ReLU(),
            nn.Flatten())
        with torch.no_grad():
            n_flat = self.conv(torch.as_tensor(obs_space.sample()[None]).float()).shape[1]
        self.fc = nn.Sequential(nn.Linear(n_flat, features_dim), nn.ReLU())
        self._features_dim = features_dim
    def forward(self,x): return self.fc(self.conv(x/255.0))

## 4 · Training Callback  
`FirstPipeCB` logs the simulator step at which the first +1 pipe‑pass reward  
is observed, giving a sample‑efficiency metric.

In [4]:
from stable_baselines3.common.logger import KVWriter

class FirstPipeCB(BaseCallback):
    def __init__(self): super().__init__(); self.hit=False
    def _on_step(self):
        r = self.locals["rewards"]
        if not self.hit and (r > 0.9).any():
            self.logger.record("custom/time_first_pipe", self.num_timesteps)
            self.hit=True
        return True

## 5 · Training Helper  
`train_agent()` builds a QR‑DQN with the chosen extractor, trains for 2 M  
steps, and returns the fitted model (TensorBoard logs under `tb/`).

In [5]:
def train_agent(extractor_cls, label, total_steps=2_000_000, gap=100):
    env = Monitor(FlappyPixelEnv(gap=gap))
    policy_kwargs = dict(
        features_extractor_class = extractor_cls,
        features_extractor_kwargs = dict(features_dim=512)
    )
    model = QRDQN(
        "CnnPolicy",
        env,
        policy_kwargs       = policy_kwargs,
        learning_rate       = 5e-4,
        buffer_size         = 50_000,
        learning_starts     = 5_000,
        batch_size          = 32,
        target_update_interval = 1_000,
        exploration_fraction   = 0.15,
        exploration_final_eps  = 0.10,
        gamma               = 0.99,
        verbose             = 1,
        tensorboard_log     = f"tb/{label}"
    )
    model.learn(total_timesteps=total_steps,
                callback=FirstPipeCB(),
                progress_bar=True)
    return model

## 6 · Baseline – Atari CNN  
Trains the baseline QR‑DQN for 2 M steps and prints average / best pipes  
over 50 evaluation episodes.

In [17]:
baseline   = train_agent(AtariExtractor,     "baseline")
metrics_b  = evaluate(baseline)
print("Baseline:", metrics_b)

Using cuda device
Wrapping the env in a DummyVecEnv.
Logging to tb/baseline/QRDQN_1


Output()

[1;30;43mStreaming output truncated to the last 5000 lines.[0m
| train/              |          |
|    learning_rate    | 0.0005   |
|    loss             | 35.6     |
|    n_updates        | 465206   |
----------------------------------
----------------------------------
| rollout/            |          |
|    ep_len_mean      | 102      |
|    ep_rew_mean      | 20.3     |
|    exploration_rate | 0.1      |
| time/               |          |
|    episodes         | 25172    |
|    fps              | 168      |
|    time_elapsed     | 11060    |
|    total_timesteps  | 1866162  |
| train/              |          |
|    learning_rate    | 0.0005   |
|    loss             | 33       |
|    n_updates        | 465290   |
----------------------------------
----------------------------------
| rollout/            |          |
|    ep_len_mean      | 101      |
|    ep_rew_mean      | 20.1     |
|    exploration_rate | 0.1      |
| time/               |          |
|    episodes         | 2

Baseline: {'avg_pipes': np.float64(5.62), 'best': np.int64(22), 'avg_len': np.float64(298.74)}


## 7 · Proposed – Inception CNN  
Repeats the same training budget with the Inception extractor and prints the  
same evaluation metrics for direct comparison.

In [None]:
inception  = train_agent(InceptionExtractor, "inception")gvy
metrics_i  = evaluate(inception)
print("Inception:", metrics_i)

## 8 · Save Best Model  
Stores the trained Inception agent (`inception_qrdqn.zip`) for later reuse /  
demo without retraining.


In [10]:
inception.save("inception_qrdqn.zip")
print("Model written to inception_qrdqn.zip")

Model written to inception_qrdqn.zip


## 9 · Reload & Final Metrics  
Reloads the saved ZIP, runs a fresh 50‑episode evaluation (no living bonus),  
and reports average pipes, best episode, and average survival frames.

In [11]:
from sb3_contrib import QRDQN
inception = QRDQN.load("inception_qrdqn.zip", device="cuda" if torch.cuda.is_available() else "cpu")
print("Inception agent loaded")

import numpy as np, gymnasium as gym

def evaluate(agent, episodes=50, gap=100):
    env = FlappyPixelEnv(gap=gap)
    scores, lengths = [], []
    for _ in range(episodes):
        obs, _ = env.reset()
        pipes = frames = 0
        done = False
        while not done:
            act, _ = agent.predict(obs, deterministic=True)
            obs, r, done, _, _ = env.step(act)
            if r > 0.9:
                pipes += 1
            frames += 1
        scores.append(pipes); lengths.append(frames)
    return {
        "avg_pipes": float(np.mean(scores)),
        "best":      int(np.max(scores)),
        "avg_len":   float(np.mean(lengths))
    }

metrics_inc = evaluate(inception)
print(f"Inception CNN – avg pipes: {metrics_inc['avg_pipes']:.2f}  |  "
      f"best: {metrics_inc['best']}  |  avg frames: {metrics_inc['avg_len']:.0f}")

Inception agent loaded
Inception CNN – avg pipes: 13.18  |  best: 61  |  avg frames: 606


In [14]:
from sb3_contrib import QRDQN
model = QRDQN.load("inception_qrdqn.zip", device="cpu")

import imageio.v3 as iio, numpy as np
env = FlappyPixelEnv()
obs,_ = env.reset()
frames = []
done = False; pipes = 0
while not done:
    act,_ = model.predict(obs, deterministic=True)
    obs,r,done,_,_ = env.step(act)
    if r > 0.9: pipes += 1
    rgb = np.repeat(obs[-1][...,None], 3, axis=2)
    frames.append(rgb.astype(np.uint8))

print(f"Episode finished – pipes cleared: {pipes}")

iio.imwrite("play_demo.mp4", np.stack(frames), fps=15, codec="libx264")
from IPython.display import Video, display
display(Video("play_demo.mp4", embed=True))



Episode finished – pipes cleared: 21
