In [1]:
!pip install swig
!pip install gym[box2d]
!pip install moviepy
!pip install ffmpeg --upgrade
!pip install moviepy --upgrade

Collecting swig
  Downloading swig-4.2.1-py2.py3-none-manylinux_2_5_x86_64.manylinux1_x86_64.whl (1.9 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.9/1.9 MB[0m [31m10.3 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: swig
Successfully installed swig-4.2.1
Collecting box2d-py==2.3.5 (from gym[box2d])
  Downloading box2d-py-2.3.5.tar.gz (374 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m374.4/374.4 kB[0m [31m5.2 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting pygame==2.1.0 (from gym[box2d])
  Downloading pygame-2.1.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (18.3 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m18.3/18.3 MB[0m [31m50.2 MB/s[0m eta [36m0:00:00[0m
Building wheels for collected packages: box2d-py
  Building wheel for box2d-py (setup.py) ... [?25l[?25hdone
  Created wheel for box2d-py: filename=box2d_py-2.3.5-cp3

In [7]:
import gym
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import numpy as np
import matplotlib.pyplot as plt
import math, random
from itertools import count
from collections import namedtuple, deque
import matplotlib
from IPython import display
from gym.wrappers import RecordVideo


#You can use seed 1 for a nice result
#seed = 1
#random.seed(seed)
#np.random.seed(seed)

# Build the Lunar Lander environment and wrap it so that every 50th episode is
# recorded to a video file.
# NOTE(review): video_folder is an absolute, machine-specific path -- confirm it is
# valid (or wanted) on the machine that runs this notebook.
tmp_env = gym.make('LunarLander-v2', render_mode='rgb_array')
env = RecordVideo(
    env=tmp_env,
    video_folder="/Users/Admin/Desktop",
    video_length=0,
    name_prefix="lunar-agent-video",
    episode_trigger=lambda eps: eps % 50 == 0,
)

# True when matplotlib is using an "inline" backend; the plotting code uses this
# flag to decide whether to go through IPython's display machinery.
is_ipython = 'inline' in matplotlib.get_backend()

# Run on the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# One step of experience as stored in the replay memory.
Transition = namedtuple('Transition', ['state', 'action', 'next_state', 'reward'])


class DuelingDQN(nn.Module):
    """
        Dueling Q-network: a shared two-layer MLP trunk followed by a scalar
        state-value head and a per-action advantage head, recombined as
        Q(s, a) = V(s) + (A(s, a) - mean over actions of A(s, a)).

        Args:
            n_observations: size of the state vector fed to the first layer.
            n_actions: number of discrete actions (width of the advantage head
                and of the returned Q-value vector).

        Source for Dueling DQN
        Curt-Park (2024) Notebook on nbviewer, Jupyter Notebook Viewer. Available at: https://nbviewer.org/github/Curt-Park/rainbow-is-all-you-need/blob/master/04.dueling.ipynb (Accessed: 04 May 2024).

        Source for Double DQN
        Curt-Park (2024) Double DQN, GitHub. Available at: https://github.com/Curt-Park/rainbow-is-all-you-need/blob/master/02.double_q.ipynb (Accessed: 04 May 2024).
    """
    def __init__(self, n_observations, n_actions):
        super(DuelingDQN, self).__init__()

        # Shared trunk: n_observations -> 256 -> 128 features.
        self.layer1 = nn.Linear(n_observations, 256)
        self.layer2 = nn.Linear(256, 128)

        # Value head: one number per state, independent of the action taken.
        self.value_head = nn.Linear(128, 1)
        # Advantage head: one number per action, i.e. how much better or worse that
        # action is relative to the others in this state.
        self.advantage_head = nn.Linear(128, n_actions)

    def forward(self, state):
        """Return the Q-values for ``state``.

        Args:
            state: tensor whose last dimension has size ``n_observations``; either
                a batch ``(batch, n_observations)`` or a single unbatched state
                ``(n_observations,)``.

        Returns:
            Tensor with the same leading dimensions as ``state`` and a last
            dimension of size ``n_actions``. No activation is applied to the
            output, so the values are raw Q-value estimates.
        """
        # ReLU after each trunk layer adds the non-linearity.
        features = F.relu(self.layer1(state))
        features = F.relu(self.layer2(features))

        value = self.value_head(features)
        advantage = self.advantage_head(features)

        # Centre the advantages so the split between value and advantage is unique.
        # The mean is taken over the last (action) dimension so that unbatched 1-D
        # states work exactly like batched ones.
        q_values = value + (advantage - advantage.mean(dim=-1, keepdim=True))
        return q_values


# This replay buffer is here to ensure that the weights in the network don't become too susceptible to sudden changes during training
class PrioritizedReplayMemory(object):
    """
    Fixed-capacity replay buffer that draws transitions with probability
    proportional to ``priority ** alpha``.

    Args:
        storage: maximum number of transitions kept; once full, the oldest
            transition (and its priority) is dropped when a new one is pushed.
        alpha: exponent applied to the priorities before they are normalised into
            sampling probabilities (0 makes every transition equally likely).

    Crabé, G. (2020) How to implement prioritized experience replay for a deep Q-Network, Medium. Available at: https://towardsdatascience.com/how-to-implement-prioritized-experience-replay-for-a-deep-q-network-a710beecd77b (Accessed: 04 May 2024).
    """
    def __init__(self, storage, alpha = 0.6):
        # Two parallel deques: position i of `priorities` belongs to position i of `memory`.
        self.memory = deque([], maxlen=storage)
        self.alpha = alpha
        self.priorities = deque([], maxlen=storage)

    def push(self, *args):
        """Store ``Transition(*args)`` with the largest priority currently held (1.0 if empty)."""
        max_priority = max(self.priorities, default=1.0)

        self.memory.append(Transition(*args))
        self.priorities.append(max_priority)

    def sample(self, batch_size, return_indices=False):
        """Draw ``batch_size`` transitions, with replacement, according to their priorities.

        Args:
            batch_size: number of transitions to draw.
            return_indices: when True, also return the buffer positions that were
                drawn so the caller can pass them to ``update_priorities``.

        Returns:
            A list of transitions, or ``(transitions, indices)`` when
            ``return_indices`` is True (``indices`` is a NumPy integer array).

        Raises:
            ValueError: if the buffer is empty.
        """
        if not self.memory:
            raise ValueError("cannot sample from an empty replay memory")

        # Apply alpha, then normalise into a probability distribution.
        scaled = np.asarray(self.priorities, dtype=np.float64) ** self.alpha
        probs = scaled / scaled.sum()

        indices = np.random.choice(len(self.memory), batch_size, p=probs)
        samples = [self.memory[index] for index in indices]

        if return_indices:
            return samples, indices
        return samples

    def update_priorities(self, indices, priorities):
        """Overwrite the stored priority at each position in ``indices``.

        Args:
            indices: buffer positions, e.g. the ones returned by
                ``sample(..., return_indices=True)``.
            priorities: new priority for each position, in the same order.
        """
        for index, priority in zip(indices, priorities):
            # Stored as a plain float so later NumPy conversion in `sample` is uniform.
            self.priorities[index] = float(priority)

    def __len__(self):
        # Return the current size of the internal memory
        return len(self.memory)


class Trainer:
    """
    Trains a DQN agent on the module-level ``env``.

    The agent uses a policy network and a softly-updated target network (both
    ``DuelingDQN``), Double-DQN targets, a Huber loss and a
    ``PrioritizedReplayMemory``. Per-episode rewards and lengths are collected in
    ``episode_rewards`` / ``episode_lengths`` for plotting.

    Source for Base DQN Paszke, A. (2024) Reinforcement learning (DQN) tutorial, Reinforcement Learning (DQN) Tutorial - PyTorch Tutorials 2.3.0+cu121 documentation.
    Available at: https://pytorch.org/tutorials/intermediate/reinforcement_q_learning.html (Accessed: 04 May 2024).
    """
    def __init__(self):
        # Initializing training parameters
        self.batch_size = 128  # Number of samples for each training batch
        self.gamma = 0.99  # Discount factor for future rewards
        self.epsilon_start_decay = 0.9  # Starting value of epsilon for the epsilon-greedy action selection
        self.epsilon_end_decay = 0.05  # Final value of epsilon for the epsilon-greedy action selection
        self.epsilon_decay_rate = 1000  # Time constant (in exploration steps) of the exponential epsilon decay
        self.tau = 0.005  # Soft update parameter for target network
        self.lr = 0.001  # Learning rate for the optimizer
        self.num_episodes = 300  # Number of training episodes
        self.greedy_after_episode = 150  # From this episode index on, actions are purely greedy
        self.max_steps_per_episode = 1000  # Hard cap on the number of steps in one episode
        self.steps_done = 0  # Counter of epsilon-greedy decisions taken so far
        self.episode_durations = []  # List to store duration of each episode
        self.episode_rewards = []  # Cumulative reward of each finished episode
        self.episode_lengths = []  # Number of steps of each finished episode

    def setup_neural_networks(self):
        """Bind the environment and create identical policy and target networks."""
        self.env = env
        n_actions = self.env.action_space.n  # Number of possible actions
        # Read the state size from the declared observation space instead of from
        # len(env.reset()): reset() may return an (observation, info) pair, whose
        # length is 2, and an extra reset is not needed just to size the network.
        n_observations = self.env.observation_space.shape[0]

        self.policy_net = DuelingDQN(n_observations, n_actions).to(device)
        self.target_net = DuelingDQN(n_observations, n_actions).to(device)

        # Start both networks from exactly the same weights.
        self.target_net.load_state_dict(self.policy_net.state_dict())

        # The target network is only ever used for inference.
        self.target_net.eval()

    def setup_replay_memory(self):
        """Create the replay memory with a storage of 50,000 transitions."""
        self.memory = PrioritizedReplayMemory(50000)

    def setup_optimizer(self):
        """Create the AdamW optimizer for the policy network."""
        self.optimizer = optim.AdamW(self.policy_net.parameters(), lr = self.lr, amsgrad = True)

    def _greedy_action(self, state):
        """Return the highest-valued action per row of ``state`` as a (batch, 1) long tensor."""
        with torch.no_grad():
            q_values = self.policy_net(state)
            return q_values.argmax(dim=1, keepdim=True)

    def select_action(self, state, eps):
        """Choose an action for ``state``.

        Args:
            state: batched state tensor of shape (1, n_observations).
            eps: index of the current episode (not an epsilon value). From
                ``greedy_after_episode`` onwards the choice is always greedy.

        Returns:
            A (1, 1) long tensor holding the chosen action index.
        """
        # Late episodes: no more exploration, and the epsilon schedule is not advanced.
        if eps >= self.greedy_after_episode:
            return self._greedy_action(state)

        # Epsilon decays exponentially from the start value towards the end value.
        eps_threshold = self.epsilon_end_decay + (self.epsilon_start_decay - self.epsilon_end_decay) * math.exp(-1. * self.steps_done / self.epsilon_decay_rate)
        self.steps_done += 1

        if random.random() > eps_threshold:
            # Exploit: take the best known action.
            return self._greedy_action(state)
        # Explore: take a random action.
        return torch.tensor([[self.env.action_space.sample()]], device=device, dtype=torch.long)

    def huber_loss_optimize_model(self):
        """Run one optimisation step of the policy network on a sampled batch.

        Does nothing until the replay memory holds at least ``batch_size``
        transitions. Targets follow Double DQN: the policy network picks the next
        action and the target network evaluates it.

        NOTE(review): nothing in this class calls ``memory.update_priorities``, so
        stored priorities are never refreshed from the TD error.
        """
        if len(self.memory) < self.batch_size:
            return  # Exit if not enough samples

        transitions = self.memory.sample(self.batch_size)
        # Turn a list of Transitions into one Transition of tuples.
        batch = Transition(*zip(*transitions))

        # True where the transition did not end the episode (next_state is kept).
        non_final_mask = torch.tensor(
            [next_state is not None for next_state in batch.next_state],
            device=device,
            dtype=torch.bool,
        )
        non_final_next_states = [next_state for next_state in batch.next_state if next_state is not None]

        state_batch = torch.cat(batch.state)
        action_batch = torch.cat(batch.action)
        reward_batch = torch.cat(batch.reward)

        # Q(s, a) of the actions that were actually taken.
        state_action_values = self.policy_net(state_batch).gather(1, action_batch)

        # Terminal transitions keep a next-state value of zero.
        next_state_values = torch.zeros(self.batch_size, device=device)
        # Guard: torch.cat raises on an empty list, which happens when every sampled
        # transition is terminal.
        if non_final_next_states:
            next_states = torch.cat(non_final_next_states)
            with torch.no_grad():
                # Policy network selects the action, target network scores it.
                next_state_actions = self.policy_net(next_states).argmax(1, keepdim=True)
                # squeeze(1) drops only the action column, also for a single row.
                next_state_values[non_final_mask] = self.target_net(next_states).gather(1, next_state_actions).squeeze(1)

        # Bellman equation for the expected Q values
        expected_state_action_values = (next_state_values * self.gamma) + reward_batch

        # Huber loss
        loss = F.smooth_l1_loss(state_action_values, expected_state_action_values.unsqueeze(1))

        self.optimizer.zero_grad()
        loss.backward()
        # Clip gradient values so a single bad batch cannot destabilise learning.
        torch.nn.utils.clip_grad_value_(self.policy_net.parameters(), 100)
        self.optimizer.step()

    def update_target_network(self):
        """Soft-update the target network: target = tau * policy + (1 - tau) * target."""
        target_net_state_dict = self.target_net.state_dict()
        policy_net_state_dict = self.policy_net.state_dict()
        for key in policy_net_state_dict:
            # Blend weights from policy network into target network
            target_net_state_dict[key] = policy_net_state_dict[key] * self.tau + target_net_state_dict[key] * (1 - self.tau)
        self.target_net.load_state_dict(target_net_state_dict)

    def plot_durations(self, show_result=False):
        """Plot cumulative reward per episode, plus a 100-episode running mean once available.

        Args:
            show_result: True for the final plot (titled 'Result'); False for the
                live plot that is redrawn during training.
        """
        plt.figure(2)
        plt.clf()
        rewards_t = torch.tensor(self.episode_rewards, dtype=torch.float)
        if show_result:
            plt.title('Result')
        else:
            plt.title('Training the Agent')
        plt.xlabel('Episode')

        plt.ylabel('Cumulative Reward')
        plt.plot(rewards_t.numpy())
        if len(rewards_t) >= 100:
            # Mean over each window of 100 episodes, left-padded with zeros so the
            # curve lines up with the episode axis.
            means = rewards_t.unfold(0, 100, 1).mean(1).view(-1)
            means = torch.cat((torch.zeros(99), means))
            plt.plot(means.numpy())

        # Short pause so the figure gets redrawn
        plt.pause(0.001)
        if is_ipython:
            if not show_result:
                display.display(plt.gcf())
                display.clear_output(wait=True)
            else:
                display.display(plt.gcf())

    def additional_graphs(self):
        """Show cumulative reward and episode length per episode side by side."""
        plt.figure(figsize=(12, 5))

        plt.subplot(1, 2, 1)
        plt.plot(self.episode_rewards)
        plt.title('Cumulative Reward per Episode')
        plt.xlabel('Episode')
        plt.ylabel('Cumulative Reward')

        plt.subplot(1, 2, 2)
        plt.plot(self.episode_lengths)
        plt.title('Episode Length over Time')
        plt.xlabel('Episode')
        plt.ylabel('Length')

        plt.tight_layout()
        plt.show()

    def _reset_env(self):
        """Reset ``self.env`` and return only the observation."""
        result = self.env.reset()
        # Some gym versions return (observation, info) instead of the bare observation.
        if isinstance(result, tuple) and len(result) == 2 and isinstance(result[1], dict):
            return result[0]
        return result

    def _step_env(self, action):
        """Step ``self.env`` and return ``(observation, reward, terminated, truncated)``.

        Accepts both the 4-value step result ``(obs, reward, done, info)`` and the
        5-value one ``(obs, reward, terminated, truncated, info)``. With the
        4-value form ``done`` is reported as ``terminated``.
        """
        result = self.env.step(action)
        if len(result) == 5:
            observation, reward, terminated, truncated, _ = result
        else:
            observation, reward, terminated, _ = result
            truncated = False
        return observation, reward, bool(terminated), bool(truncated)

    def train(self):
        """
        Run the full training loop for ``num_episodes`` episodes, then plot the result.

        Mnih, V., Kavukcuoglu, K., Silver, D., Rusu, A.A., Veness, J., Bellemare, M.G., Graves, A., Riedmiller, M., Fidjeland, A.K., Ostrovski, G. and Petersen, S., 2015. Human-level control through deep reinforcement learning. nature, 518(7540), pp.529-533.

        Simonini, T., 2022. Deep Q-Learning With Space Invaders. Available at: https://huggingface.co/blog/deep-rl-dqn (Accessed 5 May 2024).
        """
        # Initialisation phase
        self.setup_replay_memory()
        self.setup_neural_networks()
        self.setup_optimizer()
        #self.env.start_video_recorder()

        # Loop over each episode
        for i_episode in range(self.num_episodes):
            # Below is there to ensure that the recordings for the video are there as intended
            # Selective rendering
            # NOTE: do not delete
            #if i_episode % 50 == 0:
                #self.env.start_video_recorder()
            #if i_episode % 25 == 0:
                #tmp_env = gym.make('LunarLander-v2', render_mode='human')
                #self.env = gym.wrappers.RecordVideo(env=tmp_env, video_folder="/Desktop/video", name_prefix="test-video", episode_trigger=lambda x: x % 25 == 0)
                #self.env = RecordVideo(self.env, video_folder="./videos", episode_trigger=i_episode, disable_logger=True)

            # else:
                #self.env = gym.make('LunarLander-v2', render_mode=None)

            # Initialise the episode
            cumulative_reward = 0
            episode_length = 0
            state = torch.tensor(self._reset_env(), dtype=torch.float32, device=device).unsqueeze(0)

            for t in range(self.max_steps_per_episode):
                # Select an action and perform it in the environment
                action = self.select_action(state, i_episode)
                observation, reward, terminated, truncated = self._step_env(action.item())

                cumulative_reward += reward
                episode_length = t + 1

                # float32 keeps the Bellman target in one dtype whatever scalar
                # type (int, float, NumPy scalar) the environment hands back.
                reward = torch.tensor([reward], dtype=torch.float32, device=device)
                if terminated:
                    next_state = None
                else:
                    next_state = torch.tensor(observation, dtype=torch.float32, device=device).unsqueeze(0)
                #env.render()

                # Store the transition in the replay memory
                self.memory.push(state, action, next_state, reward)

                # Update the target network
                self.update_target_network()

                # Optimize the policy with Huber loss
                self.huber_loss_optimize_model()

                # Move to the next state to continue the learning process
                state = next_state

                if terminated or truncated:
                    break

            # Record the episode whether the environment ended it or the step cap
            # was reached, so no episode is missing from the plots.
            self.episode_rewards.append(cumulative_reward)
            self.episode_lengths.append(episode_length)
            self.episode_durations.append(episode_length)
            self.plot_durations()

        print('Complete')
        self.plot_durations(show_result=True)  # Show final results as a plot
        plt.ioff()  # Turn off interactive plotting
        plt.show()

        #env.close_video_recorder()
        #env.close()


# Train the agent: build a Trainer with its default hyperparameters, run the full
# training loop (which also draws the live reward plot), then show the summary
# graphs of reward and episode length per episode.
trainer = Trainer()
trainer.train()
trainer.additional_graphs()

KeyboardInterrupt: 

<Figure size 640x480 with 0 Axes>

In [None]:
#References:

'''
Source for Base DQN
Paszke, A. (2024) Reinforcement learning (DQN) tutorial, Reinforcement Learning (DQN) Tutorial - PyTorch Tutorials 2.3.0+cu121 documentation. Available at: https://pytorch.org/tutorials/intermediate/reinforcement_q_learning.html (Accessed: 04 May 2024).
'''
'''
Source for Dueling DQN
Curt-Park (2024) Dueling Network, Jupyter Notebook Viewer. Available at: https://nbviewer.org/github/Curt-Park/rainbow-is-all-you-need/blob/master/04.dueling.ipynb (Accessed: 04 May 2024).
'''
'''
Source for Prioritized Experience Replay
Crabé, G. (2020) How to implement prioritized experience replay for a deep Q-Network, Medium. Available at: https://towardsdatascience.com/how-to-implement-prioritized-experience-replay-for-a-deep-q-network-a710beecd77b (Accessed: 04 May 2024).
'''
'''
Source for Double DQN
Curt-Park (2024) Double DQN, GitHub. Available at: https://github.com/Curt-Park/rainbow-is-all-you-need/blob/master/02.double_q.ipynb (Accessed: 04 May 2024).
'''
'''
Mnih, V., Kavukcuoglu, K., Silver, D., Rusu, A.A., Veness, J., Bellemare, M.G., Graves, A., Riedmiller, M., Fidjeland, A.K., Ostrovski, G. and Petersen, S., 2015. Human-level control through deep reinforcement learning. nature, 518(7540), pp.529-533.
'''
"""
Simonini, T., 2022. Deep Q-Learning With Space Invaders. Available at: https://huggingface.co/blog/deep-rl-dqn (Accessed 5 May 2024).
"""

SyntaxError: invalid syntax (1836190973.py, line 1)