In [1]:
import gymnasium as gym
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import numpy as np
from collections import namedtuple, deque
import random
import os
from tensordict import TensorDict
from torchrl.data import TensorDictReplayBuffer, LazyMemmapStorage



In [2]:
env = gym.make("LunarLander-v2")

In [3]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

Using device: cuda


In [4]:
class DQN(nn.Module):
    def __init__(self, state_dim, action_dim, device):
        super(DQN, self).__init__()
        self.device = device
        self.fc1 = nn.Linear(state_dim, 128).to(self.device)
        self.fc2 = nn.Linear(128, 128).to(self.device)
        self.fc3 = nn.Linear(128, 64).to(self.device)
        self.fc4 = nn.Linear(64, action_dim).to(self.device)

    def forward(self, state):
        x = F.relu(self.fc1(state))
        x = F.relu(self.fc2(x))
        x = F.relu(self.fc3(x))
        return self.fc4(x)


In [5]:
BATCH_SIZE = 128
GAMMA = .99
EPS_START = 0.7
EPS_END = 0.4
EPS_DECAY = 0.99
TAU = 0.005
LR = 5*1e-6

In [6]:
Transition = namedtuple('Transition',
                        ('state', 'action', 'next_state', 'reward'))

In [7]:
class ReplayMemory:
    def __init__(self, capacity):
        self.memory = deque(maxlen=capacity)

    def push(self, state, action, next_state, reward):
        action = torch.tensor([[action]], device=device, dtype=torch.long) if not isinstance(action, torch.Tensor) else action
        reward = torch.tensor([reward], device=device, dtype=torch.float32)
        self.memory.append((state, action, next_state, reward))
    def sample(self, batch_size):
        return random.sample(self.memory, batch_size)
    def __len__(self):
        return len(self.memory)

In [8]:

class DDQNAgent:
    def __init__(self, state_dim, action_dim, device, model_path="ddqn_lunar_lander.pth"):
        self.state_dim = state_dim
        self.action_dim = action_dim
        self.device = device
        self.memory = ReplayMemory(1000000)
        self.gamma = GAMMA  
        self.epsilon = EPS_START  
        self.epsilon_decay = 0.9999998
        self.epsilon_min = EPS_END
        self.learning_rate = LR
        self.batch_size = BATCH_SIZE
        self.model_path = model_path
        self.policy_net = DQN(state_dim, action_dim, device)
        self.target_net = DQN(state_dim, action_dim, device)
        self.optimizer = optim.Adam(self.policy_net.parameters(), lr=self.learning_rate, amsgrad=True)
        self.save_every = 5e5
        self.steps = 0
        if os.path.exists(self.model_path):
            self.load_model()

        self.target_net.load_state_dict(self.policy_net.state_dict())

    def act(self, state, learn=True):
        if (np.random.rand() < self.epsilon) and learn:
            action_idx = np.random.randint(self.action_dim)
        else:
            with torch.no_grad():
                q_values = self.policy_net(state)
                action_idx = torch.argmax(q_values, dim=1).item()
        self.epsilon *= self.epsilon_decay
        self.epsilon = max(self.epsilon_min,self.epsilon)
        self.steps += 1
        return action_idx

    def train(self):
        if len(self.memory) < BATCH_SIZE:
            return
        transitions = self.memory.sample(BATCH_SIZE)
        batch = Transition(*zip(*transitions))
        non_final_mask = torch.tensor(tuple(map(lambda s: s is not None,
                                          batch.next_state)), device=device, dtype=torch.bool)
        non_final_next_states = torch.cat([s for s in batch.next_state
                                                if s is not None])
        state_batch = torch.cat(batch.state)
        action_batch = torch.cat(batch.action)
        reward_batch = torch.cat(batch.reward)

        state_action_values = self.policy_net(state_batch).gather(1, action_batch)
        next_state_values = torch.zeros(BATCH_SIZE, device=device)
        with torch.no_grad():
            next_state_values[non_final_mask] = self.target_net(non_final_next_states).max(1).values
        expected_state_action_values = (next_state_values * GAMMA) + reward_batch

        criterion = nn.SmoothL1Loss()
        loss = criterion(state_action_values, expected_state_action_values.unsqueeze(1))

        self.optimizer.zero_grad()
        loss.backward()
        #torch.nn.utils.clip_grad_value_(self.policy_net.parameters(), 100)
        #self.optimizer.step()
        torch.nn.utils.clip_grad_norm_(self.policy_net.parameters(), max_norm=10)  # Gradient clipping
        self.optimizer.step()    

    def save_model(self):
        torch.save({
            'model_state_dict': self.policy_net.state_dict(),
            'optimizer_state_dict': self.optimizer.state_dict(),
            'epsilon': self.epsilon
        }, self.model_path)
        print(f"Model saved to {self.model_path}")

    def load_model(self):
        checkpoint = torch.load(self.model_path, map_location=self.device)
        self.policy_net.load_state_dict(checkpoint['model_state_dict'])
        self.optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
        self.epsilon = checkpoint['epsilon']
        print(f"Model loaded from {self.model_path}")


    def update_policy(self):
        target_net_state_dict = self.target_net.state_dict()
        policy_net_state_dict = self.policy_net.state_dict()
        for key in policy_net_state_dict:
            target_net_state_dict[key] = policy_net_state_dict[key]*TAU + target_net_state_dict[key]*(1-TAU)
        self.target_net.load_state_dict(target_net_state_dict)

In [9]:

agent = DDQNAgent(state_dim=8, action_dim=4, device=device)
agent.learning_rate = 0.7
episodes = 10000
for e in range(episodes):
    state = env.reset()
    state = state[0] if isinstance(state, tuple) else state
    state = torch.tensor(state, dtype=torch.float32, device=device).unsqueeze(0)
    done = False
    total_reward = 0
    steps = 0
    episode_steps = []
    won = False
    while not done:
        action = agent.act(state)
        next_state, reward, done, _, _ = env.step(action)
        episode_steps.append((state.cpu().numpy(), action))
        
        if steps > 500:
            reward = -100
            done = True
        if reward > 99:
            won = True
            done = True
            total_reward += reward
            print(f"Episode {e+1}/{episodes}, Total Reward: {total_reward}, Steps: {steps}, Epsilon: {agent.epsilon} Won: {won} Totalsteps: {agent.steps}")
            if total_reward >= 200:
                reward += 200
                print('MEGAWIN')    
        if reward < -0.1:
            reward -= 0.05
        if done:
            next_state = None
        else:
            next_state = torch.tensor(next_state, dtype=torch.float32, device=device).unsqueeze(0)
        
        agent.memory.push(state, action, next_state, reward)
        state = next_state
        total_reward += reward
        agent.train()
        steps += 1
        agent.update_policy()

    # Save the model at regular intervals
    if (e + 1) % 50 == 0:
        agent.save_model()
        print(f"Episode {e+1}/{episodes}, Total Reward: {total_reward}, Steps: {steps}, Epsilon: {agent.epsilon} Won: {won} Totalsteps: {agent.steps}")

env.close()

  checkpoint = torch.load(self.model_path, map_location=self.device)


Model loaded from ddqn_lunar_lander.pth
Model saved to ddqn_lunar_lander.pth
Episode 50/100000, Total Reward: -22.81848196152022, Steps: 103, Epsilon: 0.4 Won: False Totalsteps: 5744
Model saved to ddqn_lunar_lander.pth
Episode 100/100000, Total Reward: -32.72084243084895, Steps: 66, Epsilon: 0.4 Won: False Totalsteps: 11546


In [None]:
import time
print("Running a new episode with the trained agent...")

env = gym.make("LunarLander-v2", render_mode="human")

state = env.reset()
state = state[0] if isinstance(state, tuple) else state
state = torch.tensor(state, dtype=torch.float32, device=device).unsqueeze(0)
done = False
total_reward = 0
stepps = 0
while not done:
    env.render()
    # Get the action from the trained agent with learn=False
    action = agent.act(state, learn=False)
    next_state, reward, done, _, _ = env.step(action)
    state = torch.tensor(next_state, dtype=torch.float32, device=device).unsqueeze(0)
    total_reward += reward
    if reward > 95:
            
        won = True
        done = True
        print(f"Total Reward: {total_reward} Won: {won} Steps: {stepps}")      
    
    stepps += 1
    # Add a small sleep to slow down the visualization
    

print(f"New Episode: Total Reward: {total_reward}")
env.close()