<a href="https://colab.research.google.com/github/vaghyjuli/RL/blob/main/Joris_DQN.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [115]:
## Deep Q-Network (DQN) agent for LunarLander-v2, based on https://github.com/DanielPalaio/LunarLander-v2_DeepRL


!pip install gym[box2d]==0.17

import gym
import numpy as np
import pandas as pd
import random



In [116]:
# --- Exploration (epsilon-greedy) ---
epsilon = 1      # initial probability of taking a random action
min_eps = 0.01   # epsilon stops being decayed once it is no longer above this

# --- Q-learning ---
gamma = 0.99           # temporal discount applied to the bootstrapped next-state value
learning_rate = 0.001  # step size handed to the Adam optimizer
batch_size = 64        # number of transitions drawn from the replay buffer per update
update_rate = 120      # number of update steps between target-network syncs

# --- Replay memory ---
memory_buffer_size = 50_000  # number of transition slots in the replay buffer

# --- Problem size / run length ---
input_dims = 8       # length of one observation vector
n_actions = 4        # number of discrete actions
num_episodes = 1000  # episodes to run

from enum import Enum


class DecayType(Enum):
    """Schedules available for shrinking the exploration rate epsilon."""

    EXPONENTIAL = 0  # epsilon is multiplied by exp_epsilon_decay_param
    LINEAR = 1       # lin_epsilon_decay_param is subtracted from epsilon


# Step parameter for each schedule; only the one matching the selected
# schedule is read.
exp_epsilon_decay_param = 0.99
lin_epsilon_decay_param = 0.001

# Schedule selected for this run.
epsilon_decay_type = DecayType.LINEAR

In [117]:
class MemoryBuffer:
    """Fixed-capacity circular replay buffer of environment transitions.

    Each slot holds one (state, action, reward, new_state, done) tuple.
    Once the buffer is full, new writes overwrite the oldest slots.
    """

    def __init__(self, capacity=None, state_dims=None, sample_size=None):
        """Allocate the storage arrays.

        Args:
            capacity: number of transition slots. Defaults to the
                module-level ``memory_buffer_size``.
            state_dims: length of one state vector. Defaults to the
                module-level ``input_dims``.
            sample_size: number of transitions returned by ``sample()``.
                When left as None, the module-level ``batch_size`` is read
                each time ``sample()`` is called.

        Raises:
            ValueError: if ``capacity`` is not positive.
        """
        if capacity is None:
            capacity = memory_buffer_size
        if state_dims is None:
            state_dims = input_dims
        if capacity <= 0:
            raise ValueError("MemoryBuffer capacity must be positive")

        # The capacity is pinned here so that indexing always agrees with the
        # size of the arrays allocated below.
        self.capacity = capacity
        self.sample_size = sample_size

        self.states = np.zeros((capacity, state_dims), np.float64)
        self.actions = np.zeros(capacity, np.intc)
        self.rewards = np.zeros(capacity, np.float64)
        self.new_states = np.zeros((capacity, state_dims), np.float64)
        self.dones = np.zeros(capacity, np.bool_)

        # Total number of writes so far; it is never wrapped, the slot index is
        # derived from it with a modulo.
        self.head = 0

    def write(self, state, action, reward, new_state, done):
        """Store one transition, overwriting the oldest slot when full."""
        index = self.head % self.capacity

        self.states[index] = state
        self.actions[index] = action
        self.rewards[index] = reward
        self.new_states[index] = new_state
        self.dones[index] = done

        self.head += 1

    def sample(self):
        """Draw a random batch (with replacement) from the filled slots.

        Returns:
            Tuple ``(states, actions, rewards, new_states, dones)`` of arrays
            whose first dimension is the sample size.

        Raises:
            ValueError: if nothing has been written yet.
        """
        filled = min(self.head, self.capacity)
        if filled == 0:
            raise ValueError("cannot sample from an empty MemoryBuffer")

        size = batch_size if self.sample_size is None else self.sample_size
        picks = np.random.choice(filled, size)

        return (self.states[picks],
                self.actions[picks],
                self.rewards[picks],
                self.new_states[picks],
                self.dones[picks])

In [118]:
import matplotlib.pyplot as plt
import pandas as pd

from tensorflow.keras import Sequential
from tensorflow.keras.layers import Dense
from tensorflow.keras.optimizers import Adam
from keras.activations import relu, linear

from tensorflow import keras
import tensorflow as tf

class DQN(tf.keras.Sequential):
  """Fully connected Q-network: two hidden ReLU layers and a linear output.

  The input width follows the module-level ``input_dims`` and the output has
  one unit per action (``n_actions``). The model is compiled with mean squared
  error loss and an Adam optimizer using ``learning_rate``.
  """

  def __init__(self):
    super().__init__()
    # Activations are given by name so the layers resolve them inside the same
    # Keras implementation that provides the layers themselves.
    self.add(keras.layers.Dense(256, input_dim=input_dims, activation="relu"))
    self.add(keras.layers.Dense(256, activation="relu"))
    self.add(keras.layers.Dense(n_actions, activation="linear"))
    self.compile(loss="mse", optimizer=keras.optimizers.Adam(learning_rate=learning_rate))

In [119]:
class Agent:
    """Epsilon-greedy DQN agent with an online and a target Q-network."""

    def __init__(self, env, buffer):
        """Create the agent.

        Args:
            env: the environment; stored on the instance but not used by the
                methods of this class.
            buffer: replay buffer exposing ``sample()``; ``update()`` draws
                its training batches from it.
        """
        self.env = env
        self.epsilon = epsilon
        self.step_counter = 0
        self.buffer = buffer
        self.q_net = DQN()
        self.q_target_net = DQN()

    def get_action(self, state):
        """Pick an action for ``state``.

        With probability ``self.epsilon`` a uniformly random action is
        returned; otherwise the action with the largest online-network value.
        """
        if np.random.random() < self.epsilon:
            return np.random.choice(n_actions)
        # The network expects a batch, so wrap the single state in one.
        action_values = self.q_net(np.array([state]))
        return np.argmax(action_values)

    def update(self):
        """Run one training step on a batch sampled from the replay buffer.

        Every ``update_rate`` steps the target network is first synced with
        the online network. The online network is then fitted so that the
        value of each taken action moves toward
        ``reward + gamma * max_a Q_target(new_state, a)`` (just ``reward``
        for terminal transitions). Finally epsilon is decayed.

        Raises:
            ValueError: if ``epsilon_decay_type`` is not a known DecayType
                (only checked while epsilon is still above ``min_eps``).
        """
        if self.step_counter % update_rate == 0:
            self.q_target_net.set_weights(self.q_net.get_weights())

        state_batch, action_batch, reward_batch, new_state_batch, done_batch = \
            self.buffer.sample()

        q_predicted = self.q_net(state_batch)
        q_next = self.q_target_net(new_state_batch)
        # One scalar per batch row (no keepdims), so the targets below are a
        # flat vector rather than a column of 1-element arrays.
        q_max_next = tf.math.reduce_max(q_next, axis=1).numpy()

        # Start from the current predictions so that only the entries of the
        # actions actually taken contribute to the loss.
        q_target = np.copy(q_predicted)
        bootstrapped = reward_batch + gamma * q_max_next
        td_target = np.where(done_batch, reward_batch, bootstrapped)
        rows = np.arange(len(action_batch))
        q_target[rows, action_batch] = td_target

        self.q_net.train_on_batch(state_batch, q_target)
        self._decay_epsilon()
        self.step_counter += 1

    def _decay_epsilon(self):
        """Apply one step of the configured decay, never going below min_eps."""
        if self.epsilon <= min_eps:
            return

        if epsilon_decay_type == DecayType.LINEAR:
            decayed = self.epsilon - lin_epsilon_decay_param
        elif epsilon_decay_type == DecayType.EXPONENTIAL:
            decayed = self.epsilon * exp_epsilon_decay_param
        else:
            # Raise instead of calling exit(), which would take the notebook
            # kernel down with it.
            raise ValueError("please choose decay type")

        # A linear step can overshoot the floor; clamp so epsilon stays valid.
        self.epsilon = max(min_eps, decayed)


In [None]:
###
# Main loop
###

# Uses the gym 0.17 API: reset() returns the observation and step() returns
# the 4-tuple (observation, reward, done, info).
env = gym.make("LunarLander-v2")

buffer = MemoryBuffer()
agent = Agent(env, buffer)

# Safety cap on the length of a single episode.
max_steps_per_episode = 100000

episodes = range(num_episodes)
scores, timesteps, epsilons = [], [], []
for i in episodes:
    score = 0
    steps = 0
    done = False
    state = env.reset()
    while not done and steps < max_steps_per_episode:
        action = agent.get_action(state)
        new_state, reward, done, _ = env.step(action)
        score += reward
        buffer.write(state, action, reward, new_state, done)
        state = new_state
        agent.update()
        steps += 1

    # Record every episode, including one that hit the step cap, so the three
    # lists always have one entry per episode run. `steps` is the number of
    # environment steps taken, not the index of the last one.
    scores.append(score)
    epsilons.append(agent.epsilon)
    timesteps.append(steps)
    print("Episode = {}, Score = {}, Avg_Score = {}".format(i, score, np.mean(scores[-100:])))



Episode = 0, Score = -118.78760412210974, Avg_Score = -118.78760412210974
Episode = 1, Score = -220.2165029024912, Avg_Score = -169.50205351230045
Episode = 2, Score = -140.30938490230898, Avg_Score = -159.77116397563663
Episode = 3, Score = -166.83142137170165, Avg_Score = -161.53622832465288


In [None]:
# Plot

import matplotlib.pyplot as plt

# Plot only the episodes that were actually recorded, so an interrupted run
# does not fail on an x/y length mismatch.
n_finished = len(scores)
finished = range(n_finished)

# Moving averages. The window shrinks when fewer than kernel_size episodes are
# available; otherwise np.convolve(mode="valid") would swap its operands and
# return values that are not averages of the data.
kernel_size = 20
window = max(1, min(kernel_size, n_finished))
kernel = np.ones(window) / window
if n_finished:
    cr = np.convolve(scores, kernel, mode="valid")  # smoothed scores (not plotted below)
    tst = np.convolve(timesteps, kernel, mode="valid")
    eps = np.convolve(epsilons, kernel, mode="valid")
else:
    cr = tst = eps = np.array([])

# Figure 1: raw score per episode against the 200-point reference line.
rew, rew_ax = plt.subplots()
target = [200] * n_finished
rew_ax.plot(finished, target, color='red', linewidth=2, linestyle='dashed',
            label='Solved Requirement')
rew_ax.plot(finished, scores, color="black", linewidth=2, label='Score')
rew.suptitle('DQN')
rew_ax.set_xlabel('Episode')
rew_ax.set_ylabel('Cumulative reward')
rew_ax.legend()  # must be attached before the figure is shown

# Figure 2: moving average of the episode length.
timest, timest_ax = plt.subplots()
timest_ax.plot(tst, color="blue")
timest.suptitle('DQN')
timest_ax.set_xlabel('Episode')
timest_ax.set_ylabel('Number of time steps')

# Figure 3: moving average of epsilon at the end of each episode.
eps_fig, eps_ax = plt.subplots()
eps_ax.plot(eps, color="green", label='Epsilon')
eps_fig.suptitle('DQN')
eps_ax.set_xlabel('Episode')
eps_ax.set_ylabel('Epsilon')
eps_ax.legend()

plt.show()

In [None]:
###
# Testing
###

# NOTE(review): agent.get_action still takes a random action with probability
# agent.epsilon, so these episodes are not purely greedy - confirm this is the
# intended evaluation protocol.

# Safety cap on the length of a single episode.
max_steps_per_episode = 100000

scores, timesteps, epsilons = [], [], []
for i in episodes:
    score = 0
    steps = 0
    done = False
    state = env.reset()
    while not done and steps < max_steps_per_episode:
        action = agent.get_action(state)
        new_state, reward, done, _ = env.step(action)
        score += reward
        state = new_state
        steps += 1

    # Record every episode, including one that hit the step cap, so the three
    # lists always have one entry per episode run. `steps` is the number of
    # environment steps taken, not the index of the last one.
    scores.append(score)
    epsilons.append(agent.epsilon)
    timesteps.append(steps)
    print("Episode = {}, Score = {}, Avg_Score = {}".format(i, score, np.mean(scores[-100:])))

# Plot

import matplotlib.pyplot as plt

# Plot only the episodes that were actually recorded, so an interrupted run
# does not fail on an x/y length mismatch.
n_finished = len(scores)
finished = range(n_finished)

# Moving averages. The window shrinks when fewer than kernel_size episodes are
# available; otherwise np.convolve(mode="valid") would swap its operands and
# return values that are not averages of the data.
kernel_size = 20
window = max(1, min(kernel_size, n_finished))
kernel = np.ones(window) / window
if n_finished:
    cr = np.convolve(scores, kernel, mode="valid")  # smoothed scores (not plotted below)
    tst = np.convolve(timesteps, kernel, mode="valid")
    eps = np.convolve(epsilons, kernel, mode="valid")
else:
    cr = tst = eps = np.array([])

# Figure 1: raw score per episode against the 200-point reference line.
rew, rew_ax = plt.subplots()
target = [200] * n_finished
rew_ax.plot(finished, target, color='red', linewidth=2, linestyle='dashed',
            label='Solved Requirement')
rew_ax.plot(finished, scores, color="black", linewidth=2, label='Score')
rew.suptitle('DQN')
rew_ax.set_xlabel('Episode')
rew_ax.set_ylabel('Cumulative reward')
rew_ax.legend()  # must be attached before the figure is shown

# Figure 2: moving average of the episode length.
timest, timest_ax = plt.subplots()
timest_ax.plot(tst, color="blue")
timest.suptitle('DQN')
timest_ax.set_xlabel('Episode')
timest_ax.set_ylabel('Number of time steps')

# Figure 3: moving average of epsilon at the end of each episode.
eps_fig, eps_ax = plt.subplots()
eps_ax.plot(eps, color="green", label='Epsilon')
eps_fig.suptitle('DQN')
eps_ax.set_xlabel('Episode')
eps_ax.set_ylabel('Epsilon')
eps_ax.legend()

plt.show()