#### Deep Reinforcement Learning


A simple DRL model that learns the shortest path to the center of an integer grid regardless of where it starts. The allowed actions are left, right, up, and down. Positions $(x,y)$ are integers, and each action moves exactly one step (step = 1) in the chosen direction. A good action (one that reduces the distance to the center) receives a reward of +1, while a bad action receives a reward of -1.  


In [None]:
import matplotlib.pyplot as plt
import numpy as np
from keras.models import Sequential
from keras.layers import Dense
from collections import deque
from keras.activations import relu, linear
from keras.losses import mean_squared_error
from keras.optimizers import Adam
import random
from datetime import datetime
from keras.models import load_model

In [None]:
class Env:
    """Bounded integer grid whose goal is the cell at its centre.

    The x coordinate lives in ``[0, width]`` and the y coordinate in
    ``[0, length]``. The goal cell is ``(width // 2, length // 2)``, which is
    ``(3, 3)`` for the default 6 x 6 grid.

    Args:
        length: Largest valid y coordinate.
        width: Largest valid x coordinate.
        x: Initial x value. ``plot`` also accepts a sequence of x values here.
        y: Initial y value. ``plot`` also accepts a sequence of y values here.
    """

    def __init__(self, length=6, width=6, x=None, y=None):
        self.length = length
        self.width = width
        # Build a fresh list per instance instead of sharing one mutable default.
        self.x = [] if x is None else x
        self.y = [] if y is None else y

    def _distance_to_center(self):
        """Return the Euclidean distance from the current position to the goal."""
        center_x = self.width // 2
        center_y = self.length // 2
        return np.sqrt((self.x - center_x) ** 2 + (self.y - center_y) ** 2)

    def reset(self):
        """Place the agent on a uniformly random cell and return ``[x, y]``."""
        # x is bounded by width and y by length, matching step() and plot().
        self.x = random.randint(0, self.width)
        self.y = random.randint(0, self.length)
        return [self.x, self.y]

    def step(self, received_action):
        """Apply one move and score it.

        Args:
            received_action: 0 = left, 1 = right, 2 = down, 3 = up. Any other
                value leaves the position unchanged.

        Returns:
            ``(next_state, reward, done, info)`` where ``next_state`` is
            ``[x, y]``, ``reward`` is +1 if the move reduced the distance to
            the centre, -1 otherwise, and 5 when the centre is reached;
            ``done`` is True once the centre is reached; ``info`` is an
            empty list.
        """
        prev_distance = self._distance_to_center()

        # Moves are clamped so the agent never leaves the grid.
        if received_action == 0:
            self.x = max(self.x - 1, 0)  # move left
        elif received_action == 1:
            self.x = min(self.x + 1, self.width)  # move right
        elif received_action == 2:
            self.y = max(self.y - 1, 0)  # move down
        elif received_action == 3:
            self.y = min(self.y + 1, self.length)  # move up

        next_distance = self._distance_to_center()

        # A move that does not strictly reduce the distance counts as bad,
        # including a move that was clamped at a wall.
        reward = 1 if prev_distance > next_distance else -1

        next_state = [self.x, self.y]

        done = False
        if next_distance <= 0.1:
            done = True
            reward = 5  # reaching the goal overrides the per-step reward

        info = []

        return next_state, reward, done, info

    def plot(self):
        """Scatter-plot the stored x/y value(s) on axes sized to the grid."""
        plt.figure()
        plt.xlim(-0.1, self.width + 0.1)
        plt.ylim(-0.1, self.length + 0.1)
        plt.scatter(self.x, self.y, s=15, c="blue")
        plt.show()

In [None]:
class DQN:
    """Deep Q-Network agent with experience replay and epsilon-greedy exploration.

    The agent interacts with ``env`` through ``env.reset()`` and
    ``env.step(action)``, where ``step`` returns
    ``(next_state, reward, done, info)``.

    Args:
        env: Environment the agent is trained on.
    """

    def __init__(self, env):
        self.env = env
        # Toggled by update_counter(); weights are only updated when it is 0.
        self.counter = 0

        # Network / optimisation hyper-parameters
        self.density_first_layer = 512
        self.density_second_layer = 256
        self.num_epochs = 3
        self.batch_size = 128
        self.epsilon_min = 0.01

        # Exploration rate, its per-episode decay factor, and the discount factor
        self.epsilon = 0.7
        self.epsilon_decay = 0.995
        self.gamma = 0.99

        # Learning rate
        self.lr = 0.001

        self.rewards_list = []
        self.replay_memory_buffer = deque(maxlen=20000)

        # Number of discrete actions (left, right, down, up)
        self.num_action_space = 4

        # Size of one observation: (x, y)
        self.num_observation_space = 2

        self.model = self.initialize_model()

    def initialize_model(self):
        """Build and compile the Q-network (two ReLU layers, linear output)."""
        model = Sequential()
        model.add(Dense(self.density_first_layer, input_dim=self.num_observation_space, activation=relu))
        model.add(Dense(self.density_second_layer, activation=relu))
        model.add(Dense(self.num_action_space, activation=linear))

        model.compile(loss=mean_squared_error, optimizer=Adam(learning_rate=self.lr))
        # summary() prints on its own and returns None, so it is not wrapped
        # in print() (which would emit a stray "None").
        model.summary()
        return model

    def get_action(self, state):
        """Pick an action epsilon-greedily.

        With probability ``epsilon`` a uniformly random action is returned;
        otherwise the action with the highest predicted Q-value for ``state``.
        """
        if np.random.rand() < self.epsilon:
            return random.randrange(self.num_action_space)

        predicted_actions = self.model.predict(state, verbose=0)
        return np.argmax(predicted_actions[0])

    def add_to_replay_memory(self, state, action, reward, next_state, done):
        """Append one transition to the replay buffer."""
        self.replay_memory_buffer.append((state, action, reward, next_state, done))

    def update_weights(self):
        """Fit the network on one random replay batch.

        Does nothing until the buffer holds at least ``batch_size``
        transitions, and is skipped on steps where ``counter`` is non-zero.
        """
        if len(self.replay_memory_buffer) < self.batch_size or self.counter != 0:
            return

        random_sample = self.get_random_sample_from_memory()
        states, actions, rewards, next_states, done_list = self.get_values_from_samples(random_sample)

        # Target: r + gamma * max_a' Q(s', a'); the bootstrap term is zeroed
        # for terminal transitions.
        next_q_max = np.amax(self.model.predict_on_batch(next_states), axis=1)
        targets = rewards + self.gamma * next_q_max * (1 - done_list)

        # Start from the current predictions so that only the Q-value of the
        # action actually taken contributes to the loss.
        target_vec = self.model.predict_on_batch(states)
        indexes = np.arange(len(actions))
        target_vec[indexes, actions] = targets

        self.model.fit(states, target_vec, epochs=self.num_epochs, verbose=0)

    def get_values_from_samples(self, random_sample):
        """Split a list of transitions into per-field arrays.

        Returns:
            ``(states, actions, rewards, next_states, done_list)``; ``states``
            and ``next_states`` are 2-D with one row per transition.
        """
        batch = len(random_sample)
        # reshape (rather than squeeze) keeps the batch axis even when the
        # sample holds a single transition.
        states = np.array([item[0] for item in random_sample]).reshape(batch, -1)
        actions = np.array([item[1] for item in random_sample])
        rewards = np.array([item[2] for item in random_sample])
        next_states = np.array([item[3] for item in random_sample]).reshape(batch, -1)
        done_list = np.array([item[4] for item in random_sample])
        return states, actions, rewards, next_states, done_list

    def get_random_sample_from_memory(self):
        """Return ``batch_size`` transitions drawn without replacement."""
        return random.sample(self.replay_memory_buffer, self.batch_size)

    def predict(self, current_state):
        """Return the network's Q-value predictions for ``current_state``."""
        return self.model.predict(current_state)

    def train(self, num_episodes=2000, can_stop=True):
        """Run ``num_episodes`` training episodes on ``self.env``.

        Every visited state is recorded and stored in ``self.states_log`` when
        training finishes; per-episode returns go to ``self.rewards_list``.

        Args:
            num_episodes: Number of episodes to run.
            can_stop: Currently unused; kept for interface compatibility.
        """
        frames = []

        for episode in range(num_episodes):

            # state is a (1, 2) array holding the (x, y) position
            state = np.reshape(self.env.reset(), [1, self.num_observation_space])
            reward_for_episode = 0
            done = False
            while not done:
                frames.append(state)

                received_action = self.get_action(state)
                next_state, reward, done, info = self.env.step(received_action)
                next_state = np.reshape(next_state, [1, self.num_observation_space])

                self.add_to_replay_memory(state, received_action, reward, next_state, done)

                reward_for_episode += reward
                state = next_state

                # The counter is advanced first so that update_weights()
                # only trains on every second step.
                self.update_counter()
                self.update_weights()

            self.rewards_list.append(reward_for_episode)

            # Decay epsilon once per finished episode
            if self.epsilon > self.epsilon_min:
                self.epsilon *= self.epsilon_decay

            # Mean return over the most recent (up to) 100 episodes
            last_rewards_mean = np.mean(self.rewards_list[-100:])

            print(episode, "\t: Episode || Reward: ",reward_for_episode, "\t|| Average Reward: ",last_rewards_mean, "\t epsilon: ", self.epsilon)

        self.states_log = frames
        print("DQN Training Complete...")

    def update_counter(self):
        """Advance the step counter modulo 2 (it alternates between 1 and 0)."""
        step_size = 2
        self.counter = (self.counter + 1) % step_size

In [None]:
# Testing 
# x = [i[0][0] for i in model.states_log]
# y = [i[0][1] for i in model.states_log]

# # x = [0, 1, 2, 3, 4]
# # y = [0, 1, 2, 3, 4]
# Env(x = x,y = y).plot()

In [None]:
if __name__=="__main__":
    rewards_list = []

    # Create the environment
    env = Env()

    # Seed both random generators. NumPy's generator decides whether the agent
    # explores, while the stdlib `random` module picks start positions, random
    # actions and replay samples. Seeding only one of them leaves runs
    # non-reproducible.
    # NOTE(review): Keras weight initialisation is not seeded here, so results
    # may still vary between runs.
    np.random.seed(21)
    random.seed(21)

    # max number of training episodes
    training_episodes = 1000

    # initialize the Deep-Q Network model
    model = DQN(env)

    # Train the model
    model.train(training_episodes, True)

    # Timestamp the file name so earlier models are not overwritten
    now = datetime.now() # current date and time
    date_time = now.strftime("%Y%m%d-%H%M%S")
    # save the model
    model.model.save('/tmp/mymodel-' + date_time + '.h5')

In [None]:
if __name__=="__main__":

    print("Starting Testing of the trained model...")

    # Number of greedy test episodes to run
    num_test_episode = 1

    # Step cap per episode so a policy that never reaches the goal cannot loop
    # forever. The loop condition `it <= max_steps` allows max_steps + 1 steps.
    max_steps = 20

    # Size of one observation: (x, y)
    num_observation_space = 2

    # Create the environment
    env = Env()
    rewards_list = []

    # Load the model
    # NOTE(review): this file name is hard-coded; confirm it points at the
    # model you actually want to evaluate.
    mymodel = load_model("mymodel-20240204-055800.h5")

    # Every visited state, across all test episodes (used for plotting)
    frames = []

    # Run some test episodes to see how well our model performs
    for test_episode in range(num_test_episode):
        current_state = np.reshape(env.reset(), [1, num_observation_space])
        reward_for_episode = 0
        done = False
        it = 0 # steps taken in this episode
        while (not done) and (it <= max_steps):
            frames.append(current_state)
            it += 1

            # Always act greedily: no exploration during testing
            selected_action = np.argmax(mymodel.predict(current_state)[0])
            new_state, reward, done, info = env.step(selected_action)
            current_state = np.reshape(new_state, [1, num_observation_space])
            reward_for_episode += reward
        rewards_list.append(reward_for_episode)
        print(test_episode, "\t: Episode || Reward: ", reward_for_episode)

        # Record the final state reached in this episode as well
        frames.append(current_state)

    rewards_mean = np.mean(rewards_list[-100:])
    print("Average Reward: ", rewards_mean )

In [None]:
# Plotting: scatter every (x, y) position recorded in `frames`.
# Each frame is indexed as frame[0][0] (x) and frame[0][1] (y).
x = [frame[0][0] for frame in frames]
y = [frame[0][1] for frame in frames]

trajectory = Env(x=x, y=y)
trajectory.plot()