In [None]:
!pip install torch 
!pip install torchvision
!pip install torchsummary
!pip install wandb 


In [2]:
import torch
import torchvision
import torch.nn as nn
import wandb
import torch.optim as optim
import torch.nn as nn
import torchvision.models as models
from torchvision import transforms
from tqdm import tqdm
from torch.utils.data import DataLoader, Subset
from torchvision import datasets, transforms, models
from sklearn.model_selection import StratifiedShuffleSplit
import os
import numpy as np
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")


In [None]:
IMAGE_SIZE = 224  # Resize to ImageNet standard
# IMAGE_SIZE = 299

transform = transforms.Compose([
    transforms.Resize((IMAGE_SIZE, IMAGE_SIZE)),
    transforms.ToTensor(),
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
])

data_dir = os.path.join("nature_12K", "inaturalist_12K")
full_dataset = datasets.ImageFolder(os.path.join(data_dir, 'train'), transform=transform)
best_model_path = "best_models"
os.makedirs(best_model_path, exist_ok=True)



In [None]:

# Define your train_dir and val_dir properly

train_dir = os.path.join(data_dir, "train")  # replace with actual path
# val_dir = os.path.join(data_dir, "train")  # optional if splitting from train_dir


# === get_data_loaders === #
def get_data_loaders(config, train_dir=train_dir):
    BATCH_SIZE = config.get("batch_size", 64)
    IMAGE_SIZE = 224
    # IMAGE_SIZE = 299

    if config.get("augment", False):
        print("Applying full data augmentation")
        transform_train = transforms.Compose([
            transforms.Resize((IMAGE_SIZE, IMAGE_SIZE)),
            transforms.RandomHorizontalFlip(),
            transforms.RandomVerticalFlip(),
            transforms.RandomRotation(20),
            transforms.ColorJitter(brightness=0.3, contrast=0.3, saturation=0.3, hue=0.02),
            transforms.RandomGrayscale(p=0.1),
            transforms.RandomAffine(degrees=0, translate=(0.1, 0.1)),
            transforms.ToTensor(),
            transforms.Normalize([0.5, 0.5, 0.5], [0.5, 0.5, 0.5]),
        ])
    else:
        print("Minimal preprocessing, no augmentation")
        transform_train = transforms.Compose([
            transforms.Resize((IMAGE_SIZE, IMAGE_SIZE)),
            transforms.ToTensor(),
            transforms.Normalize([0.5, 0.5, 0.5], [0.5, 0.5, 0.5]),
        ])

    transform_val = transforms.Compose([
        transforms.Resize((IMAGE_SIZE, IMAGE_SIZE)),
        transforms.ToTensor(),
        transforms.Normalize([0.5, 0.5, 0.5], [0.5, 0.5, 0.5]),
    ])

    # Load dataset
    full_dataset = datasets.ImageFolder(root=train_dir, transform=transform_train)
    targets = np.array(full_dataset.targets)

    # Stratified split
    sss = StratifiedShuffleSplit(n_splits=1, test_size=0.2, random_state=42)
    train_idx, val_idx = next(sss.split(np.zeros(len(targets)), targets))

    train_dataset = Subset(full_dataset, train_idx)
    val_dataset = Subset(datasets.ImageFolder(root=train_dir, transform=transform_val), val_idx)

    train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True, num_workers=2)
    val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=False, num_workers=2)

    print(f"Total images: {len(full_dataset)}")
    print(f"Training set: {len(train_dataset)} images")
    print(f"Validation set: {len(val_dataset)} images")
    print(f"Number of classes: {len(full_dataset.classes)}")

    return train_loader, val_loader


Train one epoch and eval functions

In [None]:

def train_one_epoch(model, loader, optimizer, criterion, device, epoch):
    model.train()
    running_loss, correct, total = 0.0, 0, 0
    for images, labels in tqdm(loader, desc=f"Train Epoch {epoch}"):
        images, labels = images.to(device), labels.to(device)
        optimizer.zero_grad()
        # Comment the following lines if using a model with auxiliary outputs (like InceptionV3)
        outputs = model(images)
        loss = criterion(outputs, labels)

        # Uncomment the following lines if using a model with auxiliary outputs (like InceptionV3)
        # outputs, aux_output = model(images)
        # loss = criterion(outputs, labels) + 0.4 * criterion(aux_output, labels)
 
        loss.backward()
        optimizer.step()
        running_loss += loss.item() * images.size(0)
        _, predicted = outputs.max(1)
        correct += predicted.eq(labels).sum().item()
        total += labels.size(0)
    epoch_loss = running_loss / total
    epoch_acc = correct / total
    wandb.log({"train/loss": epoch_loss, "train/acc": epoch_acc, "epoch": epoch})
    print(f"Train Loss: {epoch_loss:.4f}, Train Acc: {epoch_acc:.4f}")
    return epoch_loss, epoch_acc


def evaluate(model, loader, criterion, device, epoch):
    model.eval()
    val_loss, correct, total = 0.0, 0, 0
    with torch.no_grad():
        for images, labels in tqdm(loader, desc=f"Eval Epoch {epoch}"):
            images, labels = images.to(device), labels.to(device)
            outputs = model(images)
            loss = criterion(outputs, labels)
            val_loss += loss.item() * images.size(0)
            _, predicted = outputs.max(1)
            correct += predicted.eq(labels).sum().item()
            total += labels.size(0)
    epoch_loss = val_loss / total
    epoch_acc = correct / total
    wandb.log({"val/loss": epoch_loss, "val/acc": epoch_acc, "epoch": epoch})
    print(f"Validation Loss: {epoch_loss:.4f}, Validation Acc: {epoch_acc:.4f}")
    return epoch_loss, epoch_acc


In [6]:

PROJECT_NAME = "da24m029-da6401-assignment2"  # Replace with your project name
ENTITY_NAME = "da24m029-indian-institute-of-technology-madras"  # Replace with your entity name

In [None]:


# === Configurable ===
BACKBONE = "resnet50"  # Options: 'resnet50', 'vgg16', 'efficientnet_v2_s', 'inception_v3', 'vit_b_16'
FREEZE_STRATEGY = "partial_percent"  # Options: 'last_only', 'partial_k', 'partial_percent'
K = 10  # Used if FREEZE_STRATEGY == 'partial_k'
PERCENT = 0.7  # Used if FREEZE_STRATEGY == 'partial_percent'

def get_pretrained_model(backbone_name, num_classes):
    if backbone_name == "resnet50":
        model = models.resnet50(weights='DEFAULT')
        in_features = model.fc.in_features
        model.fc = nn.Linear(in_features, num_classes)
        feature_layers = list(model.children())[:-1]
    elif backbone_name == "vgg16":
        model = models.vgg16(weights='DEFAULT')
        in_features = model.classifier[6].in_features
        model.classifier[6] = nn.Linear(in_features, num_classes)
        feature_layers = list(model.features)
    elif backbone_name == "efficientnet_v2_s":
        model = models.efficientnet_v2_s(weights='DEFAULT')
        in_features = model.classifier[1].in_features
        model.classifier[1] = nn.Linear(in_features, num_classes)
        feature_layers = list(model.features)
    elif backbone_name == "inception_v3":
        model = models.inception_v3(weights='DEFAULT', aux_logits=True)
        in_features = model.fc.in_features
        model.fc = nn.Linear(in_features, num_classes)
        feature_layers = list(model.children())[:-1]
    elif backbone_name == "vit_b_16":
        model = models.vit_b_16(weights='DEFAULT')
        in_features = model.heads.head.in_features
        model.heads.head = nn.Linear(in_features, num_classes)
        feature_layers = list(model.children())[:-1]
    else:
        raise ValueError("Unsupported backbone")

    return model#, feature_layers

# def apply_freezing_strategy(model, feature_layers, strategy, k=None, percent=None):
def apply_freezing_strategy(model, strategy, k=None, percent=None):
    all_params = list(model.parameters())

    if strategy == "last_only":
        for param in all_params:
            param.requires_grad = False
        # Unfreeze only classifier
        for param in model.parameters():
            if param.ndim > 1 and param.requires_grad == False:
                continue
            param.requires_grad = True

    elif strategy == "partial_k":
        for idx, param in enumerate(all_params):
            param.requires_grad = idx >= k

    elif strategy == "partial_percent":
        freeze_until = int(len(all_params) * percent)
        for idx, param in enumerate(all_params):
            param.requires_grad = idx >= freeze_until

    else:
        raise ValueError("Unknown freezing strategy")

def finetune_model(backbone=BACKBONE, strategy=FREEZE_STRATEGY, k=K, percent=PERCENT, num_classes=10):
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    # model, feature_layers = get_pretrained_model(backbone, num_classes)
    model = get_pretrained_model(backbone, num_classes)

    # Apply freezing
    # apply_freezing_strategy(model, feature_layers, strategy, k, percent)
    apply_freezing_strategy(model, strategy, k, percent)

    model.to(device)
    print(f"Model: {backbone}, Strategy: {strategy} → Ready for training.")
    return model


In [None]:
def run_finetune_training(
    backbone="resnet50", strategy="last_only", num_epochs=10, batch_size=32,
    k=10, percent=0.7, num_classes=10
):
    # wandb init
    wandb.init(
        project=PROJECT_NAME,
        entity=ENTITY_NAME,
        config={
            "backbone": backbone,
            "strategy": strategy,
            "epochs": num_epochs,
            "batch_size": batch_size,
            "freeze_k": k,
            "freeze_percent": percent,
            # "augment": False
        }
    )
    wandb.run.name = f"backbone={backbone}, strategy={strategy}, epochs={num_epochs}, batch_size={batch_size}, freeze_k={k}, freeze_percent={percent}"
    wandb.run.save()
    
    config = wandb.config

    train_loader, val_loader = get_data_loaders(config)

    # Load model
    model = finetune_model(backbone=config.backbone,
                           strategy=config.strategy,
                           k=config.freeze_k,
                           percent=config.freeze_percent,
                           num_classes=num_classes)

    # Optimizer and criterion
    optimizer = torch.optim.Adam(filter(lambda p: p.requires_grad, model.parameters()), lr=1e-4)
    criterion = nn.CrossEntropyLoss()
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    for epoch in range(1, num_epochs + 1):
        train_one_epoch(model, train_loader, optimizer, criterion, device, epoch)
        evaluate(model, val_loader, criterion, device, epoch)

    torch.save(model, os.path.join(best_model_path, "partB.pth"))
    wandb.save("best_model_B.pth")

    wandb.finish()

    return model


FineTune Runner

In [None]:
finetuned_model = run_finetune_training(
                    backbone="resnet50",
                    strategy="last_only",
                    percent=0.6,
                    num_epochs=100,
                    num_classes=10,
                    batch_size=64,
                    k=10,
                )



In [None]:
IMAGE_SIZE = 224
# IMAGE_SIZE = 299
test_transform = transforms.Compose([
    transforms.Resize((IMAGE_SIZE, IMAGE_SIZE)),  # match training image size
    transforms.ToTensor(),
    # transforms.Normalize([0.5], [0.5]),  # match training normalization
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]),
])

test_dataset = datasets.ImageFolder(os.path.join(data_dir,'test'), transform=test_transform)
test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False)


In [11]:
from sklearn.metrics import accuracy_score

all_preds = []
all_labels = []

finetuned_model.eval()
with torch.no_grad():
    for images, labels in test_loader:
        images, labels = images.to(device), labels.to(device)
        outputs = finetuned_model(images)
        preds = outputs.argmax(dim=1)
        all_preds.extend(preds.cpu().numpy())
        all_labels.extend(labels.cpu().numpy())

test_accuracy = accuracy_score(all_labels, all_preds)

# Print it
print(f" Final Test Accuracy: {test_accuracy:.4f}")

# Log to wandb (if active)
if wandb.run is not None:
    wandb.log({"test_accuracy": test_accuracy})


 Final Test Accuracy: 0.6090
