<a href="https://colab.research.google.com/github/usman312003/MiCard/blob/main/Usman_Deep_Q_Learning_for_Lunar_Landing_Partial_Code.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Deep Q-Learning for Lunar Landing

## Part 0 - Installing the required packages and importing the libraries

### Installing Gymnasium

In [2]:
# Environment setup via shell escapes: install a system Python 3.10, register it
# as an alternative for /usr/bin/python3, then pip-install swig and box2d.
# NOTE(review): the pip log below downloads a cp311 Box2D wheel, which suggests the
# kernel itself runs Python 3.11 — confirm the python3.10 steps are still needed.
!sudo apt-get install python3.10
!sudo update-alternatives --install /usr/bin/python3 python3 /usr/bin/python3.10 1
!pip install swig
!pip install box2d


Reading package lists... Done
Building dependency tree... Done
Reading state information... Done
python3.10 is already the newest version (3.10.12-1~22.04.10).
python3.10 set to manually installed.
0 upgraded, 0 newly installed, 0 to remove and 35 not upgraded.
Collecting swig
  Downloading swig-4.3.1-py3-none-manylinux_2_12_x86_64.manylinux2010_x86_64.whl.metadata (3.5 kB)
Downloading swig-4.3.1-py3-none-manylinux_2_12_x86_64.manylinux2010_x86_64.whl (1.9 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.9/1.9 MB[0m [31m29.6 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: swig
Successfully installed swig-4.3.1
Collecting box2d
  Downloading Box2D-2.3.10-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (573 bytes)
Downloading Box2D-2.3.10-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (3.7 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m3.7/3.7 MB[0m [31m39.4 MB/s[0m eta [36m0:00:00[0m
[?25hIn

### Importing the libraries

In [3]:
import os
import random
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import torch.autograd as autograd
from torch.autograd import Variable
from collections import deque, namedtuple

## Part 1 - Building the AI

### Creating the architecture of the Neural Network

In [4]:
import torch.nn as nn
import torch.nn.functional as F

class Network(nn.Module):
    """Fully connected Q-network: state -> 64 -> 64 -> one value per action.

    Args:
        state_size: Number of input features of a state vector.
        action_size: Number of output units (one per discrete action).
        seed: Value passed to ``torch.manual_seed`` before the layers are
            created, so two networks built with the same seed start with
            identical weights. Note this reseeds torch's global RNG.
    """

    def __init__(self, state_size, action_size, seed=42):
        super().__init__()
        # torch.manual_seed returns the global Generator; it is stored as-is.
        self.seed = torch.manual_seed(seed)
        hidden_units = 64
        # Layers are created in input-to-output order; the order matters for
        # seeded initialisation and the attribute names are the state-dict keys.
        self.fc1 = nn.Linear(state_size, hidden_units)
        self.fc2 = nn.Linear(hidden_units, hidden_units)
        self.fc3 = nn.Linear(hidden_units, action_size)

    def forward(self, state):
        """Return the raw (un-activated) action values for ``state``."""
        hidden = F.relu(self.fc1(state))
        hidden = F.relu(self.fc2(hidden))
        return self.fc3(hidden)

## Part 2 - Training the AI

In [5]:
import gymnasium as gym
# Create the environment and record the observation shape/size and the number
# of discrete actions; the sizes are later passed to Agent(...).
env = gym.make("LunarLander-v3") # v3 is used here (the visualization cell notes that v2 is deprecated)
state_shape = env.observation_space.shape
state_size = env.observation_space.shape[0]
number_actions = env.action_space.n
print("state shape", state_shape)
print("state size", state_size)
print("number of actions", number_actions)

state shape (8,)
state size 8
number of actions 4


### Initializing the hyperparameters

In [6]:
# Optimiser / update settings. The name `learnig_rate` is misspelled but is kept
# unchanged because the Agent class reads it under exactly this name.
learnig_rate = 0.0005
minibatch_size = 100
discount_factor = 0.99

# Replay-buffer capacity and the soft-update mixing coefficient.
replay_buffer_size = 100_000
interpolation_parameter = 0.001



### Implementing Experience Replay

In [7]:
import torch
import random
import numpy as np

class ReplayMemory(object):
    """Fixed-capacity experience replay buffer.

    Events are ``(state, action, reward, next_state, done)`` tuples. Once the
    buffer holds ``capacity`` events, each new event overwrites the oldest one
    in place (ring buffer). This keeps ``push`` O(1); deleting index 0 of a
    large list shifts every remaining element on each call. As a consequence,
    ``self.memory`` is no longer in chronological order after the buffer has
    wrapped around.

    Args:
        capacity: Maximum number of events retained. A non-positive capacity
            retains nothing.
    """

    def __init__(self, capacity):
        self.device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
        self.capacity = capacity
        self.memory = []
        # Index of the oldest stored event once the buffer is full.
        self.position = 0

    def push(self, event):
        """Store ``event``, overwriting the oldest entry when the buffer is full."""
        if self.capacity <= 0:
            return
        if len(self.memory) < self.capacity:
            self.memory.append(event)
            return
        self.memory[self.position] = event
        self.position = (self.position + 1) % len(self.memory)

    def sample(self, batch_size):
        """Draw ``batch_size`` distinct events at random and stack them into tensors.

        Returns:
            Tuple ``(states, next_states, actions, rewards, dones)`` of tensors
            on ``self.device``. Note the order: ``next_states`` comes second.
            ``actions`` is ``long``; all others are ``float``. Each tensor has
            one row per sampled event.

        Raises:
            ValueError: If ``batch_size`` exceeds the number of stored events
                (raised by ``random.sample``).
        """
        # None entries are skipped, as in the per-field filters this replaces.
        batch = [e for e in random.sample(self.memory, k=batch_size) if e is not None]

        def column(index):
            # Stack field `index` of every event into one row per event.
            return np.vstack([e[index] for e in batch])

        states = torch.from_numpy(column(0)).float().to(self.device)
        actions = torch.from_numpy(column(1)).long().to(self.device)
        rewards = torch.from_numpy(column(2)).float().to(self.device)
        next_states = torch.from_numpy(column(3)).float().to(self.device)
        # Boolean done flags go through uint8 before becoming a float tensor.
        dones = torch.from_numpy(column(4).astype(np.uint8)).float().to(self.device)

        return (states, next_states, actions, rewards, dones)

### Implementing the DQN class

In [13]:
import torch
import random
import numpy as np

class Agent:
    """DQN agent with a local (trained) and a target (slowly tracking) Q-network.

    The constructor reads the notebook-level hyperparameters ``learnig_rate``
    and ``replay_buffer_size``; ``step`` reads ``minibatch_size`` and
    ``discount_factor``; ``learn`` reads ``interpolation_parameter``.

    Args:
        state_size: Number of features in a state vector.
        action_size: Number of discrete actions.
    """

    # Number of calls to ``step`` between two learning updates.
    UPDATE_EVERY = 4

    def __init__(self, state_size, action_size):
        self.state_size = state_size
        self.action_size = action_size
        self.device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
        self.qnetwork_local = Network(state_size, action_size).to(self.device)
        self.qnetwork_target = Network(state_size, action_size).to(self.device)
        # Only the local network is optimised; the target follows via soft_update.
        self.optimizer = optim.Adam(self.qnetwork_local.parameters(), lr=learnig_rate)
        self.memory = ReplayMemory(replay_buffer_size)
        self.t_step = 0

    def step(self, state, action, reward, next_state, done):
        """Store one transition and, every ``UPDATE_EVERY`` calls, learn from a minibatch."""
        self.memory.push((state, action, reward, next_state, done))
        self.t_step = (self.t_step + 1) % self.UPDATE_EVERY
        # Learning waits until the buffer holds more than one minibatch.
        if self.t_step == 0 and len(self.memory.memory) > minibatch_size:
            experiences = self.memory.sample(minibatch_size)
            self.learn(experiences, discount_factor)

    def act(self, state, epsilon=0.0):
        """Choose an action epsilon-greedily.

        Args:
            state: NumPy array holding a single state.
            epsilon: Probability-like threshold; a uniform draw at or below it
                selects a random action, otherwise the greedy one.

        Returns:
            A NumPy integer action index (so callers may use ``.item()``).
        """
        # Decide first, so the forward pass is skipped when exploring.
        if random.random() <= epsilon:
            return random.choice(np.arange(self.action_size))

        state = torch.from_numpy(state).float().unsqueeze(0).to(self.device)
        self.qnetwork_local.eval()
        with torch.no_grad():
            action_values = self.qnetwork_local(state)
        self.qnetwork_local.train()
        return np.argmax(action_values.cpu().data.numpy())

    def learn(self, experiences, gamma):
        """Run one gradient step on the local network, then soft-update the target.

        Args:
            experiences: Tuple ``(states, next_states, actions, rewards, dones)``
                in the order returned by ``ReplayMemory.sample``.
            gamma: Discount factor applied to the bootstrapped next-state value.
        """
        states, next_states, actions, rewards, dones = experiences
        # The target values are constants for the loss, so no graph is built.
        with torch.no_grad():
            Q_targets_next = self.qnetwork_target(next_states).max(1)[0].unsqueeze(1)
            # (1 - dones) removes the bootstrap term for finished transitions.
            Q_targets = rewards + (gamma * Q_targets_next * (1 - dones))
        Q_expected = self.qnetwork_local(states).gather(1, actions)
        loss = F.mse_loss(Q_expected, Q_targets)
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()
        self.soft_update(self.qnetwork_local, self.qnetwork_target, interpolation_parameter)

    def soft_update(self, local_model, target_model, interpolation_parameter):
        """Blend local weights into the target: ``target = tau*local + (1 - tau)*target``."""
        for target_param, local_param in zip(target_model.parameters(), local_model.parameters()):
            target_param.data.copy_(
                interpolation_parameter * local_param.data + (1.0 - interpolation_parameter) * target_param.data
            )

### Initializing the DQN agent

In [9]:
# Build the agent from the environment's state size and action count.
# NOTE(review): this cell's execution count (In [9]) is lower than the Agent
# class cell's (In [13]) — re-run this cell after redefining Agent so `agent`
# uses the current class.
agent = Agent(state_size, number_actions)

### Training the DQN agent

In [11]:
# Training schedule and epsilon-greedy exploration settings.
number_episodes = 2000
maximum_number_timesteps = 1000
number_timesteps = 0
epsilon_starting_value = 1.0
epsilon_ending_value = 0.01
epsilon_decay_rate = 0.995
epsilon = epsilon_starting_value
# Rolling window of the most recent 100 episode scores.
scores_on_100_episodes = deque(maxlen=100)
for episode in range(1, number_episodes + 1):
    state, _ = env.reset()
    score = 0
    for t in range(maximum_number_timesteps):
        action = agent.act(state, epsilon)
        next_state, reward, terminated, truncated, _ = env.step(action)
        # The agent still receives only the termination flag, as before.
        agent.step(state, action, reward, next_state, terminated)
        state = next_state
        score += reward
        number_timesteps += 1
        # Truncation was previously ignored; it must also end the episode.
        done = terminated or truncated
        if done:
            break
    scores_on_100_episodes.append(score)
    epsilon = max(epsilon_ending_value, epsilon_decay_rate * epsilon) # decrease epsilon

    # Progress report and the "solved" check both run only on every 100th episode.
    if episode % 100 == 0:
        average_score = np.mean(scores_on_100_episodes)
        print('Episode {}\tAverage Score: {:.2f}'.format(episode, average_score))
        if average_score >= 200:
            print('Environment solved in {:d} episodes!\tAverage Score: {:.2f}'.format(episode - 100, average_score))
            torch.save(agent.qnetwork_local.state_dict(), 'checkpoint.pth')
            break

Episode 100	Average Score: -156.30
Episode 200	Average Score: -127.51
Episode 300	Average Score: -80.08
Episode 400	Average Score: 1.32
Episode 500	Average Score: 88.89
Episode 600	Average Score: 160.14
Episode 700	Average Score: 153.50
Episode 800	Average Score: 202.49
Environment solved in 700 episodes!	Average Score: 202.49


## Part 3 - Visualizing the results

In [14]:
import glob
import io
import base64
import imageio
from IPython.display import HTML, display
import gymnasium as gym # Import gymnasium again to ensure it's available

def show_video_of_model(agent, env_name):
    """Run one episode with ``agent`` acting greedily and save it as ``video.mp4``.

    Args:
        agent: Object whose ``act(state)`` returns an action exposing ``.item()``.
        env_name: Gymnasium environment id; created with ``render_mode='rgb_array'``.

    The episode ends when the environment reports either termination or
    truncation. Checking only the termination flag could loop forever on an
    episode that never terminates. The environment is closed even if stepping
    or rendering raises.
    """
    env = gym.make(env_name, render_mode='rgb_array')
    frames = []
    try:
        state, _ = env.reset()
        done = False
        while not done:
            # Capture the frame for the state the agent is about to act on.
            frames.append(env.render())
            action = agent.act(state)
            state, reward, terminated, truncated, _ = env.step(action.item())
            done = terminated or truncated
    finally:
        env.close()
    imageio.mimsave('video.mp4', frames, fps=30)

# Record one episode of the trained agent to video.mp4.
# Use 'LunarLander-v3' as LunarLander-v2 is deprecated
show_video_of_model(agent, 'LunarLander-v3')

def show_video():
    """Embed the first ``*.mp4`` found in the working directory as an inline HTML video.

    Prints "Could not find video" when no mp4 file is present.
    """
    mp4list = glob.glob('*.mp4')
    if not mp4list:
        print("Could not find video")
        return
    # Read-only mode inside a context manager: the file is only read, and the
    # handle is closed deterministically instead of being left open.
    with open(mp4list[0], 'rb') as video_file:
        encoded = base64.b64encode(video_file.read())
    display(HTML(data='''<video alt="test" autoplay
                loop controls style="height: 400px;">
                <source src="data:video/mp4;base64,{0}" type="video/mp4" />
             </video>'''.format(encoded.decode('ascii'))))

# Display an mp4 from the working directory inline (e.g. the video.mp4 saved above).
show_video()

