In [None]:
import torch
from torchvision.datasets import ImageFolder
import torchvision.transforms as transforms
from torch.utils.data import Subset, DataLoader
import torch.nn as nn
import torch.nn.functional as F
import wandb
import matplotlib.pyplot as plt
import numpy as np
import cv2

In [None]:
# Authenticate this session with Weights & Biases before any run is created.
# The key is deliberately blank in the saved notebook: never commit a real API key.
# NOTE(review): presumably the real key is pasted in at run time; supplying it through the
# environment (e.g. WANDB_API_KEY) would avoid editing this cell - confirm how wandb
# treats an empty key string.
wandb.login(key="")

In [None]:
# Dataset-level constants: square input resolution fed to the network and the
# number of output classes.
IMAGE_SIZE = 224
NUM_OF_CLASSES = 10

# Hyperparameters of the baseline run. The dictionary is consumed by the data
# pipeline, the model constructor and the trainer, and is also passed to wandb
# as the run config. The key spellings (including "data_augumentation") are
# looked up elsewhere in the notebook and must not be changed.
h_params = dict(
    epochs=10,
    learning_rate=0.0001,
    batch_size=32,
    num_of_filter=64,               # filters in the first convolution
    filter_size=[3, 3, 3, 3, 3],    # one kernel size per convolution layer
    actv_func="gelu",
    filter_multiplier=2,            # growth factor of the filter count per layer
    data_augumentation=False,
    batch_normalization=True,
    dropout=0.4,
    conv_layers=5,
    dense_layer_size=256,
    image_size=IMAGE_SIZE,
    num_classes=NUM_OF_CLASSES,
)

In [None]:
class DataPreparer:
    """Builds the train / validation / test datasets and their DataLoaders.

    Args:
        h_params: Hyperparameter mapping; reads "data_augumentation" and "batch_size".
        image_size: Side length (pixels) every image is resized to (square).
        train_dir: Image-folder root that is split into train and validation sets.
        val_dir: Image-folder root used as the held-out test set.
    """

    # Inclusive per-class index ranges of the original 9,999-image training folder.
    # Only used as a fallback when a dataset does not expose `targets`.
    _LEGACY_CLASS_BOUNDS = (
        (0, 999), (1000, 1999), (2000, 2999), (3000, 3999), (4000, 4998),
        (4999, 5998), (5999, 6998), (6999, 7998), (7999, 8998), (8999, 9998)
    )

    def __init__(self, h_params, image_size, train_dir, val_dir):
        self.h_params = h_params
        self.image_size = image_size
        self.train_dir = train_dir
        self.val_dir = val_dir

    def get_train_transform(self):
        """Return the training transform; augmentation is applied only when enabled."""
        size = (self.image_size, self.image_size)
        if self.h_params["data_augumentation"]:
            return transforms.Compose([
                transforms.Resize(size),
                transforms.RandomHorizontalFlip(),
                transforms.RandomVerticalFlip(),
                transforms.RandomRotation(10),
                transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.1),
                transforms.GaussianBlur(kernel_size=3),
                transforms.ToTensor()
            ])
        else:
            return transforms.Compose([
                transforms.Resize(size),
                transforms.ToTensor()
            ])

    def get_test_transform(self):
        """Return the deterministic resize + tensor transform used for evaluation."""
        size = (self.image_size, self.image_size)
        return transforms.Compose([
            transforms.Resize(size),
            transforms.ToTensor()
        ])

    def stratified_split(self, dataset, ratio):
        """Split `dataset` per class so each class contributes `ratio` of its samples to train.

        Class membership is read from `dataset.targets` when that attribute exists.
        Datasets without it fall back to the legacy hard-coded index ranges.
        Within a class the first `int(n * ratio)` samples (in dataset order) go to the
        training subset and the rest to the validation subset; nothing is shuffled,
        so the split is deterministic.

        Returns:
            (train_subset, val_subset): two `Subset` views over `dataset`.
        """
        targets = getattr(dataset, "targets", None)
        if targets is not None:
            per_class = {}
            for idx, label in enumerate(targets):
                per_class.setdefault(int(label), []).append(idx)
            groups = [per_class[label] for label in sorted(per_class)]
        else:
            groups = [list(range(start, end + 1)) for start, end in self._LEGACY_CLASS_BOUNDS]

        train_idx, val_idx = [], []
        for indices in groups:
            split_at = int(len(indices) * ratio)
            train_idx.extend(indices[:split_at])
            val_idx.extend(indices[split_at:])
        return Subset(dataset, train_idx), Subset(dataset, val_idx)

    def get_datasets(self):
        """Create the train, validation and test datasets.

        The validation subset is taken from a second view of the training folder that
        uses the evaluation transform, so validation images are never augmented even
        when augmentation is enabled for training.
        """
        train_transform = self.get_train_transform()
        test_transform = self.get_test_transform()
        full_train = ImageFolder(self.train_dir, transform=train_transform)
        train_set, val_split = self.stratified_split(full_train, 0.8)
        # Same folder, same sample order -> the split indices are valid for this view too.
        eval_view = ImageFolder(self.train_dir, transform=test_transform)
        val_set = Subset(eval_view, val_split.indices)
        test_set = ImageFolder(self.val_dir, transform=test_transform)
        return train_set, val_set, test_set

    def get_loaders(self):
        """Return a dict with the three DataLoaders and the size of each dataset."""
        train_set, val_set, test_set = self.get_datasets()
        batch = self.h_params["batch_size"]
        return {
            "train_loader": DataLoader(train_set, batch_size=batch, shuffle=True),
            # Validation metrics do not depend on order; keep it fixed for reproducible losses.
            "val_loader": DataLoader(val_set, batch_size=batch, shuffle=False),
            # Kept shuffled: generateGridImage shows the first images this loader yields,
            # and an unshuffled image folder would yield a single class first.
            "test_loader": DataLoader(test_set, batch_size=batch, shuffle=True),
            "train_len": len(train_set),
            "val_len": len(val_set),
            "test_len": len(test_set)
        }


In [None]:

class FlexibleCNN(nn.Module):
    """Configurable CNN: a stack of Conv -> [BatchNorm] -> activation -> MaxPool blocks
    followed by a two-layer dense classifier.

    Args:
        config: Mapping with the keys 'image_size', 'num_of_filter', 'filter_multiplier',
            'actv_func', 'conv_layers', 'filter_size' (one kernel size per conv layer),
            'dense_layer_size', 'dropout', 'num_classes' and optionally
            'batch_normalization'.
    """

    def __init__(self, config):
        super().__init__()
        self.config = config
        self.features = self._make_feature_extractor()
        self.classifier = self._make_classifier(self._infer_feature_dim())

    def forward(self, x):
        """Return class logits for a batch of images."""
        x = self.features(x)
        x = torch.flatten(x, 1)
        x = self.classifier(x)
        return x

    def _infer_feature_dim(self):
        """Return the flattened size of the feature extractor's output for one image.

        The probe pass runs in eval mode so BatchNorm layers neither update their
        running statistics with the all-zero dummy image nor reject a batch of one.
        The previous train/eval mode is restored afterwards.
        """
        side = self.config['image_size']
        was_training = self.features.training
        self.features.eval()
        try:
            with torch.no_grad():
                dummy = torch.zeros(1, 3, side, side)
                return self.features(dummy).view(1, -1).size(1)
        finally:
            self.features.train(was_training)

    def _make_feature_extractor(self):
        """Build the convolutional stack described by the config.

        Convolutions use no padding, so every layer shrinks the feature map by
        `kernel_size - 1` before the 2x2 max-pool halves it.
        """
        layers = []
        in_channels = 3
        num_filters = self.config['num_of_filter']
        multiplier = self.config['filter_multiplier']
        actv = self._get_activation_name(self.config['actv_func'])
        for i in range(self.config['conv_layers']):
            # Filter count grows geometrically with depth.
            out_channels = int(num_filters * (multiplier ** i))
            kernel_size = self.config['filter_size'][i]
            layers.append(nn.Conv2d(in_channels, out_channels, kernel_size))
            if self.config.get('batch_normalization', False):
                layers.append(nn.BatchNorm2d(out_channels))
            layers.append(getattr(nn, actv)())
            layers.append(nn.MaxPool2d(2))
            in_channels = out_channels
        return nn.Sequential(*layers)

    def _make_classifier(self, input_dim):
        """Build the dense head: Linear -> ReLU -> Dropout -> Linear(num_classes)."""
        return nn.Sequential(
            nn.Linear(input_dim, self.config['dense_layer_size']),
            nn.ReLU(),
            nn.Dropout(self.config['dropout']),
            nn.Linear(self.config['dense_layer_size'], self.config['num_classes'])
        )

    def _get_activation_name(self, actv_func):
        """Map a config activation key to the `torch.nn` class name.

        Unknown keys silently fall back to 'ReLU'.
        """
        mapping = {
            'elu': 'ELU',
            'gelu': 'GELU',
            'silu': 'SiLU',
            'selu': 'SELU',
            'leaky_relu': 'LeakyReLU',
            'relu': 'ReLU'
        }
        return mapping.get(actv_func, 'ReLU')

In [None]:
def print_cnn_stats(input_channels, input_size, num_conv_layers, num_filters, filter_size,
                    dense_neurons, num_classes, filter_multiplier=1):
    """Print the parameter count and multiply-accumulate count of a Conv/MaxPool CNN.

    The network modelled is `num_conv_layers` blocks of an unpadded, stride-1
    convolution followed by 2x2 max-pooling, then one hidden dense layer and an
    output layer. Batch-normalisation parameters are not counted.

    Args:
        input_channels: Channels of the input image.
        input_size: Side length of the (square) input image.
        num_conv_layers: Number of convolution blocks.
        num_filters: Filters in the first convolution.
        filter_size: Kernel side length; either one int used for every layer or a
            sequence with one entry per layer.
        dense_neurons: Width of the hidden dense layer.
        num_classes: Number of output units.
        filter_multiplier: Growth factor of the filter count per layer
            (layer i has `int(num_filters * filter_multiplier ** i)` filters).
            The default of 1 keeps the filter count constant.

    Raises:
        ValueError: If fewer kernel sizes than layers are given, or a layer's
            output would be too small to pool.
    """
    if isinstance(filter_size, int):
        kernel_sizes = [filter_size] * num_conv_layers
    else:
        kernel_sizes = list(filter_size)
        if len(kernel_sizes) < num_conv_layers:
            raise ValueError(
                f"filter_size has {len(kernel_sizes)} entries but {num_conv_layers} conv layers were requested"
            )

    total_params = 0
    total_computations = 0
    in_channels = input_channels
    size = input_size
    for i in range(num_conv_layers):
        kernel = kernel_sizes[i]
        out_channels = int(num_filters * (filter_multiplier ** i))
        # Weights per filter plus one bias, for every filter.
        total_params += (kernel * kernel * in_channels + 1) * out_channels
        out_size = size - kernel + 1
        if out_size < 2:
            # A 2x2 max-pool needs at least a 2x2 map; the real network cannot be built.
            raise ValueError(
                f"conv layer {i}: kernel {kernel} leaves a {out_size}x{out_size} map from a {size}x{size} input"
            )
        # One MAC per weight per output position per filter.
        total_computations += (kernel * kernel * in_channels) * (out_size * out_size) * out_channels
        in_channels = out_channels
        size = out_size // 2  # 2x2 max-pooling

    flat_features = in_channels * size * size
    total_params += flat_features * dense_neurons + dense_neurons
    total_computations += flat_features * dense_neurons
    total_params += dense_neurons * num_classes + num_classes
    total_computations += dense_neurons * num_classes
    print(f"Total parameters: {total_params}")
    print(f"Total computations (MACs): {total_computations}")


In [None]:

def generateGridImage(model, device, loader_data):
    """Plot a 10x3 grid of test images with true and predicted labels.

    Batches are read from `loader_data['test_loader']` only until enough images
    for the grid are available. Inference runs in eval mode without gradients, and
    the model's previous train/eval mode is restored afterwards. The figure is
    shown inline and, when a wandb run is active, logged as "Predictions Grid".

    Args:
        model: Network returning class logits for a batch of images.
        device: Device the batches are moved to before inference.
        loader_data: Mapping containing the key 'test_loader'.

    Raises:
        ValueError: If the test loader yields no images.
    """
    class_label_names = [
        "Amphibia", "Animalia", "Arachnida", "Aves", "Fungi",
        "Insecta", "Mammalia", "Mollusca", "Plantae", "Reptilia"
    ]
    n_rows, n_cols = 10, 3
    n_needed = n_rows * n_cols
    true_labels, predicted_labels, images = [], [], []
    test_loader = loader_data['test_loader']

    was_training = model.training
    model.eval()
    try:
        with torch.no_grad():
            for inputs, labels in test_loader:
                inputs, labels = inputs.to(device), labels.to(device)
                outputs = model(inputs)
                _, predicted = torch.max(outputs, 1)
                true_labels.extend(labels.cpu().numpy())
                predicted_labels.extend(predicted.cpu().numpy())
                images.extend(inputs.cpu().numpy())
                if len(images) >= n_needed:
                    break  # enough for the grid; no need to run more batches
    finally:
        model.train(was_training)

    if not images:
        raise ValueError("test_loader yielded no images")

    images = np.array(images[:n_needed])
    # Rescale the displayed images to [0, 1]; the epsilon guards a constant image set.
    images = (images - images.min()) / (images.max() - images.min() + 1e-8)
    pad_size = 10
    fig, axs = plt.subplots(n_rows, n_cols, figsize=(12, 30), facecolor="#f8f9fa")
    fig.suptitle(
        "Model Predictions on Test Images",
        fontsize=24, fontweight='bold', color="#333333", y=1.02
    )
    for i, ax in enumerate(axs.flatten()):
        ax.axis('off')
        if i >= len(images):
            continue  # fewer test images than grid cells: leave the cell blank
        img = np.transpose(images[i], (1, 2, 0))  # CHW -> HWC for plotting
        true_idx = true_labels[i]
        pred_idx = predicted_labels[i]
        correct = (true_idx == pred_idx)
        img_uint8 = (img * 255).astype(np.uint8)
        # Border colour is given in RGB order because the image array is RGB.
        color = (0, 255, 0) if correct else (255, 0, 0)
        img_padded = cv2.copyMakeBorder(
            img_uint8, pad_size, pad_size, pad_size, pad_size,
            borderType=cv2.BORDER_CONSTANT, value=color
        )
        img_padded = img_padded.astype(np.float32) / 255.0
        ax.imshow(img_padded)
        true_label_name = class_label_names[true_idx]
        predicted_label_name = class_label_names[pred_idx]
        ax.set_title(
            f"True: {true_label_name}\nPred: {predicted_label_name}",
            fontsize=10, fontweight='bold',
            color='#2ecc40' if correct else '#ff4136',
            pad=8
        )
    plt.figtext(
        0.5, 0.01,
        "Green border = Correct | Red border = Incorrect",
        ha="center", fontsize=14, color="#555555"
    )
    plt.tight_layout(rect=[0, 0.03, 1, 0.97], pad=2.0)
    # Only log when a run is active so the function also works outside a wandb run.
    if wandb.run is not None:
        wandb_image = wandb.Image(fig)
        wandb.log({"Predictions Grid": wandb_image})
    plt.show()
    plt.close(fig)


In [None]:
class Trainer:
    """Trains a model, tracks metrics in wandb and evaluates it on the test set.

    Args:
        model_class: Callable that builds the model from `h_params`.
        h_params: Hyperparameter mapping; reads "learning_rate" and "epochs".
        training_data: Mapping with 'train_loader', 'val_loader', 'test_loader' and
            the dataset sizes 'train_len', 'val_len', 'test_len'.
    """

    def __init__(self, model_class, h_params, training_data):
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        self.model = model_class(h_params)
        # Use every visible GPU instead of assuming exactly two; `None` on CPU-only
        # machines, where DataParallel is a plain pass-through wrapper. The wrapper is
        # always kept so saved state-dict keys keep their "module." prefix.
        gpu_ids = list(range(torch.cuda.device_count())) or None
        self.model = torch.nn.DataParallel(self.model, device_ids=gpu_ids).to(self.device)
        self.h_params = h_params
        self.loss_fn = nn.CrossEntropyLoss()
        self.optimizer = torch.optim.Adam(self.model.parameters(), lr=h_params["learning_rate"])
        self.train_loader = training_data['train_loader']
        self.val_loader = training_data['val_loader']
        self.test_loader = training_data['test_loader']
        self.train_len = training_data['train_len']
        self.val_len = training_data['val_len']
        self.test_len = training_data['test_len']
        self.training_data = training_data

    def fit(self):
        """Run the full training loop, then test, log a prediction grid and save the weights.

        The weights written to './bestmodel.pth' are those after the final epoch.
        """
        for epoch in range(self.h_params["epochs"]):
            train_loss, train_acc = self._train_one_epoch(epoch)
            val_loss, val_acc = self._validate(epoch)
            print(f"epoch: {epoch} train accuracy: {train_acc:.4f} train loss: {train_loss:.4f} val accuracy: {val_acc:.4f} val loss: {val_loss:.4f}")
            wandb.log({
                "train_accuracy": train_acc,
                "train_loss": train_loss,
                "val_accuracy": val_acc,
                "val_loss": val_loss,
                "epoch": epoch
            })
        test_loss, test_acc = self._test()
        wandb.log({
            "test_accuracy": test_acc,
            "test_loss": test_loss
        })
        print(f'Test accuracy: {test_acc}, Test loss: {test_loss}')
        generateGridImage(self.model, self.device, self.training_data)
        print('Finished Training')
        torch.save(self.model.state_dict(), './bestmodel.pth')

    def _train_one_epoch(self, epoch):
        """Train for one pass over the training loader.

        Returns:
            (avg_loss, accuracy): mean of the per-batch losses and the fraction of
            correctly classified training samples.
        """
        self.model.train()
        running_loss = 0.0
        correct = 0
        for i, (inputs, labels) in enumerate(self.train_loader):
            inputs, labels = inputs.to(self.device), labels.to(self.device)
            self.optimizer.zero_grad()
            outputs = self.model(inputs)
            loss = self.loss_fn(outputs, labels)
            loss.backward()
            self.optimizer.step()
            running_loss += loss.item()
            _, predicted = torch.max(outputs, 1)
            correct += (predicted == labels).sum().item()
            if i % 10 == 0:
                batch_acc = (predicted == labels).float().mean().item()
                print(f"epoch {epoch} batch {i} accuracy {batch_acc:.4f} loss {loss.item():.4f}")
        avg_loss = running_loss / len(self.train_loader)
        accuracy = correct / self.train_len
        return avg_loss, accuracy

    def _evaluate(self, loader, num_samples):
        """Score the model on `loader` in eval mode without gradients.

        Returns:
            (avg_loss, accuracy): mean of the per-batch losses and
            `correct / num_samples`.
        """
        self.model.eval()
        running_loss = 0.0
        correct = 0
        with torch.no_grad():
            for inputs, labels in loader:
                inputs, labels = inputs.to(self.device), labels.to(self.device)
                outputs = self.model(inputs)
                running_loss += self.loss_fn(outputs, labels).item()
                _, predicted = torch.max(outputs, 1)
                correct += (predicted == labels).sum().item()
        return running_loss / len(loader), correct / num_samples

    def _validate(self, epoch):
        """Return (avg_loss, accuracy) on the validation set; `epoch` is unused."""
        return self._evaluate(self.val_loader, self.val_len)

    def _test(self):
        """Return (avg_loss, accuracy) on the test set."""
        return self._evaluate(self.test_loader, self.test_len)

In [None]:
# Baseline experiment: train one model with `h_params` and log everything to wandb.
train_data_dir = "/kaggle/input/assignment2dataset/inaturalist_12K/train"
test_data_dir = "/kaggle/input/assignment2dataset/inaturalist_12K/val"
data_preparer = DataPreparer(h_params, IMAGE_SIZE, train_data_dir, test_data_dir)
training_data = data_preparer.get_loaders()

# The context manager finishes the run when training ends or raises, so it cannot
# leak into the sweep runs started by a later cell.
with wandb.init(
    project="DL Assignment 2",
    name=f"{h_params['actv_func']}_ep_{h_params['epochs']}_lr_{h_params['learning_rate']}_init_fltr_cnt_{h_params['num_of_filter']}_fltr_sz_{h_params['filter_size']}_fltr_mult_{h_params['filter_multiplier']}_data_aug_{h_params['data_augumentation']}_batch_norm_{h_params['batch_normalization']}_dropout_{h_params['dropout']}_dense_size_{h_params['dense_layer_size']}",
    config=h_params
) as run:
    # NOTE: only the first kernel size and the initial filter count are passed, so the
    # printed figures describe a network with uniform filters; they do not reflect
    # `filter_multiplier` or per-layer kernel sizes.
    print_cnn_stats(
        input_channels=3,
        input_size=IMAGE_SIZE,
        num_conv_layers=h_params["conv_layers"],
        num_filters=h_params["num_of_filter"],
        filter_size=h_params["filter_size"][0],
        dense_neurons=h_params["dense_layer_size"],
        num_classes=NUM_OF_CLASSES
    )
    trainer = Trainer(FlexibleCNN, h_params, training_data)
    trainer.fit()

In [None]:
# ---- Hyperparameter Sweep Setup ----
# Bayesian search that maximises the "val_accuracy" metric logged by Trainer.fit().
sweep_config = {
    "method": "bayes",
    "name": "DL_assignment2_sweep",
    "metric": {
        "goal": "maximize",
        "name": "val_accuracy"
    },
    "parameters": {
        "epochs": {"values": [10]},
        "learning_rate": {"values": [1e-4, 1e-3]},
        "batch_size": {"values": [32, 64]},
        "num_of_filter": {"values": [16, 32, 64]},
        # NOTE(review): with unpadded convolutions and a 224 px input, [3,5,7,9,11] leaves a
        # 7x7 map in front of the final 11x11 kernel (224->111->53->23->7), so that choice
        # cannot be built by FlexibleCNN - confirm whether it should stay in the search space.
        "filter_size": {"values": [
            [3,3,3,3,3], [5,5,5,5,5], [7,7,7,7,7], [11,9,7,5,3], [3,5,7,9,11]
        ]},
        "actv_func": {"values": ["elu", "gelu", "leaky_relu", "selu"]},
        "filter_multiplier": {"values": [1, 2]},
        "data_augumentation": {"values": [False]},
        "batch_normalization": {"values": [True, False]},
        "dropout": {"values": [0, 0.1, 0.2]},
        "dense_layer_size": {"values": [64, 128, 256]},
        "conv_layers": {"values": [5]}
    }
}

sweep_id = wandb.sweep(sweep=sweep_config, project="DL Assignment 2")
def main():
    """Run one sweep trial: build loaders and model from the sweep's config and train.

    A single wandb run is opened per trial; the agent supplies the hyperparameters
    through `run.config`, and the run is finished when the `with` block exits.
    """
    with wandb.init(project="DL Assignment 2") as run:
        config = dict(run.config)
        config['image_size'] = IMAGE_SIZE
        config['num_classes'] = NUM_OF_CLASSES
        # Record the two fixed values alongside the sweep-chosen hyperparameters.
        run.config.update({"image_size": IMAGE_SIZE, "num_classes": NUM_OF_CLASSES})
        # Give the trial a readable name instead of opening a second run just to name it.
        run.name = f"{config['actv_func']}_ep_{config['epochs']}_lr_{config['learning_rate']}_init_fltr_cnt_{config['num_of_filter']}_fltr_sz_{config['filter_size']}_fltr_mult_{config['filter_multiplier']}_data_aug_{config['data_augumentation']}_batch_norm_{config['batch_normalization']}_dropout_{config['dropout']}_dense_size_{config['dense_layer_size']}_batch_size_{config['batch_size']}"
        train_data_dir = "/kaggle/input/assignment2dataset/inaturalist_12K/train"
        test_data_dir = "/kaggle/input/assignment2dataset/inaturalist_12K/val"
        data_preparer = DataPreparer(config, IMAGE_SIZE, train_data_dir, test_data_dir)
        training_data = data_preparer.get_loaders()
        trainer = Trainer(FlexibleCNN, config, training_data)
        trainer.fit()
wandb.agent(sweep_id, function=main, count=10)


In [None]:

# Reload the saved weights and plot a grid of test predictions.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = FlexibleCNN(h_params)
# Wrap exactly like Trainer does so the "module."-prefixed checkpoint keys match;
# use whatever GPUs are visible instead of assuming two (None on CPU-only machines).
gpu_ids = list(range(torch.cuda.device_count())) or None
model = torch.nn.DataParallel(model, device_ids=gpu_ids).to(device)
# NOTE: './bestmodel.pth' is overwritten by every Trainer.fit() call, including each
# sweep trial, so the file on disk may come from an architecture other than `h_params`.
# map_location lets a checkpoint saved on GPU load on the current device.
state_dict = torch.load('./bestmodel.pth', map_location=device, weights_only=True)
model.load_state_dict(state_dict)
model.eval()


train_data_dir = "/kaggle/input/assignment2dataset/inaturalist_12K/train"
test_data_dir = "/kaggle/input/assignment2dataset/inaturalist_12K/val"
data_preparer = DataPreparer(h_params, IMAGE_SIZE, train_data_dir, test_data_dir)
loader_data = data_preparer.get_loaders()

# generateGridImage logs the figure to wandb, which needs an active run; open a
# dedicated evaluation run so this cell works after earlier runs have finished.
with wandb.init(project="DL Assignment 2", job_type="evaluation", config=h_params):
    generateGridImage(model, device, loader_data)
