# Reinforcement Learning – Intro to Deep Q Networks

**Prerequisites**

- TensorFlow, Keras
- Reinforcement Learning -- Q learning with continuous state spaces

**Outcomes**

- Be able to use TensorFlow to approximate $Q(s, a)$ with a neural network when the state space $\mathcal{S}$ is continuous

**References**

- Sutton & Barto book (online by authors [here](http://incompleteideas.net/book/the-book.html)) chapters 9-11

In [None]:
from collections import deque
import random
from typing import List

import keras
import gymnasium as gym
import tensorflow as tf

import numpy as np

## Review: CartPole

- We previously studied the cart pole problem:
    - Pole fastened to a cart, but can freely rotate
    - Pole starts vertical, but with some angular velocity
    - Goal: move cart left and right to keep pole vertical
    - $s = (\text{cart position}, \text{cart velocity}, \text{pole angle}, \text{pole angular velocity}) \in \mathcal{S} \subset \mathbb{R}^4$
    - $\mathcal{A}(s) = \{\text{left}, \text{right}\}\; \forall s$
- Need $Q$ to generalize between observations from continuous $\mathcal{S}$
- Used a complete polynomial to represent $Q$ and obtained about 120/200 time steps (random 30/120)
- Need more flexible method for approximating $Q$...

### Objective: DQN

- The objective for this lecture is to use an MLP to approximate $Q$
- A few key concepts:
    - Will represent $Q: \mathcal{S} \to \mathbb{R}^{|\mathcal{A}|}$ (one output per action)
    - To support mini-batch training (and other reasons we'll learn about in another lecture 😉) we will use *experience replay*
- Experience replay:
    - Store $(s, a, r, s')$ transitions in a memory bank of fixed size
    - As new transitions are added, "forget" oldest transitions if memory full
    - When training, sample randomly from current memory bank to form batch
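
The three experience replay steps above can be sketched with the standard library alone. This is a minimal illustration, not the agent's implementation: the transitions are made-up tuples and the memory size of 3 is chosen only to make the "forgetting" visible.

```python
import random
from collections import deque

# Fixed-size memory bank: once full, appending drops the oldest transition.
memory = deque(maxlen=3)

# Hypothetical (s, a, r, s') transitions; states are plain tuples here.
for t in range(5):
    s, a, r, sp = (float(t),), t % 2, 1.0, (float(t + 1),)
    memory.append((s, a, r, sp))

# Only the three most recent transitions (t = 2, 3, 4) remain.
oldest_state = memory[0][0]

# Sample a mini-batch uniformly at random, without replacement.
random.seed(0)
batch = random.sample(list(memory), 2)
```

Using `deque(maxlen=...)` means no explicit eviction logic is needed; the container discards the oldest entry on its own.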

## DQN

- Below we implement a Deep Q Network (DQN): a Q-learning agent that uses a deep neural network to represent $Q$

In [28]:
class DQN(object):
    def __init__(
            self,
            environment,
            optimizer=keras.optimizers.Adam(learning_rate=0.01),
            loss=keras.losses.mean_squared_error,
            hidden_layer_sizes: List[int] = [24, 24],
            batch_size: int = 64,
            memory_size: int = 5_000,
            epsilon: float=0.9,
            beta: float=0.5
        ):
        # check that environment is what we think it is
        self.env = environment
        self.Ns = self.env.observation_space.shape[0]

        assert isinstance(self.env.action_space, gym.spaces.Discrete)
        self.Na = self.env.action_space.n
        self.A_s = np.arange(self.Na)

        # set up Q function
        self.Q = keras.Sequential(
            [keras.layers.InputLayer((self.Ns,))] +
            [keras.layers.Dense(n, activation="relu") for n in hidden_layer_sizes] +
            [keras.layers.Dense(
                self.Na, activation="linear",
                kernel_initializer=keras.initializers.RandomUniform(
                    minval=-0.03, maxval=0.03
                ),
                bias_initializer=keras.initializers.Constant(-0.2)
            )]
        )
        self.Q.compile(loss=loss, optimizer=optimizer)
        self.optimizer = optimizer
        self.loss = loss

        # set up memory
        self.memory = deque(maxlen=memory_size)
        self.batch_size = batch_size

        # store hyper parameters
        self.epsilon = epsilon
        self.beta = beta

    def get_greedy(self, s):
        assert s.shape[0] == 1
        Q_s = self.Q.predict(s, verbose=0)[0]
        max_val = max(Q_s)
        return random.choice(self.A_s[Q_s == max_val])

    def remember(self, s, a, r, sp, done):
        self.memory.append((s, a, r, sp, done))

    def act(self, s):
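        # NOTE: here epsilon is the probability of acting greedily, so a
        # random action is taken with probability 1 - epsilon
        if random.random() > self.epsilon: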
            return self.env.action_space.sample()
        return self.get_greedy(s)

    def learn_replay(self):
        if len(self.memory) < self.batch_size:
            # not enough memory yet...
            return

        # sample a batch
        batch = random.sample(self.memory, self.batch_size)
        sarsd = list(zip(*batch))

        # reconstruct s, a, r, s' arrays
        s = keras.ops.concatenate(sarsd[0], axis=0)
        # np.row_stack is deprecated in NumPy 2.0; np.vstack is the equivalent
        a = np.vstack(sarsd[1])[:, 0]
        r = np.vstack(sarsd[2])[:, 0]
        sp = np.concatenate(sarsd[3])

        # compute temporal difference target using greedy policy
        td_target = r + self.beta * tf.reduce_max(self.Q.predict(sp, verbose=0), axis=1)

        # apply one hot encoding for easy application of `a` below
        a_hot = keras.ops.one_hot(a, self.Na)

        # compute the loss between current Q(s, a) and the targets
        with tf.GradientTape() as tape:
            Q_s = self.Q(s)
            Q_sa = tf.reduce_sum(Q_s * a_hot, axis=1)
            l = self.loss(td_target, Q_sa)

        # backprop -- compute and then allow optimizer to apply gradients
        grads = tape.gradient(l, self.Q.trainable_variables)
        self.optimizer.apply_gradients(zip(grads, self.Q.trainable_variables))
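
The arithmetic inside `learn_replay` can be checked by hand on a tiny batch. The sketch below uses NumPy instead of the network, and every number in it is invented for illustration; only the formulas (TD target, one-hot selection of $Q(s, a)$, mean squared error) mirror the method above.

```python
import numpy as np

beta = 0.8  # discount factor (0.8 is the value this lecture passes to the agent)

# Hypothetical batch of 3 transitions with 2 actions.
r = np.array([1.0, 1.0, -1.0])   # rewards
a = np.array([0, 1, 1])          # actions taken
Q_s = np.array([[0.5, 0.2],      # Q(s, .) for each sampled state
                [0.1, 0.4],
                [0.3, 0.9]])
Q_sp = np.array([[1.0, 2.0],     # Q(s', .) for each next state
                 [0.5, 0.0],
                 [3.0, 1.0]])

# TD target: r + beta * max_a' Q(s', a')
td_target = r + beta * Q_sp.max(axis=1)

# One-hot mask picks out Q(s, a) for the action actually taken.
a_hot = np.eye(Q_s.shape[1])[a]
Q_sa = (Q_s * a_hot).sum(axis=1)

# Mean squared error between targets and current estimates.
loss = np.mean((td_target - Q_sa) ** 2)
```

As in `learn_replay`, the bootstrap term is applied to every transition, including terminal ones; a common variant multiplies it by `1 - done`.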

### Setup

- Below we set random seeds, create env, optimizer, and agent

In [29]:
tf.random.set_seed(42)
random.seed(42)
np.random.seed(42)
env = gym.make("CartPole-v1")
optimizer = keras.optimizers.Adam(learning_rate=0.01)
agent = DQN(env, optimizer, epsilon=0.9, beta=0.8, batch_size=64)

In [30]:
agent.Q.summary()

### Training DQN

- Below we have a routine for training our DQN
- Notice that we need to make sure that the `s` array has shape `(1, Ns)` before handing it to TensorFlow
    - TensorFlow expects the first dimension to be the batch size and subsequent dimensions to hold the data
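
The shape requirement can be seen with plain NumPy arrays. In this small sketch the observation values are made up; only the shapes matter.

```python
import numpy as np

# A single observation as an environment would return it: shape (4,)
s_raw = np.array([0.01, -0.02, 0.03, 0.04])

# Add a leading batch axis so the network sees a batch of size 1: shape (1, 4)
s = s_raw[None, :]

# Concatenating several (1, 4) states along axis 0 gives a (batch, 4) array,
# which is how learn_replay rebuilds a batch from memory.
batch = np.concatenate([s, s + 1.0, s + 2.0], axis=0)
```

Storing states with the batch axis already attached is what lets `learn_replay` rebuild a batch with a single concatenate call.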

In [31]:
def train(dqn, N_episodes=100):
    for episode in range(N_episodes):
        s_, _ = dqn.env.reset()
        s = s_[None, :]
        step = 0

        while True:
            step += 1
            a = dqn.act(s)
            # use the agent's own environment rather than the global `env`
            sp, r, done, truncated, _ = dqn.env.step(a)
            sp = sp[None, :]
            r = r if (not done or step >= 200) else -r  # penalize learner when fails

            dqn.remember(s, a, r, sp, done)
            if done or truncated:
                # end the episode on failure or when the time limit is reached
                print(f"episode: {episode}, steps: {step}")
                break

            # step forward in time
            s = sp

            # learn!
            dqn.learn_replay()

In [32]:
train(agent, 50)

episode: 0, steps: 9
episode: 1, steps: 9
episode: 2, steps: 10
episode: 3, steps: 10
episode: 4, steps: 9
episode: 5, steps: 10
episode: 6, steps: 8
episode: 7, steps: 10
episode: 8, steps: 11
episode: 9, steps: 10
episode: 10, steps: 9
episode: 11, steps: 11
episode: 12, steps: 26
episode: 13, steps: 18
episode: 14, steps: 13
episode: 15, steps: 12
episode: 16, steps: 18
episode: 17, steps: 40
episode: 18, steps: 24
episode: 19, steps: 20
episode: 20, steps: 59
episode: 21, steps: 41
episode: 22, steps: 56
episode: 23, steps: 51
episode: 24, steps: 61
episode: 25, steps: 97
episode: 26, steps: 107
episode: 27, steps: 85
episode: 28, steps: 86
episode: 29, steps: 67
episode: 30, steps: 99
episode: 31, steps: 115
episode: 32, steps: 121
episode: 33, steps: 102
episode: 34, steps: 89
episode: 35, steps: 86
episode: 36, steps: 73
episode: 37, steps: 112
episode: 38, steps: 122
episode: 39, steps: 73
episode: 40, steps: 129
episode: 41, steps: 121
episode: 42, steps: 83
episode: 43, steps: 121
episode: 44, steps: 121
episode: 45, steps: 107
episode: 46, steps: 110
episode: 47, steps: 126
episode: 48, steps: 115
epi

- After only 50 episodes our agent regularly lasts more than 100 steps, up from about 10 at the start of training
    - We chose $\epsilon=0.9$, so we are forcing the agent to make random decisions 10% of the time
    - If we evaluate in greedy mode, we would expect to see perfect scores more often
- The added flexibility and generalization power we get from the MLP (relative to the complete polynomial) is sufficient to successfully complete this task!
- We'll learn more about DQN and its recent exciting applications in another lecture
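
The 10% figure follows from how `act` compares a uniform draw against $\epsilon$. The sketch below simulates only that comparison; `choose_mode` is a hypothetical helper written for this illustration and is not part of the `DQN` class.

```python
import random


def choose_mode(epsilon: float, rng: random.Random) -> str:
    """Return "random" or "greedy" using this lecture's convention:
    epsilon is the probability of acting greedily."""
    if rng.random() > epsilon:
        return "random"
    return "greedy"


rng = random.Random(0)
n = 10_000

# Share of decisions that are random when epsilon = 0.9 (close to 0.1).
random_share = sum(choose_mode(0.9, rng) == "random" for _ in range(n)) / n

# With epsilon = 1.0 the draw is never larger than epsilon, so every
# decision is greedy.
greedy_only = all(choose_mode(1.0, rng) == "greedy" for _ in range(n))
```

Under this convention, evaluating the agent in greedy mode would amount to setting `agent.epsilon = 1.0` before running episodes.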