# Deep Reinforcement Learning

Reinforcement Learning (RL) is an approach in which an agent learns to make sequential decisions by interacting with an environment. The objective is for the agent to maximize the cumulative reward it receives over time.
The agent does this by repeatedly observing the consequences of its actions and favoring actions that lead to better outcomes.
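
To make "cumulative reward" concrete, one common formalization is the discounted return, which is also what the training code later in this notebook computes. The symbols $r_t$ (reward received at step $t$), $T$ (episode length), and $\gamma \in [0, 1]$ (discount factor) are introduced here for illustration:

$$ G_t = \sum_{k=0}^{T-t-1} \gamma^{k} \, r_{t+k} $$

With $\gamma = 1$ this is the plain sum of the remaining rewards; smaller values weight near-term rewards more heavily.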

To do this, we will use Gym, a platform for developing and comparing reinforcement learning algorithms. Gym provides a common interface to many environments: it accepts actions from an agent, plays them out in the environment, and returns the resulting observations and rewards.


## Environment

We will use the `CartPole` environment from Gym's library for this assignment. In this environment, a pole is attached by an un-actuated joint to a cart, which moves along a frictionless track. The pole starts upright on the cart, and the goal is to keep it balanced by applying forces to the cart in the left and right directions.
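
As a rough sketch of what "balanced" means here: a CartPole observation has four components (cart position, cart velocity, pole angle, pole angular velocity), and an episode ends in failure when the cart leaves the track bounds or the pole tilts too far. The thresholds below (2.4 units and 12 degrees) are the values documented for Gym's CartPole; the helper name `pole_has_fallen` is hypothetical and not part of Gym.

```python
import math

# Failure thresholds as documented for Gym's CartPole (assumed unchanged here)
X_THRESHOLD = 2.4                           # cart position limit
THETA_THRESHOLD = 12 * 2 * math.pi / 360    # 12 degrees, in radians


def pole_has_fallen(obs):
    """Hypothetical helper: True if the observation is a failure state."""
    x, x_dot, theta, theta_dot = obs
    return abs(x) > X_THRESHOLD or abs(theta) > THETA_THRESHOLD


print(pole_has_fallen([0.0, 0.0, 0.05, 0.0]))
print(pole_has_fallen([0.0, 0.0, 0.30, 0.0]))
```

The environment performs this check internally and reports it through the `done` flag, so the helper is only meant to illustrate the rule.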

You can use the code below to run an instance of a random agent in this environment and see the results.

In [3]:
from IPython.display import HTML
from base64 import b64encode

def show_video(path):
    mp4 = open(path, 'rb').read()
    data_url = "data:video/mp4;base64," + b64encode(mp4).decode()
    return HTML("""
    <video width=400 controls>
          <source src="%s" type="video/mp4">
    </video>
    """ % data_url)

In [2]:
!pip install gym[atari,accept-rom-license] -qq
!pip install imageio -qq



In [6]:
import cv2
import gym
import imageio
import numpy as np
from gym import spaces

We use `gym.make()` to create an instance of a given environment. Before interacting with it, we reset the environment to its initial state with the `.reset()` method. We can then call the `.step()` method, which accepts an action as input and performs it.
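
The return shapes of `.reset()` and `.step()` differ between Gym releases: older versions return an observation from `.reset()` and a 4-tuple `(obs, reward, done, info)` from `.step()`, while newer versions return `(obs, info)` and a 5-tuple `(obs, reward, terminated, truncated, info)`. A minimal sketch of two adapters, assuming only those two shapes; the names `unpack_reset` and `unpack_step` are hypothetical and not part of Gym:

```python
def unpack_reset(reset_output):
    """Return just the observation from env.reset(), for either API shape."""
    # Newer API: (observation, info_dict); older API: the observation itself.
    is_new_api = (
        isinstance(reset_output, tuple)
        and len(reset_output) == 2
        and isinstance(reset_output[1], dict)
    )
    return reset_output[0] if is_new_api else reset_output


def unpack_step(step_output):
    """Return (observation, reward, done, info) for either API shape."""
    if len(step_output) == 5:
        obs, reward, terminated, truncated, info = step_output
        # The episode is over if it terminated OR hit the time limit.
        return obs, reward, bool(terminated or truncated), info
    obs, reward, done, info = step_output
    return obs, reward, bool(done), info
```

With these, a loop can be written as `obs, reward, done, info = unpack_step(env.step(action))` regardless of the installed version.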

In [7]:
# Create an instance of the CartPole environment
env_name = 'CartPole-v1'
env = gym.make(env_name)

# Reset the environment to its initial state
env.reset()

# Initialize an empty list to store frames
frames = []

# Run the environment for a specified number of steps
for _ in range(500):
    action = env.action_space.sample()  # Take a random action
    obs, reward, done, _ = env.step(action)  # Perform the action and observe the result

    # Render this frame and add to the list of frames
    frames.append(env.render(mode='rgb_array'))

    if done:
        env.reset()

# Close the environment
env.close()

# Save the rendered frames as a video
imageio.mimsave('./cartpole.mp4', frames, fps=25)




In [8]:
show_video('./cartpole.mp4')

As you can see, the random agent fails to keep the pole balanced. In the next section we will train an agent to perform this task.

## Algorithm
We will use the Advantage Actor-Critic (A2C) algorithm.

A2C is a policy-gradient reinforcement learning algorithm.
It consists of an actor, which outputs a probability distribution over actions given the current state, and a critic, which estimates the state's value function, that is, the expected future reward from that state.

We will implement this together step by step.




In [5]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.distributions.categorical import Categorical
import numpy as np
import gym
from collections import deque
from tqdm import tqdm
import imageio
from IPython.display import HTML
from base64 import b64encode


## Neural Network

Here we design a simple feed-forward model that embeds the observation from the environment into a hidden layer. Two fully connected heads on top of the hidden layer then predict the action probabilities and estimate the value of the current state, so the same network acts as both actor and critic.

You need to implement this neural network.


In [44]:
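class ActorCritic(nn.Module):
    """Shared hidden layer with separate actor and critic heads."""

    def __init__(self, input_size, hidden_size, num_outputs):
        super().__init__()
        # Shared trunk: embeds the observation into a hidden representation
        self.fc1 = nn.Linear(input_size, hidden_size)
        # Actor head: one logit per discrete action
        self.fc_action = nn.Linear(hidden_size, num_outputs)
        # Critic head: scalar estimate of the state value V(s)
        self.fc_value = nn.Linear(hidden_size, 1)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        # Softmax turns the actor logits into action probabilities
        action_probs = F.softmax(self.fc_action(x), dim=-1)
        value = self.fc_value(x)
        return action_probs, value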
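
To see what the two heads produce, the same forward pass can be sketched in plain NumPy. This is an illustration only, not the PyTorch model itself: the weights are random, the sizes are illustrative, and the biases on the two heads are omitted for brevity.

```python
import numpy as np

rng = np.random.default_rng(0)
input_size, hidden_size, num_outputs = 4, 8, 2  # illustrative sizes (assumption)

# Randomly initialized weights standing in for the trained layers
W1 = rng.normal(size=(hidden_size, input_size))
b1 = np.zeros(hidden_size)
W_action = rng.normal(size=(num_outputs, hidden_size))
W_value = rng.normal(size=(1, hidden_size))


def forward(x):
    """Shared hidden layer followed by an actor head and a critic head."""
    hidden = np.maximum(0.0, W1 @ x + b1)      # ReLU
    logits = W_action @ hidden
    exp = np.exp(logits - logits.max())        # numerically stable softmax
    action_probs = exp / exp.sum()
    value = W_value @ hidden                   # state-value estimate, shape (1,)
    return action_probs, value


probs, value = forward(np.array([0.0, 0.1, -0.05, 0.2]))
print(probs.shape, value.shape)
```

The actor output is a valid probability distribution over the actions, while the critic output is an unconstrained scalar.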


## A2C

The A2C algorithm jointly trains the actor and the critic to improve the policy. It updates the parameters of the actor to increase the likelihood of good actions, and the parameters of the critic to better estimate the value function.

In each iteration, A2C plays one episode until it ends. During this time it records the log probability of each action, the reward, and the predicted value at every step. These quantities are used to update the model at the end of the trajectory.
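
The per-step bookkeeping can be sketched without any deep learning library: sample an action from the actor's probabilities and keep the log probability of the action that was actually taken. The function name `sample_action` and the example probabilities are hypothetical.

```python
import math
import random


def sample_action(action_probs, rng=random):
    """Sample an action index and return it with its log probability."""
    actions = range(len(action_probs))
    action = rng.choices(actions, weights=action_probs, k=1)[0]
    return action, math.log(action_probs[action])


rng = random.Random(0)                      # seeded for reproducibility
action, log_prob = sample_action([0.25, 0.75], rng)
print(action, log_prob)
```

In the agent defined in this notebook, `torch.distributions.Categorical` plays this role through its `sample()` and `log_prob()` methods.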

The actor is updated using the objective below:

$$ L_{\text{actor}} = -\log \pi(a|s;\theta) \times A(s, a) $$
where the advantage is calculated as:
$$A(s, a) = Q(s, a) - V(s) $$

Here $Q(s,a)$ is the estimated value of taking action $a$ in state $s$, and $V(s)$ is the value predicted by our critic.
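
The implementation in this notebook does not learn $Q$ separately. Instead, it uses the discounted return observed from step $t$ onward as a sample estimate of $Q(s_t, a_t)$. The symbols $G_t$, $r_t$, $T$, and the discount factor $\gamma$ are notation introduced here for illustration:

$$ A(s_t, a_t) \approx G_t - V(s_t), \qquad G_t = r_t + \gamma \, G_{t+1}, \quad G_T = 0 $$

A positive advantage means the episode went better from that step than the critic predicted; a negative one means it went worse.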

This loss increases the probability of actions whose outcome was better than the critic expected (positive advantage) and decreases it for actions with a negative advantage.

For the critic, the loss is a simple squared error between the observed return $R$ of a state and the predicted value:

$$ L_{\text{critic}} = \frac{1}{2} ( R - V(s))^2 $$
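
A small worked example may help tie the two losses together. The numbers below (rewards, critic predictions, log probabilities, and the discount factor) are made up for illustration, and both losses are averaged over the trajectory.

```python
gamma = 0.5                       # illustrative discount factor (assumption)
rewards = [1.0, 1.0, 1.0]
values = [1.5, 1.0, 0.5]          # hypothetical critic predictions V(s_t)
log_probs = [-0.5, -1.0, -2.0]    # hypothetical log pi(a_t | s_t)

# Discounted returns, accumulated backwards from the end of the episode
returns = []
running = 0.0
for r in reversed(rewards):
    running = r + gamma * running
    returns.append(running)
returns.reverse()                 # [1.75, 1.5, 1.0]

# Advantage: how much better the return was than the critic predicted
advantages = [g - v for g, v in zip(returns, values)]   # [0.25, 0.5, 0.5]

n = len(rewards)
actor_loss = -sum(lp * a for lp, a in zip(log_probs, advantages)) / n
critic_loss = sum(0.5 * (g - v) ** 2 for g, v in zip(returns, values)) / n
print(actor_loss, critic_loss)
```

Note that `nn.MSELoss`, used in the training code of this notebook, averages the squared errors without the factor of $\frac{1}{2}$, which only rescales the critic term.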

In [45]:
class A2CAgent:
    def __init__(self, env, num_episodes=2000, max_steps=500, gamma=0.99, lr=5e-4, hidden_size=256):
        self.env = env
        self.num_episodes = num_episodes
        self.max_steps = max_steps
        self.gamma = gamma
        self.lr = lr
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

        input_size = env.observation_space.shape[0]
        num_outputs = env.action_space.n

        self.policy_net = ActorCritic(input_size, hidden_size, num_outputs).to(self.device)
        self.optimizer = optim.Adam(self.policy_net.parameters(), lr=lr)
        self.critic_loss = nn.MSELoss()

    def choose_action(self, state):
        if not isinstance(state, torch.Tensor):
            state = torch.FloatTensor(state).to(self.device)
        action_probs, _ = self.policy_net(state)
        action_dist = Categorical(action_probs)
        action = action_dist.sample()
        return action.item(), action_dist.log_prob(action)


    def compute_returns(self, rewards):
        R = 0
        returns = []
        for r in reversed(rewards):
            R = r + self.gamma * R
            returns.insert(0, R)
        return returns

    def train(self):
        episode_rewards = []
        with tqdm(range(self.num_episodes)) as pbar:
            for episode in pbar:
                state = self.env.reset()
                episode_reward = 0
                log_probs = []
                values = []
                rewards = []

                for step in range(self.max_steps):
                    state = torch.FloatTensor(state).to(self.device)
                    action, log_prob = self.choose_action(state)
                    value = self.policy_net(state)[1]

                    next_state, reward, done, _ = self.env.step(action)
                    log_probs.append(log_prob)
                    values.append(value)
                    rewards.append(reward)
                    episode_reward += reward

                    if done:
                        break

                    state = next_state

                episode_rewards.append(episode_reward)
                returns = self.compute_returns(rewards)
                returns = torch.FloatTensor(returns).to(self.device)
                log_probs = torch.stack(log_probs)
                # Squeeze only the last dim so a one-step episode keeps shape (1,)
                values = torch.stack(values).squeeze(-1)

                advantage = returns - values
                actor_loss = - (log_probs * advantage.detach()).mean()
                critic_loss = self.critic_loss(values, returns)
                loss = actor_loss + critic_loss

                self.optimizer.zero_grad()
                loss.backward()
                self.optimizer.step()

                pbar.set_description(f"Episode {episode}, Reward: {episode_reward:.2f}")

        self.env.close()
        return episode_rewards


Define the model and set hyperparameters.

In [47]:
env_name = 'CartPole-v1'
env = gym.make(env_name)

num_episodes = 3000
max_steps = 500
lr = 8e-4
hidden_size = 256

a2c_model = A2CAgent(env, num_episodes, max_steps, lr=lr, hidden_size=hidden_size)




Train the model.

In [48]:
rewards = a2c_model.train()

Episode 2999, Reward: 500.00: 100%|██████████| 3000/3000 [27:00<00:00,  1.85it/s]
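
Per-episode rewards are noisy, so the final value in the progress bar says little on its own. One way to judge the trend of the list returned by `train()` is a trailing moving average. This is a minimal sketch; the function name `moving_average` and the window size are assumptions.

```python
def moving_average(values, window=100):
    """Trailing mean over the last `window` entries (fewer at the start)."""
    if window <= 0:
        raise ValueError("window must be positive")
    averages = []
    running_sum = 0.0
    for i, v in enumerate(values):
        running_sum += v
        if i >= window:
            running_sum -= values[i - window]   # drop the entry leaving the window
        averages.append(running_sum / min(i + 1, window))
    return averages


print(moving_average([1, 2, 3, 4], window=2))
```

Applied here, `moving_average(rewards, window=100)` gives a smoothed curve that is easier to read than the raw per-episode rewards.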


## Evaluation

Use the `choose_action` method of the trained agent to evaluate its performance.

In [49]:
def show_video(path):
    mp4 = open(path, 'rb').read()
    data_url = "data:video/mp4;base64," + b64encode(mp4).decode()
    return HTML("""
    <video width=400 controls>
          <source src="%s" type="video/mp4">
    </video>
    """ % data_url)

env = gym.make(env_name, render_mode='rgb_array')
model = a2c_model

num_episodes = 10
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

frames = []
episode_rewards = []

for i in range(num_episodes):
    reset_output = env.reset()
    if isinstance(reset_output, tuple):
        state = reset_output[0]
    else:
        state = reset_output
    episode_reward = 0

    for step in range(max_steps):
        if not isinstance(state, torch.Tensor):
            state = torch.FloatTensor(state).to(device)
        action, _ = model.choose_action(state)

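        step_output = env.step(action)
        if isinstance(step_output, tuple) and len(step_output) == 4:
            next_state, reward, done, _ = step_output
        else:
            # Newer Gym API: the episode ends on termination or truncation
            next_state, reward, terminated, truncated, _ = step_output
            done = terminated or truncated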

        episode_reward += reward

        if i == 0:
            frame = env.render()
            frame = np.array(frame)  # Convert frame to numpy array if it's not already

            # Ensure the frame has 3 channels and correct shape
            if len(frame.shape) == 4 and frame.shape[0] == 1:  # If there's an extra dimension
                frame = frame[0]  # Remove the extra dimension

            if len(frame.shape) == 2:  # If the frame is grayscale
                frame = np.stack([frame]*3, axis=-1)
            elif frame.shape[-1] == 1:  # If the frame has a single channel
                frame = np.repeat(frame, 3, axis=-1)
            elif frame.shape[-1] > 3:  # If the frame has more than 3 channels
                frame = frame[..., :3]  # Discard extra channels

            # Log frame shape and type
            print(f"Frame shape: {frame.shape}, dtype: {frame.dtype}")

            # Log and add frame if valid
            if frame.shape == (400, 600, 3):
                frames.append(frame)
            else:
                print(f"Skipping frame with unexpected shape: {frame.shape}")

        if done:
            break

        state = next_state

    episode_rewards.append(episode_reward)
    print(f"Episode Reward: {episode_reward}")

env.close()

episode_rewards = np.array(episode_rewards)
print(f"Average Reward over {num_episodes} episodes: {np.mean(episode_rewards)}")

# Check if all frames have the correct shape
for idx, frame in enumerate(frames):
    print(f"Final check - Frame {idx}: shape {frame.shape}, dtype: {frame.dtype}")
    if frame.shape != (400, 600, 3):
        print(f"Frame {idx} has unexpected shape: {frame.shape}")

imageio.mimsave('./test.mp4', frames, fps=25)
show_video('./test.mp4')


Frame shape: (2, 400, 600, 3), dtype: uint8
Skipping frame with unexpected shape: (2, 400, 600, 3)
Frame shape: (400, 600, 3), dtype: uint8



Episode Reward: 458.0
Average Reward over 10 episodes: 422.7
Final check - Frame 0: shape (400, 600, 3), dtype: uint8

In [None]:
show_video('./test.mp4')
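
The evaluation log shows one `render()` call returning an array of shape `(2, 400, 600, 3)`, which the shape check then discards. One possible explanation, stated here as an assumption, is that this Gym version returns a batch of frames from `render()` when `render_mode` is set at construction. A hedged sketch of a helper (hypothetical name `split_frames`) that keeps such frames instead of skipping them:

```python
import numpy as np


def split_frames(render_output):
    """Return a list of H x W x C frames from a single frame or a batch."""
    arr = np.asarray(render_output)
    if arr.ndim == 3:                 # one frame: (H, W, C)
        return [arr]
    if arr.ndim == 4:                 # batch of frames: (N, H, W, C)
        return [frame for frame in arr]
    raise ValueError(f"Unexpected render output shape: {arr.shape}")


batch = np.zeros((2, 400, 600, 3), dtype=np.uint8)
single = np.zeros((400, 600, 3), dtype=np.uint8)
print(len(split_frames(batch)), len(split_frames(single)))
```

In the evaluation loop this could be used as `frames.extend(split_frames(env.render()))`.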