In [None]:
! pip install tensorflow
! pip install gym[box2d]

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [None]:
import gym
import random
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import matplotlib.pyplot as plt
import tensorflow as tf
import warnings
warnings.filterwarnings('ignore')

# Seed every random number generator used below so that runs are repeatable
seed = 1
for seed_fn in (tf.random.set_seed, random.seed, np.random.seed, torch.manual_seed):
    seed_fn(seed)
# The CUDA generators are seeded only when a GPU is actually present
if torch.cuda.is_available():
    torch.cuda.manual_seed_all(seed)

# Define the Q-network
class QNetwork(nn.Module):
    """Fully connected network with two 64-unit ReLU hidden layers.

    Maps an input of size ``input_dim`` to ``output_dim`` raw outputs; no
    activation is applied to the final layer.
    """

    def __init__(self, input_dim, output_dim):
        super().__init__()
        hidden_units = 64
        self.fc1 = nn.Linear(input_dim, hidden_units)
        self.fc2 = nn.Linear(hidden_units, hidden_units)
        self.fc3 = nn.Linear(hidden_units, output_dim)

    def forward(self, x):
        """Return the output of the final linear layer for input ``x``."""
        first_hidden = self.fc1(x).relu()
        second_hidden = self.fc2(first_hidden).relu()
        return self.fc3(second_hidden)

# Define the DQN agent
class DQNAgent:
    """Q-learning agent with an online network, a target network and a replay buffer.

    Transitions are stored in a bounded buffer (``memory``) and sampled
    uniformly at random for training. Actions are chosen epsilon-greedily
    from the online network (``model``); bootstrapped targets are computed
    with ``target_model``, which only changes when ``update_target_model``
    is called.

    Args:
        state_dim: Size of the state vector fed to the networks.
        action_dim: Number of discrete actions (network output size).
        learning_rate: Learning rate of the Adam optimizer.
        gamma: Discount factor applied to the bootstrapped next-state value.
        epsilon: Initial probability of picking a random action.
        epsilon_min: Lower bound enforced by ``decay_epsilon``.
        epsilon_decay: Multiplicative factor applied by ``decay_epsilon``.
        memory_size: Maximum number of transitions kept in the buffer.
        batch_size: Number of transitions sampled per ``replay`` call.
    """

    def __init__(
        self,
        state_dim,
        action_dim,
        learning_rate=0.001,
        gamma=0.99,
        epsilon=1.0,
        epsilon_min=0.01,
        epsilon_decay=0.995,
        memory_size=1000000,
        batch_size=64,
    ):
        self.state_dim = state_dim
        self.action_dim = action_dim
        self.learning_rate = learning_rate
        self.gamma = gamma
        self.epsilon = epsilon
        self.epsilon_min = epsilon_min
        self.epsilon_decay = epsilon_decay
        self.batch_size = batch_size
        self.memory = []
        self.memory_size = memory_size
        # Index of the oldest stored transition once the buffer is full.
        self._memory_pos = 0
        self.model = QNetwork(state_dim, action_dim)
        self.target_model = QNetwork(state_dim, action_dim)
        # Start the target network from the same weights as the online one
        # instead of an unrelated random initialisation.
        self.target_model.load_state_dict(self.model.state_dict())
        self.optimizer = optim.Adam(self.model.parameters(), lr=learning_rate)
        self.loss_fn = nn.MSELoss()

    def remember(self, state, action, reward, next_state, done):
        """Store one transition, overwriting the oldest one when the buffer is full."""
        transition = (state, action, reward, next_state, done)
        if len(self.memory) < self.memory_size:
            self.memory.append(transition)
            return
        # Ring-buffer overwrite: O(1), unlike list.pop(0) which shifts every
        # remaining element.
        self.memory[self._memory_pos] = transition
        self._memory_pos = (self._memory_pos + 1) % self.memory_size

    def select_action(self, state, explore=True):
        """Return an action index for ``state``.

        With ``explore=True`` a uniformly random action is returned with
        probability ``epsilon``; otherwise the action with the highest
        output of the online network is returned.
        """
        if explore and np.random.rand() < self.epsilon:
            return np.random.choice(self.action_dim)
        state_tensor = torch.as_tensor(state, dtype=torch.float32)
        # Pure inference: no autograd graph is needed to pick an action.
        with torch.no_grad():
            q_values = self.model(state_tensor)
        return torch.argmax(q_values).item()

    def replay(self):
        """Run one gradient step on a random minibatch from the buffer.

        Does nothing while fewer than ``batch_size`` transitions are stored.
        """
        if len(self.memory) < self.batch_size:
            return
        samples = random.sample(self.memory, self.batch_size)
        states, actions, rewards, next_states, dones = zip(*samples)
        # Stack each field into a single ndarray first; building a tensor
        # directly from a tuple of ndarrays converts element by element.
        states_tensor = torch.as_tensor(np.asarray(states), dtype=torch.float32)
        next_states_tensor = torch.as_tensor(np.asarray(next_states), dtype=torch.float32)
        actions_tensor = torch.as_tensor(np.asarray(actions), dtype=torch.int64).unsqueeze(1)
        rewards_tensor = torch.as_tensor(np.asarray(rewards), dtype=torch.float32).unsqueeze(1)
        dones_tensor = torch.as_tensor(np.asarray(dones), dtype=torch.float32).unsqueeze(1)

        # Q(s, a) of the online network for the actions that were taken
        q_values = self.model(states_tensor).gather(1, actions_tensor)
        # Bootstrapped target from the target network; no gradient flows into it
        with torch.no_grad():
            max_next_q_values = self.target_model(next_states_tensor).max(1)[0].unsqueeze(1)
        # (1 - done) removes the bootstrap term for transitions flagged as done
        target_q_values = rewards_tensor + (1 - dones_tensor) * self.gamma * max_next_q_values

        # Compute the loss and backpropagate
        loss = self.loss_fn(q_values, target_q_values)
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

    def update_target_model(self):
        """Copy the online network's weights into the target network."""
        self.target_model.load_state_dict(self.model.state_dict())

    def decay_epsilon(self):
        """Multiply ``epsilon`` by ``epsilon_decay``, never going below ``epsilon_min``."""
        self.epsilon = max(self.epsilon_min, self.epsilon_decay * self.epsilon)

# Create the environment and seed it with the same value as the other RNGs
# (previously a separate hard-coded literal that could drift from `seed`).
env = gym.make('LunarLander-v2')
env.seed(seed)

# Set hyperparameters
state_dim = env.observation_space.shape[0]  # length of the observation vector
action_dim = env.action_space.n             # number of discrete actions
learning_rate = 0.001
gamma = 0.99
epsilon = 1.0
epsilon_min = 0.01
epsilon_decay = 0.995
memory_size = 1000000
batch_size = 64
num_episodes = 640

# Create the DQN agent; keyword arguments keep each value tied to its parameter
agent = DQNAgent(
    state_dim,
    action_dim,
    learning_rate=learning_rate,
    gamma=gamma,
    epsilon=epsilon,
    epsilon_min=epsilon_min,
    epsilon_decay=epsilon_decay,
    memory_size=memory_size,
    batch_size=batch_size,
)

# Train the agent
# Number of environment steps between hard copies of the online weights into
# the target network. Copying on every step keeps the target network equal to
# the online network whenever targets are computed, so it would add nothing;
# holding it fixed for a while is what makes it a separate target. The value
# is a tunable hyperparameter.
target_update_interval = 1000
total_rewards_train = []
steps_done = 0

# Make sure both networks start from identical weights.
agent.update_target_model()

for episode in range(num_episodes):
    state = env.reset()
    total_reward = 0
    done = False
    while not done:
        # Choose an action (epsilon-greedy)
        action = agent.select_action(state, explore=True)

        # Take a step
        next_state, reward, done, _ = env.step(action)

        # Update the agent's memory
        agent.remember(state, action, reward, next_state, done)

        # Update the Q-value function
        agent.replay()

        # Periodically refresh the target network
        steps_done += 1
        if steps_done % target_update_interval == 0:
            agent.update_target_model()

        # Update the state and the total reward
        state = next_state
        total_reward += reward
    # Record the episode return
    total_rewards_train.append(total_reward)
    # Decay the exploration rate once per episode
    agent.decay_epsilon()

# Evaluate the trained agent with exploration switched off
num_test_episodes = 100
total_rewards_test = []
for episode in range(num_test_episodes):
    state, total_reward, done = env.reset(), 0, False
    while not done:
        # Pick the action via select_action with explore=False, then advance one step
        action = agent.select_action(state, explore=False)
        next_state, reward, done, _ = env.step(action)

        # Accumulate the episode return and move on to the new state
        total_reward += reward
        state = next_state

    # Record this episode's return
    total_rewards_test.append(total_reward)

# Report the mean return across the evaluation episodes
avg_reward = sum(total_rewards_test) / num_test_episodes
print("Average Reward: {:.2f}".format(avg_reward))
print("")

# Build a figure with two stacked panels: training on top, testing below
fig, axs = plt.subplots(2, 1, figsize=(10, 12))

# Each panel shows the total reward per episode and differs only in its
# title, data series and line colour
panels = (
    (axs[0], "Lunar Lander Training", total_rewards_train, '#1f77b4'),
    (axs[1], "Lunar Lander Testing", total_rewards_test, '#ff7f0e'),
)
for ax, title, series, line_color in panels:
    ax.set_title(title, fontsize=20)
    ax.set_xlabel("Episode", fontsize=16)
    ax.set_ylabel("Total Reward", fontsize=16)
    ax.tick_params(axis="both", labelsize=14)
    ax.grid(linestyle='--', alpha=0.7)
    ax.plot(range(len(series)), series, color=line_color, linewidth=2)
    # White plotting area
    ax.set_facecolor('#FFFFFF')

# Leave vertical room between the two panels
plt.subplots_adjust(hspace=0.5)

# Light grey figure background
fig.patch.set_facecolor('#F5F5F5')

plt.show()