In [None]:
# Libraries
import tensorflow as tf
import numpy as np
import time
import pandas as pd
import os
import shutil
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras import Model
from tensorflow.keras.models import model_from_json
from tensorflow.keras.models import load_model
from sklearn import datasets
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler
from sklearn.preprocessing import LabelEncoder
from keras.utils import np_utils
import seaborn as sns
import matplotlib.pyplot as plt
from scipy.special import gamma
from scipy import pi
import seaborn as sns
import matplotlib.pyplot as plt
from scipy.signal import savgol_filter

In [None]:
# Functions
def add_neuron(model, chosen_layer: int):
  model.layers[chosen_layer].units += 1
  n_model = model_from_json(model.to_json())
  return n_model
def remove_neuron(model, chosen_layer: int):
  model.layers[chosen_layer].units -= 1
  n_model = model_from_json(model.to_json())
  return n_model
def add_layer(model, position, neurons):
  '''
  Adds a relu layer to a keras.Sequential model. Note: The implementation is a bit hacky and I am not satisfied with it.
  Try to come up with other solutions
  '''
  layer_list = [layr.units for layr in model.layers]
  #take the TensorFlow dimension, flips it, removes the None values, changes it into a tuple. This is done as in not to result in a multiple output shapes error (underdefined model)
  input = tuple(filter(lambda item: item is not None, np.flip(model.input.shape)))
  last_layer = layers.Dense(1, activation='sigmoid') # This is optimised for binary classification problems
  layer_list.insert(position, neurons)
  layer_list.pop()
  n_model = keras.Sequential()
  n_model.add(layers.Input(shape=input))
  for neur in layer_list:
    n_model.add(layers.Dense(units=neur, activation='relu'))
  n_model.add(last_layer)
  return n_model
def remove_layer(model, position):
  '''
  works almost identically as add layer
  '''
  layer_list = [layr.units for layr in model.layers]
  #take the TensorFlow dimension, flips it, removes the None values, changes it into a tuple. This is done as in not to result in a multiple output shapes error (underdefined model)
  input = tuple(filter(lambda item: item is not None, np.flip(model.input.shape)))
  last_layer = layers.Dense(1, activation='sigmoid') # This is optimised for binary classification problems
  layer_list.pop(position)
  layer_list.pop()
  n_model = keras.Sequential()
  n_model.add(layers.Input(shape=input))
  for layr in layer_list:
    n_model.add(layers.Dense(units=layr, activation='relu'))
  n_model.add(last_layer)
  return n_model  
def remove_layer_experimental(model, position, neurons):
  '''
  I like this version of remove layer more. It's more versatile. However, I can see it causing plenty of bugs.
  '''
  layer_list = model.layers
  #take the TensorFlow dimension, flips it, removes the None values, changes it into a tuple. This is done as in not to result in a multiple output shapes error (underdefined model)
  input = tuple(filter(lambda item: item is not None, np.flip(model.input.shape)))
  layer_list.pop(position)
  n_model = keras.Sequential()
  n_model.add(layers.Input(shape=input))
  for layer in layer_list:
    n_model.add(layer)
  return n_model
def calculate_neuron_acceptance_probability( NN:float, energy_N:float, energy_N_1, beta:float , nu:float) -> "float":
  coeff = 1/(NN+1)
  z = np.exp(-beta*nu)
  exponent  = np.exp(-beta*(energy_N_1-energy_N))
  probability = coeff * exponent * z
  if(probability > 1):
    probability = 1
  return probability 
def calculate_neuron_removal_probability(NN:float, energy_N:float, energy_N_1, beta:float , nu:float) -> "float":
  coeff = NN
  inverse_z = np.exp(beta*nu)
  exponent  = np.exp(-beta*(energy_N-energy_N_1))
  probability = coeff * exponent * inverse_z
  if(probability > 1):
    probability = 1
  return probability 
def calculate_layer_acceptance_probability(NL: float, M:float, energy_NL:float, energy_NL_1, beta:float , nu:float) -> "float":
  coeff = (NL*M) / (NL+1)
  z = np.exp(-beta*nu*M)
  exponent = np.exp(-beta*(energy_NL_1-energy_NL))
  probability = coeff * exponent * z
  if(probability > 1):
    probability = 1
  return probability
def calculate_layer_removal_probability(NL: float, M:float, energy_NL:float, energy_NL_1, beta:float , nu:float) -> "float":
  coeff = NL / (M*(NL-1))
  inverse_z = np.exp(beta*nu*M)
  exponent = np.exp(-beta*(energy_NL-energy_NL_1))
  probability = coeff * exponent * inverse_z
  if(probability > 1):
    probability = 1
  return probability
def construct_model_binary_classification(input_shape: tuple, layer_list: list):
  '''
  Adapt the last layer to anything
  '''
  model = keras.Sequential()
  model.add(keras.layers.InputLayer(input_shape=input_shape))
  for neurons in layer_list: 
    model.add(keras.layers.Dense(neurons, activation='relu'))
  model.add(keras.layers.Dense(1, activation='sigmoid'))
  return model
def calculate_GC(model, loss:  float, nu: float) -> "float":  
  return loss + nu*model.count_params


In [None]:
# run simulation functions
# FOR DEBUGGING PURPOSES ONLY
probability_list = []
loss_list = []
accuracy_list = []
layer_list = []
model_evolution_list = []
neuron_total_list = []
layer_list = []
neuron_layer_list = []
# FOR DEBUGGING PURPOSES ONLY/end
#REMEMBER TO ADD ANOTATIONS TO THE NEW SAVING METHODS 
def compare_losses(model, saved_model_labels, x_test, y_test, repo):
  #could probably be made more efficient by also saving the losses?
  model_loss, model_accuracy = model.evaluate(x_test, y_test)
  if(len(saved_model_labels)<5):
    repo += str(time.time())
    model.save(repo)
    return
  for model_challenged_label in saved_model_labels:
    model_challenged = load_model(repo + str(model_challenged_label))
    #what is more efficient? writing a new code as a function or just rolling with it? I mean in all honesty the function would have to be changed with every new configuration so...same thing?
    model_challenged_loss, model_challenged_accuracy = model_challenged.evaluate(x_test, y_test)
    if model_loss < model_challenged_loss :
      shutil.rmtree(repo + model_challenged_label)
      repo += str(time.time())
      model.save(repo)
      #add removal of worst model, somehow

      break
  return
def save_best_model(model, x_test, y_test, repo):
  saved_model_labels = []
  for filename in os.listdir(repo):
    saved_model_labels.append(filename)
  compare_losses(model, saved_model_labels, x_test, y_test, repo)
  return
def save_model(model, x_train, y_train, x_test, y_test):
  #ADDED THE DATA FOR DEBUGING
  #ADDED THIS FOR DEBUGGING AND ANALISING DATA
  _layers = model.layers
  _layers_neurons = [_layer.units for _layer in _layers]
  _neurons_with_layers = list(zip(_layers_neurons, _layers))
  neurons = sum(_layers_neurons)
  layer_list.append(len(_layers))
  neuron_total_list.append(neurons)
  neuron_layer_list.append(_neurons_with_layers)
  #END THINGS USED FOR PLOTTING
  name = folderName
  save_best_model(model, x_test, y_test, name)
  name += str(time.time())
  return
def run_model(model, x_train, y_train, x_test, y_test):
  #REMEMBER TO MAKE BOTH IDENTICAL AT ALL TIMES
  opt = keras.optimizers.Adam(learning_rate=0.01)
  model.compile(loss='binary_crossentropy', metrics=['accuracy'], optimizer=opt)
  model.fit(x_train, y_train, epochs=10, batch_size=15,verbose=0)
  loss , accuracy = model.evaluate(x_test, y_test)
  return loss
def run_model_DEBUG(model, x_train, y_train, x_test, y_test):
  #REMEMBER TO MAKE BOTH IDENTICAL AT ALL TIMES
  opt = keras.optimizers.Adam(learning_rate=0.01)
  model.compile(loss='binary_crossentropy', metrics=['accuracy'], optimizer=opt)
  model.fit(x_train, y_train, epochs=10, batch_size=15,verbose=0)
  loss , accuracy = model.evaluate(x_test, y_test)
  return loss, accuracy

def run_sim_neuron(model, loss, x_train, y_train, x_test, y_test, beta, nu):
  '''
  function that runs move 1. loss was the term chosen for the energy of the system before the move, energy was chosen for the energy fo the system after the move.
  even if they are technically the same in terms of units (both losses/energy)
  this was chosen to avoid confusion between the two
  '''
  #brute force functions to solve the neuron removal problem
  check_neurons_list = [lay.units <= 1 for lay in model.layers[1:-1]]
  if all(check_neurons_list) == True:
    return model
  while True: 
    if(len(model.layers) == 2): 
      NN = 1
    else:
      layer = np.random.randint(low=1, high=len(model.layers)-1)
    NN = model.layers[layer].units
    if NN > 1: 
      break
  #END BRUTE FORCE
  add_neuron_prob = np.random.choice([True, False])
  if add_neuron_prob == True: 
    new_model = add_neuron(model, layer)
  else:
    new_model = remove_neuron(model, layer)
  #calculate model loss 
  energy, accuracy = run_model_DEBUG(new_model, x_train, y_train, x_test, y_test) # ADDED ACCURACY FOR DEBUGGING
  #run acceptance
  if(add_neuron_prob == True):
    probability = calculate_neuron_acceptance_probability(NN, loss, energy, beta, nu)
  else:
    probability = calculate_neuron_removal_probability(NN-1, energy, loss, beta, nu)
    # it is assumed that for this NN+1(i) is the model before the operation, and NN(i) is the model after the operation, hence the NN-1, and the reversed orders of energy and loss.
  if np.random.rand() < probability:
    probability_list.append(probability) #FOR DEBUGGING ONLY 
    #just for trial, we save every model we ever make for this time
    save_model(new_model, x_train, y_train, x_test, y_test)
    loss_list.append(energy)
    accuracy_list.append(accuracy)
    model_evolution_list.append(energy)

    return new_model
  else:
    model_evolution_list.append(loss) #FOR DEBUGGING, both
    return model

def run_sim_layer(model, loss, x_train, y_train, x_test, y_test, beta, nu, M):
  '''
  function that runs move 1. loss was the term chosen for the energy of the system before the move, energy was chosen for the energy fo the system after the move.
  even if they are technically the same in terms of units (both losses/energy)
  this was chosen to avoid confusion between the two
  '''
  neurons = np.random.randint(low=1, high=M)
  add_layer_prob = np.random.choice([True, False])
  if add_layer_prob == True: 
    layer = np.random.randint(low=1, high=len(model.layers)-1)
    new_model = add_layer(model, layer, neurons)
  else:
    #brute force way
    if(len(model.layers)-2 <= 1):
      return model
    layer = np.random.randint(low=1, high=len(model.layers)-2)
    removed_neurons = model.layers[layer].units
    new_model = remove_layer(model, layer)
  #calculate model loss 
  NL = len(new_model.layers)
  energy, accuracy = run_model_DEBUG(new_model, x_train, y_train, x_test, y_test) #ADDED ACCURACY FOR DEBUGGING
  #run acceptance
  if(add_layer_prob == True):
    probability = calculate_layer_acceptance_probability(NL, neurons, loss, energy, beta, nu)
  else:
    probability = calculate_layer_removal_probability(NL, removed_neurons, energy, loss, beta, nu)
    # it is assumed that for this NN+1(i) is the model before the operation, and NN(i) is the model after the operation, hence the NN-1, and the reversed orders of energy and loss.
  if np.random.rand() < probability:
    probability_list.append(probability) #FOR DEBUGGING PURPOSES ONLY
    #just for trial, we save every model we ever make for this time
    save_model(new_model, x_train, y_train, x_test, y_test)
    loss_list.append(energy)
    accuracy_list.append(accuracy)
    model_evolution_list.append(energy)
    return new_model
  else:
    model_evolution_list.append(loss)
    return model

def run_sim_step(model, x_train, y_train, x_test, y_test, f, beta, nu, M):
  loss = run_model(model, x_train, y_train, x_test, y_test)
  if np.random.rand() < f:
    model = run_sim_neuron(model, loss, x_train, y_train, x_test, y_test, beta, nu)
  else:
    model = run_sim_layer(model, loss, x_train, y_train, x_test, y_test, beta, nu, M)
  return model 
def run_sim_for_loop(x_train, y_train, x_test, y_test, f, beta, nu, M, number_of_starting_layers, low_neuron, high_neuron, iterations):
  starting_layers = np.random.randint(low=2, high=number_of_starting_layers)
  if(low_neuron == high_neuron):
    model_layers = [low_neuron for i in range(starting_layers)]
  else:
    model_layers = np.random.randint(low=low_neuron, high=high_neuron, size=starting_layers)
  model = construct_model_binary_classification(x_train.shape[1], model_layers)
  loss = run_model(model, x_train, y_train, x_test, y_test)
  save_model(model, x_train, y_train, x_test, y_test)
  energy, accuracy = run_model_DEBUG(model, x_train, y_train, x_test, y_test)
  accuracy_list.append(accuracy)
  model_evolution_list.append(energy)
  for i in range(iterations):
    model = run_sim_step(model, x_train, y_train, x_test, y_test, f, beta, nu, M)
  return model


In [None]:
# for debugging purposes 
def run_sim_for_loop_loss_included(x_train, y_train, x_test, y_test, f, beta, nu, M, number_of_starting_layers, low_neuron, high_neuron, iterations, loss_list=[], mae_list=[]):
  starting_layers = np.random.randint(low=2, high=number_of_starting_layers)
  if(low_neuron == high_neuron):
    model_layers = [low_neuron for i in range(starting_layers)]
  else:
    model_layers = np.random.randint(low=low_neuron, high=high_neuron, size=starting_layers)
  model = construct_model_binary_classification(x_train.shape[1], model_layers)
  loss = run_model(model, x_train, y_train, x_test, y_test)
  save_model(model, x_train, y_train, x_test, y_test)
  loss_list.append(loss)
  energy, accuracy = run_model_DEBUG(model, x_train, y_train, x_test, y_test)
  accuracy_list.append(accuracy)
  model_evolution_list.append(energy)
  for i in range(iterations):
    model = run_sim_step(model, x_train, y_train, x_test, y_test, f, beta, nu, M)
    #loss, mae = model.evaluate(x_test, y_test)
    #loss_list.append(loss)
    #mae_list.append(mae)
  return model # loss_list, mae_list