## Imports and Google Drive Connect ##


In [None]:
# Install the notebook's dependencies: Gymnasium with the Atari extra,
# PyTorch/torchvision and Stable-Baselines3, and Gymnasium's
# accept-ROM-license extra.
!pip install gymnasium[atari]
!pip install torch torchvision stable_baselines3
!pip install gymnasium[accept-ROM-license]

In [None]:
# Mount Google Drive at /content/drive for storing runs.
# The log and saved-model paths built later in this notebook are relative
# paths that start with 'drive/MyDrive'.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
import os
import gymnasium as gym
import numpy as np
from stable_baselines3 import PPO, DQN
from stable_baselines3.common.vec_env import DummyVecEnv, VecFrameStack
from stable_baselines3.common.evaluation import evaluate_policy
from stable_baselines3.common.env_checker import check_env

from stable_baselines3.common.callbacks import CheckpointCallback
from stable_baselines3.common.callbacks import EvalCallback
from stable_baselines3.common.monitor import Monitor

import torch as th
import torch.nn as nn
from stable_baselines3.common.torch_layers import BaseFeaturesExtractor
from stable_baselines3.common.logger import HParam, configure

## CNN Policy ##

In [None]:
# Create all CNN options
class CustomCNN(BaseFeaturesExtractor):
    """Two-stage convolutional feature extractor for channels-first images.

    Two Conv2d+ReLU stages (8x8 stride 4, then 4x4 stride 2) are flattened and
    projected to ``features_dim`` by a Linear+ReLU head.

    :param observation_space: observation space; ``shape[0]`` is used as the
        number of input channels.
    :param features_dim: size of the feature vector returned by ``forward``.
    """

    def __init__(self, observation_space: gym.spaces.Box, features_dim: int = 256):
        super().__init__(observation_space, features_dim)
        # Observations are assumed to be CxHxW (channels first); any
        # re-ordering is expected to happen in preprocessing or a wrapper.
        in_channels = observation_space.shape[0]
        conv_layers = [
            nn.Conv2d(in_channels, 32, kernel_size=8, stride=4, padding=0),
            nn.ReLU(),
            nn.Conv2d(32, 64, kernel_size=4, stride=2, padding=0),
            nn.ReLU(),
            nn.Flatten(),
        ]
        self.cnn = nn.Sequential(*conv_layers)

        # Run one sampled observation through the conv stack to discover the
        # flattened width that feeds the linear head.
        probe = th.as_tensor(observation_space.sample()[None]).float()
        with th.no_grad():
            flat_width = self.cnn(probe).shape[1]

        head_layers = [nn.Linear(flat_width, features_dim), nn.ReLU()]
        self.linear = nn.Sequential(*head_layers)

    def forward(self, observations: th.Tensor) -> th.Tensor:
        """Return the ``features_dim``-sized features for a batch of observations."""
        conv_features = self.cnn(observations)
        return self.linear(conv_features)




class CustomResCNN(BaseFeaturesExtractor):
    """Convolutional feature extractor with one residual (skip) connection.

    Two down-sampling Conv2d+ReLU stages are followed by a shape-preserving
    3x3 Conv2d+ReLU stage whose output is added to its own input; the sum is
    flattened and projected to ``features_dim`` by a Linear+ReLU head.

    :param observation_space: observation space; ``shape[0]`` is used as the
        number of input channels.
    :param features_dim: size of the feature vector returned by ``forward``.
    """

    def __init__(self, observation_space: gym.spaces.Box, features_dim: int = 256):
        super().__init__(observation_space, features_dim)
        in_channels = observation_space.shape[0]

        first_stage = [
            nn.Conv2d(in_channels, 32, kernel_size=8, stride=4, padding=0),
            nn.ReLU(),
        ]
        second_stage = [
            nn.Conv2d(32, 64, kernel_size=4, stride=2, padding=0),
            nn.ReLU(),
        ]
        # 3x3 kernel, stride 1, padding 1: keeps the spatial size and the 64
        # channels, so its output can be added to its input in forward().
        residual_stage = [
            nn.Conv2d(64, 64, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
        ]
        self.cnn1 = nn.Sequential(*first_stage)
        self.cnn2 = nn.Sequential(*second_stage)
        self.cnn3 = nn.Sequential(*residual_stage)

        self.flatten = nn.Flatten()

        # Run one sampled observation through the three stages to discover
        # the flattened width that feeds the linear head.
        probe = th.as_tensor(observation_space.sample()[None]).float()
        with th.no_grad():
            stage_output = self.cnn3(self.cnn2(self.cnn1(probe)))
            flat_width = self.flatten(stage_output).shape[1]

        head_layers = [nn.Linear(flat_width, features_dim), nn.ReLU()]
        self.linear = nn.Sequential(*head_layers)

    def forward(self, observations: th.Tensor) -> th.Tensor:
        """Return the ``features_dim``-sized features for a batch of observations."""
        trunk = self.cnn2(self.cnn1(observations))
        # Residual connection: add the third stage's output to its input.
        merged = trunk + self.cnn3(trunk)
        return self.linear(self.flatten(merged))





class CustomDeepCNN(BaseFeaturesExtractor):
    """Three-stage convolutional feature extractor for channels-first images.

    Three Conv2d+ReLU stages (8x8 stride 4, 4x4 stride 2, 3x3 stride 1) are
    flattened and projected to ``features_dim`` by a Linear+ReLU head.

    :param observation_space: observation space; ``shape[0]`` is used as the
        number of input channels.
    :param features_dim: size of the feature vector returned by ``forward``.
    """

    def __init__(self, observation_space: gym.spaces.Box, features_dim: int = 256):
        super().__init__(observation_space, features_dim)
        in_channels = observation_space.shape[0]
        conv_layers = [
            nn.Conv2d(in_channels, 32, kernel_size=8, stride=4, padding=0),
            nn.ReLU(),
            nn.Conv2d(32, 64, kernel_size=4, stride=2, padding=0),
            nn.ReLU(),
            nn.Conv2d(64, 64, kernel_size=3, stride=1, padding=0),
            nn.ReLU(),
            nn.Flatten(),
        ]
        self.cnn = nn.Sequential(*conv_layers)

        # Run one sampled observation through the conv stack to discover the
        # flattened width that feeds the linear head.
        probe = th.as_tensor(observation_space.sample()[None]).float()
        with th.no_grad():
            flat_width = self.cnn(probe).shape[1]

        head_layers = [nn.Linear(flat_width, features_dim), nn.ReLU()]
        self.linear = nn.Sequential(*head_layers)

    def forward(self, observations: th.Tensor) -> th.Tensor:
        """Return the ``features_dim``-sized features for a batch of observations."""
        conv_features = self.cnn(observations)
        return self.linear(conv_features)





class CustomMaxPoolCNN(BaseFeaturesExtractor):
    """Two-stage convolutional feature extractor with max pooling.

    Each Conv2d+ReLU stage (8x8 stride 4, then 4x4 stride 2) is followed by a
    2x2 stride-2 max pool; the result is flattened and projected to
    ``features_dim`` by a Linear+ReLU head.

    :param observation_space: observation space; ``shape[0]`` is used as the
        number of input channels.
    :param features_dim: size of the feature vector returned by ``forward``.
    """

    def __init__(self, observation_space: gym.spaces.Box, features_dim: int = 256):
        super().__init__(observation_space, features_dim)
        in_channels = observation_space.shape[0]
        conv_layers = [
            nn.Conv2d(in_channels, 32, kernel_size=8, stride=4, padding=0),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),
            nn.Conv2d(32, 64, kernel_size=4, stride=2, padding=0),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),
            nn.Flatten(),
        ]
        self.cnn = nn.Sequential(*conv_layers)

        # Run one sampled observation through the conv stack to discover the
        # flattened width that feeds the linear head.
        probe = th.as_tensor(observation_space.sample()[None]).float()
        with th.no_grad():
            flat_width = self.cnn(probe).shape[1]

        head_layers = [nn.Linear(flat_width, features_dim), nn.ReLU()]
        self.linear = nn.Sequential(*head_layers)

    def forward(self, observations: th.Tensor) -> th.Tensor:
        """Return the ``features_dim``-sized features for a batch of observations."""
        conv_features = self.cnn(observations)
        return self.linear(conv_features)

## Load Environment ##

In [None]:
# Build the environment pipeline:
# raw env -> Monitor -> AtariPreprocessing -> 4-frame FrameStack.
environment_name = 'SpaceInvadersNoFrameskip-v4'
env = gym.make(environment_name, render_mode="rgb_array")
env.metadata['render_fps'] = 10000
env = Monitor(env)

# Preprocessing options, kept together so they are easy to compare and tweak.
preprocessing_options = dict(
    noop_max=30,
    frame_skip=4,
    screen_size=84,
    terminal_on_life_loss=False,
    grayscale_obs=True,
    grayscale_newaxis=False,
    scale_obs=False,
)
env = gym.wrappers.AtariPreprocessing(env, **preprocessing_options)
env = gym.wrappers.FrameStack(env, 4)


In [None]:
# ONLY USED FOR LEARNING/TESTING, NOT USED IN REPORT:
# episodes = 2
# mean_score = 0
# for episode in range(1, episodes+1):
#     obs = env.reset()                                                   #returns initial observation
#     done = False
#     score = 0

#     while not done:
#         env.render()
#         action = env.action_space.sample()                              #take sample of action space
#         obs, reward, terminated, truncated, info = env.step(action)     #take step and obtain obs,reward,etc...
#         score += reward                                                 #add up score

#         done = terminated or truncated
#     print('Episode:{} Score:{}'.format(episode, score))
#     mean_score += score

# print(mean_score/episodes)
# env.close()

## Create Model ##

In [None]:
# Directory (relative to the working directory) that is passed to the model
# as its TensorBoard log location.
log_path = os.path.join(*('drive', 'MyDrive', 'Bio_Inspired_Intelligence', 'Training', 'Logs_v2'))

In [None]:
# Current settings: default
#
# Keyword arguments forwarded to the DQN constructor below. The trailing
# "default = ..." notes are carried over from the original notebook.
# NOTE(review): some of these may not match Stable-Baselines3's own DQN
# constructor defaults (e.g. learning_rate, buffer_size) -- confirm which
# "default" is meant before citing them.
dqn_hyperparams = dict(
    learning_rate=1e-3,               # default = 1e-3
    buffer_size=10000,                # default = 10000
    learning_starts=1000,             # default = 1000
    batch_size=32,                    # default=32, mini batch size
    tau=1.0,                          # for soft update of target parameters, default = 1.0
    gamma=0.99,                       # discount factor, default = 0.99
    train_freq=4,
    gradient_steps=1,
    target_update_interval=10000,     # default = 10000
    exploration_fraction=0.1,         # default = 0.1
    exploration_final_eps=0.02,       # default = 0.02
    exploration_initial_eps=1.0,      # default = 1.0
    verbose=1,
    tensorboard_log=log_path,
    # Select which custom CNN extracts features and how wide its output is.
    policy_kwargs=dict(
        features_extractor_class=CustomDeepCNN,
        features_extractor_kwargs=dict(features_dim=128),
        activation_fn=th.nn.ReLU,
    ),
)

# Create the DQN model with the hyperparameters defined above.
model = DQN(policy='CnnPolicy', env=env, **dqn_hyperparams)

## Train, Save & Reload Model ##

In [None]:
checkpoint_callback = CheckpointCallback(save_freq=1000, save_path='./logs/', name_prefix='rl_model')
eval_callback = EvalCallback(env, best_model_save_path='./logs/', log_path='./logs/', eval_freq=500, deterministic=True, render=False)

# tb_log_name = 'model_save_default_CustomDeepCNN'
tb_log_name = 'model_save_FINALv3_CustomDeepCNN'

model.learn(total_timesteps=300000, tb_log_name=tb_log_name, callback=[checkpoint_callback, eval_callback])            # run first time

# saved_model = 'DQN_model_default_CustomDeepCNN'
saved_model = 'DQN_model_FINALv3_CustomDeepCNN'


# Make path
DQN_path = os.path.join('drive', 'MyDrive', 'Bio_Inspired_Intelligence', 'Training', 'Saved Models')
model.save(DQN_path)

In [None]:
tb_log_name = 'model_save_FINALv3_CustomDeepCNN'
saved_model = 'DQN_model_FINALv3_CustomDeepCNN'
del model

DQN_path = os.path.join('drive', 'MyDrive', 'Bio_Inspired_Intelligence', 'Training', 'Saved Models')
model = DQN.load(DQN_path, env=env)

# use this every time you 're-train':
checkpoint_callback = CheckpointCallback(save_freq=1000, save_path='./logs/', name_prefix='rl_model')
eval_callback = EvalCallback(env, best_model_save_path='./logs/', log_path='./logs/', eval_freq=500, deterministic=True, render=False)

# Retrain
model.set_env(env)
model.learn(total_timesteps=50000, tb_log_name=tb_log_name, callback=[checkpoint_callback, eval_callback], reset_num_timesteps=False) # run next steps to build upon initial

# Save final model
model.save(DQN_path)

## Evaluation ##

In [None]:
# ONLY USED FOR LEARNING/TESTING, NOT USED IN REPORT:
# episodes = 100
# acc_score = 0
# for episode in range(1, episodes+1):
#     obs = env.reset()
#     done = False
#     score = 0


#     while not done:
#         env.render()
#         action, _ = model.predict(obs)     # NOW USING MODEL HERE
#         # obs, reward, terminated, truncated, info = env.step(action)
#         obs, reward, done, info = env.step(action)
#         score += reward

#         # done = terminated or truncated
#     print('Episode:{} Score:{}'.format(episode, score))
#     acc_score += score

# print(acc_score/episodes)
# env.close()

In [None]:
# Load (or reload) the TensorBoard notebook extension and open it on the
# Logs_v2 directory, the same location the notebook uses as `log_path`.
%reload_ext tensorboard
%tensorboard --logdir drive/MyDrive/Bio_Inspired_Intelligence/Training/Logs_v2/


## Uncertainty Analysis ##

In [None]:
import numpy as np
from sklearn.metrics import mean_squared_error

# Number of independent train-and-evaluate repetitions. Any positive value
# works: results are collected per run rather than in three fixed lists.
X = 3

# run_results[i] is a one-element list holding run i's mean evaluation reward.
run_results = []

# Run the training and evaluation process X times
# NOTE(review): EvalCallback and evaluate_policy both use the training `env`;
# consider a separate evaluation environment so evaluation episodes do not
# interfere with training.
for i in range(X):
    tb_log_name = f'u/model_save_FINALv3_Uncertainty_{i}'
    saved_model = f'u/DQN_model_FINALv3_Uncertainty_{i}'

    checkpoint_callback = CheckpointCallback(save_freq=1000, save_path=f'./logs/{saved_model}', name_prefix='rl_model')
    eval_callback = EvalCallback(env, best_model_save_path=f'./logs/{saved_model}', log_path=f'./logs/{tb_log_name}', eval_freq=500, deterministic=True, render=False)

    # Create and train a fresh model with the shared hyperparameters
    model = DQN('CnnPolicy', env, **dqn_hyperparams)
    model.learn(total_timesteps=50000, tb_log_name=tb_log_name, callback=[checkpoint_callback, eval_callback])

    # Evaluate the model over 10 episodes and keep the mean reward
    mean_reward, std_reward = evaluate_policy(model, env, n_eval_episodes=10)
    run_results.append([mean_reward])

# Print the results from each run
for run_number, run_result in enumerate(run_results, start=1):
    print(f"Results from model run {run_number}:", run_result)

# Calculate the element-wise average over all runs
average_results = np.mean(run_results, axis=0)

# Print the average results
print("\nAverage Results List:", average_results)

# Mean squared error between the average and each run, expressed relative to
# the squared average. The relative value is undefined (inf/nan) when the
# average reward is exactly zero.
relative_mses = []
for run_result in run_results:
    mse = mean_squared_error(average_results, run_result)
    relative_mses.append(mse / (average_results ** 2))

# Print the relative mean squared errors (blank line first, as a separator)
print()
for run_number, mse_rel in enumerate(relative_mses, start=1):
    print(f"Relative Mean Squared Error between Average and Run {run_number}: {mse_rel}")
