# Deep Q-Learning for Lunar Landing

## Part 0 - Installing the required packages and importing the libraries

### Installing Gymnasium

In [1]:
!pip install gymnasium
!pip install "gymnasium[atari, accept-rom-license]"
!apt-get install -y swig
!pip install gymnasium[box2d]

Reading package lists... Done
Building dependency tree... Done
Reading state information... Done
swig is already the newest version (4.0.2-1ubuntu1).
0 upgraded, 0 newly installed, 0 to remove and 15 not upgraded.


### Importing the libraries

In [2]:
import os
import random
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import torch.autograd as autograd
from torch.autograd import Variable
from collections import deque, namedtuple
#gymnasium.farama.org is an environment website with plenty of examples of toy AI
# we're gonna start with the Atari Game Kung Fu Master and Lunar Landing

## Part 1 - Building the AI

### Creating the architecture of the Neural Network

In [3]:
class Network(nn.Module):

#state size for lunar landing will be an 8 value vector
  def __init__(self, state_size, action_size, seed = 42):
      super(Network, self).__init__()
      self.seed = torch.manual_seed(seed)
      # fcl fully connected layer, finding the number of neurons is a lot of trial and error
      self.fc1 = nn.Linear(state_size, 64)
      self.fc2 = nn.Linear(64, 64)
      self.fc3 = nn.Linear(64, action_size)

  def forward(self, state):
      #store the response of layer 1
      x = self.fc1(state)
      #rectifier activation function
      x = F.relu(x)
      x = self.fc2(x)
      x = F.relu(x)
      return self.fc3(x) # return the output of the final layer



## Part 2 - Training the AI

### Setting up the environment

In [4]:
import gymnasium as gym
env = gym.make('LunarLander-v2')
state_shape = env.observation_space.shape
state_size = env.observation_space.shape[0]
number_actions = env.action_space.n
print('State shape: ', state_shape)
print('State size: ', state_size)
print('Number of actions: ', number_actions)

State shape:  (8,)
State size:  8
Number of actions:  4


### Initializing the hyperparameters

In [5]:
learning_rate = 5e-4 #5 times 10 to the power of -4
minibatch_size = 100 # common practice number
discount_factor = 0.99 #represents the depressing value of the rewards so we consider future rewards
replay_buffer_size = int(1e5) #how many experiences in the memory in the agent
interpolation_parameter = 1e-3 #sometimes called Tau

  and should_run_async(code)


### Implementing Experience Replay

In [6]:
class ReplayMemory(object): #no inheritance
    def __init__(self, capacity):
      self.device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu") #try gpu
      self.capacity = capacity
      self.memory = []

    def push(self, event): # add new event
      self.memory.append(event)
      if len(self.memory) > self.capacity:
        del self.memory[0]

    def sample(self, batch_size):
      experiences = random.sample(self.memory, k = batch_size)
      states = torch.from_numpy(np.vstack([e[0] for e in experiences if e is not None])).float().to(self.device)
      #stack all the states together
      #convert to pytorch with torch.from_numpy()
      actions = torch.from_numpy(np.vstack([e[1] for e in experiences if e is not None])).long().to(self.device)
      rewards = torch.from_numpy(np.vstack([e[2] for e in experiences if e is not None])).float().to(self.device)
      next_states = torch.from_numpy(np.vstack([e[3] for e in experiences if e is not None])).float().to(self.device)
      dones = torch.from_numpy(np.vstack([e[4] for e in experiences if e is not None]).astype(np.uint8)).float().to(self.device)
      return states, next_states, actions, rewards, dones


### Implementing the DQN class

In [7]:
class Agent():

  def __init__(self, state_size, action_size):
    self.device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu") #try gpu
    self.state_size = state_size
    self.action_size = action_size #number of actions
    #local and target Q-networks
    self.local_qnetwork = Network(state_size, action_size).to(self.device)
    self.target_qnetwork = Network(state_size, action_size).to(self.device)
    self.optimizer = optim.Adam(self.local_qnetwork.parameters(), lr = learning_rate)
    self.memory = ReplayMemory(replay_buffer_size)
    self.t_step = 0 #time counter

  def step(self, state, action, reward, next_state, done):
    self.memory.push((state, action, reward, next_state, done))
    self.t_step = (self.t_step+1) % 4 #reset every 4 time counter
    if self.t_step == 0: #learn by minibatch every 4 time
      if len(self.memory.memory) > minibatch_size:
        experiences = self.memory.sample(100)
        self.learn(experiences, discount_factor) #learning by the batch

  #uses an epsilon greedy policy with random change of doing something different.
  def act(self, state, epsilon = 0.):
    state = torch.from_numpy(state).float().unsqueeze(0).to(self.device)
    #convert to pytorch and add extra dimension to represent batch number
    self.local_qnetwork.eval() #evaluation mode
    with torch.no_grad(): #no gradient functions
      action_values = self.local_qnetwork(state)
    self.local_qnetwork.train()
    if random.random() > epsilon:
      return np.argmax(action_values.cpu().data.numpy()) #chooose the highest value
    else:
      return random.choice(np.arange(self.action_size)) #randomly choose action

  def learn(self, experiences, discount_factor):
    states, next_states, actions, rewards, dones = experiences
    next_q_targets = self.target_qnetwork(next_states).detach().max(1)[0].unsqueeze(1) #get the max action value
    q_targets = rewards + discount_factor * next_q_targets * (1 - dones)
    q_expected = self.local_qnetwork(states).gather(1, actions) #expected values form the q-network
    loss = F.mse_loss(q_expected, q_targets) #mean square error loss
    #time to backpropogate based off the loss
    self.optimizer.zero_grad() #reset the optimizer
    loss.backward() #backpropagate
    self.optimizer.step() #a single optimization step to update parameters
    self.soft_update(self.local_qnetwork, self.target_qnetwork, interpolation_parameter)

  #the soft update BLENDS the parameters for both target and local, to prevent abrupt changes that could destabilize training
  def soft_update(self, local_model, target_model, interpolation_parameter):
    for target_param, local_param in zip(target_model.parameters(), local_model.parameters()):
      #a for loop with a tuple from the zip
      target_param.data.copy_(interpolation_parameter * local_param.data + (1.0 - interpolation_parameter) * target_param.data)
      #update target parameters



### Initializing the DQN agent

In [8]:
agent = Agent(state_size, number_actions) #congrats we've made an AI

### Training the DQN agent

In [9]:
number_episodes = 2000 #max number of eps in which we'll train the agents
maximum_number_timesteps_per_episode = 1000
epsilon_starting_value = 1.0 #starting value which will decay
epsilon_ending_value = 0.01
epsilon_decay_value = 0.995 #multiply it by this every time
epsilon = epsilon_starting_value
scores_on_100_episodes = deque(maxlen = 100) #contain window of scores on last 100 episodes

for episode in range(1,number_episodes +1):
  #start by reseting the environment
  state, _ = env.reset()
  #initialize the score aka cumulated reward
  score = 0
  for t in range(maximum_number_timesteps_per_episode): #loop over the timesteps
    action = agent.act(state, epsilon) #pick out the greedy action usually
    #now that you've moved, update the state and rewards
    next_state, reward, done, _, _ = env.step(action)
    #step method includes the backpropagation so use that part
    agent.step(state, action, reward, next_state, done)
    state = next_state
    score += reward
    if done:
      break
  scores_on_100_episodes.append(score)
  epsilon = max(epsilon_ending_value, epsilon_decay_value * epsilon)
  print('\rEpisode {}\tAverage Score: {:.2f}'.format(episode, np.mean(scores_on_100_episodes)), end = "")
  if episode % 100 == 0:
    print('\rEpisode {}\tAverage Score: {:.2f}'.format(episode, np.mean(scores_on_100_episodes))) #no dynamic override effect
  #you want to save the model if you win
  if np.mean(scores_on_100_episodes) >= 200.0:
    print('\nEnvironment solved in {:d} episodes!\tAverage Score: {:.2f}'.format(episode - 100, np.mean(scores_on_100_episodes)))
    torch.save(agent.local_qnetwork.state_dict(), 'checkpoint.pth')
    break



  and should_run_async(code)


Episode 100	Average Score: -150.03
Episode 200	Average Score: -115.31
Episode 300	Average Score: -50.77
Episode 400	Average Score: 15.87
Episode 500	Average Score: 145.00
Episode 582	Average Score: 200.36
Environment solved in 482 episodes!	Average Score: 200.36


## Part 3 - Visualizing the results

In [10]:
import glob
import io
import base64
import imageio
from IPython.display import HTML, display
from gym.wrappers.monitoring.video_recorder import VideoRecorder

def show_video_of_model(agent, env_name):
    env = gym.make(env_name, render_mode='rgb_array')
    state, _ = env.reset()
    done = False
    frames = []
    while not done: #in the visualization you don't see the agent step method, no more training
        frame = env.render()
        frames.append(frame)
        action = agent.act(state)
        state, reward, done, _, _ = env.step(action.item())
    env.close()
    imageio.mimsave('video.mp4', frames, fps=30)

show_video_of_model(agent, 'LunarLander-v2')

def show_video():
    mp4list = glob.glob('*.mp4')
    if len(mp4list) > 0:
        mp4 = mp4list[0]
        video = io.open(mp4, 'r+b').read()
        encoded = base64.b64encode(video)
        display(HTML(data='''<video alt="test" autoplay
                loop controls style="height: 400px;">
                <source src="data:video/mp4;base64,{0}" type="video/mp4" />
             </video>'''.format(encoded.decode('ascii'))))
    else:
        print("Could not find video")

show_video()



In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import gym
import matplotlib.pyplot as plt
from collections import namedtuple
import random

# Define the Q-network
class QNetwork(nn.Module):
    def __init__(self, input_size, output_size):
        super(QNetwork, self).__init__()
        self.fc1 = nn.Linear(input_size, 128)
        self.fc2 = nn.Linear(128, 64)
        self.fc3 = nn.Linear(64, output_size)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        return self.fc3(x)

# Define the experience replay buffer
Transition = namedtuple('Transition', ('state', 'action', 'next_state', 'reward', 'done'))

class ReplayBuffer:
    def __init__(self, capacity):
        self.capacity = capacity
        self.memory = []
        self.position = 0

    def push(self, *args):
        if len(self.memory) < self.capacity:
            self.memory.append(None)
        self.memory[self.position] = Transition(*args)
        self.position = (self.position + 1) % self.capacity

    def sample(self, batch_size):
        return random.sample(self.memory, batch_size)

    def __len__(self):
        return len(self.memory)

# Define the DQN agent
class DQNAgent:
    def __init__(self, state_size, action_size, epsilon_start=1.0, epsilon_end=0.01, epsilon_decay=0.995, gamma=0.99):
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.q_network = QNetwork(state_size, action_size).to(self.device)
        self.target_network = QNetwork(state_size, action_size).to(self.device)
        self.target_network.load_state_dict(self.q_network.state_dict())
        self.target_network.eval()
        self.optimizer = optim.Adam(self.q_network.parameters())
        self.epsilon = epsilon_start
        self.epsilon_end = epsilon_end
        self.epsilon_decay = epsilon_decay
        self.gamma = gamma
        self.steps_done = 0

    def select_action(self, state):
        self.epsilon = max(self.epsilon_end, self.epsilon * self.epsilon_decay)
        self.steps_done += 1
        if random.random() < self.epsilon:
            return torch.tensor([random.randrange(self.q_network.fc3.out_features)], device=self.device, dtype=torch.long)
        else:
            with torch.no_grad():
                return self.q_network(state).max(1)[1].view(1, 1)

    def optimize_model(self, batch_size):
        if len(memory) < batch_size:
            return
        transitions = memory.sample(batch_size)
        batch = Transition(*zip(*transitions))

        non_final_mask = torch.tensor(tuple(map(lambda s: s is not None, batch.next_state)), device=self.device, dtype=torch.bool)
        non_final_next_states = torch.cat([s for s in batch.next_state if s is not None])

        state_batch = torch.cat(batch.state)
        action_batch = torch.cat(batch.action)
        reward_batch = torch.cat(batch.reward)

        state_action_values = self.q_network(state_batch).gather(1, action_batch)

        next_state_values = torch.zeros(batch_size, device=self.device)
        next_state_values[non_final_mask] = self.target_network(non_final_next_states).max(1)[0].detach()

        expected_state_action_values = reward_batch + self.gamma * next_state_values

        loss = F.smooth_l1_loss(state_action_values, expected_state_action_values.unsqueeze(1))

        self.optimizer.zero_grad()
        loss.backward()
        for param in self.q_network.parameters():
            param.grad.data.clamp_(-1, 1)
        self.optimizer.step()

# Hyperparameters
state_size = env.observation_space.shape[0]
action_size = env.action_space.n
batch_size = 64
capacity = 10000
episodes = 1000

# Initialize environment and agent
env = gym.make('LunarLander-v2')
agent = DQNAgent(state_size, action_size)
memory = ReplayBuffer(capacity)

# Training loop
for episode in range(episodes):
    state = torch.tensor(env.reset(), device=agent.device, dtype=torch.float32).view(1, -1)
    total_reward = 0
    for t in range(1000):  # Limit the number of timesteps per episode
        action = agent.select_action(state)
        next_state, reward, done, _ = env.step(action.item())
        total_reward += reward

        if not done:
            next_state = torch.tensor(next_state, device=agent.device, dtype=torch.float32).view(1, -1)
        else:
            next_state = None

        reward = torch.tensor([reward], device=agent.device, dtype=torch.float32)

        action = torch.tensor([action], device=agent.device, dtype=torch.long)
        memory.push(state, action, next_state, reward, done)
        state = next_state

        agent.optimize_model(batch_size)

        if done:
            break

    # Update the target network every 10 episodes
    if episode % 10 == 0:
        agent.target_network.load_state_dict(agent.q_network.state_dict())

    print(f"Episode {episode + 1}, Total Reward: {total_reward}")

# Generate a video of the trained agent
env = gym.make('LunarLander-v2')
state = torch.tensor(env.reset(), device=agent.device, dtype=torch.float32).view(1, -1)
frames = []
while True:
    frames.append(env.render(mode='rgb_array'))
    with torch.no_grad():
        action = agent.q_network(state).max(1)[1].view(1, 1)
    next_state, _, done, _ = env.step(action.item())
    if done:
        break
    state = torch.tensor(next_state, device=agent.device, dtype=torch.float32).view(1, -1)

# Save the frames as a video
import imageio
imageio.mimsave('lunar_lander_video.gif', frames)

NameError: ignored