In [1]:
# Install required packages
!pip3 install gymnasium[classic_control] optuna

# Imports
import math
import random
from collections import namedtuple, deque
from itertools import count

import gymnasium as gym
import matplotlib.pyplot as plt
import numpy as np
import optuna
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F

# Device configuration
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Replay Memory
Transition = namedtuple("Transition", ("state", "action", "next_state", "reward"))

class ReplayMemory:
    def __init__(self, capacity):
        self.memory = deque([], maxlen=capacity)

    def push(self, *args):
        self.memory.append(Transition(*args))

    def sample(self, batch_size):
        return random.sample(self.memory, batch_size)

    def __len__(self):
        return len(self.memory)

# Noisy Linear Layer
class NoisyLinear(nn.Linear):
    def __init__(self, in_features, out_features, sigma_init=0.017, bias=True):
        super(NoisyLinear, self).__init__(in_features, out_features, bias=bias)
        self.sigma_weight = nn.Parameter(torch.full((out_features, in_features), sigma_init))
        self.register_buffer("epsilon_weight", torch.zeros(out_features, in_features))
        if bias:
            self.sigma_bias = nn.Parameter(torch.full((out_features,), sigma_init))
            self.register_buffer("epsilon_bias", torch.zeros(out_features))
        self.reset_parameters()

    def reset_parameters(self):
        std = math.sqrt(3 / self.in_features)
        self.weight.data.uniform_(-std, std)
        if self.bias is not None:
            self.bias.data.uniform_(-std, std)

    def forward(self, input):
        self.epsilon_weight.normal_()
        bias = self.bias
        if bias is not None:
            self.epsilon_bias.normal_()
            bias = bias + self.sigma_bias * self.epsilon_bias.data
        return F.linear(input, self.weight + self.sigma_weight * self.epsilon_weight.data, bias)

# Dueling DQN with Optional Noisy Layers
class DuelingDQN(nn.Module):
    def __init__(self, n_observations, n_actions, hidden_size, noisy=False, sigma_init=0.017):
        super(DuelingDQN, self).__init__()
        Linear = NoisyLinear if noisy else nn.Linear

        self.base = nn.Sequential(
            Linear(n_observations, hidden_size),
            nn.ReLU(),
            Linear(hidden_size, hidden_size),
            nn.ReLU(),
        )

        self.adv_layer = nn.Sequential(
            Linear(hidden_size, hidden_size),
            nn.ReLU(),
            Linear(hidden_size, n_actions),
        )

        self.val_layer = nn.Sequential(
            Linear(hidden_size, hidden_size),
            nn.ReLU(),
            Linear(hidden_size, 1),
        )

    def forward(self, x):
        x = self.base(x)
        advantage = self.adv_layer(x)
        value = self.val_layer(x)
        return value + advantage - advantage.mean(dim=1, keepdim=True)

# Dueling DQN Agent
class DuelingDQNAgent:
    def __init__(self, env, trial, noisy=False):
        self.env = env
        self.device = device

        # Hyperparameters from Optuna
        self.batch_size = trial.suggest_int("batch_size", 64, 128)
        self.lr = trial.suggest_float("lr", 1e-4, 5e-4, log=True)
        self.hidden_size = trial.suggest_int("hidden_size", 128, 256)
        self.memory_size = trial.suggest_int("memory_size", 5000, 20000)
        self.eps_start = trial.suggest_float("eps_start", 0.8, 1.0)
        self.eps_end = trial.suggest_float("eps_end", 0.01, 0.1)
        self.eps_decay = trial.suggest_int("eps_decay", 1000, 3000)
        self.gamma = trial.suggest_float("gamma", 0.95, 0.99)
        self.tau = 0.005

        # Initialize networks
        n_actions = env.action_space.n
        n_observations = len(env.observation_space.low)

        self.policy_net = DuelingDQN(n_observations, n_actions, self.hidden_size, noisy).to(self.device)
        self.target_net = DuelingDQN(n_observations, n_actions, self.hidden_size, noisy).to(self.device)
        self.target_net.load_state_dict(self.policy_net.state_dict())
        self.optimizer = optim.Adam(self.policy_net.parameters(), lr=self.lr)
        self.memory = ReplayMemory(self.memory_size)

        self.steps_done = 0

    def select_action(self, state):
        sample = random.random()
        eps_threshold = self.eps_end + (self.eps_start - self.eps_end) * \
            math.exp(-1.0 * self.steps_done / self.eps_decay)
        self.steps_done += 1

        if sample > eps_threshold:
            with torch.no_grad():
                return self.policy_net(state).max(1)[1].view(1, 1)
        else:
            return torch.tensor([[self.env.action_space.sample()]], device=self.device, dtype=torch.long)

    def optimize_model(self):
        if len(self.memory) < self.batch_size:
            return

        transitions = self.memory.sample(self.batch_size)
        batch = Transition(*zip(*transitions))

        non_final_mask = torch.tensor(tuple(map(lambda s: s is not None, batch.next_state)), device=self.device, dtype=torch.bool)
        non_final_next_states = torch.cat([s for s in batch.next_state if s is not None])

        state_batch = torch.cat(batch.state)
        action_batch = torch.cat(batch.action)
        reward_batch = torch.cat(batch.reward)

        state_action_values = self.policy_net(state_batch).gather(1, action_batch)
        next_state_values = torch.zeros(self.batch_size, device=self.device)
        with torch.no_grad():
            next_actions = self.policy_net(non_final_next_states).max(1)[1].unsqueeze(1)
            next_state_values[non_final_mask] = self.target_net(non_final_next_states).gather(1, next_actions).squeeze(1)

        expected_state_action_values = (next_state_values * self.gamma) + reward_batch

        criterion = nn.SmoothL1Loss()
        loss = criterion(state_action_values, expected_state_action_values.unsqueeze(1))

        self.optimizer.zero_grad()
        loss.backward()
        torch.nn.utils.clip_grad_value_(self.policy_net.parameters(), 100)
        self.optimizer.step()

        # Soft update of target network
        if self.steps_done % 10 == 0:
            target_net_state_dict = self.target_net.state_dict()
            policy_net_state_dict = self.policy_net.state_dict()
            for key in policy_net_state_dict:
                target_net_state_dict[key] = policy_net_state_dict[key] * self.tau + \
                    target_net_state_dict[key] * (1 - self.tau)
            self.target_net.load_state_dict(target_net_state_dict)

# Plotting Episode Durations
episode_durations = []

def plot_durations(show_result=False):
    plt.figure(1)
    durations_t = torch.tensor(episode_durations, dtype=torch.float)
    if show_result:
        plt.title('Result')
    else:
        plt.clf()
        plt.title('Training...')
    plt.xlabel('Episode')
    plt.ylabel('Duration')
    plt.plot(durations_t.numpy())
    if len(durations_t) >= 100:
        means = durations_t.unfold(0, 100, 1).mean(1).view(-1)
        means = torch.cat((torch.zeros(99), means))
        plt.plot(means.numpy())
    plt.pause(0.001)
    if not show_result:
        display.display(plt.gcf())
        display.clear_output(wait=True)
    else:
        display.display(plt.gcf())

# Objective Function for Optuna
def objective(trial):
    env = gym.make("CartPole-v1")
    noisy = trial.suggest_categorical("noisy_layers", [True, False])
    agent = DuelingDQNAgent(env, trial, noisy)

    n_episodes = 500  # More episodes for better evaluation
    global episode_durations
    episode_durations = []

    for episode in range(n_episodes):
        state, _ = env.reset()
        state = (state - env.observation_space.low) / (env.observation_space.high - env.observation_space.low)
        state = torch.tensor(state, dtype=torch.float32, device=agent.device).unsqueeze(0)
        episode_reward = 0

        for t in range(500):
            action = agent.select_action(state)
            observation, reward, terminated, truncated, _ = env.step(action.item())

            # Reward shaping
            cart_position, pole_angle = observation[0], observation[2]
            reward = 1.0 - (abs(cart_position) / 2.4) - (abs(pole_angle) / 0.209)
            reward = max(reward, 0.0)
            reward = torch.tensor([reward], device=agent.device)

            episode_reward += reward.item()

            done = terminated or truncated
            next_state = None if terminated else (observation - env.observation_space.low) / (
                env.observation_space.high - env.observation_space.low)
            next_state = torch.tensor(next_state, dtype=torch.float32, device=agent.device).unsqueeze(0)
            agent.memory.push(state, action, next_state, reward)
            state = next_state

            agent.optimize_model()

            if done:
                episode_durations.append(t + 1)
                plot_durations()
                break

        # Early stopping for poor configurations
        if episode >= 10 and np.mean(episode_durations[-10:]) < 50:
            return 0

    plot_durations(show_result=True)
    return np.mean(episode_durations[-10:])





In [2]:
study = optuna.create_study(direction="maximize")
study.optimize(objective, n_trials=50)

print("Best trial:")
print("  Value: ", study.best_trial.value)
print("  Params: ")
for key, value in study.best_trial.params.items():
    print(f"    {key}: {value}")


[I 2025-01-27 23:00:49,353] A new study created in memory with name: no-name-f4ec3406-2f71-4f32-af84-01bdc5666822
  state = (state - env.observation_space.low) / (env.observation_space.high - env.observation_space.low)
  next_state = None if terminated else (observation - env.observation_space.low) / (
[W 2025-01-27 23:00:49,926] Trial 0 failed with parameters: {'noisy_layers': True, 'batch_size': 77, 'lr': 0.00036549267670801257, 'hidden_size': 178, 'memory_size': 8361, 'eps_start': 0.8723596688245032, 'eps_end': 0.010116734046200025, 'eps_decay': 2013, 'gamma': 0.9802691173810043} because of the following error: TypeError('must be real number, not NoneType').
Traceback (most recent call last):
  File "/home/edward/anaconda3/envs/bfh3/lib/python3.12/site-packages/optuna/study/_optimize.py", line 197, in _run_trial
    value_or_values = func(trial)
                      ^^^^^^^^^^^
  File "/tmp/ipykernel_23506/3778968220.py", line 230, in objective
    next_state = torch.tensor(next_st

TypeError: must be real number, not NoneType

In [3]:
import matplotlib.pyplot as plt
from IPython import display

# Store episode durations for plotting
episode_durations = []

def plot_durations(show_result=False):
    """
    Plot the durations of each episode along with a moving average.
    If `show_result` is True, the plot will display 'Result' in the title.
    """
    plt.figure(1)
    durations_t = torch.tensor(episode_durations, dtype=torch.float)
    if show_result:
        plt.title('Result')
    else:
        plt.clf()
        plt.title('Training...')
    plt.xlabel('Episode')
    plt.ylabel('Duration')
    plt.plot(durations_t.numpy())
    
    # Compute and plot moving average (window size = 100)
    if len(durations_t) >= 100:
        means = durations_t.unfold(0, 100, 1).mean(1).view(-1)
        means = torch.cat((torch.zeros(99), means))
        plt.plot(means.numpy())

    plt.pause(0.001)  # Pause to update the plot
    if not show_result:
        display.display(plt.gcf())
        display.clear_output(wait=True)
    else:
        display.display(plt.gcf())