In [None]:
import logging
import pprint
import time
import datetime

import tensorflow as tf
import keras as keras
import sklearn.model_selection
import sklearn.datasets
import numpy as np
import typing
import matplotlib.pyplot as plt
import pandas as pd
import os
import shutil



In [None]:
import logging
# Install a default handler on the root logger so log records become visible.
logging.basicConfig()
# Notebook-wide logger; later cells emit their debug output through `log`.
log = logging.getLogger(__name__)
# DEBUG is the lowest standard level, so every log.debug() call is shown.
log.setLevel(logging.DEBUG)

# Smoke test: this message should appear in the cell output.
log.debug("testing debug")

# Setup

## Globals

In [None]:
# Baseline hyper-parameters. evaluate_params() copies this dict and overlays the
# values supplied for a trial, so any key a trial omits falls back to the value
# given here.
default_params = {
  "optimizer" : "SGD",  # forwarded to model.compile(optimizer=...)
  "batch_size" : 64,  # forwarded to model.fit(batch_size=...)
  "activation_function" : "relu",  # activation of every hidden Dense layer
  "layer_sizes" : [4]  # one entry per hidden layer: its number of units
}

## Data

In [None]:
# Synthetic 3-class problem: 10,000 samples with 10 features (2 of them
# redundant), one cluster per class and a low class separation.
# random_state pins the generated data: without it every kernel restart
# produced a different dataset, so runs were not comparable even though the
# train/test split below was already seeded.
X_mc, y_mc = sklearn.datasets.make_classification(
  n_samples=10000,
  n_features=10,
  n_redundant=2,
  n_classes=3,
  n_clusters_per_class=1,
  class_sep=0.5,
  random_state=42
)

# Hold out 33% of the samples as a test set; the split is seeded as well.
X_train_mc, X_test_mc, y_train_mc, y_test_mc = sklearn.model_selection.train_test_split(
  X_mc,
  y_mc,
  test_size=0.33,
  random_state=42
)

## Functions

In [None]:

def plot_multiple_histories(histories : typing.Dict, metric='loss', use_greyscale=True, show_validation=False, sort_agg_func=np.mean, sort_history_length=2, *args, sort_key=None, **kwargs):
  """
  Plot the training curves of several models on one figure for comparison.

  Curves are drawn in ascending order of a per-model sort value:
  `sort_agg_func` applied to the last `sort_history_length` epochs of the
  sort metric. With the greyscale palette the darkest line therefore belongs
  to the model with the smallest sort value.

  Parameters:
    histories: Dict mapping a label to an object whose `.history` attribute
      is a dict of per-epoch metric lists (e.g. what `model.fit()` returns).
    metric: Key of the metric to plot ('loss', 'accuracy', etc.).
    use_greyscale: If True, colour the lines with the 'gray' colormap in
      sort order; if False, use matplotlib's default colour cycle.
    show_validation: If True, also plot `val_<metric>` (dashed, same colour)
      for every history that contains it.
    sort_agg_func: Callable reducing a list of metric values to one sortable
      value.
    sort_history_length: Number of trailing epochs passed to `sort_agg_func`;
      a value <= 0 uses the whole series.
    sort_key: Keyword-only. Metric used for sorting; defaults to `metric`.
    *args, **kwargs: Accepted for backward compatibility and ignored; a
      warning is logged so that misspelled options do not go unnoticed.
  """
  if args or kwargs:
    log.warning(f"plot_multiple_histories ignores unexpected arguments: args={args}, kwargs={kwargs}")

  sort_metric = metric if sort_key is None else sort_key

  def _sort_value(key):
    series = histories[key].history[sort_metric]
    # Aggregate the LAST epochs. The former slice `[:-n]` selected everything
    # except the last n epochs and was empty for histories of <= n epochs.
    tail = series[-sort_history_length:] if sort_history_length > 0 else series
    return sort_agg_func(tail)

  # Compute each sort value once; it is needed for ordering and for the log.
  sort_values = {key: _sort_value(key) for key in histories}
  sorted_keys = sorted(histories, key=sort_values.get)

  log.debug(f"sorted_keys: {sorted_keys}")
  log.debug(f"sort values ({sort_metric}): {[sort_values[k] for k in sorted_keys]}")

  # Initialize the plot
  plt.figure(figsize=(10, 6))

  # vmax is len(histories) rather than len - 1, so the last curve never
  # reaches the pure-white end of the colormap.
  norm = plt.Normalize(vmin=0, vmax=len(histories))
  grayscale_cmap = plt.colormaps['gray']

  for i, label in enumerate(sorted_keys):
    series = histories[label].history
    epochs = range(1, len(series[metric]) + 1)

    # Only force a colour in greyscale mode; otherwise let matplotlib cycle.
    line_style = {'color': grayscale_cmap(norm(i))} if use_greyscale else {}

    # Plot training metric
    (train_line,) = plt.plot(epochs, series[metric], label=f'{label} {metric.capitalize()}', **line_style)

    # Plot validation metric if available, in the colour of its training curve
    if show_validation and f'val_{metric}' in series:
      plt.plot(epochs, series[f'val_{metric}'], '--', label=f'{label} Validation {metric.capitalize()}', color=train_line.get_color())

  # Add labels and title
  plt.title(f'Comparison of {metric.capitalize()} Between Models')
  plt.xlabel('Epochs')
  plt.ylabel(metric.capitalize())
  if sorted_keys:
    # legend() warns when there is nothing labelled to show
    plt.legend()

  # Show plot
  plt.show()


In [None]:
def get_model(
        num_input_features=4,
        activation_function="elu",
        layer_sizes=None,
        num_output_classes=3
):
  """
  Build an uncompiled fully connected classifier with a softmax output.

  Parameters:
    num_input_features: Length of one input feature vector.
    activation_function: Activation used by every hidden Dense layer.
    layer_sizes: Unit counts of the hidden layers, in order; None means a
      single hidden layer of 10 units.
    num_output_classes: Number of units in the softmax output layer.

  Returns:
    A keras Model mapping the input layer to the softmax output.
  """
  hidden_units = [10] if layer_sizes is None else layer_sizes

  net_input = keras.layers.Input(shape=(num_input_features,))

  # Chain the hidden layers: each Dense layer consumes the previous tensor.
  tensor = net_input
  for units in hidden_units:
    dense = keras.layers.Dense(units, activation=activation_function)
    tensor = dense(tensor)

  softmax_head = keras.layers.Dense(num_output_classes, activation='softmax')
  return keras.models.Model(inputs=net_input, outputs=softmax_head(tensor))



In [None]:


def evaluate_params(params, verbose=1, num_epochs=10, X=None, y=None):
  """
  Train one model for the given hyper-parameters and report how it did.

  Parameters:
    params: Dict of hyper-parameter overrides; keys that are missing fall
      back to `default_params`. None is treated like an empty dict.
    verbose: Forwarded to `model.fit(verbose=...)`.
    num_epochs: Maximum number of training epochs.
    X, y: Optional training features / integer class labels. When omitted,
      the notebook-level `X_train_mc` / `y_train_mc` are used, which is what
      this function always trained on before these parameters existed.

  Returns:
    Tuple `(pars, mean_acc, history)`: the merged hyper-parameter dict, the
    mean of the last two epochs' `val_accuracy`, and the object returned by
    `model.fit()`.
  """
  log.debug(f"params: {pprint.pformat(params)}")

  # default parameters are used if not supplied
  pars = default_params.copy()
  if params is not None:
    pars.update(params)

  # Fall back to the notebook's training split only when no data is passed,
  # so the function can also be reused with other datasets.
  if X is None:
    X = X_train_mc
  if y is None:
    y = y_train_mc

  # Stops training once val_loss has stopped improving for `patience` epochs;
  # this only matters for runs longer than `patience` epochs.
  early_stopping = keras.callbacks.EarlyStopping(monitor='val_loss', patience=10)

  model = get_model(
    num_input_features=X.shape[1],
    activation_function=pars['activation_function'],
    layer_sizes=pars['layer_sizes']
  )
  model.compile(
    optimizer=pars["optimizer"],
    # labels are integer class ids, hence the sparse variant of the loss
    loss="sparse_categorical_crossentropy",
    metrics=["accuracy"]
  )

  history = model.fit(
    X,
    y,
    epochs=num_epochs,
    callbacks=[early_stopping],
    batch_size=pars["batch_size"],
    # 30% of the training data is held back to compute the val_* metrics
    validation_split=0.3,
    verbose=verbose
  )
  # Average the last two epochs to smooth out single-epoch noise.
  mean_acc = np.mean(history.history['val_accuracy'][-2:])

  return pars, mean_acc, history


In [None]:
# Compare networks with one, two and three hidden layers of 10 ELU units.
# The dict key ("10", "10-10", "10-10-10") is the label of the run in the plot;
# element [2] of evaluate_params()'s result is the training history.
plot_multiple_histories(
  {
    "-".join(["10"] * depth): evaluate_params({
      "activation_function" : "elu",
      "layer_sizes" : [10] * depth
    })[2]
    for depth in (1, 2, 3)
  },
  show_validation=True
)

# Search Approaches


## Grid Search

In [None]:


def get_name(params):
  """Return a run label: the values of `params`, ordered by key, joined with '-'."""
  ordered_values = (params[key] for key in sorted(params))
  return '-'.join(map(str, ordered_values))

def grid_search(param_grid, verbose=1, num_epochs=10):
  """
  Train one model for every combination in `param_grid`.

  Parameters:
    param_grid: Dict mapping a hyper-parameter name to the list of values to
      try; expanded with sklearn's ParameterGrid.
    verbose, num_epochs: Forwarded to `evaluate_params`.

  Returns:
    Dict mapping `get_name(<merged parameters>)` to the training history of
    that run. Runs that produce the same name overwrite each other.
  """
  results = {}
  for candidate in sklearn.model_selection.ParameterGrid(param_grid):
    print(f"params: {candidate}")
    outcome = evaluate_params(candidate, verbose=verbose, num_epochs=num_epochs)
    # outcome is (merged params, mean validation accuracy, history);
    # the accuracy is not needed here.
    merged_params, trial_history = outcome[0], outcome[2]
    results[get_name(merged_params)] = trial_history

  return results

In [None]:
# Exhaustive search over 2 activations x 3 layer layouts at a fixed batch size
# of 128 (6 trainings), each with grid_search()'s default number of epochs.
history_dict = grid_search(
  param_grid={
    "activation_function" : ["elu", "relu"],
    "layer_sizes" : [[10], [10, 10], [10,10,10]],
    "batch_size" : [128]
  },
  # num_epochs=100
)
# Overlay the curves of all runs using plot_multiple_histories()'s default metric.
plot_multiple_histories(history_dict)

## Random Search

In [None]:
def random_search(param_grid, num_tests=5, random_state=None, verbose=1, num_epochs=10):
  """
  Train models for a random subset of the combinations in `param_grid`.

  Parameters:
    param_grid: Dict mapping a hyper-parameter name to the list of values to
      try; expanded with sklearn's ParameterGrid.
    num_tests: Number of distinct combinations to evaluate; capped at the
      number of distinct combinations available.
    random_state: Controls which combinations are drawn. None uses numpy's
      global random state, an int seeds a private RandomState, and an
      existing RandomState / Generator is used as is. It does not make the
      model training itself deterministic.
    verbose, num_epochs: Forwarded to `evaluate_params`.

  Returns:
    Dict mapping `get_name(<merged parameters>)` to the training history of
    that run.
  """
  # create a list of unique parameter combinations
  # (a grid whose value lists contain repeated entries yields repeated
  # combinations; the repr of the key-sorted items identifies a combination
  # without requiring the values to be hashable)
  param_combs = []
  seen = set()
  for comb in sklearn.model_selection.ParameterGrid(param_grid):
    fingerprint = repr(sorted(comb.items()))
    if fingerprint not in seen:
      seen.add(fingerprint)
      param_combs.append(comb)

  num_tests = min(num_tests, len(param_combs))

  # random_state used to be accepted but ignored, so searches could not be
  # reproduced.
  if random_state is None:
    rng = np.random
  elif isinstance(random_state, (np.random.RandomState, np.random.Generator)):
    rng = random_state
  else:
    rng = np.random.RandomState(random_state)

  # Draw indices rather than the dicts themselves, so numpy never has to
  # convert the list of dicts into an array.
  chosen = rng.choice(len(param_combs), size=num_tests, replace=False)
  random_combs = [param_combs[i] for i in chosen]

  # evaluate each of the combinations
  history_dict = {}
  for params in random_combs:
    print(f"params: {params}")
    pars, acc, history = evaluate_params(params, verbose=verbose, num_epochs=num_epochs)

    history_dict[get_name(pars)] = history

  return history_dict


In [None]:
 history_dict = random_search(
  param_grid={
    "activation_function" : ["elu", "relu"],
    "layer_sizes" : [[10], [10, 10], [10,10,10]],
    "batch_size" : [128]
  }
)
plot_multiple_histories(history_dict, sort_key="accuracy")

In [None]:
# Wider random search: hidden-layer widths 2**0..2**4 (1, 2, 4, 8, 16) crossed
# with depths 0..3, for both activations; 10 combinations are sampled.
# NOTE(review): depth 0 produces the same empty list for every width, so the
# "layer_sizes" list contains five identical [] entries (no hidden layer).
# Consider starting num_layers at 1 or adding a single [] explicitly.
history_dict = random_search(
  param_grid={
    "activation_function" : ["elu", "relu"],
    "layer_sizes" : [[2**layer_size] * num_layers for layer_size in range(0,5) for num_layers in range(0,4)],
    "batch_size" : [128]
  },
  # num_epochs=200,
  num_tests=10
)

# Overlay the curves of the sampled runs.
plot_multiple_histories(history_dict)

## Grid Search with logger

In [None]:
import logging

class TuningLogger:
  """
  Writes one line per tuning trial to a log file of its own.

  Every instance creates `<log_dir>/TuningLogger-<id>.log`, where `<id>` is
  the creation timestamp. All instances share the logger named
  "TuningLogger", so creating a new instance detaches (and closes) the file
  of the previous one.
  """
  # Directory used when the constructor is called without `log_dir`.
  default_log_dir = "logs/"

  def __init__(self, log_dir=None):
    """
    Parameters:
      log_dir: Directory for the log file; created (including missing
        parents) if necessary. None selects `default_log_dir`.
    """
    current_time = datetime.datetime.now()
    # day, month, year - hour - minute - second - microsecond
    self.id = current_time.strftime("%d%b%Y-%H-%M-%S-%f")

    self.logger = logging.getLogger("TuningLogger")
    # Detach handlers left over from earlier instances. Iterate over a copy:
    # removeHandler() mutates `handlers`, and looping over the live list
    # skipped every second handler. Closing releases the old log files.
    for stale_handler in list(self.logger.handlers):
      self.logger.removeHandler(stale_handler)
      stale_handler.close()

    self.logger.setLevel(logging.INFO)

    if log_dir is None:
      log_dir = self.__class__.default_log_dir

    # makedirs also creates missing parent directories and does not fail if
    # the directory already exists (os.mkdir raised for nested paths).
    os.makedirs(log_dir, exist_ok=True)

    log_path = os.path.join(log_dir, f"{self.__class__.__name__}-{self.id}.log")

    file_handler = logging.FileHandler(log_path)
    # message only: each log line is exactly the repr of the report dict
    file_handler.setFormatter(logging.Formatter('%(message)s'))
    self.logger.addHandler(file_handler)

  def log_trial(self, params, history, metrics=("loss",), agg_history=3):
    """
    Append one report line for a finished trial.

    Parameters:
      params: Dict of the trial's hyper-parameters; merged into the report.
      history: Object whose `.history` attribute maps a metric name to its
        list of per-epoch values.
      metrics: Names of the metrics to report.
      agg_history: Number of trailing epochs averaged for each metric; a
        value <= 0 averages the whole series.
    """
    report = {
      "id": self.id,
      "time": time.time(), # seconds since the epoch
    }
    report.update(params)
    for metric in metrics:
      values = history.history[metric]
      # Average the LAST epochs. The former slice `[:-agg_history]` averaged
      # everything except them and was empty for short runs.
      tail = values[-agg_history:] if agg_history > 0 else values
      # plain float keeps the logged repr independent of the numpy version
      report[metric] = float(np.mean(tail))
    self.logger.info(report)
    return


In [None]:

def grid_search_logging(param_grid, verbose=1, num_epochs=10, log_results=True):
  """
  Grid search that can additionally record every trial with a TuningLogger.

  Parameters:
    param_grid: Dict mapping a hyper-parameter name to the list of values to
      try; expanded with sklearn's ParameterGrid.
    verbose, num_epochs: Forwarded to `evaluate_params`.
    log_results: If True, each trial is written to a new TuningLogger file.

  Returns:
    Dict mapping `get_name(<merged parameters>)` to the training history of
    that run.
  """
  # Only create the logger when something will be written: constructing it
  # creates the log directory and an (otherwise empty) log file.
  tuning_logger = TuningLogger() if log_results else None

  history_dict = {}
  for params in sklearn.model_selection.ParameterGrid(param_grid):
    print(f"params: {params}")
    pars, acc, history = evaluate_params(params, verbose=verbose, num_epochs=num_epochs)
    if tuning_logger is not None:
      tuning_logger.log_trial(pars, history)
    history_dict[get_name(pars)] = history

  return history_dict
  


In [None]:
# Logged grid search: ELU only, hidden-layer widths 2**3 and 2**4 (8, 16) at
# depths 2 and 3, crossed with batch sizes 2**8..2**10 (256, 512, 1024),
# i.e. 4 layouts x 3 batch sizes = 12 trainings.
history_dict = grid_search_logging(
  param_grid={
    "activation_function" : ["elu"], #["elu", "relu"],
    "layer_sizes" : [[2**layer_size] * num_layers for layer_size in range(3,5) for num_layers in range(2,4)],
    "batch_size" : [2**batch_exponent for batch_exponent in range(8,11)]
  },
)
# Overlay the curves of all runs.
plot_multiple_histories(history_dict)

# Optuna

In [None]:
import optuna

In [None]:
class Objective:
  """
  Callable Optuna objective that tunes only `batch_size`.

  All other hyper-parameters are taken from `default_params`. The value
  returned for a trial is the accuracy figure reported by `evaluate_params`.
  """
  def __init__(self, X, y):
    # Kept on the instance; see the note in __call__ on why they are not
    # forwarded to evaluate_params.
    self.X = X
    self.y = y

  def __call__(self, trial):
    """ Objective function. """

    # hyperparameters
    # Work on a copy: assigning into `default_params` itself changed the
    # defaults of every later evaluate_params() call in the notebook.
    params = dict(default_params)
    params['batch_size']   = trial.suggest_int('batch_size', 2, 8, step=2)
    # params['dropout_prob'] = trial.suggest_float('dropout_prob', 0.0, 0.25, step=0.05)
    print(params)

    # The positional parameters of evaluate_params after `params` are
    # `verbose` and `num_epochs`, so passing self.X / self.y positionally (as
    # this method used to) handed the data arrays to those two options.
    pars, acc, history = evaluate_params(params)

    # `acc` is a single number, not a sequence; `acc[0]` raised.
    return acc


In [None]:

def objective(trial):
  """
  Optuna objective: sample a network layout and batch size, train it, and
  return the accuracy figure reported by `evaluate_params`.

  Layer width and batch size are sampled as exponents of two; all hidden
  layers of one trial share the same width.
  """
  width_exponent = trial.suggest_int("layer_size", 0, 5)
  depth = trial.suggest_int("num_layers", 0, 4)
  batch_exponent = trial.suggest_int("batch_exponent", 2, 8)
  activation = trial.suggest_categorical("activation", ["relu", "elu"])

  candidate = dict(
    activation_function=activation,
    layer_sizes=[2**width_exponent] * depth,
    batch_size=2**batch_exponent,
  )
  _, mean_val_accuracy, _ = evaluate_params(candidate)
  return mean_val_accuracy
  

# Maximise the value returned by objective() over 200 trials.
study = optuna.create_study(direction="maximize")
study.optimize(objective, n_trials=200)

# Report the objective value and the sampled hyper-parameters of the best trial.
best_trial = study.best_trial
print(best_trial.values[0])
print(best_trial.params)
