In [None]:
#%% [code]
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision.transforms as transforms
import torchvision.datasets as datasets
from torch.utils.data import DataLoader, Subset
import pytorch_lightning as pl
from pytorch_lightning.loggers import WandbLogger
import numpy as np
from collections import defaultdict

# Weights & Biases logger; it is handed to the Lightning Trainer further down.
wandb_logger = WandbLogger(project="da6401_assignment2_partA")

# Hyperparameters for a single run (candidates for a later wandb sweep).
config = dict(
    conv_filters=[32, 64, 128, 256, 512],  # output channels of each conv block
    conv_kernel_sizes=[3, 3, 3, 3, 3],     # kernel size of each conv block
    dense_neurons=256,                     # width of the hidden dense layer
    num_classes=10,                        # size of the output layer
    activation="relu",                     # options: relu, gelu, silu, mish
    batch_size=32,
    epochs=10,
    learning_rate=1e-3,
    img_size=128,                          # input image size: 128x128
)

#%% [code]
class LitCNN(pl.LightningModule):
    """Configurable CNN image classifier wrapped as a LightningModule.

    The network is a stack of ``conv -> activation -> 2x2 max-pool`` blocks,
    one block per entry of ``config["conv_filters"]``, followed by one hidden
    dense layer and a linear head that returns raw class logits.

    Args:
        config: Mapping providing ``conv_filters``, ``conv_kernel_sizes``,
            ``dense_neurons``, ``num_classes``, ``activation``,
            ``learning_rate`` and ``img_size``.

    Raises:
        ValueError: If ``conv_filters`` is empty, if ``conv_filters`` and
            ``conv_kernel_sizes`` differ in length, or if ``activation`` is
            not one of the supported names.
    """

    def __init__(self, config):
        super().__init__()

        filters = list(config["conv_filters"])
        kernels = list(config["conv_kernel_sizes"])
        if not filters:
            raise ValueError("config['conv_filters'] must contain at least one entry")
        if len(filters) != len(kernels):
            # zip() would silently drop the surplus entries otherwise.
            raise ValueError(
                "conv_filters and conv_kernel_sizes must have the same length, "
                f"got {len(filters)} and {len(kernels)}"
            )

        self.save_hyperparameters(config)
        self.config = config

        # Activation applied after the hidden dense layer in forward().
        self.activation = self._get_activation(config["activation"])

        # Build one conv - activation - maxpool block per configured filter count.
        input_channels = 3  # the first conv layer expects 3-channel images
        in_channels = input_channels
        conv_layers = []
        for out_channels, k in zip(filters, kernels):
            conv_layers.append(nn.Conv2d(in_channels, out_channels, kernel_size=k, padding=k // 2))
            conv_layers.append(self._get_activation(config["activation"]))
            conv_layers.append(nn.MaxPool2d(kernel_size=2))
            in_channels = out_channels
        self.conv = nn.Sequential(*conv_layers)

        # Measure the flattened feature size with a probe batch instead of
        # deriving it by hand: the result then stays correct for any number of
        # blocks and for even kernel sizes (where padding=k//2 grows the map).
        with torch.no_grad():
            probe = torch.zeros(1, input_channels, config["img_size"], config["img_size"])
            self.flatten_dim = int(self.conv(probe).flatten(1).size(1))

        # Dense layers
        self.fc1 = nn.Linear(self.flatten_dim, config["dense_neurons"])
        self.fc2 = nn.Linear(config["dense_neurons"], config["num_classes"])

    def _get_activation(self, act):
        """Return a new activation module for the (case-insensitive) name ``act``.

        Raises:
            ValueError: If ``act`` is not one of relu, gelu, silu or mish.
        """
        act = act.lower()
        # Resolved by attribute name so that a class is only looked up on
        # ``nn`` when it is actually requested.
        class_names = {"relu": "ReLU", "gelu": "GELU", "silu": "SiLU", "mish": "Mish"}
        if act not in class_names:
            raise ValueError(f"Unsupported activation: {act}")
        return getattr(nn, class_names[act])()

    def forward(self, x):
        """Map a batch of images to class logits.

        Args:
            x: Image batch; the dense layers are sized for inputs of shape
                ``(N, 3, img_size, img_size)``.

        Returns:
            Tensor of shape ``(N, num_classes)`` holding unnormalised logits.
        """
        x = self.conv(x)
        # flatten() also handles non-contiguous feature maps, unlike view().
        x = torch.flatten(x, start_dim=1)
        x = self.activation(self.fc1(x))
        return self.fc2(x)

    def _shared_step(self, batch, stage):
        """Compute cross-entropy loss and accuracy for ``batch`` and log both.

        Metrics are logged as ``{stage}_loss`` and ``{stage}_acc``.
        """
        images, labels = batch
        logits = self(images)
        loss = F.cross_entropy(logits, labels)
        acc = (logits.argmax(dim=1) == labels).float().mean()
        self.log(f"{stage}_loss", loss, prog_bar=True)
        self.log(f"{stage}_acc", acc, prog_bar=True)
        return loss

    def training_step(self, batch, batch_idx):
        """Log ``train_loss`` / ``train_acc`` and return the loss to optimise."""
        return self._shared_step(batch, "train")

    def validation_step(self, batch, batch_idx):
        """Log ``val_loss`` / ``val_acc`` for one validation batch."""
        self._shared_step(batch, "val")

    def configure_optimizers(self):
        """Return an Adam optimizer using ``config["learning_rate"]``."""
        return torch.optim.Adam(self.parameters(), lr=self.config["learning_rate"])

#%% [code]
# Preprocessing applied to every image: resize to a square of side
# config["img_size"], convert to a tensor, then normalise each channel.
# The mean/std values match the widely used ImageNet channel statistics.
transform = transforms.Compose(
    [
        transforms.Resize((config["img_size"],) * 2),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ]
)

#%% [code]
# Prepare dataset (assumes a folder structure with sub-folders per class)
# Update 'path/to/train' to your training dataset folder (e.g., iNaturalist train data)
dataset = datasets.ImageFolder(root='path/to/train', transform=transform)

# Seeded generator so the train/val membership is identical on every run.
# Without it the split changes on each kernel restart, which makes results
# non-reproducible and can leak validation images into training when a run
# is resumed. Set config["seed"] to override the default of 42.
split_rng = np.random.default_rng(config.get("seed", 42))

# Stratified split: group sample indices by class label first ...
class_indices = defaultdict(list)
for idx, (_, label) in enumerate(dataset.samples):
    class_indices[label].append(idx)

# ... then send 80% of each class to training and the rest to validation.
train_indices = []
val_indices = []
for label, indices in class_indices.items():
    shuffled = split_rng.permutation(indices)
    split = int(0.8 * len(shuffled))
    # tolist() yields plain Python ints rather than NumPy scalars.
    train_indices.extend(shuffled[:split].tolist())
    val_indices.extend(shuffled[split:].tolist())

# Every sample must land in exactly one of the two subsets.
assert len(train_indices) + len(val_indices) == len(dataset)

train_dataset = Subset(dataset, train_indices)
val_dataset = Subset(dataset, val_indices)

train_loader = DataLoader(train_dataset, batch_size=config["batch_size"], shuffle=True, num_workers=2)
val_loader = DataLoader(val_dataset, batch_size=config["batch_size"], shuffle=False, num_workers=2)

#%% [code]
# Build the LightningModule from the hyperparameter dict
model = LitCNN(config=config)

#%% [code]
# Fit with the PyTorch Lightning Trainer, logging to wandb.
# The run uses the GPU when CUDA is available and the CPU otherwise.
trainer = pl.Trainer(
    accelerator="cpu" if not torch.cuda.is_available() else "gpu",
    logger=wandb_logger,
    max_epochs=config["epochs"],
)

trainer.fit(model, train_loader, val_loader)

#%% [code]
# Write the trained model to a local checkpoint file
trainer.save_checkpoint("litcnn_partA.ckpt")


In [4]:
#%% [code]
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision.transforms as transforms
import torchvision.datasets as datasets
from torch.utils.data import DataLoader, Subset
import pytorch_lightning as pl
from pytorch_lightning.loggers import WandbLogger
import numpy as np
from collections import defaultdict

The cache for model files in Transformers v4.22.0 has been updated. Migrating your old cache. This is a one-time only operation. You can interrupt this and resume the migration later on by calling `transformers.utils.move_cache()`.


0it [00:00, ?it/s]

In [5]:
# Scratch check: the accelerator string the Trainer cell would pass on this machine.
"gpu" if torch.cuda.is_available() else "cpu"

'cpu'

In [6]:
# Scratch check: whether PyTorch can see a CUDA device in this kernel.
torch.cuda.is_available()

False