In [None]:


# %% [code]
# Install required libraries (if not already installed)
# Uncomment and run the following commands if you need to install the libraries:
# !pip install opencv-python gym jupyter numpy matplotlib kaleido pandas plotly pyyaml requests seaborn scikit-learn torch imageio

# %% [code]
import cv2
import gym
import numpy as np
import random
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from collections import deque
import imageio

# Seed Python, NumPy and PyTorch from one constant so runs are repeatable
SEED = 42
for _seed_rng in (random.seed, np.random.seed):
    _seed_rng(SEED)
torch.manual_seed(SEED)

# %% [code]
# Setup virtual display for environments that require rendering (e.g., on Google Colab)
# Best-effort: if pyvirtualdisplay is not installed, the ImportError is swallowed
# and the notebook carries on without a virtual display.
try:
    from pyvirtualdisplay import Display
    # Invisible (visible=0) 400x300 virtual screen.
    # NOTE(review): `display` is also the name IPython gives its rich-display
    # helper; this assignment may shadow it in a notebook — confirm no later
    # cell needs that helper.
    display = Display(visible=0, size=(400, 300))
    display.start()
except ImportError:
    # Only ImportError is handled; any other failure raised by Display() or
    # start() still propagates.
    pass

# %% [code]
# Helper function to process the raw RGB image from the environment.
def process_image(image):
    """
    Convert a rendered frame to a normalized grayscale image of shape (160, 240).

    Args:
        image: np.ndarray frame. A 3-dimensional array is treated as an RGB
            image (channels last); any other rank is passed straight to the
            resize step, i.e. it is assumed to be grayscale already.

    Returns:
        np.ndarray of dtype float32 with shape (160, 240) (height, width).
        Values are divided by 255, so 8-bit input ends up in [0, 1].
    """
    if len(image.shape) == 3:
        # The frame is RGB, so the RGB conversion code is required; the BGR
        # code would apply the red and blue luminance weights the wrong way round.
        image = cv2.cvtColor(image, cv2.COLOR_RGB2GRAY)
    # cv2.resize takes (width, height), hence (240, 160) for a 160x240 result.
    image = cv2.resize(image, (240, 160))
    # Normalize pixel values to [0, 1]
    image = image.astype(np.float32) / 255.0
    return image

# %% [code]
# Replay Memory to store experiences for training.
class ReplayMemory:
    """Bounded FIFO store of (state, action, reward, next_state, done) transitions."""

    def __init__(self, capacity):
        # deque(maxlen=...) drops the oldest transition once capacity is reached
        self.memory = deque(maxlen=capacity)
        self.capacity = capacity

    def __len__(self):
        """Number of transitions currently stored."""
        return len(self.memory)

    def push(self, state, action, reward, next_state, done):
        """Append one transition to the buffer."""
        transition = (state, action, reward, next_state, done)
        self.memory.append(transition)

    def sample(self, batch_size):
        """
        Draw `batch_size` distinct transitions uniformly at random.

        Returns a 5-tuple of arrays (states, actions, rewards, next_states,
        dones); rewards are float32 and dones are uint8.
        """
        drawn = random.sample(self.memory, batch_size)
        state_col, action_col, reward_col, next_col, done_col = zip(*drawn)
        batch_states = np.stack(state_col)
        batch_actions = np.array(action_col)
        batch_rewards = np.array(reward_col, dtype=np.float32)
        batch_next_states = np.stack(next_col)
        batch_dones = np.array(done_col, dtype=np.uint8)
        return (batch_states, batch_actions, batch_rewards, batch_next_states, batch_dones)

# %% [code]
# Define the CNN-based Q-Network using PyTorch.
class QNetwork(nn.Module):
    """
    Convolutional Q-network mapping a stack of frames to one Q-value per action.

    Three ReLU-activated convolutions are followed by three ReLU-activated
    fully connected layers and a linear output layer.

    Args:
        input_channels: number of channels (stacked frames) in the input.
        num_actions: number of discrete actions, i.e. the output width.
        input_shape: (height, width) of each frame, used only to size the
            first fully connected layer. Defaults to (160, 240).
    """

    def __init__(self, input_channels, num_actions, input_shape=(160, 240)):
        super(QNetwork, self).__init__()
        self.conv1 = nn.Conv2d(input_channels, 64, kernel_size=5, stride=3)
        self.conv2 = nn.Conv2d(64, 64, kernel_size=4, stride=2)
        self.conv3 = nn.Conv2d(64, 64, kernel_size=3, stride=1)

        # Push a dummy batch of one through the conv stack to learn how many
        # features the first fully connected layer receives.
        height, width = input_shape
        dummy_input = torch.zeros(1, input_channels, height, width)
        conv_out = self._get_conv_out(dummy_input)

        self.fc1 = nn.Linear(conv_out, 512)
        self.fc2 = nn.Linear(512, 256)
        self.fc3 = nn.Linear(256, 64)
        self.out = nn.Linear(64, num_actions)

    def _conv_features(self, x):
        """Apply the three convolution + ReLU stages shared by sizing and forward."""
        x = F.relu(self.conv1(x))
        x = F.relu(self.conv2(x))
        return F.relu(self.conv3(x))

    def _get_conv_out(self, x):
        """Return the flattened per-sample feature count the conv stack yields for `x`."""
        # Sizing only: no gradients are needed for this probe.
        with torch.no_grad():
            return int(self._conv_features(x)[0].numel())

    def forward(self, x):
        """Compute Q-values of shape (batch, num_actions) for a batch of frame stacks."""
        x = self._conv_features(x)
        x = torch.flatten(x, start_dim=1)  # keep the batch axis, flatten the rest
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = F.relu(self.fc3(x))
        return self.out(x)

# %% [code]
# Define the DQN Agent using the QNetwork defined above.
class Agent_DQN:
    """
    DQN agent: a policy network, a periodically synced target network, a
    replay memory and an exponentially decaying epsilon-greedy policy.

    Args:
        env: environment exposing `action_space.n` and `action_space.sample()`.
        memory_capacity: maximum number of transitions kept in replay memory.
        batch_size: number of transitions sampled per optimization step.
        gamma: discount factor applied to the bootstrapped next-state value.
        lr: learning rate of the Adam optimizer.
        target_update: stored as `self.target_update`; the agent never reads
            it itself, the caller decides when to call `update_target_network`.
        epsilon_start: exploration probability at step 0.
        epsilon_end: exploration probability approached as steps grow.
        epsilon_decay: time constant (in `select_action` calls) of the decay.
    """

    def __init__(self, env, memory_capacity=10000, batch_size=32, gamma=0.99, lr=1e-4, target_update=1000,
                 epsilon_start=1.0, epsilon_end=0.05, epsilon_decay=5000):
        self.env = env
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.num_actions = env.action_space.n
        self.batch_size = batch_size
        self.gamma = gamma
        self.target_update = target_update

        self.policy_net = QNetwork(input_channels=4, num_actions=self.num_actions).to(self.device)
        self.target_net = QNetwork(input_channels=4, num_actions=self.num_actions).to(self.device)
        # The target network starts as an exact copy and is never trained directly.
        self.target_net.load_state_dict(self.policy_net.state_dict())
        self.target_net.eval()

        self.optimizer = optim.Adam(self.policy_net.parameters(), lr=lr)
        self.memory = ReplayMemory(memory_capacity)
        self.steps_done = 0
        self.epsilon_start = epsilon_start
        self.epsilon_end = epsilon_end
        self.epsilon_decay = epsilon_decay

    def select_action(self, state):
        """
        Pick an action epsilon-greedily and advance the exploration schedule.

        Args:
            state: array for a single observation (no batch axis).

        Returns:
            A random sample from the action space with probability epsilon,
            otherwise the index of the largest Q-value as a Python int.
        """
        eps_threshold = self.epsilon_end + (self.epsilon_start - self.epsilon_end) * \
                        np.exp(-1. * self.steps_done / self.epsilon_decay)
        self.steps_done += 1
        if random.random() < eps_threshold:
            return self.env.action_space.sample()
        # Cast explicitly so a float64 observation does not break the float32 network.
        state_tensor = torch.as_tensor(state, dtype=torch.float32, device=self.device).unsqueeze(0)
        with torch.no_grad():
            q_values = self.policy_net(state_tensor)
        return q_values.argmax().item()

    def optimize_model(self):
        """
        Run one gradient step on a batch sampled from replay memory.

        Returns:
            The MSE loss of the step as a float, or None when the memory holds
            fewer than `batch_size` transitions and no step was taken.
        """
        if len(self.memory) < self.batch_size:
            return None

        states, actions, rewards, next_states, dones = self.memory.sample(self.batch_size)

        # Convert to tensors with explicit dtypes: gather() requires int64
        # indices, and NumPy's default integer type is platform dependent.
        states = torch.as_tensor(states, dtype=torch.float32, device=self.device)
        actions = torch.as_tensor(actions, dtype=torch.int64, device=self.device).unsqueeze(1)
        rewards = torch.as_tensor(rewards, dtype=torch.float32, device=self.device).unsqueeze(1)
        next_states = torch.as_tensor(next_states, dtype=torch.float32, device=self.device)
        not_done = 1.0 - torch.as_tensor(dones, dtype=torch.float32, device=self.device).unsqueeze(1)

        # Q(s, a) for the actions that were actually taken
        state_action_values = self.policy_net(states).gather(1, actions)

        # Bootstrapped target from the frozen target network; terminal
        # transitions (not_done == 0) contribute the reward only.
        with torch.no_grad():
            next_state_values = self.target_net(next_states).max(1)[0].unsqueeze(1)
            expected_state_action_values = rewards + not_done * self.gamma * next_state_values

        loss = F.mse_loss(state_action_values, expected_state_action_values)

        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()
        return loss.item()

    def update_target_network(self):
        """Copy the policy network's weights into the target network."""
        self.target_net.load_state_dict(self.policy_net.state_dict())

# %% [code]
# Utility to stack frames
def stack_frames(stacked_frames, new_frame, is_new_episode, stack_size=4):
    """
    Stack frames for state representation.
    Args:
        stacked_frames: deque object holding previous frames; may be None or
            empty, in which case the call is treated as an episode start.
        new_frame: processed image frame.
        is_new_episode: bool, if true, discard the history and fill the stack
            with references to new_frame.
        stack_size: number of frames per state (default 4).
    Returns:
        stacked_state: np.array with shape (stack_size, H, W), where (H, W) is
            the shape of the frames (e.g. (4, 160, 240)).
        stacked_frames: updated deque of frames.
    """
    if is_new_episode or not stacked_frames:
        # Fill the whole window with the first frame. Building it directly
        # avoids allocating placeholder frames that would be evicted at once.
        stacked_frames = deque([new_frame] * stack_size, maxlen=stack_size)
    else:
        # The deque's maxlen drops the oldest frame automatically.
        stacked_frames.append(new_frame)
    stacked_state = np.stack(stacked_frames, axis=0)
    return stacked_state, stacked_frames

# %% [code]
# Training loop parameters
num_episodes = 300       # Adjust based on available compute
max_steps = 200          # Per-episode step cap enforced by this loop (below CartPole-v1's own 500-step limit)
target_update_interval = 1000   # Environment steps between target-network syncs
eval_interval = 20       # Episodes between progress reports

# Create gym environment (gym>=0.26 API: the render mode is fixed at creation
# and env.render() then returns the RGB frame)
env = gym.make('CartPole-v1', render_mode='rgb_array')
# The action space samples from its own RNG, so seed it for repeatable exploration
env.action_space.seed(SEED)

# Initialize DQN agent
# NOTE(review): each stored state is 4x160x240 float32 (~0.6 MB), so a full
# 10000-transition replay memory needs several GB of RAM — confirm this fits.
agent = Agent_DQN(env, target_update=target_update_interval)
episode_rewards = []

# Main training loop
total_steps = 0

for episode in range(1, num_episodes+1):
    # Seed the environment once; later resets continue from that RNG stream.
    # The low-dimensional observation is ignored: the agent learns from pixels.
    env.reset(seed=SEED if episode == 1 else None)
    # Render initial frame and process image
    frame = process_image(env.render())
    state, stacked_frames = stack_frames(None, frame, True)

    total_reward = 0
    for t in range(max_steps):
        action = agent.select_action(state)
        _, reward, terminated, truncated, _ = env.step(action)
        total_reward += reward

        # Get next state image
        frame_next = process_image(env.render())
        next_state, stacked_frames = stack_frames(stacked_frames, frame_next, False)

        # Store transition in replay memory. Only a true termination is stored
        # as done, so a time-limit truncation still bootstraps from next_state.
        agent.memory.push(state, action, reward, next_state, terminated)
        state = next_state

        agent.optimize_model()
        total_steps += 1

        # Update target network periodically
        if total_steps % agent.target_update == 0:
            agent.update_target_network()

        if terminated or truncated:
            break

    episode_rewards.append(total_reward)

    if episode % eval_interval == 0:
        avg_reward = np.mean(episode_rewards[-eval_interval:])
        print(f"Episode {episode} - Average Reward: {avg_reward:.2f}")
        
# %% [code]
# Plot the per-episode training reward curve
fig, ax = plt.subplots(figsize=(10, 5))
ax.plot(episode_rewards)
ax.set_xlabel("Episode")
ax.set_ylabel("Total Reward")
ax.set_title("Training Rewards over Episodes")
plt.show()

# %% [code]
# Persist the policy network's weights (optional)
policy_weights = agent.policy_net.state_dict()
torch.save(policy_weights, "dqn_cartpole.pth")


In [2]:
# In this notebook we:
# 1. Set up the environment and necessary libraries.
# 2. Define a helper function `process_image` that converts the rendered image to grayscale and resizes it.
# 3. Define the CNN architecture as our Q-network.
# 4. Create a replay memory buffer for experience replay.
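# 5. Build an Agent class (`Agent_DQN`) that encapsulates epsilon-greedy action selection, the DQN optimization step, and target-network updates.
# 6. Run the training loop, then plot the episode rewards and save the trained model.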

In [3]:
import cv2
import gym
import numpy as np
import random
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from collections import deque
import imageio


In [4]:
# Seed Python, NumPy and PyTorch from one constant so runs are repeatable
SEED = 42
for _seed_rng in (random.seed, np.random.seed):
    _seed_rng(SEED)
torch.manual_seed(SEED)

<torch._C.Generator at 0x7ff39bf52ed0>

In [5]:
# Setup virtual display for environments that require rendering (e.g., on Google Colab)
# Best-effort: if pyvirtualdisplay is not installed, the ImportError is swallowed
# and the notebook carries on without a virtual display.
try:
    from pyvirtualdisplay import Display
    # Invisible (visible=0) 400x300 virtual screen.
    # NOTE(review): `display` is also the name IPython gives its rich-display
    # helper; this assignment may shadow it in a notebook — confirm no later
    # cell needs that helper.
    display = Display(visible=0, size=(400, 300))
    display.start()
except ImportError:
    # Only ImportError is handled; any other failure raised by Display() or
    # start() still propagates.
    pass

In [6]:
# Helper function to process the raw RGB image from the environment.
def process_image(image):
    """
    Convert a rendered frame to a normalized grayscale image of shape (160, 240).

    Args:
        image: np.ndarray frame. A 3-dimensional array is treated as an RGB
            image (channels last); any other rank is passed straight to the
            resize step, i.e. it is assumed to be grayscale already.

    Returns:
        np.ndarray of dtype float32 with shape (160, 240) (height, width).
        Values are divided by 255, so 8-bit input ends up in [0, 1].
    """
    if len(image.shape) == 3:
        # The frame is RGB, so the RGB conversion code is required; the BGR
        # code would apply the red and blue luminance weights the wrong way round.
        image = cv2.cvtColor(image, cv2.COLOR_RGB2GRAY)
    # cv2.resize takes (width, height), hence (240, 160) for a 160x240 result.
    image = cv2.resize(image, (240, 160))
    # Normalize pixel values to [0, 1]
    image = image.astype(np.float32) / 255.0
    return image

In [7]:
# Replay Memory to store experiences for training.
class ReplayMemory:
    """Bounded FIFO store of (state, action, reward, next_state, done) transitions."""

    def __init__(self, capacity):
        # deque(maxlen=...) drops the oldest transition once capacity is reached
        self.memory = deque(maxlen=capacity)
        self.capacity = capacity

    def __len__(self):
        """Number of transitions currently stored."""
        return len(self.memory)

    def push(self, state, action, reward, next_state, done):
        """Append one transition to the buffer."""
        transition = (state, action, reward, next_state, done)
        self.memory.append(transition)

    def sample(self, batch_size):
        """
        Draw `batch_size` distinct transitions uniformly at random.

        Returns a 5-tuple of arrays (states, actions, rewards, next_states,
        dones); rewards are float32 and dones are uint8.
        """
        drawn = random.sample(self.memory, batch_size)
        state_col, action_col, reward_col, next_col, done_col = zip(*drawn)
        batch_states = np.stack(state_col)
        batch_actions = np.array(action_col)
        batch_rewards = np.array(reward_col, dtype=np.float32)
        batch_next_states = np.stack(next_col)
        batch_dones = np.array(done_col, dtype=np.uint8)
        return (batch_states, batch_actions, batch_rewards, batch_next_states, batch_dones)


In [8]:
# Define the CNN-based Q-Network using PyTorch.
class QNetwork(nn.Module):
    """
    Convolutional Q-network mapping a stack of frames to one Q-value per action.

    Three ReLU-activated convolutions are followed by three ReLU-activated
    fully connected layers and a linear output layer.

    Args:
        input_channels: number of channels (stacked frames) in the input.
        num_actions: number of discrete actions, i.e. the output width.
        input_shape: (height, width) of each frame, used only to size the
            first fully connected layer. Defaults to (160, 240).
    """

    def __init__(self, input_channels, num_actions, input_shape=(160, 240)):
        super(QNetwork, self).__init__()
        self.conv1 = nn.Conv2d(input_channels, 64, kernel_size=5, stride=3)
        self.conv2 = nn.Conv2d(64, 64, kernel_size=4, stride=2)
        self.conv3 = nn.Conv2d(64, 64, kernel_size=3, stride=1)

        # Push a dummy batch of one through the conv stack to learn how many
        # features the first fully connected layer receives.
        height, width = input_shape
        dummy_input = torch.zeros(1, input_channels, height, width)
        conv_out = self._get_conv_out(dummy_input)

        self.fc1 = nn.Linear(conv_out, 512)
        self.fc2 = nn.Linear(512, 256)
        self.fc3 = nn.Linear(256, 64)
        self.out = nn.Linear(64, num_actions)

    def _conv_features(self, x):
        """Apply the three convolution + ReLU stages shared by sizing and forward."""
        x = F.relu(self.conv1(x))
        x = F.relu(self.conv2(x))
        return F.relu(self.conv3(x))

    def _get_conv_out(self, x):
        """Return the flattened per-sample feature count the conv stack yields for `x`."""
        # Sizing only: no gradients are needed for this probe.
        with torch.no_grad():
            return int(self._conv_features(x)[0].numel())

    def forward(self, x):
        """Compute Q-values of shape (batch, num_actions) for a batch of frame stacks."""
        x = self._conv_features(x)
        x = torch.flatten(x, start_dim=1)  # keep the batch axis, flatten the rest
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = F.relu(self.fc3(x))
        return self.out(x)


In [9]:
# Define the DQN Agent using the QNetwork defined above.
class Agent_DQN:
    """
    DQN agent: a policy network, a periodically synced target network, a
    replay memory and an exponentially decaying epsilon-greedy policy.

    Args:
        env: environment exposing `action_space.n` and `action_space.sample()`.
        memory_capacity: maximum number of transitions kept in replay memory.
        batch_size: number of transitions sampled per optimization step.
        gamma: discount factor applied to the bootstrapped next-state value.
        lr: learning rate of the Adam optimizer.
        target_update: stored as `self.target_update`; the agent never reads
            it itself, the caller decides when to call `update_target_network`.
        epsilon_start: exploration probability at step 0.
        epsilon_end: exploration probability approached as steps grow.
        epsilon_decay: time constant (in `select_action` calls) of the decay.
    """

    def __init__(self, env, memory_capacity=10000, batch_size=32, gamma=0.99, lr=1e-4, target_update=1000,
                 epsilon_start=1.0, epsilon_end=0.05, epsilon_decay=5000):
        self.env = env
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.num_actions = env.action_space.n
        self.batch_size = batch_size
        self.gamma = gamma
        self.target_update = target_update

        self.policy_net = QNetwork(input_channels=4, num_actions=self.num_actions).to(self.device)
        self.target_net = QNetwork(input_channels=4, num_actions=self.num_actions).to(self.device)
        # The target network starts as an exact copy and is never trained directly.
        self.target_net.load_state_dict(self.policy_net.state_dict())
        self.target_net.eval()

        self.optimizer = optim.Adam(self.policy_net.parameters(), lr=lr)
        self.memory = ReplayMemory(memory_capacity)
        self.steps_done = 0
        self.epsilon_start = epsilon_start
        self.epsilon_end = epsilon_end
        self.epsilon_decay = epsilon_decay

    def select_action(self, state):
        """
        Pick an action epsilon-greedily and advance the exploration schedule.

        Args:
            state: array for a single observation (no batch axis).

        Returns:
            A random sample from the action space with probability epsilon,
            otherwise the index of the largest Q-value as a Python int.
        """
        eps_threshold = self.epsilon_end + (self.epsilon_start - self.epsilon_end) * \
                        np.exp(-1. * self.steps_done / self.epsilon_decay)
        self.steps_done += 1
        if random.random() < eps_threshold:
            return self.env.action_space.sample()
        # Cast explicitly so a float64 observation does not break the float32 network.
        state_tensor = torch.as_tensor(state, dtype=torch.float32, device=self.device).unsqueeze(0)
        with torch.no_grad():
            q_values = self.policy_net(state_tensor)
        return q_values.argmax().item()

    def optimize_model(self):
        """
        Run one gradient step on a batch sampled from replay memory.

        Returns:
            The MSE loss of the step as a float, or None when the memory holds
            fewer than `batch_size` transitions and no step was taken.
        """
        if len(self.memory) < self.batch_size:
            return None

        states, actions, rewards, next_states, dones = self.memory.sample(self.batch_size)

        # Convert to tensors with explicit dtypes: gather() requires int64
        # indices, and NumPy's default integer type is platform dependent.
        states = torch.as_tensor(states, dtype=torch.float32, device=self.device)
        actions = torch.as_tensor(actions, dtype=torch.int64, device=self.device).unsqueeze(1)
        rewards = torch.as_tensor(rewards, dtype=torch.float32, device=self.device).unsqueeze(1)
        next_states = torch.as_tensor(next_states, dtype=torch.float32, device=self.device)
        not_done = 1.0 - torch.as_tensor(dones, dtype=torch.float32, device=self.device).unsqueeze(1)

        # Q(s, a) for the actions that were actually taken
        state_action_values = self.policy_net(states).gather(1, actions)

        # Bootstrapped target from the frozen target network; terminal
        # transitions (not_done == 0) contribute the reward only.
        with torch.no_grad():
            next_state_values = self.target_net(next_states).max(1)[0].unsqueeze(1)
            expected_state_action_values = rewards + not_done * self.gamma * next_state_values

        loss = F.mse_loss(state_action_values, expected_state_action_values)

        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()
        return loss.item()

    def update_target_network(self):
        """Copy the policy network's weights into the target network."""
        self.target_net.load_state_dict(self.policy_net.state_dict())

In [10]:
# Utility to stack frames
def stack_frames(stacked_frames, new_frame, is_new_episode, stack_size=4):
    """
    Stack frames for state representation.
    Args:
        stacked_frames: deque object holding previous frames; may be None or
            empty, in which case the call is treated as an episode start.
        new_frame: processed image frame.
        is_new_episode: bool, if true, discard the history and fill the stack
            with references to new_frame.
        stack_size: number of frames per state (default 4).
    Returns:
        stacked_state: np.array with shape (stack_size, H, W), where (H, W) is
            the shape of the frames (e.g. (4, 160, 240)).
        stacked_frames: updated deque of frames.
    """
    if is_new_episode or not stacked_frames:
        # Fill the whole window with the first frame. Building it directly
        # avoids allocating placeholder frames that would be evicted at once.
        stacked_frames = deque([new_frame] * stack_size, maxlen=stack_size)
    else:
        # The deque's maxlen drops the oldest frame automatically.
        stacked_frames.append(new_frame)
    stacked_state = np.stack(stacked_frames, axis=0)
    return stacked_state, stacked_frames


In [None]:
# Training loop parameters
num_episodes = 300       # Adjust based on available compute
max_steps = 200          # Per-episode step cap enforced by this loop (below CartPole-v1's own 500-step limit)
target_update_interval = 1000   # Environment steps between target-network syncs
eval_interval = 20       # Episodes between progress reports

# Create gym environment; with render_mode='rgb_array' env.render() returns the RGB frame
env = gym.make('CartPole-v1', render_mode='rgb_array')
# The action space samples from its own RNG, so seed it for repeatable exploration
env.action_space.seed(SEED)

# Initialize DQN agent
# NOTE(review): each stored state is 4x160x240 float32 (~0.6 MB), so a full
# 10000-transition replay memory needs several GB of RAM — confirm this fits.
agent = Agent_DQN(env, target_update=target_update_interval)
episode_rewards = []

# Main training loop
total_steps = 0
for episode in range(1, num_episodes+1):
    # Seed the environment once; later resets continue from that RNG stream.
    # The low-dimensional observation is ignored: the agent learns from pixels.
    env.reset(seed=SEED if episode == 1 else None)
    # Render initial frame and process image
    frame = process_image(env.render())
    state, stacked_frames = stack_frames(None, frame, True)

    total_reward = 0
    for t in range(max_steps):
        action = agent.select_action(state)
        _, reward, terminated, truncated, _ = env.step(action)
        total_reward += reward

        # Get next state image
        frame_next = process_image(env.render())
        next_state, stacked_frames = stack_frames(stacked_frames, frame_next, False)

        # Store transition in replay memory. Only a true termination is stored
        # as done, so a time-limit truncation still bootstraps from next_state.
        agent.memory.push(state, action, reward, next_state, terminated)
        state = next_state

        agent.optimize_model()
        total_steps += 1

        # Update target network periodically
        if total_steps % agent.target_update == 0:
            agent.update_target_network()

        if terminated or truncated:
            break

    episode_rewards.append(total_reward)

    if episode % eval_interval == 0:
        avg_reward = np.mean(episode_rewards[-eval_interval:])
        print(f"Episode {episode} - Average Reward: {avg_reward:.2f}")

Episode 20 - Average Reward: 19.45


In [None]:
# Plot the per-episode training reward curve
fig, ax = plt.subplots(figsize=(10, 5))
ax.plot(episode_rewards)
ax.set_xlabel("Episode")
ax.set_ylabel("Total Reward")
ax.set_title("Training Rewards over Episodes")
plt.show()

In [None]:
# Persist the policy network's weights (optional)
policy_weights = agent.policy_net.state_dict()
torch.save(policy_weights, "dqn_cartpole.pth")