In [None]:
import time
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import tensorflow as tf
import tensorflow.keras as keras
from tensorflow.keras import layers, models

# pip install scikeras==0.10.0
# pip install tensorflow-model-optimization==0.7.3

from scikeras.wrappers import KerasClassifier
from sklearn.model_selection import cross_val_score
from sklearn.model_selection import StratifiedKFold
from sklearn.model_selection import GridSearchCV

from sklearn.metrics import classification_report
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
from sklearn.metrics import top_k_accuracy_score

import tensorflow_model_optimization as tfmot

gpus = tf.config.list_physical_devices('GPU')
print("GPU Available: ", gpus)

### Ultility

In [None]:
# Ultility function 1: Training Time Monitoring using tensorflow
class TimeHistory(keras.callbacks.Callback):
    def on_train_begin(self, logs={}):
        self.times = []

    def on_epoch_begin(self, batch, logs={}):
        self.epoch_time_start = time.time()

    def on_epoch_end(self, batch, logs={}):
        self.times.append(time.time() - self.epoch_time_start)

In [None]:
# Ultility function 2: Measure reasoning time
def measure_reasoning_time(model, X):
    start = time.time()
    model.predict(X)
    end = time.time()
    return end - start

In [None]:
# Ultility function 3: Measure the computational complexity of the model (FLOPs)
from tensorflow.python.framework import convert_to_constants
def get_flops(model):
    # Define the input shape for the model
    input_shape = (1,) + model.input_shape[1:]
    
    # Create a concrete function from the model
    inputs = tf.TensorSpec(shape=input_shape, dtype=tf.float32)
    concrete_function = tf.function(model).get_concrete_function(inputs)
    
    # Convert variables to constants
    frozen_func = convert_to_constants.convert_variables_to_constants_v2(concrete_function)
    graph_def = frozen_func.graph.as_graph_def()
    
    # Import the graph definition
    with tf.Graph().as_default() as graph:
        tf.compat.v1.import_graph_def(graph_def, name='')
        run_meta = tf.compat.v1.RunMetadata()
        opts = tf.compat.v1.profiler.ProfileOptionBuilder.float_operation()
        
        # Run the profiler to calculate FLOPs
        flops = tf.compat.v1.profiler.profile(graph=graph, run_meta=run_meta, cmd='scope', options=opts)
        
    # Return the total FLOPs
    return flops.total_float_ops if flops is not None else 0

## Data Loading and Preprocessing

In [None]:
# Load the EMNIST data
train_dict = pd.read_pickle('emnist_train.pkl')
test_dict = pd.read_pickle('emnist_test.pkl')

In [None]:
# Convert the data to np arrays
train_images = np.array(train_dict['data'])
train_labels = np.array(train_dict['labels'])
test_images = np.array(test_dict['data'])
test_labels = np.array(test_dict['labels'])

In [None]:
# Add a channel dimension
train_images = train_images[..., np.newaxis]
test_images = test_images[..., np.newaxis]

In [None]:
# Resize the images to 32x32
train_images = np.array(tf.image.resize(train_images, [32, 32]))
test_images = np.array(tf.image.resize(test_images, [32, 32]))

In [None]:
# Normalize the images
train_images = train_images / 255.0
test_images = test_images / 255.0

In [None]:
# Create TensorFlow datasets
train_dataset = tf.data.Dataset.from_tensor_slices((train_images, train_labels))
test_dataset = tf.data.Dataset.from_tensor_slices((test_images, test_labels))
print("Train Dataset:", train_dataset)
print("Test Dataset:", test_dataset)

## Model: VGG11

### 1. Implement VGG11 Network Structure

In [None]:
def VGG11(input_shape, num_classes=62):
    model = models.Sequential()
    model.add(layers.Input(shape=input_shape))
    
    # Conv Layer Block 1
    model.add(layers.Conv2D(64, (3, 3), padding='same', activation='relu'))
    model.add(layers.MaxPooling2D((2, 2)))
    
    # Conv Layer Block 2
    model.add(layers.Conv2D(128, (3, 3), padding='same', activation='relu'))
    model.add(layers.MaxPooling2D((2, 2)))
    
    # Conv Layer Block 3
    model.add(layers.Conv2D(256, (3, 3), padding='same', activation='relu'))
    model.add(layers.Conv2D(256, (3, 3), padding='same', activation='relu'))
    model.add(layers.MaxPooling2D((2, 2)))
    
    # Conv Layer Block 4
    model.add(layers.Conv2D(512, (3, 3), padding='same', activation='relu'))
    model.add(layers.Conv2D(512, (3, 3), padding='same', activation='relu'))
    model.add(layers.MaxPooling2D((2, 2)))
    
    # Classifier
    model.add(layers.Flatten())
    model.add(layers.Dense(512, activation='relu'))
    model.add(layers.Dropout(0.5))
    model.add(layers.Dense(512, activation='relu'))
    model.add(layers.Dropout(0.5))
    model.add(layers.Dense(num_classes, activation='softmax'))
    
    return model

In [None]:
# Create VGG11 baseline model
vgg11_model = VGG11(input_shape=(32, 32, 1), num_classes=62)

optimizer = keras.optimizers.Adam(learning_rate=1e-4)
loss = keras.losses.SparseCategoricalCrossentropy()
vgg11_model.compile(optimizer=optimizer, loss=loss, metrics=['accuracy'])

vgg11_model.summary()

In [None]:
# Train VGG11 baseline model and measure the training time
vgg11_time_callback = TimeHistory()

history = vgg11_model.fit(train_dataset.batch(64), validation_data=test_dataset.batch(64), epochs=10, callbacks=[vgg11_time_callback])

total_training_time = sum(vgg11_time_callback.times)
print(f"Total training time: {total_training_time:.2f} seconds")

In [None]:
# Plot the loss vs epoch graph
plt.plot(history.history['loss'], label='loss')
plt.plot(history.history['val_loss'], label='val_loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.legend()
plt.show()

In [None]:
# Visualize hidden layers output
layer_outputs = [layer.output for layer in vgg11_model.layers if 'max_pooling2d' in layer.name]
activation_model = models.Model(inputs=vgg11_model.input, outputs=layer_outputs)
activations = activation_model.predict(test_images[0][np.newaxis, ...])
layer_names = [layer.name for layer in vgg11_model.layers if 'max_pooling2d' in layer.name]
for layer_name, layer_activation in zip(layer_names, activations):
    n_features = layer_activation.shape[-1]
    size = layer_activation.shape[1]      
    plt.figure(figsize=(12,12))
    for i in range(n_features):
        # Display only the first 64 features
        if i >= 64:
            break
        plt.subplot(8, 8, i + 1)
        plt.imshow(layer_activation[0, :, :, i], cmap='viridis')
        plt.axis('off')
    plt.suptitle(f"Layer: {layer_name}")
    plt.show()

### 2. Hyperparameter tuning

In [None]:
# Define a function to create the model
def create_model_vgg11(learning_rate, optimizer_name):
    optimizer_dict = {
    'adam': keras.optimizers.Adam,
    'adagrad': keras.optimizers.Adagrad,
    'rmsprop': keras.optimizers.RMSprop
    }
    if optimizer_name in optimizer_dict:
        optimizer = optimizer_dict[optimizer_name](learning_rate=learning_rate)
    else:
        raise ValueError(f"Unknown optimizer: {optimizer_name}")
    model = VGG11(input_shape=(32, 32, 1), num_classes=62)
    loss = keras.losses.SparseCategoricalCrossentropy()
    model.compile(optimizer=optimizer, loss=loss, metrics=['accuracy'])
    return model

In [None]:
# Define the hyperparameters
param_grid = {
    'model__learning_rate': [1e-5, 5e-5, 1e-4],
    'model__optimizer_name': ['adam', 'adagrad','rmsprop']
    
    # Debug parameters
    # 'model__learning_rate': [1e-4],
    # 'model__optimizer_name': ['adam']
}

In [None]:
# Wrap the model using KerasClassifier
# Use scikeras.wrappers.KerasClassifier to wrap the model.
keras_model = KerasClassifier(model=create_model_vgg11, epochs=10)

# Perform the grid search
skf = StratifiedKFold(n_splits=1, shuffle=True)
grid = GridSearchCV(estimator=keras_model, param_grid=param_grid, cv=skf)
grid_result = grid.fit(train_images, train_labels)

In [None]:
# Print the results
# Mean measures the best-performing hyperparameter combination.
# Standard deviation measures the stability of the model's performance.
print("Best: %f using %s" % (grid_result.best_score_, grid_result.best_params_))
means = grid_result.cv_results_['mean_test_score']
stds = grid_result.cv_results_['std_test_score']
params = grid_result.cv_results_['params']
for mean, std, param in zip(means, stds, params):
    print("%f (%f) with: %r" % (mean, std, param))

In [None]:
# Train the model with the best hyperparameters
best_lr_vgg11 = grid_result.best_params_['model__learning_rate']
best_opt_vgg11 = grid_result.best_params_['model__optimizer_name']
best_model_vgg11 = create_model_vgg11(best_lr_vgg11, best_opt_vgg11)
history = best_model_vgg11.fit(train_dataset.batch(64), validation_data=test_dataset.batch(64), epochs=10)

In [None]:
y_pred = best_model_vgg11.predict(test_dataset.batch(64))
y_pred = np.argmax(y_pred, axis=1)
y_true = test_labels

vgg11_accuracy = accuracy_score(y_true, y_pred)
vgg11_precision = precision_score(y_true, y_pred, average='weighted')
vgg11_recall = recall_score(y_true, y_pred, average='weighted')
vgg11_f1 = f1_score(y_true, y_pred, average='weighted')
vgg11_top5_accuracy = top_k_accuracy_score(y_true, best_model_vgg11.predict(test_dataset.batch(64)), k=5)

print(classification_report(y_true, y_pred))

## Model: ResNet18

### 1. Implement ResNet18 Network Structure
K. He, X. Zhang, S. Ren and J. Sun, "Deep Residual Learning for Image Recognition," 2016 IEEE Conference on Computer Vision and Pattern Recognition (CVPR), Las Vegas, NV, USA, 2016, pp. 770-778, doi: 10.1109/CVPR.2016.90.

In [None]:
def residual_block(x, filters, kernel_size=3, downsample=False):
    shortcut = x

    x = layers.Conv2D(filters, kernel_size=kernel_size, strides=(2 if downsample else 1), padding='same', activation='relu')(x)
    x = layers.BatchNormalization()(x)
    x = layers.Conv2D(filters, kernel_size=kernel_size, strides=1, padding='same', activation='relu')(x)
    x = layers.BatchNormalization()(x)

    # Add downsample layer
    if downsample:
        shortcut = layers.Conv2D(filters, kernel_size=1, strides=2, padding='same')(shortcut)
        shortcut = layers.BatchNormalization()(shortcut)

    # Add shortcut connection
    x = layers.add([x, shortcut])
    x = layers.Activation('relu')(x)
    
    return x

def ResNet18(input_shape, num_classes=62):
    inputs = layers.Input(shape=input_shape)

    # Conv Layer Block 1
    x = layers.Conv2D(64, (7, 7), strides=2, padding='same', activation='relu')(inputs)
    x = layers.BatchNormalization()(x)
    x = layers.MaxPooling2D((3, 3), strides=2, padding='same')(x)

    # Residual Block 1
    x = residual_block(x, filters=64)
    x = residual_block(x, filters=64)

    # Residual Block 2
    x = residual_block(x, filters=128, downsample=True)
    x = residual_block(x, filters=128)

    # Residual Block 3
    x = residual_block(x, filters=256, downsample=True)
    x = residual_block(x, filters=256)

    # Residual Block 4
    # x = residual_block(x, filters=512, downsample=True)
    # x = residual_block(x, filters=512)

    # Classifier
    x = layers.GlobalAveragePooling2D()(x)
    x = layers.Dropout(0.5)(x)
    outputs = layers.Dense(num_classes, activation='softmax')(x)

    model = models.Model(inputs, outputs)
    
    return model

In [None]:
# Create ResNet18 baseline model
resnet18_model = ResNet18(input_shape=(32, 32, 1), num_classes=62)

optimizer = keras.optimizers.Adam(learning_rate=1e-5)
loss = keras.losses.SparseCategoricalCrossentropy()
resnet18_model.compile(optimizer=optimizer, loss=loss, metrics=['accuracy'])

resnet18_model.summary()

In [None]:
# Train the baseline model
resnet18_time_callback = TimeHistory()

history = resnet18_model.fit(train_dataset.batch(64), validation_data=test_dataset.batch(64), epochs=10, callbacks=[resnet18_time_callback])

total_training_time = sum(resnet18_time_callback.times)
print(f"Total training time: {total_training_time:.2f} seconds")

In [None]:
# Plot the loss vs epoch graph
plt.plot(history.history['loss'], label='loss')
plt.plot(history.history['val_loss'], label='val_loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.legend()
plt.show()

In [None]:
# Visualize hidden layers output
layer_outputs = [layer.output for layer in resnet18_model.layers if 'conv2d_117' in layer.name or 'conv2d_122' in layer.name or 'conv2d_127' in layer.name]
activation_model = models.Model(inputs=resnet18_model.input, outputs=layer_outputs)
activations = activation_model.predict(test_images[0][np.newaxis, ...])
layer_names = [layer.name for layer in resnet18_model.layers if 'conv2d_117' in layer.name or 'conv2d_122' in layer.name or 'conv2d_127' in layer.name]
for layer_name, layer_activation in zip(layer_names, activations):
    n_features = layer_activation.shape[-1]
    size = layer_activation.shape[1]      
    plt.figure(figsize=(16,16))
    for i in range(n_features):
        # Display only the first 64 features
        if i >= 64:
            break
        plt.subplot(8, 8, i + 1)
        plt.imshow(layer_activation[0, :, :, i], cmap='viridis')
        plt.axis('off')
    plt.suptitle(f"Layer: {layer_name}")
    plt.show()

### 2. Hyperparameter tuning

In [None]:
# Define a function to create the model
def create_model_resnet18(learning_rate, optimizer_name):
    optimizer_dict = {
    'adam': keras.optimizers.Adam,
    'adagrad': keras.optimizers.Adagrad,
    'rmsprop': keras.optimizers.RMSprop
    }
    if optimizer_name in optimizer_dict:
        optimizer = optimizer_dict[optimizer_name](learning_rate=learning_rate)
    else:
        raise ValueError(f"Unknown optimizer: {optimizer_name}")
    model = ResNet18(input_shape=(32, 32, 1), num_classes=62)
    loss = keras.losses.SparseCategoricalCrossentropy()
    model.compile(optimizer=optimizer, loss=loss, metrics=['accuracy'])
    return model

In [None]:
# Define the hyperparameters
param_grid = {
    'model__learning_rate': [5e-5, 1e-4, 5e-4],
    'model__optimizer_name': ['adam', 'adagrad','rmsprop']

    # Debug parameters
    # 'model__learning_rate': [1e-5],
    # 'model__optimizer_name': ['adam']
}

In [None]:
# Wrap the model using KerasClassifier
# Use scikeras.wrappers.KerasClassifier to wrap the model.
keras_model = KerasClassifier(model=create_model_resnet18, epochs=10)

# Perform the grid search
skf = StratifiedKFold(n_splits=3, shuffle=True)
grid = GridSearchCV(estimator=keras_model, param_grid=param_grid, cv=skf)
grid_result = grid.fit(train_images, train_labels)

In [None]:
# Print the results
print("Best: %f using %s" % (grid_result.best_score_, grid_result.best_params_))
means = grid_result.cv_results_['mean_test_score']
stds = grid_result.cv_results_['std_test_score']
params = grid_result.cv_results_['params']
for mean, std, param in zip(means, stds, params):
    print("%f (%f) with: %r" % (mean, std, param))

In [None]:
# Train the model with the best hyperparameters
best_lr_resnet18 = grid_result.best_params_['model__learning_rate']
best_opt_resnet18 = grid_result.best_params_['model__optimizer_name']
best_model_resnet18 = create_model_resnet18(best_lr_resnet18, best_opt_resnet18)
history = best_model_resnet18.fit(train_dataset.batch(64), validation_data=test_dataset.batch(64), epochs=10)

In [None]:
y_pred = best_model_resnet18.predict(test_dataset.batch(64))
y_pred = np.argmax(y_pred, axis=1)
y_true = test_labels

resnet18_accuracy = accuracy_score(y_true, y_pred)
resnet18_precision = precision_score(y_true, y_pred, average='weighted')
resnet18_recall = recall_score(y_true, y_pred, average='weighted')
resnet18_f1 = f1_score(y_true, y_pred, average='weighted')
resnet18_top5_accuracy = top_k_accuracy_score(y_true, best_model_resnet18.predict(test_dataset.batch(64)), k=5)

print(classification_report(y_true, y_pred))

## Evaluation

In [None]:
models = {
    'VGG11': best_model_vgg11,
    'ResNet18': best_model_resnet18,
    # 'EfficientNetB0': best_model_efficientnetb0,
}
acc = {
    'VGG11': vgg11_accuracy,
    'ResNet18': resnet18_accuracy,
    # 'EfficientNetB0': efficientnetb0_accuracy,
}
training_time = {
    'VGG11': sum(vgg11_time_callback.times),
    'ResNet18': sum(resnet18_time_callback.times),
    # 'EfficientNetB0': sum(efficientnetb0_time_callback.times),
}
reasoning_time = {
    'VGG11': measure_reasoning_time(best_model_vgg11, test_images),
    'ResNet18': measure_reasoning_time(best_model_resnet18, test_images),
    # 'EfficientNetB0': measure_reasoning_time(best_model_efficientnetb0, test_images),
}
flops = {
    'VGG11': get_flops(best_model_vgg11),
    'ResNet18': get_flops(best_model_resnet18),
    # 'EfficientNetB0': get_flops(best_model_efficientnetb0),
}
for name, model in models.items():
    print(f"Model: {name}")
    print(f"Accuracy: {acc[name]:.4f}")
    print(f"Training Time: {training_time[name]:.2f} seconds")
    print(f"Reasoning Time: {reasoning_time[name]:.4f} seconds")
    print(f"FLOPs: {flops[name]:.2f}")
    print("\n"+"\n")

## Comparison

In [None]:
metrics = ['Accuracy', 'Precision', 'Recall', 'F1-score', 'Top-5 Accuracy']
vgg11_metrics = [vgg11_accuracy, vgg11_precision, vgg11_recall, vgg11_f1, vgg11_top5_accuracy]
resnet18_metrics = [resnet18_accuracy, resnet18_precision, resnet18_recall, resnet18_f1, resnet18_top5_accuracy]
# efficientnetb0_metrics = [efficientnetb0_accuracy, efficientnetb0_precision, efficientnetb0_recall, efficientnetb0_f1, efficientnetb0_top5_accuracy]

# Plot the performance metrics
fig, axes = plt.subplots(2, 3, figsize=(15, 10))
for i, metric in enumerate(metrics):
    ax = axes[i//3, i%3]
    ax.bar(['VGG11', 'ResNet18'], [vgg11_metrics[i], resnet18_metrics[i]])
    # ax.bar(['VGG11', 'ResNet18', 'EfficientNetB0'], [vgg11_metrics[i], resnet18_metrics[i], efficientnetb0_metrics[i]])
    ax.set_title(metric + ' Comparison')
    ax.set_ylabel(metric)
    ax.set_xlabel('Model')
plt.tight_layout()
plt.show()

In [None]:
# Plot the training time metric
fig, ax = plt.subplots(figsize=(8, 5))
ax.bar(['VGG11', 'ResNet18'], [training_time['VGG11'], training_time['ResNet18']])
# ax.bar(['VGG11', 'ResNet18', 'EfficientNetB0'], [training_time['VGG11'], training_time['ResNet18'], training_time['EfficientNetB0']])
ax.set_title('Training Time Comparison')
ax.set_ylabel('Training Time (s)')
ax.set_xlabel('Model')
plt.tight_layout()
plt.show()

In [None]:
# Plot the reasoning time metric
fig, ax = plt.subplots(figsize=(8, 5))
ax.bar(['VGG11', 'ResNet18'], [reasoning_time['VGG11'], reasoning_time['ResNet18']])
# ax.bar(['VGG11', 'ResNet18', 'EfficientNetB0'], [reasoning_time['VGG11'], reasoning_time['ResNet18'], reasoning_time['EfficientNetB0']])
ax.set_title('Reasoning Time Comparison')
ax.set_ylabel('Reasoning Time (s)')
ax.set_xlabel('Model')
plt.tight_layout()
plt.show()

In [None]:
# Plot the FLOPs metric
fig, ax = plt.subplots(figsize=(8, 5))
ax.bar(['VGG11', 'ResNet18'], [flops['VGG11'], flops['ResNet18']])
# ax.bar(['VGG11', 'ResNet18', 'EfficientNetB0'], [flops['VGG11'], flops['ResNet18'], flops['EfficientNetB0']])
ax.set_title('FLOPs Comparison')
ax.set_ylabel('FLOPs')
ax.set_xlabel('Model')
plt.tight_layout()
plt.show()