# Exercises
Introduce Experience Replay and Fixed Target Q-Networks into the DQN algorithm.

## Imports and optimizer setup

In [12]:
import numpy as np
import matplotlib.pyplot as plt 
from matplotlib import animation, rc
from  dataclasses import dataclass, field
from typing import List
import collections

import tensorflow as tf
from keras.models import Sequential
from keras.layers import Dense, ReLU
from keras.optimizers import RMSprop

# Module-level RMSprop optimizer, created with the Keras default
# hyperparameters. The Brain class references this global when it
# compiles and updates its network.
optimizer = RMSprop()

## Brain class

In [4]:
class Brain:
    """Q-network with epsilon-greedy action selection and experience replay.

    Attributes:
        epsilon: current exploration rate (starts at 1.0).
        gamma: discount factor used in the Bellman target.
        r: multiplicative decay applied to ``epsilon`` on every action.
        experiences: bounded replay buffer of stored transitions.
        batch_size: maximum number of transitions per training step.
        target_model: optional network used to evaluate the next-state
            Q values. While it is ``None`` the online ``model`` is used.
        model: the online Keras network (two hidden ReLU layers).
    """

    def __init__(self, n_states, n_hidden, n_actions, gamma=0.9, r=0.99):
        """Build the network and initialise the replay buffer.

        Args:
            n_states: size of the state vector fed to the network.
            n_hidden: number of units in each hidden layer.
            n_actions: number of discrete actions (network outputs).
            gamma: discount factor.
            r: decay rate of the exploration rate ``epsilon``.
        """
        self.epsilon = 1.0  # initial exploration rate
        self.gamma = gamma  # discount factor
        self.r = r  # decay rate of epsilon

        self.experiences = collections.deque(maxlen=10000)  # replay buffer
        self.batch_size = 32  # mini-batch size

        # Optional fixed-target network; None means "bootstrap from self.model".
        self.target_model = None

        model = Sequential()
        model.add(Dense(n_hidden, input_shape=(n_states,)))
        model.add(ReLU())
        model.add(Dense(n_hidden))
        model.add(ReLU())
        model.add(Dense(n_actions))
        model.compile(loss='mse', optimizer=optimizer)
        self.model = model

    def train(self):
        """Run one Q-learning update on a random mini-batch of stored experiences.

        The target for the action that was taken is ``reward`` for terminal
        transitions and ``reward + gamma * max_a Q(next_state, a)`` otherwise.
        The outputs for the other actions are left at the network's current
        prediction so they contribute no error. Does nothing while the replay
        buffer is empty.
        """
        batch_size = min(self.batch_size, len(self.experiences))
        if batch_size == 0:
            return

        # Sample distinct buffer positions rather than the buffer itself:
        # this avoids copying the whole deque into an array on every step
        # and keeps one transition from appearing twice in a batch.
        picks = np.random.choice(len(self.experiences), size=batch_size, replace=False)
        batch = [self.experiences[i] for i in picks]

        # Transitions may be stored with a leading axis of length 1
        # (e.g. state shape (1, n_states), action shape (1,)); flatten them
        # so the network sees (batch, n_states) and targets index cleanly.
        states_batch = np.array([e.state for e in batch]).reshape(batch_size, -1)
        next_states_batch = np.array([e.next_state for e in batch]).reshape(batch_size, -1)
        actions_batch = np.array([e.action for e in batch]).reshape(batch_size).astype(int)
        rewards_batch = np.array([e.reward for e in batch], dtype=float).reshape(batch_size)
        terminal_batch = np.array([e.terminal for e in batch], dtype=bool).reshape(batch_size)

        # Current estimates from the online network; bootstrap values from
        # the target network when one has been provided.
        q = self.model.predict(states_batch, verbose=0)
        target_model = getattr(self, "target_model", None)
        if target_model is None:
            target_model = self.model
        q_next = target_model.predict(next_states_batch, verbose=0)

        # Terminal transitions get no bootstrap term.
        bootstrap = np.where(terminal_batch, 0.0, self.gamma * np.max(q_next, axis=1))
        t = np.array(q, copy=True)
        t[np.arange(batch_size), actions_batch] = rewards_batch + bootstrap

        self._train_on_batch(states_batch, t)

    def _train_on_batch(self, states_batch, t):
        """Apply a single gradient update towards the targets ``t``.

        Thin wrapper around Keras ``Model.train_on_batch``, which uses the
        loss and optimizer passed to ``compile``.

        Args:
            states_batch: network inputs, shape (batch, n_states).
            t: target Q values, shape (batch, n_actions).

        Returns:
            Whatever ``train_on_batch`` returns (the batch loss).
        """
        return self.model.train_on_batch(states_batch, t)

    def get_action(self, state):
        """Choose actions with an epsilon-greedy policy and decay epsilon.

        Args:
            state: batch of states, shape (batch, n_states).

        Returns:
            Integer array with one action index per row of ``state``.
        """
        q = self.model.predict(state, verbose=0)
        if np.random.rand() < self.epsilon:
            # explore: uniform random action for every row
            action = np.random.randint(q.shape[1], size=q.shape[0])
        else:
            # exploit: greedy action for every row
            action = np.argmax(q, axis=1)
        if self.epsilon > 0.1:  # decay the exploration rate down to ~0.1
            self.epsilon *= self.r
        return action

    def store_experience(self, state, next_state, action, reward, terminal):
        """Append one transition to the replay buffer."""
        self.experiences.append(Experience(state, next_state, action, reward, terminal))

## Experience Replay

In [10]:
@dataclass
class Experience:
    """A single transition stored in the replay buffer."""

    # Array defaults are built per instance through default_factory:
    # a bare ndarray default is rejected by dataclasses on recent Python
    # versions (unhashable default) and would otherwise be shared.
    state: np.ndarray = field(default_factory=lambda: np.array([[]]))  # state before the step
    next_state: np.ndarray = field(default_factory=lambda: np.array([[]]))  # state after the step
    action: List[int] = field(default_factory=list)  # action(s) taken
    reward: int = 0  # reward received for the step
    terminal: bool = False  # True when the step ended the episode

## Agent class

In [2]:
class Agent:
    """Agent that moves right at constant speed and learns when to jump.

    The observed state is ``[[y, v_y]]``. Action 0 lets gravity act on the
    vertical velocity; any other action sets it to the jump velocity.
    """

    def __init__(self, v_x, v_y_sigma, v_jump, brain, target_brain):
        """Store the motion parameters and the two brains, then reset.

        Args:
            v_x: velocity in the x direction (added to x every step).
            v_y_sigma: std of the initial velocity in the y direction.
            v_jump: velocity in the y direction right after a jump.
            brain: online brain; it selects actions and is trained.
            target_brain: brain holding the periodically synced copy of the
                online weights (see ``update_target_brain``).
        """
        self.v_x = v_x
        self.v_y_sigma = v_y_sigma
        self.v_jump = v_jump
        self.brain = brain
        self.target_brain = target_brain
        # Publish the target network on the online brain as `target_model`,
        # so a brain whose training step looks for that attribute can
        # bootstrap from the frozen copy; brains that ignore it are unaffected.
        self.brain.target_model = self.target_brain.model
        self.reset()

    def reset(self):
        """Return to the start position (x, y) = (-1, 0) with a random v_y.

        The initial vertical velocity is drawn from a normal distribution
        with standard deviation ``v_y_sigma``.
        """
        self.x = -1.  # x position
        self.y = 0.  # y position
        self.v_y = self.v_y_sigma * np.random.randn()  # y velocity

    def update_target_brain(self):
        """Copy the online network's weights into the target network."""
        self.target_brain.model.set_weights(self.brain.model.get_weights())

    def step(self, g):
        """Advance one time step, store the transition and train the brain.

        Rewards: -1 when the agent leaves the band -1 <= y <= 1, +1 when it
        passes x = 1, otherwise 0. Either of the first two ends the episode,
        after which the agent is reset.

        Args:
            g: gravity; subtracted from ``v_y`` when the agent does not jump.

        Returns:
            Tuple ``(reward, terminal)`` where ``reward`` is a length-1 array.
        """
        # state observed before moving
        states = np.array([[self.y, self.v_y]])

        # move, then work out the reward and whether the episode ended
        self.x += self.v_x
        self.y += self.v_y
        reward = 0
        terminal = False
        if self.y < -1.0 or self.y > 1.0:
            reward = -1
            terminal = True
        elif self.x > 1.0:
            reward = 1
            terminal = True
        reward = np.array([reward])

        # The behaviour policy comes from the online brain; the target brain
        # is only a lagged copy of its weights and must not drive actions.
        action = self.brain.get_action(states)
        if action[0] == 0:
            self.v_y -= g
        else:
            self.v_y = self.v_jump
        next_states = np.array([[self.y, self.v_y]])

        # remember the transition and learn from the replay buffer
        self.brain.store_experience(states, next_states, action, reward, terminal)
        self.brain.train()

        # start a fresh episode once this one is over
        if terminal:
            self.reset()

        return reward, terminal

## Environment class

In [4]:
class Enviroment:
    """Environment that holds the agent and the gravity acting on it."""

    def __init__(self, agent, g):
        """Store the agent and the gravity constant.

        Args:
            agent: object whose ``step(g)`` method advances the simulation.
            g: gravity passed to the agent on every step.
        """
        self.agent = agent
        self.g = g

    def step(self):
        """Advance the agent by one step and return what ``agent.step`` returns."""
        return self.agent.step(self.g)

## Training

In [6]:
# train the agent
# build the q-net brain and target brain
n_states = 2 # number of states
n_hidden = 32 # number of hidden units
n_actions = 2 # number of actions
q_net_brain = Brain(n_states, n_hidden, n_actions, r=0.99)
target_brain = Brain(n_states, n_hidden, n_actions, r=0.99)

# build the agent
v_x = 0.05 # velocity in the x direction
v_y_sigma = 0.1 # std of the velocity in the y direction
v_jump = 0.2 # velocity in the y direction when the agent jumps
agent = Agent(v_x, v_y_sigma, v_jump, q_net_brain, target_brain)

# the two networks are initialised independently, so sync them once up front
agent.update_target_brain()

# build the enviroment
g = 0.2 # gravity
enviroment = Enviroment(agent, g)

# episode parameters
n_episodes = 1000 # number of episodes
target_update_interval = 100 # environment steps between target brain syncs
total_steps = 0 # steps taken across all episodes
reward_histroy = []
step_history = []
for episode in range(n_episodes):
    step = 0
    # reset the flag for every episode; otherwise only the first one runs
    terminal = False
    while not terminal:
        step += 1
        total_steps += 1
        # advance the enviroment by one step
        reward, terminal = enviroment.step()

        # Sync the target brain every `target_update_interval` steps.
        # The count spans episodes: the per-episode step count restarts
        # on every episode, so it cannot be used for this.
        if total_steps % target_update_interval == 0:
            agent.update_target_brain()

    # record the length and the final reward of the finished episode
    step_history.append(step)
    reward_histroy.append(reward)

## Show the results

In [None]:
# one x value per recorded episode
x = np.arange(len(step_history))

# show the reward history (final reward of each episode)
plt.plot(x, reward_histroy)
plt.xlabel('Episode')
plt.ylabel('Reward')
plt.show()

# show the step history (number of steps each episode lasted)
plt.plot(x, step_history)
plt.xlabel('Episode')
plt.ylabel('Step')
plt.show()