## Create QNetwork Class

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class QNetwork(nn.Module):
    """Fully connected network mapping a state to one value per action."""

    def __init__(self, state_size, action_size, seed, fc1_units=64, fc2_units=64):
        """Seed torch's RNG and create the three linear layers.

        Params
        ======
            state_size (int): Number of input features of a state
            action_size (int): Number of outputs (one value per action)
            seed (int): Value passed to ``torch.manual_seed``
            fc1_units (int): Width of the first hidden layer
            fc2_units (int): Width of the second hidden layer
        """
        super().__init__()
        # Seeding happens before any layer is built so that the initial
        # weights are reproducible for a given seed.
        self.seed = torch.manual_seed(seed)

        # state -> hidden 1 -> hidden 2 -> action values
        self.fc1 = nn.Linear(in_features=state_size, out_features=fc1_units)
        self.fc2 = nn.Linear(in_features=fc1_units, out_features=fc2_units)
        self.fc3 = nn.Linear(in_features=fc2_units, out_features=action_size)

    def forward(self, state):
        """Return the action values for ``state`` (no activation on the output)."""
        hidden = F.relu(self.fc1(state))
        hidden = F.relu(self.fc2(hidden))
        action_values = self.fc3(hidden)
        return action_values

: 

### Create Replay Buffer

In [None]:
import numpy as np
import random
from collections import namedtuple, deque

import torch


class ReplayBuffer:
    """Fixed-size buffer to store experience tuples."""

    def __init__(
        self,
        action_size,
        buffer_size,
        batch_size,
        seed,
        device=torch.device("cuda:0" if torch.cuda.is_available() else "cpu"),
    ):
        """Initialize a ReplayBuffer object.
        Params
        ======
            action_size (int): dimension of each action
            buffer_size (int): maximum size of buffer
            batch_size (int): size of each training batch
            seed (int): random seed
            device (torch.device): device the sampled tensors are moved to
        """
        self.action_size = action_size
        # A bounded deque silently drops the oldest experience once full.
        self.memory = deque(maxlen=buffer_size)
        self.batch_size = batch_size
        self.experience = namedtuple(
            "Experience",
            field_names=["state", "action", "reward", "next_state", "done"],
        )
        # random.seed() returns None, so seed the module-level RNG first and
        # keep the actual seed value on the instance.
        random.seed(seed)
        self.seed = seed
        self.device = device

    def add(self, state, action, reward, next_state, done):
        """Add a new experience to memory."""
        e = self.experience(state, action, reward, next_state, done)
        self.memory.append(e)

    @staticmethod
    def _stack(experiences, field, dtype=None):
        """Stack one field of every experience row-wise into a CPU tensor."""
        stacked = np.vstack([getattr(e, field) for e in experiences])
        if dtype is not None:
            stacked = stacked.astype(dtype)
        return torch.from_numpy(stacked)

    def sample(self):
        """Randomly sample a batch of experiences from memory.

        Returns
        =======
            tuple of tensors ``(states, actions, rewards, next_states, dones)``
            on ``self.device``; ``actions`` is int64, the others are float32.

        Raises
        ======
            ValueError: raised by ``random.sample`` when fewer than
                ``batch_size`` experiences are stored.
        """
        experiences = [
            e
            for e in random.sample(self.memory, k=self.batch_size)
            if e is not None
        ]

        states = self._stack(experiences, "state").float().to(self.device)
        actions = self._stack(experiences, "action").long().to(self.device)
        rewards = self._stack(experiences, "reward").float().to(self.device)
        next_states = (
            self._stack(experiences, "next_state").float().to(self.device)
        )
        # Done flags go through uint8 before becoming 0.0 / 1.0 floats.
        dones = self._stack(experiences, "done", np.uint8).float().to(self.device)

        return (states, actions, rewards, next_states, dones)

    def __len__(self):
        """Return the current size of internal memory."""
        return len(self.memory)


### Create Utility Functions to Train Agent

In [None]:
from collections import deque
import gymnasium as gym
import numpy as np
import matplotlib.pyplot as plt
import optuna
from optuna.samplers import TPESampler
import torch

def train_agent(
    agent,
    env,
    agent_type,
    n_episodes=1000,
    max_t=1000,
    eps_start=1.0,
    eps_end=0.01,
    save_agent=False,
):
    """Deep Q-Learning.

    Params
    ======
        agent: object providing ``act(state, eps)``, ``step(state, action,
            reward, next_state, done)``, the attribute ``eps_decay`` and the
            per-episode counters ``loss``, ``num_exploitative_actions`` and
            ``num_exploratory_actions`` (read and reset to 0 after every
            episode); ``target_network`` / ``online_network`` are only
            needed when ``save_agent`` is True
        env: environment whose ``reset()`` returns ``(state, info)`` and whose
            ``step(action)`` returns
            ``(next_state, reward, terminated, truncated, info)``
        agent_type (str): label used in the checkpoint file names
        n_episodes (int): maximum number of training episodes
        max_t (int): maximum number of timesteps per episode
        eps_start (float): starting value of epsilon, for epsilon-greedy action selection
        eps_end (float): minimum value of epsilon
        save_agent (bool): if True, write both networks' state dicts to
            ``checkpoints/`` once the environment is solved

    The multiplicative epsilon decay (per episode) is taken from
    ``agent.eps_decay``. Training stops early once the mean score over the
    last (up to) 100 episodes reaches 200.0.

    Returns
    =======
        dict with per-episode lists ``scores``, ``episode_lengths``,
        ``losses``, ``exploitative_actions``, ``exploratory_actions`` and
        ``eps_change`` (the latter also holds the starting epsilon, so it is
        one element longer).
    """
    import os  # local import: only needed for the checkpoint directory

    scores = []
    episode_lengths = []
    losses = []
    exploitative_actions = []
    exploratory_actions = []

    scores_window = deque(maxlen=100)  # last 100 scores
    eps = eps_start  # initialize epsilon
    eps_change = [eps]

    for i_episode in range(1, n_episodes + 1):
        state, _ = env.reset()
        score = 0
        episode_length = 0
        for _ in range(max_t):
            # Increment the episode length counter
            episode_length += 1

            action = agent.act(state, eps)
            next_state, reward, terminated, truncated, _ = env.step(action)
            done = terminated or truncated
            agent.step(state, action, reward, next_state, done)
            state = next_state
            score += reward
            if done:
                break

        # save total loss during the episode and reset it
        losses.append(agent.loss)
        agent.loss = 0

        exploitative_actions.append(agent.num_exploitative_actions)
        agent.num_exploitative_actions = 0

        exploratory_actions.append(agent.num_exploratory_actions)
        agent.num_exploratory_actions = 0

        episode_lengths.append(episode_length)

        scores_window.append(score)  # save most recent score
        scores.append(score)  # save most recent score

        eps = max(eps_end, agent.eps_decay * eps)  # decrease epsilon
        eps_change.append(eps)

        # The window does not change again this episode, so average it once.
        avg_score = np.mean(scores_window)
        print(
            "\rEpisode {}\tAverage Score: {:.2f}".format(i_episode, avg_score),
            end="",
        )
        if i_episode % 100 == 0:
            # Same text without end="" so the line is kept every 100 episodes.
            print("\rEpisode {}\tAverage Score: {:.2f}".format(i_episode, avg_score))
        if avg_score >= 200.0:
            print(
                "\nEnvironment solved in {:d} episodes!\tAverage Score: {:.2f}".format(
                    i_episode - 100, avg_score
                )
            )
            if save_agent:
                # torch.save does not create missing directories; without
                # this the trained weights are lost at the very end.
                os.makedirs("checkpoints", exist_ok=True)
                torch.save(
                    agent.target_network.state_dict(),
                    f"checkpoints/{agent_type}_target_network_{i_episode}.pth",
                )
                torch.save(
                    agent.online_network.state_dict(),
                    f"checkpoints/{agent_type}_online_network_{i_episode}.pth",
                )
            break

    return {
        "scores": scores,
        "episode_lengths": episode_lengths,
        "losses": losses,
        "exploitative_actions": exploitative_actions,
        "exploratory_actions": exploratory_actions,
        "eps_change": eps_change,
    }
    
def get_optimal_hyperparamters(env, agent_type, n_trials=10, n_episodes=1000, seed=42):
    """Search agent hyperparameters with Optuna and return the best set.

    Each trial samples a configuration, builds a fresh agent, trains it with
    ``train_agent`` on ``env`` and is scored by the mean episode score.

    Params
    ======
        env: environment passed to ``train_agent``; the same instance is
            reused by every trial
        agent_type (str): ``"ddqn"`` builds a ``DDQNAgent``; any other value
            builds a ``DQNAgent``. Also used in the study name
        n_trials (int): number of Optuna trials
        n_episodes (int): maximum number of training episodes per trial
        seed (int): seed for the TPE sampler and for every agent

    Returns
    =======
        dict: ``study.best_params``, the sampled values of the best trial
    """

    def objective(trial):
        # Sample hyperparameter values. The order of the suggest_* calls is
        # kept fixed so a seeded sampler reproduces the same trials.
        buffer_size = trial.suggest_int("buffer_size", 1000, 10000)
        batch_size = trial.suggest_int("batch_size", 32, 256)
        fc1_units = trial.suggest_int("fc1_units", 16, 128)
        fc2_units = trial.suggest_int("fc2_units", 16, 128)
        gamma = trial.suggest_float("gamma", 0.9, 1.0)
        tau = trial.suggest_float("tau", 1e-5, 1e-3)
        lr = trial.suggest_float("lr", 1e-4, 1e-2)
        update_every = trial.suggest_int("update_every", 1, 6)
        eps_decay = trial.suggest_float("eps_decay", 0.9, 0.999)
        loss_fn = trial.suggest_categorical("loss_fn", ["mse", "huber"])

        # Arguments shared by both agent classes.
        # NOTE(review): state_size / action_size are hard-coded and must match
        # the environment that is passed in — confirm before using another env.
        agent_kwargs = dict(
            state_size=8,
            action_size=4,
            seed=seed,
            buffer_size=buffer_size,
            batch_size=batch_size,
            fc1_units=fc1_units,
            fc2_units=fc2_units,
            eps_decay=eps_decay,
            gamma=gamma,
            tau=tau,
            lr=lr,
            update_every=update_every,
        )

        # Create and train the agent. Only the DDQN agent receives loss_fn.
        # NOTE(review): loss_fn is still sampled for DQN trials but unused
        # there — confirm whether DQNAgent should take it as well.
        if agent_type == "ddqn":
            agent = DDQNAgent(loss_fn=loss_fn, **agent_kwargs)
        else:
            agent = DQNAgent(**agent_kwargs)

        metrics = train_agent(agent, env, n_episodes=n_episodes, agent_type=agent_type)
        # Return average reward over all episodes
        return np.mean(metrics["scores"])

    study = optuna.create_study(
        study_name=f"{agent_type}_study",
        direction="maximize",
        sampler=TPESampler(seed=seed),
        storage=None,
    )
    study.optimize(objective, n_trials=n_trials, show_progress_bar=True)

    # The plot helpers only return figure objects; inside a function they are
    # not displayed automatically, so show each one explicitly.
    optuna.visualization.plot_optimization_history(study).show()
    optuna.visualization.plot_slice(study).show()
    optuna.visualization.plot_param_importances(study).show()

    return study.best_params

In [None]:
# Settings for a short hyperparameter search of the double-DQN agent.
agent_type = "ddqn"
n_trials, n_episodes = 5, 500
env = gym.make("LunarLander-v2")

# Run the Optuna study and keep the best parameter set it found.
opt_params = get_optimal_hyperparamters(
    env=env,
    agent_type=agent_type,
    n_trials=n_trials,
    n_episodes=n_episodes,
)

print(opt_params)