# A2C, PPO & Optuna

This notebook contains the code of the Optuna search, Advantage Actor Critic and Proximal Policy Optimization algorithms used in .

## Imports and function definitions

In [None]:
# Import following packages

import numpy as np
import scipy as sci
import pandas as pd
import random

import torch
import torch.nn as nn
import torch.optim as optim
from torch.distributions import Categorical

import matplotlib.pyplot as plt

from typing import Literal, Optional, Union, Any, Dict

import gymnasium as gym

from stable_baselines3 import PPO, A2C

import optuna
from optuna.pruners import MedianPruner
from optuna.samplers import TPESampler

In [None]:
# Function definitions


def tuple_or_list_to_str(list_tuple_list: list[Union[tuple, list]]) -> list[str]:

    # Store the elements of a list with tuples/lists into a list with strings.

    # Inputs

    # list_tuple_list : list[Union[tuple, list]] List of tuples/lists to store into a list of strs.

    # Outputs

    # list[str]: Elements of the tuple/list stored in a str.

    return list(map(lambda x: " ".join(map(str, x)), list_tuple_list))


# Environment class, needs to be a valid gym.Env class


class HTC_env(gym.Env):

    # Inputs

    # size: Number of layers of the physical system, we use 16

    # Initialization of the environment

    def __init__(self, size):

        super(HTC_env, self).__init__()  # Initialize parent classes

        self.size = size  # Number of layers of the physical system

        self.observation_space = gym.spaces.Box(
            0, 1, shape=(self.size,), dtype=int
        )  # Shape of the possible states
        self.action_space = gym.spaces.Discrete(
            2 * self.size
        )  # Shape of the possibla actions

        self.render_mode = (
            None  # In case something asks about rendering the environment
        )

        self._counter = 0  # Countdown to reset the episode
        self._max_counter = 2 * self.size  # Length of episode

        self.terminated = False  # Checks if there is a need to reset the episode

        self.seen = (
            []
        )  # Holds the states that have been visited by this environment instance
        self.best = 0.0  # Holds the best state found of this environment instance
        self.seen_recording = (
            []
        )  # Holds how many states have been seen as the environment is used
        self.best_recording = (
            []
        )  # Holds which is the best state found yet, as the environment is used

    # Internal function to calculate the next state and return given an action

    def _take_action(self, action):

        new_state_slice = np.copy(self._current_state[0])  # Copy the previous state

        if action < self.size:

            new_state_slice[action] = int(0)  # Make the selected layer dielctric

        else:

            new_state_slice[action - self.size] = int(
                1
            )  # Make the selected layer metallic

        new_value = htc_series[
            tuple_or_list_to_str([new_state_slice.astype(int)])
        ].values[0] / (
            norm
        )  # HTC of the next state

        new_state = [new_state_slice, new_value]  # New state tuple
        reward = new_value.astype("float") - rew_base  # Reward from new state

        self._current_state = new_state  # Save this state as current

        return new_state, reward

    # Function to reset the state after the episode ends

    def reset(self, seed=None, options=None):

        self.terminated = False  # Reset the flag

        self._counter = 0  # Reset the countdown to reset the episode

        super().reset(seed=seed)  # So that self.np_random is seeded, in case its needed

        self._reset_state = np.zeros((self.size,)).astype(
            int
        )  # Seed for the starting state, all 0s

        self._current_state = [
            self._reset_state,
            htc_series[tuple_or_list_to_str([self._reset_state])].values[0] / (norm),
        ]  # Starting state, all 0s

        state = self._current_state[0]  # Current state, back to 0
        HTC = self._current_state[1]  # HTC of current state

        return state, {}

    # Function to apply an action and recover the next state and rewards

    def step(self, action):

        self._counter += 1  # Add one step to the counter

        if (
            self._counter == self._max_counter + 1
        ):  # If next step would be outside episode, time to reset

            next_state, _ = self.reset()  # Reset episode
            reward = (
                htc_series[tuple_or_list_to_str([next_state])].values[0] / (norm)
                - rew_base
            )  # What the reward would have been
            self.terminated = True  # Put flag to true

        else:

            next_state, reward = self._take_action(
                action
            )  # Apply action to obtain new state and reward

        if next_state[1] not in self.seen:  # Check if it's a new state

            self.seen.append(next_state[1])  # Save the state to the list

            if (
                next_state[1] > self.best
            ):  # If it's better than any before, save it as the best

                self.best = next_state[1]

            self.seen_recording.append(
                len(self.seen)
            )  # Save how many states we have seen so far
            self.best_recording.append(self.best)  # Save which is the best among them

        return (
            next_state[0],
            reward,
            self.terminated,
            False,
            {},
        )  # Output structure is the gym.Env structure

In [None]:
# Defining and loading the datasets

vect = np.genfromtxt("../data/HTC/16layers_index.txt").astype(
    int
)  # Combination of materials
htc_vals = np.genfromtxt(
    "../data/HTC/16layer_data.txt", dtype="float64"
)  # Associated HTC

htc_series = pd.Series(
    data=htc_vals, index=tuple_or_list_to_str(vect)
)  # Series form, for much faster searching

norm = 1e5  # Normalization constant for HTC, units (10^5 W/m^2 K)

base_seed = np.array(
    [1, 0, 0, 1, 1, 0, 0, 1, 1, 0, 0, 1, 1, 0, 0, 1]
)  # State to use as baseline for the rewards
rew_base = htc_series[tuple_or_list_to_str([base_seed.astype(int)])][0] / (
    norm
)  # Baseline for the rewards

## Optuna search

In [None]:
# Optuna hyperparameter search, simple code

# We choose A2C for the search, almost the same code is valid for PPO

# Step 1: hyperparameters of the optuna search, random seeding and preallocations

N_TRIALS = 100  # Maximum number of trials
N_TIMESTEPS = int(1e5)  # Training budget
N_EVAL_EPISODES = 32  # Length of each evaluation episode
N_STARTUP_TRIALS = 5  # Stop random sampling after N_STARTUP_TRIALS
TIMEOUT = int(60 * 30 * 8)  # How long to wait for until stopping, in seconds

seed = 42  # Random seed

random.seed(seed)  # Seed the random library
torch.manual_seed(seed)  # Seed the pytorch library
np.random.seed(int(seed))  # Seed the numpy library

# Dictionary that holds the hyperparameters that all trials will share and never explore

DEFAULT_HYPERPARAMS = {
    "policy": "MlpPolicy",
    "env": HTC_env(16),
    "use_rms_prop": False,  # Delete in case of PPO
}

# Step 2: define external functions: evaluation of the model, sampling of hyperparams and the goal of the search

# Function 1: evaluation for the performance of the model: stop and run 10 episodes, then average over them


def custom_evaluation(env, model, len_eps):

    # Will return the mean last reward of an episode

    last_rewards = np.zeros((10,))

    # Step 1: reset the environment

    obs_evals, _ = env.reset()

    # Step 2 (repeated 10 times): train the model for an episode, record the last reward

    for i in range(10):

        for j in range(len_eps):

            action, _ = model.predict(obs_evals)
            obs_evals, reward_evals, _, _, _ = env.step(action)

        last_rewards[i] = reward_evals
        obs_evals, _ = env.reset()

    # Step 3: return the mean of the last rewards

    return np.mean(last_rewards)


# Function 2: parameter sampling for optuna


def sample_a2c_params(trial: optuna.Trial) -> Dict[str, Any]:

    # Hyperparameters we are searching for

    gamma = 1.0 - trial.suggest_float("gamma", 0.0001, 0.1, log=True)
    max_grad_norm = trial.suggest_float("max_grad_norm", 0.5, 5.0, log=True)

    gae_lambda = trial.suggest_float("gae_lambda", 0.0, 1.0)

    learning_rate = trial.suggest_float("lr", 1e-6, 1e-3, log=True)
    # activation_fn = trial.suggest_categorical("activation_fn", ["selu"])

    # Hyperparameters we will leave fixed

    activation_fn = nn.SELU
    net_arch = {"pi": [100, 100, 100, 100], "vf": [100, 100, 100, 100]}
    n_steps = 2**5

    # Display true values

    trial.set_user_attr("gamma_", gamma)
    trial.set_user_attr("n_steps", n_steps)

    # Return dictionary with all chosen hyperparameters

    return {
        "n_steps": n_steps,  # Steps per episode
        "gamma": gamma,  # Discount factor for RL
        "learning_rate": learning_rate,  # Learning rate
        "max_grad_norm": max_grad_norm,  # Maximum gradient
        "gae_lambda": gae_lambda,  # GAE exponential factor
        # "batch_size": n_steps,             # Size of the batches considered # Add in case of PPO
        "policy_kwargs": {
            "net_arch": net_arch,  # Network architecture (actor and critic)
            "activation_fn": activation_fn,  # Activation function
        },
    }


# Function 3: objective for optuna to search and evaluate configurations
# Given a configuration, it will sample hyperparameters, evaluate it and report the result


def objective(trial: optuna.Trial) -> float:

    # 1. Sample hyperparameters and update the keyword arguments, adding the new ones to the default

    kwargs = DEFAULT_HYPERPARAMS.copy()
    kwargs.update(sample_a2c_params(trial))

    # Create the RL model

    model = A2C(**kwargs)

    # 2. Create envs used for evaluation

    eval_env = HTC_env(16)

    # 3. Train the model

    model.learn(N_TIMESTEPS)

    # 4. Evaluate the trained model

    score = custom_evaluation(eval_env, model, N_EVAL_EPISODES)

    model.env.close()  # Reset environment
    eval_env.close()  # Reset environment

    return score


# Step 3: Perform the optuna search

# Set pytorch num threads to 1 for faster training

torch.set_num_threads(1)

# Select the sampler, can be random, TPESampler, CMAES, ...

sampler = TPESampler(n_startup_trials=N_STARTUP_TRIALS)

# Do not prune before 1/5 of the max budget is used

pruner = MedianPruner(
    n_startup_trials=N_STARTUP_TRIALS, n_warmup_steps=N_TIMESTEPS // 5
)

# Create the study and start the hyperparameter optimization

study = optuna.create_study(sampler=sampler, pruner=pruner, direction="maximize")

try:
    study.optimize(objective, n_trials=N_TRIALS, n_jobs=1, timeout=TIMEOUT)

except KeyboardInterrupt:

    pass

# Step 4: print results

print("Number of finished trials: ", len(study.trials))

print("Best trial:")
trial = study.best_trial  # (.trials for all of them)

print(f"  Value: {trial.value}")

print("  Params: ")
for key, value in trial.params.items():
    print(f"    {key}: {value}")

print("  User attrs:")
for key, value in trial.user_attrs.items():
    print(f"    {key}: {value}")

## A2C algorithm

In [None]:
# Step 1: pre-allocating, loading and previous steps

seed = 42  # Fix the random seed

random.seed(seed)  # Seed the random library
torch.manual_seed(seed)  # Seed the pytorch library
np.random.seed(int(seed))  # Seed the numpy library

torch.backends.cudnn.deterministic = True  # Set cuda parameters
torch.backends.cudnn.benchmark = False  # Set cuda parameters

device = torch.device(
    "cuda:0" if torch.cuda.is_available() else "cpu"
)  # Use GPU if available

n_eval_episodes = 32  # Number of steps per evaluation episode
N_timesteps = int(4000 * 32)  # Number of steps (4000 episodes of 32 steps each)

torch.set_num_threads(1)  # Set pytorch num threads to 1 for faster training

# Step 2: set the environment and the model hyperparameters

our_env = HTC_env(16)  # Set environment (using the custom class)

# Hyperparameters chosen by optuna

n_steps = 2**5  # Steps per episode
gamma = 1.0 - 0.05354748279639477  # Discount factor for RL
learning_rate = 0.0003529867715614526  # Learning rate
gae_lambda = 0.9972303247713417  # GAE exponential factor
max_grad_norm = 1.2858503485554726  # Maximum gradient
net_arch = {
    "pi": [100, 100, 100, 100],
    "vf": [100, 100, 100, 100],
}  # Network architecture (actor and critic)
activation_fn = nn.SELU  # Activation function

# Hyperparameter dictionary

DEFAULT_HYPERPARAMS = {
    "policy": "MlpPolicy",
    "env": our_env,
    "use_rms_prop": False,
    "device": device,
}

Hyperparams = {
    "n_steps": n_steps,
    "gamma": gamma,
    "learning_rate": learning_rate,
    "max_grad_norm": max_grad_norm,
    "gae_lambda": gae_lambda,
    "policy_kwargs": {
        "net_arch": net_arch,
        "activation_fn": activation_fn,
    },
}

kwargs = DEFAULT_HYPERPARAMS.copy()  # Start the dictionary with the default ones
kwargs.update(Hyperparams)  # Add the ones found by optuna

# Step 3: train the model

model = A2C(**kwargs)  # Create the model, using the previous hyperparams

model.learn(N_timesteps)  # Train the model

model.env.close()  # Close the env

## PPO algorithm

In [None]:
# Step 1: pre-allocating, loading and previous steps

seed = 42  # Fix the random seed

random.seed(seed)  # Seed the random library
torch.manual_seed(seed)  # Seed the pytorch library
np.random.seed(int(seed))  # Seed the numpy library

torch.backends.cudnn.deterministic = True  # Set cuda parameters
torch.backends.cudnn.benchmark = False  # Set cuda parameters

device = torch.device(
    "cuda:0" if torch.cuda.is_available() else "cpu"
)  # Use GPU if available

n_eval_episodes = 32  # Number of steps per evaluation episode
N_timesteps = int(4000 * 32)  # Number of steps (4000 episodes of 32 steps each)

torch.set_num_threads(1)  # Set pytorch num threads to 1 for faster training

# Step 2: set the environment and the model hyperparameters

our_env = HTC_env(16)  # Set environment (using the custom class)

# Hyperparameters chosen by optuna

n_steps = 2**5  # Steps per episode
gamma = 1.0 - 0.00012343188973544484  # Discount factor for RL
learning_rate = 6.24161237241945e-05  # Learning rate
gae_lambda = 0.8714387482390391  # GAE exponential factor
max_grad_norm = 4.042358248277151  # Maximum gradient
net_arch = {
    "pi": [100, 100, 100, 100],
    "vf": [100, 100, 100, 100],
}  # Network architecture (actor and critic)
activation_fn = nn.SELU  # Activation function

# Hyperparameter dictionary

DEFAULT_HYPERPARAMS = {"policy": "MlpPolicy", "env": our_env, "device": device}

Hyperparams = {
    "n_steps": n_steps,
    "gamma": gamma,
    "learning_rate": learning_rate,
    "max_grad_norm": max_grad_norm,
    "gae_lambda": gae_lambda,
    "batch_size": n_steps,
    "policy_kwargs": {
        "net_arch": net_arch,
        "activation_fn": activation_fn,
    },
}

kwargs = DEFAULT_HYPERPARAMS.copy()  # Start the dictionary with the default ones
kwargs.update(Hyperparams)  # Add the ones found by optuna

# Step 3: train the model

model = PPO(**kwargs)  # Create the model, using the previous hyperparams

model.learn(N_timesteps)  # Train the model

model.env.close()  # Close the env