# Train 3 Types of CNNs: Basic, Lenet, Alexnet
Base code was adapted form the following tutorial:
https://www.analyticsvidhya.com/blog/2021/08/beginners-guide-to-convolutional-neural-network-with-implementation-in-python/

## Import Necessary Libraries

In [None]:
# Load in the coco annotations
import numpy as np

# visualize the data
import matplotlib.pyplot as plt

# Load in the names_labels_df csv and turn it into a df
import pandas as pd
from sklearn.model_selection import train_test_split

import tensorflow as tf
from tensorflow import keras
import keras_tuner as kt
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Input
from tensorflow.keras.layers import Conv2D
from tensorflow.keras.layers import MaxPool2D
from tensorflow.keras.layers import Flatten
from tensorflow.keras.layers import Dropout
from tensorflow.keras.layers import Dense

import os

from sklearn.metrics import accuracy_score, cohen_kappa_score

from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay

# Calculate the kappa score
from sklearn.metrics import cohen_kappa_score

# Calculate AUROC
from sklearn.metrics import roc_auc_score

## Load and Visualize The Data

### Load the Images and Resize Them If Needed

In [None]:
################################### State Your Paths ###################################

patches_image_directory = "your_path_here/All_Patches/"
split_data_folder = "your_path_here/Train_test_develop_split_all_combined/"

num_classes = 3

In [None]:
################################# Load in the data ######################################

# Load in the names_images_labels_df csv and turn it into a df
train_names_labels_df = pd.read_csv(f"{split_data_folder}tra_val_names_labels_df.csv")
test_names_labels_df = pd.read_csv(f"{split_data_folder}dev_names_labels_df.csv")

# Get the lists of file names and labels
train_names = train_names_labels_df['patch_name'].tolist()
train_labels = train_names_labels_df['label'].tolist()

# Get the lists of file names and labels for the testing data
test_names = test_names_labels_df['patch_name'].tolist()
test_labels = test_names_labels_df['label'].tolist()

In [None]:
# Define the split ratio
split_ratio = 0.8

##################### Here I use the Real Dataset ################################

# Split your data into training and validation sets with stratified randomization
new_train_names, new_val_file_names, new_train_labels, new_val_labels = train_test_split(
    train_names, train_labels, test_size=1 - split_ratio, random_state=42, stratify=train_labels) 

#################### Here I use the Real Dataset ##################################

# Create empty NumPy arrays for your images 
image_size = (224, 224, 1) 
# image_size = (28, 28, 1) 
# image_size = (32, 32, 1) 

x_train = np.empty((len(new_train_names), *image_size), dtype=np.float32)
x_val = np.empty((len(new_val_file_names), *image_size), dtype=np.float32)
x_test = np.empty((len(test_names), *image_size), dtype=np.float32)

# Load and preprocess the training images
for i, image_name in enumerate(new_train_names):
    image_path = os.path.join(patches_image_directory, image_name)
    image = tf.io.read_file(image_path)
    # decode the image keeping only the first channel
    image = tf.image.decode_image(image, channels=1)
    image = tf.image.resize(image, (224, 224))
    # image = tf.image.resize(image, (28, 28))
    # image = tf.image.resize(image, (32, 32))
    # image = tf.image.grayscale_to_rgb(image)
    image = tf.image.convert_image_dtype(image, tf.float32)
    x_train[i] = image.numpy().astype(np.float32)  # Convert to NumPy array and cast to float32

# Load and preprocess the validation images
for i, image_name in enumerate(new_val_file_names):
    image_path = os.path.join(patches_image_directory, image_name)
    image = tf.io.read_file(image_path)
    image = tf.image.decode_image(image, channels=1)
    image = tf.image.resize(image, (224, 224))
    # image = tf.image.resize(image, (28, 28))
    # image = tf.image.resize(image, (32, 32))
    # image = tf.image.grayscale_to_rgb(image)
    image = tf.image.convert_image_dtype(image, tf.float32)
    x_val[i] = image.numpy().astype(np.float32)  # Convert to NumPy array and cast to float32

# Load and preprocess the testing images
for i, image_name in enumerate(test_names):
    image_path = os.path.join(patches_image_directory, image_name)
    image = tf.io.read_file(image_path)
    image = tf.image.decode_image(image, channels=1)
    image = tf.image.resize(image, (224, 224))
    # image = tf.image.resize(image, (28, 28))
    # image = tf.image.resize(image, (32, 32))
    # image = tf.image.grayscale_to_rgb(image)
    image = tf.image.convert_image_dtype(image, tf.float32)
    x_test[i] = image.numpy().astype(np.float32)  # Convert to NumPy array and cast to float32
    
# Where: "categories":[{"id":1,"name":"Full","supercategory":""},{"id":2,"name":"Partial","supercategory":""},{"id":3,"name":"Empty","supercategory":""}]

# Convert the lists of labels to NumPy arrays. Convert each to an integer
y_train = np.array(new_train_labels, dtype=np.float32) -1
y_val = np.array(new_val_labels, dtype=np.float32) -1
y_test = np.array(test_labels, dtype=np.float32) -1

# y_train = np.array(new_train_labels, dtype=np.int32) - 1
# y_val = np.array(new_val_labels, dtype=np.int32) - 1
# y_test = np.array(test_labels, dtype=np.int32) - 1

train = x_train, y_train
val = x_val, y_val

load_data = train, val

# print the shapes of all the data
y_train.shape, y_val.shape, y_test.shape, x_train.shape, x_val.shape, x_test.shape

# (2m 43s) with metal

In [None]:
######################################## Basic Model  ###########################################

# np.savez_compressed(f"{split_data_folder}train_images_labels_28x28smallimgs.npz", x_train=x_train, y_train=y_train)
# np.savez_compressed(f"{split_data_folder}val_images_labels_28x28smallimgs.npz", x_val=x_val, y_val=y_val)
# np.savez_compressed(f"{split_data_folder}test_images_labels_28x28smallimgs.npz", x_test=x_test, y_test=y_test)

########################################### Lenet  ###############################################

# np.savez_compressed(f"{split_data_folder}train_images_labels_32x32smallimgs.npz", x_train=x_train, y_train=y_train)
# np.savez_compressed(f"{split_data_folder}val_images_labels_32x32smallimgs.npz", x_val=x_val, y_val=y_val)
# np.savez_compressed(f"{split_data_folder}test_images_labels_32x32smallimgs.npz", x_test=x_test, y_test=y_test)

###########################################  Alexnet ###############################################

np.savez_compressed(f"{split_data_folder}train_images_labels_1ch.npz", x_train=x_train, y_train=y_train)
np.savez_compressed(f"{split_data_folder}val_images_labels_1ch.npz", x_val=x_val, y_val=y_val)
np.savez_compressed(f"{split_data_folder}test_images_labels_1ch.npz", x_test=x_test, y_test=y_test)

In [None]:
# # load in the .npz files for the basic model
# train_data = np.load(f"{split_data_folder}train_images_labels_28x28smallimgs.npz")
# x_train = train_data['x_train']
# y_train = train_data['y_train']
# val_data = np.load(f"{split_data_folder}val_images_labels_28x28smallimgs.npz")
# x_val = val_data['x_val']
# y_val = val_data['y_val']
# test_data = np.load(f"{split_data_folder}test_images_labels_28x28smallimgs.npz")
# x_test = test_data['x_test']
# y_test = test_data['y_test']
# print("script message: loaded in all data")

# # load in the .npz files for lenet
# train_data = np.load(f"{split_data_folder}train_images_labels_32x32smallimgs.npz")
# x_train = train_data['x_train']
# y_train = train_data['y_train']
# val_data = np.load(f"{split_data_folder}val_images_labels_32x32smallimgs.npz")
# x_val = val_data['x_val']
# y_val = val_data['y_val']
# test_data = np.load(f"{split_data_folder}test_images_labels_32x32smallimgs.npz")
# x_test = test_data['x_test']
# y_test = test_data['y_test']
# print("script message: loaded in all data")

# load in the .npz files for alexnet
train_data = np.load(f"{split_data_folder}train_images_labels_1ch.npz")
x_train = train_data['x_train']
y_train = train_data['y_train']
val_data = np.load(f"{split_data_folder}val_images_labels_1ch.npz")
x_val = val_data['x_val']
y_val = val_data['y_val']
test_data = np.load(f"{split_data_folder}test_images_labels_1ch.npz")
x_test = test_data['x_test']
y_test = test_data['y_test']
print("script message: loaded in all data")


# Normalize the data
x_train=x_train/255
x_test=x_test/255


### Visualize the Data

In [None]:
#print unique values in y_train
print(np.unique(y_train))

In [None]:
print(type(x_train[0][0][0][0]))
print(type(y_train[0]))
print(x_train.shape)
print(y_train.shape)
print(x_train[0][0][0][0])
print(type(x_train))
print(type(y_train))

print(x_train.shape)
print(x_test.shape)

In [None]:
# visualize the data
plt.imshow(x_train[5])
plt.show()
# Show the label
print(y_train[5])
print(x_train[5].shape)

## Build The Model

In [None]:
tf.keras.backend.clear_session()

### Simple Model

In [None]:
model = Sequential()
model.add(Input(shape=(28, 28, 1)))  
model.add(Conv2D(32, (3, 3), activation='relu'))
model.add(MaxPool2D((2, 2)))
model.add(Flatten())
model.add(Dense(100, activation='relu'))
model.add(Dense(num_classes, activation='softmax'))
model.compile(loss='sparse_categorical_crossentropy', optimizer=tf.keras.optimizers.Adam(learning_rate=0.001), metrics=['accuracy'])

In [None]:
# The code in this cell is adapted from this tutorial: https://www.tensorflow.org/tutorials/keras/keras_tuner

def model_builder(hp):
  model = keras.Sequential()
  model.add(Input(shape=(28, 28, 1)))  
  
  # Tune the number of filters in the Conv2D layer. Choose an optimal value between 6-384
  hp_filters = hp.Int('filters', min_value=32, max_value=256, step=32)
  hp_kernel_size = hp.Choice('kernel_size', values=[2, 3, 5])
  model.add(Conv2D(filters=hp_filters, kernel_size=hp_kernel_size, activation='relu')) 
  
  # Tune the pool_size and the strides
  hp_pool_size = hp.Choice('pool_size', values=[2, 3])
  hp_strides = hp.Choice('strides', values=[1, 2])
  model.add(MaxPool2D(pool_size=hp_pool_size, strides=hp_strides))
  
  
  model.add(Flatten())
  
  # Tune the number of units in the Dense layers. Choose an optimal value between 10-1000
  hp_units = hp.Int('units', min_value=100, max_value=500, step=10)
  model.add(Dense(units=hp_units, activation='relu'))
  model.add(Dense(num_classes, activation='softmax'))

  # Tune the learning rate for the optimizer. Choose an optimal value from 0.01, 0.001, or 0.0001
  hp_learning_rate = hp.Choice('learning_rate', values=[1e-3, 1e-4])

  model.compile(optimizer=keras.optimizers.Adam(learning_rate=hp_learning_rate),
                loss='sparse_categorical_crossentropy',
                metrics=['accuracy'])

  return model

### LeNet Model

In [None]:
# LeNet

model = Sequential()
model.add(Input(shape=(32, 32, 1))) 
model.add(Conv2D(filters=6, kernel_size=5, padding='valid', activation='sigmoid'))
model.add(MaxPool2D(pool_size=2, strides=2))
model.add(Conv2D(filters=16, kernel_size=5, padding='valid', activation='sigmoid'))
model.add(MaxPool2D(pool_size=2, strides=2))
model.add(Flatten())
model.add(Dense(120, activation='sigmoid'))
model.add(Dense(84, activation='sigmoid'))
model.add(Dense(num_classes, activation='softmax'))

model.compile(loss='sparse_categorical_crossentropy', optimizer='adam', metrics=['accuracy'])

In [None]:
# LeNet

def model_builder(hp):
    model = keras.Sequential()
    model.add(Input(shape=(32, 32, 1))) 
    
    # Tune the number of filters in the Conv2D layer. Choose an optimal value between 6-384
    hp_filters1 = hp.Int('filters1', min_value=6, max_value=9, step=3)
    hp_kernel_size1 = hp.Choice('kernel_size1', values=[3, 5])
    model.add(Conv2D(filters=hp_filters1, kernel_size=hp_kernel_size1, padding='valid', activation='sigmoid')) 
    
    # Tune the pool_size and the strides
    hp_pool_size1 = hp.Choice('pool_size1', values=[2])
    hp_strides1 = hp.Choice('strides1', values=[2])
    model.add(MaxPool2D(pool_size=hp_pool_size1, strides=hp_strides1))
    
    hp_filters2 = hp.Int('filters2', min_value=12, max_value=20, step=4)
    hp_kernel_size2 = hp.Choice('kernel_size2', values=[3, 5])
    model.add(Conv2D(filters=hp_filters2, kernel_size=hp_kernel_size2, padding='valid', activation='sigmoid'))
    
    hp_pool_size2 = hp.Choice('pool_size2', values=[2])
    hp_strides2 = hp.Choice('strides2', values=[2])
    model.add(MaxPool2D(pool_size=hp_pool_size2, strides=hp_strides2))
    
    model.add(Flatten())
    
    hp_units1 = hp.Int('units1', min_value=100, max_value=400, step=10)
    model.add(Dense(units=hp_units1, activation='sigmoid'))
    
    hp_units2 = hp.Int('units2', min_value=80, max_value=100, step=4)
    model.add(Dense(units=hp_units2, activation='sigmoid'))
    
    model.add(Dense(num_classes, activation='softmax'))
    
    hp_learning_rate = hp.Choice('learning_rate', values=[1e-3, 1e-4])
    
    model.compile(optimizer=keras.optimizers.Adam(learning_rate=hp_learning_rate),
                loss='sparse_categorical_crossentropy',
                metrics=['accuracy'])

    return model

### AlexNet Model

In [None]:
model = Sequential()
model.add(Input(shape=(224, 224, 1)))  
model.add(Conv2D(filters=96, kernel_size=11, strides=4, activation='relu'))
model.add(MaxPool2D(pool_size=3, strides=2))
model.add(Conv2D(filters=256, kernel_size=5, padding='same', activation='relu'))
model.add(MaxPool2D(pool_size=3, strides=2))

model.add(Conv2D(filters=384, kernel_size=3, padding='same',activation='relu'))
model.add(Conv2D(filters=384, kernel_size=3, padding='same',activation='relu'))
model.add(Conv2D(filters=256, kernel_size=3, padding='same',activation='relu'))

model.add(MaxPool2D(pool_size=3, strides=2))
model.add(Flatten())

model.add(Dense(4096, activation='relu'))
model.add(Dropout(0.5))
model.add(Dense(4096, activation='relu'))
model.add(Dropout(0.5))
model.add(Dense(num_classes, activation='softmax'))

model.compile(loss='sparse_categorical_crossentropy', optimizer='adam', metrics=['accuracy'])


In [None]:
def model_builder(hp):
    model = Sequential()
    model.add(Input(shape=(224, 224, 1)))
    
    hp_filters1 = hp.Int('filters1', min_value=78, max_value=120, step=6)
    hp_kernel_size1 = hp.Choice('kernel_size1', values=[2, 6, 11])
    model.add(Conv2D(filters=hp_filters1, kernel_size=hp_kernel_size1, strides=4, padding='valid', activation='relu'))
    
    hp_pool_size1 = hp.Choice('pool_size1', values=[2, 3])
    hp_strides1 = hp.Choice('strides1', values=[1, 2])
    model.add(MaxPool2D(pool_size=hp_pool_size1, strides=hp_strides1, padding='valid'))
    
    hp_filters2 = hp.Int('filters2', min_value=156, max_value=356, step=50)
    hp_kernel_size2 = hp.Choice('kernel_size2', values=[2, 3, 5])
    model.add(Conv2D(filters=hp_filters2, kernel_size=hp_kernel_size2, padding='same', activation='relu'))
    
    hp_pool_size2 = hp.Choice('pool_size2', values=[2, 3])
    hp_strides2 = hp.Choice('strides2', values=[1, 2])
    model.add(MaxPool2D(pool_size=hp_pool_size2, strides=hp_strides2, padding='valid'))

    hp_filters3 = hp.Int('filters3', min_value=324, max_value=420, step=12)
    hp_kernel_size3 = hp.Choice('kernel_size3', values=[2, 3])
    model.add(Conv2D(filters=hp_filters3, kernel_size=hp_kernel_size3, padding='same',activation='relu'))
    model.add(Conv2D(filters=hp_filters3, kernel_size=hp_kernel_size3, padding='same',activation='relu'))
    
    hp_filters4 = hp.Int('filters4', min_value=156, max_value=356, step=50)
    hp_kernel_size4 = hp.Choice('kernel_size4', values=[2, 3])
    model.add(Conv2D(filters=hp_filters4, kernel_size=hp_kernel_size4, padding='same',activation='relu'))

    hp_pool_size3 = hp.Choice('pool_size3', values=[2, 3])
    hp_strides3 = hp.Choice('strides3', values=[1, 2])
    model.add(MaxPool2D(pool_size=hp_pool_size3, strides=hp_strides3, padding='valid'))
    model.add(Flatten())

    hp_units1 = hp.Int('units1', min_value=1000, max_value=2000, step=100)
    model.add(Dense(units=hp_units1, activation='relu'))
    model.add(Dropout(0.5))
    model.add(Dense(units=hp_units1, activation='relu'))
    model.add(Dropout(0.5))
    model.add(Dense(num_classes, activation='softmax'))
    
    hp_learning_rate = hp.Choice('learning_rate', values=[1e-3, 1e-4])

    model.compile(optimizer=keras.optimizers.Adam(learning_rate=hp_learning_rate),
                loss='sparse_categorical_crossentropy',
                metrics=['accuracy'])

    return model


## Tune Hyperparameters and Train

### Train WITH Hyperparameter Tuning

In [None]:
tuner = kt.RandomSearch(model_builder,
                     objective='val_accuracy',
                     max_trials=15,
                     overwrite=True)

stop_early = tf.keras.callbacks.EarlyStopping(monitor='val_loss', patience=3)

In [None]:
batch_size = 32
validation_split= 0.2
train_imageslen = len(x_train)*(1-validation_split)
steps_per_epoch = int(train_imageslen // batch_size)

tuner.search(x_train, y_train, epochs=7, validation_split=validation_split, batch_size=batch_size, steps_per_epoch=steps_per_epoch, shuffle=True, callbacks=[stop_early])

# Get the optimal hyperparameters
best_hps=tuner.get_best_hyperparameters(num_trials=1)[0]

# uncomment the needed model below 

######################################## Basic Model  ###########################################

# print(f"""
# The hyperparameter search is complete. The optimal:
# 1) filters in the Conv2D layer: {best_hps.get('filters')}
# 2) kernel size in the Conv2D layer: {best_hps.get('kernel_size')}
# 3) pool size in the MaxPool2D layer: {best_hps.get('pool_size')}
# 4) strides in the MaxPool2D layer: {best_hps.get('strides')}
# 1) Number of units in the first densely-connected layer: {best_hps.get('units')} 
# 2) Learning rate for the optimizer: {best_hps.get('learning_rate')}
# """)

########################################### Lenet  ###############################################

# print(f"""
# The hyperparameter search is complete. The optimal:
# 1) filters in the Conv2D layer: {best_hps.get('filters1')}
# 2) kernel size in the Conv2D layer: {best_hps.get('kernel_size1')}
# 3) pool size in the MaxPool2D layer: {best_hps.get('pool_size1')}
# 4) strides in the MaxPool2D layer: {best_hps.get('strides1')}

# 1) filters in the Conv2D layer: {best_hps.get('filters2')}
# 2) kernel size in the Conv2D layer: {best_hps.get('kernel_size2')}
# 3) pool size in the MaxPool2D layer: {best_hps.get('pool_size2')}
# 4) strides in the MaxPool2D layer: {best_hps.get('strides2')}

# 1) Number of units in the first densely-connected layer: {best_hps.get('units1')} 
# 2) Number of units in the first densely-connected layer: {best_hps.get('units2')} 

# 1) Learning rate for the optimizer: {best_hps.get('learning_rate')}
# """)

###########################################  Alexnet ###############################################

# print(f"""
# The hyperparameter search is complete. The optimal:
# 1) filters in the Conv2D layer: {best_hps.get('filters1')}
# 2) kernel size in the Conv2D layer: {best_hps.get('kernel_size1')}
# 3) pool size in the MaxPool2D layer: {best_hps.get('pool_size1')}
# 4) strides in the MaxPool2D layer: {best_hps.get('strides1')}

# 1) filters in the Conv2D layer: {best_hps.get('filters2')}
# 2) kernel size in the Conv2D layer: {best_hps.get('kernel_size2')}
# 3) pool size in the MaxPool2D layer: {best_hps.get('pool_size2')}
# 4) strides in the MaxPool2D layer: {best_hps.get('strides2')}

# 1) filters in the Conv2D layer: {best_hps.get('filters3')}
# 2) kernel size in the Conv2D layer: {best_hps.get('kernel_size3')}
# 1) filters in the Conv2D layer: {best_hps.get('filters4')}
# 2) kernel size in the Conv2D layer: {best_hps.get('kernel_size4')}
# 3) pool size in the MaxPool2D layer: {best_hps.get('pool_size3')}
# 4) strides in the MaxPool2D layer: {best_hps.get('strides3')}

# 5) Number of units densely-connected layers: {best_hps.get('units1')} 

# 6) Learning rate for the optimizer: {best_hps.get('learning_rate')}
# """)

## 42m 0.2s - 133m 32s

In [None]:
# Build the model with the optimal hyperparameters and train it on the data for 50 epochs
model = tuner.hypermodel.build(best_hps)
history = model.fit(x_train, y_train, epochs=15, validation_split=0.2, batch_size=32)

val_acc_per_epoch = history.history['val_accuracy']
best_epoch = val_acc_per_epoch.index(max(val_acc_per_epoch)) + 1
print('Best epoch: %d' % (best_epoch,))

### Train WithOUT Hyperparameter Tuning

In [None]:
# model.fit(x_train,y_train,epochs=10, validation_data=(x_val,y_val), batch_size=32)
model.fit(x_train,y_train,epochs=15, validation_split=0.2, batch_size=32)

## Evaluate the Model

In [None]:
model.evaluate(x_test,y_test)

In [None]:
# Ensure that x_val is of type float32
x_test = x_test.astype(np.float32)
y_test = y_test

# Make predictions on the validation set
y_pred = model.predict(x_test)

# Show the accuracy of the model
accuracy = np.mean(np.argmax(y_pred, axis=1) == y_test)
print(f"Accuracy: {accuracy:.3f}")

# Calculate the kappa score
kappa = cohen_kappa_score(y_test, np.argmax(y_pred, axis=1))
print(f"Kappa: {kappa:.3f}")

# Compute the selectivity for empty capsids
empty_selectivity = confusion_matrix(y_test, np.argmax(y_pred, axis=1))[2, 2] / np.sum(confusion_matrix(y_test, np.argmax(y_pred, axis=1))[2])
print(f"The selectivity for empty capsids is {empty_selectivity}")

# Calculate the AUROC 
# use label_binarize for multi-class data
y_val_binarized = tf.keras.utils.to_categorical(y_test, num_classes=3)
y_pred_binarized = tf.keras.utils.to_categorical(np.argmax(y_pred, axis=1), num_classes=3)
auroc = roc_auc_score(y_val_binarized, y_pred_binarized, multi_class='ovr')
print(f"AUROC: {auroc:.2f}")

# show a confusion matrix 
cm = confusion_matrix(y_test, np.argmax(y_pred, axis=1))
disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=['Full', 'Partial', 'Empty'])
# disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=['Full', 'Partial', 'Empty', 'Agg', 'Ice', 'Broken', 'Backg'])

disp.plot(cmap='Blues')