<a href="https://colab.research.google.com/github/kscaman/DL_ENS/blob/main/TD/deep_reinforcement_learning.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Deep Reinforcement Learning

In this practical, we are going to use Deep Q Learning to train an agent to play the Atari game [Breakout](https://www.gymlibrary.dev/environments/atari/breakout/).
The code is adapted from the [PyTorch tutorial on DQL](https://pytorch.org/tutorials/intermediate/reinforcement_q_learning.html).


In [None]:
!pip install gym[atari]
!pip install autorom[accept-rom-license]

from collections import namedtuple, deque
from IPython import display
from IPython.display import HTML
from itertools import count
import math
import matplotlib.pyplot as plt
from matplotlib import animation
import random
from tqdm.notebook import tqdm

import gym
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torchvision
from torchvision import models, transforms, datasets

# if gpu is to be used
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

First load the environment using `gym.make(env_name)` (you can search the [gym documentation](https://www.gymlibrary.dev/environments/atari/breakout/)).

In [None]:
env = ### YOUR CODE HERE ###

The environment is initialized through `env.reset()`, and updated via `env.step(action)`. Display the state (a frame) and print the action and reward after one step.

In [None]:
state, _ = env.reset()  # with gym >= 0.26, reset() returns (observation, info)
action = env.action_space.sample()
state, reward, terminated, truncated, _ = env.step(action)

#
# YOUR CODE HERE
#

In [None]:
def create_video(policy, num_frames=100, preprocess=None):
    def animation_update(num):
        progress_bar.update(1)
        ax.clear()
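        # With gym >= 0.26, render() takes no argument: create the
        # environment with render_mode="rgb_array" to get frames here.
        state = env.render()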
        ax.imshow(state)
        preprocessed_state = state if preprocess is None else preprocess(state)
        action = policy(preprocessed_state)
        env.step(action)

    fig, ax = plt.subplots(figsize=(6, 4), dpi=100)
    env.reset()
    progress_bar = tqdm(total=num_frames)
    anim = animation.FuncAnimation(fig, animation_update, frames=num_frames, interval=50)
    anim = HTML(anim.to_html5_video())
    progress_bar.close()
    plt.close()
    return anim

Create a video of a random agent playing Breakout.

In [None]:
#
# YOUR CODE HERE
#

Create an epsilon-greedy strategy. It takes a policy, encoded as a function that returns the Q-function $q(s,a)$ for a given state $s$ as a vector over actions, and returns the corresponding epsilon-greedy action. The output should have shape $(1,1)$.
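
As a reminder of the selection rule itself, here is a minimal sketch on a plain list of Q-values. The function name `epsilon_greedy_action` and the Q-values are made up for illustration. Unlike the class requested in the exercise, it returns a plain integer rather than a tensor of shape $(1,1)$.

```python
import random

def epsilon_greedy_action(q_values, epsilon, rng=random):
    """Return a random action with probability epsilon, else the greedy one."""
    if rng.random() < epsilon:
        # Explore: uniform draw over all action indices
        return rng.randrange(len(q_values))
    # Exploit: index of the largest Q-value
    return max(range(len(q_values)), key=lambda a: q_values[a])

q = [0.1, 0.7, 0.2, 0.0]  # made-up Q-values for 4 actions
greedy = epsilon_greedy_action(q, epsilon=0.0)   # never explores
explore = epsilon_greedy_action(q, epsilon=1.0)  # always explores
print(greedy, explore)
```

With `epsilon=0.0` the greedy branch is always taken, and with `epsilon=1.0` the action is always drawn uniformly at random.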

In [None]:
class EpsilonGreedy:
    def __init__(self, policy, epsilon):
        self.policy = policy
        self.epsilon = epsilon
    
    def __call__(self, state):
        #
        # YOUR CODE HERE
        #

Create a video of an epsilon-greedy strategy that goes left, except with probability $\varepsilon = 0.5$, where it takes a random action.

In [None]:
#
# YOUR CODE HERE
#

## Image preprocessing
The Atari frame is large and contains the score at the top of the screen, which is not useful for our agent (as a reward is already available). Using `transforms`, crop the image to a $144\times 144$ grayscale image, and flatten the result to a vector of size $144\times 144 = 20736$. Test this preprocessing on an Atari frame.
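
To make the expected shapes concrete, the sketch below performs the same three operations (crop, grayscale, flatten) on a dummy frame stored as nested Python lists. It is only an illustration of the arithmetic, not the `transforms` pipeline asked for above. The helper name `crop_gray_flatten`, the crop offsets `top=50` and `left=8`, and the luma weights are assumptions chosen for the example.

```python
def crop_gray_flatten(frame, top, left, size=144):
    """frame: H x W nested lists of (r, g, b) tuples with values in [0, 255]."""
    flat = []
    for row in frame[top:top + size]:           # keep `size` rows
        for r, g, b in row[left:left + size]:   # keep `size` columns
            # Weighted sum of the channels (common luma weights, assumed here)
            flat.append(0.299 * r + 0.587 * g + 0.114 * b)
    return flat

# Dummy uniform frame with the 210 x 160 x 3 shape of a Breakout frame
frame = [[(128, 128, 128)] * 160 for _ in range(210)]
vec = crop_gray_flatten(frame, top=50, left=8)  # hypothetical offsets
print(len(vec) == 144 * 144)
```

The offsets only need to satisfy `top + 144 <= 210` and `left + 144 <= 160`. Which region to keep is part of the exercise.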

In [None]:
preprocess = transforms.Compose([
    #
    # YOUR CODE HERE
    #
])

#
# YOUR CODE HERE
#

## Deep Q Learning algorithm
We are now going to implement the DQN algorithm. First, we need to encode the policy using a neural network. Create a neural network that takes a frame from the game, preprocesses it and returns a score for each possible action.

In [None]:
class DQN(nn.Module):
    #
    # YOUR CODE HERE
    #

We then define a replay memory to store and sample past (state, action, next_state, reward) transitions, and a utility function for plotting scores.
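
The replay memory relies on a `deque` with a `maxlen`: once the capacity is reached, appending a new transition silently drops the oldest one. A minimal sketch of that behaviour, using made-up integer states and a capacity of 3:

```python
from collections import deque
import random

buffer = deque([], maxlen=3)  # capacity of 3 transitions
for step in range(5):
    # Made-up transition: (state, action, next_state, reward)
    buffer.append((step, 0, step + 1, 1.0))

# Only the 3 most recent transitions remain
remaining_states = [t[0] for t in buffer]
print(remaining_states)

# Uniform sampling without replacement, as in a training batch
batch = random.sample(list(buffer), 2)
```

Sampling uniformly from the buffer breaks the correlation between consecutive frames, which is the usual motivation for experience replay.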

In [None]:
Transition = namedtuple('Transition', ('state', 'action', 'next_state', 'reward'))

class ReplayMemory(object):
    def __init__(self, capacity):
        self.memory = deque([],maxlen=capacity)

    def push(self, *args):
        """Save a transition"""
        self.memory.append(Transition(*args))

    def sample(self, batch_size):
        return random.sample(self.memory, batch_size)

    def __len__(self):
        return len(self.memory)

def plot_scores(score_values, show_result=False):
    plt.figure(1)
    scores_t = torch.tensor(score_values, dtype=torch.float)
    if show_result:
        plt.title('Result')
    else:
        plt.clf()
        plt.title('Training...')
    plt.xlabel('Episode')
    plt.ylabel('Scores')
    plt.plot(scores_t.numpy())
    # Take 100 episode averages and plot them too
    if len(scores_t) >= 100:
        means = scores_t.unfold(0, 100, 1).mean(1).view(-1)
        means = torch.cat((torch.zeros(99), means))
        plt.plot(means.numpy())

    plt.pause(0.001)  # pause a bit so that plots are updated
    if not show_result:
        display.display(plt.gcf())
        display.clear_output(wait=True)
    else:
        display.display(plt.gcf())

The optimization step performs one step of Q-Learning. Fill in the blanks.
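
For reference, one step of Q-learning regresses $q(s_t, a_t)$ towards the target $r_t + \gamma \max_{a'} q(s_{t+1}, a')$, where the bootstrap term is taken to be $0$ when $s_{t+1}$ is final. The sketch below works through that arithmetic on plain Python numbers, together with the Huber (smooth L1) loss. The function names `td_targets` and `huber` and the numerical values are made up for illustration, and this is not the tensor code expected in the blanks.

```python
GAMMA = 0.99

def td_targets(rewards, next_q_values, gamma=GAMMA):
    """next_q_values[i] is the list of q(s', a') values, or None if s' is final."""
    targets = []
    for r, q_next in zip(rewards, next_q_values):
        # A final state contributes no future value
        bootstrap = 0.0 if q_next is None else max(q_next)
        targets.append(r + gamma * bootstrap)
    return targets

def huber(pred, target, beta=1.0):
    """Quadratic for small errors, linear for large ones."""
    diff = abs(pred - target)
    return 0.5 * diff ** 2 / beta if diff < beta else diff - 0.5 * beta

# Two transitions: one non-final, one final
targets = td_targets([1.0, 0.0], [[0.5, 2.0], None])
losses = [huber(q, y) for q, y in zip([2.5, 0.3], targets)]
print(targets, losses)
```

The first target is $1 + 0.99 \times 2$, and the second is just the reward because the next state is final.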

In [None]:
def optimize_model():
    if len(memory) < BATCH_SIZE:
        return
    transitions = memory.sample(BATCH_SIZE)
    batch = Transition(*zip(*transitions))

    # Compute a mask of non-final states and concatenate the batch elements
    # (a final state would've been the one after which simulation ended)
    non_final_mask = torch.tensor(tuple(map(lambda s: s is not None,
                                          batch.next_state)), device=device, dtype=torch.bool)
    non_final_next_states = torch.cat([s for s in batch.next_state
                                                if s is not None])
    state_batch = torch.cat(batch.state)
    action_batch = torch.cat(batch.action)
    reward_batch = torch.cat(batch.reward)

    # Compute Q(s_t, a) - the model computes Q(s_t), then we select the
    # columns of actions taken. These are the actions which would've been taken
    # for each batch state according to policy_net
    state_action_values = policy_net(state_batch).gather(1, action_batch)

    # Compute V(s_{t+1}) for all next states.
    # Expected values of actions for non_final_next_states are computed based
    # on the "older" target_net; selecting their best reward with max(1)[0].
    # This is merged based on the mask, such that we'll have either the expected
    # state value or 0 in case the state was final.
    next_state_values = torch.zeros(BATCH_SIZE, device=device)
    with torch.no_grad():
        next_state_values[non_final_mask] = ### YOUR CODE HERE ###
    # Compute the expected Q values
    expected_state_action_values = ### YOUR CODE HERE ###

    # Compute Huber loss
    criterion = nn.SmoothL1Loss()
    loss = criterion(state_action_values, expected_state_action_values.unsqueeze(1))

    # Optimize the model
    optimizer.zero_grad()
    loss.backward()
    # In-place gradient clipping
    torch.nn.utils.clip_grad_value_(policy_net.parameters(), 100)
    optimizer.step()

Finally, the training loop!

In [None]:
BATCH_SIZE = 128
GAMMA = 0.99
EPS = 0.1
LR = 1e-4
NUM_EPISODES = 100
INIT_TRAINING = True

if INIT_TRAINING:
    policy_net = DQN().to(device)
    epsilon_greedy = EpsilonGreedy(policy_net, EPS)

    optimizer = optim.AdamW(policy_net.parameters(), lr=LR, amsgrad=True)
    memory = ReplayMemory(10000)
    steps_done = 0
    episode_durations = []
    scores = []

for i_episode in range(NUM_EPISODES):
    # Initialize the environment and get its state
    # (with gym >= 0.26, reset() returns (observation, info))
    observation, _ = env.reset()
    state = preprocess(observation)
    score = 0
    for t in count():
        action = epsilon_greedy(state)
        observation, reward, terminated, truncated, _ = env.step(action.item())
        reward = torch.tensor([reward], device=device)
        done = terminated or truncated
        score += reward.item()

        next_state = None if terminated else preprocess(observation)
        memory.push(state, action, next_state, reward)
        state = next_state
        optimize_model()

        if done:
            scores.append(score)
            episode_durations.append(t + 1)
            plot_scores(scores)
            break

print('Complete')
plot_scores(scores, show_result=True)
plt.ioff()
plt.show()

Create a video of the new agent playing Breakout. How is the agent performing?

In [None]:
#
# YOUR CODE HERE
#