# Deep Q-Learning for Lunar Landing

## Part 0 - Installing the required packages and importing the libraries

### Installing Gymnasium

In [20]:
!pip install gymnasium # install the core Gymnasium library
!pip install "gymnasium[atari, accept-rom-license]" # install Atari environment support and accept the ROM license it requires
!apt-get install -y swig # install swig, a dependency of the Box2D environments, with the apt-get package manager. -y automatically answers yes to any prompts during installation. apt-get handles installing, removing and managing software packages on Linux distributions such as Ubuntu. swig is installed this way rather than with pip because it is not a Python package: it is a tool that connects C/C++ programs with high-level languages such as Python
!pip install "gymnasium[box2d]" # install support for Box2D environments, which are 2D physics-based simulations such as LunarLander

Reading package lists... Done
Building dependency tree... Done
Reading state information... Done
swig is already the newest version (4.0.2-1ubuntu1).
0 upgraded, 0 newly installed, 0 to remove and 49 not upgraded.


### Importing the libraries

In [21]:
import os # operating system utilities
import random # random number generation
import numpy as np # arrays and numerical operations
import torch # we will build and train the AI with PyTorch
import torch.nn as nn # neural network module of the torch library
import torch.optim as optim # optimization algorithms that adjust a model's parameters to minimize the loss function during training, such as optim.SGD (stochastic gradient descent), optim.Adam and optim.RMSprop
import torch.nn.functional as F # functional submodule: functions for commonly used neural network operations, such as activation functions and loss functions

# In PyTorch, the tensor is the central data structure: a multi-dimensional array, very similar to a NumPy array but with additional capabilities such as GPU support and automatic differentiation
# A scalar is a 0-dimensional tensor (a single number).
# A vector is a 1-dimensional tensor (a list of numbers).
# A matrix is a 2-dimensional tensor (a grid of numbers).
# Higher-dimensional tensors (3D, 4D, etc.) are used for more complex data.

# import torch
# Creating a 1D tensor (vector)
# vec = torch.tensor([1, 2, 3])
# Creating a 2D tensor (matrix)
# mat = torch.tensor([[1, 2], [3, 4]])
# Creating a 3D tensor
# tensor_3d = torch.tensor([[[1, 2], [3, 4]], [[5, 6], [7, 8]]])

import torch.autograd as autograd # the autograd module provides automatic differentiation for tensor operations. It computes the gradients needed to train neural networks with backpropagation.
from torch.autograd import Variable # Variable used to wrap tensors so that operations on them were recorded for the backward pass. It has been deprecated since PyTorch 0.4, where tensors took over this role, and it is not used below.
from collections import deque, namedtuple # from Python's built-in collections module. deque: a double-ended queue that supports adding and removing elements at both ends. namedtuple: a factory function for creating tuple subclasses with named fields.
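
As a small illustration of the two `collections` helpers imported above, the sketch below builds a bounded `deque` and a `namedtuple`. The `Experience` type and its field names are hypothetical and are not used elsewhere in this notebook, which stores experiences as plain tuples.

```python
from collections import deque, namedtuple

# Hypothetical record type, for illustration only.
Experience = namedtuple("Experience", ["state", "action", "reward", "next_state", "done"])

# A deque created with maxlen silently drops its oldest item once it is full.
recent = deque(maxlen=3)
for i in range(5):
    recent.append(i)

e = Experience(state=[0.0], action=2, reward=-1.5, next_state=[0.1], done=False)
print(list(recent))        # the three most recent items: [2, 3, 4]
print(e.action, e.reward)  # fields are accessed by name
```

The same `maxlen` behaviour is what keeps only the last 100 episode scores during training.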

## Part 1 - Building the AI

### Creating the architecture of the Neural Network

In [22]:
class Network(nn.Module): # nn.Module is the base class for all neural networks in PyTorch

  # state_size: number of inputs to the network. action_size: number of possible actions the agent can take. seed: a random seed for reproducibility.
  def __init__(self, state_size, action_size, seed = 42):
    super(Network, self).__init__()
    self.seed = torch.manual_seed(seed) # seed PyTorch's random number generator
    self.fc1 = nn.Linear(state_size, 64) # first fully connected layer. nn.Linear applies a linear transformation to its input; its two arguments are the number of inputs and outputs of the layer. 64 was chosen experimentally
    self.fc2 = nn.Linear(64, 64) # second fully connected layer
    self.fc3 = nn.Linear(64, action_size) # output layer: one Q-value per action

  def forward(self, state):
    x = self.fc1(state)
    x = F.relu(x)
    x = self.fc2(x)
    x = F.relu(x)
    return self.fc3(x)
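
To check the shapes involved, the following sketch rebuilds the same 8 → 64 → 64 → 4 stack with `nn.Sequential` and passes a dummy batch through it. The `nn.Sequential` model is only a stand-in for the `Network` class, used so that the snippet runs on its own; the sizes 8 and 4 are the LunarLander values that the environment cell prints.

```python
import torch
import torch.nn as nn

torch.manual_seed(42)
state_size, action_size = 8, 4  # LunarLander observation size and action count

# Stand-in for Network: two hidden layers of 64 units with ReLU activations.
model = nn.Sequential(
    nn.Linear(state_size, 64), nn.ReLU(),
    nn.Linear(64, 64), nn.ReLU(),
    nn.Linear(64, action_size),
)

batch = torch.zeros(5, state_size)  # 5 dummy states
q_values = model(batch)             # one row of Q-values per state
print(q_values.shape)
```

Each input state yields one Q-value per action, so a batch of 5 states produces a 5 × 4 output.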

## Part 2 - Training the AI

### Setting up the environment

In [23]:
import gymnasium as gym
env = gym.make("LunarLander-v3")

state_shape = env.observation_space.shape # vector
state_size = env.observation_space.shape[0]
action_size = env.action_space.n

In [24]:
print(state_shape)
print(state_size)
print(action_size)

(8,)
8
4


### Initializing the hyperparameters

In [25]:
# all values were chosen after many experiments.
learning_rate = 5e-4 # 5 * 10**-4 = 0.0005. Controls how much the model's weights are adjusted at each training step. A smaller learning rate means smaller updates, giving more stable but slower learning
minibatch_size = 100 # number of experiences used in one training step to update the model parameters. Mini-batch gradient descent uses a small batch of data points for each update instead of the entire dataset or a single data point
discount_factor = 0.99 # also called gamma. Determines the importance of future rewards in the agent's decisions. A value close to 1 makes the agent weigh future rewards heavily ("far-sighted"); a value close to 0 makes it focus on immediate rewards ("short-sighted").
replay_buffer_size = int(1e5) # capacity of the agent's memory, which stores past experiences: state, action, reward, next state and whether the episode was done.
interpolation_perameter = 1e-3 # tau: the fraction by which the target network's weights move toward the local network's weights at each soft update
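
The interpolation parameter is the least self-explanatory of these values. The agent's soft update blends each target-network weight as `tau * local + (1 - tau) * target`. The sketch below applies that rule to plain Python floats rather than network parameters; the weights are made up, and this standalone `soft_update` function is only an illustration of the arithmetic.

```python
def soft_update(local_weights, target_weights, tau):
    """Blend each target weight toward the matching local weight by a factor tau."""
    return [tau * l + (1.0 - tau) * t for l, t in zip(local_weights, target_weights)]

local = [1.0, -2.0]   # made-up local-network weights, held fixed here
target = [0.0, 0.0]   # made-up target-network weights
tau = 1e-3

for _ in range(1000):
    target = soft_update(local, target, tau)

# After n updates the target has covered 1 - (1 - tau)**n of the gap,
# which is roughly 63% for n = 1000 and tau = 1e-3.
print(target)
```

With such a small tau the target network lags well behind the local network, which is what keeps the training targets stable.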

### Implementing Experience Replay

In [26]:
class ReplayMemory(object):

  def __init__(self, capacity):
    self.device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu") # use GPU if available
    self.capacity = capacity
    self.memory = [] # list to store experiences

  # Add a new experience to the memory.
  def push(self, event): # event: An experience tuple (state, action, reward, next_state, done).
    self.memory.append(event)
    if len(self.memory) > self.capacity:
      del self.memory[0]

  # Randomly sample a batch of experiences.
  def sample(self, batch_size): # batch_size is the number of experiences to sample.
    experiences = random.sample(self.memory, k = batch_size) # the sampled batch. random.sample takes the population to sample from and the number of items to draw
    # Extract each field and convert it into a tensor.
    # np.vstack stacks the values vertically, one row per experience.
    states = torch.from_numpy(np.vstack([e[0] for e in experiences if e is not None])).float().to(self.device)
    actions = torch.from_numpy(np.vstack([e[1] for e in experiences if e is not None])).long().to(self.device)
    rewards = torch.from_numpy(np.vstack([e[2] for e in experiences if e is not None])).float().to(self.device)
    next_states = torch.from_numpy(np.vstack([e[3] for e in experiences if e is not None])).float().to(self.device)
    dones = torch.from_numpy(np.vstack([e[4] for e in experiences if e is not None]).astype(np.uint8)).float().to(self.device) # astype(np.uint8) converts the booleans to 0/1 before the cast to float.
    return states, next_states, actions, rewards, dones
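
The batching step in `sample` can be seen in isolation with NumPy alone. The sketch below uses made-up experiences with 2-element states (LunarLander states have 8 elements) and skips the conversion to tensors.

```python
import random
import numpy as np

random.seed(0)
# Made-up experiences in the same order as the notebook: (state, action, reward, next_state, done)
memory = [(np.array([i, i + 0.5]), i % 4, float(-i), np.array([i + 1, i + 1.5]), i == 9)
          for i in range(10)]

batch = random.sample(memory, k=3)
states = np.vstack([e[0] for e in batch])                  # shape (3, 2): one row per experience
actions = np.vstack([e[1] for e in batch])                 # shape (3, 1): scalars become rows
dones = np.vstack([e[4] for e in batch]).astype(np.uint8)  # True/False become 1/0

print(states.shape, actions.shape, dones.shape)
```

Keeping every field as a column with one row per experience is what lets the later `gather` call and the target computation line up row by row.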

### Implementing the DQN class

In [40]:
class Agent():

  def __init__(self, state_size, action_size):
    self.device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    self.state_size = state_size
    self.action_size = action_size
    # predicts Q-values for the agent's actions in the current state.
    self.local_Qnetwork = Network(state_size, action_size).to(self.device)
    # provides stable Q-value targets for training. soft-updated toward the local Q-network after every learning step.
    self.target_Qnetwork = Network(state_size, action_size).to(self.device)
    self.optimizer = optim.Adam(self.local_Qnetwork.parameters(), lr = learning_rate)
    self.memory = ReplayMemory(replay_buffer_size)
    self.t_step = 0 # tracks the number of steps taken by the agent. used to control when the agent will learn from experiences and update the Q-network parameters.

  # Store an experience and learn from a sampled minibatch every 4 steps.
  def step(self, state, action, reward, next_state, done):
    self.memory.push((state, action, reward, next_state, done))
    self.t_step = (self.t_step + 1) % 4 # the counter wraps to 0 every 4 steps
    if self.t_step == 0:
      if len(self.memory.memory) > minibatch_size: # learn only once the memory holds more than one minibatch
        experiences = self.memory.sample(minibatch_size)
        self.learn(experiences, discount_factor)

  # Select an action for a given state.
  def act(self, state, epsilon = 0.): # epsilon controls the exploration vs. exploitation trade-off. With epsilon = 0.1, the agent selects the action with the highest Q-value about 9 times out of 10 and a random action about 1 time out of 10.
    state = torch.from_numpy(state).float().unsqueeze(0).to(self.device) # unsqueeze(0) adds a new dimension at position 0: the batch dimension, because the network expects a batch of states
    self.local_Qnetwork.eval() # puts the local Q-network in evaluation mode
    with torch.no_grad(): # temporarily disables gradient calculation. Since this is a forward pass to select an action, there is no need to compute gradients, which saves memory and computation.
      action_values = self.local_Qnetwork(state) # passes the state through the local Q-network to get the action values (predicted Q-values)
    self.local_Qnetwork.train() # puts the network back in training mode after the forward pass
    if random.random() > epsilon: # random.random() returns a random float in [0, 1)
      return np.argmax(action_values.cpu().data.numpy()) # exploit: select the action with the highest Q-value. np.argmax returns the index of the largest value
    else:
      return random.choice(np.arange(self.action_size)) # explore: select one of the possible actions at random

  # updates the local Q-network based on a batch of experiences from the replay memory.
  def learn(self, experiences, discount_factor):
    states, next_states, actions, rewards, dones = experiences # Unpacks the batch of experiences into separate variables.
    # Maximum predicted Q-value of each next state, taken from the target Q-network.
    # detach() removes the result from the computation graph so that no gradients flow back into the target network.
    # max(1)[0] takes the maximum over the actions (dimension 1) for each state, i.e. the value of the best action.
    # unsqueeze(1) adds a column dimension so that the shape matches rewards and dones.
    next_q_targets = self.target_Qnetwork(next_states).detach().max(1)[0].unsqueeze(1)
    # Target Q-values from the Bellman equation: the immediate reward plus the discounted value of the best next action.
    # (1 - dones) zeroes the future term for terminal states, whose target is only the immediate reward.
    q_targets = rewards + (discount_factor * next_q_targets * (1 - dones))
    # Q-values that the local Q-network predicts for the actions actually taken.
    # gather(1, actions) selects, for each row, the Q-value at the index given by actions.
    q_expected = self.local_Qnetwork(states).gather(1, actions)
    loss = F.mse_loss(q_expected, q_targets)
    self.optimizer.zero_grad() # Clears the gradients of the local Q-network to prevent accumulation from previous backpropagation steps.
    loss.backward()
    self.optimizer.step()
    self.soft_update(self.local_Qnetwork, self.target_Qnetwork, interpolation_perameter) # Soft updates the target Q-network’s parameters towards the local Q-network’s parameters using the interpolation parameter.

  # Gradually moves the target network's parameters toward those of the local network. Because the target parameters change slowly, the Q-value targets stay stable, avoiding the instability that arises when they change too abruptly.
  def soft_update(self, local_model, target_model, interpolation_perameter):
    # loop over the paired parameters of the target and local Q-networks
    for target_param, local_param in zip(target_model.parameters(), local_model.parameters()):
      target_param.data.copy_(interpolation_perameter * local_param.data + (1.0 - interpolation_perameter) * target_param.data)
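
The tensor operations in `learn` are easier to follow on a tiny batch. The sketch below repeats the target and `gather` steps on hand-written tensors instead of network outputs; all Q-values, rewards and actions are made up for illustration.

```python
import torch

gamma = 0.99
# Q-values a target network might output for 2 next states and 3 actions (made up).
next_q = torch.tensor([[1.0, 3.0, 2.0],
                       [0.5, 0.2, 0.1]])
rewards = torch.tensor([[1.0], [-100.0]])
dones = torch.tensor([[0.0], [1.0]])  # the second transition ended its episode

next_q_max = next_q.max(1)[0].unsqueeze(1)  # best next-state value per row, shape (2, 1)
q_targets = rewards + gamma * next_q_max * (1 - dones)

# Q-values a local network might output for the current states (made up), and the actions taken.
current_q = torch.tensor([[0.0, 2.5, 1.0],
                          [4.0, 5.0, 6.0]])
actions = torch.tensor([[1], [2]])
q_expected = current_q.gather(1, actions)  # picks 2.5 from row 0 and 6.0 from row 1

print(q_targets.squeeze(1).tolist())
print(q_expected.squeeze(1).tolist())
```

The first target is 1 + 0.99 × 3 = 3.97 (up to float rounding), while the second is just the reward of -100 because the episode ended there.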

### Initializing the DQN agent

In [41]:
agent = Agent(state_size, action_size)

### Training the DQN agent

In [45]:
number_episodes = 2000 # maximum number of episodes over which the agent is trained. Each episode is one full run of the environment, from the initial state until the agent reaches a terminal state (such as landing or crashing in Lunar Lander) or the timestep limit
timesteps = 1000 # maximum number of timesteps (or actions) the agent can take in a single episode. If the agent doesn't reach a terminal state within these timesteps, the episode will end. This prevents episodes from running indefinitely.
epsilon_initial = 1.0 #  Epsilon determines the probability of taking a random action (exploration) versus taking the action that maximizes the predicted Q-value (exploitation). Starting with epsilon at 1.0 means the agent will explore completely randomly at the beginning.
epsilon_final = 0.01 # The minimum value to which epsilon can decay.
epsilon_decay = 0.995 # After each episode, epsilon is multiplied by this factor to slowly decrease the exploration rate, moving the agent from exploration to exploitation over time
epsilon = epsilon_initial #  Initializes the current epsilon value to the starting value
score_100_episodes = deque(maxlen = 100) #  A deque (double-ended queue) with a maximum length of 100, used to store the scores from the last 100 episodes. helping to track the agent's performance over time

for episode in range(1, number_episodes + 1):
  # Each episode starts by resetting the environment to its initial state.
  state, _ = env.reset() # returns the initial state. The underscore _ discards the accompanying info dictionary
  score = 0 # cumulative reward for this episode
  for t in range(timesteps): # inner loop iterates up to a maximum of timesteps per episode
    action = agent.act(state, epsilon) #  agent selects an action
    next_state, reward, done, _, _ = env.step(action) # Applies the chosen action to the environment. next_state: the state of the environment after the action. reward: the reward received for the action. done: the terminated flag, True when the episode has reached a terminal state (for example by landing or crashing). The underscores discard the truncated flag and the info dictionary; the time limit is enforced by the timesteps loop instead.
    agent.step(state, action, reward, next_state, done) # The agent records the experience (state, action, reward, next state, done) in its replay memory
    state = next_state # The state is updated
    score += reward # The cumulative reward (or score) for the current episode is updated
    if done: # If done is True, the episode is terminated early, and the loop breaks out of the timestep loop.
      break
  score_100_episodes.append(score) # the total score for that episode is added to the deque
  epsilon = max(epsilon_final, epsilon_decay * epsilon) # updates the epsilon
  # Print the running result on a single line that is overwritten after every episode.
  print('\rEpisode {}\tAverage Score: {:.2f}'.format(episode, np.mean(score_100_episodes)), end = "") # prints the current episode number and the average score over the last 100 episodes. \r is a carriage return: it moves the cursor back to the beginning of the line so that the next print overwrites this one, giving a real-time view of training performance.
  if episode % 100 == 0: # Every 100 episodes, prints the episode number and the average score on a new line
    print('\rEpisode {}\tAverage Score: {:.2f}'.format(episode, np.mean(score_100_episodes)))
  if np.mean(score_100_episodes) >= 200.0: # the environment counts as solved when the average score over the last 100 episodes reaches 200.0
    print('\nEnvironment solved in {:d} episodes!\tAverage Score: {:.2f}'.format(episode - 100, np.mean(score_100_episodes))) # reports episode - 100, the number of episodes completed before the 100-episode window that reached the threshold, along with the average score.
    # Save the model parameters. torch.save takes the object to save (here the state dict of local_Qnetwork) and the file to save it to.
    torch.save(agent.local_Qnetwork.state_dict(), 'checkpoint.pth')
    break # stop training once the environment is solved


Episode 100	Average Score: -140.04
Episode 200	Average Score: -106.54
Episode 300	Average Score: -11.11
Episode 400	Average Score: 108.86
Episode 500	Average Score: 129.54
Episode 600	Average Score: 191.80
Episode 648	Average Score: 200.95
Environment solved in 548 episodes!	Average Score: 200.95
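
The exploration schedule behind this run can be worked out from the three epsilon settings alone. The sketch below recomputes it in closed form; `epsilon_at` is a hypothetical helper written for this illustration and is not part of the training loop.

```python
import math

epsilon_initial, epsilon_final, epsilon_decay = 1.0, 0.01, 0.995

def epsilon_at(episode):
    """Epsilon used during the given 1-based episode: multiplicative decay with a floor."""
    return max(epsilon_final, epsilon_initial * epsilon_decay ** (episode - 1))

# Number of decay steps needed before epsilon hits its floor.
steps_to_floor = math.ceil(math.log(epsilon_final / epsilon_initial) / math.log(epsilon_decay))

print(steps_to_floor)
print(round(epsilon_at(648), 4))
```

Under these settings epsilon needs roughly 900 episodes to reach its floor, so the run above stopped while the agent was still taking a random action a few percent of the time.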


## Part 3 - Visualizing the results

In [49]:
import glob # Finds all pathnames matching a specified pattern.
import io # Handles input/output operations.
import base64 # Encodes and decodes data using Base64.
import imageio #  Reads and writes image data, including creating videos.
from IPython.display import HTML, display #  Provides functions to display content like HTML or images in Jupyter notebooks.

# This function runs the agent in the specified environment (env_name), collects frames, and saves them as a video.
def show_video_of_model(agent, env_name):
    env = gym.make(env_name, render_mode='rgb_array') # Creates a gym environment with rendering mode set to output RGB frames.
    state, _ = env.reset() # Resets the environment to its initial state.
    done = False
    frames = []
    while not done:
        frame = env.render()
        frames.append(frame) # Stores each rendered frame for the video.
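        action = agent.act(state) # The agent selects the greedy action for the current state (epsilon defaults to 0, so there is no random exploration).
        state, reward, terminated, truncated, _ = env.step(action.item()) # Applies the action in the environment and returns the new state, the reward and the two end-of-episode flags.
        done = terminated or truncated # Stop when the episode ends or the environment's time limit is reached.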
    env.close()
    imageio.mimsave('video.mp4', frames, fps=30) #  Saves the collected frames as a video (video.mp4) with 30 frames per second.

show_video_of_model(agent, 'LunarLander-v3')

# This function searches for the video file, encodes it in Base64, and displays it in a Jupyter notebook using HTML.
def show_video():
    mp4list = glob.glob('*.mp4') # Finds all MP4 files in the current directory
    if len(mp4list) > 0:
        mp4 = mp4list[0]
        video = io.open(mp4, 'r+b').read() # Reads the video file in binary mode.
        encoded = base64.b64encode(video) # Encodes the video content in Base64 to embed it in an HTML tag.
        # Displays the HTML video element with controls in the Jupyter notebook.
        display(HTML(data='''<video alt="test" autoplay
                loop controls style="height: 400px;">
                <source src="data:video/mp4;base64,{0}" type="video/mp4" />
             </video>'''.format(encoded.decode('ascii'))))
    else:
        print("Could not find video")

show_video() # Displays the saved video in the notebook.
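
The embedding step in `show_video` relies on a Base64 data URI: the raw bytes of the file are encoded as ASCII text and placed directly in the `src` attribute. The sketch below shows that step on a few placeholder bytes instead of a real video; `to_data_uri` is a hypothetical helper, not part of the notebook.

```python
import base64

def to_data_uri(raw_bytes, mime="video/mp4"):
    """Encode raw bytes as a data URI that an HTML src attribute can hold."""
    encoded = base64.b64encode(raw_bytes).decode("ascii")
    return "data:{};base64,{}".format(mime, encoded)

uri = to_data_uri(b"abc")  # placeholder bytes standing in for the video file
print(uri)
```

Because the whole file is inlined into the page, this approach suits short clips such as a single landing; Base64 output is about a third larger than the original bytes.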

