<a href="https://colab.research.google.com/github/mohit3agarwal/lunar-landing-simulation/blob/main/Lunar_Landing_using_Deep_Q_Learning.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Deep Q-Learning for Lunar Landing

## Installing the required packages and importing the libraries

### Installing Gymnasium

This notebook uses the Lunar Lander environment from Gymnasium (a simulation environment, not a dataset).

Link: https://gymnasium.farama.org/environments/box2d/lunar_lander/

In [None]:
!pip install gymnasium
!pip install "gymnasium[atari, accept-rom-license]"
!apt-get install -y swig
!pip install gymnasium[box2d]

Reading package lists... Done
Building dependency tree... Done
Reading state information... Done
swig is already the newest version (4.0.2-1ubuntu1).
0 upgraded, 0 newly installed, 0 to remove and 45 not upgraded.


### Importing the libraries and dependencies

In [None]:
import os
import random
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import torch.autograd as autograd
from torch.autograd import Variable
from collections import deque, namedtuple

## Part 1 - Building the AI

### Creating the architecture of the Neural Network

In [None]:
class Network(nn.Module):
    """Fully connected Q-network: state vector in, one output value per action out.

    Layout is num_state -> 64 -> 64 -> num_action with a ReLU after each of the
    two hidden layers and no activation on the output layer.
    """

    def __init__(self, num_state, num_action, seed=42):
        """Create the three linear layers.

        Args:
            num_state: number of input features (size of one state vector).
            num_action: number of output units.
            seed: value handed to torch.manual_seed before the layers are built.
        """
        super().__init__()
        # Seeding happens before the layers are created so their initial weights are repeatable
        self.seed = torch.manual_seed(seed)
        hidden_units = 64
        self.fc1 = nn.Linear(num_state, hidden_units)      # input  -> hidden 1
        self.fc2 = nn.Linear(hidden_units, hidden_units)   # hidden 1 -> hidden 2
        self.fc3 = nn.Linear(hidden_units, num_action)     # hidden 2 -> output

    def forward(self, state):
        """Run a forward pass and return the raw output-layer values."""
        hidden = F.relu(self.fc1(state))
        hidden = F.relu(self.fc2(hidden))
        return self.fc3(hidden)

## Part 2 - Training the AI

### Setting up the environment

In [None]:
import gymnasium as gym

# Build the Lunar Lander environment and read the problem dimensions from its spaces
env = gym.make('LunarLander-v2')
state_shape = env.observation_space.shape   # shape tuple of a single observation
num_state = state_shape[0]                  # first entry of that shape
num_actions = env.action_space.n            # size of the discrete action space
print(
    f'State shape: {state_shape}',
    f'State size: {num_state}',
    f'Number of actions: {num_actions}',
    sep='\n',
)

State shape: (8,)
State size: 8
Number of actions: 4


### Initializing the hyperparameters

In [None]:
# Hyperparameters (module-level; the agent code in this notebook reads them by name)
learning_rate = 0.0005         # step size given to the Adam optimizer
minibatch_size = 100           # number of experiences per learning step
gamma = 0.99                   # discount factor applied to future Q-values
replay_buffer_size = 100_000   # maximum number of experiences kept for replay
tau = 0.001                    # interpolation weight for the soft target-network update

### Implementing Experience Replay

In [None]:
# creating a class to set up experience replay
class ReplayMemory(object):
  """Bounded store of experience tuples with uniform random sampling.

  Every stored event is indexed positionally as
  (state, action, reward, next_state, done).
  """

  def __init__(self, capacity):
    """Start with an empty buffer that holds at most `capacity` events."""
    self.memory = []
    self.capacity = capacity
    # Sampled tensors are moved to the first CUDA device when available, else the CPU
    self.device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

  def push(self, event):
    """Append `event`; if that overflows the capacity, discard the oldest entry."""
    self.memory.append(event)
    if len(self.memory) > self.capacity:
      self.memory.pop(0)

  def sample(self, batch_size):
    """Pick `batch_size` stored events at random and stack them field by field.

    Returns:
      A tuple (states, next_states, actions, rewards, dones) of tensors on
      self.device. Note that next_states is the SECOND element. actions is
      a long tensor; the others are float tensors, with dones converted via
      uint8 first.
    """
    drawn = [event for event in random.sample(self.memory, k=batch_size) if event is not None]

    def column(position):
      # Stack one field of every drawn event into a 2-D array, one row per event
      return np.vstack([event[position] for event in drawn])

    states = torch.from_numpy(column(0)).float().to(self.device)
    actions = torch.from_numpy(column(1)).long().to(self.device)
    rewards = torch.from_numpy(column(2)).float().to(self.device)
    next_states = torch.from_numpy(column(3)).float().to(self.device)
    done_flags = column(4).astype(np.uint8)
    dones = torch.from_numpy(done_flags).float().to(self.device)
    return states, next_states, actions, rewards, dones

### Implementing the DQN class

In [None]:
# create a class to represent the agent/AI
class Agent():
  """Deep Q-Network agent with a trained local network and a slowly tracking target network.

  The class reads these module-level hyperparameters at call time:
  learning_rate, replay_buffer_size, minibatch_size, gamma and tau.
  """

  def __init__(self, num_state, num_action):
    """Build both Q-networks, the optimizer and the replay memory.

    Args:
      num_state: size of one state vector (network input width).
      num_action: number of discrete actions (network output width).
    """
    self.device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    self.num_state = num_state
    self.num_action = num_action

    # Local network is optimized directly; target network is only moved by soft_update
    self.local_qnetwork = Network(num_state, num_action).to(self.device)
    self.target_qnetwork = Network(num_state, num_action).to(self.device)

    self.optimizer = optim.Adam(self.local_qnetwork.parameters(), lr=learning_rate)
    self.memory = ReplayMemory(replay_buffer_size)

    # Counts environment steps modulo 4 to decide when to learn
    self.t_step = 0

  def step(self, state, action, reward, next_state, done):
    """Store one transition and, on every 4th call, learn from a random minibatch.

    Learning is skipped until the memory holds more than `minibatch_size` events.
    """
    self.memory.push((state, action, reward, next_state, done))
    self.t_step = (self.t_step + 1) % 4

    if self.t_step == 0 and len(self.memory.memory) > minibatch_size:
      # Sample exactly the configured batch size (previously a hard-coded 100)
      experiences = self.memory.sample(minibatch_size)
      self.learn(experiences, gamma)

  def act(self, state, epsilon = 0.):
    """Choose an action epsilon-greedily.

    Args:
      state: numpy array holding one state.
      epsilon: probability-like threshold; a uniform draw at or below it
        selects a random action instead of the greedy one.

    Returns:
      The chosen action index as a numpy integer.
    """
    # Exploratory branch: no network evaluation is needed for a random action
    if random.random() <= epsilon:
      return random.choice(np.arange(self.num_action))

    # Add a leading batch dimension and move the state to the network's device
    state = torch.from_numpy(state).float().unsqueeze(0).to(self.device)

    self.local_qnetwork.eval()
    with torch.no_grad():
      action_values = self.local_qnetwork(state)
    self.local_qnetwork.train()

    return np.argmax(action_values.cpu().numpy())

  def learn(self, experiences, gamma):
    """Run one gradient step on the local network, then soft-update the target network.

    Args:
      experiences: tuple ordered (states, next_states, actions, rewards, dones),
        the order returned by the replay memory's sample method.
      gamma: discount factor applied to the next-state value.
    """
    states, next_states, actions, rewards, dones = experiences

    # Target side carries no gradient: max over actions of the target network's next-state values
    with torch.no_grad():
      next_q_targets = self.target_qnetwork(next_states).max(1)[0].unsqueeze(1)
    # (1 - dones) zeroes the bootstrapped term for transitions flagged as done
    q_targets = rewards + (gamma * next_q_targets * (1 - dones))

    # Local network's value for the action that was actually taken in each transition
    q_expected = self.local_qnetwork(states).gather(1, actions)

    loss = F.mse_loss(q_expected, q_targets)
    self.optimizer.zero_grad()
    loss.backward()
    self.optimizer.step()

    self.soft_update(self.local_qnetwork, self.target_qnetwork, tau)

  def soft_update(self, local_model, target_model, tau):
    """Blend parameters in place: target <- tau * local + (1 - tau) * target."""
    with torch.no_grad():
      for target_param, local_param in zip(target_model.parameters(), local_model.parameters()):
        target_param.copy_(tau * local_param + (1.0 - tau) * target_param)


### Initializing the DQN agent

In [None]:
# Build the agent from the state size and action count; note the action-count variable is `num_actions` (plural)
agent = Agent(num_state, num_actions)

### Training the DQN agent

In [None]:
# initialise the training parameters
number_episodes = 2000          # last episode index; the loop below runs indices 0..number_episodes inclusive
max_t = 1000                    # maximum number of time steps per episode
epsilon_start = 1.0             # initial value of epsilon
epsilon_final = 0.01            # floor that epsilon never drops below
epsilon_decay = 0.995           # multiplicative decay applied after every episode
epsilon = epsilon_start         # current exploration rate
scores = deque(maxlen = 100)    # rolling window holding the scores of the last 100 episodes

# training loop: one iteration per episode
for episode in range(number_episodes + 1):
  state, _ = env.reset()   # start a fresh episode
  score = 0                # cumulative reward of this episode
  for t in range(max_t):
    action = agent.act(state, epsilon)
    # Gymnasium's step returns (observation, reward, terminated, truncated, info)
    next_state, reward, terminated, truncated, _ = env.step(action)
    # Only `terminated` is stored as the done flag, so a time-limit cut-off still bootstraps from next_state
    agent.step(state, action, reward, next_state, terminated)
    state = next_state
    score += reward
    # End the episode on either signal; stepping past a truncation is not valid
    if terminated or truncated:
      break

  scores.append(score)
  epsilon = max(epsilon_final, epsilon_decay * epsilon)

  # Mean over the rolling window, computed once and reused for every message below
  average_score = np.mean(scores)

  # '\r' with end="" rewrites the same output line; every 100th episode is kept as its own line
  print('\rEpisode {}\tAverage Score: {:.2f}'.format(episode, average_score), end="")
  if episode % 100 == 0:
    print('\rEpisode {}\tAverage Score: {:.2f}'.format(episode, average_score))
  if average_score >= 200.0:
    print('\nEnvironment solved in {:d} episodes!\tAverage Score: {:.2f}'.format(episode-100, average_score))
    torch.save(agent.local_qnetwork.state_dict(), 'checkpoint.pth')    # save the trained weights
    break

Episode 0	Average Score: -223.83
Episode 100	Average Score: -169.46
Episode 200	Average Score: -114.42
Episode 300	Average Score: -90.90
Episode 400	Average Score: -45.89
Episode 500	Average Score: 25.04
Episode 600	Average Score: 95.11
Episode 700	Average Score: 162.02
Episode 800	Average Score: 186.33
Episode 812	Average Score: 200.38
Environment solved in 712 episodes!	Average Score: 200.38


## Part 3 - Visualizing the results

In [None]:
import glob
import io
import base64
import imageio
from IPython.display import HTML, display
from gym.wrappers.monitoring.video_recorder import VideoRecorder

def show_video_of_model(agent, env_name):
    """Run one greedy episode of `agent` in `env_name` and save it as 'video.mp4'.

    Args:
        agent: object whose act(state) returns an action exposing .item().
        env_name: environment id passed to gym.make.

    The rollout stops when the environment reports either termination or
    truncation, and the environment is closed even if the rollout raises.
    """
    env = gym.make(env_name, render_mode='rgb_array')
    frames = []
    try:
        state, _ = env.reset()
        done = False
        while not done:
            # Capture the frame for the current state before acting on it
            frames.append(env.render())
            action = agent.act(state)
            state, reward, terminated, truncated, _ = env.step(action.item())
            # Without the truncated check the loop could run forever on an episode that never terminates
            done = terminated or truncated
    finally:
        env.close()
    imageio.mimsave('video.mp4', frames, fps=30)

# Record one episode of the trained agent in LunarLander-v2
show_video_of_model(agent, 'LunarLander-v2')

def show_video():
    """Embed the first '*.mp4' file found in the working directory as an inline HTML video.

    Prints "Could not find video" when no matching file exists.
    """
    mp4list = glob.glob('*.mp4')
    if not mp4list:
        print("Could not find video")
        return

    # Read-only access is sufficient, and the context manager closes the handle
    with open(mp4list[0], 'rb') as video_file:
        encoded = base64.b64encode(video_file.read())

    display(HTML(data='''<video alt="test" autoplay
                loop controls style="height: 400px;">
                <source src="data:video/mp4;base64,{0}" type="video/mp4" />
             </video>'''.format(encoded.decode('ascii'))))

# Display an .mp4 from the working directory inline in the notebook output
show_video()

