# Advanced Task - DQN

This file covers the code for the joint advanced tasks of the coursework. We define the same custom grid world environment as in the basic task, a maze to be solved in as few steps as possible, with a few modifications. We implement DQN and two improvements: double and dueling.

In [None]:
import numpy as np 
import random
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import torchvision.transforms as T
from collections import namedtuple, deque

### The Environment

In [1]:
class Maze:
    """
    Square grid-world maze containing dead-end penalties, a door (the goal) and an agent.

    Cell codes stored in self.grid:
    0: part of the maze path, 1: the agent, 2: maze wall/obstacle, 3: penalty (dead end), 4: door
    """

    def __init__(self, N=25):
        """
        Build a random maze in a square 2-dimensional numpy array.
        N: dimensions of array, integer, default 25

        There are penalties placed at every dead end in the maze.
        The agent and door are at randomised positions in the maze.
        In the array, these are represented by:
        0: part of the maze path, 1: the agent, 2: maze wall/obstacle, 3: penalty, 4: door

        Path cells are carved at odd indices, so with an odd N the outermost
        rows and columns remain walls.
        """
        self.N = N

        # Generating the maze: start from a grid of walls and carve the paths.
        self.grid = np.full((N, N), 2)
        self.generate_maze()

        # Placing penalties at every dead end.
        self.penalties_pos = self.generate_penalties()
        for cell in self.penalties_pos:
            self.grid[cell] = 3

        # Adding door and agent at random empty positions, and getting the current state of the agent.
        self.door_pos = self.get_rand_empty_cells(1)
        self.grid[self.door_pos] = 4

        self.agent_pos = self.get_rand_empty_cells(1)
        self.grid[self.agent_pos] = 1

        self.state = self.get_state(self.agent_pos)

        # Setting a step counter, and the step limit per episode (N squared).
        self.time_elapsed = 0
        self.time_limit = self.N * self.N

        # Defining actions available to the agent as (name, row offset, column offset).
        self.actions = [('up', -1, 0), ('down', 1, 0), ('left', 0, -1), ('right', 0, 1)]

        # Defining the reward and transition matrices for the environment.
        self.reward_matrix, self.transition_matrix = self.get_matrices()

    def generate_maze(self):
        """
        Generate a maze using an iterative depth-first search algorithm.
        """
        # Marking every other cell with a 1: a cell that will be part of the path
        # but hasn't been visited by the algorithm yet. Everything else stays a wall.
        self.grid[1::2, 1::2] = 1

        # Picking a random unvisited cell and making it the current cell.
        rand_x = random.randrange(1, self.N, 2)
        rand_y = random.randrange(1, self.N, 2)
        current_cell = (rand_x, rand_y)

        self.grid[current_cell] = 0  # Adding the current cell to the maze path.

        cell_stack = [current_cell]  # Stack of cells still to be expanded.

        while cell_stack:
            # Take the most recent cell and look for unvisited path cells two steps away.
            current_cell = cell_stack.pop()
            borders = self.get_nearby_cells(current_cell, dist=2)
            unvisited_borders = [border for border in borders if self.grid[border] == 1]

            if unvisited_borders:
                cell_stack.append(current_cell)  # It may still have other unvisited neighbours.
                # Pick a neighbour at random, knock down the wall in between, and continue from it.
                chosen_cell = random.choice(unvisited_borders)
                wall_between = ((chosen_cell[0] + current_cell[0]) // 2,
                                (chosen_cell[1] + current_cell[1]) // 2)
                self.grid[wall_between] = 0
                self.grid[chosen_cell] = 0
                cell_stack.append(chosen_cell)

    def generate_penalties(self):
        """
        Return a list of the coordinates of every dead end in the maze.

        A dead end is a path cell (code 0) with exactly one neighbouring path cell.
        """
        penalties_pos = []
        for coords in np.argwhere(self.grid == 0):
            path_cell = tuple(coords)
            borders = self.get_nearby_cells(path_cell)
            path_borders = [border for border in borders if self.grid[border] == 0]
            if len(path_borders) == 1:
                penalties_pos.append(path_cell)
        return penalties_pos

    def get_matrices(self):
        """
        Generate the reward and transition matrices for the environment.

        In the reward matrix, a given value represents the reward that the agent would receive for taking a given action.
        Reward matrix indices are [current_state, action_index].

        In the transition matrix, a value of 0 means a given action is not allowed, and a 1 means that it is.
        Transition matrix indices are [current_state, action_index, next_state].

        Action indices - 0: up 1: down 2: left 3: right
        """
        n_states = self.N * self.N
        wall_cost = n_states // 4     # Bumping into a wall or the grid edge.
        penalty_cost = n_states // 2  # Stepping into a dead-end penalty.
        door_bonus = n_states         # Reaching the door.

        # Reward is by default -1 for a step, and actions are by default disallowed.
        reward_matrix = -1 * np.ones((n_states, 4))
        transition_matrix = np.zeros((n_states, 4, n_states))

        for i in range(self.N):
            for j in range(self.N):
                current_state = self.get_state((i, j))

                for action_index, (_, d_row, d_col) in enumerate(self.actions):
                    next_cell = (i + d_row, j + d_col)

                    if self.out_of_bounds(next_cell):
                        # Leaving the grid: the agent stays where it is and is penalised once.
                        # (Do not fall through to the cell-type rules, which would apply
                        # a second reward based on the current cell.)
                        reward_matrix[current_state, action_index] -= wall_cost
                        transition_matrix[current_state, action_index, current_state] = 1
                        continue

                    next_cell_type = self.grid[next_cell]
                    next_state = self.get_state(next_cell)

                    if next_cell_type == 2:
                        # Walls cannot be entered; the transition stays 0.
                        reward_matrix[current_state, action_index] -= wall_cost
                        continue

                    # Every non-wall cell can be entered. This includes code 1: the agent's
                    # starting cell is an ordinary path cell once the agent has left it.
                    transition_matrix[current_state, action_index, next_state] = 1
                    if next_cell_type == 3:
                        reward_matrix[current_state, action_index] -= penalty_cost
                    elif next_cell_type == 4:
                        reward_matrix[current_state, action_index] += door_bonus

        return reward_matrix, transition_matrix

    def print_reward_matrices(self):
        """
        Display the reward matrices nicely by action.
        """
        for action_index, action_tuple in enumerate(self.actions):
            print(action_tuple[0])
            print(self.reward_matrix[:, action_index].reshape(self.N, self.N))

    def get_nearby_cells(self, cell, dist=1):
        """
        For a given cell, get the coordinates of all orthogonal cells that are a given distance away.
        dist - distance of desired cells, integer, default 1

        Distance of 1 means returning neighbouring cells. Cells outside the grid are dropped.
        """
        row, col = cell[0], cell[1]
        candidates = [(row - dist, col), (row + dist, col), (row, col - dist), (row, col + dist)]
        return [c for c in candidates if not self.out_of_bounds(c)]

    def out_of_bounds(self, cell):
        """
        Check whether given cell coordinates are out of the bounds of the grid.
        """
        return not (0 <= cell[0] < self.N and 0 <= cell[1] < self.N)

    def get_rand_empty_cells(self, n=1):
        """
        Retrieve a random selection of n empty cells (code 0) from the grid.
        n - integer, default 1

        Returns a single (row, column) tuple when n == 1, otherwise a list of such
        tuples (drawn with replacement, so it may contain duplicates).
        """
        zero_cells = np.argwhere(self.grid == 0)  # retrieving empty cells
        if n == 1:
            return tuple(random.choice(zero_cells))
        return [tuple(cell) for cell in random.choices(zero_cells, k=n)]

    def get_state(self, cell):
        """
        Return the state number of a given cell.
        States are numbered from 0 to (N*N)-1 (N is the size of the grid),
        starting in the top left and moving left to right along each row to the bottom right.

        The cell is expected to lie inside the grid.
        """
        # Row-major index, computed directly instead of building an N*N lookup array per call.
        return cell[0] * self.N + cell[1]

    def display(self):
        """
        Display the grid as an image.
        """
        return plt.imshow(self.grid)

    def step(self, action_index):
        """
        One step in the grid.
        action_index - the index of the action to be taken, 0: up 1: down 2: left 3: right

        Returns:
        end - boolean, whether or not the episode has ended
        reward - float, the reward for the taken step
        self.state - integer, the new state of the environment after the step
        """
        # Retrieve reward value for action from reward matrix.
        reward = self.reward_matrix[self.state, action_index]

        # Work out the position the action would lead to.
        _, d_row, d_col = self.actions[action_index]
        new_pos = (self.agent_pos[0] + d_row, self.agent_pos[1] + d_col)

        self.time_elapsed += 1
        end = False

        # Move only if the target is inside the grid and the transition matrix allows it.
        # Checking the bounds first avoids indexing states that do not exist.
        if not self.out_of_bounds(new_pos):
            new_state = self.get_state(new_pos)
            if self.transition_matrix[self.state, action_index, new_state] == 1:
                self.grid[self.agent_pos] = 0
                self.agent_pos = new_pos
                self.grid[self.agent_pos] = 1

        # If agent has reached the door, terminate episode.
        if self.agent_pos == self.door_pos:
            end = True

        # If number of steps is over the time limit, end episode.
        if self.time_elapsed > self.time_limit:
            end = True

        self.state = self.get_state(self.agent_pos)

        return end, reward, self.state

    def reset(self):
        """
        Reset the grid to its original state, but with the agent in a new random position.
        Maze and door remain unchanged.

        Returns the new state of the environment.
        """
        # Clear the agent's old cell FIRST: if the agent finished on the door or on a
        # penalty, the restores below must overwrite that cell rather than be erased by it.
        self.grid[self.agent_pos] = 0

        for cell in self.penalties_pos:
            self.grid[cell] = 3

        self.grid[self.door_pos] = 4

        self.agent_pos = self.get_rand_empty_cells(1)
        self.grid[self.agent_pos] = 1

        self.time_elapsed = 0

        self.state = self.get_state(self.agent_pos)

        return self.state

In [None]:
# Device used when creating tensors and networks below: the GPU if CUDA is available, else the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [None]:
class MazeDQN(Maze):
    '''Maze variant whose states and rewards are returned as tensors suitable for a DQN.'''

    def __init__(self, N=25):
        '''
        N - side length of the square maze, integer, default 25
        '''
        super().__init__(N=N)

    def return_start_state(self):
        '''Return initial state without having to reset the environment'''
        return self.preprocess_state()

    def step(self, action_index):
        '''
        Apply preprocessing to state and reward returned by the step function in the Maze class

        Returns:
        end - boolean, whether or not the episode has ended
        reward_tensor - float tensor of shape (1, 1) holding the step reward
        input_state - float tensor of shape (1, 11), see preprocess_state
        '''
        end, reward, _ = super().step(action_index)
        # Return state in appropriate format for DQN
        input_state = self.preprocess_state()
        # Return reward in appropriate format for DQN.
        # torch.tensor is used instead of the legacy torch.FloatTensor constructor,
        # which cannot create tensors on a CUDA device.
        reward_tensor = torch.tensor([[float(reward)]], dtype=torch.float32, device=device)
        return end, reward_tensor, input_state

    def preprocess_state(self):
        '''
        Preprocess state to be used as input for DQN

        The input is the (row, column) offset from the agent to the door followed by the
        flattened 3x3 block of cell codes centred on the agent: a (1, 11) float tensor.
        '''
        row, col = self.agent_pos

        # Calculate coordinates of the door relative to the agent position
        relative_coordinates = np.array([self.door_pos[0] - row, self.door_pos[1] - col])

        # Pad the maze with one ring of walls (code 2) so that the 3x3 window of an
        # edge cell never runs off the array
        maze_padded = np.full((self.N + 2, self.N + 2), 2, dtype=np.int8)
        maze_padded[1:self.N + 1, 1:self.N + 1] = self.grid

        # Grid cell (row, col) sits at padded index (row + 1, col + 1), so the window
        # centred on the agent spans padded rows row..row+2 and columns col..col+2
        surroundings = maze_padded[row:row + 3, col:col + 3].flatten()

        # Assemble the DQN input
        DQN_input_state = np.concatenate([relative_coordinates, surroundings])
        DQN_input_state = torch.tensor(DQN_input_state, dtype=torch.float32, device=device).unsqueeze(0)
        return DQN_input_state

### Defining required models and functions

In [None]:
# One stored transition. ExperienceReplayBuffer.store fills it with tensors; the training
# loops pass next_state=None when the episode ended on that step.
Experience = namedtuple('Experience', ('state', 'action', 'reward', 'next_state'))

In [None]:
# Inspiration for class from Lab 6
class ExperienceReplayBuffer:
    '''Bounded FIFO memory of Experience tuples; the oldest entries are dropped once it is full.'''

    def __init__(self, N_buffer):
        self.size_lim = N_buffer  # Capacity of the buffer
        self.buffer = deque(maxlen=N_buffer)  # Bounded deque holding the experiences

    def __len__(self):
        return len(self.buffer)

    def store(self, state_tensor, action, reward_tensor, next_state_tensor):
        '''Store an experience'''
        # The action index is wrapped into a (1, 1) tensor so batches can be built with torch.cat
        action_tensor = torch.tensor([[action]], device=device)
        self.buffer.append(
            Experience(state_tensor, action_tensor, reward_tensor, next_state_tensor)
        )

    def sample(self, BATCH_SIZE):
        '''Sample batch from the replay buffer'''
        return random.sample(self.buffer, BATCH_SIZE)

    def fill_replay_buffer(self):
        '''Fill replay buffer prior to DQN training'''
        while True:
            env = MazeDQN()  # a new randomised maze environment each episode
            current = env.return_start_state()
            done = False
            while not done:
                # Action chosen by the policy from the Q-values of Q_policy_net
                chosen = policy(current, Q_policy_net)
                # Take the action in the emulator to acquire reward and next state
                done, reward, upcoming = env.step(chosen)
                # A finished episode is recorded with None as the terminal state label
                upcoming = None if done else upcoming
                self.store(current, chosen, reward, upcoming)
                current = upcoming
            # Episodes always run to completion; stop once the buffer is at capacity
            if self.size_lim == len(self):
                break
        print('Buffer Filled')

In [None]:
# Class taken directly from the Lab 6 code, as it is already appropriate for our environment
class DQN(nn.Module):
    '''Fully connected Q-network: three Linear + BatchNorm + ReLU stages and a linear output layer.'''

    def __init__(self, input_size, size_hidden, output_size):
        super().__init__()

        # Stage 1: input features -> hidden width
        self.fc1 = nn.Linear(input_size, size_hidden)
        self.bn1 = nn.BatchNorm1d(size_hidden)

        # Stages 2 and 3: hidden width -> hidden width
        self.fc2 = nn.Linear(size_hidden, size_hidden)
        self.bn2 = nn.BatchNorm1d(size_hidden)
        self.fc3 = nn.Linear(size_hidden, size_hidden)
        self.bn3 = nn.BatchNorm1d(size_hidden)

        # Output layer: one value per action
        self.fc4 = nn.Linear(size_hidden, output_size)

    def forward(self, x):
        '''Return one Q-value per action for each row of x (x is cast to float first).'''
        stages = ((self.fc1, self.bn1), (self.fc2, self.bn2), (self.fc3, self.bn3))
        activations = x.float()
        for linear, norm in stages:
            activations = F.relu(norm(linear(activations)))
        flat = activations.view(activations.size(0), -1)
        return self.fc4(flat)

In [None]:
# Inspiration from https://towardsdatascience.com/dueling-deep-q-networks-81ffab672751
class DuelingDQN(nn.Module):
    '''Dueling Q-network: a shared trunk feeding separate state-value and advantage streams.'''

    def __init__(self, input_size, size_hidden, output_size):
        super().__init__()
        self.input_size, self.size_hidden, self.output_size = input_size, size_hidden, output_size

        width = self.size_hidden

        # Shared trunk applied to the raw input
        self.hidden_layer = nn.Sequential(
            nn.Linear(self.input_size, width),
            nn.ReLU(),
            nn.Linear(width, width),
            nn.ReLU(),
        )

        # Stream producing a single state value V(s)
        self.state_value_stream = nn.Sequential(
            nn.Linear(width, width),
            nn.ReLU(),
            nn.Linear(width, 1),
        )

        # Stream producing one advantage A(s, a) per action
        self.advantage_stream = nn.Sequential(
            nn.Linear(width, width),
            nn.ReLU(),
            nn.Linear(width, self.output_size),
        )

    def forward(self, x):
        '''Return Q(s, a) = V(s) + (A(s, a) - mean over actions of A(s, .)) for each row of x.'''
        features = self.hidden_layer(x)
        value = self.state_value_stream(features)
        advantage = self.advantage_stream(features)
        centred_advantage = advantage - advantage.mean(dim=1, keepdim=True)
        return value + centred_advantage

In [None]:
class eGreedyDecayPolicy:

    def __init__(self, epsilon = 1, decay = 0.999, min_epsilon = 0.001):
        """
        This class defines an epsilon greedy policy with decay.
        epsilon - float, initial probability of a random action, default 1
        decay - float, factor epsilon is multiplied by on every update(), default 0.999
        min_epsilon - float, lower bound that update() never lets epsilon go below, default 0.001

        In the case where decay = 1, epsilon will remain constant.
        In this case:
        If epsilon = 1, the policy will always choose an action randomly.
        If epsilon = 0, the policy will always choose greedily.
        """
        self.eps_current = epsilon
        self.eps_initial = epsilon
        self.min_eps = min_epsilon
        self.decay = decay

    def __call__(self, state, Q_policy_net):
        """
        Choose an action index for the given state.
        state - input tensor for a single state (batch of one)
        Q_policy_net - network mapping the state to one Q-value per action

        Returns the action index as an int: the greedy action with probability
        1 - epsilon, otherwise a uniformly random action in 0..3.
        """
        greedy = random.random() > self.eps_current

        if not greedy:
            return random.randint(0, 3)  # return random action

        # Evaluate in eval mode without gradients, then restore whichever mode the
        # network was in, so calling the policy never changes the caller's train/eval state.
        was_training = Q_policy_net.training
        Q_policy_net.eval()
        try:
            with torch.no_grad():
                # .item() works for tensors on any device (unlike .numpy(), which needs CPU)
                action_index = Q_policy_net(state).max(1)[1].item()
        finally:
            Q_policy_net.train(was_training)

        return action_index

    def update(self):
        """Apply one decay step to epsilon, never going below the minimum value."""
        self.eps_current = max(self.eps_current * self.decay, self.min_eps)

    def reset(self):
        """Restore epsilon to its initial value."""
        self.eps_current = self.eps_initial

In [None]:
# function adapted from code in Lab 6
def optimize_DQN():
    '''
    Run one optimisation step of vanilla DQN on a batch sampled from the replay buffer.

    Uses the module-level replay_buffer, BATCH_SIZE, GAMMA, Q_policy_net and optimizer.
    Targets are reward + GAMMA * max_a Q_policy_net(next_state, a), with 0 for terminal next states.

    Returns the loss tensor of the step.
    '''
    # Each experience is (state_tensor, action_tensor, reward_tensor, next_state_tensor)
    batch = replay_buffer.sample(BATCH_SIZE) # acquire batch
    batch = Experience(*zip(*batch)) # make batch experience variables available by variable name
    state_batch = torch.cat(batch.state) # collect states
    action_batch = torch.cat(batch.action) # collect actions
    reward_batch = torch.cat(batch.reward) # collect rewards

    # Boolean mask over the batch: True if the next state is not terminal, False if it is
    non_final_next_states_mask = torch.tensor([state is not None for state in batch.next_state],
                                              device=device, dtype=torch.bool)

    # Next states that are not terminal (may be empty if every sampled transition ended an episode)
    non_final_next_states = [state for state in batch.next_state if state is not None]

    Q_values = Q_policy_net(state_batch).gather(1, action_batch) # Q-values of the actions actually taken

    next_Q_values = torch.zeros(BATCH_SIZE, device=device) # terminal next states keep a value of 0
    if non_final_next_states: # torch.cat raises on an empty list, so skip when there is nothing to evaluate
        with torch.no_grad(): # targets are constants; no gradient flows through them
            next_Q_values[non_final_next_states_mask] = Q_policy_net(torch.cat(non_final_next_states)).max(1)[0]
    next_Q_values = next_Q_values.unsqueeze(1)

    # Compute target Q-values
    target_Q_values = reward_batch + (next_Q_values * GAMMA)

    # Compute Mean Squared Error Loss
    loss = F.mse_loss(Q_values, target_Q_values)

    optimizer.zero_grad() # set optimiser gradients to 0
    loss.backward() # calculate gradients through a backwards pass

    # Clip every gradient element to [-1, 1] to avoid exploding updates
    # (parameters without a gradient are skipped)
    nn.utils.clip_grad_value_(Q_policy_net.parameters(), 1)

    optimizer.step() # perform parameter update based on stored gradients

    return loss

In [None]:
# function adapted from code in Lab 6
def train_DQN():
    '''
    Train Q_policy_net with vanilla DQN for NUM_EPISODES episodes, each on a new random maze.

    Uses the module-level policy, replay_buffer, Q_policy_net and NUM_EPISODES.
    The replay buffer is filled before training starts, and one optimisation step
    is run after every environment step.

    Returns the list of total rewards, one entry per episode.
    '''
    policy.reset() # reset policy hyperparameters
    replay_buffer.fill_replay_buffer() # fill buffer
    episode_rewards = []
    for episode in range(NUM_EPISODES):
        maze = MazeDQN() # new maze every episode
        state = maze.return_start_state() # acquire first state of the episode
        end = False # reset episode termination variable
        episode_reward = 0 # reset episode reward

        while not end:
            # Acquire action index based on q-values from the policy network, using specified policy
            action_index = policy(state, Q_policy_net)

            end, reward, next_state = maze.step(action_index) # take the action in an emulator

            if end: # if the episode has ended, set the next state to terminal state label
                next_state = None

            replay_buffer.store(state, action_index, reward, next_state) # store the transition in memory
            state = next_state # update the state
            episode_reward += float(reward) # add step reward to episode reward

            # Run a single batch through DQN and update the model
            loss = optimize_DQN()

        episode_rewards.append(float(episode_reward)) # append episode reward to list of episode rewards
        policy.update() # update policy hyperparameters

        if episode % 100 == 0:
            # Average over the episodes actually available (fewer than 100 at the start)
            recent_rewards = episode_rewards[-100:]
            # loss.item() works whether the loss lives on the CPU or the GPU
            print('Episode {}: reward : {} epsilon: {} loss: {}'.format(episode, episode_reward,
                  policy.eps_current, round(loss.item(), 3)))
            print('Average of reward for last 100 episodes: {}'.format(sum(recent_rewards)/len(recent_rewards)))
    print('Model trained')
    return episode_rewards

In [None]:
# function adapted from code in Lab 6
def optimize_doubleDQN():
    '''
    Run one optimisation step of Double DQN on a batch sampled from the replay buffer.

    Uses the module-level replay_buffer, BATCH_SIZE, GAMMA, Q_policy_net, Q_target_net and optimizer.
    For non-terminal next states the greedy action is selected with Q_policy_net and its
    value is read from Q_target_net; terminal next states contribute 0.

    Returns the loss tensor of the step.
    '''
    # Each experience is (state_tensor, action_tensor, reward_tensor, next_state_tensor)
    batch = replay_buffer.sample(BATCH_SIZE) # acquire batch
    batch = Experience(*zip(*batch)) # make batch experience variables available by variable name
    state_batch = torch.cat(batch.state) # collect states
    action_batch = torch.cat(batch.action) # collect actions
    reward_batch = torch.cat(batch.reward) # collect rewards

    # Boolean mask over the batch: True if the next state is not terminal, False if it is
    non_final_next_states_mask = torch.tensor([state is not None for state in batch.next_state],
                                              device=device, dtype=torch.bool)

    # Next states that are not terminal (may be empty if every sampled transition ended an episode)
    non_final_next_states = [state for state in batch.next_state if state is not None]

    Q_values = Q_policy_net(state_batch).gather(1, action_batch) # q-values of the actions actually taken

    next_Q_values = torch.zeros(BATCH_SIZE, device=device) # terminal next states keep a value of 0
    if non_final_next_states: # torch.cat raises on an empty list, so skip when there is nothing to evaluate
        next_states = torch.cat(non_final_next_states)
        with torch.no_grad(): # targets are constants; no gradient flows through them
            # Double DQN: the policy network picks the action, the target network evaluates it.
            # Taking the max over the target network alone would be plain target-network DQN.
            best_actions = Q_policy_net(next_states).argmax(dim=1, keepdim=True)
            next_Q_values[non_final_next_states_mask] = Q_target_net(next_states).gather(1, best_actions).squeeze(1)
    next_Q_values = next_Q_values.unsqueeze(1)

    # Compute target q-values
    target_Q_values = reward_batch + (next_Q_values * GAMMA)

    # Compute Loss
    loss = F.mse_loss(Q_values, target_Q_values)

    optimizer.zero_grad() # set optimiser gradients to 0
    loss.backward() # calculate gradients through a backwards pass

    # Clip every gradient element to [-1, 1] to avoid exploding updates
    # (parameters without a gradient are skipped)
    nn.utils.clip_grad_value_(Q_policy_net.parameters(), 1)

    optimizer.step() # perform parameter update based on stored gradients

    return loss

In [None]:
# function adapted from code in Lab 6
def train_doubleDQN():
    '''
    Train Q_policy_net with a target network for NUM_EPISODES episodes, each on a new random maze.

    Uses the module-level policy, replay_buffer, Q_policy_net, Q_target_net,
    NUM_EPISODES and TARGET_UPDATE_FREQ. The replay buffer is filled before training
    starts, one optimisation step is run after every environment step, and the target
    network is synchronised with the policy network every TARGET_UPDATE_FREQ episodes.

    Returns the list of total rewards, one entry per episode.
    '''
    policy.reset() # reset policy hyperparameters
    replay_buffer.fill_replay_buffer() # fill buffer
    episode_rewards = []
    for episode in range(NUM_EPISODES):
        maze = MazeDQN() # new maze every episode
        state = maze.return_start_state() # acquire first state of the episode
        end = False # reset episode termination variable
        episode_reward = 0 # reset episode reward

        while not end:
            # Acquire action index based on q-values from the policy network, using specified policy
            action_index = policy(state, Q_policy_net)

            end, reward, next_state = maze.step(action_index) # take the action in an emulator

            if end: # if the episode has ended, set the next state to terminal state label
                next_state = None

            replay_buffer.store(state, action_index, reward, next_state) # store the transition in memory
            state = next_state # update the state
            episode_reward += float(reward) # add step reward to episode reward

            # Run a single batch through Double DQN and update the model
            loss = optimize_doubleDQN()

        episode_rewards.append(float(episode_reward)) # append episode reward to list of episode rewards
        policy.update() # update policy hyperparameters

        # Update the target network, copying all weights and biases from the policy network
        if episode % TARGET_UPDATE_FREQ == 0:
            Q_target_net.load_state_dict(Q_policy_net.state_dict())

        if episode % 100 == 0:
            # Average over the episodes actually available (fewer than 100 at the start)
            recent_rewards = episode_rewards[-100:]
            # loss.item() works whether the loss lives on the CPU or the GPU
            print('Episode {}: reward : {} epsilon: {} loss: {}'.format(episode, episode_reward,
                  policy.eps_current, round(loss.item(), 3)))
            print('Average of reward for last 100 episodes: {}'.format(sum(recent_rewards)/len(recent_rewards)))
    print('Model trained')
    return episode_rewards


### Running the models

#### DQN

In [None]:
# Defining Q-network and optimiser parameters
INPUT_SIZE = 3*3 + 2  # 3x3 block of surrounding cells + 2 relative door coordinates (see MazeDQN.preprocess_state)
HIDDEN_SIZE = 128  # width of every hidden layer
NUM_ACTIONS = 4  # up, down, left, right
ALPHA = 0.01  # learning rate passed to the optimiser

# Define policy Q-network and optimiser
Q_policy_net = DQN(INPUT_SIZE, HIDDEN_SIZE, NUM_ACTIONS).to(device)
optimizer = optim.SGD(Q_policy_net.parameters(), lr=ALPHA)

In [None]:
# Define DQN training hyperparameters
BUFFER_SIZE = 10000  # capacity of the replay buffer
BATCH_SIZE = 256  # number of experiences sampled per optimisation step
GAMMA = 0.5 # Discount
MAZE_SIZE = 25  # NOTE(review): not read by the code in this file; MazeDQN() is always built with its default size
NUM_EPISODES = 5000  # number of training episodes

In [None]:
# Define policy (epsilon-greedy with the default decay settings)
policy = eGreedyDecayPolicy()
# Define buffer
replay_buffer = ExperienceReplayBuffer(BUFFER_SIZE)
# Train DQN; returns the total reward of every episode
DQN_episode_rewards = train_DQN()

In [None]:
# Save the per-episode rewards of the vanilla DQN run to a CSV file
DQN_episode_rewards = np.asarray(DQN_episode_rewards)
np.savetxt('vanilla_DQN_results.csv', DQN_episode_rewards, delimiter=',')

#### Double DQN

In [None]:
# Defining Q-network and optimiser parameters
INPUT_SIZE = 3*3 + 2  # 3x3 block of surrounding cells + 2 relative door coordinates (see MazeDQN.preprocess_state)
HIDDEN_SIZE = 128  # width of every hidden layer
NUM_ACTIONS = 4  # up, down, left, right
ALPHA = 0.01  # learning rate passed to the optimiser

# Define policy Q-network and optimiser
Q_policy_net = DQN(INPUT_SIZE, HIDDEN_SIZE, NUM_ACTIONS).to(device)
optimizer = optim.SGD(Q_policy_net.parameters(), lr=ALPHA)

# Define target Q-network
Q_target_net = DQN(INPUT_SIZE, HIDDEN_SIZE, NUM_ACTIONS).to(device)
# Copy parameter values from policy Q-network
Q_target_net.load_state_dict(Q_policy_net.state_dict())
# The target network is only ever evaluated, never trained: keep it in eval mode so its
# BatchNorm layers use the copied running statistics instead of per-batch statistics
Q_target_net.eval()

In [None]:
# Define DQN training hyperparameters
BUFFER_SIZE = 10000  # capacity of the replay buffer
BATCH_SIZE = 256  # number of experiences sampled per optimisation step
TARGET_UPDATE_FREQ = 100 # Number of episodes per update of the target DQN parameters
GAMMA = 0.5 # Discount
MAZE_SIZE = 25  # NOTE(review): not read by the code in this file; MazeDQN() is always built with its default size
NUM_EPISODES = 5000  # number of training episodes

In [None]:
# Define policy (epsilon-greedy with the default decay settings)
policy = eGreedyDecayPolicy()
# Define buffer
replay_buffer = ExperienceReplayBuffer(BUFFER_SIZE)
# Train with the target-network training loop; returns the total reward of every episode
DDQN_episode_rewards = train_doubleDQN()

In [None]:
# Save the per-episode rewards of the Double DQN run to a CSV file
DDQN_episode_rewards = np.asarray(DDQN_episode_rewards)
np.savetxt('double_DQN_results.csv', DDQN_episode_rewards, delimiter=',')

#### Dueling Double DQN

In [None]:
# Defining Q-network and optimiser parameters
INPUT_SIZE = 3*3 + 2  # 3x3 block of surrounding cells + 2 relative door coordinates (see MazeDQN.preprocess_state)
HIDDEN_SIZE = 128  # width of every hidden layer
NUM_ACTIONS = 4  # up, down, left, right
ALPHA = 0.01  # learning rate passed to the optimiser

# Define policy Q-network and optimiser (dueling architecture)
Q_policy_net = DuelingDQN(INPUT_SIZE, HIDDEN_SIZE, NUM_ACTIONS).to(device)
optimizer = optim.SGD(Q_policy_net.parameters(), lr=ALPHA)

# Define target Q-network
Q_target_net = DuelingDQN(INPUT_SIZE, HIDDEN_SIZE, NUM_ACTIONS).to(device)
# Copy parameter values from policy Q-network
Q_target_net.load_state_dict(Q_policy_net.state_dict())

In [None]:
# Define DQN training hyperparameters
BUFFER_SIZE = 10000  # capacity of the replay buffer
BATCH_SIZE = 256  # number of experiences sampled per optimisation step
TARGET_UPDATE_FREQ = 100 # Number of episodes per update of the target DQN parameters
GAMMA = 0.5 # Discount
MAZE_SIZE = 25  # NOTE(review): not read by the code in this file; MazeDQN() is always built with its default size
NUM_EPISODES = 3000  # number of training episodes (fewer than the 5000 used for the other two runs)

In [None]:
# Define policy (epsilon-greedy with the default decay settings)
policy = eGreedyDecayPolicy()
# Define buffer (sampled uniformly at random; no prioritised replay is used)
replay_buffer = ExperienceReplayBuffer(BUFFER_SIZE)
# Train the dueling networks with the same training loop as the Double DQN run
DDDQN_episode_rewards = train_doubleDQN()

In [None]:
# Save the per-episode rewards of the Dueling Double DQN run to a CSV file
DDDQN_episode_rewards = np.asarray(DDDQN_episode_rewards)
np.savetxt('DDDQN_results.csv', DDDQN_episode_rewards, delimiter=',')