# Exercise 3: Lunar Lander

In this notebook we will implement **Deep Q-Learning** (DQN) to solve the [Lunar Lander](https://gymnasium.farama.org/environments/box2d/lunar_lander/) environment.

<p align="center"><img src="./images/lunar_lander.png" alt="drawing" width="500"/></p>

In this environment, we want a lunar lander to learn how to land. The environment is defined by:

The state is an 8-dimensional vector containing the lander's position (x, y), linear velocity (x, y), angle, and angular velocity, plus two flags indicating whether the left and right legs are touching the ground (1) or not (0).
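To make the layout of the state vector concrete, the sketch below labels each of the eight entries. The field names and the helper `describe_observation` are illustrative assumptions, not part of Gymnasium; the ordering simply follows the description above, and the example values are made up.

```python
import numpy as np

# Hypothetical labels for the 8 entries, in the order described above.
OBSERVATION_FIELDS = (
    'x', 'y',                                  # position
    'vx', 'vy',                                # linear velocity
    'angle', 'angular_velocity',
    'left_leg_contact', 'right_leg_contact',   # 1.0 if touching the ground, else 0.0
)

def describe_observation(observation):
    """Return a dict mapping each (assumed) field name to its value."""
    observation = np.asarray(observation, dtype=np.float32)
    if observation.shape != (len(OBSERVATION_FIELDS),):
        raise ValueError(f'Expected shape (8,), got {observation.shape}')
    return {name: float(value) for name, value in zip(OBSERVATION_FIELDS, observation)}

# Made-up example values, not taken from a real episode.
example = np.array([0.0, 1.4, 0.1, -0.5, 0.02, 0.0, 0.0, 0.0], dtype=np.float32)
described = describe_observation(example)
print(described)
```

The same helper could be applied to the `observation` returned by `env.reset()` or `env.step()` to make printed states easier to read.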

The lander can take one of four discrete actions:

- 0: Do nothing
- 1: Fire the left engine
- 2: Fire the main engine
- 3: Fire the right engine

The lander receives a reward at every step. The reward increases when:
- The lander gets closer to the landing pad.
- The lander moves more slowly.
- A leg is in contact with the ground (+10 per leg).
- The lander lands safely (+100).

The reward decreases when:
- The lander moves farther from the landing pad.
- The lander moves faster.
- The lander tilts (its angle is not horizontal).
- A side engine is firing (-0.03 per frame) or the main engine is firing (-0.3 per frame).
- The lander crashes (-100).

An episode is considered solved if the lander scores at least 200 points.

The episode terminates if the lander crashes or leaves the viewport. For more information, check the documentation.

In [None]:
import os
import random
from collections import deque

import gymnasium as gym
import imageio
import matplotlib.pyplot as plt
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim

from IPython.display import Video
from tqdm import tqdm

In [None]:
dict_env = {'id': 'LunarLander-v3',
            'render_mode': 'rgb_array'}

env = gym.make(**dict_env)

In [None]:
# Set a seed for reproducibility
seed = 5678

# This is the initial state of the environment.
observation, info = env.reset(seed=seed)
print(f'Initial observation: {observation}')
print(f'Initial info: {info}')

In [None]:
# Set the seed for the action space to ensure reproducibility
seed = 5678
observation, info = env.reset(seed=seed)
env.action_space.seed(seed)
torch.manual_seed(seed)
np.random.seed(seed)

Let's run one episode to see how well the agent performs under a random policy.

In [None]:
done = False
total_reward = 0
frames = []

while not done:
    # Save the frame for the video
    frame = env.render()
    frames.append(frame)

    # Sample a random action from the action space
    action = env.action_space.sample()
    
    # Take a step in the environment
    observation, reward, terminated, truncated, info = env.step(action)
    
    # Add the reward to the total reward
    total_reward += reward

    # Check if the episode has ended
    done = terminated or truncated

frame = env.render()
frames.append(frame)

print('Episode finished. Total reward:', total_reward)
env.close()

In [None]:
# Create the output directory for the videos if it does not exist yet.
os.makedirs('videos', exist_ok=True)

In [None]:
random_video_path = os.path.join('videos', 'lunar_lander_random.mp4')
imageio.mimsave(random_video_path, frames, fps=24)

In [None]:
Video(random_video_path, width=400)

Well, it's no surprise that the agent did not perform well under a random policy. The agent needs to learn how to land the lunar lander effectively.

### Exercise 3.1: Define a Deep Q-Network (DQN)

Let's implement a DQN so the agent can land the lunar lander. We use a DQN for several reasons.

1. **Continuous state space**: The states are 8-dimensional and real-valued. A lookup table would treat almost every observed state as new, because two floating-point vectors are rarely exactly equal, so it would never generalize.
2. **Nonlinear value functions**: Most problems have a nonlinear relationship between variables. We could use linear function approximation, but we would need to handcraft nonlinear features, which is time-consuming.
3. **Automatic feature learning**: Using a DQN allows the agent to learn rich representations automatically from the states (inputs).
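The first point can be seen with a toy lookup table. In the sketch below (plain Python, with made-up states and Q-values), two states that are meant to be identical differ by a floating-point rounding error, so the table finds no entry for the second one.

```python
# A dictionary-based Q-table keyed on the raw, real-valued state.
q_table = {}

state_a = (0.1 + 0.2, 1.0)   # first component is 0.30000000000000004
state_b = (0.3, 1.0)         # intended to be "the same" state

# Made-up Q-values for the four actions.
q_table[state_a] = [0.0, 1.5, 0.2, -0.3]

print(state_a == state_b)    # False
print(state_b in q_table)    # False: nothing learned for state_a carries over
```

A function approximator avoids this problem because nearby inputs produce nearby outputs, so experience gathered in one state informs the estimates for similar states.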

The neural network will approximate the Q-values using parameters $\theta$:

$$
q(s, a) \approx q_\theta(s, a)
$$
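As a rough illustration of what $q_\theta$ computes, the NumPy sketch below builds a small two-hidden-layer network with random, untrained weights and maps a batch of states to one Q-value per action. The layer sizes mirror the defaults used later in this notebook; the initialization scheme and the names `init_layer` and `q_theta` are assumptions made for this example only.

```python
import numpy as np

rng = np.random.default_rng(0)
state_size, hidden_size, action_size = 8, 64, 4

def init_layer(n_in, n_out):
    # Small random weights and zero biases; an arbitrary choice for this sketch.
    return rng.normal(scale=0.1, size=(n_in, n_out)), np.zeros(n_out)

# theta: the weights and biases of all layers (untrained here).
theta = [
    init_layer(state_size, hidden_size),
    init_layer(hidden_size, hidden_size),
    init_layer(hidden_size, action_size),
]

def q_theta(states, theta):
    """Map a batch of states with shape (B, 8) to Q-values with shape (B, 4)."""
    (w1, b1), (w2, b2), (w3, b3) = theta
    h1 = np.maximum(0.0, states @ w1 + b1)   # ReLU
    h2 = np.maximum(0.0, h1 @ w2 + b2)       # ReLU
    return h2 @ w3 + b3                      # linear output, one value per action

states = rng.normal(size=(5, state_size))    # random stand-in states
q_values = q_theta(states, theta)
greedy_actions = q_values.argmax(axis=1)     # best action per state under q_theta
print(q_values.shape)                        # (5, 4)
print(greedy_actions)
```

Training adjusts `theta` so that these outputs move toward the true action values; with random weights, the greedy actions are meaningless.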

In this first exercise, we are going to implement the `DeepQNetwork` class. We are going to use a neural network with two hidden layers.

In [None]:
class DeepQNetwork(torch.nn.Module):
    def __init__(self, state_size=8, action_size=4, hidden_size=64):
        super().__init__()
        # Use linear layers to create a simple feedforward neural network.
        raise NotImplementedError('You need to implement the neural network architecture.')
        self.layer1 = ...
        self.layer2 = ...
        self.layer3 = ...

    def forward(self, state):
        x = torch.relu(self.layer1(state))
        x = torch.relu(self.layer2(x))
        return self.layer3(x)

<details>
<summary>Double click to see the solution.</summary>

```python
class DeepQNetwork(torch.nn.Module):
    def __init__(self, state_size=8, action_size=4, hidden_size=64):
        super().__init__()
        # Use linear layers to create a simple feedforward neural network.
        self.layer1 = torch.nn.Linear(state_size, hidden_size)  # Input to hidden
        self.layer2 = torch.nn.Linear(hidden_size, hidden_size)  # Hidden to hidden
        self.layer3 = torch.nn.Linear(hidden_size, action_size)  # Hidden to output

    def forward(self, state):
        x = torch.relu(self.layer1(state))
        x = torch.relu(self.layer2(x))
        return self.layer3(x)
```

</details>

### Exercise 3.2: Implement experience replay 

One challenge when using DQN is that consecutive transitions are highly correlated. Training on them in order can cause large, oscillating updates and make learning unstable. To mitigate this, we will use an experience **replay buffer** to store past experiences and sample from them during training:

- We will store each observed transition $(s, a, r, s', \text{done})$ in a fixed-size buffer.
- During training, we sample random mini-batches from this buffer, which breaks the correlation between consecutive samples.
- When the buffer exceeds its capacity, we will remove the oldest transitions in FIFO (first in, first out) order.
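The FIFO behaviour and the random sampling described above can be tried in isolation with a `deque`. This is a minimal sketch with toy transitions and a deliberately tiny capacity; the values are made up for illustration.

```python
import random
from collections import deque

random.seed(0)

# A buffer that holds at most 3 transitions.
buffer = deque(maxlen=3)

for t in range(5):
    # Toy transition: (state, action, reward, next_state, done)
    buffer.append((t, 0, 1.0, t + 1, False))

# Only the 3 most recent transitions remain; the oldest two were dropped.
stored_states = [transition[0] for transition in buffer]
print(stored_states)   # [2, 3, 4]

# Sample a mini-batch of 2 distinct transitions uniformly at random.
batch = random.sample(buffer, 2)
print(batch)
```

Because `maxlen` is set, `append` discards the oldest element automatically once the buffer is full, so no explicit eviction logic is needed.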

In the next cell, we implement the `ReplayBuffer` class. You'll need to implement the `push` method, which appends a transition to the buffer.

In [None]:
class ReplayBuffer:
    def __init__(self, buffer_size=10000):
        self.buffer = deque(maxlen=buffer_size)

    def push(self, state, action, reward, next_state, done):
        # Append the transition to the buffer.
        # Each transition is a tuple (state, action, reward, next_state, done).
        # Hint: treat self.buffer like a Python list.
        raise NotImplementedError('You need to implement the push method.')
        self.buffer.append((state, action, reward, next_state, done))

    def sample(self, batch_size):
        states, actions, rewards, next_states, dones = zip(*random.sample(self.buffer, batch_size))
        return np.stack(states), np.stack(actions), np.stack(rewards), np.stack(next_states), np.stack(dones)

    def __len__(self):
        return len(self.buffer)

<details>
<summary>Double click to see the solution.</summary>

```python
class ReplayBuffer:
    def __init__(self, buffer_size=10000):
        self.buffer = deque(maxlen=buffer_size)

    def push(self, state, action, reward, next_state, done):
        self.buffer.append((state, action, reward, next_state, done))

    def sample(self, batch_size):
        states, actions, rewards, next_states, dones = zip(*random.sample(self.buffer, batch_size))
        return np.stack(states), np.stack(actions), np.stack(rewards), np.stack(next_states), np.stack(dones)

    def __len__(self):
        return len(self.buffer)
```

</details>

### Exercise 3.3: Implement the lunar lander agent

Next, we are going to implement the `LunarLanderAgent` class, which will handle the training and action selection for the Lunar Lander environment. This agent will use a Deep Q-Network (DQN) to learn how to land the lunar lander safely.

The agent follows an epsilon-greedy policy. Epsilon is recomputed for each episode and decays exponentially with the episode number until it reaches `final_epsilon`, which encourages exploration early in training and exploitation later. You can check the `get_epsilon` method.
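To get a feel for the schedule, the sketch below evaluates the same formula as `get_epsilon` as a standalone function, using the agent's default hyperparameters, and prints epsilon at a few episodes.

```python
def get_epsilon(episode, start_epsilon=1.0, final_epsilon=0.1, epsilon_decay=0.995):
    # Exponential decay, clipped from below at final_epsilon.
    return max(final_epsilon, start_epsilon * epsilon_decay ** episode)

for episode in (0, 100, 200, 459, 460, 1000):
    print(f'episode {episode:4d}: epsilon = {get_epsilon(episode):.4f}')
```

With these defaults, epsilon starts at 1.0 and hits the floor of 0.1 at episode 460, since $0.995^{460} \approx 0.0997$. From then on, the agent still takes a random action 10% of the time.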

In [None]:
class LunarLanderAgent:
    def __init__(self, state_dim, action_dim, gamma=0.99, lr=1e-4, start_epsilon=1.0,
                 final_epsilon=0.1, epsilon_decay=0.995, batch_size=64, buffer_size=10000,
                 device='cpu'):
        
        self.state_dim = state_dim
        self.action_dim = action_dim
        self.gamma = gamma
        self.start_epsilon = start_epsilon
        self.final_epsilon = final_epsilon
        self.epsilon_decay = epsilon_decay
        self.device = device

        # Define the Deep Q-Network model and add it to the device.
        raise NotImplementedError('You need to define the Deep Q-Network model.')
        self.model = ...
        self.lr = lr
        self.optimizer = optim.Adam(self.model.parameters(), lr=self.lr)
        self.criterion = nn.MSELoss()
        
        # Initialize the replay buffer we implemented before. Pass the buffer_size parameter.
        raise NotImplementedError('You need to initialize the replay buffer.')
        self.buffer = ...
        self.batch_size = batch_size

    def select_action(self, state, episode):
        if np.random.rand() < self.get_epsilon(episode):
            return np.random.randint(0, self.action_dim)        
        state = torch.from_numpy(state).unsqueeze(0).to(self.device)
        with torch.no_grad():
            q_values = self.model(state)
        return q_values.argmax().item()
      
    def get_epsilon(self, episode):
        return max(self.final_epsilon, self.start_epsilon * (self.epsilon_decay ** episode))

    def store_transition(self, state, action, reward, next_state, done):
        self.buffer.push(state, action, reward, next_state, done)

    def update(self):
        # Sample a batch of transitions from the replay buffer.
        raise NotImplementedError('You need to sample from the buffer.')
        states, actions, rewards, next_states, dones = ...
        
        states = torch.from_numpy(states).float().to(self.device)
        actions = torch.from_numpy(actions).long().to(self.device)
        rewards = torch.from_numpy(rewards).float().to(self.device)
        next_states = torch.from_numpy(next_states).float().to(self.device)
        dones = torch.from_numpy(np.array(dones).astype(np.uint8)).float().to(self.device)

        q_values = self.model(states).gather(1, actions.unsqueeze(-1))

        with torch.no_grad():
            # Pass the next states through the model to get the Q-values.
            raise NotImplementedError('You need to implement the next state Q-value calculation.')
            y = ...
            # Get the maximum Q-value for the next states.
            next_q_values = y.max(dim=1)[0].detach()
            q_target = rewards + (1 - dones) * self.gamma * next_q_values

        loss = self.criterion(q_values, q_target.unsqueeze(1))

        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

<details>
<summary>Double click to see the solution.</summary>

```python
class LunarLanderAgent:
    def __init__(self, state_dim, action_dim, gamma=0.99, lr=1e-4, start_epsilon=1.0,
                 final_epsilon=0.1, epsilon_decay=0.995, batch_size=64, buffer_size=10000,
                 device='cpu'):
        
        self.state_dim = state_dim
        self.action_dim = action_dim
        self.gamma = gamma
        self.start_epsilon = start_epsilon
        self.final_epsilon = final_epsilon
        self.epsilon_decay = epsilon_decay
        self.device = device

        # Define the Deep Q-Network model and add it to the device.
        self.model = DeepQNetwork(state_dim, action_dim).to(self.device)
        self.lr = lr
        self.optimizer = optim.Adam(self.model.parameters(), lr=self.lr)
        self.criterion = nn.MSELoss()
        
        # Initialize the replay buffer we implemented before. Pass the buffer_size parameter.
        self.buffer = ReplayBuffer(buffer_size=buffer_size)   
        self.batch_size = batch_size

    def select_action(self, state, episode):
        if np.random.rand() < self.get_epsilon(episode):
            return np.random.randint(0, self.action_dim)        
        state = torch.from_numpy(state).unsqueeze(0).to(self.device)
        with torch.no_grad():
            q_values = self.model(state)
        return q_values.argmax().item()
      
    def get_epsilon(self, episode):
        return max(self.final_epsilon, self.start_epsilon * (self.epsilon_decay ** episode))

    def store_transition(self, state, action, reward, next_state, done):
        self.buffer.push(state, action, reward, next_state, done)

    def update(self):
        # Sample a batch of transitions from the replay buffer.
        states, actions, rewards, next_states, dones = self.buffer.sample(self.batch_size)
        
        states = torch.from_numpy(states).float().to(self.device)
        actions = torch.from_numpy(actions).long().to(self.device)
        rewards = torch.from_numpy(rewards).float().to(self.device)
        next_states = torch.from_numpy(next_states).float().to(self.device)
        dones = torch.from_numpy(np.array(dones).astype(np.uint8)).float().to(self.device)

        q_values = self.model(states).gather(1, actions.unsqueeze(-1))

        with torch.no_grad():
            # Pass the next states through the model to get the Q-values.
            y = self.model(next_states)
            # Get the maximum Q-value for the next states.
            next_q_values = y.max(dim=1)[0].detach()
            q_target = rewards + (1 - dones) * self.gamma * next_q_values

        loss = self.criterion(q_values, q_target.unsqueeze(1))

        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()
```

</details>
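The target computed in `update` is $y = r + \gamma \, (1 - \text{done}) \, \max_{a'} q_\theta(s', a')$. The NumPy sketch below checks it by hand on two made-up transitions. The rewards and next-state Q-values are invented for illustration; only `gamma` matches the agent's default.

```python
import numpy as np

gamma = 0.99

# Two toy transitions: the second one ends the episode (done = 1).
rewards = np.array([1.0, -100.0], dtype=np.float32)
dones = np.array([0.0, 1.0], dtype=np.float32)

# Made-up Q-values for the next states: one row per transition, one column per action.
next_q = np.array([[0.5, 2.0, -1.0, 0.0],
                   [3.0, 1.0, 0.0, 2.0]], dtype=np.float32)

max_next_q = next_q.max(axis=1)                           # [2.0, 3.0]
q_target = rewards + (1.0 - dones) * gamma * max_next_q   # bootstrapping is masked when done
print(q_target)
```

For the first transition the target is $1 + 0.99 \cdot 2 = 2.98$. For the second, the factor $(1 - \text{done})$ is zero, so the target is just the reward of $-100$ and no value is bootstrapped from beyond the end of the episode.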

In [None]:
seed = 9876
observation, info = env.reset(seed=seed)
env.action_space.seed(seed)
torch.manual_seed(seed)
np.random.seed(seed)

In [None]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(device)

In [None]:
state_dim = env.observation_space.shape[0]
action_dim = env.action_space.n
agent = LunarLanderAgent(state_dim, action_dim, device=device)

In [None]:
num_episodes = 2000

rewards = []
rewards_window = deque(maxlen=100)
tqdm_bar = tqdm(range(1, num_episodes+1), desc='Episodes')

for episode in tqdm_bar:
    state, _ = env.reset(seed=episode)
    total_reward = 0

    while True:
        action = agent.select_action(state, episode)
        next_state, reward, terminated, truncated, _ = env.step(action)
        done = terminated or truncated

        agent.store_transition(state, action, reward, next_state, done)

        if len(agent.buffer) >= agent.batch_size:
            agent.update()

        state = next_state
        total_reward += reward

        if done:
            break
    
    rewards.append(total_reward)
    rewards_window.append(total_reward)
    tqdm_bar.set_postfix_str(f'Avg. Score = {np.mean(rewards_window):.2f}')

    if np.mean(rewards_window) >= 200:
        print(f'\nEnvironment solved in {episode} episodes!')
        break

env.close()

In [None]:
window_size = 10
average_filter = np.ones(window_size) / window_size

plt.plot(np.convolve(rewards, average_filter, mode='valid'))
plt.xlabel('Episode')
plt.ylabel('Average Reward')
plt.title(f'Moving Average of Rewards (size={window_size})')
plt.show()

As you can see, the agent learns to land over time, and the average reward increases as training progresses. The moving average smooths out the episode-to-episode fluctuations in the rewards.
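The smoothing step can be checked on a tiny example. The sketch below applies the same `np.convolve` call to five made-up rewards with a window of 3.

```python
import numpy as np

rewards = np.array([0.0, 10.0, 20.0, 30.0, 40.0])   # made-up episode rewards
window_size = 3
average_filter = np.ones(window_size) / window_size

# mode='valid' keeps only positions where the window fits entirely inside the data.
smoothed = np.convolve(rewards, average_filter, mode='valid')
print(smoothed)        # approximately [10, 20, 30]
print(len(smoothed))   # 3 = len(rewards) - window_size + 1
```

Note that with `mode='valid'` the output is `window_size - 1` points shorter than the input, so the first plotted point corresponds to the average over the first `window_size` episodes rather than to episode 1.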

In [None]:
done = False
total_reward = 0
frames = []

state, _ = env.reset(seed=episode+1)

while True:
    frame = env.render()
    frames.append(frame)
    
    action = agent.select_action(state, episode)
    next_state, reward, terminated, truncated, _ = env.step(action)
    done = terminated or truncated

    state = next_state
    total_reward += reward
    if done:
        break

frame = env.render()
frames.append(frame)

print('Episode finished. Total reward:', total_reward)
env.close()

In [None]:
trained_video_path = os.path.join('videos', 'lunar_lander_trained.mp4')
imageio.mimsave(trained_video_path, frames, fps=24)

In [None]:
Video(trained_video_path, width=400)

### Summary

- In this notebook, we implemented a **Deep Q-Network (DQN)** agent to solve the LunarLander environment. The agent uses a neural network to approximate the Q-values for each action given a state. 

- To reduce instabilities during training, we used an **experience replay** buffer to store transitions and sample random mini-batches from them.