<br>

<h1 style="text-align:center;">Rainbow Algorithm Playing Atari Games</h1>

<br>

## Introduction

---

In this project, we implement the Rainbow algorithm, a reinforcement learning method that combines several extensions of Deep Q-Networks (DQN) to achieve strong performance on Atari games. The components used in this notebook are Double Q-learning, Prioritized Experience Replay, Dueling Network Architectures, distributional (categorical) value estimation, and Noisy Networks for exploration; together they address limitations of the original DQN. Multi-step returns, the remaining component of the full Rainbow agent, are not implemented here.


In [1]:
# Import the libraries
import os
import cv2
import random
import numpy as np
import gymnasium as gym
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import math

<br>

## Noisy Linear

---


In [2]:
class NoisyLinear(nn.Module):
    """Linear layer whose weights and biases are perturbed by resampled noise.

    Each weight/bias has a learnable mean (``*_mu``) and a learnable noise
    scale (``*_sigma``).  The noise itself (``*_epsilon``) is stored in
    non-trainable buffers and is redrawn by :meth:`reset_noise`.  The weight
    noise is the outer product of two transformed standard-normal vectors.

    Args:
        in_features: Size of each input sample.
        out_features: Size of each output sample.
        std_init: Constant used to initialise the ``*_sigma`` parameters.
    """

    def __init__(self, in_features, out_features, std_init=0.5):
        super().__init__()
        self.in_features = in_features
        self.out_features = out_features
        self.std_init = std_init

        # Registration order is kept stable so state_dict keys line up with
        # previously saved checkpoints.
        weight_shape = (out_features, in_features)
        bias_shape = (out_features,)
        self.weight_mu = nn.Parameter(torch.empty(weight_shape, dtype=torch.float32))
        self.weight_sigma = nn.Parameter(torch.empty(weight_shape, dtype=torch.float32))
        self.register_buffer('weight_epsilon', torch.empty(weight_shape, dtype=torch.float32))
        self.bias_mu = nn.Parameter(torch.empty(bias_shape, dtype=torch.float32))
        self.bias_sigma = nn.Parameter(torch.empty(bias_shape, dtype=torch.float32))
        self.register_buffer('bias_epsilon', torch.empty(bias_shape, dtype=torch.float32))

        # The tensors above are uninitialised until these two calls fill them.
        self.reset_parameters()
        self.reset_noise()

    def reset_parameters(self):
        """Initialise the means uniformly and the noise scales to constants."""
        bound = 1 / np.sqrt(self.in_features)
        weight_scale = self.std_init / np.sqrt(self.in_features)
        bias_scale = self.std_init / np.sqrt(self.out_features)

        self.weight_mu.data.uniform_(-bound, bound)
        self.weight_sigma.data.fill_(weight_scale)
        self.bias_mu.data.uniform_(-bound, bound)
        self.bias_sigma.data.fill_(bias_scale)

    def reset_noise(self):
        """Draw fresh noise for the weight and bias buffers."""
        noise_in = self._scale_noise(self.in_features)
        noise_out = self._scale_noise(self.out_features)
        # Outer product: entry (o, i) is noise_out[o] * noise_in[i].
        weight_noise = noise_out.unsqueeze(1) * noise_in.unsqueeze(0)
        self.weight_epsilon.copy_(weight_noise)
        # The bias uses its own independent draw.
        bias_noise = self._scale_noise(self.out_features)
        self.bias_epsilon.copy_(bias_noise)

    def _scale_noise(self, size):
        """Return ``sign(x) * sqrt(|x|)`` for ``size`` standard-normal samples."""
        sample = torch.randn(size)
        magnitude = sample.abs().sqrt()
        return sample.sign() * magnitude

    def forward(self, input):
        """Apply the layer; noise is only added while the module is training."""
        if not self.training:
            return F.linear(input, self.weight_mu, self.bias_mu)

        noisy_weight = self.weight_mu + self.weight_sigma * self.weight_epsilon
        noisy_bias = self.bias_mu + self.bias_sigma * self.bias_epsilon
        return F.linear(input, noisy_weight, noisy_bias)

<br>

## Prioritized Replay Buffer

---

In [3]:
class PrioritizedReplayBuffer:
    """Fixed-capacity ring buffer of transitions sampled proportionally to priority.

    Args:
        capacity: Maximum number of transitions kept; the oldest slot is
            overwritten once the buffer is full.
        alpha: Exponent applied to priorities before normalising them into
            sampling probabilities.
        beta: Exponent used for the importance-sampling weights returned by
            :meth:`sample`.
    """

    def __init__(self, capacity, alpha=0.6, beta=0.4):
        self.capacity = capacity
        self.alpha = alpha
        self.beta = beta
        self.buffer = []
        self.priorities = np.zeros((capacity,), dtype=np.float32)
        self.position = 0  # index of the slot the next push writes to

    def push(self, state, action, reward, next_state, done):
        """Store one transition, giving it the largest priority seen so far."""
        # An empty buffer has no priorities yet, so start from 1.0.
        initial_priority = self.priorities.max() if self.buffer else 1.0
        transition = (state, action, reward, next_state, done)
        slot = self.position

        if len(self.buffer) >= self.capacity:
            self.buffer[slot] = transition
        else:
            self.buffer.append(transition)

        self.priorities[slot] = initial_priority
        self.position = (slot + 1) % self.capacity

    def sample(self, batch_size):
        """Draw ``batch_size`` transitions (with replacement) by priority.

        Returns:
            A tuple ``(samples, indices, weights)``: the sampled transitions,
            their buffer indices, and importance-sampling weights scaled so
            that the largest weight is 1.
        """
        stored = len(self.buffer)
        # While the buffer is still filling, only the written slots count.
        if stored == self.capacity:
            active = self.priorities
        else:
            active = self.priorities[:self.position]

        scaled = active ** self.alpha
        probs = scaled / scaled.sum()

        indices = np.random.choice(stored, batch_size, p=probs)
        samples = [self.buffer[i] for i in indices]

        raw_weights = (stored * probs[indices]) ** (-self.beta)
        weights = raw_weights / raw_weights.max()

        return samples, indices, weights

    def update_priorities(self, indices, priorities):
        """Overwrite the priorities at ``indices`` with the given values."""
        epsilon = 1e-5  # small constant so no priority is exactly zero
        for slot, new_priority in zip(indices, priorities):
            self.priorities[slot] = new_priority + epsilon

<br>

## Rainbow Algorithm

---

In [4]:
class RainbowDQN(nn.Module):
    """Dueling, distributional Q-network built from noisy linear layers.

    For every action the network outputs a probability distribution over
    ``num_atoms`` atoms.  A value stream and an advantage stream are combined
    as ``value + advantage - mean(advantage over actions)`` before a softmax
    over the atom dimension.

    Args:
        state_dim: An int or 1-tuple for vector observations, otherwise a
            ``(channels, height, width)`` triple for image observations.
        action_dim: Number of discrete actions.
        num_atoms: Number of atoms in each per-action distribution.
        v_min: Stored on the module; not used inside the network itself.
        v_max: Stored on the module; not used inside the network itself.
    """

    def __init__(self, state_dim, action_dim, num_atoms, v_min, v_max):
        super().__init__()
        self.num_atoms = num_atoms
        self.v_min = v_min
        self.v_max = v_max
        self.action_dim = action_dim

        vector_input = isinstance(state_dim, int) or len(state_dim) == 1
        if vector_input:
            # Vector observations (e.g. CartPole): a two-layer noisy MLP.
            input_size = state_dim[0] if isinstance(state_dim, tuple) else state_dim
            encoder_layers = [
                NoisyLinear(input_size, 128),
                nn.ReLU(),
                NoisyLinear(128, 128),
                nn.ReLU(),
            ]
            self.features = nn.Sequential(*encoder_layers)
            feature_output = 128
        else:
            # Image observations (e.g. Atari): three conv layers, then flatten.
            channels, _, _ = state_dim
            encoder_layers = [
                nn.Conv2d(channels, 32, kernel_size=8, stride=4),
                nn.ReLU(),
                nn.Conv2d(32, 64, kernel_size=4, stride=2),
                nn.ReLU(),
                nn.Conv2d(64, 64, kernel_size=3, stride=1),
                nn.ReLU(),
                nn.Flatten(),
            ]
            self.features = nn.Sequential(*encoder_layers)
            feature_output = self._get_conv_output(state_dim)

        # Advantage head first, value head second, matching checkpoint key order.
        self.advantage_stream = self._make_stream(feature_output, action_dim * num_atoms)
        self.value_stream = self._make_stream(feature_output, num_atoms)

    @staticmethod
    def _make_stream(in_size, out_size):
        """Build a ``NoisyLinear -> ReLU -> NoisyLinear`` head with 512 hidden units."""
        return nn.Sequential(
            NoisyLinear(in_size, 512),
            nn.ReLU(),
            NoisyLinear(512, out_size),
        )

    def _get_conv_output(self, shape):
        """Return the flattened feature size for one input of the given shape."""
        probe = torch.zeros(1, *shape)
        encoded = self.features(probe)
        return int(np.prod(encoded.size()))

    def forward(self, state):
        """Return per-action atom probabilities of shape ``(-1, action_dim, num_atoms)``."""
        encoded = self.features(state)
        adv = self.advantage_stream(encoded).view(-1, self.action_dim, self.num_atoms)
        val = self.value_stream(encoded).view(-1, 1, self.num_atoms)
        adv_mean = adv.mean(dim=1, keepdim=True)
        logits = val + adv - adv_mean
        return F.softmax(logits, dim=-1)

    def reset_noise(self):
        """Resample the noise of every ``NoisyLinear`` layer in the network."""
        noisy_layers = [m for m in self.modules() if isinstance(m, NoisyLinear)]
        for layer in noisy_layers:
            layer.reset_noise()

In [5]:
class RainbowAgent:
    """Agent that trains a ``RainbowDQN`` with prioritized replay.

    The agent owns a policy network, a target network initialised from the
    policy network, an Adam optimizer over the policy parameters and a
    ``PrioritizedReplayBuffer``.  Transitions stored in ``self.memory`` are
    expected to hold states that were already passed through
    :meth:`preprocess_state`.

    Args:
        state_dim: Observation shape fed to the networks: an int or 1-tuple
            for vector observations, ``(channels, height, width)`` for images.
        action_dim: Number of discrete actions.
        lr: Adam learning rate.
        gamma: Discount factor.
        num_atoms: Number of atoms of the categorical value distribution.
        v_min: Smallest atom value.
        v_max: Largest atom value.
        buffer_size: Replay buffer capacity.
        batch_size: Number of transitions per update step.
    """

    # Floor applied to predicted probabilities before the log so that an atom
    # whose softmax output underflows to 0 cannot turn the loss into inf/NaN.
    _LOG_PROB_FLOOR = 1e-8

    def __init__(self, state_dim, action_dim, lr=0.0001, gamma=0.99, num_atoms=51, v_min=-10, v_max=10, buffer_size=100000, batch_size=32):
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.state_dim = state_dim
        self.action_dim = action_dim
        self.gamma = gamma
        self.num_atoms = num_atoms
        self.v_min = v_min
        self.v_max = v_max
        self.batch_size = batch_size

        self.policy_net = RainbowDQN(state_dim, action_dim, num_atoms, v_min, v_max).to(self.device)
        self.target_net = RainbowDQN(state_dim, action_dim, num_atoms, v_min, v_max).to(self.device)
        self.target_net.load_state_dict(self.policy_net.state_dict())

        self.optimizer = optim.Adam(self.policy_net.parameters(), lr=lr)
        self.memory = PrioritizedReplayBuffer(buffer_size)

        # Atom values and the spacing between neighbouring atoms.
        self.support = torch.linspace(v_min, v_max, num_atoms).to(self.device)
        self.delta_z = (v_max - v_min) / (num_atoms - 1)

    def _is_image_state(self):
        """Return True when ``state_dim`` describes a (C, H, W) image."""
        return not isinstance(self.state_dim, int) and len(self.state_dim) == 3

    def preprocess_state(self, state):
        """Convert a raw observation into the float32 array the networks expect.

        * Vector observations are only cast to float32.  They are NOT divided
          by 255: that scaling is meant for 8-bit pixels and would squash
          vector features towards zero.
        * Image observations are converted to grayscale, resized to the
          height/width given by ``state_dim``, given a leading channel axis
          and scaled from [0, 255] to [0, 1].
        * A floating-point array that already has the shape ``state_dim`` is
          treated as already preprocessed and returned unchanged (apart from
          the float32 cast).  This makes the method idempotent, so calling it
          both in the training loop and inside :meth:`select_action` does not
          slice and rescale the frame a second time.

        Args:
            state: Observation as returned by the environment (array-like).

        Returns:
            ``np.ndarray`` of dtype float32.
        """
        state = np.asarray(state)

        if not self._is_image_state():
            return state.astype(np.float32)

        already_processed = (
            state.shape == tuple(self.state_dim)
            and np.issubdtype(state.dtype, np.floating)
        )
        if already_processed:
            return state.astype(np.float32)

        if state.ndim == 3 and state.shape[2] == 3:  # RGB frame (H, W, 3)
            state = cv2.cvtColor(state, cv2.COLOR_RGB2GRAY)
        elif state.ndim == 3:  # grayscale frame with a trailing channel axis
            state = state[:, :, 0]

        # cv2.resize takes the target size as (width, height).
        _, height, width = self.state_dim
        state = cv2.resize(state, (width, height), interpolation=cv2.INTER_AREA)
        state = np.expand_dims(state, axis=0)  # add channel dimension
        return state.astype(np.float32) / 255.0  # scale pixels to [0, 1]

    def select_action(self, state, epsilon=0.01):
        """Pick an action for ``state``.

        With probability ``epsilon`` a uniformly random action is returned;
        otherwise the action with the largest expected value under the policy
        network's predicted distributions.

        Args:
            state: Raw or already preprocessed observation.
            epsilon: Probability of taking a random action.

        Returns:
            The chosen action index as a Python int.
        """
        if random.random() < epsilon:
            return random.randrange(self.action_dim)

        with torch.no_grad():
            processed = self.preprocess_state(state)
            state_tensor = torch.as_tensor(processed, dtype=torch.float32, device=self.device)
            if state_tensor.dim() in (1, 3):
                state_tensor = state_tensor.unsqueeze(0)  # add batch dimension
            dist = self.policy_net(state_tensor)
            # Expected value per action: sum over atoms of probability * atom value.
            q_values = (dist * self.support).sum(2)
            return q_values.argmax(dim=1).item()

    def _as_batch(self, values, dtype):
        """Stack a sequence of per-transition values into a tensor on ``self.device``."""
        return torch.as_tensor(np.array(values), dtype=dtype, device=self.device)

    def update(self):
        """Run one optimisation step on a prioritized batch.

        Does nothing until the replay buffer holds at least ``batch_size``
        transitions.  Otherwise it samples a batch, minimises the
        importance-weighted cross-entropy between the projected target
        distribution and the predicted distribution of the taken action,
        writes the per-sample losses back as new priorities, and resamples
        the noise of both networks.
        """
        if len(self.memory.buffer) < self.batch_size:
            return

        transitions, indices, weights = self.memory.sample(self.batch_size)
        states, actions, rewards, next_states, dones = zip(*transitions)

        state_batch = self._as_batch(states, torch.float32)
        action_batch = self._as_batch(actions, torch.long)
        reward_batch = self._as_batch(rewards, torch.float32)
        next_state_batch = self._as_batch(next_states, torch.float32)
        done_batch = self._as_batch(dones, torch.float32)
        is_weights = torch.as_tensor(weights, dtype=torch.float32, device=self.device)

        batch_index = torch.arange(self.batch_size, device=self.device)

        # Predicted distribution of the action that was actually taken.
        current_q_dist = self.policy_net(state_batch)[batch_index, action_batch]

        with torch.no_grad():
            # Double Q-learning: the policy network chooses the next action,
            # the target network supplies that action's distribution.
            online_next_dist = self.policy_net(next_state_batch)
            best_actions = (online_next_dist * self.support).sum(2).argmax(dim=1)
            next_q_dist = self.target_net(next_state_batch)[batch_index, best_actions]

            projected_dist = self._categorical_projection(next_q_dist, reward_batch, done_batch)

        # Cross-entropy per sample; probabilities are floored before the log.
        log_probs = current_q_dist.clamp(min=self._LOG_PROB_FLOOR).log()
        per_sample_loss = -(projected_dist * log_probs).sum(1)
        priorities = per_sample_loss.detach().cpu().numpy()
        loss = (per_sample_loss * is_weights).mean()

        self.memory.update_priorities(indices, priorities)

        self.optimizer.zero_grad()
        loss.backward()
        torch.nn.utils.clip_grad_norm_(self.policy_net.parameters(), 10)
        self.optimizer.step()

        # Draw fresh noise for the noisy layers of both networks.
        self.policy_net.reset_noise()
        self.target_net.reset_noise()

    def _categorical_projection(self, next_q_dist, rewards, dones):
        """Project the shifted/scaled next-state distribution onto the fixed atoms.

        Every atom value ``z`` is mapped to ``r + (1 - done) * gamma * z``,
        clamped to ``[v_min, v_max]``, and its probability mass is split
        between the two neighbouring atoms in proportion to proximity.

        Args:
            next_q_dist: Tensor ``(batch, num_atoms)`` of next-state atom
                probabilities for the chosen next action.
            rewards: Tensor ``(batch,)`` of rewards.
            dones: Tensor ``(batch,)`` with 1.0 where bootstrapping must stop.

        Returns:
            Tensor ``(batch, num_atoms)`` holding the projected distribution.
        """
        batch_size = len(rewards)
        projected_dist = torch.zeros(batch_size, self.num_atoms, device=self.device)

        rewards = rewards.unsqueeze(1).expand_as(projected_dist)
        dones = dones.unsqueeze(1).expand_as(projected_dist)
        support = self.support.unsqueeze(0).expand_as(projected_dist)

        tz = rewards + (1 - dones) * self.gamma * support
        tz = tz.clamp(min=self.v_min, max=self.v_max)
        b = (tz - self.v_min) / self.delta_z  # fractional atom index
        l = b.floor().long()
        u = b.ceil().long()

        # When b lands exactly on an atom, l == u and both interpolation
        # weights would be zero; move one bound so the mass is not dropped.
        l[(u > 0) & (l == u)] -= 1
        u[(l < (self.num_atoms - 1)) & (l == u)] += 1

        # Row offsets into the flattened (batch * num_atoms) view, built with
        # integer arithmetic so no float rounding can shift an index.
        offset = (torch.arange(batch_size, device=self.device) * self.num_atoms).unsqueeze(1)

        flat_projection = projected_dist.view(-1)
        flat_projection.index_add_(0, (l + offset).view(-1), (next_q_dist * (u.float() - b)).view(-1))
        flat_projection.index_add_(0, (u + offset).view(-1), (next_q_dist * (b - l.float())).view(-1))

        return projected_dist

<br>

## Training

---

In [6]:




def _save_policy_checkpoint(agent, save_path):
    """Write the policy network's state_dict to ``save_path``, creating parent dirs."""
    # Environment ids such as "ALE/SpaceInvaders-v5" contain a slash, so the
    # checkpoint path points into a sub-directory that may not exist yet.
    parent_dir = os.path.dirname(save_path)
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)
    torch.save(agent.policy_net.state_dict(), save_path)


def train(env_name, num_episodes=1000, save_interval=100, save_dir='saved_models'):
    """Train a ``RainbowAgent`` on a Gymnasium environment and save checkpoints.

    Args:
        env_name: Gymnasium environment id passed to ``gym.make``.
        num_episodes: Number of episodes to run.
        save_interval: A checkpoint is written every ``save_interval`` episodes.
        save_dir: Directory the checkpoints are written under.

    The policy network is saved to
    ``<save_dir>/<env_name>_rainbow_episode_<k>.pth`` periodically and to
    ``<save_dir>/<env_name>_rainbow_final.pth`` at the end.  The target network
    is synchronised with the policy network every 10 episodes.
    """
    env = gym.make(env_name)
    try:
        # Image observations are preprocessed to a single 84x84 channel.
        if len(env.observation_space.shape) == 3:
            state_dim = (1, 84, 84)
        else:
            state_dim = env.observation_space.shape

        action_dim = env.action_space.n

        agent = RainbowAgent(state_dim, action_dim)

        if save_dir:
            os.makedirs(save_dir, exist_ok=True)

        for episode in range(num_episodes):
            observation, _ = env.reset()
            state = agent.preprocess_state(observation)
            done = False
            total_reward = 0

            while not done:
                # select_action preprocesses its input itself, so it gets the
                # raw observation; only the replay buffer stores the
                # preprocessed state.  Passing the preprocessed state here
                # would preprocess it a second time.
                action = agent.select_action(observation)
                next_observation, reward, terminated, truncated, _ = env.step(action)
                next_state = agent.preprocess_state(next_observation)
                done = terminated or truncated
                total_reward += reward

                # Only a true termination stops bootstrapping; a time-limit
                # truncation still has a meaningful next-state value.
                agent.memory.push(state, action, reward, next_state, terminated)
                agent.update()

                observation = next_observation
                state = next_state

            if episode % 10 == 0:
                agent.target_net.load_state_dict(agent.policy_net.state_dict())

            print(f"Episode {episode}, Total Reward: {total_reward}")

            # Save model periodically
            if (episode + 1) % save_interval == 0:
                save_path = os.path.join(save_dir, f"{env_name}_rainbow_episode_{episode+1}.pth")
                _save_policy_checkpoint(agent, save_path)
                print(f"Model saved to {save_path}")

        # Save final model
        final_save_path = os.path.join(save_dir, f"{env_name}_rainbow_final.pth")
        _save_policy_checkpoint(agent, final_save_path)
        print(f"Final model saved to {final_save_path}")
    finally:
        # Release the environment even if training is interrupted or fails.
        env.close()


In [7]:
# Short CartPole run: exercises the vector-observation path of the training loop.
print("Training on CartPole-v1")
train(env_name="CartPole-v1", num_episodes=100)

Training on CartPole-v1
Episode 0, Total Reward: 9.0
Episode 1, Total Reward: 9.0
Episode 2, Total Reward: 9.0
Episode 3, Total Reward: 11.0
Episode 4, Total Reward: 19.0
Episode 5, Total Reward: 9.0
Episode 6, Total Reward: 10.0
Episode 7, Total Reward: 10.0
Episode 8, Total Reward: 11.0
Episode 9, Total Reward: 10.0
Episode 10, Total Reward: 10.0
Episode 11, Total Reward: 9.0
Episode 12, Total Reward: 11.0
Episode 13, Total Reward: 9.0
Episode 14, Total Reward: 10.0
Episode 15, Total Reward: 10.0
Episode 16, Total Reward: 9.0
Episode 17, Total Reward: 10.0
Episode 18, Total Reward: 10.0
Episode 19, Total Reward: 10.0
Episode 20, Total Reward: 9.0
Episode 21, Total Reward: 9.0
Episode 22, Total Reward: 10.0
Episode 23, Total Reward: 10.0
Episode 24, Total Reward: 9.0
Episode 25, Total Reward: 9.0
Episode 26, Total Reward: 9.0
Episode 27, Total Reward: 10.0
Episode 28, Total Reward: 9.0
Episode 29, Total Reward: 10.0
Episode 30, Total Reward: 10.0
Episode 31, Total Reward: 10.0
Episode

In [8]:
# Short Space Invaders run: exercises the image-observation path of the training loop.
# The printed name now matches the environment id that is actually trained (v5, not v4).
print("\nTraining on SpaceInvaders-v5")
train("ALE/SpaceInvaders-v5", num_episodes=10)


Training on SpaceInvaders-v4
Episode 0, Total Reward: 435.0
Episode 1, Total Reward: 280.0
Episode 2, Total Reward: 295.0
Episode 3, Total Reward: 240.0
Episode 4, Total Reward: 355.0
Episode 5, Total Reward: 100.0
Episode 6, Total Reward: 385.0
Episode 7, Total Reward: 210.0
Episode 8, Total Reward: 120.0
Episode 9, Total Reward: 80.0
Final model saved to saved_models\ALE/SpaceInvaders-v5_rainbow_final.pth


<br>

## Inference

---

In [10]:
def inference(env_name, model_path, num_episodes=10, render=True):
    """Run a saved policy greedily and print the total reward of each episode.

    Args:
        env_name: Gymnasium environment id passed to ``gym.make``.
        model_path: Path of a policy ``state_dict`` written with ``torch.save``.
        num_episodes: Number of evaluation episodes.
        render: When True the environment is created with
            ``render_mode="human"``; otherwise no rendering is requested.
    """
    env = gym.make(env_name, render_mode="human" if render else None)
    try:
        # Image observations are preprocessed to a single 84x84 channel.
        if len(env.observation_space.shape) == 3:
            state_dim = (1, 84, 84)
        else:
            state_dim = env.observation_space.shape

        action_dim = env.action_space.n

        # Create the agent and load the saved weights.  map_location lets a
        # checkpoint saved on a GPU be loaded on a CPU-only machine.
        agent = RainbowAgent(state_dim, action_dim)
        state_dict = torch.load(model_path, map_location=agent.device)
        agent.policy_net.load_state_dict(state_dict)
        agent.policy_net.eval()  # evaluation mode

        for episode in range(num_episodes):
            state, _ = env.reset()
            done = False
            total_reward = 0

            while not done:
                # No explicit env.render() call: with render_mode="human" the
                # environment draws each frame itself during reset()/step().
                action = agent.select_action(state, epsilon=0)  # greedy action
                next_state, reward, terminated, truncated, _ = env.step(action)
                done = terminated or truncated
                total_reward += reward

                state = next_state

            print(f"Episode {episode}, Total Reward: {total_reward}")
    finally:
        # Close the environment (and its window) even if loading or a step fails.
        env.close()

In [11]:
# Checkpoint directory; matches the default `save_dir` of train() and is read by the inference cells.
save_dir = "saved_models"

In [15]:
# Evaluate the final CartPole checkpoint written by train().
env_name = "CartPole-v1"
model_path = os.path.join(save_dir, env_name + "_rainbow_final.pth")
print("Inferencing CartPole-v1:")
inference(env_name=env_name, model_path=model_path, num_episodes=10)

Inferencing CartPole-v1:
Episode 0, Total Reward: 13.0
Episode 1, Total Reward: 13.0
Episode 2, Total Reward: 22.0
Episode 3, Total Reward: 27.0
Episode 4, Total Reward: 11.0
Episode 5, Total Reward: 14.0
Episode 6, Total Reward: 21.0
Episode 7, Total Reward: 18.0
Episode 8, Total Reward: 24.0
Episode 9, Total Reward: 22.0


In [12]:
# Evaluate the final Space Invaders checkpoint written by train().
# To evaluate a periodic checkpoint instead, point model_path at the matching
# "<env_name>_rainbow_episode_<N>.pth" file under save_dir.
env_name = "ALE/SpaceInvaders-v5"
model_path = os.path.join(save_dir, env_name + "_rainbow_final.pth")
print("\nInferencing SpaceInvaders-v5:")
inference(env_name=env_name, model_path=model_path, num_episodes=5)


Inferencing SpaceInvaders-v5:


  logger.warn(


Episode 0, Total Reward: 175.0
Episode 1, Total Reward: 130.0
Episode 2, Total Reward: 165.0
Episode 3, Total Reward: 340.0
Episode 4, Total Reward: 160.0
