In [None]:
import numpy as np
import torch
import torch.nn as nn
import tensorflow as tf
from collections import Counter
import matplotlib.pyplot as plt
from sklearn.decomposition import PCA
import pandas as pd
import seaborn as sns
import torch.optim as optim
import plotly.express as px
import plotly.graph_objects as go
from torchsummary import summary
import plotly.subplots as sp
import plotly.figure_factory as ff
from sklearn.metrics import confusion_matrix, classification_report

# Use the GPU when PyTorch can see one, otherwise fall back to the CPU.
device = 'cuda' if torch.cuda.is_available() else 'cpu'

# Seed every library that draws random numbers in this notebook.
# PyTorch has its own generator: without torch.manual_seed the model's initial
# weights (and therefore the training curves) change on every fresh run.
np.random.seed(42)
tf.random.set_seed(42)
torch.manual_seed(42)

In [None]:
# --- experiment configuration ---
dataset_name = "mnist"   # the loader below recognises "mnist" and "fashion_mnist"
Encoding = "pca8"
pca_n_components = 8     # number of principal components kept by the PCA step
input_size = 8           # length of each feature vector fed to the network
classes = 10             # number of output classes
plot_pca = True          # toggle for the 3D PCA scatter plot
# expected sequence length after the network's pooling stages
final_layer_size = input_size // 4

print(f"device being used --- {device}")
print(f'final layer size of the cnn model {final_layer_size}')

In [None]:
# number of samples to keep for the training and test subsets
train_datasize = 2000
test_datasize = 1000

# load the requested dataset; an unknown name fails here with a clear message
# instead of surfacing later as a NameError on x_train
if dataset_name == "fashion_mnist":
  (x_train, y_train), (x_test, y_test) = tf.keras.datasets.fashion_mnist.load_data()
elif dataset_name == "mnist":
  (x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data()
else:
  raise ValueError(f"Unsupported dataset_name {dataset_name!r}; expected 'mnist' or 'fashion_mnist'")
# number of distinct labels present in the full test split
unique_classes = np.unique(y_test)
len_unique_classes = len(unique_classes)
print(f"Loaded {dataset_name} dataset with {len(x_train)} training samples and {len(x_test)} test samples with # classes {len_unique_classes}")

# shuffle the training data (images and labels with the same permutation)
train_indices = np.random.permutation(len(x_train))
x_train = x_train[train_indices]
y_train = y_train[train_indices]

# shuffle the test data (images and labels with the same permutation)
test_indices = np.random.permutation(len(x_test))
x_test = x_test[test_indices]
y_test = y_test[test_indices]

# keep only the first train_datasize / test_datasize shuffled samples
x_train = x_train[:train_datasize]
x_test = x_test[:test_datasize]
y_train = y_train[:train_datasize]
y_test = y_test[:test_datasize]

# count how many samples of each class ended up in y_train and y_test
train_class_counts = Counter(y_train)
test_class_counts = Counter(y_test)

# the class counts in the training subset, listed in label order
print("Class counts in x_train:")
for label, count in sorted(train_class_counts.items()):
    print(f"Class {label}: {count}")

# the class counts in the test subset, listed in label order
print("Class counts in x_test:")
for label, count in sorted(test_class_counts.items()):
    print(f"Class {label}: {count}")

def check_imbalance(class_counts, datasize, n_classes=10, tolerance=0.2):
  """Report the first class whose count strays too far from a uniform split.

  Args:
    class_counts: mapping of label -> number of samples with that label.
    datasize: total number of samples the counts were taken from.
    n_classes: number of classes a perfectly balanced split would contain
      (defaults to 10, the value that used to be hard-coded).
    tolerance: allowed deviation from the per-class average, expressed as a
      fraction of that average (defaults to 0.2, i.e. 20%).

  Returns:
    (True, label, count) for the first entry of `class_counts` whose count
    differs from datasize / n_classes by more than the threshold, otherwise
    (False, None, None). Labels that do not appear in `class_counts` at all
    are not inspected.

  Raises:
    ValueError: if n_classes is not positive.
  """
  if n_classes <= 0:
    raise ValueError(f"n_classes must be positive, got {n_classes}")
  avg_count = datasize / n_classes
  # imbalance threshold: `tolerance` times the average per-class count
  threshold = tolerance * avg_count
  for label, count in class_counts.items():
    if abs(count - avg_count) > threshold:
      return True, label, count
  return False, None, None

# run the imbalance check on the training subset and report the outcome
train_imbalance = check_imbalance(train_class_counts, train_datasize)
is_imbalanced_train, train_imbalanced_class, train_imbalanced_count = train_imbalance
print(
    f"\nImbalance detected in training data for class {train_imbalanced_class} with count {train_imbalanced_count}"
    if is_imbalanced_train
    else "\nNo significant imbalance detected in training data"
)

# same check for the test subset
test_imbalance = check_imbalance(test_class_counts, test_datasize)
is_imbalanced_test, test_imbalanced_class, test_imbalanced_count = test_imbalance
print(
    f"\nImbalance detected in test data for class {test_imbalanced_class} with count {test_imbalanced_count}"
    if is_imbalanced_test
    else "\nNo significant imbalance detected in test data"
)

In [None]:
# show the first 5 training images side by side, each titled with its label
preview_fig, preview_axes = plt.subplots(1, 5, figsize=(10, 2))
for i, ax in enumerate(preview_axes):
    ax.imshow(x_train[i])
    ax.set_title(f"Label: {y_train[i]}")
    ax.axis('off')

plt.show()

In [None]:
# append a trailing channel axis and divide by 255 to normalize the image data
X_train = np.expand_dims(x_train, axis=-1) / 255.0
X_test = np.expand_dims(x_test, axis=-1) / 255.0
# labels are passed through unchanged
Y_train, Y_test = y_train, y_test

In [None]:
# Flatten each image into one long vector (28x28 -> 784 values), then project it
# onto the leading principal components.
# The flattening has to be a plain reshape: tf.image.resize(..., (784, 1)) is an
# interpolating image resize (height stretched to 784, width collapsed to 1),
# which throws away most of every image instead of rearranging its pixels.
# The vectors are rebuilt from the raw x_train / x_test arrays so that re-running
# this cell always produces the same features.
X_train_flat = x_train.reshape(len(x_train), -1) / 255.0
X_test_flat = x_test.reshape(len(x_test), -1) / 255.0
# apply pca: fit on the training data only and reuse that projection for the test data;
# random_state pins any randomized solver the PCA may pick
pca = PCA(pca_n_components, random_state=42)
X_train = pca.fit_transform(X_train_flat)
X_test = pca.transform(X_test_flat)
# Explained variance ratio
explained_variance = pca.explained_variance_ratio_
print(f"Explained variance ratio of the {pca_n_components} components:", explained_variance)
if plot_pca:
  # plot the first three PCA components
  df = pd.DataFrame({
        'Principal Component 1': X_train[:, 0],
        'Principal Component 2': X_train[:, 1],
        'Principal Component 3': X_train[:, 2],
        'Digit': y_train
    })
  # create the interactive 3D plot
  fig = px.scatter_3d(df, x='Principal Component 1', y='Principal Component 2', z='Principal Component 3',
                      color='Digit', labels={'Digit': 'Digit'}, opacity=0.7)
  fig.update_layout(title=f'PCA of {dataset_name} dataset (First 3 Components of {pca_n_components})',
                    scene = dict(
                          xaxis_title='Principal Component 1',
                          yaxis_title='Principal Component 2',
                          zaxis_title='Principal Component 3'), width=800, height=600)
  fig.show()

In [None]:
n_feature = 2
# Convolutional feature extractor: two Conv1d -> ReLU -> MaxPool1d stages, then a flatten.
feature_layers = nn.Sequential(
    nn.Conv1d(in_channels=1, out_channels=n_feature, kernel_size=2, padding=1),
    nn.ReLU(),
    nn.MaxPool1d(kernel_size=2),
    nn.Conv1d(in_channels=n_feature, out_channels=n_feature, kernel_size=2, padding=1),
    nn.ReLU(),
    nn.MaxPool1d(kernel_size=2),
    nn.Flatten(),
)
# Measure the flattened feature count with a dummy forward pass instead of assuming
# input_size / 4: with kernel_size=2 and padding=1 each conv lengthens the sequence by
# one before the pooling halves it, so the shortcut only holds when input_size is a
# multiple of 4 (e.g. input_size=6 yields length 2, not 1).
with torch.no_grad():
  n_flat_features = feature_layers(torch.zeros(1, 1, input_size)).shape[1]

# Define the CNN model: the feature extractor followed by a linear classification head
CNN_model = nn.Sequential(
    *feature_layers,
    nn.Linear(n_flat_features, classes),  # one output per class
)
# move the parameters to the selected device (a no-op when device is 'cpu')
CNN_model = CNN_model.to(device)

In [None]:
# Print a torchsummary overview of the model, using one test sample
# shaped as (1, n_features) to describe the input
sample_image = X_test[0][np.newaxis, :]
summary(CNN_model, input_size=tuple(sample_image.shape))

In [None]:
# compute accuracy
def compute_accuracy(preds, labels):
  """Return the fraction of rows in `preds` whose arg-max along dim 1 equals the matching entry of `labels`."""
  predicted_classes = preds.argmax(dim=1)
  n_correct = predicted_classes.eq(labels).sum().item()
  n_total = labels.size(0)
  return n_correct / n_total

In [None]:
n_epochs = 30
batch_size = 64
learning_rate = 0.01

# Per-epoch histories: exactly one entry per epoch, so each list lines up with an
# epoch axis of length n_epochs when plotted.
train_loss_history = []
train_acc_history = []
test_loss_history = []
test_acc_history = []

optimizer = torch.optim.SGD(CNN_model.parameters(), lr=learning_rate, momentum=0.9, nesterov=True)
cost_function = nn.CrossEntropyLoss()

# Only full mini-batches are used; the trailing len(X_train) % batch_size samples of
# each epoch's shuffle are skipped.
tr_steps_per_epoch = len(X_train) // batch_size
if tr_steps_per_epoch == 0:
  # without this guard the per-epoch averages below would divide by zero
  raise ValueError(f"Need at least batch_size={batch_size} training samples, got {len(X_train)}")

# make sure the model is in training mode even if an evaluation cell ran before this one
CNN_model.train()

print(f'starting training model for {n_epochs} epochs')
for epoch in range(n_epochs):
  epoch_loss = 0
  epoch_accuracy = 0
  steps = 0
  # shuffle the data for each epoch
  indices = np.random.permutation(len(X_train))
  X_train_shuffled = X_train[indices]
  Y_train_shuffled = Y_train[indices]
  for step in range(tr_steps_per_epoch):
    # create mini-batch
    X_batch = X_train_shuffled[step * batch_size: (step + 1) * batch_size]
    Y_batch = Y_train_shuffled[step * batch_size: (step + 1) * batch_size]
    # reshape to (batch, channels=1, input_size) as expected by Conv1d
    X_train_batch_torch = torch.tensor(X_batch, dtype=torch.float32).view(-1, 1, input_size).to(device)
    Y_train_batch_torch = torch.tensor(Y_batch, dtype=torch.long).to(device)
    # zero the gradients
    optimizer.zero_grad()
    # send the data to model
    Y_pred_batch_torch = CNN_model(X_train_batch_torch)
    # compute loss function
    loss = cost_function(Y_pred_batch_torch, Y_train_batch_torch)

    # backward pass and optimize
    loss.backward()
    optimizer.step()

    # Accumulate loss and accuracy. The per-step loss is NOT appended to
    # train_loss_history: mixing per-step and per-epoch values in one list made it
    # n_epochs * (steps + 1) long and unusable against the epoch axis.
    epoch_loss += loss.item()
    epoch_accuracy += compute_accuracy(Y_pred_batch_torch, Y_train_batch_torch)
    steps += 1
  # compute average loss and accuracy for the epoch
  avg_epoch_loss = epoch_loss / steps
  avg_epoch_accuracy = epoch_accuracy / steps
  train_loss_history.append(avg_epoch_loss)
  train_acc_history.append(avg_epoch_accuracy)

  print(f'Epoch [{epoch+1}/{n_epochs}], Loss: {avg_epoch_loss:.4f}, Accuracy: {avg_epoch_accuracy:.4f}')

In [None]:
# Draw the training loss and accuracy curves side by side with Plotly
epoch_axis = list(range(1, n_epochs+1))
fig = sp.make_subplots(rows=1, cols=2, subplot_titles=("Training Loss", "Training Accuracy"))

# (subplot column, trace name / y-axis title, values to plot)
panels = [
    (1, 'Loss', train_loss_history),
    (2, 'Accuracy', train_acc_history),
]
for col, panel_name, history in panels:
    fig.add_trace(go.Scatter(x=epoch_axis, y=history, mode='lines+markers', name=panel_name), row=1, col=col)
    fig.update_xaxes(title_text="Epoch", row=1, col=col)
    fig.update_yaxes(title_text=panel_name, row=1, col=col)

fig.update_layout(title_text=f"Training Loss and Accuracy Over Epochs : {dataset_name}", height=600, width=1000,)

fig.show()

In [None]:
# Evaluate the model on the test set
def evaluate_model(model, X_test, Y_test, device):
    """Run `model` on a whole test set in one batch and score it.

    Args:
        model: module called as model(x) on a (n_samples, 1, n_features) float tensor.
        X_test: array-like of samples; everything after the first axis is treated
            as the feature vector of one sample.
        Y_test: array-like of integer class labels, one per sample.
        device: device the tensors are moved to before the forward pass.

    Returns:
        (outputs, accuracy): the raw model outputs as a NumPy array, and the
        fraction of samples whose highest-scoring class equals the label.

    Note:
        The model is switched to evaluation mode and left in that mode.
    """
    model.eval()  # Set the model to evaluation mode
    # The feature length is taken from X_test itself (-1) rather than from the
    # notebook-level input_size, so the function depends only on its arguments.
    X_test_torch = torch.as_tensor(X_test, dtype=torch.float32).reshape(len(X_test), 1, -1).to(device)
    Y_test_torch = torch.as_tensor(Y_test, dtype=torch.long).to(device)
    # no gradients are needed for inference
    with torch.no_grad():
      Y_pred_test_torch = model(X_test_torch)
    test_accuracy = compute_accuracy(Y_pred_test_torch, Y_test_torch)
    return Y_pred_test_torch.cpu().numpy(), test_accuracy

# score the trained model on the held-out samples
test_outputs = evaluate_model(CNN_model, X_test, Y_test, device)
predictions, test_accuracy = test_outputs
# predicted class = index of the largest output for each sample
predicted_labels = predictions.argmax(axis=1)
print(f'Test Accuracy: {test_accuracy:.4f}')

In [None]:
# Confusion Matrix and Classification Report
# Pin the label set to 0..classes-1: otherwise the matrix only has rows/columns for
# labels that actually occur in Y_test or the predictions, and it would no longer
# line up with the fixed tick labels below when a class is missing from both.
class_labels = list(range(classes))
conf_matrix = confusion_matrix(Y_test, predicted_labels, labels=class_labels)
class_report = classification_report(Y_test, predicted_labels, output_dict=True)

# Plotting the confusion matrix using seaborn
plt.figure()
sns.heatmap(conf_matrix, annot=True, fmt='d', cmap='Blues', xticklabels=class_labels, yticklabels=class_labels)
plt.xlabel('Predicted')
plt.ylabel('True')
plt.title(f'CNN Confusion Matrix : {dataset_name}')
plt.show()

In [None]:
# Turn the classification-report dict into a table with one row per report entry
class_report_df = pd.DataFrame(class_report).transpose()

# Heatmap of the report: the table's last row and last column are left out, and the
# remainder is transposed before plotting
report_for_plot = class_report_df.iloc[:-1, :-1].transpose()
plt.figure(figsize=(10, 6))
sns.heatmap(report_for_plot, annot=True, cmap='Blues')
plt.title(f'CNN Classification Report : {dataset_name}')
plt.show()