In [None]:
pip install wandb

In [None]:
# Run the W&B command-line login (shell escape) before any wandb API call below.
!wandb login

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import datasets, transforms
from torch.utils.data import DataLoader, Subset, ConcatDataset
from sklearn.model_selection import train_test_split
import wandb
from tqdm import tqdm
import os
import random
import numpy as np
import matplotlib.pyplot as plt

In [None]:
# Use the first CUDA GPU when one is available, otherwise fall back to the CPU.
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

# Seed every random number generator this notebook imports so reruns are repeatable:
# Python's `random` (used to pick the images shown in the prediction grid),
# NumPy, and torch.
SEED = 42
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)

In [None]:
# W&B sweep definition: Bayesian search that maximizes the "val_accuracy" metric.
sweep_config = dict(
    method='bayes',
    metric=dict(name='val_accuracy', goal='maximize'),
    parameters=dict(
        # Candidate output-channel layouts, one entry per conv block.
        filters=dict(values=[
            [32, 32, 32, 32, 32],
            [32, 64, 128, 256, 512],
            [32, 64, 128, 64, 32],
            [64, 128, 256, 128, 64],
            [64, 128, 256, 128, 8],
        ]),
        activation=dict(values=['ReLU', 'GELU', 'SiLU', 'Mish']),
        dropout=dict(values=[0.2, 0.3]),
        batch_norm=dict(values=[True, False]),
        data_aug=dict(values=[True, False]),
        batch_size=dict(values=[32, 64]),
        lr=dict(values=[1e-3, 1e-4]),
        # Single fixed value rather than a searched one.
        epochs=dict(value=10),
    ),
)

In [None]:
# Data Loading and Transforming
def load_data(data_aug, batch_size, data_dir="/kaggle/input/inaturalist/inaturalist_12K/train"):
    """Build training and validation loaders from a stratified 80/20 split.

    Args:
        data_aug: When True, training images are additionally randomly flipped
            horizontally and rotated by up to 15 degrees. Validation images are
            never augmented.
        batch_size: Batch size used by both loaders.
        data_dir: Root of the class-per-subfolder image directory to split.

    Returns:
        Tuple ``(train_loader, val_loader)``.
    """
    base_transform = [
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
        transforms.Normalize((0.5,), (0.5,))
    ]
    eval_transform = transforms.Compose(base_transform)
    if data_aug:
        train_transform = transforms.Compose([
            transforms.RandomHorizontalFlip(),
            transforms.RandomRotation(15),
            *base_transform
        ])
    else:
        train_transform = eval_transform

    # Two views of the same folder that differ only in their transform. A Subset
    # shares its parent's transform, so splitting one augmented dataset would also
    # augment the validation images and make the validation metric noisy.
    train_dataset = datasets.ImageFolder(data_dir, transform=train_transform)
    val_dataset = datasets.ImageFolder(data_dir, transform=eval_transform)

    targets = [sample[1] for sample in train_dataset.samples]
    # Stratified split with a fixed random_state, so every call yields the same indices.
    # NOTE(review): relies on both ImageFolder scans listing samples in the same
    # order so the indices line up across the two views.
    train_idx, val_idx = train_test_split(
        range(len(train_dataset)), stratify=targets, test_size=0.2, random_state=42
    )

    train_loader = DataLoader(Subset(train_dataset, train_idx), batch_size=batch_size, shuffle=True, num_workers=2)
    val_loader = DataLoader(Subset(val_dataset, val_idx), batch_size=batch_size, shuffle=False, num_workers=2)
    return train_loader, val_loader

In [None]:
# CNN Model Definition
class CNN(nn.Module):
    """Convolutional classifier: a stack of conv blocks followed by a dense head.

    Every conv block is ``Conv2d(3x3, padding=1) -> [BatchNorm2d] -> activation
    -> MaxPool2d(2)``. The head flattens the feature map and applies
    ``Linear -> Dropout -> ReLU -> Linear``.

    Args:
        filters: Sequence of output-channel counts, one per conv block.
        activation_fn: Zero-argument callable returning an activation module
            (for example ``nn.ReLU``); called once per conv block.
        batch_norm: When True, add ``BatchNorm2d`` after every convolution.
        dropout: Dropout probability used in the dense head.
        input_size: Side length of the square input images.
        num_classes: Number of output logits.

    Raises:
        ValueError: If ``filters`` is empty, or if the pooling stages would
            shrink an ``input_size`` image to nothing.
    """

    def __init__(self, filters, activation_fn, batch_norm, dropout, input_size=224, num_classes=10):
        super().__init__()
        if len(filters) == 0:
            raise ValueError("filters must contain at least one conv block")

        # Each MaxPool2d(2) halves the spatial size (rounding down), so the side
        # length entering the dense head depends on how many blocks there are.
        feature_size = input_size // (2 ** len(filters))
        if feature_size < 1:
            raise ValueError(
                f"input_size={input_size} is too small for {len(filters)} pooling stages"
            )

        layers = []
        in_channels = 3  # RGB input
        for out_channels in filters:
            layers.append(nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1))
            if batch_norm:
                layers.append(nn.BatchNorm2d(out_channels))
            layers.append(activation_fn())
            layers.append(nn.MaxPool2d(2))
            in_channels = out_channels
        # Convolutional feature extractor
        self.conv = nn.Sequential(*layers)
        # Dense classification head
        self.fc = nn.Sequential(
            nn.Flatten(),
            nn.Linear(filters[-1] * feature_size ** 2, 256),
            nn.Dropout(dropout),
            nn.ReLU(),
            nn.Linear(256, num_classes)
        )

    def forward(self, x):
        """Return class logits for a batch of images."""
        return self.fc(self.conv(x))

In [None]:
# Training & Evaluation with W&B Logging
def train_eval(config=None):
    """Train and validate one CNN configuration inside a W&B run.

    Hyperparameters are read from ``wandb.config`` once the run is initialised.
    After every epoch the function logs "epoch", "train_loss", "train_accuracy",
    "val_loss" and "val_accuracy"; losses are averaged over batches and
    accuracies are percentages.

    Args:
        config: Optional configuration forwarded to ``wandb.init``.
    """
    with wandb.init(config=config) as run:
        config = wandb.config
        # Label the run with its activation, filter layout and dropout rate.
        run.name = f"ac-{config.activation}_filters-{'-'.join(map(str, config.filters))}_drop-{config.dropout}"

        activation_cls = activation_map[config.activation]
        model = CNN(config.filters, activation_cls, config.batch_norm, config.dropout)
        model = model.to(device)
        train_loader, val_loader = load_data(config.data_aug, config.batch_size)
        optimizer = optim.Adam(model.parameters(), lr=config.lr)
        criterion = nn.CrossEntropyLoss()

        for epoch in range(config.epochs):
            # --- optimisation pass over the training split ---
            model.train()
            total_loss = 0
            correct = 0
            total = 0
            for images, labels in tqdm(train_loader, desc=f"Epoch {epoch+1}"):
                images = images.to(device)
                labels = labels.to(device)

                optimizer.zero_grad()
                outputs = model(images)
                loss = criterion(outputs, labels)
                loss.backward()
                optimizer.step()

                total_loss += loss.item()
                correct += outputs.argmax(1).eq(labels).sum().item()
                total += labels.size(0)
            train_acc = correct / total

            # --- gradient-free pass over the validation split ---
            model.eval()
            val_loss = 0
            val_correct = 0
            val_total = 0
            with torch.no_grad():
                for images, labels in val_loader:
                    images = images.to(device)
                    labels = labels.to(device)
                    outputs = model(images)
                    val_loss += criterion(outputs, labels).item()
                    val_correct += outputs.argmax(1).eq(labels).sum().item()
                    val_total += labels.size(0)
            val_acc = val_correct / val_total

            # One W&B record per epoch.
            epoch_metrics = {
                "epoch": epoch + 1,
                "train_loss": total_loss / len(train_loader),
                "train_accuracy": train_acc * 100,
                "val_loss": val_loss / len(val_loader),
                "val_accuracy": val_acc * 100,
            }
            wandb.log(epoch_metrics)

In [None]:
# Maps the activation name stored in a config to the torch.nn class of the same name.
activation_map = {
    name: getattr(nn, name)
    for name in ("ReLU", "GELU", "SiLU", "Mish")
}

In [None]:
# Create one sweep in the W&B project, then let an agent execute train_eval
# for that sweep with a run count of 100.
sweep_id = wandb.sweep(sweep_config, project="Assignment_02A")
wandb.agent(sweep_id, count=100, function=train_eval)

In [None]:
# Best configuration, selected on the training and validation data;
# the final model below is trained with these values.
best_config = dict(
    filters=[64, 128, 256, 128, 64],
    activation="SiLU",
    dropout=0.3,
    batch_norm=True,
    data_aug=False,
    batch_size=32,
    lr=1e-4,
    epochs=10,
)

In [None]:
# Loading Train and Test data
def load_final_data(batch_size, data_aug, data_dir="/kaggle/input/inaturalist/inaturalist_12K"):
    """Build loaders for the final fit: the whole train folder, plus the test set.

    Args:
        batch_size: Batch size used by both loaders.
        data_aug: When True, training images are additionally randomly flipped
            horizontally and rotated by up to 15 degrees. Test images are never
            augmented.
        data_dir: Dataset root containing a ``train`` and a ``val`` subfolder;
            ``val`` is used as the held-out test set.

    Returns:
        Tuple ``(train_loader, test_loader, class_names)`` where ``class_names``
        is the class list of the test dataset.
    """
    base_transform = [
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
        transforms.Normalize((0.5,), (0.5,))
    ]
    eval_transform = transforms.Compose(base_transform)
    if data_aug:
        train_transform = transforms.Compose([
            transforms.RandomHorizontalFlip(),
            transforms.RandomRotation(15),
            *base_transform
        ])
    else:
        train_transform = eval_transform

    # No train/validation split here: the final model is fit on every sample in
    # the train folder, so the folder is loaded directly.
    train_dataset = datasets.ImageFolder(os.path.join(data_dir, "train"), transform=train_transform)
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=2)

    test_dataset = datasets.ImageFolder(os.path.join(data_dir, "val"), transform=eval_transform)
    test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False, num_workers=2)
    return train_loader, test_loader, test_dataset.classes


In [None]:
# Training using Best Configurations
def train_final_model():
    """Train a fresh CNN with the hyperparameters in ``best_config``.

    Prints the training accuracy and mean batch loss after every epoch.

    Returns:
        Tuple ``(model, test_loader, class_names)``: the trained model together
        with the test loader and class names produced by ``load_final_data``.
    """
    cfg = best_config
    train_loader, test_loader, class_names = load_final_data(cfg['batch_size'], cfg['data_aug'])

    activation_cls = activation_map[cfg['activation']]
    model = CNN(cfg['filters'], activation_cls, cfg['batch_norm'], cfg['dropout'])
    model = model.to(device)
    optimizer = optim.Adam(model.parameters(), lr=cfg['lr'])
    criterion = nn.CrossEntropyLoss()

    for epoch in range(cfg['epochs']):
        model.train()
        total_loss = 0
        correct = 0
        total = 0
        progress = tqdm(train_loader, desc=f"Epoch {epoch+1}")
        for images, labels in progress:
            images = images.to(device)
            labels = labels.to(device)

            optimizer.zero_grad()
            outputs = model(images)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            # Running totals for the epoch summary printed below.
            total_loss += loss.item()
            correct += outputs.argmax(1).eq(labels).sum().item()
            total += labels.size(0)

        print(f"Epoch {epoch+1}: Train Acc = {correct / total:.4f} | Loss = {total_loss / len(train_loader):.4f}")

    return model, test_loader, class_names


In [None]:
# Final Test Accuracy
def compute_test_accuracy(model, test_loader):
    """Evaluate ``model`` on ``test_loader``, then print and log the accuracy.

    The model is switched to eval mode and evaluated without gradient tracking.
    The result is logged to W&B under the key "Test Accuracy (%)".

    Args:
        model: Network to evaluate; called on each image batch.
        test_loader: Iterable yielding ``(images, labels)`` batches.

    Returns:
        Test accuracy as a percentage.

    Raises:
        ValueError: If ``test_loader`` yields no samples.
    """
    model.eval()
    correct, total = 0, 0
    with torch.no_grad():
        for images, labels in test_loader:
            images, labels = images.to(device), labels.to(device)
            outputs = model(images)
            preds = outputs.argmax(1)
            correct += (preds == labels).sum().item()
            total += labels.size(0)

    # Fail with a clear message instead of a bare ZeroDivisionError below.
    if total == 0:
        raise ValueError("test_loader yielded no samples; cannot compute accuracy")

    acc = correct / total * 100
    print(f"\nFinal Test Accuracy: {acc:.2f}%")
    wandb.log({"Test Accuracy (%)": acc})
    return acc


In [None]:
# Prediction Grid Visualization(True and Predicted)
def show_test_predictions(model, test_loader, class_names, num_images=30):
    """Plot a grid of randomly chosen test images with true and predicted labels.

    Titles are green when the prediction matches the label and red otherwise.
    The figure is saved to "test_predictions_grid.png", logged to W&B under
    "Prediction Grid", and displayed.

    Args:
        model: Network to evaluate; called on each image batch.
        test_loader: Iterable yielding ``(images, labels)`` batches.
        class_names: Sequence mapping a class index to its display name.
        num_images: Maximum number of images to show. Fewer are shown when the
            loader provides fewer samples.

    Raises:
        ValueError: If ``num_images`` is below 1 or ``test_loader`` yields no samples.
    """
    if num_images < 1:
        raise ValueError("num_images must be at least 1")

    model.eval()
    all_images, all_labels, all_preds = [], [], []

    # NOTE: every test image is kept in host memory until the sample is drawn.
    with torch.no_grad():
        for images, labels in test_loader:
            images, labels = images.to(device), labels.to(device)
            preds = model(images).argmax(1)
            all_images.extend(images.cpu())
            # Plain ints make the class_names lookups below straightforward.
            all_labels.extend(labels.cpu().tolist())
            all_preds.extend(preds.cpu().tolist())

    if not all_images:
        raise ValueError("test_loader yielded no samples; nothing to display")

    # random.sample raises when asked for more items than the population holds.
    n_show = min(num_images, len(all_images))
    idxs = random.sample(range(len(all_images)), n_show)

    n_cols = 3
    n_rows = -(-n_show // n_cols)  # ceiling division
    # squeeze=False keeps `axes` two-dimensional even for a single row.
    fig, axes = plt.subplots(n_rows, n_cols, figsize=(12, 4 * n_rows), dpi=150, squeeze=False)
    for ax in axes.flat:
        ax.axis("off")  # also blanks any unused cells in the last row

    for ax, idx in zip(axes.flat, idxs):
        # Undo Normalize(0.5, 0.5) and clamp so rounding error cannot push
        # pixel values outside the [0, 1] range imshow expects for floats.
        img = (all_images[idx].permute(1, 2, 0) * 0.5 + 0.5).clamp(0, 1)
        ax.imshow(img.numpy())
        true_label = class_names[all_labels[idx]]
        pred_label = class_names[all_preds[idx]]
        ax.set_title(f"T: {true_label}\nP: {pred_label}",
                     fontsize=12,
                     color='green' if true_label == pred_label else 'red')

    plt.tight_layout()
    plt.savefig("test_predictions_grid.png", bbox_inches='tight')
    wandb.log({"Prediction Grid": wandb.Image("test_predictions_grid.png")})
    plt.show()
    # Release the figure explicitly; backends that keep it open would otherwise leak it.
    plt.close(fig)



In [None]:
# Train the final model with the best configuration, then log its test accuracy
# and a prediction grid to a dedicated W&B run. Using the run as a context
# manager closes it even if training or evaluation raises.
with wandb.init(project="Assignment_02A", name="test-image"):
    final_model, test_loader, class_names = train_final_model()
    compute_test_accuracy(final_model, test_loader)
    show_test_predictions(final_model, test_loader, class_names)