# Dependencies Import


In [13]:
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import TensorDataset, DataLoader, Subset
from torchvision import transforms
from sklearn.model_selection import StratifiedShuffleSplit
from torch.optim.lr_scheduler import StepLR    # learning rate scheduler
import matplotlib.pyplot as plt
from sklearn.svm import SVC
from sklearn.neural_network import MLPClassifier
from sklearn.metrics import accuracy_score, recall_score, precision_score
from sklearn.decomposition import PCA
from sklearn.neighbors import KNeighborsClassifier
from itertools import product
from sklearn.metrics import confusion_matrix, classification_report, accuracy_score, precision_score, recall_score
from sklearn.metrics import ConfusionMatrixDisplay


ValueError: numpy.dtype size changed, may indicate binary incompatibility. Expected 96 from C header, got 88 from PyObject

# Model 

The class below represents our final model and an instance of it is initialized with our tuned hyperparameters

The convolutional layers of the model can be changed by passing `conv_layers`, a list with one dict per layer that gives the number of filters (`out_ch`), the kernel size (`kernel_size`), and the dropout rate (`dropout_rate`).

In [None]:
class ImageCNN(nn.Module):
    """Configurable CNN classifier for single-channel 100x100 images.

    The network is a stack of convolutional blocks
    (Conv2d -> BatchNorm2d -> ReLU -> MaxPool2d(2) -> Dropout2d) followed by a
    one-hidden-layer MLP head (Linear -> BatchNorm1d -> ReLU -> Dropout -> Linear).

    Args:
        num_classes: number of output logits.
        conv_layers: list with one dict per convolutional block, each with the
            keys ``"out_ch"`` (number of filters), ``"kernel_size"`` and
            ``"dropout_rate"``.
        mlp_layers: dict with the keys ``"out_ch"`` (hidden width of the MLP
            head) and ``"dropout_rate"``.

    Note:
        The default arguments are mutable objects; they are only read here,
        never modified, so sharing them between instances is harmless.
    """

    def __init__(self, num_classes=10,
                 conv_layers = [{"out_ch":32, "kernel_size":5, "dropout_rate":0.5},
                                {"out_ch":64, "kernel_size":5, "dropout_rate":0.5},
                                {"out_ch":128, "kernel_size":5, "dropout_rate":0.5}],
                 mlp_layers = {"out_ch":256, "dropout_rate":0.5}
                 ):
        super(ImageCNN, self).__init__()
        layers = []
        in_ch = 1  # single-channel input
        for spec in conv_layers:
            layers += [
                nn.Conv2d(in_ch, spec["out_ch"], kernel_size=spec["kernel_size"], padding='same'),
                nn.BatchNorm2d(spec["out_ch"]),
                nn.ReLU(inplace=True),
                nn.MaxPool2d(2),
                nn.Dropout2d(spec["dropout_rate"]),
            ]
            in_ch = spec["out_ch"]  # update input channels for next layer
        self.conv = nn.Sequential(*layers)

        # Infer the flattened feature size with a dummy forward pass. The pass
        # runs in eval mode so that the all-zero sample does not update the
        # BatchNorm running statistics before training has even started.
        self.conv.eval()
        with torch.no_grad():
            sample = torch.zeros(1, 1, 100, 100)
            flat_dim = self.conv(sample).view(1, -1).size(1)
        self.conv.train()

        hidden_dim = mlp_layers["out_ch"]
        self.fc1 = nn.Linear(flat_dim, hidden_dim)
        self.bn1 = nn.BatchNorm1d(hidden_dim)
        self.dropout = nn.Dropout(mlp_layers["dropout_rate"])

        # The output layer must match the configured hidden width; a fixed 256
        # here breaks every configuration with a different "out_ch".
        self.fc2 = nn.Linear(hidden_dim, num_classes)
        self._initialize_weights()

    def forward(self, x):
        """Return class logits of shape (batch, num_classes) for a (batch, 1, 100, 100) input."""
        x = self.conv(x)                 # (batch, C, H, W)
        x = x.view(x.size(0), -1)        # flatten to (batch, C*H*W)
        x = F.relu(self.bn1(self.fc1(x)))
        x = self.dropout(x)              # (batch, hidden_dim)
        x = self.fc2(x)                  # (batch, num_classes)
        return x

    def _initialize_weights(self):
        """Apply Kaiming-normal init to every Conv2d weight and zero its bias.

        Linear and BatchNorm layers keep PyTorch's default initialization.
        """
        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')
                if m.bias is not None:
                    nn.init.zeros_(m.bias)


# Model Instance and tuned hyperparameters
# One spec dict per convolutional block: filter counts grow from 16 to 128,
# while every block shares a 7x7 kernel and a dropout rate of 0.3.
conv_layers = [
    {"out_ch": n_filters, "kernel_size": 7, "dropout_rate": 0.3}
    for n_filters in (16, 32, 64, 128)
]
model = ImageCNN(
    conv_layers=conv_layers,
    mlp_layers={"out_ch": 256, "dropout_rate": 0.5},
)

# Training

The cell below contains the main train function that can be run with different batch sizes, learning rates and epochs.

It needs a matrix (NumPy array) `x_train` with the feature data, where each row is a flattened 100×100 image, and a label vector `y_train` (NumPy array) to train correctly.

In [None]:
# data augmentations
def get_augmented_transforms():
    """Build the random augmentation pipeline applied to a single image.

    The steps are, in order: convert to a PIL image, random horizontal flip,
    random rotation of up to 15 degrees, random resized crop back to 100x100
    covering 80-100% of the area, and conversion back to a tensor.
    """
    pipeline_steps = [
        transforms.ToPILImage(),    # expects CxHxW or HxW
        transforms.RandomHorizontalFlip(),
        transforms.RandomRotation(degrees=15),
        transforms.RandomResizedCrop(size=100, scale=(0.8, 1.0)),
        transforms.ToTensor(),      # gets 1x100x100 for 1 channel
    ]
    return transforms.Compose(pipeline_steps)


In [None]:

def train(model, x_train, y_train , epochs=25, lr=1e-3, batch_size = 64, device=None):
    """Train ``model`` with Adam and cross-entropy on flattened 100x100 images.

    The training set is doubled once up front: every image is passed through
    ``get_augmented_transforms()`` a single time and the augmented copies are
    appended to the originals. Augmentations are not re-sampled per epoch.

    Args:
        model: network to train; it is moved to ``device`` in place.
        x_train: NumPy array reshapeable to (N, 100, 100). Values are divided
            by 255 (assumes a 0-255 pixel range). The caller's array is not
            modified.
        y_train: NumPy array with N integer class labels.
        epochs: number of passes over the (doubled) training set.
        lr: initial Adam learning rate; StepLR halves it every 5 epochs.
        batch_size: mini-batch size of the training DataLoader.
        device: target device; defaults to 'cuda' when available, else 'cpu'.

    Returns:
        dict with the per-epoch lists ``'train_loss'`` (mean loss per sample)
        and ``'train_acc'`` (accuracy in percent).

    Raises:
        ValueError: if there are no samples or the number of labels does not
            match the number of images.
    """
    # Cast explicitly: the model's weights are float32 and CrossEntropyLoss
    # needs int64 targets, so float64/uint8 features or int32 labels would
    # otherwise fail with a dtype mismatch.
    X = np.asarray(x_train, dtype=np.float32).reshape(-1, 1, 100, 100)  # (N, 1, 100, 100)
    X = X / np.float32(255.0)  # scale to [0, 1]; creates a new array
    y = np.asarray(y_train, dtype=np.int64).reshape(-1)
    if X.shape[0] == 0 or X.shape[0] != y.shape[0]:
        raise ValueError(
            f"x_train and y_train must be non-empty and of equal length, "
            f"got {X.shape[0]} images and {y.shape[0]} labels"
        )

    #  to tensors
    X_train_tensor = torch.from_numpy(X)
    y_train_tensor = torch.from_numpy(y)

    # One augmented copy per original image, labelled like its source.
    augment = get_augmented_transforms()
    augmented_imgs = torch.stack([augment(img) for img in X_train_tensor])
    X_train_tensor = torch.cat([X_train_tensor, augmented_imgs], dim=0)
    y_train_tensor = torch.cat([y_train_tensor, y_train_tensor], dim=0)

    train_ds = TensorDataset(X_train_tensor, y_train_tensor)

    # create loader
    train_loader = DataLoader(train_ds, batch_size=batch_size, shuffle=True)
    device = device or ('cuda' if torch.cuda.is_available() else 'cpu')
    model.to(device)
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=lr)
    scheduler = StepLR(optimizer, step_size=5, gamma=0.5)  # lr <- lr * 0.5 every 5 epochs

    history = {
        'train_loss': [],
        'train_acc': []
    }

    for epoch in range(1, epochs+1):
        # training
        model.train()
        train_loss, correct, total = 0.0, 0, 0
        for X_batch, y_batch in train_loader:
            X_batch, y_batch = X_batch.to(device), y_batch.to(device)
            optimizer.zero_grad()
            outputs = model(X_batch)
            loss = criterion(outputs, y_batch)
            loss.backward()
            optimizer.step()
            # criterion returns the batch mean; weight by batch size so the
            # epoch average is a true per-sample mean.
            train_loss += loss.item() * X_batch.size(0)
            preds = outputs.argmax(dim=1)
            correct += (preds == y_batch).sum().item()
            total += y_batch.size(0)
        avg_train_loss = train_loss / total
        train_acc = correct / total * 100

        # for keeping track
        history['train_loss'].append(avg_train_loss)
        history['train_acc'].append(train_acc)

        scheduler.step()  # update learning rate
        # NOTE: read after scheduler.step(), so the printed rate is the one
        # that the next epoch will use.
        print(f"Epoch {epoch}/{epochs} — "
              f"Train Loss: {avg_train_loss:.4f}, Training Accuracy: {train_acc:.2f}% | "
              f" — Current Learning Rate: {scheduler.get_last_lr()[0]:.6f}")
    return history


# Testing

The test function below requires a trained model, a matrix (np array) x_test and a label vector y_test (np array)

It prints the test accuracy and returns the predicted class labels together with that accuracy.

In [None]:
def test(model, x_test, y_test):
    '''
    Takes in two numpy arrays for features: x_test
    and for labels: y_test
    and outputs a vector of predictions,
    and accuracy
    '''
    X = x_test
    X = X.reshape(-1, 100, 100)
 
    X /= 255.0

    X = X[:, np.newaxis, :, :]  # add channel dim: (N, 1, 100, 100)
    #  to tensors
    X_tensor = torch.from_numpy(X)

    device = 'cuda' if torch.cuda.is_available() else 'cpu'

    model.eval()

    preds = model(X_tensor.to(device=device))

    preds =  preds.argmax(dim=1)

    accuracy = np.sum(preds.cpu().numpy()==y_test)/len(y_test)
    print("Accuracy is: ", accuracy)
    return preds, accuracy



# Sample Test

This is a sample test, to show how we train and test

In [None]:
# Tuned architecture: four 7x7 conv blocks (16 -> 128 filters), dropout 0.3 each.
conv_layers = [
    {"out_ch": n_filters, "kernel_size": 7, "dropout_rate": 0.3}
    for n_filters in (16, 32, 64, 128)
]
model = ImageCNN(
    conv_layers=conv_layers,
    mlp_layers={"out_ch": 256, "dropout_rate": 0.5},
)

# Training features and labels are stored as header-less CSV files.
feats_csv = 'x_train_project.csv'
labels_csv = 't_train_project.csv'

x_train = (
    pd.read_csv(feats_csv, header=None)
    .values
    .astype(np.float32)
)
y_train = (
    pd.read_csv(labels_csv, header=None)
    .values
    .squeeze()
    .astype(np.int64)
)

# Fit the model.
history = train(model, x_train, y_train, epochs=25, lr=1e-2)

# Evaluated on the very data the model was trained on, so a high accuracy
# is expected here.
predictions, accuracy = test(model, x_train, y_train)