# Introduction
In homework assignment 2, we will implement a basic deep Q-learning (DQL) algorithm to solve a classic control problem: CartPole-v1.

# Install the Gymnasium package

In [None]:
!pip install gymnasium

# Load TensorBoard for visualization

In [2]:
# Load the TensorBoard notebook extension, which provides the `%tensorboard` magic.
%load_ext tensorboard

# Import the required packages

In [3]:
from collections import namedtuple
from collections import deque
import torch
import torch.nn as nn
import numpy as np
import gymnasium as gym
from torch.utils.tensorboard import SummaryWriter
import datetime
from typing import Tuple
from numpy.random import binomial
from numpy.random import choice
import numpy.random as nr

# Alias kept for code that refers to the double-precision tensor type by name.
Tensor = torch.DoubleTensor
# Make float64 the default floating-point dtype for newly created tensors.
# `torch.set_default_tensor_type` is deprecated; `set_default_dtype` is the
# supported way to get the same effect.
torch.set_default_dtype(torch.float64)
# One batch (or one step) of experience: observation, action taken, reward
# received, resulting observation, and the episode-finished flag.
Transitions = namedtuple('Transitions', ['obs', 'action', 'reward', 'next_obs', 'done'])

# Replay buffer to collect transition tuples

In [4]:
class ReplayBuffer:
    """Bounded FIFO memory of transitions with uniform random sampling.

    Every component of a transition (observation, action, reward, next
    observation, done flag) is kept in its own ``deque`` whose ``maxlen`` is
    ``config['replay_buffer_size']``, so appending to a full buffer silently
    drops the oldest entry.
    """

    # Names of the per-component deques; they match the `Transitions` fields.
    _FIELDS = ('obs', 'action', 'reward', 'next_obs', 'done')

    def __init__(self, config):
        """Create an empty buffer.

        Args:
            config: mapping providing ``'replay_buffer_size'`` (capacity) and
                ``'seed'``.  The seed is applied to NumPy's *global* random
                generator, which `sample` draws from.
        """
        self.replay_buffer_size = config['replay_buffer_size']
        nr.seed(config['seed'])
        self._reset_storage()

    def _reset_storage(self):
        """(Re)create one empty bounded deque per transition component."""
        for name in self._FIELDS:
            setattr(self, name, deque([], maxlen=self.replay_buffer_size))

    def __len__(self):
        """Return the number of transitions currently stored."""
        return len(self.obs)

    def append_memory(self,
                      obs,
                      action,
                      reward,
                      next_obs,
                      done: bool):
        """Store one transition; the oldest one is evicted when at capacity.

        `obs`, `action`, `reward` and `next_obs` must be tensors because
        `sample` combines them with ``torch.stack``.
        """
        self.obs.append(obs)
        self.action.append(action)
        self.reward.append(reward)
        self.next_obs.append(next_obs)
        self.done.append(done)

    def sample(self, batch_size):
        """Draw a uniformly random mini-batch without replacement.

        Args:
            batch_size: requested number of transitions.  If fewer are
                stored, all stored transitions are returned.

        Returns:
            A `Transitions` instance whose fields are stacked along a new
            leading batch dimension; ``done`` gets an extra trailing axis of
            length one.

        Raises:
            RuntimeError: if the buffer is empty.
        """
        buffer_size = len(self.obs)
        if buffer_size == 0:
            raise RuntimeError('cannot sample from an empty replay buffer')

        idx = nr.choice(buffer_size,
                        size=min(buffer_size, batch_size),
                        replace=False)

        # Build a fresh namedtuple instance.  Assigning the fields onto the
        # `Transitions` class itself would overwrite its field accessors and
        # make every sampled batch share the same global object.
        return Transitions(
            obs=torch.stack([self.obs[i] for i in idx]),
            action=torch.stack([self.action[i] for i in idx]),
            reward=torch.stack([self.reward[i] for i in idx]),
            next_obs=torch.stack([self.next_obs[i] for i in idx]),
            done=torch.tensor([self.done[i] for i in idx])[:, None],
        )

    def clear(self):
        """Discard every stored transition while keeping the capacity."""
        self._reset_storage()


# Q network

In [5]:
class QNetwork(nn.Module):
    """Fully connected action-value network.

    Maps an observation to one value per action through a stack of
    tanh-activated hidden layers followed by a linear output layer.  All
    layers use double precision, Xavier-uniform weights and zero biases.
    Hidden layers are registered as ``layer1``, ``layer2``, ... and the final
    layer as ``output``.
    """

    def __init__(self,
                 dim_obs: int,
                 dim_action: int,
                 dims_hidden_neurons: Tuple[int, ...] = (64, 64)):
        """Build the network.

        Args:
            dim_obs: size of the observation vector (network input).
            dim_action: number of actions (network output).
            dims_hidden_neurons: width of each hidden layer, in order.

        Raises:
            TypeError: if a dimension is not an integer or
                `dims_hidden_neurons` is not a tuple.
        """
        # NumPy integer scalars are accepted alongside Python ints.
        if not isinstance(dim_obs, (int, np.integer)):
            raise TypeError('dimension of observation must be int')
        if not isinstance(dim_action, (int, np.integer)):
            raise TypeError('dimension of action must be int')
        if not isinstance(dims_hidden_neurons, tuple):
            raise TypeError('dimensions of hidden neurons must be tuple of int')

        super(QNetwork, self).__init__()
        self.num_layers = len(dims_hidden_neurons)
        self.dim_action = dim_action

        n_neurons = (dim_obs, ) + dims_hidden_neurons + (dim_action, )
        for i, (dim_in, dim_out) in enumerate(zip(n_neurons[:-2], n_neurons[1:-1])):
            layer = nn.Linear(dim_in, dim_out).double()
            torch.nn.init.xavier_uniform_(layer.weight)
            torch.nn.init.zeros_(layer.bias)
            # setattr goes through nn.Module.__setattr__, so the layer is
            # registered as a sub-module under the name `layer<i+1>`.
            setattr(self, 'layer{}'.format(i + 1), layer)

        self.output = nn.Linear(n_neurons[-2], n_neurons[-1]).double()
        torch.nn.init.xavier_uniform_(self.output.weight)
        torch.nn.init.zeros_(self.output.bias)

    def forward(self, observation: torch.Tensor):
        """Return the action values for `observation`.

        The input is cast to double precision before the first layer; the
        output has `dim_action` entries along its last dimension.
        """
        x = observation.double()
        for i in range(self.num_layers):
            hidden = getattr(self, 'layer{}'.format(i + 1))
            x = torch.tanh(hidden(x))
        return self.output(x)


# DQN agent
The base code is given in this section. The updates of the neural networks are missing and are left for you to fill in. You may refer to the DQN paper: https://www.cs.toronto.edu/~vmnih/docs/dqn.pdf

In [6]:
class DQN:
    """Deep Q-network agent with an online network `Q` and a target network `Q_tar`.

    Action selection is implemented here; the learning step inside `update`
    is intentionally left as an exercise (see the TODO there).
    """

    def __init__(self, config):
        """Build both networks and the optimizer from `config`.

        Args:
            config: mapping with the keys ``'seed'``, ``'lr'``, ``'C'``,
                ``'eps_len'``, ``'eps_max'``, ``'eps_min'``, ``'discount'``,
                ``'batch_size'``, ``'dims_hidden_neurons'``, ``'dim_obs'``
                and ``'dim_action'``.
        """
        torch.manual_seed(config['seed'])

        self.lr = config['lr']  # learning rate
        self.C = config['C']  # copy steps
        self.eps_len = config['eps_len']  # length of epsilon greedy exploration
        self.eps_max = config['eps_max']
        self.eps_min = config['eps_min']
        self.discount = config['discount']  # discount factor
        self.batch_size = config['batch_size']  # mini batch size

        self.dims_hidden_neurons = config['dims_hidden_neurons']
        self.dim_obs = config['dim_obs']
        self.dim_action = config['dim_action']

        self.Q = QNetwork(dim_obs=self.dim_obs,
                          dim_action=self.dim_action,
                          dims_hidden_neurons=self.dims_hidden_neurons)
        self.Q_tar = QNetwork(dim_obs=self.dim_obs,
                              dim_action=self.dim_action,
                              dims_hidden_neurons=self.dims_hidden_neurons)
        # The target network must start as an exact copy of the online
        # network; otherwise the first targets come from unrelated weights.
        self.Q_tar.load_state_dict(self.Q.state_dict())

        self.optimizer_Q = torch.optim.Adam(self.Q.parameters(), lr=self.lr)
        self.training_step = 0

    def update(self, buffer):
        """Sample a mini-batch from `buffer` and count one training step.

        The gradient step on `Q` and the periodic copy into `Q_tar` are not
        implemented yet; only the sampling and the step counter are.
        """
        t = buffer.sample(self.batch_size)

        # Unpacked here so the exercise code below can use them directly.
        s = t.obs
        a = t.action
        r = t.reward
        sp = t.next_obs
        done = t.done

        self.training_step += 1

        # TODO: perform a single Q network update step. Also update the target Q network every C Q network update steps

    def _epsilon(self):
        """Return the exploration rate: linear decay from `eps_max`, floored at `eps_min`."""
        decayed = self.eps_max * (self.eps_len - self.training_step) / self.eps_len
        return max(decayed, self.eps_min)

    def _greedy_action(self, observation: torch.Tensor):
        """Return the index of the highest-valued action for a single observation."""
        self.Q.eval()
        # No gradients are needed to pick an action, so skip graph recording.
        with torch.no_grad():
            q_values = self.Q(observation)
        self.Q.train()
        _, best = torch.max(q_values, dim=1)
        # .item() requires exactly one observation in the batch.
        return best.item()

    def act_probabilistic(self, observation: torch.Tensor):
        """Choose an action epsilon-greedily.

        With probability epsilon a uniformly random action is drawn from
        NumPy's global generator; otherwise the greedy action is returned.
        `observation` must carry a leading batch dimension of size one.
        """
        explore = binomial(1, self._epsilon())

        if explore == 1:
            return choice(self.dim_action)
        return self._greedy_action(observation)

    def act_deterministic(self, observation: torch.Tensor):
        """Return the greedy action for `observation` (batch dimension of size one)."""
        return self._greedy_action(observation)

# Create the environment

In [7]:
env = gym.make('CartPole-v1')
# Hyper-parameters shared by the agent and the replay buffer.  The network
# input/output sizes are read from the environment instead of being
# hard-coded, so they stay correct if the environment id above is changed to
# another task with a flat observation vector and a discrete action set.
config = {
    'dim_obs': int(env.observation_space.shape[0]),  # Q network input
    'dim_action': int(env.action_space.n),  # Q network output
    'dims_hidden_neurons': (64, 64),  # Q network hidden
    'lr': 0.0005,  # learning rate
    'C': 60,  # copy steps
    'discount': 0.99,  # discount factor
    'batch_size': 64,
    'replay_buffer_size': 100000,
    'eps_min': 0.01,
    'eps_max': 1.0,
    'eps_len': 4000,
    'seed': 1,
}


# Create the DQN agent

In [8]:
# Event files for TensorBoard are written below this directory.
train_writer = SummaryWriter(log_dir='tensorboard/dqn')
# Experience memory and learning agent, both configured from the same dict.
buffer = ReplayBuffer(config)
dqn = DQN(config)

# Start training

In [None]:
steps = 0  # total number of environment steps over all episodes
for i_episode in range(500):
    # Gymnasium's reset() returns (observation, info).
    observation, _ = env.reset()
    terminated = False
    truncated = False
    t = 0  # time steps within each episode
    ret = 0.  # episodic return
    while not (terminated or truncated):
        # env.render()  # render to screen, not working for jupyter

        # Use the observation returned by the environment rather than reaching
        # into `env.state`, which is an internal of the unwrapped CartPole env.
        obs = torch.as_tensor(observation, dtype=torch.float64)

        action = dqn.act_probabilistic(obs[None, :])  # take action

        # Gymnasium's step() returns
        # (observation, reward, terminated, truncated, info).
        next_observation, reward, terminated, truncated, _ = env.step(action)

        # Only true termination is stored as `done`; a time-limit truncation
        # does not mean the next state has zero value.
        buffer.append_memory(obs=obs,  # put the transition to memory
                             action=torch.tensor([action], dtype=torch.int64),
                             reward=torch.tensor([reward], dtype=torch.float64),
                             next_obs=torch.as_tensor(next_observation, dtype=torch.float64),
                             done=bool(terminated))

        dqn.update(buffer)  # agent learn

        observation = next_observation
        t += 1
        steps += 1
        ret += reward  # update episodic return

    # `t` already counts every step taken in this episode.
    print("Episode {} finished after {} timesteps".format(i_episode, t))
    # One data point per episode, logged once the episode is over.
    train_writer.add_scalar('Performance/episodic_return', ret, i_episode)  # plot

env.close()
train_writer.close()

# Visualizing

In [None]:
# Start TensorBoard on the event files stored under 'tensorboard/dqn'.
%tensorboard --logdir='tensorboard/dqn'