In [None]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here are several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
# Lazily enumerate every file below the read-only input directory and echo its full path.
input_file_paths = (
    os.path.join(folder, file_name)
    for folder, _subfolders, file_names in os.walk('/kaggle/input')
    for file_name in file_names
)
for input_file_path in input_file_paths:
    print(input_file_path)

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

In [None]:
# Splitting the dataset into training and validation folders
from sklearn.model_selection import train_test_split
from torchvision.datasets import ImageFolder
import shutil
import os

# Original dataset directory (read with ImageFolder, i.e. one sub-folder per class)
original_train_dir = "/kaggle/input/vlg-recruitment-24-challenge/vlg-dataset/vlg-dataset/train"

# New directories for the train and validation splits
train_dir = "/kaggle/working/train_split"
val_dir = "/kaggle/working/val_split"

os.makedirs(train_dir, exist_ok=True)
os.makedirs(val_dir, exist_ok=True)

dataset = ImageFolder(root=original_train_dir)

# Stratified 80/20 split over sample indices.
# random_state pins the split: without it, re-running this cell would copy a
# *different* split into the same folders and validation images would end up
# in the training folder as well.
train_idx, val_idx = train_test_split(
    range(len(dataset)),
    test_size=0.2,
    stratify=dataset.targets,
    random_state=42,
)

# train_test_split returns plain lists; a set makes the per-image membership
# test O(1) instead of scanning the list for every sample.
val_idx_set = set(val_idx)

for idx, (path, label) in enumerate(dataset.samples):
    class_name = dataset.classes[label]
    dest_dir = val_dir if idx in val_idx_set else train_dir
    dest_class_dir = os.path.join(dest_dir, class_name)
    # Class folders are created on demand so that no empty class folder is left behind.
    os.makedirs(dest_class_dir, exist_ok=True)
    shutil.copy2(path, dest_class_dir)

print("Train and validation datasets split successfully!")


In [None]:
# Loading and fine-tuning the pretrained ResNet-101 model
import os
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import datasets, transforms, models
from torch.optim.lr_scheduler import StepLR
from pathlib import Path
from torchvision.datasets.folder import default_loader

# Preprocessing pipelines: augmentation for training, deterministic resize for evaluation.
# Both end with the same tensor conversion and channel-wise normalisation.
_CHANNEL_MEAN = [0.485, 0.456, 0.406]
_CHANNEL_STD = [0.229, 0.224, 0.225]
_INPUT_SIDE = 224


def _tensor_and_normalize_steps():
    """Return fresh ToTensor + Normalize steps shared by both pipelines."""
    return [
        transforms.ToTensor(),
        transforms.Normalize(mean=_CHANNEL_MEAN, std=_CHANNEL_STD),
    ]


_augmentation_steps = [
    transforms.Resize((_INPUT_SIDE, _INPUT_SIDE)),
    transforms.RandomHorizontalFlip(p=0.5),
    transforms.RandomRotation(degrees=15),
    transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.1),
    transforms.RandomResizedCrop(size=_INPUT_SIDE, scale=(0.8, 1.0)),
]
train_transforms = transforms.Compose(_augmentation_steps + _tensor_and_normalize_steps())

test_transforms = transforms.Compose(
    [transforms.Resize((_INPUT_SIDE, _INPUT_SIDE))] + _tensor_and_normalize_steps()
)

# Dataset over a flat folder of images that yields (image, file name) pairs
class TestDataset(torch.utils.data.Dataset):
    """Images matched by ``*.jpg`` directly inside ``root`` (sub-folders are not searched).

    Args:
        root: Directory that is globbed for ``*.jpg`` files.
        transform: Optional callable applied to each loaded image.

    Each item is a ``(image, file_name)`` tuple, where ``file_name`` is the
    final path component as a string.
    """

    def __init__(self, root, transform=None):
        self.root = root
        self.transform = transform
        self.loader = default_loader
        # Non-recursive glob: only files sitting directly in `root` are picked up.
        self.image_paths = [jpg_path for jpg_path in Path(root).glob("*.jpg")]

    def __len__(self):
        return len(self.image_paths)

    def __getitem__(self, idx):
        jpg_path = self.image_paths[idx]
        picture = self.loader(jpg_path)
        file_name = str(jpg_path.name)
        if not self.transform:
            return picture, file_name
        return self.transform(picture), file_name

# Validation data.
# val_split was written with one sub-folder per class, so it has to be read
# with ImageFolder: a flat "*.jpg" glob over that directory matches nothing and
# leaves the validation loader empty. ImageFolder also provides the labels
# needed to score the model. The names test_dataset / test_loader are kept
# because the training loop below refers to them.
test_dataset = datasets.ImageFolder(root="/kaggle/working/val_split", transform=test_transforms)
test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=16, shuffle=False)

# Training data
train_dataset = datasets.ImageFolder(root="/kaggle/working/train_split", transform=train_transforms)
train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=16, shuffle=True)

# ImageFolder numbers classes by sorted folder name; if a class were missing
# from one split the label indices of the two datasets would silently disagree.
assert train_dataset.class_to_idx == test_dataset.class_to_idx, (
    "train_split and val_split do not contain the same class folders"
)

# Use the GPU when one is visible; training on the CPU works but is very slow
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Pre-trained ResNet-101 whose final fully connected layer is replaced by a 50-way classifier
model = models.resnet101(pretrained=True)
num_ftrs = model.fc.in_features
model.fc = nn.Linear(in_features=num_ftrs, out_features=50)
model = model.to(device)
print(device)  # shows whether the GPU is actually being used

# Loss, optimizer and step-wise learning-rate decay (x0.1 every 7 scheduler steps)
criterion = nn.CrossEntropyLoss()
optimizer = optim.AdamW(params=model.parameters(), lr=1e-4, weight_decay=1e-4)
scheduler = StepLR(optimizer=optimizer, step_size=7, gamma=0.1)

# Training Function
def train(model, train_loader, criterion, optimizer, device):
    """Run one optimisation pass over ``train_loader``.

    Args:
        model: Network to optimise; it is switched to training mode.
        train_loader: Iterable of ``(inputs, labels)`` tensor batches.
        criterion: Loss callable returning the mean loss of a batch.
        optimizer: Optimizer that updates ``model``'s parameters.
        device: Device the batches are moved to before the forward pass.

    Returns:
        ``(epoch_loss, accuracy)``: the mean loss per sample as a Python float
        and the fraction of correctly classified samples as a 0-dim float64
        tensor (kept as a tensor for callers that treat it as one).
        An empty loader yields ``(0.0, tensor(0.))`` instead of raising.
    """
    model.train()
    running_loss = 0.0
    correct = 0
    seen = 0  # samples actually processed; stays correct if the loader drops batches

    for inputs, labels in train_loader:
        inputs, labels = inputs.to(device), labels.to(device)

        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        batch_size = inputs.size(0)
        # criterion returns a per-batch mean, so weight it by the batch size
        running_loss += loss.item() * batch_size
        correct += (outputs.argmax(dim=1) == labels).sum().item()
        seen += batch_size

    if seen == 0:
        return 0.0, torch.tensor(0.0, dtype=torch.float64)

    epoch_loss = running_loss / seen
    accuracy = torch.tensor(correct / seen, dtype=torch.float64)
    return epoch_loss, accuracy

# Validation function
def validate(model, test_loader, criterion, device):
    """Evaluate ``model`` on ``test_loader`` without updating any weights.

    Args:
        model: Network to evaluate; it is switched to evaluation mode.
        test_loader: Iterable of ``(inputs, labels)`` batches. Batches whose
            second element is not a tensor (for example file names from an
            unlabelled dataset) cannot be scored and are skipped.
        criterion: Loss callable returning the mean loss of a batch.
        device: Device the batches are moved to before the forward pass.

    Returns:
        ``(epoch_loss, accuracy)`` as Python floats: the mean loss per scored
        sample and the fraction of scored samples classified correctly.
        ``(0.0, 0.0)`` when no labelled sample was seen.
    """
    model.eval()
    running_loss = 0.0
    correct = 0
    total = 0  # number of labelled samples scored

    with torch.no_grad():
        for inputs, labels in test_loader:
            # Without tensor labels there is nothing to compare predictions against.
            if not torch.is_tensor(labels):
                continue

            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)

            batch_size = inputs.size(0)
            # criterion returns a per-batch mean, so weight it by the batch size
            running_loss += criterion(outputs, labels).item() * batch_size
            correct += (outputs.argmax(dim=1) == labels).sum().item()
            total += batch_size

    if total == 0:
        return 0.0, 0.0

    epoch_loss = running_loss / total
    accuracy = correct / total
    return epoch_loss, accuracy


# Fine-tune for a fixed number of epochs, reporting metrics after each one
num_epochs = 20
for epoch in range(num_epochs):
    train_loss, train_accuracy = train(model, train_loader, criterion, optimizer, device)
    val_loss, val_accuracy = validate(model, test_loader, criterion, device)

    # Advance the learning-rate schedule once per epoch
    scheduler.step()

    # One report per epoch: header line, training metrics, validation metrics
    epoch_report = "\n".join([
        f"Epoch {epoch+1}/{num_epochs}",
        f"Train Loss: {train_loss:.4f}  Train Accuracy: {train_accuracy:.4f}",
        f"Validation Loss: {val_loss:.4f}  Validation Accuracy: {val_accuracy:.4f}",
    ])
    print(epoch_report)

# Persist the fine-tuned weights (state dict only) in the current working directory
checkpoint_path = "resnet101_finetuned.pth"
torch.save(model.state_dict(), checkpoint_path)
print("Model saved successfully.")


In [None]:
# Ensemble inference and storing the final predictions
import torch
from torch import nn
from torch.utils.data import DataLoader, Dataset
from torchvision import transforms, models
from PIL import Image
from transformers import CLIPProcessor, CLIPModel, AutoModelForImageClassification, AutoImageProcessor
import pandas as pd

# Run on the GPU when one is visible, otherwise on the CPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

# Locations of the competition files
train_dir = "/kaggle/input/vlg-recruitment-24-challenge/vlg-dataset/vlg-dataset/train"
test_dir = "/kaggle/input/vlg-recruitment-24-challenge/vlg-dataset/vlg-dataset/test"
classes_path = "/kaggle/input/vlg-recruitment-24-challenge/vlg-dataset/vlg-dataset/classes.txt"

# One entry per line of classes.txt (seen + unseen classes), surrounding whitespace removed
with open(classes_path) as classes_file:
    class_names = list(map(str.strip, classes_file))

# Channel statistics used to normalise (and later un-normalise) the test images
mean = [0.485, 0.456, 0.406]
std = [0.229, 0.224, 0.225]

# Deterministic test-time preprocessing: fixed-size resize, tensor conversion, normalisation
_test_steps = [
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=mean, std=std),
]
test_transform = transforms.Compose(_test_steps)

# Dataset
class TestDataset(Dataset):
    """Unlabelled images stored as regular files directly inside ``test_dir``.

    Args:
        test_dir: Directory whose files are treated as images.
        transform: Optional callable applied to each RGB image.

    Each item is an ``(image, file_name)`` tuple, where ``file_name`` is the
    base name of the image path.
    """

    def __init__(self, test_dir, transform=None):
        # os.listdir returns entries in arbitrary order and includes directories:
        # sort for a reproducible prediction order and keep regular files only,
        # since a directory entry would make Image.open fail.
        self.image_paths = sorted(
            entry_path
            for entry_path in (os.path.join(test_dir, fname) for fname in os.listdir(test_dir))
            if os.path.isfile(entry_path)
        )
        self.transform = transform

    def __len__(self):
        return len(self.image_paths)

    def __getitem__(self, idx):
        img_path = self.image_paths[idx]
        # Force three channels so every image fits the 3-channel normalisation
        image = Image.open(img_path).convert("RGB")
        if self.transform:
            image = self.transform(image)
        return image, os.path.basename(img_path)

test_dataset = TestDataset(test_dir, transform=test_transform)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

# Loading all the models for the ensemble approach
print("Loading Models...")

# ResNet101
# Every weight is overwritten by the fine-tuned checkpoint below, so the
# architecture is built without downloading the ImageNet weights first.
resnet = models.resnet101()
resnet.fc = nn.Linear(resnet.fc.in_features, len(class_names))
# map_location lets a checkpoint saved from a GPU session load on a CPU-only one.
resnet.load_state_dict(
    torch.load("/kaggle/working/resnet101_finetuned.pth", map_location=device)
)
resnet = resnet.to(device)

# Vision Transformer
# NOTE(review): nothing in this notebook fine-tunes vit_model, so its
# classification head appears to be freshly initialised - confirm that its
# votes are meaningful before relying on them.
vit_model = AutoModelForImageClassification.from_pretrained(
    "google/vit-base-patch16-224-in21k", num_labels=len(class_names)
).to(device)
vit_processor = AutoImageProcessor.from_pretrained("google/vit-base-patch16-224-in21k",use_fast=True)

# CLIP Model
clip_model = CLIPModel.from_pretrained("openai/clip-vit-base-patch32").to(device)
clip_processor = CLIPProcessor.from_pretrained("openai/clip-vit-base-patch32")

# Inference
resnet.eval()
vit_model.eval()
clip_model.eval()

print("Performing Inference...")

# Voting weights in (ResNet, Vision Transformer, CLIP zero-shot) order
ENSEMBLE_WEIGHTS = (0.3, 0.3, 0.4)

# Per-channel statistics of the OpenAI CLIP preprocessing. The test images were
# normalised with different statistics, so they are re-normalised before being
# passed to CLIP.
CLIP_MEAN = [0.48145466, 0.4578275, 0.40821073]
CLIP_STD = [0.26862954, 0.26130258, 0.27577711]


def _weighted_vote(predictions, weights):
    """Return the label with the largest total weight.

    Weights of models that predict the same label are added together. Building
    a dict literal keyed by the predictions would overwrite duplicate keys
    instead of summing them, so two agreeing 0.3 votes could never beat a
    single 0.4 vote. Ties go to the label that was voted for first.
    """
    totals = {}
    for label, weight in zip(predictions, weights):
        totals[label] = totals.get(label, 0.0) + weight
    return max(totals, key=totals.get)


# storing the predictions
final_predictions = []

with torch.no_grad():
    # Precompute text embeddings for all class names. Only the text after the
    # last tab of each classes.txt line is used as the prompt (the same rule
    # the submission step applies), so any leading column is not sent to CLIP.
    clip_prompts = [name.split("\t")[-1] for name in class_names]
    text_inputs = clip_processor(text=clip_prompts, return_tensors="pt", padding=True).to(device)
    text_features = clip_model.get_text_features(**text_inputs)

    # Loop-invariant (3, 1, 1) tensors that broadcast over (batch, 3, H, W) images
    input_mean = torch.tensor(mean, device=device)[:, None, None]
    input_std = torch.tensor(std, device=device)[:, None, None]
    clip_mean = torch.tensor(CLIP_MEAN, device=device)[:, None, None]
    clip_std = torch.tensor(CLIP_STD, device=device)[:, None, None]

    for images, image_names in test_loader:
        images = images.to(device)

        # Undo the test-time normalisation to recover pixel values in [0, 1]
        images_unnormalized = torch.clamp(images * input_std + input_mean, 0, 1)

        # ResNet predictions
        # NOTE(review): these are indices in the label space the checkpoint was
        # trained with; they are looked up in class_names below - confirm that
        # both orderings agree.
        resnet_preds = resnet(images).argmax(dim=1)

        # Vision Transformer predictions
        # The pixels are already in [0, 1]; do_rescale=False keeps the
        # processor from multiplying them by 1/255 a second time.
        vit_inputs = vit_processor(
            images=list(images_unnormalized.cpu()), return_tensors="pt", do_rescale=False
        ).to(device)
        vit_preds = vit_model(**vit_inputs).logits.argmax(dim=1)

        # CLIP zero-shot predictions on CLIP-normalised pixels
        clip_pixels = (images_unnormalized - clip_mean) / clip_std
        clip_image_features = clip_model.get_image_features(pixel_values=clip_pixels)
        cosine_sim = torch.nn.functional.cosine_similarity(
            clip_image_features.unsqueeze(1), text_features.unsqueeze(0), dim=-1
        )
        clip_preds = cosine_sim.argmax(dim=1)

        # Ensemble: weighted voting over the three per-image predictions
        per_image_votes = zip(
            image_names, resnet_preds.tolist(), vit_preds.tolist(), clip_preds.tolist()
        )
        for name, resnet_pred, vit_pred, clip_pred in per_image_votes:
            final_pred = _weighted_vote((resnet_pred, vit_pred, clip_pred), ENSEMBLE_WEIGHTS)
            final_predictions.append((name, class_names[final_pred]))  # full classes.txt line




In [None]:
# Write the submission file: one row per test image, holding only the class name
# (the text after the last tab of each stored label), as the challenge format requires.
submission_columns = ["image_id", "class"]
submission_df = pd.DataFrame(final_predictions, columns=submission_columns)
bare_class_names = submission_df["class"].str.split("\t").str[-1]
submission_df["class"] = bare_class_names
submission_df.to_csv("ensemble_submission.csv", index=False)
print("Ensemble submission saved as 'ensemble_submission.csv'!")
