<a href="https://colab.research.google.com/github/zahra-eslamian/artificial_intelligence_A_to_Z/blob/main/dqn_lunar_lander.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import random
from collections import deque

import gymnasium as gym
import imageio
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim

# =====================
# Hyperparameters
# =====================
# Gymnasium environment id used for both training and video recording.
# NOTE(review): recent Gymnasium releases register LunarLander-v3 and may
# reject the v2 id -- confirm against the installed version.
ENV_ID = "LunarLander-v2"

GAMMA = 0.99  # discount factor applied to the bootstrapped next-state value
LR = 1e-3  # learning rate passed to the Adam optimizer
BATCH_SIZE = 64  # transitions sampled from the replay buffer per update
BUFFER_SIZE = 100_000  # replay buffer capacity (oldest entries are evicted)
MIN_REPLAY_SIZE = 10_000  # random-policy transitions gathered before training

# Epsilon-greedy schedule: epsilon falls linearly from EPS_START to EPS_END
# over the first EPS_DECAY environment steps and then stays at EPS_END.
EPS_START = 1.0
EPS_END = 0.05
EPS_DECAY = 500_000  # in steps

TARGET_UPDATE_FREQ = 1000  # in steps; how often target_net copies policy_net
MAX_EPISODES = 1000  # upper bound on the number of training episodes
MAX_STEPS_PER_EPISODE = 1000  # per-episode step cap (training and video)

VIDEO_FILENAME = "lunar_lander_dqn.mp4"  # default output path for record_video

# Use the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")


# =====================
# DQN Network
# =====================
class DQN(nn.Module):
    """Fully connected Q-network producing one Q-value per discrete action.

    The network is two ReLU-activated hidden layers of equal width followed
    by a linear output layer with ``n_actions`` units.

    Args:
        obs_dim: Number of features in a single observation vector.
        n_actions: Number of discrete actions (width of the output layer).
        hidden_dim: Width of both hidden layers. Defaults to 128, the value
            that used to be hard-coded, so existing callers are unaffected.
    """

    def __init__(self, obs_dim, n_actions, hidden_dim=128):
        super().__init__()
        # The layer order is part of the checkpoint format: state_dict keys
        # are ``net.0.*``, ``net.2.*`` and ``net.4.*``.
        self.net = nn.Sequential(
            nn.Linear(obs_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, n_actions),
        )

    def forward(self, x):
        """Return Q-values for ``x``; the last dimension of ``x`` must be ``obs_dim``."""
        return self.net(x)


# =====================
# Replay Buffer
# =====================
class ReplayBuffer:
    """Fixed-capacity FIFO store of transitions for experience replay.

    Transitions are kept as ``(state, action, reward, next_state, done)``
    tuples in a ``deque``; once ``capacity`` is reached the oldest transition
    is dropped for every new one pushed.
    """

    def __init__(self, capacity):
        """Create an empty buffer holding at most ``capacity`` transitions."""
        self.buffer = deque(maxlen=capacity)

    def push(self, state, action, reward, next_state, done):
        """Append one transition, evicting the oldest one if the buffer is full."""
        self.buffer.append((state, action, reward, next_state, done))

    def sample(self, batch_size):
        """Draw ``batch_size`` distinct transitions uniformly at random.

        Returns:
            A tuple ``(states, actions, rewards, next_states, dones)`` of
            tensors on the module-level ``device``. ``actions`` is ``int64``;
            every other tensor is ``float32``. Row ``i`` of each tensor comes
            from the same transition.

        Raises:
            ValueError: If ``batch_size`` exceeds the number of stored
                transitions (raised by ``random.sample``).
        """
        batch = random.sample(self.buffer, batch_size)
        states, actions, rewards, next_states, dones = zip(*batch)

        def to_tensor(column, np_dtype):
            # Stack into ONE contiguous ndarray first: handing torch a tuple
            # of ndarrays makes it convert element by element, which is very
            # slow and is something PyTorch itself warns about.
            return torch.as_tensor(np.asarray(column, dtype=np_dtype), device=device)

        return (
            to_tensor(states, np.float32),
            to_tensor(actions, np.int64),
            to_tensor(rewards, np.float32),
            to_tensor(next_states, np.float32),
            to_tensor(dones, np.float32),
        )

    def __len__(self):
        """Return the number of transitions currently stored."""
        return len(self.buffer)


# =====================
# Epsilon-greedy policy
# =====================
def get_epsilon(step):
    """Return the exploration rate to use at global step ``step``.

    The rate falls linearly from ``EPS_START`` at step 0 to ``EPS_END`` at
    step ``EPS_DECAY`` and stays at ``EPS_END`` for every later step.
    """
    steps_left = max(0, EPS_DECAY - step)  # clamps the schedule at EPS_END
    span = EPS_START - EPS_END
    return EPS_END + span * steps_left / EPS_DECAY


def select_action(policy_net, state, step, n_actions=None):
    """Pick an action epsilon-greedily for ``state`` at global step ``step``.

    With probability ``get_epsilon(step)`` a uniformly random action is
    returned; otherwise the action with the highest predicted Q-value.

    Args:
        policy_net: Network mapping a batch of states to per-action Q-values.
        state: A single observation (anything ``torch.tensor`` accepts).
        step: Global environment step, used to look up the current epsilon.
        n_actions: Size of the discrete action space. When omitted it is read
            from the output width of ``policy_net.net[-1]``, so the function
            no longer depends on a global that only exists after the
            ``__main__`` block has run.

    Returns:
        The chosen action as a Python ``int``.
    """
    epsilon = get_epsilon(step)
    if random.random() < epsilon:
        # Explore
        if n_actions is None:
            # The final linear layer emits one Q-value per action.
            n_actions = policy_net.net[-1].out_features
        return random.randrange(n_actions)

    # Exploit: add a batch dimension, then take the argmax over actions.
    state_v = torch.tensor(state, dtype=torch.float32, device=device).unsqueeze(0)
    with torch.no_grad():
        q_values = policy_net(state_v)
    return int(torch.argmax(q_values, dim=1).item())


# =====================
# Optimize DQN
# =====================
def compute_loss(policy_net, target_net, optimizer, replay_buffer):
    """Run one DQN update on a sampled minibatch and return the loss value.

    Samples ``BATCH_SIZE`` transitions, regresses ``policy_net``'s Q-value
    for each taken action onto the one-step target
    ``reward + GAMMA * max_a' Q_target(next_state, a') * (1 - done)`` with a
    mean-squared-error loss, and applies a single optimizer step.

    Returns:
        The scalar loss for this minibatch as a Python ``float``.
    """
    obs, acts, rews, next_obs, terminal = replay_buffer.sample(BATCH_SIZE)

    # Q(s, a) restricted to the action that was actually taken.
    predicted = policy_net(obs).gather(1, acts.unsqueeze(1)).squeeze(1)

    # The target is a constant for this update, so keep it out of autograd.
    with torch.no_grad():
        best_next, _ = target_net(next_obs).max(1)
        td_target = rews + GAMMA * best_next * (1 - terminal)

    loss = nn.functional.mse_loss(predicted, td_target)

    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

    return loss.item()


# =====================
# Video generation
# =====================
def record_video(policy_net, filename=VIDEO_FILENAME, episodes=1):
    """Roll out the greedy policy and save the rendered frames as a video.

    A separate environment is created with ``render_mode="rgb_array"``; for
    each episode the action with the highest Q-value is taken at every step
    and the frame rendered after the step is collected. All frames from all
    episodes are written to one file at 30 fps.

    Args:
        policy_net: Network mapping a batch of states to per-action Q-values.
        filename: Output path handed to ``imageio.get_writer``.
        episodes: Number of episodes to record; episode ``i`` is reset with
            ``seed=i``.
    """
    video_env = gym.make(ENV_ID, render_mode="rgb_array")
    frames = []

    try:
        for ep in range(episodes):
            state, _ = video_env.reset(seed=ep)
            done = False
            step = 0

            while not done and step < MAX_STEPS_PER_EPISODE:
                state_v = torch.tensor(state, dtype=torch.float32, device=device).unsqueeze(0)
                with torch.no_grad():
                    q_values = policy_net(state_v)
                    action = int(torch.argmax(q_values, dim=1).item())

                state, _, terminated, truncated, _ = video_env.step(action)
                done = terminated or truncated
                frames.append(video_env.render())
                step += 1
    finally:
        # Release the environment even if stepping or rendering raised.
        video_env.close()

    # Save to MP4
    print(f"Saving video to {filename} ({len(frames)} frames)...")
    with imageio.get_writer(filename, fps=30) as writer:
        for frame in frames:
            writer.append_data(frame)
    print("Video saved.")


# =====================
# Main training loop
# =====================
if __name__ == "__main__":
    # Script entry point: warm up the replay buffer with random play, train
    # the DQN agent, then record a video of the resulting greedy policy.

    # Create env WITHOUT rendering for training
    env = gym.make(ENV_ID)
    try:
        obs_dim = env.observation_space.shape[0]
        n_actions = env.action_space.n

        # Networks: target_net starts as a copy of policy_net and is only
        # ever updated by copying weights, so it stays in eval mode.
        policy_net = DQN(obs_dim, n_actions).to(device)
        target_net = DQN(obs_dim, n_actions).to(device)
        target_net.load_state_dict(policy_net.state_dict())
        target_net.eval()

        optimizer = optim.Adam(policy_net.parameters(), lr=LR)
        replay_buffer = ReplayBuffer(BUFFER_SIZE)

        # Fill replay buffer with random experience
        print("Filling replay buffer with random policy...")
        state, _ = env.reset(seed=0)
        for _ in range(MIN_REPLAY_SIZE):
            action = env.action_space.sample()
            next_state, reward, terminated, truncated, _ = env.step(action)

            # Store `terminated`, not `terminated or truncated`: a time-limit
            # truncation is not a terminal state, so the TD target must still
            # bootstrap from next_state in that case.
            replay_buffer.push(state, action, reward, next_state, terminated)

            if terminated or truncated:
                state, _ = env.reset()
            else:
                state = next_state

        print("Replay buffer filled. Starting training.")

        total_steps = 0
        episode_rewards = []

        for episode in range(1, MAX_EPISODES + 1):
            state, _ = env.reset()
            episode_reward = 0

            for _ in range(MAX_STEPS_PER_EPISODE):
                total_steps += 1

                # Choose action
                action = select_action(policy_net, state, total_steps)

                # Environment step
                next_state, reward, terminated, truncated, _ = env.step(action)

                # Store experience (terminal flag excludes truncation, see above)
                replay_buffer.push(state, action, reward, next_state, terminated)

                state = next_state
                episode_reward += reward

                # Optimize model (one gradient step per environment step)
                compute_loss(policy_net, target_net, optimizer, replay_buffer)

                # Update target network
                if total_steps % TARGET_UPDATE_FREQ == 0:
                    target_net.load_state_dict(policy_net.state_dict())

                # The episode itself ends on either signal.
                if terminated or truncated:
                    break

            episode_rewards.append(episode_reward)
            avg_reward = np.mean(episode_rewards[-20:])

            print(
                f"Episode {episode:4d} | "
                f"Reward: {episode_reward:7.2f} | "
                f"Avg(20): {avg_reward:7.2f} | "
                f"Steps: {total_steps:7d} | "
                f"Epsilon: {get_epsilon(total_steps):.3f}"
            )

            # Simple "solved" condition: average reward >= 200 over last 20 episodes
            if avg_reward >= 200.0 and episode >= 20:
                print("Environment solved, stopping training.")
                break
    finally:
        # Release the training environment even if training was interrupted.
        env.close()

    # =====================
    # Record video of the trained agent
    # =====================
    record_video(policy_net, filename=VIDEO_FILENAME, episodes=1)
