In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision
import torchvision.transforms as transforms
from torch.utils.data import DataLoader
import numpy as np
import matplotlib.pyplot as plt

# Seed the global RNGs once so weight initialisation, dropout masks, shuffling
# and random augmentations are repeatable across "Restart & Run All".
SEED = 42
torch.manual_seed(SEED)
np.random.seed(SEED)

# Use the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(device)

cpu


In [2]:
# Normalisation statistics passed to transforms.Normalize (mean, std).
MNIST_MEAN = (0.1307,)
MNIST_STD = (0.3081,)

# Steps shared by every split: tensor conversion, normalisation, flattening.
# torch.flatten is used instead of a lambda because a lambda cannot be
# pickled, which breaks DataLoader workers on platforms that start them with
# "spawn"; for these contiguous tensors it gives the same result as view(-1).
_common_steps = [
    transforms.ToTensor(),
    transforms.Normalize(MNIST_MEAN, MNIST_STD),
    transforms.Lambda(torch.flatten),             # To flatten the tensors
]

# Training pipeline: random rotation / rescaling, then the shared steps.
train_transform = transforms.Compose([
    transforms.RandomRotation(15),
    transforms.RandomAffine(degrees=0, scale=(0.8, 1.2)),
    *_common_steps,
])

# Validation / test pipeline: the shared steps only, no augmentation.
val_test_transform = transforms.Compose(list(_common_steps))

# Load the MNIST training split without a transform; the transforms are
# attached per subset later so train and validation can differ.
full_train_dataset = torchvision.datasets.MNIST(root="data", train=True, download=True, transform=None)

# Hold out 10% of the training split for validation.
train_size = int(0.9 * len(full_train_dataset))
val_size = len(full_train_dataset) - train_size

# A dedicated, seeded generator keeps the split identical on every run.
train_subset, val_subset = torch.utils.data.random_split(
    dataset=full_train_dataset,
    lengths=[train_size, val_size],
    generator=torch.Generator().manual_seed(42),
)


# Wrapper dataset that applies a transform to each subset separately

class ApplyTransform(torch.utils.data.Dataset):
  """Dataset wrapper that applies ``transform`` to the sample of each item.

  Args:
    subset: any indexable object with ``len()`` whose items are ``(x, y)``
      pairs.
    transform: optional callable applied to ``x``; ``None`` leaves ``x``
      unchanged.
  """

  def __init__(self, subset, transform=None):
    self.subset = subset
    self.transform = transform

  def __getitem__(self, index):
    """Return ``(transform(x), y)`` for the item at ``index``."""
    x, y = self.subset[index]
    # Compare with None rather than testing truthiness: a callable that
    # happens to be falsy (e.g. it defines __len__ returning 0) must still
    # be applied.
    if self.transform is not None:
      x = self.transform(x)
    return x, y

  def __len__(self):
    """Return the number of items in the wrapped subset."""
    return len(self.subset)

# Attach the augmenting pipeline to the training subset and the plain
# pipeline to the validation subset.
train_dataset = ApplyTransform(train_subset, train_transform)
val_dataset = ApplyTransform(val_subset, val_test_transform)

test_dataset = torchvision.datasets.MNIST(
    root="data",
    train=False,
    download=True,
    transform=val_test_transform
)


# Creating the DataLoaders

batch_size = 128
# Only the training loader is shuffled. Evaluation metrics do not depend on
# sample order, so validation / test keep a fixed, reproducible order.
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=4)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False, num_workers=4)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False, num_workers=4)

Downloading http://yann.lecun.com/exdb/mnist/train-images-idx3-ubyte.gz
Failed to download (trying next):
<urlopen error [Errno 111] Connection refused>

Downloading https://ossci-datasets.s3.amazonaws.com/mnist/train-images-idx3-ubyte.gz
Downloading https://ossci-datasets.s3.amazonaws.com/mnist/train-images-idx3-ubyte.gz to data/MNIST/raw/train-images-idx3-ubyte.gz


100%|██████████| 9.91M/9.91M [00:00<00:00, 58.8MB/s]


Extracting data/MNIST/raw/train-images-idx3-ubyte.gz to data/MNIST/raw

Downloading http://yann.lecun.com/exdb/mnist/train-labels-idx1-ubyte.gz
Failed to download (trying next):
<urlopen error [Errno 111] Connection refused>

Downloading https://ossci-datasets.s3.amazonaws.com/mnist/train-labels-idx1-ubyte.gz
Downloading https://ossci-datasets.s3.amazonaws.com/mnist/train-labels-idx1-ubyte.gz to data/MNIST/raw/train-labels-idx1-ubyte.gz


100%|██████████| 28.9k/28.9k [00:00<00:00, 2.14MB/s]


Extracting data/MNIST/raw/train-labels-idx1-ubyte.gz to data/MNIST/raw

Downloading http://yann.lecun.com/exdb/mnist/t10k-images-idx3-ubyte.gz
Failed to download (trying next):
<urlopen error [Errno 111] Connection refused>

Downloading https://ossci-datasets.s3.amazonaws.com/mnist/t10k-images-idx3-ubyte.gz
Downloading https://ossci-datasets.s3.amazonaws.com/mnist/t10k-images-idx3-ubyte.gz to data/MNIST/raw/t10k-images-idx3-ubyte.gz


100%|██████████| 1.65M/1.65M [00:00<00:00, 13.0MB/s]


Extracting data/MNIST/raw/t10k-images-idx3-ubyte.gz to data/MNIST/raw

Downloading http://yann.lecun.com/exdb/mnist/t10k-labels-idx1-ubyte.gz
Failed to download (trying next):
<urlopen error [Errno 111] Connection refused>

Downloading https://ossci-datasets.s3.amazonaws.com/mnist/t10k-labels-idx1-ubyte.gz
Downloading https://ossci-datasets.s3.amazonaws.com/mnist/t10k-labels-idx1-ubyte.gz to data/MNIST/raw/t10k-labels-idx1-ubyte.gz


100%|██████████| 4.54k/4.54k [00:00<00:00, 2.36MB/s]

Extracting data/MNIST/raw/t10k-labels-idx1-ubyte.gz to data/MNIST/raw






In [3]:
class MLP(nn.Module):
  """Fully connected classifier.

  Every entry of ``hidden_sizes`` adds a Linear -> BatchNorm1d -> ReLU ->
  Dropout group; a final Linear layer maps to ``output_size`` outputs.
  """

  def __init__(self, input_size, hidden_sizes, output_size, dropout_prob=0.5):
    super().__init__()
    # widths[i] feeds hidden group i; the last entry feeds the output layer.
    widths = [input_size, *hidden_sizes]
    stack = []
    for fan_in, fan_out in zip(widths[:-1], widths[1:]):
      stack.append(nn.Linear(fan_in, fan_out))
      stack.append(nn.BatchNorm1d(fan_out))
      stack.append(nn.ReLU())
      stack.append(nn.Dropout(dropout_prob))
    stack.append(nn.Linear(widths[-1], output_size))
    # Sequential lets forward() run the whole stack with a single call.
    self.net = nn.Sequential(*stack)

  def forward(self, x):
    """Pass ``x`` through the layer stack and return its output."""
    return self.net(x)


In [4]:
# Model and optimiser hyper-parameters

input_size = 784              # length of each flattened input vector
hidden_sizes = [256, 128]     # width of each hidden layer, in order
output_size = 10              # number of output logits
dropout_prob = 0.3
lr = 0.001

# .to(device) moves the model's parameters to `device`. The model and the
# data must live on the same device, otherwise a runtime error is raised.
model = MLP(
    input_size=input_size,
    hidden_sizes=hidden_sizes,
    output_size=output_size,
    dropout_prob=dropout_prob,
).to(device)

In [5]:

def init_weights(m):
  """Apply Kaiming/He *normal* initialisation to ``nn.Linear`` modules.

  Weights are drawn from N(0, std^2) with std = gain / sqrt(fan_mode), where
  the gain is the one PyTorch associates with ReLU. Biases, when present,
  are set to zero. Modules of any other type are left untouched, so the
  function is safe to pass to ``nn.Module.apply``.
  """
  if isinstance(m, nn.Linear):
    nn.init.kaiming_normal_(m.weight, nonlinearity='relu')
    # nn.Linear(..., bias=False) has bias set to None; skip it instead of failing.
    if m.bias is not None:
      nn.init.constant_(m.bias, 0.0)


# nn.Module.apply(fn) calls ``fn`` recursively on every submodule (as
# returned by ``.children()``) as well as on the module itself.

model.apply(init_weights)

MLP(
  (net): Sequential(
    (0): Linear(in_features=784, out_features=256, bias=True)
    (1): BatchNorm1d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): ReLU()
    (3): Dropout(p=0.3, inplace=False)
    (4): Linear(in_features=256, out_features=128, bias=True)
    (5): BatchNorm1d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (6): ReLU()
    (7): Dropout(p=0.3, inplace=False)
    (8): Linear(in_features=128, out_features=10, bias=True)
  )
)

In [None]:
def _run_epoch(model, loader, criterion, device, optimizer=None):
  """Run one pass over ``loader`` and return ``(mean_loss, accuracy)``.

  With an ``optimizer`` the model is put in training mode and updated after
  every batch. Without one the model is put in evaluation mode and gradient
  tracking is disabled.
  """
  is_training = optimizer is not None
  model.train(is_training)            # train(False) is equivalent to eval()
  loss_sum, correct, total = 0.0, 0, 0

  with torch.set_grad_enabled(is_training):
    for inputs, labels in loader:
      inputs, labels = inputs.to(device), labels.to(device)
      if is_training:
        optimizer.zero_grad()         # Gradients accumulate unless reset every step.
      outputs = model(inputs)
      loss = criterion(outputs, labels)
      if is_training:
        loss.backward()
        optimizer.step()

      n_samples = labels.size(0)
      # Weight each batch loss by its size so a smaller final batch is not
      # over-represented (assumes the criterion returns a per-batch mean).
      loss_sum += loss.item() * n_samples
      _, predicted = outputs.max(1)
      total += n_samples
      correct += predicted.eq(labels).sum().item()

  if total == 0:
    raise ValueError('loader yielded no samples')
  return loss_sum / total, correct / total


def train_model(model, criterion, optimizer, num_epochs=20,
                train_data_loader=None, val_data_loader=None,
                checkpoint_path='best_model.pth'):
  """Train ``model`` and return it restored to its best validation epoch.

  Args:
    model: network to train; batches are moved to the device of its parameters.
    criterion: loss called as ``criterion(outputs, labels)``.
    optimizer: optimizer already bound to ``model``'s parameters.
    num_epochs: number of passes over the training data.
    train_data_loader: iterable of ``(inputs, labels)`` batches; defaults to
      the notebook-level ``train_loader``.
    val_data_loader: iterable of ``(inputs, labels)`` batches; defaults to
      the notebook-level ``val_loader``.
    checkpoint_path: file that receives the best ``state_dict``.

  Returns:
    ``(model, history)`` where ``history`` maps ``'train_loss'``,
    ``'train_acc'``, ``'val_loss'`` and ``'val_acc'`` to per-epoch lists.
  """
  if train_data_loader is None:
    train_data_loader = train_loader
  if val_data_loader is None:
    val_data_loader = val_loader

  # Follow the model's own device so data and parameters always match.
  first_param = next(model.parameters(), None)
  model_device = first_param.device if first_param is not None else torch.device('cpu')

  best_val_acc = float('-inf')        # guarantees the first epoch is checkpointed
  history = {
      'train_loss' : [],
      'train_acc' : [],
      'val_loss' : [],
      'val_acc' : []
  }

  for epoch in range(num_epochs):
    # Training Phase
    train_loss, train_acc = _run_epoch(model, train_data_loader, criterion, model_device, optimizer)
    history['train_loss'].append(train_loss)
    history['train_acc'].append(train_acc)

    # Validation Phase
    val_loss, val_acc = _run_epoch(model, val_data_loader, criterion, model_device)
    history['val_loss'].append(val_loss)
    history['val_acc'].append(val_acc)

    # Saving the best model
    if val_acc > best_val_acc:
      best_val_acc = val_acc
      torch.save(model.state_dict(), checkpoint_path)

    print(f'Epoch {epoch+1}/{num_epochs}: '
              f'Train Loss: {train_loss:.4f} | Val Loss: {val_loss:.4f} '
              f'Train Acc: {train_acc:.4f} | Val Acc: {val_acc:.4f}')

  # Load best model for final evaluation. Skipped when no epoch ran, so a
  # stale checkpoint left by an earlier run is never loaded by accident.
  if history['val_acc']:
    model.load_state_dict(torch.load(checkpoint_path, map_location=model_device))
  return model, history



# Build a fresh network, re-initialise its Linear layers and train it with Adam.
model = MLP(input_size, hidden_sizes, output_size, dropout_prob)
model = model.to(device)
model.apply(init_weights)

criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=lr)

trained_model, history = train_model(model, criterion, optimizer, num_epochs=20)



Epoch 1/20: Train Loss: 0.5681 | Val Loss: 0.1755 Train Acc: 0.8215 | Val Acc: 0.9480
Epoch 2/20: Train Loss: 0.3107 | Val Loss: 0.1266 Train Acc: 0.9030 | Val Acc: 0.9637
Epoch 3/20: Train Loss: 0.2552 | Val Loss: 0.1015 Train Acc: 0.9217 | Val Acc: 0.9695
Epoch 4/20: Train Loss: 0.2241 | Val Loss: 0.0901 Train Acc: 0.9300 | Val Acc: 0.9718
Epoch 5/20: Train Loss: 0.2034 | Val Loss: 0.0821 Train Acc: 0.9374 | Val Acc: 0.9750
Epoch 6/20: Train Loss: 0.1896 | Val Loss: 0.0763 Train Acc: 0.9409 | Val Acc: 0.9765
Epoch 7/20: Train Loss: 0.1757 | Val Loss: 0.0718 Train Acc: 0.9445 | Val Acc: 0.9785
Epoch 8/20: Train Loss: 0.1646 | Val Loss: 0.0686 Train Acc: 0.9495 | Val Acc: 0.9800
Epoch 9/20: Train Loss: 0.1586 | Val Loss: 0.0641 Train Acc: 0.9497 | Val Acc: 0.9805
Epoch 10/20: Train Loss: 0.1509 | Val Loss: 0.0618 Train Acc: 0.9538 | Val Acc: 0.9827
Epoch 11/20: Train Loss: 0.1468 | Val Loss: 0.0587 Train Acc: 0.9548 | Val Acc: 0.9820
Epoch 12/20: Train Loss: 0.1482 | Val Loss: 0.0567 T

In [None]:
import matplotlib.pyplot as plt
%matplotlib inline

def plot_training_history(history):
    """Draw loss and accuracy curves side by side.

    ``history`` must provide the per-epoch sequences ``'train_loss'``,
    ``'val_loss'``, ``'train_acc'`` and ``'val_acc'``.
    """
    fig, (loss_ax, acc_ax) = plt.subplots(1, 2, figsize=(12, 5))

    # (axes, train key, train label, val key, val label, y label, title)
    panels = (
        (loss_ax, 'train_loss', 'Train Loss', 'val_loss', 'Validation Loss',
         'Loss', 'Training vs Validation Loss'),
        (acc_ax, 'train_acc', 'Train Accuracy', 'val_acc', 'Validation Accuracy',
         'Accuracy', 'Training vs Validation Accuracy'),
    )
    for ax, train_key, train_label, val_key, val_label, y_label, title in panels:
        ax.plot(history[train_key], label=train_label)
        ax.plot(history[val_key], label=val_label)
        ax.set_xlabel('Epoch')
        ax.set_ylabel(y_label)
        ax.set_title(title)
        ax.legend()
        ax.grid(True, linestyle='--', alpha=0.5)

    fig.tight_layout()
    plt.show()

# Show the curves recorded during the training run
plot_training_history(history)

In [None]:
def tune_hyperparams(param_grid, num_epochs=5):
    """Grid-search learning rate and dropout, scored by validation accuracy.

    Args:
        param_grid: mapping with the keys ``'lr'`` and ``'dropout'``, each an
            iterable of candidate values; every combination is trained.
        num_epochs: epochs per candidate (kept small for quick evaluation).

    Returns:
        ``{'lr': ..., 'dropout': ...}`` for the best candidate, or an empty
        dict when the grid is empty.
    """
    best_params = {}
    best_acc = float('-inf')    # any real accuracy beats the initial value

    for lr in param_grid['lr']:
        for dropout in param_grid['dropout']:
            print(f'Trying lr={lr}, dropout={dropout}')
            model = MLP(input_size, hidden_sizes, output_size, dropout).to(device)
            model.apply(init_weights)
            optimizer = optim.Adam(model.parameters(), lr=lr)
            criterion = nn.CrossEntropyLoss()

            # Train for reduced epochs for quick evaluation
            model, run_history = train_model(model, criterion, optimizer, num_epochs=num_epochs)

            # Score this candidate with ITS OWN history, not the notebook-level
            # `history` left by an earlier run. train_model returns the best
            # checkpoint, so the matching score is the best epoch's accuracy.
            val_acc = max(run_history['val_acc'], default=0.0)
            if val_acc > best_acc:
                best_acc = val_acc
                best_params = {'lr': lr, 'dropout': dropout}

    print(f'Best Params: {best_params} | Best Val Acc: {best_acc:.4f}')
    return best_params

# Example usage: every learning rate is paired with every dropout value
param_grid = dict(
    lr=[0.001, 0.0005, 0.0001],
    dropout=[0.3, 0.5, 0.7],
)

best_params = tune_hyperparams(param_grid)

In [None]:
# 1. Combine original training and validation sets
full_train_dataset = torch.utils.data.ConcatDataset([train_dataset, val_dataset])
full_train_loader = DataLoader(full_train_dataset, batch_size=batch_size, shuffle=True, num_workers=4)

# 2. Initialize final model with best parameters
final_model = MLP(
    input_size=input_size,
    hidden_sizes=hidden_sizes,
    output_size=output_size,
    dropout_prob=best_params['dropout']
).to(device)
final_model.apply(init_weights)

# 3. Use optimized learning rate
optimizer = optim.Adam(final_model.parameters(), lr=best_params['lr'])

# 4. Train on full dataset (original train + val)
# train_model() iterates over the notebook-level `train_loader`, so the
# combined loader is swapped in for this call and the original loader is
# restored afterwards. Without the swap the final model would be trained on
# the 90% split only.
# NOTE: the validation samples are now part of the training data, so the
# validation metrics printed during this run are optimistic.
_split_train_loader = train_loader
train_loader = full_train_loader
try:
    final_model, final_history = train_model(
        final_model,
        criterion,
        optimizer,
        num_epochs=20  # Use original epoch count
    )
finally:
    train_loader = _split_train_loader

In [None]:
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix
from sklearn.metrics import ConfusionMatrixDisplay
import matplotlib.pyplot as plt

def final_evaluation(model, test_loader):
    """Evaluate ``model`` on ``test_loader`` and report the results.

    Prints the loss, accuracy and a per-class classification report, and
    displays the confusion matrix. The loss is computed with the
    notebook-level ``criterion`` and batches are moved to the notebook-level
    ``device``.

    Returns:
        ``(test_loss, test_acc)``: the per-sample mean loss and the accuracy.
    """
    model.eval()
    all_preds = []
    all_labels = []
    loss_sum = 0.0
    num_samples = 0

    with torch.no_grad():
        for inputs, labels in test_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)

            # Calculate loss, weighted by batch size so a smaller final batch
            # is not over-represented (assumes a mean-reduced criterion).
            loss = criterion(outputs, labels)
            batch_count = labels.size(0)
            loss_sum += loss.item() * batch_count
            num_samples += batch_count

            # Get predictions
            _, preds = torch.max(outputs, 1)
            all_preds.extend(preds.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())

    # Calculate metrics
    test_loss = loss_sum / num_samples
    test_acc = accuracy_score(all_labels, all_preds)

    print(f"\nFinal Test Results:")
    print(f"Loss: {test_loss:.4f}")
    print(f"Accuracy: {test_acc:.4f}")
    print(classification_report(all_labels, all_preds, digits=4))

    # Confusion Matrix
    cm = confusion_matrix(all_labels, all_preds)
    disp = ConfusionMatrixDisplay(confusion_matrix=cm)
    disp.plot()
    plt.show()

    return test_loss, test_acc

# Run evaluation
test_loss, test_acc = final_evaluation(final_model, test_loader)