In [1]:
% pip install optuna

import torch
import torch.nn as nn
import os

import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt
from statistics import mean 
import random
import copy

import optuna
import pandas as pd

from google.colab import drive
drive.mount('/content/gdrive/')
import sys
sys.path.append('/content/gdrive/Shared drives/DATA_690_deep_learning_final_project/')

from snake_game_modified import SnakeGame

Drive already mounted at /content/gdrive/; to attempt to forcibly remount, call drive.mount("/content/gdrive/", force_remount=True).


In [2]:
torch.backends.cudnn.deterministic=True
torch.manual_seed(42)
np.random.seed(42)

In [3]:
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

In [4]:
def moveTo(obj, device):
    """
    obj: the python object to move to a device, or to move its contents to a device
    device: the compute device to move objects to
    """
    if hasattr(obj, "to"):
        return obj.to(device)
    elif isinstance(obj, list):
        return [moveTo(x, device) for x in obj]
    elif isinstance(obj, tuple):
        return tuple(moveTo(list(obj), device))
    elif isinstance(obj, set):
        return set(moveTo(list(obj), device))
    elif isinstance(obj, dict):
        to_ret = dict()
        for key, value in obj.items():
            to_ret[moveTo(key, device)] = moveTo(value, device)
        return to_ret
    else:
        return obj

In [5]:
class Flatten(nn.Module):
    def forward(self, input):
        return input.view(input.size(0), -1)

## Loss function

$$[Q(s,a,\theta)-(r(s,a)+\gamma\max_aQ(s',a,\theta))]^2$$

Reference: https://towardsdatascience.com/qrash-course-deep-q-networks-from-the-ground-up-1bbda41d3677

In [6]:
def bellman_loss_function(max_current_q, current_reward, max_future_q=None, gamma=.85):
  current_reward = moveTo(torch.tensor(current_reward), device)
  gamma = moveTo(torch.tensor(gamma), device)
  if max_future_q is not None:
    return torch.pow(torch.subtract(max_current_q, torch.add(current_reward, torch.multiply(gamma, max_future_q))), 2)
  else:
    return torch.pow(torch.subtract(max_current_q, current_reward), 2)

## Batch training function

In [7]:
def train_model(trainable_model, constant_model, optimizer, device, current_states, current_rewards, future_states, batch_size, epochs=10, gamma=.85):

  trainable_model = trainable_model.train()
  constant_model = constant_model.eval()
  current_states_tensor = moveTo(torch.tensor(current_states, dtype=torch.float), device)
  current_rewards_tensor = moveTo(torch.tensor(current_rewards, dtype=torch.float), device)
  future_states_tensor = moveTo(torch.tensor(future_states, dtype=torch.float), device)

  loss_list = []

  for epoch in range(epochs):

    current_q = trainable_model(current_states_tensor)

    with torch.no_grad():
      future_q = constant_model(future_states_tensor)

    max_current_q =  moveTo(torch.zeros(batch_size, 1), device)
    max_future_q = moveTo(torch.zeros(batch_size, 1), device)

    for i, q in enumerate(current_q):
      max_q, action = torch.max(q, 0)
      index = torch.tensor([i])
      max_current_q = max_current_q.index_put([index],max_q)
    for i, q in enumerate(future_q):
      max_q, action = torch.max(q, 0)
      index = torch.tensor([i])
      max_future_q = max_future_q.index_put([index],max_q)

    loss = bellman_loss_function(max_current_q=max_current_q, current_reward=current_rewards, max_future_q=max_future_q, gamma=gamma)
    loss.mean().backward()

    optimizer.step()
    optimizer.zero_grad()

    loss_list.append(loss.mean().item())
  return trainable_model, constant_model, mean(loss_list)

## Generate fully connected model

In [8]:
def get_fully_connected_model(input_size=0, num_hidden_layers=20, num_nodes=250, output_size=0):

  model = nn.Sequential(
    nn.Flatten(),
    nn.Linear(input_size,  num_nodes),
    nn.LeakyReLU()
  )

  for hidden_layer in range(num_hidden_layers):
    model.add_module(f'linear_{hidden_layer}', nn.Linear(num_nodes,  num_nodes))
    model.add_module(f'non_linearity_{hidden_layer}', nn.LeakyReLU())

  model.add_module(f'output', nn.Linear(num_nodes, output_size))

  return model


## Reward function

Calculate Eucledian distance between the head of the snake, then multiply its reciprocal to the rate of return, and finally add the result to the actual reward from the move. This means that as the snake gets closer, the return gets bigger but only by a tiny fraction which is controlled by the rate of return:

$$r(s, f, v) = \rho\left[\frac{1}{\sqrt{(f_0 - s_0)^2 + (f_1 - s_1)^2}}\right]+v$$

Where $s$ is the position of the head of the snake, $f$ is the position of the food, $v$ is the reward returned by the game (-1, 0, or 1) and $\rho$ is the rate that controls how much of the distance function to return.

In [9]:
def get_reward(snake, food, reward, rho = 0.1):
  snake_head = snake[0]
  x_distance = np.subtract(food[0], snake_head[0])
  y_distance = np.subtract(food[1], snake_head[1])
  euclidean_distance = np.sqrt(np.add(np.power(x_distance, 2), np.power(y_distance, 2)))
  if euclidean_distance > 0 and reward > -1:
    return np.add(np.multiply(np.divide(1,euclidean_distance), rho), reward)
    # return np.add(np.divide(1,euclidean_distance), reward)
  else:
    return reward


## Process training trial

Train model with different set of hyperparamters

In [10]:
def process_training_trial(trial):
  board_width = 22
  board_height = 22

  batch_size = trial.suggest_int("batch_size", 8, 32)

  epsilon = trial.suggest_float("epsilon", 0.1, 0.25)

  training_rounds = trial.suggest_int("training_rounds", 50, 200)

  num_hidden_layers = trial.suggest_int("num_hidden_layers", 5, 200)

  learning_rate = trial.suggest_float("learning_rate", 0.0000001, 0.01)

  gamma = trial.suggest_float("gamma", 0.9, 0.99)

  switch_model_threshold = trial.suggest_int("switch_model_threshold", 10, 20)

  trainable_model = get_fully_connected_model(input_size=(board_width*board_height), num_hidden_layers=num_hidden_layers, output_size=4)
  constant_model = get_fully_connected_model(input_size=(board_width*board_height), num_hidden_layers=num_hidden_layers, output_size=4)

  # trainable_model = get_cnn_model(flattened_size=(board_width*board_height), output_size=4)
  # constant_model = get_cnn_model(flattened_size=(board_width*board_height), output_size=4)

  optimizer_name = trial.suggest_categorical("optimizer", ["Adam", "AdamW", "SGD"])

  rho = trial.suggest_float("rho", 5, 10)

  if optimizer_name == "Adam":
    optimizer = torch.optim.Adam(trainable_model.parameters(), lr=learning_rate)
  elif optimizer_name == "AdamW":
    optimizer = torch.optim.AdamW(trainable_model.parameters(), lr=learning_rate)
  else:
    optimizer = torch.optim.SGD(trainable_model.parameters(), lr=learning_rate)

  game = SnakeGame(board_width = board_width-2, board_height = board_height-2, gui = False)

  row_index = f'{batch_size}_{round(epsilon, 3)}_{training_rounds}_{num_hidden_layers}_{round(learning_rate, 7)}_{optimizer_name}_{rho}_{gamma}_{switch_model_threshold}'

  results_path = '/content/gdrive/Shared drives/DATA_690_deep_learning_final_project/results'

  if os.path.exists(f'{results_path}/score_resuts_v2.csv'):
    score_results_df = pd.read_csv(f'{results_path}/score_resuts_v2.csv', index_col='hyperparameters')
  else:
    os.makedirs(results_path, exist_ok=True)
    score_results_df = pd.DataFrame(columns=['hyperparameters'])
    score_results_df = score_results_df.set_index('hyperparameters')

  if os.path.exists(f'{results_path}/loss_resuts_v2.csv'):
    loss_results_df = pd.read_csv(f'{results_path}/loss_resuts_v2.csv', index_col='hyperparameters')
  else:
    os.makedirs(results_path, exist_ok=True)
    loss_results_df = pd.DataFrame(columns=['hyperparameters'])
    loss_results_df = loss_results_df.set_index('hyperparameters')

  score_results = {
      "games": [],
      "scores": []
  }

  loss_results = {
      "rounds": [],
      "loss": [] 
  }

  state = np.zeros((1,1,board_width,board_height), dtype=int)

  done, score, snake, food, reward = game.start()

  for snake_point in snake:
      state[0, 0, snake_point[0], snake_point[1]] = 1

  state[0, 0, food[0], food[1]] = 2

  round_index = 0
  game_number = 1

  while round_index < training_rounds:
    
    current_states = np.zeros((batch_size, 1, board_width, board_height), dtype=int)
    rewards = np.zeros((batch_size,1), dtype=int)
    future_states = np.zeros((batch_size, 1, board_width, board_height), dtype=int)

    trainable_model.to(device)
    constant_model.to(device)
    constant_model = constant_model.eval()
    trainable_model = trainable_model.eval()
    
    batch_index = 0

    while batch_index < batch_size:

      state_tensor = moveTo(torch.tensor(state, dtype=torch.float), device)
      
      if random.gauss(.5, .25) > epsilon:
        with torch.no_grad():
          q = trainable_model(state_tensor)
        max_q, action = torch.max(q, 1)
        action = action.detach().cpu().item()
      else:
        action = random.randint(0, 3)

      done, score, snake, food, reward = game.step(action)
    
      new_state = np.zeros((1, 1, board_width,board_height), dtype=int)
      for snake_point in snake:
          new_state[0, 0, snake_point[0], snake_point[1]] = 1
      new_state[0, 0, food[0], food[1]] = 2

      current_states[batch_index] = state[0]
      rewards[batch_index] = get_reward(snake, food, reward, rho)
      future_states[batch_index] = new_state[0]

      if done:
        score_results["games"].append(game_number)
        score_results["scores"].append(score)
        score_results_df.loc[f'{row_index}', f'game_{game_number}'] = score
        game_number += 1
        game = SnakeGame(board_width = board_width-2, board_height = board_height-2, gui = False)
        done, score, snake, food, reward = game.start()
        state = np.zeros((1, 1, board_width,board_height), dtype=int)
        for snake_point in snake:
            state[0, 0, snake_point[0], snake_point[1]] = 1
        state[0, 0, food[0], food[1]] = 2
      else:
        state = new_state
      batch_index += 1

    trainable_model, constant_model, loss = train_model(
        trainable_model,
        constant_model,
        optimizer, device,
        current_states,
        rewards,
        future_states,
        batch_size=batch_size,
        epochs=20,
        gamma=gamma
    )

    if round_index%switch_model_threshold == 0:
      constant_model = copy.deepcopy(trainable_model)

    loss_results["rounds"].append(round_index)
    loss_results["loss"].append(loss)
    loss_results_df.loc[f'{row_index}', f'round_{round_index}'] = loss
    round_index += 1

  loss_mean = np.mean(loss_results["loss"])
  loss_results_df.loc[f'{row_index}', f'mean'] = loss_mean
  loss_results_df.to_csv(f'{results_path}/loss_resuts_v2.csv')

  scores_mean = np.mean(score_results["scores"])
  score_results_df.loc[f'{row_index}', f'mean'] = scores_mean
  score_results_df.to_csv(f'{results_path}/score_resuts_v2.csv')

  return scores_mean

## Hyperparameter tuning

In [None]:
# from https://github.com/optuna/optuna/blob/master/examples/pytorch_simple.py

study = optuna.create_study(direction="maximize")
study.optimize(process_training_trial, n_trials=100)

pruned_trials = [t for t in study.trials if t.state == optuna.trial.TrialState.PRUNED]
complete_trials = [t for t in study.trials if t.state == optuna.trial.TrialState.COMPLETE]

print("Study statistics: ")
print("  Number of finished trials: ", len(study.trials))
print("  Number of pruned trials: ", len(pruned_trials))
print("  Number of complete trials: ", len(complete_trials))

print("Best trial:")
trial = study.best_trial

print("  Value: ", trial.value)

print("  Params: ")
for key, value in trial.params.items():
    print("    {}: {}".format(key, value))

[32m[I 2020-12-07 23:19:54,027][0m A new study created in memory with name: no-name-6980852e-d192-40df-8b8e-e148606ff7b6[0m
[32m[I 2020-12-07 23:22:30,484][0m Trial 0 finished with value: 0.014925373134328358 and parameters: {'batch_size': 11, 'epsilon': 0.24768646745584463, 'training_rounds': 73, 'num_hidden_layers': 191, 'learning_rate': 0.004980980134786481, 'gamma': 0.9246351942512071, 'switch_model_threshold': 18, 'optimizer': 'Adam', 'rho': 6.545472838885006}. Best is trial 0 with value: 0.014925373134328358.[0m
[32m[I 2020-12-07 23:26:40,749][0m Trial 1 finished with value: 0.04081632653061224 and parameters: {'batch_size': 11, 'epsilon': 0.1583061005868739, 'training_rounds': 159, 'num_hidden_layers': 144, 'learning_rate': 0.0009083559199512158, 'gamma': 0.9334498772750115, 'switch_model_threshold': 11, 'optimizer': 'Adam', 'rho': 7.8138787331263195}. Best is trial 1 with value: 0.04081632653061224.[0m
[32m[I 2020-12-07 23:28:20,728][0m Trial 2 finished with value: 0

In [None]:
# sns.lineplot(x='games', y='scores', data=score_results, label=f'Scores - batch_size: {batch_size} - training_rounds: {training_rounds} - num_hidden_layers: {num_hidden_layers}')

In [None]:
# g = sns.lineplot(x='rounds', y='loss', data=loss_results, label=f'Loss - batch_size: {batch_size} - training_rounds: {training_rounds} - num_hidden_layers: {num_hidden_layers}')
# g.set_yscale("log")