In [None]:
!pip install datasets

In [None]:
# Mount Google Drive at /content/drive (requires the Colab runtime, which
# provides `google.colab`). force_remount=True requests a fresh mount.
from google.colab import drive
drive.mount('/content/drive',force_remount=True)

In [None]:
# Switch the working directory to the project folder on the mounted drive, then
# print it and list its contents as a sanity check. The relative paths used by
# the cells below (./data/..., the output folder) are resolved against it.
%cd /content/drive/MyDrive/2024 Term 8/50.021 AI/project/RL
!pwd
!ls

In [None]:
import numpy as np
import scipy.special as sp

from IPython.display import display, clear_output
import matplotlib.pyplot as plt
import copy
import time
import random

import torch
import torch.nn as nn
import torch.optim as optim
import collections
from torch.utils.data import Dataset
from datasets import Dataset, Features, Array2D, Value, load_dataset, load_from_disk

from environment import MazeEnvironment
from agent import Agent

from models.experience import ExperienceReplay
from models.fcn import fc_nn
from models.cnn import conv_nn
from training import train
from evaluation import evaluation
from utils import *

from sklearn.metrics import f1_score, accuracy_score
from torchsummary import summary

from matplotlib.colors import ListedColormap
from statistics import mean

from tqdm.notebook import tqdm
import json
import ast

%load_ext autoreload
%autoreload 2

In [None]:
class config:
    """Experiment settings shared by the cells of this notebook.

    Used as a plain namespace of class-level constants. The whole class is
    also handed to `train(...)`, so attributes that are not read in the
    notebook itself are presumably consumed there (confirm in training.py).
    """
    # Maze-size tag; in this notebook it only feeds the output names below.
    maze_size = "9x9"
    # Capacity handed to ExperienceReplay.
    buffer_capacity = 10000
    buffer_start_size = 1000
    # "cnn" selects conv_nn; any other value selects fc_nn.
    network = "cnn"
    # Length of the epsilon schedule (one value per epoch).
    num_epochs = 5000
    # Decay constant of the exponential epsilon schedule, in epochs.
    cutoff = 3000
    # Use the GPU when one is available and fall back to the CPU otherwise, so
    # the notebook also runs on CPU-only runtimes.
    device = 'cuda' if torch.cuda.is_available() else 'cpu'
    batch_size = 32
    gamma = 0.9
    generalized = True
    # Output folder for this run, e.g. "9x9_generalized_cnn_5000".
    folder = f'{maze_size}_{"generalized" if generalized else "simple"}_{network}_{num_epochs}'
    # Checkpoint path: a ".torch" file named after the folder, stored inside it.
    save_filename = f'{folder}/{folder}.torch'

In [None]:
class paths:
    """Relative locations of the maze datasets (`.hf`), one training and one
    test entry per maze size."""

    # 9x9
    train_9x9 = "./data/train/9x9_train.hf"
    test_9x9 = "./data/test/9x9/borderless_9x9_test.hf"

    # 11x11
    train_11x11 = "./data/train/11x11_train.hf"
    test_11x11 = "./data/test/11x11/borderless_11x11_test.hf"

    # 15x15
    train_15x15 = "./data/train/15x15_train.hf"
    test_15x15 = "./data/test/15x15/borderless_15x15_test.hf"

    # 21x21
    train_21x21 = "./data/train/21x21_train.hf"
    test_21x21 = "./data/test/21x21/borderless_21x21_test.hf"

In [None]:
# Show the derived output locations: the checkpoint path first, then its folder.
print(config.save_filename, config.folder, sep="\n")

In [None]:
import os

# Make sure the output folder of this run exists before anything is saved to it.
# The check only selects the message; `exist_ok=True` keeps the creation safe if
# the folder appears in between. A regular file with the same name is not
# accepted as "existing": makedirs then fails loudly instead of letting the
# later save calls fail.
if os.path.isdir(config.folder):
    print('The folder exists')
else:
    print('The folder does not exist, creating it')
    os.makedirs(config.folder, exist_ok=True)

# Load training data

In [None]:
# Load the 9x9 training set and preview it. `load_data` and `visualize_mazes`
# are not defined in this notebook (presumably they come from `utils` via the
# star import - confirm); `n = 20` is forwarded to the preview helper.
training_data = load_data(paths.train_9x9)
visualize_mazes(training_data, n = 20)

# Visualize Maze Environment

In [None]:
# Build an environment around the first training maze. The start is index
# [0, 0]; the goal uses the last index along the first axis for both coordinates.
first_sample = training_data[0]
maze = first_sample['maze'].numpy()
last_index = len(maze) - 1
initial_position, goal = [0, 0], [last_index, last_index]
maze_env = MazeEnvironment(maze, initial_position, goal)

# Draw the environment; the output name carries the configured maze size.
print("Visualization of maze:")
maze_env.draw(f'maze_{config.maze_size}.pdf')

In [None]:
# Replay memory and the agent acting in the maze environment built above.
memory_buffer = ExperienceReplay(config.buffer_capacity)
agent = Agent(maze = maze_env,
              memory_buffer = memory_buffer,
              use_softmax = True
             )

# Choose the network from the config. Both constructors receive 4 as their last
# argument (presumably the number of actions - confirm in models/).
if config.network == "cnn":
    net = conv_nn(maze.shape[0], maze.shape[1], 4)
else:
    net = fc_nn(maze.size, maze.size, maze.size, 4)

net.to(config.device)
optimizer = optim.Adam(net.parameters(), lr=1e-4)

# Take the summary's input size from the loaded maze instead of a hard-coded
# (9, 9), so the cell keeps working when another maze size is loaded.
summary(net, tuple(maze.shape))

In [None]:
# Exploration schedule: exp(-epoch / cutoff), one value per epoch, capped from
# above at the value the raw curve takes at index 100 * int(num_epochs / cutoff).
epsilon = np.exp(-np.arange(config.num_epochs) / config.cutoff)
cap_index = 100 * int(config.num_epochs / config.cutoff)
epsilon_cap = epsilon[cap_index]
epsilon = np.minimum(epsilon, epsilon_cap)

In [None]:
# Visualize epsilon
# For every value of the schedule, query the environment's reset policy and keep
# its smallest (mp) and largest (mpm) entry. The extremes are taken right away,
# inside the loop, exactly once per schedule value.
mp = []
mpm = []
reg = 200  # not used in this cell
for eps_value in epsilon:
    reset_probs = agent.env.reset_policy(eps_value)
    mp.append(np.min(reset_probs))
    mpm.append(np.max(reset_probs))

# Spread between the most and least likely entry of the reset policy.
prob_spread = np.array(mpm) - np.array(mp)

plt.plot(epsilon / 1.3,
         label='Epsilon profile (arbitrary units)',
         color='orangered', ls='--', alpha=0.5)
plt.plot(prob_spread, color='cornflowerblue', label='Probability difference')

plt.xlabel('Epochs')
plt.ylabel(r'max $p^r$ - min $p^r$')
plt.legend()

reset_policy_pdf = f'{config.folder}/reset_policy.pdf'
plt.savefig(reset_policy_pdf, dpi=300, bbox_inches='tight')
plt.show()

# Training

In [None]:
# Run training. `train` (imported from training.py) returns four logs that the
# cells below print and plot: the loss, the epochs at which the maze was changed,
# the number of moves and the win/loss outcome.
loss_log, maze_change_log, move_log, result_log = train(agent, net, optimizer, epsilon, training_data, config)

In [None]:
# Persist the network's weights (its state_dict) to the run's checkpoint path.
torch.save(net.state_dict(), config.save_filename)

In [None]:
# Encode each outcome as 1 ("won") or 0 (anything else); later cells reuse won_log.
won_log = [1 if outcome == "won" else 0 for outcome in result_log]
num_wins = sum(won_log)
num_losses = len(won_log) - num_wins

# Raw training logs.
print("=======================")
print(f'Loss logs: {loss_log}')
print(f'Log for which epoch where maz is changed: {maze_change_log}')
print(f'Num of moves at each epoch: {move_log}')
print(f'Number of times maze is changed: {len(maze_change_log)}')
print("------------------------")

# Outcomes and their totals.
print(f'Win/loss result at each epoch: {result_log}')
print("# win:", num_wins)
print("# loss:", num_losses)
print("=======================")

In [None]:
# Visualize loss
fig, ax = plt.subplots(figsize=(10,5))

# The epsilon profile and the probability difference are multiplied by fixed
# factors (90 and 120) only to make them visible on the same axis as the loss;
# their units are arbitrary, as the legend states.
ax.plot(epsilon*90, alpha = 0.6, ls = '--', label = 'Epsilon profile (arbitrary unit)', color = 'orangered')
ax.plot((np.array(mpm)-np.array(mp))*120, alpha = 0.6, ls = '--',
         label = 'Probability difference (arbitrary unit)', color = 'dimgray')
ax.plot(loss_log, label = 'Loss', color = 'cornflowerblue')

# One dotted vertical line per entry of maze_change_log, drawn from 0 to 120.
ax.vlines(x = maze_change_log, ymin = 0, ymax = 120,
           colors = 'purple',
           linestyles = "dotted", label = "Maze change")


ax.set_xlabel('Epoch')
ax.set_ylabel('')
ax.legend()
# Save into the run's output folder, like reset_policy.pdf, so that runs with
# different settings do not overwrite each other's loss figure.
fig.savefig(f'{config.folder}/loss.pdf', dpi = 300, bbox_inches='tight')
plt.show()

In [None]:
# Win loss plot
# imshow needs a 2D array, so the 1D won_log becomes a single-row image.
won_log_2d = np.array(won_log)[np.newaxis, :]

# The figure size is set on this figure only; changing plt.rcParams here would
# silently resize every figure created afterwards (including re-runs of
# earlier cells).
fig, ax = plt.subplots(figsize=(10, 2))
extent = [0, len(won_log), 0, 1]  # [xstart, xend, ystart, yend]
cmap = ListedColormap(['#FFA07A', '#6495ED'])  # light salmon = lost (0), cornflower blue = won (1)

# Pin the colour scale to [0, 1]. With automatic scaling a log that contains
# only wins (or only losses) has min == max and is drawn entirely in the first
# colour, i.e. an all-won run would be shown as "Lost".
cax = ax.imshow(won_log_2d, aspect="auto", cmap=cmap, extent=extent, vmin=0, vmax=1)
ax.set_yticks([])  # No need for y ticks in a single-row heatmap
ax.set_xlabel("Epochs")
ax.set_title("Results Heatmap")

# Removing the box (spines)
for spine in ax.spines.values():
    spine.set_visible(False)

# Colourbar used as a legend: two equal bands, labelled at their centres.
colorbar = fig.colorbar(cax, ax=ax, orientation='vertical', fraction=0.02, pad=0.04)
colorbar.set_label('Win Status')
colorbar.set_ticks([0.25, 0.75])
colorbar.set_ticklabels(['Lost', 'Won'])

plt.tight_layout()
plt.show()


# Testing

In [None]:
class test_config:
    """Settings handed to `evaluation(...)`."""
    # Presumably the step budget per test maze - confirm in evaluation.py.
    max_step = 31
    # Arrow glyph for each direction name; "END" maps to the integer 0.
    test_directions = dict(RIGHT='→', LEFT='←', DOWN='↓', UP='↑', END=0)

In [None]:
# Load the 9x9 test set and preview it with the same helpers used for the
# training data (the preview helper is called with its default arguments here).
test_data = load_data(paths.test_9x9)
visualize_mazes(test_data)

In [None]:
# Evaluate the network on the test set. `evaluation` (imported from
# evaluation.py) returns four collections which the cells below treat as
# per-maze lists: accuracy, F1, number of moves and a solved flag.
overall_accuracy, overall_f1, overall_num_moves_goal, overall_solved_maze = evaluation(test_data, net, memory_buffer, test_config)

In [None]:
# Dump the raw per-maze evaluation results, one line per metric.
print(
    f'List of accuracies: {overall_accuracy}',
    f'List of f1: {overall_f1}',
    f'Number of moves taken to solve each maze: {overall_num_moves_goal}',
    f'Resulst for each maze: {overall_solved_maze}',
    sep="\n",
)

In [None]:
# overall_solved_maze is summed to count the solved mazes, so it is assumed to
# hold one 0/1 (or boolean) flag per test maze - confirm in evaluation.py.
# The unsolved count is the remainder, not the total number of mazes.
num_mazes = len(overall_solved_maze)
num_solved = sum(overall_solved_maze)
print(f'Mazes unsolved: {num_mazes - num_solved}')
print(f'Mazes solved: {num_solved}')

In [None]:
# Average the per-maze scores into one figure per metric and report both.
final_acc, final_f1 = mean(overall_accuracy), mean(overall_f1)
print(f'Accuracy: {final_acc}', f'F1: {final_f1}', sep="\n")

In [None]:
# Grouped bar chart: accuracy and F1 for every test maze, side by side.
group1_values = overall_accuracy
group2_values = overall_f1
categories = list(range(1, len(group1_values) + 1))  # 1-based maze numbers

# Set the width of the bars
barWidth = 0.3

plt.figure(figsize=(20, 6))  # 20 inches wide, 6 inches tall

# Bar positions on the x axis: the F1 bar sits directly right of the accuracy bar.
r1 = np.arange(len(group1_values))
r2 = r1 + barWidth

# Label the bars with what they show, so the legend needs no override.
plt.bar(r1, group1_values, width=barWidth, edgecolor='blue', label='Accuracy')
plt.bar(r2, group2_values, width=barWidth, edgecolor='orange', label='F1')

# Label only every 1000th maze to avoid clutter. The ticks are set once; a
# tick per maze would be created only to be replaced immediately.
tick_indices = range(0, len(categories), 1000)
tick_positions = [i + barWidth/2 for i in tick_indices]
tick_labels = [categories[i] for i in tick_indices]
plt.xticks(tick_positions, tick_labels)

plt.xlabel('Test maze')
plt.ylabel('Score')

# Create legend & Show graphic
plt.legend()
plt.show()


In [None]:
# Reload the saved weights into `net`. map_location remaps the stored tensors
# onto the configured device, so a checkpoint written on a GPU also loads on a
# machine without one.
net.load_state_dict(torch.load(config.save_filename, map_location=config.device))