# Deep Q-Learning for Lunar Landing

## Part 0 - Installing the required packages and importing the libraries

### Installing Gymnasium

In [1]:
# !pip install gymnasium
# !pip install "gymnasium[atari]"
# !apt-get install -y swig
# !pip install "gymnasium[box2d]"

### Importing the libraries

In [2]:
import os
import random
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import torch.autograd as autograd
from torch.autograd import Variable
from collections import deque, namedtuple
import matplotlib.pyplot as plt
import IPython.display as ipythondisplay

## Part 1 - Building the AI

### Creating the architecture of the Neural Network

In [3]:
class Network(nn.Module):
    """Fully connected Q-network: a state vector in, one value per action out.

    The model is three linear layers (``fc1``, ``fc2``, ``fc3``) with a ReLU
    after each of the first two; both hidden layers have 64 units.
    """

    def __init__(self, state_size: int, action_size: int, seed: int = 42) -> None:
        """Create the three linear layers.

        Args:
            state_size: Number of input features.
            action_size: Number of output values.
            seed: Value handed to ``torch.manual_seed`` before the layers are
                created. Passing ``-1`` skips seeding altogether.
        """
        super().__init__()
        # -1 is the "do not touch the global RNG" sentinel.
        if seed != -1:
            self.seed = torch.manual_seed(seed)
        hidden_units = 64
        self.fc1 = nn.Linear(state_size, hidden_units)
        self.fc2 = nn.Linear(hidden_units, hidden_units)
        self.fc3 = nn.Linear(hidden_units, action_size)

    def forward(self, state) -> torch.Tensor:
        """Return the output of ``fc3`` for ``state`` (no activation on the last layer)."""
        hidden = F.relu(self.fc1(state))
        hidden = F.relu(self.fc2(hidden))
        return self.fc3(hidden)

## Part 2 - Training the AI

### Setting up the environment

In [4]:
import gymnasium as gym

# Create the environment with array rendering so frames can be captured later.
env = gym.make("LunarLander-v2", render_mode="rgb_array")

# Sizes used to build the Q-network: first observation dimension and action count.
state_shape = env.observation_space.shape
state_size = state_shape[0]
no_of_actions = env.action_space.n

for label, value in (
    ("State shape: ", state_shape),
    ("State Size: ", state_size),
    ("Number of actions: ", no_of_actions),
):
    print(label, value)

State shape:  (8,)
State Size:  8
Number of actions:  4


### Initializing the hyperparameters

In [5]:
# Optimisation
learning_rate = 5e-4          # step size passed to the Adam optimizer
gamma = 0.99                  # discount factor
interpolation_param = 1e-3    # weight of the local network in each soft target update

# Experience replay
replay_buffer_size = 100_000  # maximum number of stored experiences
minibatch_size = 100          # experiences drawn per learning step

### Implementing Experience Replay

In [6]:
class ReplayMemory(object):
  """Fixed-capacity store of experience tuples with uniform random sampling.

  Events are expected to be indexable as
  ``(state, action, reward, next_state, done)``. Once the store is full, each
  new event overwrites the oldest one in place (ring buffer), so ``push`` does
  not have to shift the whole list the way ``del self.memory[0]`` would.
  """

  def __init__(self, capacity):
    """Create an empty memory holding at most ``capacity`` events."""
    self.device = torch.device("cpu")
    self.capacity = capacity
    self.memory = []
    # Index of the oldest stored event, i.e. the next slot to overwrite once full.
    self._oldest = 0

  def push(self, event):
    """Store ``event``, evicting the oldest stored event when at capacity."""
    if self.capacity < 1:
      # Nothing can be retained; the memory stays empty.
      return
    if len(self.memory) + 1 <= self.capacity:
      self.memory.append(event)
      return
    # Full: overwrite the oldest entry and advance the ring position.
    self.memory[self._oldest] = event
    self._oldest = (self._oldest + 1) % len(self.memory)

  def sample(self, batch_size):
    """Draw ``batch_size`` distinct events and stack each field into a tensor.

    Returns:
      ``(states, next_states, actions, rewards, dones)`` -- note that this
      order differs from the order of the fields inside a stored event.
      ``actions`` is a long tensor; every other tensor is float. All tensors
      are placed on ``self.device``.

    Raises:
      ValueError: from ``random.sample`` when ``batch_size`` exceeds the
        number of stored events.
    """
    experiences = random.sample(self.memory, k = batch_size)
    # Filter once instead of once per field.
    batch = [e for e in experiences if e is not None]

    def stack(field):
      return np.vstack([e[field] for e in batch])

    states = torch.from_numpy(stack(0)).float().to(self.device)
    actions = torch.from_numpy(stack(1)).long().to(self.device)
    rewards = torch.from_numpy(stack(2)).float().to(self.device)
    next_states = torch.from_numpy(stack(3)).float().to(self.device)
    # Booleans are converted to 0/1 before the float cast.
    dones = torch.from_numpy(stack(4).astype(np.uint8)).float().to(self.device)
    return states, next_states, actions, rewards, dones

### Implementing the DQN class

In [7]:
class Agent:
    """DQN agent with a local network, a target network and a replay memory.

    The local network is trained with Adam; the target network is only moved
    towards the local one by ``soft_update`` after each learning step.
    """

    # A learning step is attempted on every UPDATE_EVERY-th call to ``step``.
    UPDATE_EVERY = 4

    def __init__(self, state_size, action_size):
        """Build both networks, the optimizer and an empty replay memory.

        Args:
            state_size: Number of input features of the networks.
            action_size: Number of actions (network outputs).
        """
        self.device = torch.device("cpu")
        self.state_size = state_size
        self.action_size = action_size
        # Both networks are built with Network's default seed.
        self.local_qnetwork = Network(state_size, action_size).to(self.device)
        self.target_qnetwork = Network(state_size, action_size).to(self.device)
        self.optimizer = optim.Adam(self.local_qnetwork.parameters(), lr=learning_rate)
        self.memory = ReplayMemory(replay_buffer_size)
        self.t_step = 0

    def step(self, state, action, reward, next_state, done) -> None:
        """Store one transition and, on every ``UPDATE_EVERY``-th call, learn.

        Learning only happens once the memory holds more than
        ``minibatch_size`` events; the sampled batch has ``minibatch_size``
        events (previously a hard-coded 100 that ignored the hyperparameter).
        """
        self.memory.push((state, action, reward, next_state, done))
        self.t_step = (self.t_step + 1) % self.UPDATE_EVERY
        if self.t_step != 0:
            return
        if len(self.memory.memory) > minibatch_size:
            experiences = self.memory.sample(minibatch_size)
            self.learn(experiences, gamma)

    def act(self, state, eps=0.0) -> int:
        """Pick an action for ``state`` with an epsilon-greedy rule.

        Args:
            state: NumPy array holding a single state (a batch dimension is
                added here).
            eps: Probability threshold for choosing a random action.

        Returns:
            The chosen action index as a NumPy integer scalar (both
            ``np.argmax`` and ``random.choice`` over ``np.arange`` yield one).
        """
        state = torch.from_numpy(state).float().unsqueeze(0).to(self.device)
        self.local_qnetwork.eval()
        with torch.no_grad():
            action_values = self.local_qnetwork(state)
        self.local_qnetwork.train()
        # Epsilon-greedy action selection
        if random.random() > eps:
            return np.argmax(action_values.cpu().data.numpy())
        return random.choice(np.arange(self.action_size))

    def learn(self, experiences, discount_factor):
        """Run one gradient step on the local network, then soft-update the target.

        Args:
            experiences: Tuple ``(states, next_states, actions, rewards, dones)``
                of tensors, in that order.
            discount_factor: Multiplier applied to the next-state value.
        """
        states, next_states, actions, rewards, dones = experiences
        # Best next-state value according to the target network, kept out of the graph.
        next_q_targets = (
            self.target_qnetwork(next_states).detach().max(1)[0].unsqueeze(1)
        )
        # (1 - dones) removes the next-state term for transitions flagged as done.
        q_targets = rewards + discount_factor * next_q_targets * (1 - dones)
        # Value the local network assigns to the action that was actually taken.
        q_expected = self.local_qnetwork(states).gather(1, actions)
        loss = F.mse_loss(q_expected, q_targets)
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()
        self.soft_update(self.local_qnetwork, self.target_qnetwork, interpolation_param)

    def soft_update(self, local_model, target_model, interpolation_parameter):
        """Blend ``local_model`` parameters into ``target_model`` in place.

        Each target parameter becomes
        ``interpolation_parameter * local + (1 - interpolation_parameter) * target``.
        """
        for target_param, local_param in zip(
            target_model.parameters(), local_model.parameters()
        ):
            target_param.data.copy_(
                interpolation_parameter * local_param.data
                + (1.0 - interpolation_parameter) * target_param.data
            )

### Initializing the DQN agent

In [8]:
# Build the agent from the state size and action count computed during environment setup.
agent = Agent(state_size, no_of_actions)

### Training the DQN agent

In [9]:
import os
import shutil

# Output locations for per-episode videos and checkpoints.
folder_paths = ["videos/lunar", "checkpoints/lunar"]

# Make sure every folder exists before anything is removed.
for folder_path in folder_paths:
    os.makedirs(folder_path, exist_ok=True)

# Remove regular files left in those folders; sub-directories are left alone.
for folder_path in folder_paths:
    for filename in os.listdir(folder_path):
        file_path = os.path.join(folder_path, filename)
        if not os.path.isfile(file_path):
            continue
        os.remove(file_path)


In [10]:
import imageio
import shutil  # local to this cell so the ffmpeg lookup below is self-contained

no_of_episodes = 2000
max_timesteps = 1000  # max number of timesteps in an episode
epsilon_start = 1.0
epsilon_end = 0.01
epsilon_decay = 0.995
eps = epsilon_start
scores = deque(maxlen=100)  # rolling window holding the scores of the last 100 episodes
# Rolling-average score at which training stops early.
# NOTE(review): Gymnasium documents 200 as the solved score for LunarLander and the
# logged averages peak far below 500, so with this value the early stop effectively
# never fires and all episodes are run -- confirm that this is intended.
solved_score = 500.0

# Tell imageio which ffmpeg to use without assuming a Homebrew install: keep a value
# the user already exported, otherwise prefer whatever `ffmpeg` is on PATH and fall
# back to the Homebrew location only when that file actually exists.
if "IMAGEIO_FFMPEG_EXE" not in os.environ:
    ffmpeg_exe = shutil.which("ffmpeg") or "/opt/homebrew/bin/ffmpeg"
    if os.path.isfile(ffmpeg_exe):
        os.environ["IMAGEIO_FFMPEG_EXE"] = ffmpeg_exe

for episode in range(1, no_of_episodes + 1):
    state, _ = env.reset()
    score = 0
    frames = []
    for t in range(max_timesteps):
        action = agent.act(state, eps)
        # Gymnasium returns (obs, reward, terminated, truncated, info).
        next_state, reward, terminated, truncated, _ = env.step(action)
        # Only a real termination is stored as `done`; a time-limit truncation
        # must not zero out the bootstrapped next-state value.
        agent.step(state, action, reward, next_state, terminated)

        frames.append(env.render())

        state = next_state
        score += reward
        # The episode is over on either signal; stepping again would require a reset.
        done = terminated or truncated
        if done:
            break
    # One video and one checkpoint per episode, tagged with the episode score.
    imageio.mimsave(f'videos/lunar/video_{episode}_{score:.0f}.mp4', frames, fps=30, macro_block_size=1)
    torch.save(agent.local_qnetwork.state_dict(), f"checkpoints/lunar/checkpoint_{episode}_{score:.0f}.pth")

    scores.append(score)
    eps = max(epsilon_end, epsilon_decay * eps)
    average_score = np.mean(scores)  # computed once per episode
    print(f"\rEpisode {episode}\tAverage Score: {average_score:.2f}\tCurrent Score: {score:.2f}    ", end="")
    if episode % 100 == 0:
        print(f"\rEpisode {episode}\tAverage Score: {average_score:.2f}                          ")
    if average_score >= solved_score:
        print(f"\nEnvironment solved in {episode:d} episodes!\tAverage Score: {average_score:.2f}")
        torch.save(agent.local_qnetwork.state_dict(), "checkpoint.pth")
        break

Episode 100	Average Score: -167.64                           
Episode 200	Average Score: -113.21                           
Episode 300	Average Score: -26.52                            
Episode 400	Average Score: 34.06                           
Episode 500	Average Score: 129.55                          
Episode 600	Average Score: 181.22                          
Episode 700	Average Score: 207.10                          
Episode 800	Average Score: 215.18                          
Episode 900	Average Score: 258.09                          
Episode 1000	Average Score: 254.46                          
Episode 1100	Average Score: 254.36                          
Episode 1200	Average Score: 231.97                          
Episode 1300	Average Score: 208.02                           
Episode 1400	Average Score: 202.37                           
Episode 1500	Average Score: 229.99                           
Episode 1600	Average Score: 221.72                           
Episode 1700	Average Sc

## Part 3 - Visualizing the results

In [11]:
import glob
import io
import base64
import imageio
from IPython.display import HTML, display
from gymnasium.wrappers.monitoring.video_recorder import VideoRecorder
import os

import shutil  # local to this cell so the ffmpeg lookup below is self-contained

# Tell imageio which ffmpeg to use without assuming a Homebrew install: keep a value
# the user already exported, otherwise prefer whatever `ffmpeg` is on PATH and fall
# back to the Homebrew location only when that file actually exists.
if 'IMAGEIO_FFMPEG_EXE' not in os.environ:
    ffmpeg_exe = shutil.which("ffmpeg") or "/opt/homebrew/bin/ffmpeg"
    if os.path.isfile(ffmpeg_exe):
        os.environ['IMAGEIO_FFMPEG_EXE'] = ffmpeg_exe

def show_video_of_model(agent, env_name, video_path='video.mp4', fps=30):
    """Play one episode with ``agent`` and save the rendered frames as a video.

    Args:
        agent: Object whose ``act(state)`` returns the action to take.
        env_name: Environment id passed to ``gym.make``.
        video_path: Destination file handed to ``imageio.mimsave``.
        fps: Frame rate of the saved video.

    The episode stops as soon as the environment reports either termination or
    truncation; previously truncation was ignored, so an episode that hit the
    time limit without terminating kept being stepped. The environment is
    closed even if stepping raises.
    """
    env = gym.make(env_name, render_mode='rgb_array')
    frames = []
    try:
        state, _ = env.reset()
        done = False
        while not done:
            frames.append(env.render())
            action = agent.act(state)
            # int() accepts both NumPy integer scalars and plain Python ints.
            state, reward, terminated, truncated, _ = env.step(int(action))
            done = terminated or truncated
    finally:
        env.close()
    imageio.mimsave(video_path, frames, fps=fps)

# Record one episode of the trained agent (agent.act is called with its default eps).
show_video_of_model(agent, 'LunarLander-v2')





In [12]:
def show_video(video_path=None):
    """Embed an MP4 file in the notebook output as a looping HTML5 video.

    Args:
        video_path: File to show. When omitted, the most recently modified
            ``*.mp4`` in the current working directory is used (glob order is
            arbitrary, so taking the first match was not deterministic).

    Prints "Could not find video" when no path is given and no ``*.mp4``
    exists. An explicit ``video_path`` that cannot be opened raises ``OSError``.
    """
    if video_path is None:
        candidates = glob.glob('*.mp4')
        if not candidates:
            print("Could not find video")
            return
        video_path = max(candidates, key=os.path.getmtime)
    # Read-only access is enough, and the handle is closed as soon as the bytes are read.
    with open(video_path, 'rb') as video_file:
        encoded = base64.b64encode(video_file.read())
    display(HTML(data='''<video alt="test" autoplay
                loop controls style="height: 400px;">
                <source src="data:video/mp4;base64,{0}" type="video/mp4" />
             </video>'''.format(encoded.decode('ascii'))))

# Embed an .mp4 found in the current working directory in the cell output.
show_video()