# Configuration & Tools

In [None]:
import sys
# True when the google.colab module is already loaded in this interpreter.
IN_COLAB = "google.colab" in sys.modules

if IN_COLAB:
    # IPython shell escapes: install the packages this notebook needs on a
    # headless machine, then import the virtual-display wrapper.
    !apt install python-opengl
    !apt install ffmpeg
    !apt install xvfb
    !pip install pyvirtualdisplay
    from pyvirtualdisplay import Display
    
    # Start virtual display
    dis = Display(visible=0, size=(600, 400))
    dis.start()

## Import modules

In [None]:
import random
from collections import deque
from typing import Deque, Dict, List, Tuple

import gym
import matplotlib.pyplot as plt
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from IPython.display import clear_output
from torch.distributions import Normal

## Set random seed

In [None]:
seed = 777

# Seed every random number generator this notebook draws from.
random.seed(seed)
np.random.seed(seed)
torch.manual_seed(seed)

# When cuDNN is available, switch off its benchmark mode and request
# deterministic algorithms so repeated runs behave the same way.
if torch.backends.cudnn.enabled:
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False

## Replay buffer
Typically, people implement replay buffers with one of the following three data structures:

- collections.deque
- list
- numpy.ndarray

**deque** is very easy to handle once you initialize its maximum length (e.g. deque(maxlen=buffer_size)). However, indexing into a deque gets terribly slow as it grows because it is [internally a doubly linked list](https://wiki.python.org/moin/TimeComplexity#collections.deque). On the other hand, **list** is an array, so it is relatively faster than deque when you sample batches at every step. Its amortized cost of Get item is [O(1)](https://wiki.python.org/moin/TimeComplexity#list).

Last but not least, let's see **numpy.ndarray**. numpy.ndarray is even faster than list because it is [a homogeneous array of fixed-size items](https://docs.scipy.org/doc/numpy/reference/generated/numpy.ndarray.html#numpy.ndarray), so you can get the benefits of [locality of reference](https://en.wikipedia.org/wiki/Locality_of_reference), whereas a list is an array of pointers to objects, even when all of them are of the same type.

Here, we are going to implement a replay buffer using numpy.ndarray.

Reference: 
- [OpenAI spinning-up](https://github.com/openai/spinningup/blob/master/spinup/algos/sac/sac.py#L10)
- [rainbow-is-all-you-need](https://render.githubusercontent.com/view/ipynb?commit=032d11277cf2436853478a69ca5a4aba03202598&enc_url=68747470733a2f2f7261772e67697468756275736572636f6e74656e742e636f6d2f437572742d5061726b2f7261696e626f772d69732d616c6c2d796f752d6e6565642f303332643131323737636632343336383533343738613639636135613461626130333230323539382f30312e64716e2e6970796e62&nwo=Curt-Park%2Frainbow-is-all-you-need&path=01.dqn.ipynb&repository_id=191133946&repository_type=Repository#Replay-buffer)

In [None]:
class ReplayBuffer:
    """A simple numpy replay buffer.

    Transitions are written into preallocated arrays in ring order: once
    ``size`` transitions have been stored, each new one overwrites the oldest.
    """

    def __init__(self, obs_dim: int, size: int, batch_size: int = 32):
        """Allocate storage.

        Args:
            obs_dim: length of one observation vector.
            size: maximum number of transitions kept at any time.
            batch_size: number of transitions ``sample_batch`` aims to return.
        """
        self.buffer_size = size
        self.batch_size = batch_size

        self.buffer_obs = np.zeros([size, obs_dim], dtype=np.float32)
        self.buffer_next_obs = np.zeros([size, obs_dim], dtype=np.float32)
        # One scalar action per transition.
        self.buffer_act = np.zeros([size], dtype=np.float32)
        self.buffer_reward = np.zeros([size], dtype=np.float32)
        self.buffer_done = np.zeros([size], dtype=np.float32)

        # Number of transitions ever stored; NOT capped at ``size``.
        self.num_exp = 0

    def store(
        self,
        obs: np.ndarray,
        act: np.ndarray,
        rew: float,
        next_obs: np.ndarray,
        done: bool,
    ):
        """Store the transition in buffer, overwriting the oldest when full."""
        index = self.num_exp % self.buffer_size

        self.buffer_obs[index] = obs
        self.buffer_next_obs[index] = next_obs
        # Squeeze so a size-1 action array becomes 0-d before it is written to
        # a scalar slot; NumPy deprecates treating ndim > 0 arrays as scalars.
        self.buffer_act[index] = np.asarray(act, dtype=np.float32).squeeze()
        self.buffer_reward[index] = rew
        self.buffer_done[index] = done

        self.num_exp += 1

    def sample_batch(self) -> Dict[str, np.ndarray]:
        """Randomly sample a batch of stored experiences without replacement.

        Only slots that have actually been written are eligible. While fewer
        than ``batch_size`` transitions are stored, every stored transition is
        returned (in random order) instead of ``batch_size`` rows.

        Returns:
            Dict with keys ``obs``, ``actions``, ``next_obs``, ``rewards`` and
            ``done``; each value's first axis is the sampled batch.
        """
        stored = len(self)
        idxs = np.random.choice(
            stored, size=min(self.batch_size, stored), replace=False
        )

        return dict(
            obs=self.buffer_obs[idxs],
            actions=self.buffer_act[idxs],
            next_obs=self.buffer_next_obs[idxs],
            rewards=self.buffer_reward[idxs],
            done=self.buffer_done[idxs],
        )

    def __len__(self) -> int:
        """Return how many transitions are currently held (at most ``size``)."""
        return min(self.num_exp, self.buffer_size)


## OU Noise
**Ornstein-Uhlenbeck** process generates temporally correlated exploration, and it effectively copes with physical control problems of inertia.

$$
dx_t = \theta(\mu - x_t) dt + \sigma dW_t
$$

Reference: 
- [Udacity github](https://github.com/udacity/deep-reinforcement-learning/blob/master/ddpg-pendulum/ddpg_agent.py)
- [Wiki](https://en.wikipedia.org/wiki/Ornstein%E2%80%93Uhlenbeck_process)

In [None]:
class OUNoise:
    """Ornstein-Uhlenbeck process.
    Taken from Udacity deep-reinforcement-learning github repository:
    https://github.com/udacity/deep-reinforcement-learning/blob/master/
    ddpg-pendulum/ddpg_agent.py
    """

    def __init__(
        self,
        size: int,
        mu: float = 0.0,
        theta: float = 0.15,
        sigma: float = 0.2,
    ):
        """Initialize parameters and noise process.

        Args:
            size: number of noise components (length of each sample).
            mu: value the state is pulled towards and reset to.
            theta: strength of the pull towards ``mu``.
            sigma: scale applied to the random term.
        """
        self.mu = mu * np.ones(size)
        self.theta = theta
        self.sigma = sigma
        self.reset()

    def reset(self):
        """Reset the internal state (= noise) to mean (mu)."""
        # ndarray.copy() avoids depending on the `copy` module, which this
        # file never imports, and keeps `state` independent of `mu`.
        self.state = self.mu.copy()

    def sample(self) -> np.ndarray:
        """Update internal state and return it as a noise sample."""
        x = self.state
        # NOTE(review): random.random() is uniform on [0, 1), so the random
        # term is never negative; kept unchanged from the referenced source.
        dx = self.theta * (self.mu - x) + self.sigma * np.array(
            [random.random() for _ in range(len(x))]
        )
        self.state = x + dx
        return self.state

# PPO

- PPO: [J. Schulman et al., "Proximal Policy Optimization Algorithms." arXiv preprint arXiv:1707.06347, 2017.](https://arxiv.org/abs/1707.06347.pdf)
- TRPO: [Schulman, John, et al. "Trust region policy optimization." International conference on machine learning. 2015.](http://proceedings.mlr.press/v37/schulman15.pdf)

There are two kinds of algorithms of PPO: PPO-Penalty and PPO-Clip. Here, we'll implement PPO-clip version.

TRPO computes the gradients with a complex second-order method. On the other hand, PPO tries to solve the problem with first-order methods that keep the new policy close to the old one. To simplify the surrogate objective, let $r_t(\theta)$ denote the probability ratio

$$ L^{CPI}(\theta) = \hat {\mathbb{E}}_t \left [ {\pi_\theta(a_t|s_t) \over \pi_{\theta_{old}}(a_t|s_t)} \hat A_t\right] = \hat {\mathbb{E}}_t \left [ r_t(\theta) \hat A_t \right ].$$

The clipped objective additionally removes the incentive for moving $r_t(\theta)$ further away from 1 than $\epsilon$:

$$ L^{CLIP}(\theta)=\hat {\mathbb{E}}_t \left [ \min(r_t(\theta) \hat A_t, \text{clip}(r_t(\theta), 1-\epsilon, 1+\epsilon)\hat A_t) \right ] $$

If the advantage is positive, the objective increases when the action becomes more likely. If the advantage is negative, the objective increases when the action becomes less likely. In both cases the clip term limits how much the objective can gain from moving the ratio away from 1.

# DDPG 

[T. P. Lillicrap et al., "Continuous control with deep reinforcement learning." arXiv preprint arXiv:1509.02971, 2015.](https://arxiv.org/pdf/1509.02971.pdf)

The Deep Q-Network (DQN) ([Mnih et al., 2013;2015](https://storage.googleapis.com/deepmind-media/dqn/DQNNaturePaper.pdf)) algorithm combines advances in deep learning with reinforcement learning. However, while DQN solves problems with high-dimensional observation spaces, it can only handle discrete and low-dimensional action spaces because it uses a greedy policy. For learning in high-dimensional and continuous action spaces, the authors combine the actor-critic approach with insights from the recent success of DQN. Deep DPG (DDPG) is based on the deterministic policy gradient (DPG) algorithm ([Silver et al., 2014](http://proceedings.mlr.press/v32/silver14.pdf)).

### Deterministic policy gradient
The DPG algorithm maintains a parameterized actor function $\mu(s|\theta^{\mu})$ which specifies the current policy by deterministically mapping states to a specific action. The critic $Q(s, a)$ is learned using the Bellman equation as in Q-learning. The actor is updated by applying the chain rule to the expected return from the start distribution $J$ with respect to the actor parameters

$$
\begin{align*}
\nabla_{\theta^{\mu}}J &\approx E_{s_t\sim\rho^\beta} [\nabla_{\theta^{\mu}} Q(s,a|\theta^Q)|_{s=s_t, a=\mu(s_t|\theta^\mu)}] \\
&= E_{s_t\sim\rho^\beta} [\nabla_{a} Q(s,a|\theta^Q)|_{s=s_t, a=\mu(s_t)} \nabla_{\theta^{\mu}} \mu(s|\theta^\mu)|_{s=s_t}]
\end{align*}
$$

### Soft update target network
Since the network $Q(s,a|\theta^Q)$ being updated is also used in calculating the target value, the Q update is prone to divergence. To avoid this, the authors use **target networks** as in DQN, but modified for actor-critic and using **soft target updates**. The target networks are created by copying the critic and actor networks, giving $Q'(s,a|\theta^{Q'})$ and $\mu'(s|\theta^{\mu'})$ respectively, which are used for calculating the target values. The weights of these target networks are then updated by having them slowly track the learned networks:

$$
\theta' \leftarrow \tau \theta + (1 - \tau)\theta' \ \ \ {with} \ \tau \ll 1.
$$

It greatly improves the stability of learning.

### Exploration for continuous action space
An advantage of off-policy algorithms such as DDPG is that we can treat the problem of exploration independently from the learning algorithm. The authors construct an exploration policy $\mu'$ by adding noise sampled from a noise process $\mathcal{N}$ to the actor policy

$$
\mu'(s_t) = \mu(s_t|\theta^{\mu}_t) + \mathcal{N}
$$

$\mathcal{N}$ can be chosen to suit the environment. The authors used **Ornstein-Uhlenbeck process** to generate temporally correlated exploration.

# Networks


## PPO Networks

We will use two separate networks for the actor and the critic. The actor network consists of two fully connected hidden layers with ReLU, branching out into two fully connected output layers for the mean and the standard deviation of a Gaussian distribution. Pendulum-v0 has only one action, which has a range from -2 to 2. In order to fit the range, the actor outputs the mean value with tanh. The result will be scaled in the ActionNormalizer class. On the other hand, the critic network has three fully connected layers: two hidden layers (ReLU) and an output layer. One thing to note is that we initialize the last layers' weights and biases as uniformly distributed.

In [None]:
def init_layer_uniform(layer: nn.Linear, init_w: float = 3e-3) -> nn.Linear:
    """Re-draw a layer's weight and bias uniformly from [-init_w, init_w].

    The layer is modified in place and also returned, so the call can be
    used inline when building a module.
    """
    low, high = -init_w, init_w
    # Weight first, then bias, so the random draws happen in a fixed order.
    for param in (layer.weight, layer.bias):
        param.data.uniform_(low, high)
    return layer


class PPOActor(nn.Module):
    """Gaussian policy network for PPO.

    Two ReLU hidden layers feed two linear heads: one for the mean (squashed
    by tanh) and one for the log standard deviation of a Normal distribution.
    """

    def __init__(
        self,
        in_dim: int,
        out_dim: int,
        log_std_min: int = -20,
        log_std_max: int = 0,
    ):
        """Initialize.

        Args:
            in_dim: size of the state vector.
            out_dim: size of the action vector.
            log_std_min: lower end of the range log(std) is mapped into.
            log_std_max: upper end of the range log(std) is mapped into.
        """
        super(PPOActor, self).__init__()

        self.log_std_min = log_std_min
        self.log_std_max = log_std_max

        self.hidden1 = nn.Sequential(nn.Linear(in_dim, 32), nn.ReLU())
        self.hidden2 = nn.Sequential(nn.Linear(32, 64), nn.ReLU())

        # Both heads get the small uniform initialization.
        self.mu_layer = init_layer_uniform(nn.Linear(64, out_dim))
        self.log_std_layer = init_layer_uniform(nn.Linear(64, out_dim))

    def forward(self, state: torch.Tensor) -> Tuple[torch.Tensor, Normal]:
        """Forward method implementation.

        Returns:
            A pair ``(action, distribution)``: an action sampled from the
            policy and the Normal distribution it was sampled from.
        """
        x = self.hidden2(self.hidden1(state))

        mu = torch.tanh(self.mu_layer(x))
        log_std = torch.tanh(self.log_std_layer(x))

        # Change log_std from (-1, 1) to (log_std_min, log_std_max)
        log_std = self.log_std_min + (
            self.log_std_max - self.log_std_min
        ) / 2 * (log_std + 1)
        std = torch.exp(log_std)

        distribution = Normal(mu, std)
        action = distribution.sample()

        return action, distribution



class PPOCritic(nn.Module):
    """State-value network for PPO: two ReLU hidden layers and a scalar head."""

    def __init__(self, in_dim: int):
        """Build the layers for a state vector of length ``in_dim``."""
        super().__init__()
        self.in_dim = in_dim

        self.hidden1 = nn.Sequential(nn.Linear(in_dim, 32), nn.ReLU())
        self.hidden2 = nn.Sequential(nn.Linear(32, 64), nn.ReLU())

        # The value head gets the small uniform initialization.
        value_head = nn.Linear(64, 1)
        self.out = init_layer_uniform(value_head)

    def forward(self, state: torch.Tensor) -> torch.Tensor:
        """Return the value estimate for ``state`` (last axis has size 1)."""
        features = self.hidden2(self.hidden1(state))
        return self.out(features)


## DDPG Networks
We are going to use two separate networks for the actor and the critic. The actor network has three fully connected layers and three non-linearity functions: **ReLU** for the hidden layers and **tanh** for the output layer. On the other hand, the critic network has three fully connected layers, but it uses only two activation functions, **ReLU** for the hidden layers. In addition, the input size of the critic network is the sum of the state size and the action size. One thing to note is that we initialize the final layer's weights and biases so that they are **uniformly distributed.**

In [None]:
class DDPGActor(nn.Module):
    """Deterministic policy network for DDPG.

    Two ReLU hidden layers followed by a tanh output layer, so every action
    component lies in (-1, 1).
    """

    def __init__(
        self,
        in_dim: int,
        out_dim: int,
        init_w: float = 3e-3,
    ):
        """Initialize.

        Args:
            in_dim: size of the state vector.
            out_dim: size of the action vector.
            init_w: bound of the uniform initialization of the output layer.
        """
        # Must name this class; the previous `Actor` is not defined anywhere.
        super(DDPGActor, self).__init__()

        self.in_dim = in_dim
        self.out_dim = out_dim

        self.hidden1 = nn.Sequential(nn.Linear(in_dim, 32), nn.ReLU())
        self.hidden2 = nn.Sequential(nn.Linear(32, 64), nn.ReLU())

        # Output width follows out_dim, and init_w is forwarded to the helper
        # rather than silently falling back to the helper's default.
        self.out = init_layer_uniform(nn.Linear(64, out_dim), init_w)

    def forward(self, state: torch.Tensor) -> torch.Tensor:
        """Forward method implementation."""
        x = self.hidden1(state)
        x = self.hidden2(x)
        # tanh as the activation function of the output layer
        action = torch.tanh(self.out(x))

        return action
    
    
class DDPGCritic(nn.Module):
    """Action-value network Q(s, a) for DDPG.

    The state and action are concatenated and passed through two ReLU hidden
    layers and a linear output layer of width 1.
    """

    def __init__(
        self,
        in_dim: int,
        init_w: float = 3e-3,
    ):
        """Initialize.

        Args:
            in_dim: size of the concatenated (state, action) input.
            init_w: bound of the uniform initialization of the output layer.
        """
        # Must name this class; the previous `Critic` is not defined anywhere.
        super(DDPGCritic, self).__init__()

        self.in_dim = in_dim

        self.hidden1 = nn.Sequential(nn.Linear(in_dim, 32), nn.ReLU())
        self.hidden2 = nn.Sequential(nn.Linear(32, 64), nn.ReLU())

        # Forward init_w so the constructor argument actually takes effect.
        self.out = init_layer_uniform(nn.Linear(64, 1), init_w)

    def forward(
        self, state: torch.Tensor, action: torch.Tensor
    ) -> torch.Tensor:
        """Forward method implementation."""
        # notice that this value function is Q(s, a)
        x = torch.cat((state, action), dim=-1)
        x = self.hidden1(x)
        x = self.hidden2(x)
        Q = self.out(x)

        return Q


# Agents 

## PPO Agent
Here is a summary of PPOAgent class.

| Method           | Note                                                 |
|---               |---                                                   |
|select_action     | select an action from the input state.               |
|step              | take an action and return the response of the env.   |
|update_model      | update the model by gradient descent.                |
|train             | train the agent during num_frames.                   |
|test              | test the agent (1 episode).                          |
|_plot             | plot the training progresses.                        |


PPO updates the model several times (`epoch`) using the stacked memory. The `ppo_iter` function yields mini-batches sampled from the stacked memory that was collected by interacting with the environment.

In [None]:
def ppo_iter(
    epoch: int,
    mini_batch_size: int,
    states: torch.Tensor,
    actions: torch.Tensor,
    values: torch.Tensor,
    log_probs: torch.Tensor,
    returns: torch.Tensor,
    advantages: torch.Tensor,
):
    """Yield mini-batches.

    For each of ``epoch`` passes, ``len(states) // mini_batch_size`` batches
    are produced. Every batch draws ``mini_batch_size`` distinct row indices
    at random; batches are drawn independently, so rows may repeat across
    batches of the same pass.

    Yields:
        A 6-tuple ``(states, actions, values, log_probs, returns,
        advantages)``, each indexed by the same sampled rows.
    """
    batch_size = states.size(0)
    for _ in range(epoch):
        for _ in range(batch_size // mini_batch_size):
            idxs = np.random.choice(batch_size, mini_batch_size, replace=False)
            # Parenthesized so all six tensors belong to the yielded tuple; a
            # bare trailing comma plus line break silently drops the rest.
            yield (
                states[idxs, :],
                actions[idxs],
                values[idxs],
                log_probs[idxs],
                returns[idxs],
                advantages[idxs],
            )


In [3]:
import numpy as np

# Scratch check: adding a leading axis to a length-3 vector.
state = np.array([2, 2, 1])
np.expand_dims(state, axis=0).shape

(1, 3)

In [2]:
# Scratch check: shape of the vector itself, without the extra axis.
state.shape

(3,)

In [None]:
class PPOAgent:
    """PPO Agent.

    Attributes:
        env (gym.Env): Gym env for training
        gamma (float): discount factor
        tau (float): lambda of generalized advantage estimation (GAE)
        batch_size (int): batch size for sampling
        epsilon (float): amount of clipping surrogate objective
        epoch (int): the number of update
        rollout_len (int): the number of rollout
        entropy_weight (float): rate of weighting entropy into the loss function
        obs_dim (int): length of an observation vector
        action_dim (int): length of an action vector
        actor (nn.Module): actor model to select actions
        critic (nn.Module): critic model to predict state values
        optim_actor (Optimizer): optimizer for training actor
        optim_critic (Optimizer): optimizer for training critic
        buffer (ReplayBuffer): memory holding the collected transitions
        transition (list): temporary storage for the recent transition
        device (torch.device): cpu / gpu
        total_step (int): total step numbers
        is_test (bool): flag to show the current mode (train / test)
    """

    def __init__(
        self,
        env: gym.Env,
        memory_size: int,
        batch_size: int,
        gamma: float,
        tau: float,
        epsilon: float,
        epoch: int,
        rollout_len: int,
        entropy_weight: float,
    ):
        """Initialize."""
        # 1. set hyperparameters
        self.env = env
        self.batch_size = batch_size
        self.gamma = gamma
        self.tau = tau
        self.epsilon = epsilon
        self.epoch = epoch
        self.rollout_len = rollout_len
        self.entropy_weight = entropy_weight

        # 2. check device: cpu/GPU
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

        # 3. init actor critic networks
        self.obs_dim = self.env.observation_space.shape[0]
        self.action_dim = self.env.action_space.shape[0]
        self.actor = PPOActor(self.obs_dim, self.action_dim).to(self.device)
        self.critic = PPOCritic(self.obs_dim).to(self.device)

        # 4. set Optimizer for each network
        self.optim_actor = optim.Adam(self.actor.parameters(), lr=0.001)
        self.optim_critic = optim.Adam(self.critic.parameters(), lr=0.001)

        # 5. consider memory for training
        self.buffer = ReplayBuffer(
            obs_dim=self.obs_dim, size=memory_size, batch_size=self.batch_size
        )
        # Holds [state, action] between select_action() and step().
        self.transition: list = []

        # 6. set total step counts equal to 1
        self.total_step = 1

        # 7. define a mode for train/test
        self.is_test = False

    def select_action(self, state: np.ndarray) -> np.ndarray:
        """Select an action from the input state.

        In test mode the mean of the policy distribution is used; otherwise
        the action sampled by the actor is used and ``[state, action]`` is
        remembered so ``step`` can complete and store the transition.

        This method does not touch the environment; call ``step`` with the
        returned action to advance it.
        """
        # float32 matches the dtype of the actor's nn.Linear parameters.
        state_tensor = torch.as_tensor(
            state, dtype=torch.float32, device=self.device
        )
        sampled, dist = self.actor(state_tensor)
        chosen = dist.mean if self.is_test else sampled
        action = chosen.detach().cpu().numpy()

        if not self.is_test:
            # Keep the numpy state (not the device tensor) for the buffer.
            self.transition = [state, action]

        return action

    def step(self, action: np.ndarray) -> Tuple[np.ndarray, np.float64, bool]:
        """Take an action and return the response of the env.

        Accepts both step() return shapes: the 5-tuple
        ``(obs, reward, terminated, truncated, info)`` and the older 4-tuple
        ``(obs, reward, done, info)``. With the 5-tuple, the episode counts
        as done when it is either terminated or truncated.

        In train mode, if ``select_action`` recorded a pending
        ``[state, action]``, the full transition is written to the buffer.

        Returns:
            ``(next_obs, reward, done)``.
        """
        result = self.env.step(action)
        if len(result) == 5:
            next_obs, rew, terminated, truncated, _ = result
            done = bool(terminated or truncated)
        else:
            next_obs, rew, done, _ = result
            done = bool(done)

        if not self.is_test and self.transition:
            state, taken_action = self.transition
            self.buffer.store(state, taken_action, rew, next_obs, done)
            self.transition = []

        # np.float64(...) also works when the env returns a plain Python float.
        return next_obs, np.float64(rew), done

    def update_model(
        self, next_state: np.ndarray
    ) -> Tuple[torch.Tensor, torch.Tensor, torch.Tensor]:
        """Update the model by gradient descent."""
        # TODO: not implemented yet; currently returns None.
        # ==================================== Your Code (Begin) ====================================
        # 1. set device
        device = self.device

        # 2. for each step:
        # 3.    calculate ratios
        # 4.    calculate actor_loss
        # 5.    calculate entropy
        # 6.    calculate critic_loss
        # 7.    Train  critic
        # 8.    Train actor

        # ==================================== Your Code (End) ====================================

    def train(self, num_frames: int, plotting_interval: int = 200):
        """Train the agent."""
        # TODO: not implemented yet.
        # ==================================== Your Code (Begin) ====================================
        # 1. set the status of training
        # 2. Reset environment
        # 3. for number of frames:
        # 4.    select an action
        # 5.    step in environment
        # 6.    update model
        # 7. terminate environment after training is finished
        # ==================================== Your Code (End) ====================================

    def test(self):
        """Test the agent."""
        # TODO: not implemented yet.
        # ==================================== Your Code (Begin) ====================================
        # 1. set the status of training
        # 2. Reset environment
        # 3. roll out one episode living in the environment and save frames for getting render
        # ==================================== Your Code (End) ====================================

    def _plot(
        self,
        frame_idx: int,
        scores: List[float],
        actor_losses: List[float],
        critic_losses: List[float],
    ):
        """Plot the training progresses."""
        # TODO: not implemented yet.
        # ==================================== Your Code (Begin) ====================================
        # 1. define a function for sub plots
        # 2. for each variable plot the specific subplot
        # 3. plot the whole figure
        # ==================================== Your Code (End) ====================================


## DDPG Agent
Here is a summary of DDPGAgent class.

| Method           | Note                                                 |
|---               |---                                                  |
|select_action     | select an action from the input state.               |
|step              | take an action and return the response of the env.   |
|update_model      | update the model by gradient descent.                |
|train             | train the agent during num_frames.                   |
|test              | test the agent (1 episode).                          |
|\_target_soft_update| soft update from the local model to the target model.|
|\_plot              | plot the training progresses.                        |

In [None]:
class DDPGAgent:
    """DDPG agent interacting with a Gym environment.

    Attributes:
        env (gym.Env): openAI Gym environment
        actor (nn.Module): local actor model used to select actions
        actor_target (nn.Module): target actor model used to predict next actions
        actor_optim (Optimizer): optimizer for training the actor
        critic (nn.Module): local critic model, called as critic(state, action)
        critic_target (nn.Module): target critic model used for the TD target
        critic_optim (Optimizer): optimizer for training the critic
        memory (ReplayBuffer): replay memory to store transitions
        batch_size (int): batch size for sampling
        gamma (float): discount factor
        tau (float): parameter for soft target update
        initial_random_steps (int): initial random action steps
        noise (OUNoise): noise generator for exploration
        device (torch.device): cpu / gpu
        transition (list): temporary storage for the most recent transition
        total_step (int): total step numbers
        is_test (bool): flag to show the current mode (train / test)
    """

    def __init__(
        self,
        env: gym.Env,
        memory_size: int,
        batch_size: int,
        ou_noise_theta: float,
        ou_noise_sigma: float,
        gamma: float = 0.99,
        tau: float = 5e-3,
        initial_random_steps: int = int(1e4),
    ):
        """Initialize.

        Args:
            env: environment to interact with.
            memory_size: capacity passed to the replay buffer.
            batch_size: number of transitions sampled per update.
            ou_noise_theta: theta passed to the OU noise generator.
            ou_noise_sigma: sigma passed to the OU noise generator.
            gamma: discount factor.
            tau: interpolation factor of the soft target update.
            initial_random_steps: number of initial steps that use actions
                sampled from the action space instead of the actor.
        """
        # ==================================== Your Code (Begin) ====================================
        # 1. initialize hyper parameters, replay buffer and environment
        obs_dim = env.observation_space.shape[0]
        action_dim = env.action_space.shape[0]

        self.env = env
        self.memory = ReplayBuffer(obs_dim, memory_size, batch_size)
        self.batch_size = batch_size
        self.gamma = gamma
        self.tau = tau
        # Coerce to int so a float such as 1e4 still compares cleanly with step counts.
        self.initial_random_steps = int(initial_random_steps)

        # 2. set device
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

        # 3. init actor networks (the target starts as an exact copy of the local net)
        self.actor = DDPGActor(obs_dim, action_dim).to(self.device)
        self.actor_target = DDPGActor(obs_dim, action_dim).to(self.device)
        self.actor_target.load_state_dict(self.actor.state_dict())

        # 4. init critic networks (the target starts as an exact copy of the local net)
        self.critic = DDPGCritic(obs_dim + action_dim).to(self.device)
        self.critic_target = DDPGCritic(obs_dim + action_dim).to(self.device)
        self.critic_target.load_state_dict(self.critic.state_dict())

        # 5. init OUNoise
        self.noise = OUNoise(action_dim, theta=ou_noise_theta, sigma=ou_noise_sigma)

        # 6. set optimizers
        self.actor_optim = optim.Adam(self.actor.parameters(), lr=3e-4)
        self.critic_optim = optim.Adam(self.critic.parameters(), lr=1e-3)

        # transition scratch space, step counter and train/test mode flag
        self.transition = list()
        self.total_step = 0
        self.is_test = False
        # ==================================== Your Code (End) ====================================

    def select_action(self, state: np.ndarray) -> np.ndarray:
        """Select an action from the input state.

        During the first `initial_random_steps` training steps the action is
        sampled from the action space; afterwards it comes from the actor.
        In training mode exploration noise is added and the result is clipped
        to [-1, 1]. The (state, action) pair is remembered in
        `self.transition` so that `step` can complete the transition.
        """
        # ==================================== Your Code (Begin) ====================================
        # 1. check if initial random action should be conducted
        if not self.is_test and self.total_step < self.initial_random_steps:
            act = self.env.action_space.sample()
        else:
            # as_tensor with an explicit dtype also accepts float64 observations.
            state_t = torch.as_tensor(state, dtype=torch.float32, device=self.device)
            with torch.no_grad():
                act = self.actor(state_t).detach().cpu().numpy()

        # 2. add noise for exploration during training
        if not self.is_test:
            act = np.clip(act + self.noise.sample(), -1, 1)

        # 3. store transition
        self.transition = [state, act]

        # return selected action
        return act
        # ==================================== Your Code (End) ====================================

    def step(self, action: np.ndarray) -> Tuple[np.ndarray, np.float64, bool]:
        """Take an action and return the response of the env.

        Returns:
            (next_obs, reward, done). In training mode the completed
            transition is also stored in the replay memory.
        """
        # ==================================== Your Code (Begin) ====================================
        result = self.env.step(action)
        if len(result) == 5:
            # Newer Gym API: (obs, reward, terminated, truncated, info).
            next_obs, rew, terminated, truncated, _ = result
            done = bool(terminated or truncated)
        else:
            # Older Gym API: (obs, reward, done, info).
            next_obs, rew, done, _ = result

        if not self.is_test:
            self.transition += [rew, next_obs, done]
            self.memory.store(*self.transition)

        return (next_obs, rew, done)
        # ==================================== Your Code (End) ====================================

    def update_model(self) -> Tuple[torch.Tensor, torch.Tensor]:
        """Update the model by gradient descent.

        Returns:
            (critic_loss, actor_loss) as detached tensors.
        """
        # ==================================== Your Code (Begin) ====================================
        # 1. get a batch from memory
        batch = self.memory.sample_batch()
        # NOTE(review): positional unpacking relies on the order in which
        # ReplayBuffer.sample_batch builds its dict (obs, actions, next_obs,
        # rewards, done) -- confirm against the buffer implementation.
        obs, actions, next_obs, rewards, done = (
            torch.as_tensor(value, dtype=torch.float32, device=self.device)
            for value in batch.values()
        )
        # Make sure actions are 2-D so the critic can combine them with obs.
        actions = actions.reshape(obs.shape[0], -1)

        # 2. calculate the return: r + (1 - d) * gamma * Q_target(s', mu_target(s'))
        # The target is a constant for the critic loss, so no graph is built.
        with torch.no_grad():
            next_q = self.critic_target(next_obs, self.actor_target(next_obs))
            # Align shapes with the critic output to avoid accidental broadcasting.
            rewards = rewards.reshape(next_q.shape)
            done = done.reshape(next_q.shape)
            target_q = rewards + (1.0 - done) * self.gamma * next_q

        # 3. calculate the loss for actor and critic networks

        # Critic: regress Q(s, a) onto the TD target
        q = self.critic(obs, actions)
        loss_critic = F.mse_loss(q, target_q)

        self.critic_optim.zero_grad()
        loss_critic.backward()
        self.critic_optim.step()

        # Actor: maximise Q(s, mu(s)) by minimising its negative
        loss_actor = -self.critic(obs, self.actor(obs)).mean()

        self.actor_optim.zero_grad()
        loss_actor.backward()
        self.actor_optim.step()

        # 4. update target
        self._target_soft_update()

        return loss_critic.detach(), loss_actor.detach()
        # ==================================== Your Code (End) ====================================

    def train(self, num_frames: int, plotting_interval: int = 200):
        """Train the agent.

        Args:
            num_frames: number of environment steps to run.
            plotting_interval: the progress plot is refreshed every this many steps.
        """
        # ==================================== Your Code (Begin) ====================================
        # 1. set the status of training
        self.is_test = False

        # 2. Reset environment
        state = self._reset_env()
        actor_losses = []
        critic_losses = []
        scores = []
        score = 0

        # 3. for number of frames:
        for frame_idx in range(num_frames):
            self.total_step = frame_idx

            # 4. select an action
            act = self.select_action(state)

            # 5. step in environment
            next_obs, rew, done = self.step(act)
            score += rew

            if done:
                state = self._reset_env()
                scores.append(score)
                score = 0
            else:
                state = next_obs

            # 6. update model
            if self.total_step > self.initial_random_steps:
                # update_model returns (critic_loss, actor_loss), in that order.
                critic_loss, actor_loss = self.update_model()
                # Store plain floats so plotting works regardless of device.
                actor_losses.append(actor_loss.item())
                critic_losses.append(critic_loss.item())

            # 7. plot the computed variables
            if self.total_step % plotting_interval == 0:
                self._plot(
                    self.total_step,
                    scores,
                    actor_losses,
                    critic_losses,
                )

        # 8. terminate environment after training is finished
        self.env.close()
        # ==================================== Your Code (End) ====================================

    def test(self):
        """Test the agent.

        Runs one episode without exploration noise and without storing
        transitions, prints the total reward and returns the rendered frames.
        """
        # ==================================== Your Code (Begin) ====================================
        # 1. set the status of training
        self.is_test = True

        # 2. Reset environment
        state = self._reset_env()
        done = False
        frames = []
        total_reward = 0

        # 3. roll out one episode living in the environment and save frames for getting render
        while not done:
            frames.append(self.env.render(mode="rgb_array"))

            # In test mode select_action queries the actor directly (no noise).
            act = np.clip(self.select_action(state), -1, 1)
            next_state, reward, done = self.step(act)

            # Advance the state, otherwise the policy keeps seeing the first observation.
            state = next_state
            total_reward += reward

        print("Total Reward: ", total_reward)
        self.env.close()

        return frames
        # ==================================== Your Code (End) ====================================

    def _reset_env(self) -> np.ndarray:
        """Reset the environment and return only the initial observation."""
        out = self.env.reset()
        # Newer Gym releases return (observation, info); older ones return the observation alone.
        if isinstance(out, tuple) and len(out) == 2 and isinstance(out[1], dict):
            return out[0]
        return out

    def _target_soft_update(self):
        """Soft-update: target = tau*local + (1-tau)*target."""
        # ==================================== Your Code (Begin) ====================================
        network_pairs = (
            (self.actor_target, self.actor),
            (self.critic_target, self.critic),
        )
        with torch.no_grad():
            for target_net, local_net in network_pairs:
                for t_param, l_param in zip(target_net.parameters(), local_net.parameters()):
                    t_param.data.copy_(
                        self.tau * l_param.data + (1.0 - self.tau) * t_param.data
                    )
        # ==================================== Your Code (End) ====================================

    def _plot(
        self,
        frame_idx: int,
        scores: List[float],
        actor_losses: List[float],
        critic_losses: List[float],
    ):
        """Plot the training progresses.

        Draws three side-by-side subplots: episode scores (titled with the
        mean of the last 10 scores), actor losses and critic losses.
        """
        # ==================================== Your Code (Begin) ====================================
        def subplot(loc: int, title: str, values: List[float]):
            plt.subplot(loc)
            plt.title(title)
            plt.plot(values)

        # Before the first episode ends there is nothing to average; show nan
        # without triggering numpy's empty-slice warning.
        recent_mean = np.mean(scores[-10:]) if scores else float("nan")

        subplot_params = [
            (131, f"frame {frame_idx}. score: {recent_mean}", scores),
            (132, "actor_loss", actor_losses),
            (133, "critic_loss", critic_losses),
        ]

        clear_output(True)
        plt.figure(figsize=(30, 5))
        for loc, title, values in subplot_params:
            subplot(loc, title, values)
        plt.show()
        # ==================================== Your Code (End) ====================================

# Environment
*ActionNormalizer* is an action wrapper class that normalizes action values to the range (-1, 1). Thanks to this class, the agent can simply select action values within the zero-centered range (-1, 1).

In [None]:
class ActionNormalizer(gym.ActionWrapper):
    """Rescale and relocate the actions.

    Lets the agent work with actions in (-1, 1) while the wrapped
    environment receives actions in its own (low, high) range.
    """

    def action(self, action: np.ndarray) -> np.ndarray:
        """Change the range (-1, 1) to (low, high).

        Linear map with -1 -> low and 1 -> high; the result is clipped to
        [low, high] so out-of-range inputs cannot leave the action space.
        """
        # ==================================== Your Code (Begin) ====================================
        low = self.action_space.low
        high = self.action_space.high

        half_range = (high - low) / 2
        rescaled = low + half_range * (action + 1)
        return np.clip(rescaled, low, high)
        # ==================================== Your Code (End) ====================================

    def reverse_action(self, action: np.ndarray) -> np.ndarray:
        """Change the range (low, high) to (-1, 1).

        Inverse of `action`: low -> -1 and high -> 1; the result is clipped
        to [-1, 1].
        """
        # ==================================== Your Code (Begin) ====================================
        low = self.action_space.low
        high = self.action_space.high

        # The offset from `low` (not the sum with it) is what gets rescaled.
        normalized = -1 + 2 / (high - low) * (action - low)
        return np.clip(normalized, -1.0, 1.0)
        # ==================================== Your Code (End) ====================================


You can see [the code](https://github.com/openai/gym/blob/master/gym/envs/classic_control/pendulum.py) and [configurations](https://github.com/openai/gym/blob/cedecb35e3428985fd4efad738befeb75b9077f1/gym/envs/__init__.py#L81) of Pendulum-v1 from OpenAI's repository.

In [None]:
# environment
# Build the Pendulum-v1 task, wrap it so actions are given in (-1, 1),
# and seed it with the notebook-wide `seed` for reproducibility.
env_id = "Pendulum-v1"
env = gym.make(env_id)
env = ActionNormalizer(env)
env.seed(seed)

# Train & Test 

## Initialize

In [None]:
# parameters
# num_frames is passed to both train() calls below; the remaining values
# configure the DDPG agent.
num_frames = 50000
memory_size = 20000
batch_size = 128
ou_noise_theta = 1.0
ou_noise_sigma = 0.1
initial_random_steps = 10000

# PPO agent: receives its own keyword hyperparameters (note batch_size = 64
# here, independent of the module-level batch_size above).
# NOTE(review): PPOAgent is defined elsewhere in the notebook -- confirm the
# meaning of each keyword against its constructor.
ppo_agent = PPOAgent(
    env,
    gamma = 0.9,
    tau = 0.8,
    batch_size = 64,
    epsilon = 0.2,
    epoch = 64,
    rollout_len = 2048,
    entropy_weight = 0.005
)

# DDPG agent: positional arguments are memory_size, batch_size,
# ou_noise_theta and ou_noise_sigma; gamma and tau keep their defaults.
# Both agents are given the same `env` object.
ddpg_agent = DDPGAgent(
    env, 
    memory_size, 
    batch_size,
    ou_noise_theta,
    ou_noise_sigma,
    initial_random_steps=initial_random_steps
)

## Train PPO

In [None]:
# Run PPO training, passing num_frames as the only argument.
ppo_agent.train(num_frames)

## Train DDPG

In [None]:
# Run DDPG training for num_frames environment steps (default plotting interval).
ddpg_agent.train(num_frames)

## Test

In [None]:
# test
# On Colab, wrap each agent's environment in a Monitor that writes into the
# "videos" directory (force=True); the render cell later reads .mp4 files
# from that directory.
if IN_COLAB:
    ppo_agent.env = gym.wrappers.Monitor(ppo_agent.env, "videos", force=True)
    ddpg_agent.env = gym.wrappers.Monitor(ddpg_agent.env, "videos", force=True)
# Keep the return values of both test() calls; the non-Colab render branch
# displays them as animations.
ppo_frames = ppo_agent.test()
ddpg_frames = ddpg_agent.test()

## Render

In [None]:
if IN_COLAB:  # for colab
    import base64
    import glob
    import io
    import os

    from IPython.display import HTML, display

    def ipython_show_video(path: str) -> None:
        """Show a video at `path` within IPython Notebook.

        The file is read, base64-encoded and embedded in an HTML <video> tag.

        Raises:
            NameError: if `path` is not an existing file.
        """
        if not os.path.isfile(path):
            raise NameError("Cannot access: {}".format(path))

        # Read-only binary mode is enough, and the context manager
        # guarantees the handle is closed again.
        with io.open(path, "rb") as video_file:
            encoded = base64.b64encode(video_file.read())

        display(HTML(
            data="""
            <video alt="test" controls>
            <source src="data:video/mp4;base64,{0}" type="video/mp4"/>
            </video>
            """.format(encoded.decode("ascii"))
        ))

    # Show the most recently created recording, if there is one.
    list_of_files = glob.glob("videos/*.mp4")
    if list_of_files:
        latest_file = max(list_of_files, key=os.path.getctime)
        print(latest_file)
        ipython_show_video(latest_file)
    else:
        # max() on an empty list would fail with an unhelpful ValueError.
        print("No .mp4 files found under videos/; nothing to display.")

else:  # for jupyter
    from matplotlib import animation
    from JSAnimation.IPython_display import display_animation
    from IPython.display import display


    def display_frames_as_gif(frames):
        """Displays a list of frames as a gif, with controls."""
        # Draw the first frame once, then swap the image data per animation step.
        patch = plt.imshow(frames[0])
        plt.axis('off')

        def animate(i):
            patch.set_data(frames[i])

        anim = animation.FuncAnimation(
            plt.gcf(), animate, frames = len(frames), interval=50
        )
        display(display_animation(anim, default_mode='loop'))


    # display 
    display_frames_as_gif(ppo_frames)
    display_frames_as_gif(ddpg_frames)