In [None]:
import os
import json
import torch
import random
import numpy as np
import pandas as pd
import torch.nn as nn
from tqdm import tqdm
from datetime import datetime
from torchvision.datasets import ImageFolder
from torchvision import transforms
from torch.utils.data import Dataset,DataLoader
from torch.utils.tensorboard import SummaryWriter #type: ignore

<h2> Setting seed </h2>

In [None]:
def set_seed(seed=42):
    """Seed the torch (CPU and CUDA), `random` and NumPy generators with `seed`.

    Also switches cuDNN to deterministic mode and turns off its benchmark
    auto-tuner.

    Args:
        seed (int): Value handed to every seeding function. Defaults to 42.
    """
    seeders = (
        torch.manual_seed,
        torch.cuda.manual_seed_all,
        random.seed,
        np.random.seed,
    )
    for seed_fn in seeders:
        seed_fn(seed)

    cudnn = torch.backends.cudnn
    cudnn.deterministic = True
    cudnn.benchmark = False


# Seed once, before any model or data loader is built, so that repeated runs
# start from the same random state.
set_seed()

<h2> Image Transformations </h2>

In [None]:
# Per-channel ImageNet statistics used to normalise the input tensors.
mean = [0.485, 0.456, 0.406]
std = [0.229, 0.224, 0.225]

# Training pipeline: upscale to 256x256, take a random 224x224 crop and flip
# horizontally half of the time before converting to a normalised tensor.
train_transforms = transforms.Compose(
    [
        transforms.Resize(size=(256, 256)),
        transforms.RandomCrop(size=224),
        transforms.RandomHorizontalFlip(p=0.5),
        # Disabled augmentations, kept for reference:
        # transforms.ColorJitter(brightness=0.4, contrast=0.4, saturation=0.4),
        # transforms.RandomRotation(degrees=15),
        transforms.ToTensor(),
        transforms.Normalize(mean=mean, std=std),
    ]
)

# Validation and test share one pipeline: same resize, but a fixed centre
# crop and no flipping.
val_transforms = transforms.Compose(
    [
        transforms.Resize(size=(256, 256)),
        transforms.CenterCrop(size=224),
        transforms.ToTensor(),
        transforms.Normalize(mean=mean, std=std),
    ]
)

<h2> Dataset Creation </h2>

In [None]:
TRAIN_DIR = "../../datasets/tiny-imagenet-200/train"
VAL_DIR = "../../datasets/tiny-imagenet-200/val"

# The training and validation sets come from the original Tiny ImageNet
# dataset. The original test split ships without labels, so the model is
# evaluated on the cleaned Tiny ImageNet version from Hugging Face instead.
# This lets the model train on the full original data while still being tested.
train_dataset = ImageFolder(root=TRAIN_DIR, transform=train_transforms)
val_dataset = ImageFolder(root=VAL_DIR, transform=val_transforms)
# test_dataset = ImageFolder(root=TEST_DIR, transform=val_transforms)

# ImageFolder derives label indices from the sub-directory names of each root
# independently. If the validation folder is not laid out with the same class
# sub-directories as the training folder, the labels silently disagree and
# every validation metric is meaningless, so fail loudly here instead.
assert train_dataset.class_to_idx == val_dataset.class_to_idx, (
    "Train and validation folders map class names to different label indices; "
    "check that VAL_DIR contains one sub-directory per training class."
)

batch_size = 64


In [None]:
# Several worker counts were tried; beyond 4 workers the data transfer barely
# improves because the overhead of setting up the parallel connections
# outweighs the gain.
_loader_options = dict(batch_size=batch_size, num_workers=4, pin_memory=True)

# Only the training data is reshuffled every epoch.
train_loader = DataLoader(train_dataset, shuffle=True, **_loader_options)
val_loader = DataLoader(val_dataset, shuffle=False, **_loader_options)

<h2> Dataset Testing </h2>

In [None]:
# Verify that the data loader works by pulling a single batch and printing
# the shapes of its image and label tensors.
first_batch = next(iter(train_loader), None)
if first_batch is not None:
    images, labels = first_batch
    print(f"Image shape: {images.shape}") 
    print(f"Label: {labels.shape}")
    

<h2> Alexnet Model </h2>

In [None]:
class Alexnet(nn.Module):
    """
        Single-device AlexNet-style classifier for 3-channel 224x224 images.

        Five convolutions with overlapping 3x3/stride-2 max pooling feed three
        fully connected layers. Channel counts, kernel sizes and strides follow
        the AlexNet paper, with these deviations:
            1. No model splitting between GPUs.
            2. The final fully connected layer has `num_classes` outputs
               (200 by default, for Tiny ImageNet) instead of 1000.
            3. No local response normalisation layers are included.
            4. conv1 uses padding=2, so a 224x224 input produces the
               256 x 6 x 6 = 9216 features that fc1 expects.

        The ReLU, max-pool and dropout modules hold no parameters and are
        reused at several positions. Every layer is registered both as an
        attribute (`conv1`, `fc1`, ...) and inside `feature_extractor` /
        `classifier`; `named_parameters()` reports each weight once, under the
        attribute name.

        Args:
            num_classes (int): Number of output logits. Defaults to 200.
    """

    def __init__(self, num_classes: int = 200):
        super().__init__()

        # NOTE: the order of these assignments fixes the parameter names and
        # the order in which random initial weights are drawn; keep it stable
        # so existing checkpoints and seeded runs stay compatible.
        self.maxpool = nn.MaxPool2d(kernel_size=3,stride=2,padding=0)
        self.relu = nn.ReLU(inplace=True)
        
        self.conv1 = nn.Conv2d(in_channels=3,out_channels=96,kernel_size=11,stride=4,padding=2)
        self.conv2 = nn.Conv2d(in_channels=96,out_channels=256,kernel_size=5,stride=1,padding=2)
        self.conv3 = nn.Conv2d(in_channels=256,out_channels=384,kernel_size=3,stride=1,padding=1)
        self.conv4 = nn.Conv2d(in_channels=384,out_channels=384,kernel_size=3,stride=1,padding=1)
        self.conv5 = nn.Conv2d(in_channels=384,out_channels=256,kernel_size=3,stride=1,padding=1)

        feature_extractor_layers = [self.conv1, self.relu, self.maxpool, self.conv2, self.relu,
                                    self.maxpool, self.conv3, self.relu, self.conv4, self.relu,
                                    self.conv5, self.relu, self.maxpool]

        self.feature_extractor = nn.Sequential(*feature_extractor_layers)

        self.dropout = nn.Dropout(p=0.5)

        # 9216 = 256 channels x 6 x 6 spatial positions for a 224x224 input.
        self.fc1 = nn.Linear(in_features=9216, out_features=4096)
        self.fc2 = nn.Linear(in_features=4096, out_features=4096)
        self.fc3 = nn.Linear(in_features=4096, out_features=num_classes)

        classifier_layers = [self.dropout, self.fc1, self.relu, self.dropout, self.fc2,
                             self.relu, self.fc3]
        self.classifier = nn.Sequential(*classifier_layers)


    def forward(self,images):
        """Return raw class logits of shape (batch, num_classes) for `images`."""
        out = self.feature_extractor(images)
        # Keep the batch dimension and flatten everything else for the FC layers.
        out = torch.flatten(out, 1)
        out = self.classifier(out)
        return out



<h2> Model Initialization </h2>

In [None]:
# Use the GPU when PyTorch can see one, otherwise fall back to the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
num_of_epochs = 50
learning_rate = 0.001

model = Alexnet().to(device)
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(params=model.parameters(), lr=learning_rate) #type: ignore

# mode='min': the scheduler watches a quantity that should decrease (the
# validation loss passed to scheduler.step) and multiplies the learning rate
# by 0.1 once it has stopped improving for more than `patience` epochs.
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.1, patience=3)

# Rolling checkpoint of the most recent epoch, and a copy of the best model so far.
checkpoint_path = r"../checkpoints/current_model.pth"
best_model_path = r"../checkpoints/best_model.pth"

<h2> Training Loop </h2>

<h3>&nbsp;&nbsp;&nbsp;&nbsp; 1. Displaying the train/val report for one epoch </h3>

In [None]:
def display_one_epoch_summary(epoch,num_of_epochs, train_loss,train_accuracy, val_loss, val_accuracy):
    """Print the averaged train/validation loss and accuracy of one epoch.

    Args:
        epoch: Number of the epoch being reported.
        num_of_epochs: Total number of scheduled epochs.
        train_loss, train_accuracy: Training metrics of the epoch.
        val_loss, val_accuracy: Validation metrics of the epoch.
    """
    report_lines = (
        f"\nEpoch [{epoch}/{num_of_epochs}] Summary:",
        f"  Train Loss: {train_loss:.4f} | Train Acc: {train_accuracy:.4f}",
        f"  Val   Loss: {val_loss:.4f} | Val   Acc: {val_accuracy:.4f}\n",
    )
    print("\n".join(report_lines))

<h3>&nbsp;&nbsp;&nbsp;&nbsp; 2. Checkpoint Saving </h3>

In [None]:

def save_checkpoint(epoch, model, optimizer, scheduler, history, best_val_loss, patience_counter, checkpoint_path):
    """ 
        Write a resumable training checkpoint to `checkpoint_path`.

        The caller uses this for two files: the current/latest model and the
        model with the lowest validation loss.

        The parent directory is created when missing. The checkpoint is first
        written to a temporary sibling file and then moved into place, so an
        interrupted save does not leave a truncated file at `checkpoint_path`.
        The model's train/eval mode is left untouched: `state_dict()` returns
        the same weights in either mode.

        Args:
            epoch: Number of the epoch that has just finished.
            model, optimizer, scheduler: Objects whose `state_dict()` is stored.
            history: Train/val loss and accuracy history.
            best_val_loss: Lowest validation loss so far (early stopping).
            patience_counter: Current early-stopping counter.
            checkpoint_path: Destination file.
    """
    checkpoint_dir = os.path.dirname(checkpoint_path)
    if checkpoint_dir:
        os.makedirs(checkpoint_dir, exist_ok=True)

    checkpoint = {
        'epoch': epoch, 
        'model_state_dict': model.state_dict(), #model weights and biases
        'optimizer_state_dict': optimizer.state_dict(), #optimizer buffers and hyperparameters
        'scheduler_state_dict': scheduler.state_dict(), #learning rate scheduler state
        'history':history,  #the train/val loss and accuracy history
        'best_val_loss':best_val_loss, #needed to continue early stopping after a resume
        'patience_counter':patience_counter #needed to continue early stopping after a resume
    }

    tmp_path = f"{checkpoint_path}.tmp"
    try:
        torch.save(checkpoint, tmp_path)
        # os.replace swaps the finished file in, overwriting any previous checkpoint.
        os.replace(tmp_path, checkpoint_path)
    finally:
        # Only still present when saving or replacing failed.
        if os.path.exists(tmp_path):
            os.remove(tmp_path)

def load_checkpoint(device, model, optimizer, scheduler, checkpoint_path):
    """ 
        Restore training state from a file written by `save_checkpoint`.

        All required entries are read from the checkpoint before any state is
        loaded, so a checkpoint with missing keys is rejected without touching
        `model`, `optimizer` or `scheduler`. An unreadable or incomplete
        checkpoint is reported and training restarts from scratch.

        Args:
            device: Passed to `torch.load` as `map_location`.
            model, optimizer, scheduler: Objects updated in place via
                `load_state_dict`.
            checkpoint_path: File to load.

        Returns:
            tuple: (start_epoch, history, patience_counter, best_val_loss).
            On failure this is (1, empty history, 0, inf).
    """
    import pickle  # local import: only needed to name the unpickling error

    try:
        print("🔁 Resuming from checkpoint...")
        # NOTE(review): torch.load unpickles the file; only load checkpoints
        # from a trusted source.
        checkpoint = torch.load(checkpoint_path, map_location=device)

        # Read everything first: a missing key must fail before state is mutated.
        model_state = checkpoint['model_state_dict']
        optimizer_state = checkpoint['optimizer_state_dict']
        scheduler_state = checkpoint['scheduler_state_dict']
        last_epoch = checkpoint['epoch']
        history = checkpoint['history']
        patience_counter = checkpoint['patience_counter']
        best_val_loss = checkpoint['best_val_loss']

        model.load_state_dict(model_state)
        optimizer.load_state_dict(optimizer_state)
        scheduler.load_state_dict(scheduler_state)

        # Training continues with the epoch after the one that was saved.
        start_epoch = last_epoch + 1

        print(f"✅ Loaded checkpoint from epoch {last_epoch} with val loss {best_val_loss:.4f}")
        return start_epoch,history,patience_counter,best_val_loss
    
    except (RuntimeError, EOFError, KeyError, pickle.UnpicklingError) as e:
        print(f"❌ Error loading checkpoint: {e}")
        print("Starting training from scratch.")
        return 1, {"train_loss": [], "val_loss": [], "train_acc": [], "val_acc": []}, 0, float('inf')


<h3> &nbsp;&nbsp;&nbsp;&nbsp; 3. Tensorboard visualization</h3>

In [None]:
"""
    tensorboard setup is added to visualize training/validation loss and accuracy curve, learning rate curve
    from learning rate schedule and gradient histograms from selective layers (conv1, conv5, fc1, fc3) are
    also visualized to observe vanishing and exploding gradients.
"""

def setup_tensorboard(log_dir="../logs/experiment_1"):
    """Create `log_dir` if needed and return a SummaryWriter that logs into it.

    Args:
        log_dir (str): Directory that receives the TensorBoard event files.

    Returns:
        SummaryWriter: Writer used to record events for TensorBoard.
    """
    # exist_ok avoids the check-then-create race of a separate os.path.exists test.
    os.makedirs(log_dir, exist_ok=True)
    writer = SummaryWriter(log_dir=log_dir) #Writer to write events in tensor board
    return writer

def log_train_val_curve(writer, epoch, train_loss, train_accuracy, val_loss, val_accuracy):
    """Record the epoch's train/val loss and accuracy as TensorBoard scalars.

    Args:
        writer: Object exposing `add_scalar(tag, value, step)`.
        epoch: Step value attached to every scalar.
        train_loss, train_accuracy, val_loss, val_accuracy: Values to record.
    """
    scalars = (
        ("Loss/Train", train_loss),
        ("Loss/Val", val_loss),
        ("Accuracy/Train", train_accuracy),
        ("Accuracy/Val", val_accuracy),
    )
    for tag, value in scalars:
        writer.add_scalar(tag, value, epoch)

def log_learning_rate(writer, epoch, optimizer):
    """Record the learning rate of every optimizer parameter group.

    One scalar per group is written under the tag `LR/group_<index>`.

    Args:
        writer: Object exposing `add_scalar(tag, value, step)`.
        epoch: Step value attached to every scalar.
        optimizer: Object whose `param_groups` entries carry an 'lr' key.
    """
    group_rates = [group['lr'] for group in optimizer.param_groups]
    for group_index, rate in enumerate(group_rates):
        writer.add_scalar(f"LR/group_{group_index}", rate, epoch)

def log_selected_gradients(model, writer, epoch, layer_keywords=("conv1", "conv5", "fc1", "fc3")):
    """
    Write gradient histograms and L2 gradient norms of chosen layers to TensorBoard.

    Useful for spotting vanishing or exploding gradients during training.
    Parameters without a gradient, or whose name matches none of the keywords,
    are skipped. A failure while logging one parameter is printed as a warning
    and does not stop the remaining parameters from being logged.

    Args:
        model (nn.Module): The model being trained.
        writer (SummaryWriter): TensorBoard writer.
        epoch (int): Current epoch number, used as the logging step.
        layer_keywords (tuple): Substrings to match against parameter names.
    """
    for param_name, parameter in model.named_parameters():
        if parameter.grad is None:
            continue
        if not any(keyword in param_name for keyword in layer_keywords):
            continue

        try:
            flat_grad = parameter.grad.detach().view(-1)
            l2_norm = flat_grad.norm().item()

            # Histogram first, then the norm as a scalar.
            writer.add_histogram(f"Gradients/{param_name}", flat_grad, epoch)
            writer.add_scalar(f"GradientsNorm/{param_name}", l2_norm, epoch)
        except Exception as error:
            print(f"[Warning] Failed to log gradient for {param_name}: {error}")


<h3> &nbsp;&nbsp;&nbsp;&nbsp; 4. Training one epoch</h3>

In [None]:
def train_one_epoch(device, epoch, num_of_epochs, train_loader, model, criterion, optimizer, clip_value):
    """Run one optimisation pass over `train_loader`.

    Args:
        device: Device the batches are moved to.
        epoch, num_of_epochs: Used only in the progress messages.
        train_loader: Iterable yielding (images, labels) batches.
        model: Network to train; switched to training mode.
        criterion: Loss function called as criterion(predictions, labels).
        optimizer: Optimizer that updates the model parameters.
        clip_value: Maximum gradient norm passed to `clip_grad_norm_`.

    Returns:
        tuple: (sum of batch losses / number of batches,
                correct predictions / number of samples).
    """
    num_batches = len(train_loader)
    running_loss = 0
    num_correct = 0   # correct predictions so far
    num_seen = 0      # samples processed so far

    model.train()
    progress = tqdm(enumerate(train_loader), total=num_batches, desc=f"Epoch {epoch} [Train]")
    for step, (inputs, targets) in progress:
        inputs, targets = inputs.to(device), targets.to(device)

        # Forward pass and loss.
        logits = model(inputs)
        batch_loss = criterion(logits, targets)

        # Drop stale gradients, backpropagate, clip the gradient norm to
        # guard against exploding gradients, then update the weights.
        optimizer.zero_grad()
        batch_loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), clip_value)
        optimizer.step()

        running_loss += batch_loss.item()
        num_correct += (logits.argmax(1) == targets).sum().item()
        num_seen += targets.size(0)

        # Report every 10th batch; tqdm.write keeps the progress bar intact.
        if step % 10 == 0:
            tqdm.write(f"[Train] Epoch [{epoch}/{num_of_epochs}], Step [{step+1}/{num_batches}], Loss: {batch_loss.item():.4f}")

    return running_loss / num_batches, num_correct / num_seen


<h3> &nbsp;&nbsp;&nbsp;&nbsp; 5. Validation for one epoch</h3>

In [None]:
def evaluate_one_epoch(device, epoch, num_of_epochs, val_loader, model, criterion):
    """Evaluate the model on `val_loader` without computing gradients.

    Args:
        device: Device the batches are moved to.
        epoch, num_of_epochs: Used only in the progress messages.
        val_loader: Iterable yielding (images, labels) batches.
        model: Network to evaluate; switched to evaluation mode.
        criterion: Loss function called as criterion(predictions, labels).

    Returns:
        tuple: (sum of batch losses / number of batches,
                correct predictions / number of samples).
    """
    model.eval()
    num_batches = len(val_loader)
    running_loss = 0
    num_correct = 0
    num_seen = 0

    # No gradients are tracked while validating.
    with torch.no_grad():
        progress = tqdm(enumerate(val_loader), total=num_batches, desc=f"Epoch {epoch} [VAL]")
        for step, (inputs, targets) in progress:
            inputs, targets = inputs.to(device), targets.to(device)

            logits = model(inputs)
            batch_loss = criterion(logits, targets)

            running_loss += batch_loss.item()
            num_correct += (logits.argmax(1) == targets).sum().item()
            num_seen += targets.size(0)

            # Report every 10th batch.
            if step % 10 == 0:
                tqdm.write(f"[Val] Epoch [{epoch}/{num_of_epochs}], Step [{step+1}/{num_batches}], Loss: {batch_loss.item():.4f}")

    return running_loss / num_batches, num_correct / num_seen

<h3> &nbsp;&nbsp;&nbsp;&nbsp; 6. Main Training Loop</h3>

In [None]:
def train_model(model, train_loader,val_loader, criterion, optimizer,scheduler,num_of_epochs,
                 device, clip_value=10,checkpoint_path=r"../checkpoints/current_model.pth",
                  best_model_path=r"../checkpoints/best_model.pth", resume=False,
                  early_stop_patience=8):
    """Train and validate the model with checkpointing, LR scheduling and early stopping.

    Each epoch trains, validates, steps the scheduler on the validation loss,
    updates the early-stopping state, saves the rolling checkpoint (plus the
    best-model checkpoint on improvement) and logs to TensorBoard.

    Args:
        model, criterion, optimizer, scheduler: Training objects.
        train_loader, val_loader: Batch iterables for training and validation.
        num_of_epochs (int): Number of the last epoch to run (inclusive).
        device: Device the batches are moved to.
        clip_value: Gradient-norm clipping threshold.
        checkpoint_path: File that receives the state after every epoch.
        best_model_path: File that receives the state with the lowest
            validation loss seen so far.
        resume (bool): Continue from `checkpoint_path` when it exists.
        early_stop_patience (int): Training stops once the validation loss has
            failed to improve for this many consecutive epochs.

    Returns:
        dict: Per-epoch lists under "train_loss", "val_loss", "train_acc"
        and "val_acc".
    """
    start_epoch = 1 #differs from 1 only when training is resumed
    best_val_loss = float('inf') #used for early stopping

    #stores train/val loss and accuracy per epoch
    history = {
        "train_loss": [], "val_loss": [],
        "train_acc": [], "val_acc": []
    }

    patience_counter = 0

    # Resume from checkpoint if available
    if resume and os.path.exists(checkpoint_path):
        start_epoch,history, patience_counter,best_val_loss = load_checkpoint(device, model, optimizer, scheduler, checkpoint_path)

    writer = setup_tensorboard()
    try:
        for epoch in range(start_epoch, num_of_epochs+1):

            #train and validate data 
            avg_train_loss, train_accuracy = train_one_epoch(device, epoch, num_of_epochs, train_loader, model, criterion, optimizer, clip_value)
            avg_val_loss, val_accuracy = evaluate_one_epoch(device, epoch, num_of_epochs, val_loader, model, criterion)

            #update LR using LR scheduler using validation loss
            scheduler.step(avg_val_loss)

            #stores train/val loss and accuracy for plotting and logging 
            history["train_loss"].append(avg_train_loss)
            history["val_loss"].append(avg_val_loss)
            history["train_acc"].append(train_accuracy)
            history["val_acc"].append(val_accuracy)

            # Update the early-stopping state BEFORE saving, so a resumed run
            # restores this epoch's best loss and patience counter rather than
            # the values from the previous epoch.
            improved = avg_val_loss < best_val_loss
            if improved:
                best_val_loss = avg_val_loss
                patience_counter = 0
            else:
                patience_counter += 1

            #Saves the current model
            save_checkpoint(epoch, model, optimizer, scheduler, history, best_val_loss, patience_counter, checkpoint_path)
            if improved:
                #best model is saved
                save_checkpoint(epoch, model, optimizer, scheduler, history, best_val_loss, patience_counter, best_model_path)
                print("🌟 New best model saved.")

            # Log and summarise before the early-stop check so the final
            # epoch also appears in TensorBoard and in the printed report.
            log_train_val_curve(writer, epoch, avg_train_loss, train_accuracy, avg_val_loss, val_accuracy)
            log_learning_rate(writer, epoch, optimizer)
            if epoch%10==0:
                #tensorboard visualization for gradients
                log_selected_gradients(model, writer, epoch)
            display_one_epoch_summary(epoch, num_of_epochs, avg_train_loss, train_accuracy,avg_val_loss, val_accuracy)

            # Stop after exactly `early_stop_patience` epochs without
            # improvement, matching the message below.
            if patience_counter >= early_stop_patience:
                print(f"⏹ Early stopping at epoch {epoch}. No improvement for {early_stop_patience} epochs.")
                break
    finally:
        # Flush pending events and release the event file, even on an error
        # or a manual interrupt.
        writer.close()

    return history


<h2> Train Model </h2>

In [None]:
# Start a fresh run (resume=False); the returned per-epoch history is used
# by the metric logging cells below.
history = train_model(
    model=model,
    train_loader=train_loader,
    val_loader=val_loader,
    criterion=criterion,
    optimizer=optimizer,
    scheduler=scheduler,
    num_of_epochs=num_of_epochs,
    device=device,
    clip_value=10,
    checkpoint_path=checkpoint_path,
    best_model_path=best_model_path,
    resume=False,
)

<h2> Log hyperparameters and Final Metrics </h2>

In [None]:
def log_experiment(log_file, hyperparams, metrics):
    """
        Append one experiment record to the JSON list stored in `log_file`.

        Each record holds a timestamp, the hyperparameters and the final
        metrics of a training run, for future reference and optimisation.

        The updated list is serialised in memory before the file is opened for
        writing, so a value that JSON cannot encode raises without truncating
        the existing log. The parent directory is created when missing, and an
        empty log file is treated as an empty list.

        Args:
            log_file (str): Path of the JSON log.
            hyperparams (dict): JSON-serialisable hyperparameters.
            metrics (dict): JSON-serialisable metrics.

        Raises:
            TypeError: If a value cannot be encoded as JSON.
            json.JSONDecodeError: If a non-empty log file is not valid JSON.
    """
    log_entry = {
        "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
        "hyperparameters": hyperparams,
        "metrics": metrics
    }

    logs = []
    if os.path.exists(log_file) and os.path.getsize(log_file) > 0:
        with open(log_file, "r") as f:
            logs = json.load(f)

    logs.append(log_entry)

    # Serialise first: opening with "w" truncates the file immediately, so an
    # encoding error after that point would wipe every earlier entry.
    serialized = json.dumps(logs, indent=4)

    log_dir = os.path.dirname(log_file)
    if log_dir:
        os.makedirs(log_dir, exist_ok=True)

    with open(log_file, "w") as f:
        f.write(serialized)

In [None]:
json_log_file = "../logs/experiment_log.json"
csv_log_file = "../logs/experiment_log.csv"

# Settings of this run. clip_value, early_stop_patience, the transform
# summary and the seed are written out by hand here, so keep them in sync
# with the values actually used by the cells above.
hyperparams = {
    "model": "AlexNet",
    "optimizer": "Adam",
    "learning_rate": learning_rate,
    "batch_size": batch_size,
    "epochs": num_of_epochs,
    "scheduler": "ReduceLROnPlateau",
    "clip_value": 10,
    "early_stop_patience": 8,
    "transform": "Resize(256)->Crop->Flip->Norm",
    "Weight Initialization": None,
    "seed": 42,
}

# Per-epoch curves returned by train_model.
train_loss_curve = history["train_loss"]
val_loss_curve = history["val_loss"]
train_acc_curve = history["train_acc"]
val_acc_curve = history["val_acc"]

# Last-epoch values plus the best validation loss and accuracy over the run.
final_metrics = {
    "final_train_loss": train_loss_curve[-1],
    "final_val_loss": val_loss_curve[-1],
    "final_train_acc": train_acc_curve[-1],
    "final_val_acc": val_acc_curve[-1],
    "best_val_loss": min(val_loss_curve),
    "best_val_acc": max(val_acc_curve),
}

log_experiment(log_file=json_log_file, hyperparams=hyperparams, metrics=final_metrics)

<h2> Convert JSON log file to CSV </h2>

In [None]:
def convert_json_to_csv(json_path, csv_path):
    """
        Flatten the JSON experiment log into a CSV table.

        Every log entry becomes one row made of its timestamp followed by its
        hyperparameters and its metrics, ready for tabular tools such as pandas.

        Args:
            json_path (str): JSON log to read (a list of entries).
            csv_path (str): CSV file to write, without the index column.
    """
    with open(json_path, 'r') as source:
        entries = json.load(source)

    # One flat dict per entry; hyperparameters and metrics share the row.
    rows = [
        {
            "timestamp": item["timestamp"],
            **item["hyperparameters"],
            **item["metrics"],
        }
        for item in entries
    ]

    pd.DataFrame(rows).to_csv(csv_path, index=False)
    print(f"✅ Log converted to CSV: {csv_path}")

# Export the JSON experiment log as a CSV table.
convert_json_to_csv(json_path=json_log_file, csv_path=csv_log_file)