In [2]:
import gymnasium as gym

env = gym.make("CarRacing-v3", render_mode="rgb_array", continuous=False)

# I have the environment and can take actions in it.
# Now I need a policy, i.e. a way of behaving in this environment:
# given a state, the policy tells me which action to take next.

# Sketch of the interaction loop, left commented out (as written it has no
# exit condition and ignores the values returned by step()):
# env.reset()
# while True:
#     # replace the random action sample with a policy
#     action = env.action_space.sample()
#     env.step(action)

In [4]:
# Smoke test: reset the environment, take one randomly sampled action, and
# keep the five values returned by step() so the next cell can inspect them.
env.reset()
action = env.action_space.sample()
obs, reward, terminated, truncated, info = env.step(action)

In [9]:
# Display the observation shape together with the other values returned by the step.
obs.shape, reward, terminated, truncated, info

((96, 96, 3), 7.042857142857144, False, False, {})

In [1]:
import gymnasium as gym
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import numpy as np
import random
import cv2  # You might need: pip install opencv-python
from collections import deque
import math

# --- Custom Wrappers for Stability and Speed ---

class RepeatActionAndMaxFrame(gym.Wrapper):
    """
    Action repeat (frame skipping): every agent action is applied to the
    wrapped environment for up to `skip` consecutive frames.

    The rewards of the repeated frames are summed, and the observation, flags
    and info of the LAST executed frame are returned. Despite the class name
    (kept so existing callers keep working), no max-pooling over frames is
    performed.

    Args:
        env: Environment to wrap.
        skip: Number of frames each action is repeated for (must be >= 1).

    Raises:
        ValueError: If `skip` is smaller than 1.
    """
    def __init__(self, env=None, skip=4):
        # With skip < 1 the loop in step() would never run and there would be
        # no observation to return, so reject it up front.
        if skip < 1:
            raise ValueError(f"skip must be >= 1, got {skip}")
        super(RepeatActionAndMaxFrame, self).__init__(env)
        self._skip = skip

    def step(self, action):
        """Repeat `action` for up to `skip` frames.

        Returns:
            (obs, total_reward, terminated, truncated, info) where
            `total_reward` is the sum over the executed frames and the other
            values come from the last executed frame.
        """
        total_reward = 0.0

        for _ in range(self._skip):
            obs, reward, terminated, truncated, info = self.env.step(action)
            total_reward += reward
            # Stop repeating as soon as the episode ends; stepping a finished
            # environment is not valid.
            if terminated or truncated:
                break

        return obs, total_reward, terminated, truncated, info

class PreprocessFrame(gym.ObservationWrapper):
    """
    Turn an RGB frame into a small normalized grayscale image.

    1. Convert to grayscale (saves memory compared to three color channels).
    2. Resize the whole frame to 84x84 (standard DQN input size).
    3. Scale pixel values to [0, 1] as float32.

    Note: the frame is resized as a whole; the status bar at the bottom of the
    image is NOT cropped out.
    """
    def __init__(self, env=None):
        super(PreprocessFrame, self).__init__(env)
        self.observation_space = gym.spaces.Box(low=0, high=1, shape=(84, 84), dtype=np.float32)

    def observation(self, obs):
        """Return `obs` (H, W, 3 RGB image) as an (84, 84) float32 array in [0, 1]."""
        # RGB -> gray using OpenCV's standard channel weights.
        gray = cv2.cvtColor(obs, cv2.COLOR_RGB2GRAY)

        # INTER_AREA is the interpolation mode intended for shrinking images.
        resized = cv2.resize(gray, (84, 84), interpolation=cv2.INTER_AREA)

        # Cast BEFORE dividing: dividing the integer image directly would
        # produce float64, which contradicts the declared float32 observation
        # space and doubles the memory taken by every stored frame.
        return resized.astype(np.float32) / 255.0

class StackFrames(gym.Wrapper):
    """
    Stack the last `stack_size` observations along a new leading axis.

    For 2-D frames of shape (H, W) the wrapper returns arrays of shape
    (stack_size, H, W). Seeing several consecutive frames lets the agent infer
    motion (e.g. velocity), which a single frame cannot convey.

    Args:
        env: Environment to wrap; its observation space must have at least
            two dimensions.
        stack_size: Number of most recent frames to keep.
    """
    def __init__(self, env, stack_size=4):
        super(StackFrames, self).__init__(env)
        self.stack_size = stack_size
        # Bounded deque: appending a new frame drops the oldest automatically.
        self.frames = deque(maxlen=stack_size)

        # Update observation space
        old_space = env.observation_space
        self.observation_space = gym.spaces.Box(
            low=0, high=1,
            shape=(stack_size, old_space.shape[0], old_space.shape[1]),
            dtype=np.float32
        )

    def reset(self, **kwargs):
        """Reset the env and fill the whole stack with the first observation."""
        obs, info = self.env.reset(**kwargs)
        # Appending stack_size copies also flushes frames of the previous episode.
        for _ in range(self.stack_size):
            self.frames.append(obs)
        return self._get_obs(), info

    def step(self, action):
        """Step the env and return the updated frame stack."""
        obs, reward, terminated, truncated, info = self.env.step(action)
        self.frames.append(obs)
        return self._get_obs(), reward, terminated, truncated, info

    def _get_obs(self):
        # Build a fresh array (safe to store in a replay buffer) and force the
        # dtype declared in observation_space, whatever dtype the wrapped
        # environment happens to emit.
        return np.array(self.frames, dtype=self.observation_space.dtype)

def make_env(skip=4, stack_size=4, render_mode="rgb_array"):
    """Build the discrete-action CarRacing-v3 environment with all wrappers applied.

    Wrapper order (innermost first): action repeat -> grayscale/resize ->
    frame stacking. Training and inference must use the same chain so the
    network always sees the same observation format.

    Args:
        skip: Number of frames each action is repeated for.
        stack_size: Number of consecutive frames stacked into one observation.
        render_mode: Passed through to `gym.make`.

    Returns:
        The fully wrapped environment.
    """
    env = gym.make("CarRacing-v3", render_mode=render_mode, continuous=False)
    env = RepeatActionAndMaxFrame(env, skip=skip)
    env = PreprocessFrame(env)
    env = StackFrames(env, stack_size=stack_size)
    return env

In [10]:
class QNetwork(nn.Module):
    """Convolutional Q-network: maps a stack of frames to one Q-value per action.

    Args:
        action_size: Number of discrete actions (size of the output layer).
        input_channels: Number of stacked frames in the input.
        input_size: (height, width) of the input frames. The size of the first
            fully connected layer is derived from it; the default (84, 84)
            gives 64 * 7 * 7 = 3136 features.

    Input shape:  (batch, input_channels, height, width)
    Output shape: (batch, action_size)
    """

    # (kernel_size, stride) of conv1..conv3; shared by layer construction and
    # by the feature-size computation so the two cannot drift apart.
    _CONV_SPECS = ((8, 4), (4, 2), (3, 1))

    def __init__(self, action_size, input_channels=4, input_size=(84, 84)):
        super(QNetwork, self).__init__()

        (k1, s1), (k2, s2), (k3, s3) = self._CONV_SPECS
        # Attribute names and creation order are kept as they were, so existing
        # checkpoints and seeded initialisation remain compatible.
        self.conv1 = nn.Conv2d(input_channels, 32, kernel_size=k1, stride=s1)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=k2, stride=s2)
        self.conv3 = nn.Conv2d(64, 64, kernel_size=k3, stride=s3)

        # 84x84 image -> Conv1 -> 20x20 -> Conv2 -> 9x9 -> Conv3 -> 7x7
        height, width = input_size
        for kernel, stride in self._CONV_SPECS:
            height = self._conv_out(height, kernel, stride)
            width = self._conv_out(width, kernel, stride)
        if height < 1 or width < 1:
            raise ValueError(f"input_size {tuple(input_size)} is too small for the conv stack")

        self.fc1 = nn.Linear(64 * height * width, 512)
        self.fc2 = nn.Linear(512, action_size)

    @staticmethod
    def _conv_out(size, kernel, stride):
        """Output length of an unpadded convolution along one dimension."""
        return (size - kernel) // stride + 1

    def forward(self, x):
        """Return Q-values of shape (batch, action_size) for input batch `x`."""
        x = F.relu(self.conv1(x))
        x = F.relu(self.conv2(x))
        x = F.relu(self.conv3(x))
        # Flatten everything except the batch dimension.
        x = torch.flatten(x, start_dim=1)
        x = F.relu(self.fc1(x))
        return self.fc2(x)

In [None]:
# --- Hyperparameters ---
# Run on the GPU when one is available, otherwise fall back to the CPU.
_use_cuda = torch.cuda.is_available()
device = torch.device("cuda" if _use_cuda else "cpu")
print(f"Device: {device}")

# Optimisation
LR = 1e-4                # Adam learning rate
BATCH_SIZE = 128         # transitions per gradient step
GAMMA = 0.99             # discount factor applied to future rewards

# Exploration schedule (epsilon-greedy, exponential decay measured in agent steps)
EPS_START = 1.0          # start fully random
EPS_END = 0.1            # exploration floor
EPS_DECAY = 100_000      # decay time constant, in steps

# Training schedule / memory
TARGET_UPDATE = 1_000    # copy policy weights into the target net every N steps
MEMORY_SIZE = 50_000     # replay buffer capacity, in transitions
NUM_EPISODES = 1_000     # number of training episodes

# Replay Buffer: fixed-capacity store of past transitions; states are kept as NumPy frame stacks.
class ReplayBuffer:
    """Fixed-capacity store of (state, action, reward, next_state, done) transitions."""

    def __init__(self, capacity):
        # A bounded deque drops the oldest transition once `capacity` is reached.
        self.buffer = deque(maxlen=capacity)

    def push(self, state, action, reward, next_state, done):
        """Append one transition to the buffer."""
        transition = (state, action, reward, next_state, done)
        self.buffer.append(transition)

    def sample(self, batch_size):
        """Draw `batch_size` distinct transitions uniformly at random.

        Returns a 5-tuple of NumPy arrays
        (states, actions, rewards, next_states, dones), each with the batch
        as its leading axis.
        """
        picked = random.sample(self.buffer, batch_size)
        # Transpose the list of transitions into one sequence per field.
        states, actions, rewards, next_states, dones = zip(*picked)
        fields = (states, actions, rewards, next_states, dones)
        return tuple(np.array(field) for field in fields)

    def __len__(self):
        """Number of transitions currently stored."""
        return len(self.buffer)

# --- Main Setup ---
env = make_env()
n_actions = env.action_space.n

# Online network (trained every step) and target network (a periodically
# refreshed copy that provides the bootstrap targets).
policy_net = QNetwork(n_actions).to(device)
target_net = QNetwork(n_actions).to(device)
target_net.load_state_dict(policy_net.state_dict())
target_net.eval()

optimizer = optim.Adam(policy_net.parameters(), lr=LR)
memory = ReplayBuffer(MEMORY_SIZE)

steps_done = 0


def _epsilon_at(step):
    """Exploration rate after `step` agent steps (exponential decay from EPS_START towards EPS_END)."""
    return EPS_END + (EPS_START - EPS_END) * math.exp(-1. * step / EPS_DECAY)


def _select_action(state, epsilon):
    """Epsilon-greedy choice: random action with probability `epsilon`, else the policy net's argmax."""
    if random.random() < epsilon:
        return env.action_space.sample()
    with torch.no_grad():
        state_t = torch.tensor(state, dtype=torch.float32, device=device).unsqueeze(0)
        return policy_net(state_t).argmax(dim=1).item()


def _optimize_model():
    """Run one DQN gradient step on a random minibatch drawn from the replay buffer."""
    states, actions, rewards, next_states, terminals = memory.sample(BATCH_SIZE)

    state_batch = torch.tensor(states, dtype=torch.float32, device=device)
    action_batch = torch.tensor(actions, dtype=torch.long, device=device).unsqueeze(1)
    reward_batch = torch.tensor(rewards, dtype=torch.float32, device=device).unsqueeze(1)
    next_state_batch = torch.tensor(next_states, dtype=torch.float32, device=device)
    terminal_batch = torch.tensor(terminals, dtype=torch.float32, device=device).unsqueeze(1)

    # Q(s, a) for the actions that were actually taken.
    q_values = policy_net(state_batch).gather(1, action_batch)

    # Standard DQN target: r + gamma * max_a' Q_target(s', a'), with the
    # bootstrap term dropped only for truly terminal transitions.
    with torch.no_grad():
        next_q_values = target_net(next_state_batch).max(1)[0].unsqueeze(1)
        expected_q_values = reward_batch + (GAMMA * next_q_values * (1 - terminal_batch))

    loss = F.smooth_l1_loss(q_values, expected_q_values)

    optimizer.zero_grad()
    loss.backward()
    # Clamp every gradient element to [-1, 1] to guard against exploding gradients.
    nn.utils.clip_grad_value_(policy_net.parameters(), 1.0)
    optimizer.step()


print("Training Started (Press Ctrl+C to stop and save)...")

try:
    for i_episode in range(NUM_EPISODES):
        state, _ = env.reset()
        score = 0

        # NOTE: CarRacing has a negative reward per frame (-0.1). Nothing here
        # adds an extra incentive to accelerate; early progress relies on
        # epsilon-greedy exploration alone.

        while True:
            # 1. Epsilon Greedy Action
            eps_threshold = _epsilon_at(steps_done)
            action = _select_action(state, eps_threshold)
            steps_done += 1

            # 2. Step
            next_state, reward, terminated, truncated, _ = env.step(action)
            done = terminated or truncated

            # 3. Store
            # Only `terminated` marks a real terminal state. A time-limit
            # truncation merely cuts the episode short, so the target for that
            # transition must still bootstrap from next_state.
            memory.push(state, action, reward, next_state, terminated)

            state = next_state
            score += reward

            # 4. Optimize
            if len(memory) > BATCH_SIZE:
                _optimize_model()

            # Hard update of the target network.
            if steps_done % TARGET_UPDATE == 0:
                target_net.load_state_dict(policy_net.state_dict())

            if done:
                break

        # Logging
        print(f"Episode {i_episode} | Score: {score:.2f} | Epsilon: {eps_threshold:.3f}")

        # Save periodically
        if i_episode % 50 == 0:
            torch.save(policy_net.state_dict(), "car_racing_dqn.pth")

    # Save the final weights too; otherwise everything learned after the last
    # periodic checkpoint would be lost.
    torch.save(policy_net.state_dict(), "car_racing_dqn.pth")

except KeyboardInterrupt:
    print("Training interrupted. Saving current model...")
    torch.save(policy_net.state_dict(), "car_racing_dqn_interrupted.pth")

finally:
    # Release the environment even if training stops with an unexpected error.
    env.close()

print("Training Finished.")

Device: cuda
Training Started (Press Ctrl+C to stop and save)...
Episode 0 | Score: -35.71 | Epsilon: 0.998
Episode 1 | Score: -15.79 | Epsilon: 0.996
Episode 2 | Score: -34.66 | Epsilon: 0.993
Episode 3 | Score: -25.20 | Epsilon: 0.991
Episode 4 | Score: -22.39 | Epsilon: 0.989
Episode 5 | Score: -21.15 | Epsilon: 0.987
Episode 6 | Score: -21.15 | Epsilon: 0.984
Episode 7 | Score: -70.78 | Epsilon: 0.982
Episode 8 | Score: -13.49 | Epsilon: 0.980
Episode 9 | Score: -47.19 | Epsilon: 0.978
Episode 10 | Score: -33.59 | Epsilon: 0.976
Episode 11 | Score: -37.98 | Epsilon: 0.973
Episode 12 | Score: -75.31 | Epsilon: 0.971
Episode 13 | Score: -28.57 | Epsilon: 0.969
Episode 14 | Score: -62.96 | Epsilon: 0.967
Episode 15 | Score: -60.63 | Epsilon: 0.965
Episode 16 | Score: -42.86 | Epsilon: 0.963
Episode 17 | Score: -48.72 | Epsilon: 0.960
Episode 18 | Score: -53.07 | Epsilon: 0.958
Episode 19 | Score: -35.15 | Epsilon: 0.956
Episode 20 | Score: -57.75 | Epsilon: 0.954
Episode 21 | Score: -

In [None]:
# --- INFERENCE ---
def watch_agent(model_path="car_racing_dqn.pth", render_mode="rgb_array"):
    """Play one greedy episode with a saved Q-network and print the total reward.

    Args:
        model_path: Path of the checkpoint (a state dict saved with torch.save).
        render_mode: Passed to `gym.make`. With the default "rgb_array" no
            window is opened; pass "human" to watch on a machine with a
            display, or wrap the environment in a video recorder when running
            headless (e.g. Colab).
    """
    # 1. Setup exact same environment wrappers as used for training; the
    #    network only understands observations produced by this chain.
    env = gym.make("CarRacing-v3", render_mode=render_mode, continuous=False)
    env = RepeatActionAndMaxFrame(env, skip=4)
    env = PreprocessFrame(env)
    env = StackFrames(env, stack_size=4)

    try:
        # 2. Load Model
        model = QNetwork(env.action_space.n).to(device)
        # weights_only=True restricts unpickling to tensors and plain
        # containers, so a tampered checkpoint cannot run arbitrary code.
        state_dict = torch.load(model_path, map_location=device, weights_only=True)
        model.load_state_dict(state_dict)
        model.eval()

        # 3. Play
        state, _ = env.reset()
        total_reward = 0

        print("Watching Agent...")
        while True:
            state_t = torch.tensor(state, dtype=torch.float32, device=device).unsqueeze(0)
            with torch.no_grad():
                # Always take the greedy action; no exploration at inference time.
                action = model(state_t).argmax(dim=1).item()

            state, reward, terminated, truncated, _ = env.step(action)
            total_reward += reward

            if terminated or truncated:
                print(f"Game Over. Total Reward: {total_reward}")
                break
    finally:
        # Close the environment even if loading the model or stepping fails.
        env.close()

# watch_agent() # Uncomment to run