In [None]:
import torch
import time
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, random_split
from torchvision.datasets import EMNIST
from torchvision.transforms import ToTensor
from sklearn.model_selection import KFold
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
from sklearn.metrics import confusion_matrix
import seaborn as sns
import matplotlib.pyplot as plt

# Seed PyTorch's global RNG and use a dedicated, seeded generator for the
# train/validation split so the split is the same after a kernel restart.
SEED = 42
BATCH_SIZE = 64
torch.manual_seed(SEED)

# Run on the GPU when CUDA is available, otherwise on the CPU
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Download and load the EMNIST "Balanced" dataset
emnist_train = EMNIST(root='emnist_data', split='balanced', train=True, transform=ToTensor(), download=True)
emnist_test = EMNIST(root='emnist_data', split='balanced', train=False, transform=ToTensor(), download=True)

# Split the train data into train (80%) and validation (remaining 20%) sets
train_len = int(len(emnist_train) * 0.8)
valid_len = len(emnist_train) - train_len
train_dataset, valid_dataset = random_split(
    emnist_train,
    [train_len, valid_len],
    generator=torch.Generator().manual_seed(SEED),
)

# Create DataLoaders for train, validation, and test datasets.
# Only the training loader shuffles; validation/test keep dataset order.
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
valid_loader = DataLoader(valid_dataset, batch_size=BATCH_SIZE, shuffle=False)
test_loader = DataLoader(emnist_test, batch_size=BATCH_SIZE, shuffle=False)

def show_example(img, label):
    """Display one image tensor in grayscale with its label in the title.

    Args:
        img: Tensor holding a single image; singleton dimensions are squeezed
            away before plotting.
        label: Value shown after "Label: " in the plot title.
    """
    pixels = img.squeeze().numpy()  # matplotlib needs a NumPy array, not a tensor
    plt.imshow(pixels, cmap='gray')
    plt.title(f'Label: {label}')
    plt.show()

# Preview the first five training images together with their labels
for sample_idx in range(5):
    show_example(*emnist_train[sample_idx])

# Report the sizes of the full EMNIST train and test splits
print(f"Number of training samples: {len(emnist_train)}")
print(f"Number of testing samples: {len(emnist_test)}")


In [None]:
# Define the MLP (Multilayer Perceptron) class
class MLP(nn.Module):
    """Fully connected classifier: 784 -> 128 -> 64 -> 32 -> 47.

    Every hidden layer is followed by optional batch normalization, the chosen
    activation, and dropout. The final layer returns raw scores (no softmax).

    Args:
        activation: One of 'relu', 'leaky_relu' or 'elu'.
        dropout_rate: Drop probability handed to nn.Dropout.
        batch_normalization: If True, a BatchNorm1d layer follows each hidden
            linear layer.
        l1_reg: L1 coefficient; it is only stored on the instance here and is
            not used inside the model.

    Raises:
        ValueError: If ``activation`` is not one of the supported names.
    """

    # Supported activation names mapped to their module classes
    _ACTIVATIONS = {'relu': nn.ReLU, 'leaky_relu': nn.LeakyReLU, 'elu': nn.ELU}

    def __init__(self, activation='relu', dropout_rate=0.0, batch_normalization=True, l1_reg=0.0):
        super(MLP, self).__init__()

        # Fail at construction time instead of with an AttributeError in forward()
        if activation not in self._ACTIVATIONS:
            raise ValueError(
                f"Unsupported activation {activation!r}; expected one of {sorted(self._ACTIVATIONS)}"
            )

        # Define fully connected layers
        self.fc1 = nn.Linear(28 * 28, 128)
        self.fc2 = nn.Linear(128, 64)
        self.fc3 = nn.Linear(64, 32)
        self.fc4 = nn.Linear(32, 47)
        # Define dropout layer (shared by all hidden layers)
        self.dropout = nn.Dropout(dropout_rate)
        # Set batch normalization flag
        self.batch_normalization = batch_normalization
        self.l1_reg = l1_reg
        # Batch normalization layers only exist when the flag is enabled
        if self.batch_normalization:
            self.bn1 = nn.BatchNorm1d(128)
            self.bn2 = nn.BatchNorm1d(64)
            self.bn3 = nn.BatchNorm1d(32)

        self.activation = self._ACTIVATIONS[activation]()

    def forward(self, x):
        """Flatten ``x`` to rows of 784 values and return 47 scores per row."""
        x = x.view(-1, 28 * 28)

        if self.batch_normalization:
            norms = (self.bn1, self.bn2, self.bn3)
        else:
            norms = (None, None, None)

        # Hidden blocks: linear -> (batch norm) -> activation -> dropout
        for linear, norm in zip((self.fc1, self.fc2, self.fc3), norms):
            x = linear(x)
            if norm is not None:
                x = norm(x)
            x = self.activation(x)
            x = self.dropout(x)

        # Output layer
        return self.fc4(x)

# Define the CNN (Convolutional Neural Network) class
class CNN(nn.Module):
    """Two-block convolutional classifier with 47 output scores.

    Layout: conv(1->32) -> pool -> conv(32->64) -> pool -> fc(64*7*7->128) -> fc(128->47).
    Both convolutions use 3x3 kernels with padding 1 and each 2x2 max-pool
    halves the spatial size, so the flattened size 64*7*7 matches 28x28 inputs.

    Args:
        activation: One of 'relu', 'leaky_relu' or 'elu'.
        dropout_rate: Drop probability for the dropout after the first
            fully connected layer.
        batch_normalization: If True, batch norm follows both convolutions and
            the first fully connected layer.
        l1_reg: L1 coefficient; it is only stored on the instance here and is
            not used inside the model.

    Raises:
        ValueError: If ``activation`` is not one of the supported names.
    """

    # Supported activation names mapped to their module classes
    _ACTIVATIONS = {'relu': nn.ReLU, 'leaky_relu': nn.LeakyReLU, 'elu': nn.ELU}

    def __init__(self, activation='relu', dropout_rate=0.0, batch_normalization=True, l1_reg=0.0):
        super(CNN, self).__init__()

        # Fail at construction time instead of with an AttributeError in forward()
        if activation not in self._ACTIVATIONS:
            raise ValueError(
                f"Unsupported activation {activation!r}; expected one of {sorted(self._ACTIVATIONS)}"
            )

        # Define convolutional layers
        self.conv1 = nn.Conv2d(1, 32, kernel_size=3, padding=1)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=3, padding=1)
        # Define fully connected layers
        self.fc1 = nn.Linear(64 * 7 * 7, 128)
        self.fc2 = nn.Linear(128, 47)
        # Parameter-free pooling layer, built once instead of on every forward pass
        self.pool = nn.MaxPool2d(kernel_size=2)
        self.dropout = nn.Dropout(dropout_rate)
        self.batch_normalization = batch_normalization
        self.l1_reg = l1_reg
        # Batch normalization layers only exist when the flag is enabled
        if self.batch_normalization:
            self.bn1 = nn.BatchNorm2d(32)
            self.bn2 = nn.BatchNorm2d(64)
            self.bn3 = nn.BatchNorm1d(128)

        self.activation = self._ACTIVATIONS[activation]()

    def forward(self, x):
        """Return 47 raw class scores per input image."""
        # First convolutional block: conv -> (batch norm) -> activation -> pool
        x = self.conv1(x)
        if self.batch_normalization:
            x = self.bn1(x)
        x = self.pool(self.activation(x))

        # Second convolutional block
        x = self.conv2(x)
        if self.batch_normalization:
            x = self.bn2(x)
        x = self.pool(self.activation(x))

        # Flatten everything except the batch dimension
        x = x.view(x.size(0), -1)

        # First fully connected layer with optional batch norm, then dropout
        x = self.fc1(x)
        if self.batch_normalization:
            x = self.bn3(x)
        x = self.dropout(self.activation(x))

        # Output layer
        return self.fc2(x)

# Factory helpers that build MLP and CNN models from the hyperparameters stored in the 'params' dict.
def create_mlp(params):
    """Build an MLP from the model-related entries of ``params``.

    Reads the keys 'activation', 'dropout_rate', 'batch_normalization' and
    'l1_reg'; any other keys in ``params`` are ignored.
    """
    model_kwargs = {
        'activation': params['activation'],
        'dropout_rate': params['dropout_rate'],
        'batch_normalization': params['batch_normalization'],
        'l1_reg': params['l1_reg'],
    }
    return MLP(**model_kwargs)

def create_cnn(params):
    """Build a CNN from the model-related entries of ``params``.

    Reads the keys 'activation', 'dropout_rate', 'batch_normalization' and
    'l1_reg'; any other keys in ``params`` are ignored.
    """
    model_kwargs = {
        'activation': params['activation'],
        'dropout_rate': params['dropout_rate'],
        'batch_normalization': params['batch_normalization'],
        'l1_reg': params['l1_reg'],
    }
    return CNN(**model_kwargs)

def train_model(model, train_loader, valid_loader, optimizer, criterion, scheduler, epochs, params):
    """Train ``model`` for ``epochs`` epochs and validate after each one.

    Args:
        model: Network to optimise. Batches are moved to ``device`` here; the
            model itself is not.
        train_loader: Iterable of (inputs, labels) training batches.
        valid_loader: Batches passed to evaluate_model after every epoch.
        optimizer: Optimizer that updates the model's parameters.
        criterion: Loss function called as ``criterion(outputs, labels)``.
        scheduler: Learning-rate scheduler, stepped once per epoch.
        epochs: Number of passes over ``train_loader``.
        params: Hyperparameter dict; only 'l1_reg' is read here.

    Returns:
        Tuple ``(train_losses, valid_losses, train_accuracies, valid_accuracies)``;
        each is a list with one entry per epoch. Accuracies are percentages.
        The training loss includes the L1 penalty when 'l1_reg' > 0.

    Raises:
        ValueError: If ``train_loader`` yields no batches.
    """
    # Per-epoch history
    train_losses = []
    train_accuracies = []
    valid_losses = []
    valid_accuracies = []

    for epoch in range(epochs):
        # evaluate_model() leaves the model in eval mode at the end of every
        # epoch, so training mode must be restored here; otherwise dropout and
        # batch-norm updates would be disabled from the second epoch onwards.
        model.train()

        running_loss = 0.0
        correct = 0
        total = 0
        batch_count = 0

        # Loop through all batches in the training dataset
        for inputs, labels in train_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            optimizer.zero_grad()  # Clear gradients left over from the previous step
            outputs = model(inputs)
            loss = criterion(outputs, labels)

            # Add the L1 penalty (sum of absolute parameter values) if requested
            if params['l1_reg'] > 0:
                l1_penalty = sum(param.abs().sum() for param in model.parameters())
                loss = loss + params['l1_reg'] * l1_penalty

            loss.backward()
            optimizer.step()

            # Update the running loss and accuracy
            running_loss += loss.item()
            predicted = outputs.argmax(dim=1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
            batch_count += 1

        if batch_count == 0:
            raise ValueError("train_loader produced no batches")

        scheduler.step()  # Advance the learning-rate schedule once per epoch

        # Average loss per batch and accuracy (in percent) for this epoch
        train_losses.append(running_loss / batch_count)
        train_accuracies.append(100 * correct / total)

        # Evaluate on the validation data and record the results
        valid_loss, valid_accuracy = evaluate_model(model, valid_loader)
        valid_losses.append(valid_loss)
        valid_accuracies.append(valid_accuracy)

        # Progress report for the epoch
        print(f"Epoch {epoch + 1}, Loss: {train_losses[-1]}, Validation Accuracy: {valid_accuracies[-1]}%")

    return train_losses, valid_losses, train_accuracies, valid_accuracies

def evaluate_model(model, valid_loader, loss_fn=None):
    """Compute the mean batch loss and accuracy of ``model`` on ``valid_loader``.

    The model is switched to evaluation mode and is left in that mode.

    Args:
        model: Network to evaluate.
        valid_loader: Iterable of (inputs, labels) batches; must support len().
        loss_fn: Loss called as ``loss_fn(outputs, labels)``. When omitted, the
            module-level ``criterion`` is used.

    Returns:
        Tuple ``(valid_loss, valid_accuracy)``: the loss averaged over batches
        and the accuracy as a percentage.

    Raises:
        ValueError: If ``valid_loader`` yields no samples.
    """
    if loss_fn is None:
        loss_fn = criterion  # fall back to the notebook-wide loss

    # Set the model to evaluation mode
    model.eval()
    correct = 0
    total = 0
    running_loss = 0.0
    with torch.no_grad():
        for inputs, labels in valid_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)

            # Accumulate the batch loss
            running_loss += loss_fn(outputs, labels).item()

            # Accumulate the number of correct predictions
            predicted = outputs.argmax(dim=1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

    if total == 0:
        raise ValueError("valid_loader produced no samples to evaluate")

    # Accuracy in percent; loss averaged per batch
    valid_accuracy = 100 * correct / total
    valid_loss = running_loss / len(valid_loader)
    return valid_loss, valid_accuracy

def create_optimizer(model, params):
    """Create the optimizer named by ``params['optimizer']`` for ``model``.

    Args:
        model: Module whose parameters will be optimised.
        params: Dict with 'optimizer' ('SGD', 'ADAM' or 'RMSprop'), 'lr'
            (learning rate) and 'l2_reg' (passed as ``weight_decay``).

    Returns:
        The constructed torch.optim optimizer.

    Raises:
        ValueError: If the optimizer name is not supported.
    """
    optimizer_classes = {
        'SGD': optim.SGD,
        'ADAM': optim.Adam,
        'RMSprop': optim.RMSprop,
    }
    name = params['optimizer']
    if name not in optimizer_classes:
        raise ValueError(
            f"Unsupported optimizer {name!r}; expected one of {sorted(optimizer_classes)}"
        )
    return optimizer_classes[name](model.parameters(), lr=params['lr'], weight_decay=params['l2_reg'])

def create_scheduler(optimizer, params):
    """Create the learning-rate scheduler named by ``params['scheduler']``.

    Args:
        optimizer: Optimizer whose learning rate is scheduled.
        params: Dict with 'scheduler' set to 'StepLR' (step_size=10, gamma=0.1)
            or 'ExponentialLR' (gamma=0.1).

    Returns:
        The constructed torch.optim.lr_scheduler instance.

    Raises:
        ValueError: If the scheduler name is not supported.
    """
    name = params['scheduler']
    if name == 'StepLR':
        return optim.lr_scheduler.StepLR(optimizer, step_size=10, gamma=0.1)
    if name == 'ExponentialLR':
        return optim.lr_scheduler.ExponentialLR(optimizer, gamma=0.1)
    raise ValueError(
        f"Unsupported scheduler {name!r}; expected 'StepLR' or 'ExponentialLR'"
    )
    
# Notebook-wide loss function. evaluate_model looks this name up at module
# level, and it is also passed explicitly to find_best_hyperparameters below.
criterion = nn.CrossEntropyLoss() # Cross-entropy loss over the class scores

def cross_validation(create_model_func, train_dataset, k_folds, params, criterion):
    """Run shuffled k-fold cross-validation and return the mean fold accuracy.

    For every fold a fresh model, optimizer and scheduler are created from
    ``params``, the model is trained for ``params['epochs']`` epochs on the
    training part of the fold and then scored on the held-out part.

    Args:
        create_model_func: Callable taking ``params`` and returning a new model.
        train_dataset: Dataset that is split into folds.
        k_folds: Number of folds.
        params: Hyperparameter dict forwarded to the factory helpers and train_model.
        criterion: Loss function forwarded to train_model.

    Returns:
        Average of the per-fold validation accuracies (percent).
    """
    splitter = KFold(n_splits=k_folds, shuffle=True)
    fold_accuracies = []

    for fit_indices, holdout_indices in splitter.split(train_dataset):
        # Data loaders over the two index subsets of this fold (batch size 64)
        fit_loader = DataLoader(
            torch.utils.data.Subset(train_dataset, fit_indices), batch_size=64, shuffle=True
        )
        holdout_loader = DataLoader(
            torch.utils.data.Subset(train_dataset, holdout_indices), batch_size=64, shuffle=False
        )

        # Fresh model on the target device, plus its optimizer and scheduler
        model = create_model_func(params)
        model.to(device)
        optimizer = create_optimizer(model, params)
        scheduler = create_scheduler(optimizer, params)

        # Train on this fold; the returned history is not needed here
        train_model(model, fit_loader, holdout_loader, optimizer, criterion, scheduler, params['epochs'], params)

        # Score the trained model on the held-out subset; keep only the accuracy
        fold_accuracies.append(evaluate_model(model, holdout_loader)[1])

    return sum(fold_accuracies) / len(fold_accuracies)

# Baseline hyperparameters; find_best_hyperparameters starts its search from these
params = dict(
    optimizer='ADAM',
    lr=0.01,
    l2_reg=0.0001,
    l1_reg=0,
    scheduler='StepLR',
    activation='relu',
    dropout_rate=0.5,
    batch_normalization=True,
    epochs=10,
)

#Search for best set of hyperparameters
def find_best_hyperparameters(model_fn, train_dataset, criterion, k_folds=2, base_params=None):
    """Greedy, stage-by-stage hyperparameter search scored by cross-validation.

    The stages are tuned in a fixed order: scheduler, activation, optimizer,
    (l1_reg, l2_reg), dropout rate, batch normalization. Within a stage every
    candidate is cross-validated with all other settings held at their current
    best values, and the winner is kept for the following stages. On ties the
    earlier candidate wins.

    Args:
        model_fn: Callable taking a params dict and returning a new model.
        train_dataset: Dataset handed to cross_validation.
        criterion: Loss function handed to cross_validation.
        k_folds: Number of cross-validation folds.
        base_params: Starting configuration. Defaults to the module-level
            ``params`` dict. It is copied, never modified.

    Returns:
        Tuple ``(best_params, best_accuracy)``: the selected configuration and
        the cross-validated accuracy measured for it in the final stage.
    """
    # Work on a private copy so the shared baseline is left untouched and a
    # later search (e.g. for another model) starts from the same configuration.
    best_params = dict(params if base_params is None else base_params)
    best_accuracy = 0

    # (keys set by the stage, candidate value tuples) in search order
    search_stages = [
        (('scheduler',), [('StepLR',), ('ExponentialLR',)]),
        (('activation',), [('relu',), ('leaky_relu',), ('elu',)]),
        (('optimizer',), [('SGD',), ('ADAM',), ('RMSprop',)]),
        (('l1_reg', 'l2_reg'), [(0, 0), (0.001, 0), (0, 0.0001)]),
        (('dropout_rate',), [(0,), (0.5,)]),
        (('batch_normalization',), [(True,), (False,)]),
    ]

    for keys, candidates in search_stages:
        stage_best_values = None
        stage_best_accuracy = None

        for values in candidates:
            trial_params = dict(best_params)
            trial_params.update(zip(keys, values))
            accuracy = cross_validation(model_fn, train_dataset, k_folds=k_folds, params=trial_params, criterion=criterion)
            # Starting from None guarantees a candidate is always selected,
            # even if every accuracy in the stage is 0.
            if stage_best_accuracy is None or accuracy > stage_best_accuracy:
                stage_best_accuracy = accuracy
                stage_best_values = values

        # Lock in the stage winner; its score belongs to the configuration so far
        best_params.update(zip(keys, stage_best_values))
        best_accuracy = stage_best_accuracy

    # Print best hyperparameters and accuracy
    print("Best hyperparameters:", best_params)
    print("Best accuracy:", best_accuracy)

    return best_params, best_accuracy

# Time the whole hyperparameter search
start_time = time.time()

# Tune the MLP first, then the CNN, on the same training split and loss
mlp_best_params, mlp_best_accuracy = find_best_hyperparameters(create_mlp, train_dataset, criterion)
cnn_best_params, cnn_best_accuracy = find_best_hyperparameters(create_cnn, train_dataset, criterion)

# Report the selected hyperparameters and accuracy for the MLP, then the CNN
for caption, value in (
    ("MLP best hyperparameters:", mlp_best_params),
    ("MLP best accuracy:", mlp_best_accuracy),
    ("CNN best hyperparameters:", cnn_best_params),
    ("CNN best accuracy:", cnn_best_accuracy),
):
    print(caption, value)

# Stop the clock, then report which device was used and the elapsed time
end_time = time.time()

print("Using GPU" if device.type == 'cuda' else "Using CPU")
print(f"Total training time: {end_time - start_time} seconds")


In [None]:
# Train MLP and CNN models with best hyperparameters.
# train_model returns (train_losses, valid_losses, train_accuracies, valid_accuracies);
# the results are unpacked in exactly that order so each series carries the right name.
mlp_model = create_mlp(mlp_best_params).to(device)
criterion = nn.CrossEntropyLoss()
optimizer = create_optimizer(mlp_model, mlp_best_params)
scheduler = create_scheduler(optimizer, mlp_best_params)
mlp_train_losses, mlp_valid_losses, mlp_train_accuracies, mlp_valid_accuracies = train_model(mlp_model, train_loader, valid_loader, optimizer, criterion, scheduler, mlp_best_params['epochs'], mlp_best_params)

# Same procedure for the CNN, with its own optimizer and scheduler
cnn_model = create_cnn(cnn_best_params).to(device)
criterion = nn.CrossEntropyLoss()
optimizer = create_optimizer(cnn_model, cnn_best_params)
scheduler = create_scheduler(optimizer, cnn_best_params)
cnn_train_losses, cnn_valid_losses, cnn_train_accuracies, cnn_valid_accuracies = train_model(cnn_model, train_loader, valid_loader, optimizer, criterion, scheduler, cnn_best_params['epochs'], cnn_best_params)

# Loss per epoch for both models (training and validation)
plt.figure()
for series, series_label in (
    (mlp_train_losses, 'MLP Train Loss'),
    (mlp_valid_losses, 'MLP Validation Loss'),
    (cnn_train_losses, 'CNN Train Loss'),
    (cnn_valid_losses, 'CNN Validation Loss'),
):
    plt.plot(series, label=series_label)
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.legend()
plt.show()

# Accuracy per epoch for both models (training and validation)
plt.figure()
for series, series_label in (
    (mlp_train_accuracies, 'MLP Train Accuracy'),
    (mlp_valid_accuracies, 'MLP Validation Accuracy'),
    (cnn_train_accuracies, 'CNN Train Accuracy'),
    (cnn_valid_accuracies, 'CNN Validation Accuracy'),
):
    plt.plot(series, label=series_label)
plt.xlabel('Epoch')
plt.ylabel('Accuracy')
plt.legend()
plt.show()


In [None]:
def evaluate_model(model, test_loader, num_samples=6):
    """Print a few predictions from the first test batch, then aggregate metrics.

    NOTE: this definition replaces the earlier ``evaluate_model(model, valid_loader)``
    that train_model and cross_validation call. Once this cell has run, the
    earlier definition must be re-executed before training again.

    Args:
        model: Trained network; it is switched to evaluation mode.
        test_loader: Iterable of (inputs, labels) test batches.
        num_samples: Maximum number of samples from the first batch to print.

    Returns:
        None. Results are printed.
    """
    model.eval()

    # Print predictions and true labels for the leading samples of the first batch
    with torch.no_grad():
        for inputs, labels in test_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            predicted = model(inputs).argmax(dim=1)

            # The first batch may contain fewer than num_samples items
            for j in range(min(num_samples, labels.size(0))):
                print(f"Sample {j + 1}: True label: {labels[j].item()}, Predicted label: {predicted[j].item()}")

            break  # Only the first batch is inspected

    # Calculate performance metrics over the whole loader
    accuracy, precision, recall, f1 = calculate_metrics(model, test_loader)

    # Print the results
    print(f"Accuracy: {accuracy * 100:.2f}%")
    print(f"Precision: {precision:.2f}")
    print(f"Recall: {recall:.2f}")
    print(f"F1 Score: {f1:.2f}")

def calculate_metrics(model, test_loader):
    """Compute accuracy and weighted precision, recall and F1 on ``test_loader``.

    Args:
        model: Network to evaluate; it is switched to evaluation mode here so
            the result does not depend on the mode the caller left it in.
        test_loader: Iterable of (inputs, labels) batches.

    Returns:
        Tuple ``(accuracy, precision, recall, f1)`` as returned by the
        scikit-learn metric functions (precision/recall/F1 use
        ``average='weighted'``).
    """
    # Dropout and batch norm must be in inference mode while predicting
    model.eval()

    # True and predicted labels for all test samples
    true_labels = []
    predicted_labels = []

    with torch.no_grad():
        for inputs, labels in test_loader:
            # Move the batch to the configured device (GPU or CPU)
            inputs, labels = inputs.to(device), labels.to(device)
            # The predicted class is the index of the highest output score
            predicted = model(inputs).argmax(dim=1)
            # Collect this batch's labels and predictions
            true_labels.extend(labels.cpu().tolist())
            predicted_labels.extend(predicted.cpu().tolist())

    # Calculate the accuracy, precision, recall, and F1-score
    accuracy = accuracy_score(true_labels, predicted_labels)
    precision = precision_score(true_labels, predicted_labels, average='weighted')
    recall = recall_score(true_labels, predicted_labels, average='weighted')
    f1 = f1_score(true_labels, predicted_labels, average='weighted')

    return accuracy, precision, recall, f1

# Report test-set results for the MLP, then for the CNN
for heading, trained_model in (
    ("Evaluating MLP model:", mlp_model),
    ("\nEvaluating CNN model:", cnn_model),
):
    print(heading)
    evaluate_model(trained_model, test_loader)


In [None]:
def get_confusion_matrix(model, test_loader):
    """Return the confusion matrix of ``model``'s predictions on ``test_loader``.

    Args:
        model: Network to evaluate; it is switched to evaluation mode here so
            the result does not depend on the mode the caller left it in.
        test_loader: Iterable of (inputs, labels) batches.

    Returns:
        The result of ``confusion_matrix(true_labels, predicted_labels)``.
    """
    # Dropout and batch norm must be in inference mode while predicting
    model.eval()

    true_labels = []
    predicted_labels = []

    # Predict without tracking gradients
    with torch.no_grad():
        for inputs, labels in test_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            # The predicted class is the index of the highest output score
            predicted = model(inputs).argmax(dim=1)
            # Collect this batch's labels and predictions
            true_labels.extend(labels.cpu().tolist())
            predicted_labels.extend(predicted.cpu().tolist())

    # Build the confusion matrix from the collected labels
    return confusion_matrix(true_labels, predicted_labels)

# Confusion matrix of each trained model on the test set (MLP first, then CNN)
mlp_cm, cnn_cm = (get_confusion_matrix(net, test_loader) for net in (mlp_model, cnn_model))

# Element-wise sum of the two matrices
combined_cm = mlp_cm + cnn_cm

# Annotated heatmap of the summed counts
fig, ax = plt.subplots(figsize=(10, 10))
sns.heatmap(combined_cm, annot=True, cmap='Blues', fmt='g', ax=ax)
ax.set_xlabel('Predicted label')
ax.set_ylabel('True label')
ax.set_title('Combined Confusion Matrix for MLP and CNN Models')
