<a href="https://colab.research.google.com/github/sahilkumar-sk/neural-network-deep-learning-project/blob/main/Notebook/nn_project.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import torch
from torch import nn
import timm
from PIL import Image
import os
import matplotlib.pyplot as plt
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms, models
import torch.optim as optim
from sklearn.metrics import average_precision_score
from tqdm import tqdm
from torchvision.models import ViT_B_16_Weights, Swin_B_Weights, ResNet50_Weights

In [None]:
import os
from IPython.display import clear_output

# Create the working directory and make it the CWD, so the relative checkpoint
# paths written later (best_<name>_model.pth) land under /content/VOC2008.
!mkdir -p /content/VOC2008
os.chdir('/content/VOC2008')

# Download (quietly) and unpack the VOC2008 train/val archive, then clear the cell output.
!wget -q http://host.robots.ox.ac.uk/pascal/VOC/voc2008/VOCtrainval_14-Jul-2008.tar
!tar -xf VOCtrainval_14-Jul-2008.tar
clear_output()

In [None]:
# Hyper-parameters shared by every training run in this notebook.
BATCH_SIZE = 32
NUM_WORKERS = 2
NUM_EPOCHS = 5
# Learning rate for the classification head; the training functions use LEARNING_RATE/10 for the other parameters.
LEARNING_RATE = 1e-4
# Use the GPU when PyTorch can see one, otherwise fall back to the CPU.
DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
# Folder with the VOC2008 JPEGs; the dataset classes look up "<image id>.jpg" inside it.
IMG_DIR = '/content/VOC2008/VOCdevkit/VOC2008/JPEGImages'

In [None]:
def get_dataloaders(img_dir, batch_size=32, num_workers=0, category='dog',
                    split_dir='/content/VOC2008/VOCdevkit/VOC2008/ImageSets/Main'):
    """Build the train and validation DataLoaders for one VOC category.

    Args:
        img_dir: folder containing the JPEG images.
        batch_size: batch size used by both loaders.
        num_workers: DataLoader worker processes used by both loaders.
        category: prefix of the split files; `<category>_train.txt` and
            `<category>_val.txt` are read from `split_dir`. Defaults to 'dog',
            which reproduces the previously hard-coded paths.
        split_dir: folder holding the split files.

    Returns:
        (train_loader, val_loader); only the training loader shuffles.
    """
    # Resize to 224x224 and normalise with the mean/std triple used throughout this notebook.
    transform = transforms.Compose([
        transforms.Resize((224,224)),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485,0.456,0.406], std=[0.229,0.224,0.225])
    ])

    train_ds = DogDataset(os.path.join(split_dir, f'{category}_train.txt'), img_dir, transform)
    val_ds   = DogDataset(os.path.join(split_dir, f'{category}_val.txt'),   img_dir, transform)

    train_loader = DataLoader(train_ds, batch_size=batch_size, shuffle=True, num_workers=num_workers)
    val_loader   = DataLoader(val_ds,   batch_size=batch_size, shuffle=False, num_workers=num_workers)
    return train_loader, val_loader

In [None]:
class DogDataset(Dataset):
    """Binary image dataset built from a split file of `<image id> <flag>` lines.

    A flag of -1 becomes label 0; every other flag becomes label 1.
    NOTE(review): VOC split files also use flag 0 ("difficult"); those are
    treated as positives here — confirm that is intended.

    Args:
        split_file: text file with one `<image id> <flag>` pair per line.
        img_dir: folder containing `<image id>.jpg` files.
        transform: optional callable applied to the loaded PIL image.
    """

    def __init__(self, split_file, img_dir, transform=None):
        self.img_dir = img_dir
        self.transform = transform
        self.samples = []

        with open(split_file, 'r') as f:
            for line in f:
                fields = line.split()
                if not fields:
                    # Skip blank lines (e.g. a trailing newline) instead of failing on unpacking.
                    continue
                img_id, lbl = fields
                mapped_label = 0 if int(lbl) == -1 else 1
                self.samples.append((img_id, mapped_label))

    def __len__(self):
        """Number of (image id, label) pairs read from the split file."""
        return len(self.samples)

    def __getitem__(self, idx):
        """Return `(image, label, image id)` for sample `idx`.

        An image that cannot be read is replaced by a black 224x224 image so a
        single bad file does not abort an epoch, but a warning is emitted so the
        substitution is visible rather than silent.
        """
        img_id, lbl = self.samples[idx]
        img_path = os.path.join(self.img_dir, f"{img_id}.jpg")

        try:
            img = Image.open(img_path).convert('RGB')
        except Exception as err:  # not a bare except: Ctrl-C / SystemExit must still propagate
            import warnings
            warnings.warn(f"Could not load {img_path} ({err!r}); using a black placeholder image.")
            img = Image.new('RGB', (224, 224), (0, 0, 0))

        if self.transform:
            img = self.transform(img)

        return img, lbl, img_id


In [None]:
def create_vit_model(num_classes=2):
    """Return a ViT-B/16 with IMAGENET1K_V1 weights and a fresh `num_classes`-way head."""
    vit = models.vit_b_16(weights=ViT_B_16_Weights.IMAGENET1K_V1)
    # Only the final linear layer is swapped; its input width is read from the existing head.
    vit.heads.head = nn.Linear(vit.heads.head.in_features, num_classes)
    return vit


In [None]:
def create_swin_model(num_classes=2):
    """Return a Swin-B with IMAGENET1K_V1 weights and a fresh `num_classes`-way head."""
    swin = models.swin_b(weights=Swin_B_Weights.IMAGENET1K_V1)
    # Replace the classifier with a new linear layer of matching input width.
    swin.head = nn.Linear(swin.head.in_features, num_classes)
    return swin

In [None]:
def create_resnet_model(num_classes=2):
    """Return a ResNet-50 with IMAGENET1K_V2 weights and a fresh `num_classes`-way `fc` layer."""
    resnet = models.resnet50(weights=ResNet50_Weights.IMAGENET1K_V2)
    # Replace the final fully connected layer, keeping its input width.
    resnet.fc = nn.Linear(resnet.fc.in_features, num_classes)
    return resnet


In [None]:
from torchvision.models import densenet201, DenseNet201_Weights

def create_densenet_model(num_classes=2):
    """Return a DenseNet-201 with IMAGENET1K_V1 weights and a fresh `num_classes`-way classifier."""
    densenet = densenet201(weights=DenseNet201_Weights.IMAGENET1K_V1)
    # Replace the classifier layer, keeping its input width.
    densenet.classifier = nn.Linear(densenet.classifier.in_features, num_classes)
    return densenet

In [None]:
def train_one_epoch(model, loader, criterion, optimizer, device):
    """Run one optimisation pass over `loader` and return the loss per dataset sample.

    Each batch is expected to be an `(images, labels, ids)` triple; the ids are ignored.
    """
    model.train()
    running_loss = 0.0

    for batch_images, batch_labels, _ids in tqdm(loader, desc="Training"):
        batch_images = batch_images.to(device)
        batch_labels = batch_labels.to(device)

        optimizer.zero_grad()
        batch_loss = criterion(model(batch_images), batch_labels)
        batch_loss.backward()
        optimizer.step()

        # Weight the batch loss by the batch size before summing
        # (assumes `criterion` returns a batch average — confirm for custom losses).
        running_loss += batch_loss.item() * batch_images.size(0)

    return running_loss / len(loader.dataset)


In [None]:
def evaluate_map(model, loader, device):
    """Score `loader` with `model` and return `(average precision, top-10 list)`.

    The score of an image is the softmax probability of class index 1. The
    top-10 list holds the ten highest `(score, image id)` pairs, regardless of
    their true label.
    """
    model.eval()
    scores, targets, ids = [], [], []

    with torch.no_grad():
        for images, labels, img_ids in tqdm(loader, desc="Evaluating"):
            logits = model(images.to(device))
            positive_prob = torch.softmax(logits, dim=1)[:, 1]

            scores.extend(positive_prob.cpu().numpy())
            targets.extend(labels.numpy())
            ids.extend(img_ids)

    ap = average_precision_score(targets, scores)
    # Descending tuple sort: by score first, then by image id on ties.
    ranked = sorted(zip(scores, ids), reverse=True)
    return ap, ranked[:10]

In [None]:
def visualize_top10(top10, img_dir, model_name):
    """Show the `(score, image id)` pairs in `top10` on a 2x5 grid and save it as a PNG.

    The figure is written to `<model_name>_top10_dogs.png` in the current
    directory. An image that fails to load is reported and its cell left out.
    """
    plt.figure(figsize=(15, 8))
    for slot, (score, img_id) in enumerate(top10, start=1):
        try:
            picture = Image.open(os.path.join(img_dir, f"{img_id}.jpg")).convert('RGB')
            plt.subplot(2, 5, slot)
            plt.imshow(picture)
            plt.title(f"Score: {score:.4f}")
            plt.axis('off')
        except Exception as e:
            print(f"Error loading image {img_id}: {e}")

    plt.tight_layout()
    plt.savefig(f'{model_name}_top10_dogs.png')
    plt.show()

In [None]:
def train_model(model_name, model_creation_fn):
    """Fine-tune a model on the dog split and return the validation AP of its best epoch.

    Args:
        model_name: label used in log lines, the checkpoint file name
            (`best_<model_name>_model.pth`, relative to the CWD) and the saved plot.
        model_creation_fn: zero-argument callable returning the `nn.Module` to train.

    Returns:
        Average precision on the validation split after reloading the best checkpoint.
    """
    print(f"Training {model_name}...")

    train_loader, val_loader = get_dataloaders(IMG_DIR, BATCH_SIZE, NUM_WORKERS)

    model = model_creation_fn()
    model = model.to(DEVICE)

    criterion = nn.CrossEntropyLoss()

    # The replaced classification layer is called `head(s)` (ViT/Swin), `fc` (ResNet)
    # or `classifier` (DenseNet/ConvNeXt). Matching only 'head'/'fc' left the last two
    # with an empty head group, so their new layer trained at the backbone LR.
    head_tags = ('head', 'fc', 'classifier')
    head_params, backbone_params = [], []
    for name, param in model.named_parameters():
        if any(tag in name for tag in head_tags):
            head_params.append(param)
        else:
            backbone_params.append(param)

    params = [
        {'params': backbone_params, 'lr': LEARNING_RATE/10},
        {'params': head_params, 'lr': LEARNING_RATE}
    ]

    optimizer = optim.AdamW(params, weight_decay=0.05)

    # `verbose` is deprecated in recent PyTorch releases, so it is no longer passed.
    scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='max', factor=0.1, patience=2)

    # Start below any reachable AP so the first epoch always writes a checkpoint;
    # starting at 0.0 could leave no file (or a stale one from an earlier run) to reload.
    best_map = float('-inf')
    best_model_path = f"best_{model_name}_model.pth"

    for epoch in range(NUM_EPOCHS):
        train_loss = train_one_epoch(model, train_loader, criterion, optimizer, DEVICE)

        val_map, top10 = evaluate_map(model, val_loader, DEVICE)

        scheduler.step(val_map)

        print(f"Epoch {epoch+1}/{NUM_EPOCHS}, Train Loss: {train_loss:.4f}, Validation MAP: {val_map:.4f}")

        if val_map > best_map:
            best_map = val_map
            torch.save(model.state_dict(), best_model_path)
            print(f"Saved new best model with MAP: {best_map:.4f}")

    # Report on the best epoch, not the last one.
    model.load_state_dict(torch.load(best_model_path, map_location=DEVICE))

    final_val_map, top10 = evaluate_map(model, val_loader, DEVICE)
    print(f"Final Validation MAP: {final_val_map:.4f}")

    print(f"Top 10 detected dogs for {model_name}:")
    for i, (score, img_id) in enumerate(top10):
        print(f"{i+1}. Image ID: {img_id}, Confidence: {score:.4f}")

    visualize_top10(top10, IMG_DIR, model_name)

    return final_val_map

In [None]:
# Fine-tune Swin-B on the dog split; the call writes best_swin_model.pth and returns the validation AP.
map_swin = train_model("swin", create_swin_model)

In [None]:
# Fine-tune ViT-B/16 on the dog split; the call writes best_vit_model.pth and returns the validation AP.
map_vit = train_model("vit", create_vit_model)

In [None]:
    # Train Swin Transformer model
    # map_swin = train_model("swin", create_swin_model)

    # Train ResNet model
    # map_resnet = train_model("resnet", create_resnet_model)

    # Compare results of all models (if all have been trained)
    # print("\nModel Comparison:")
    # print(f"ViT MAP: {map_vit:.4f}")
    # print(f"Swin MAP: {map_swin:.4f}")
    # print(f"ResNet MAP: {map_resnet:.4f}")

In [None]:
# Fine-tune ResNet-50 on the dog split; the call writes best_resnet_model.pth and returns the validation AP.
map_resnet = train_model("resnet", create_resnet_model)


In [None]:
# Fine-tune DenseNet-201 on the dog split; the call writes best_densenet_model.pth and returns the validation AP.
map_densenet = train_model("densenet", create_densenet_model)

In [None]:
from torchvision.models import convnext_base, ConvNeXt_Base_Weights

def create_convnext_model(num_classes=2):
    """Return a ConvNeXt-Base with IMAGENET1K_V1 weights and a fresh `num_classes`-way output layer."""
    convnext = convnext_base(weights=ConvNeXt_Base_Weights.IMAGENET1K_V1)
    # Index 2 of the classifier is the linear output layer; swap it, keeping its input width.
    convnext.classifier[2] = nn.Linear(convnext.classifier[2].in_features, num_classes)
    return convnext

In [None]:
# Fine-tune ConvNeXt-Base on the dog split; the call writes best_convnext_model.pth and returns the validation AP.
map_convnext = train_model("convnext", create_convnext_model)

In [None]:
# Side-by-side validation AP of the dog models trained above.
print("\nModel Comparison:")
print("ViT MAP: {:.4f}".format(map_vit))
print("Swin MAP: {:.4f}".format(map_swin))
# print("ResNet MAP: {:.4f}".format(map_resnet))
# print("DenseNet MAP: {:.4f}".format(map_densenet))
print("ConvNeXt MAP: {:.4f}".format(map_convnext))


DINING TABLE CLASSIFICATION

In [None]:
class DinningTableDataset(Dataset):
    """Binary image dataset built from a split file of `<image id> <flag>` lines.

    A flag of -1 becomes label 0; every other flag becomes label 1.
    NOTE(review): VOC split files also use flag 0 ("difficult"); those are
    treated as positives here — confirm that is intended.

    Args:
        split_file: text file with one `<image id> <flag>` pair per line.
        img_dir: folder containing `<image id>.jpg` files.
        transform: optional callable applied to the loaded PIL image.
    """

    def __init__(self, split_file, img_dir, transform=None):
        self.img_dir = img_dir
        self.transform = transform
        self.samples = []

        with open(split_file, 'r') as f:
            for line in f:
                fields = line.split()
                if not fields:
                    # Skip blank lines (e.g. a trailing newline) instead of failing on unpacking.
                    continue
                img_id, lbl = fields
                mapped_label = 0 if int(lbl) == -1 else 1
                self.samples.append((img_id, mapped_label))

    def __len__(self):
        """Number of (image id, label) pairs read from the split file."""
        return len(self.samples)

    def __getitem__(self, idx):
        """Return `(image, label, image id)` for sample `idx`.

        An unreadable image is replaced by a black 224x224 image, with a warning
        so the substitution is visible rather than silent.
        """
        img_id, lbl = self.samples[idx]
        img_path = os.path.join(self.img_dir, f"{img_id}.jpg")

        try:
            img = Image.open(img_path).convert('RGB')
        except Exception as err:  # not a bare except: Ctrl-C / SystemExit must still propagate
            import warnings
            warnings.warn(f"Could not load {img_path} ({err!r}); using a black placeholder image.")
            img = Image.new('RGB', (224, 224), (0, 0, 0))

        if self.transform:
            img = self.transform(img)

        return img, lbl, img_id


In [None]:
def get_dataloaders_dinning(img_dir, batch_size=32, num_workers=0):
    """Return `(train_loader, val_loader)` for the VOC2008 dining-table split.

    Both loaders share the same 224x224 resize + normalisation pipeline; only
    the training loader shuffles.
    """
    preprocess = transforms.Compose([
        transforms.Resize((224,224)),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485,0.456,0.406], std=[0.229,0.224,0.225])
    ])

    train_set = DinningTableDataset('/content/VOC2008/VOCdevkit/VOC2008/ImageSets/Main/diningtable_train.txt', img_dir, preprocess)
    val_set = DinningTableDataset('/content/VOC2008/VOCdevkit/VOC2008/ImageSets/Main/diningtable_val.txt', img_dir, preprocess)

    shared_kwargs = dict(batch_size=batch_size, num_workers=num_workers)
    return (
        DataLoader(train_set, shuffle=True, **shared_kwargs),
        DataLoader(val_set, shuffle=False, **shared_kwargs),
    )

In [None]:
def train_model_dt(model_name, model_creation_fn):
    """Fine-tune a model on the dining-table split and return the validation AP of its best epoch.

    Args:
        model_name: label used in log lines, the checkpoint file name
            (`best_<model_name>_model.pth`, relative to the CWD) and the saved plot.
        model_creation_fn: zero-argument callable returning the `nn.Module` to train.

    Returns:
        Average precision on the validation split after reloading the best checkpoint.
    """
    print(f"Training {model_name}...")

    train_loader_dt, val_loader_dt = get_dataloaders_dinning(IMG_DIR, BATCH_SIZE, NUM_WORKERS)

    model = model_creation_fn()
    model = model.to(DEVICE)

    criterion = nn.CrossEntropyLoss()

    # The replaced classification layer is called `head(s)` (ViT/Swin), `fc` (ResNet)
    # or `classifier` (DenseNet/ConvNeXt). Matching only 'head'/'fc' left the last two
    # with an empty head group, so their new layer trained at the backbone LR.
    head_tags = ('head', 'fc', 'classifier')
    head_params, backbone_params = [], []
    for name, param in model.named_parameters():
        if any(tag in name for tag in head_tags):
            head_params.append(param)
        else:
            backbone_params.append(param)

    params = [
        {'params': backbone_params, 'lr': LEARNING_RATE/10},
        {'params': head_params, 'lr': LEARNING_RATE}
    ]

    optimizer = optim.AdamW(params, weight_decay=0.05)

    # `verbose` is deprecated in recent PyTorch releases, so it is no longer passed.
    scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='max', factor=0.1, patience=2)

    # Start below any reachable AP so the first epoch always writes a checkpoint;
    # starting at 0.0 could leave no file (or a stale one from an earlier run) to reload.
    best_map = float('-inf')
    best_model_path = f"best_{model_name}_model.pth"

    for epoch in range(NUM_EPOCHS):
        train_loss = train_one_epoch(model, train_loader_dt, criterion, optimizer, DEVICE)

        val_map, top10 = evaluate_map(model, val_loader_dt, DEVICE)

        scheduler.step(val_map)

        print(f"Epoch {epoch+1}/{NUM_EPOCHS}, Train Loss: {train_loss:.4f}, Validation MAP: {val_map:.4f}")

        if val_map > best_map:
            best_map = val_map
            torch.save(model.state_dict(), best_model_path)
            print(f"Saved new best model with MAP: {best_map:.4f}")

    # Report on the best epoch, not the last one.
    model.load_state_dict(torch.load(best_model_path, map_location=DEVICE))

    final_val_map, top10 = evaluate_map(model, val_loader_dt, DEVICE)
    print(f"Final Validation MAP: {final_val_map:.4f}")

    print(f"Top 10 detected Dinning Table for {model_name}:")
    for i, (score, img_id) in enumerate(top10):
        print(f"{i+1}. Image ID: {img_id}, Confidence: {score:.4f}")

    visualize_top10_dt(top10, IMG_DIR, model_name)

    return final_val_map

In [None]:
def visualize_top10_dt(top10, img_dir, model_name):
    """Show the `(score, image id)` pairs in `top10` on a 2x5 grid and save it as a PNG.

    The figure is written to `<model_name>_top10_dinning.png` in the current
    directory. An image that fails to load is reported and its cell left out.
    """
    plt.figure(figsize=(15, 8))
    for slot, (score, img_id) in enumerate(top10, start=1):
        try:
            picture = Image.open(os.path.join(img_dir, f"{img_id}.jpg")).convert('RGB')
            plt.subplot(2, 5, slot)
            plt.imshow(picture)
            plt.title(f"Score: {score:.4f}")
            plt.axis('off')
        except Exception as e:
            print(f"Error loading image {img_id}: {e}")

    plt.tight_layout()
    plt.savefig(f'{model_name}_top10_dinning.png')
    plt.show()

In [None]:
from google.colab import files
# Colab upload dialog; the unzip cell below reads `uploaded` as a mapping of file name -> bytes.
uploaded = files.upload()  # Upload the ZIP file

In [None]:
import zipfile
import io

# Unpack every uploaded archive straight from memory into /content/images.
for fn, payload in uploaded.items():
    print(f'Unzipping file: {fn}')
    with zipfile.ZipFile(io.BytesIO(payload), 'r') as archive:
        archive.extractall('/content/images')  # Extract files to this folder

print("Files extracted to /content/images")


### Random Images

In [None]:
import glob

# List the extracted JPEGs. The archive is unpacked into /content/images and is
# expected to contain an inner 'Images' folder.
test_img_paths = glob.glob('/content/images/Images/*.jpg')  # Note the extra 'Images' folder
print(f"Found {len(test_img_paths)} test images")


In [None]:
import torch
import torch.nn as nn
from torchvision import models, transforms
import numpy as np
from torch.utils.data import DataLoader
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import accuracy_score, average_precision_score
from tqdm import tqdm
from PIL import Image

# ====== SETTINGS ======
# Re-declared so this cell can run on its own; same values as the notebook-level constants.
DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
IMG_DIR = '/content/VOC2008/VOCdevkit/VOC2008/JPEGImages'

# --- Dining Table Dataset Class ---
class DinningTableDataset(torch.utils.data.Dataset):
    """Binary image dataset read from a split file of `<image id> <flag>` lines.

    A flag of -1 becomes label 0; any other flag becomes label 1. Items are
    `(image, label, image id)` triples.
    """
    def __init__(self, split_file, img_dir, transform=None):
        self.img_dir = img_dir
        self.transform = transform
        self.samples = []
        with open(split_file, 'r') as f:
            for line in f:
                fields = line.split()
                if not fields:
                    # Skip blank lines instead of failing on unpacking.
                    continue
                img_id, lbl = fields
                mapped_label = 0 if int(lbl) == -1 else 1
                self.samples.append((img_id, mapped_label))
    def __len__(self):
        return len(self.samples)
    def __getitem__(self, idx):
        img_id, lbl = self.samples[idx]
        img_path = f"{self.img_dir}/{img_id}.jpg"
        try:
            img = Image.open(img_path).convert('RGB')
        except Exception as err:  # not a bare except: Ctrl-C / SystemExit must still propagate
            # Keep the best-effort black placeholder, but make the substitution visible.
            import warnings
            warnings.warn(f"Could not load {img_path} ({err!r}); using a black placeholder image.")
            img = Image.new('RGB', (224,224), (0,0,0))
        if self.transform:
            img = self.transform(img)
        return img, lbl, img_id

# --- Transforms ---
# Same resize + normalisation pipeline as the training data loaders use.
transform = transforms.Compose([
    transforms.Resize((224,224)),
    transforms.ToTensor(),
    transforms.Normalize([0.485,0.456,0.406],[0.229,0.224,0.225])
])

# Validation split only, unshuffled, so scores stay aligned with labels and image ids.
val_ds = DinningTableDataset('/content/VOC2008/VOCdevkit/VOC2008/ImageSets/Main/diningtable_val.txt', IMG_DIR, transform)
val_loader = DataLoader(val_ds, batch_size=32, shuffle=False, num_workers=2)

# ====== LOAD MODELS ======
# Architecture-only builders (weights=None): the parameters come from the fine-tuned
# checkpoints loaded below. They use private names on purpose. Defining them as
# `create_vit_model` / `create_swin_model` would overwrite the ImageNet-pretrained
# factories from the top of the notebook, and the later
# `train_model_dt(..., create_vit_model)` calls would then train from random weights.
def _build_vit_for_checkpoint(num_classes=2):
    """ViT-B/16 skeleton with a `num_classes`-way head, ready for `load_state_dict`."""
    model = models.vit_b_16(weights=None)
    model.heads.head = nn.Linear(model.heads.head.in_features, num_classes)
    return model

def _build_swin_for_checkpoint(num_classes=2):
    """Swin-B skeleton with a `num_classes`-way head, ready for `load_state_dict`."""
    model = models.swin_b(weights=None)
    model.head = nn.Linear(model.head.in_features, num_classes)
    return model

# Checkpoints written by train_model_dt("vit_dt", ...) / train_model_dt("swin_dt", ...);
# those training calls must have run before this cell.
vit_path = '/content/VOC2008/best_vit_dt_model.pth'
swin_path = '/content/VOC2008/best_swin_dt_model.pth'

vit = _build_vit_for_checkpoint().to(DEVICE)
swin = _build_swin_for_checkpoint().to(DEVICE)

vit.load_state_dict(torch.load(vit_path, map_location=DEVICE))
swin.load_state_dict(torch.load(swin_path, map_location=DEVICE))

# Inference only from here on.
vit.eval(); swin.eval()

# ====== FEATURE EXTRACTORS ======
def get_feature_extractor(model, arch):
    """Return a callable mapping an image batch to the features that feed `model`'s final classifier.

    Args:
        model: torchvision ViT (arch='vit') or Swin (arch='swin') instance.
        arch: architecture key; any other value raises ValueError when the
            returned extractor is called.

    Returns:
        `extractor(x)`, which runs under `torch.no_grad()` and yields one
        feature row per image.
    """
    def extractor(x):
        with torch.no_grad():
            if arch == 'vit':
                # Replays the ViT forward pass up to, but not including, `heads`.
                x = model._process_input(x)
                n = x.shape[0]
                batch_class_token = model.class_token.expand(n, -1, -1)
                x = torch.cat([batch_class_token, x], dim=1)
                x = model.encoder(x)
                feats = x[:, 0]  # class-token embedding
            elif arch == 'swin':
                x = model.features(x)
                x = model.norm(x)
                # torchvision's Swin keeps activations channels-last (B, H, W, C) until its
                # own forward() permutes them, so spatial pooling is over dims 1 and 2.
                # Averaging the last two dims pooled W and C and returned (B, H).
                feats = x.mean(dim=(1, 2))
            else:
                raise ValueError(f"Unsupported architecture for feature extraction: {arch}")
        return feats
    return extractor

# Both dicts share keys and insertion order ('vit', then 'swin'). The early-fusion
# code concatenates features in that same order, so keep them in sync.
models_dict = {'vit': vit, 'swin': swin}
extractors_dict = {
    'vit': get_feature_extractor(vit, 'vit'),
    'swin': get_feature_extractor(swin, 'swin')
}

# ====== GET PROBS & FEATURES ======
def get_outputs_and_features(models_dict, extractors_dict, loader, device):
    """Run every model over `loader`, collecting class-1 probabilities and pre-head features.

    Returns:
        `(probs, feats, labels, img_ids)`. `probs[name]` and `feats[name]` are
        NumPy arrays keyed like `models_dict`, `labels` is a NumPy array and
        `img_ids` a list, all in loader order.
    """
    prob_chunks = {name: [] for name in models_dict}
    feat_chunks = {name: [] for name in models_dict}
    labels, img_ids = [], []
    with torch.no_grad():
        for imgs, lbls, ids in tqdm(loader, desc="Fusion Extraction"):
            imgs = imgs.to(device)
            labels.extend(lbls.numpy())
            img_ids.extend(ids)
            for name, model in models_dict.items():
                # Features first, flattened to (batch, -1) so they can be concatenated later.
                flat_feats = extractors_dict[name](imgs).flatten(1)
                feat_chunks[name].append(flat_feats.cpu().numpy())
                # A separate full forward pass supplies the softmax score for class 1.
                positive = torch.softmax(model(imgs), dim=1)[:,1]
                prob_chunks[name].append(positive.cpu().numpy())
    feats_stacked = {name: np.vstack(chunks) for name, chunks in feat_chunks.items()}
    probs_stacked = {name: np.hstack(chunks) for name, chunks in prob_chunks.items()}
    return probs_stacked, feats_stacked, np.array(labels), img_ids

# ====== EXTRACT FEATURES & PROBS ======
# One pass over the validation loader; the returned image ids are not needed here.
dt_probs, dt_feats, dt_labels, _ = get_outputs_and_features(models_dict, extractors_dict, val_loader, DEVICE)

# ====== LATE FUSION (Dining Table) ======
# 1. Average the per-model class-1 probabilities, then threshold at 0.5.
late_probs = np.mean(np.vstack(list(dt_probs.values())), axis=0)
late_preds = (late_probs >= 0.5).astype(int)
print("\nLate Fusion (Average):")
print("Accuracy:", accuracy_score(dt_labels, late_preds))
print("mAP:", average_precision_score(dt_labels, late_probs))

# 2. Vote on the per-model hard predictions.
indiv_preds = [ (p >= 0.5).astype(int) for p in dt_probs.values() ]
num_models = len(models_dict)
# ceil(num_models / 2). NOTE(review): with the two models used here this is 1, so a
# single positive vote is enough (ties count as positive) — confirm that is intended.
majority_threshold = (num_models // 2) + (num_models % 2 > 0)
maj_vote = (np.sum(np.vstack(indiv_preds), axis=0) >= majority_threshold).astype(int)
print("\nLate Fusion (Majority Vote):")
print("Accuracy:", accuracy_score(dt_labels, maj_vote))
# NOTE(review): this mAP is computed from `late_probs` (the averaged probabilities),
# so it repeats the figure printed under "Late Fusion (Average)".
print("mAP:", average_precision_score(dt_labels, late_probs))

# ====== EARLY FUSION (Dining Table) ======
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import cross_val_predict
from sklearn.pipeline import make_pipeline

# Concatenate the per-model features column-wise, in a fixed model order.
X = np.concatenate([dt_feats[k] for k in ['vit','swin']], axis=1)
y = dt_labels

# Score with 5-fold cross-validated predictions: each sample is scored by a
# scaler + classifier that never saw it. Fitting and scoring on the same samples
# reported training-set performance.
cv_model = make_pipeline(StandardScaler(), LogisticRegression(max_iter=1000))
early_probs = cross_val_predict(cv_model, X, y, cv=5, method='predict_proba')[:,1]
early_preds = (early_probs >= 0.5).astype(int)

print("\nEarly Fusion (LogReg):")
print("Accuracy:", accuracy_score(y, early_preds))
print("mAP:", average_precision_score(y, early_probs))

# Final fusion model fitted on all samples, kept under the original names
# (`scaler`, `X_scaled`, `clf`) for any later cell that scores new images.
scaler = StandardScaler().fit(X)
X_scaled = scaler.transform(X)
clf = LogisticRegression(max_iter=1000).fit(X_scaled, y)


In [None]:
# Fine-tune ViT on the dining-table split; the call writes best_vit_dt_model.pth, which the fusion cells load.
# NOTE(review): `create_vit_model` is looked up when this cell runs — make sure it is the
# ImageNet-pretrained factory and not a later `weights=None` redefinition.
map_vit_dt = train_model_dt("vit_dt", create_vit_model)

In [None]:
# Fine-tune Swin on the dining-table split; the call writes best_swin_dt_model.pth, which the fusion cells load.
# NOTE(review): `create_swin_model` is looked up when this cell runs — make sure it is the
# ImageNet-pretrained factory and not a later `weights=None` redefinition.
map_swin_dt = train_model_dt("swin_dt", create_swin_model)

### Early and Late Fusion on DOG

In [None]:
import torch
import torch.nn as nn
from torchvision import models, transforms
import numpy as np
from torch.utils.data import DataLoader
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import accuracy_score, average_precision_score
from tqdm import tqdm
from PIL import Image

# ====== SETTINGS ======
# Re-declared so this cell can run on its own; same values as the notebook-level constants.
DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
IMG_DIR = '/content/VOC2008/VOCdevkit/VOC2008/JPEGImages'

# --- Dog Dataset Class ---
class DogDataset(torch.utils.data.Dataset):
    """Binary image dataset read from a split file of `<image id> <flag>` lines.

    A flag of -1 becomes label 0; any other flag becomes label 1. Items are
    `(image, label, image id)` triples.
    """
    def __init__(self, split_file, img_dir, transform=None):
        self.img_dir = img_dir
        self.transform = transform
        self.samples = []
        with open(split_file, 'r') as f:
            for line in f:
                fields = line.split()
                if not fields:
                    # Skip blank lines instead of failing on unpacking.
                    continue
                img_id, lbl = fields
                mapped_label = 0 if int(lbl) == -1 else 1
                self.samples.append((img_id, mapped_label))
    def __len__(self):
        return len(self.samples)
    def __getitem__(self, idx):
        img_id, lbl = self.samples[idx]
        img_path = f"{self.img_dir}/{img_id}.jpg"
        try:
            img = Image.open(img_path).convert('RGB')
        except Exception as err:  # not a bare except: Ctrl-C / SystemExit must still propagate
            # Keep the best-effort black placeholder, but make the substitution visible.
            import warnings
            warnings.warn(f"Could not load {img_path} ({err!r}); using a black placeholder image.")
            img = Image.new('RGB', (224,224), (0,0,0))
        if self.transform:
            img = self.transform(img)
        return img, lbl, img_id

# --- Transforms ---
# Same resize + normalisation pipeline as the training data loaders use.
transform = transforms.Compose([
    transforms.Resize((224,224)),
    transforms.ToTensor(),
    transforms.Normalize([0.485,0.456,0.406],[0.229,0.224,0.225])
])

# Validation split only, unshuffled, so scores stay aligned with labels and image ids.
val_ds = DogDataset('/content/VOC2008/VOCdevkit/VOC2008/ImageSets/Main/dog_val.txt', IMG_DIR, transform)
val_loader = DataLoader(val_ds, batch_size=32, shuffle=False, num_workers=2)

# ====== LOAD MODELS ======
# Architecture-only builders (weights=None): the parameters come from the fine-tuned
# checkpoints loaded below. They use private names on purpose. Defining them as
# `create_vit_model` / `create_swin_model` / `create_convnext_model` would overwrite
# the ImageNet-pretrained factories, and any training cell run afterwards would
# start from random weights.
def _build_vit_for_checkpoint(num_classes=2):
    """ViT-B/16 skeleton with a `num_classes`-way head, ready for `load_state_dict`."""
    model = models.vit_b_16(weights=None)
    model.heads.head = nn.Linear(model.heads.head.in_features, num_classes)
    return model

def _build_swin_for_checkpoint(num_classes=2):
    """Swin-B skeleton with a `num_classes`-way head, ready for `load_state_dict`."""
    model = models.swin_b(weights=None)
    model.head = nn.Linear(model.head.in_features, num_classes)
    return model

def _build_convnext_for_checkpoint(num_classes=2):
    """ConvNeXt-Base skeleton with a `num_classes`-way output layer, ready for `load_state_dict`."""
    model = models.convnext_base(weights=None)
    # Index 2 of the classifier is the linear output layer; read its input width before replacing it.
    original_in_features = model.classifier[2].in_features
    model.classifier[2] = nn.Linear(original_in_features, num_classes)
    return model

# Checkpoints written by train_model("vit" / "swin" / "convnext", ...);
# those training calls must have run before this cell.
vit_path = '/content/VOC2008/best_vit_model.pth'
swin_path = '/content/VOC2008/best_swin_model.pth'
convnext_path = '/content/VOC2008/best_convnext_model.pth'

vit = _build_vit_for_checkpoint().to(DEVICE)
swin = _build_swin_for_checkpoint().to(DEVICE)
convnext = _build_convnext_for_checkpoint().to(DEVICE)

vit.load_state_dict(torch.load(vit_path, map_location=DEVICE))
swin.load_state_dict(torch.load(swin_path, map_location=DEVICE))
convnext.load_state_dict(torch.load(convnext_path, map_location=DEVICE))

# Inference only from here on.
vit.eval(); swin.eval(); convnext.eval()

# ====== FEATURE EXTRACTORS ======
def get_feature_extractor(model, arch):
    """
    Returns a function that extracts features before the classification head
    for different model architectures from torchvision.

    Args:
        model: torchvision ViT ('vit'), Swin ('swin') or ConvNeXt ('convnext') instance.
        arch: architecture key; any other value raises ValueError when the
            returned extractor is called.

    Returns:
        `extractor(x)`, which runs under `torch.no_grad()` and yields one
        feature row per image.
    """
    def extractor(x):
        with torch.no_grad():
            if arch == 'vit':
                # Replays the ViT forward pass up to, but not including, `heads`.
                # A forward hook would be more robust if the internal structure changes.
                x = model._process_input(x)
                n = x.shape[0]
                # Expand the class token to the full batch
                batch_class_token = model.class_token.expand(n, -1, -1)
                x = torch.cat([batch_class_token, x], dim=1)
                x = model.encoder(x)
                # Features are the encoder output for the class token (index 0)
                feats = x[:, 0]
            elif arch == 'swin':
                x = model.features(x)
                x = model.norm(x)
                # torchvision's Swin keeps activations channels-last (B, H, W, C) until its
                # own forward() permutes them, so spatial pooling is over dims 1 and 2.
                # Averaging the last two dims pooled W and C and returned (B, H).
                feats = x.mean(dim=(1, 2))
            elif arch == 'convnext':
                # Pass through the feature stages and the model's avgpool (when present) ...
                x = model.features(x)
                if hasattr(model, 'avgpool'):
                    x = model.avgpool(x)
                # ... then the first two classifier modules, i.e. everything before the
                # final Linear layer at index 2.
                feats = model.classifier[:2](x)
            else:
                raise ValueError(f"Unsupported architecture for feature extraction: {arch}")
        return feats
    return extractor


# Both dicts share keys and insertion order ('vit', 'swin', 'convnext'). The early-fusion
# code concatenates features in that same order, and predict_early_fusion iterates
# `extractors_dict` in insertion order, so keep them in sync.
models_dict = {'vit': vit, 'swin': swin, 'convnext': convnext}
extractors_dict = {
    'vit': get_feature_extractor(vit, 'vit'),
    'swin': get_feature_extractor(swin, 'swin'),
    'convnext': get_feature_extractor(convnext, 'convnext')
}

# ====== GET PROBS & FEATURES ======
def get_outputs_and_features(models_dict, extractors_dict, loader, device):
    """Run every model over `loader`, collecting class-1 probabilities and pre-head features.

    Returns:
        `(probs, feats, labels, img_ids)`. `probs[name]` and `feats[name]` are
        NumPy arrays keyed like `models_dict`, `labels` is a NumPy array and
        `img_ids` a list, all in loader order.
    """
    prob_chunks = {name: [] for name in models_dict}
    feat_chunks = {name: [] for name in models_dict}
    labels, img_ids = [], []
    with torch.no_grad():
        for imgs, lbls, ids in tqdm(loader, desc="Fusion Extraction"):
            imgs = imgs.to(device)
            labels.extend(lbls.numpy())
            img_ids.extend(ids)
            for name, model in models_dict.items():
                # Architecture-specific extractor, flattened to (batch, -1) for later concatenation.
                flat_feats = extractors_dict[name](imgs).flatten(1)
                feat_chunks[name].append(flat_feats.cpu().numpy())
                # A separate full forward pass supplies the softmax score for class 1.
                positive = torch.softmax(model(imgs), dim=1)[:,1]
                prob_chunks[name].append(positive.cpu().numpy())
    feats_stacked = {name: np.vstack(chunks) for name, chunks in feat_chunks.items()}
    probs_stacked = {name: np.hstack(chunks) for name, chunks in prob_chunks.items()}
    return probs_stacked, feats_stacked, np.array(labels), img_ids

# ====== EXTRACT FEATURES & PROBS ======
# One pass over the validation loader; the returned image ids are not needed here.
dog_probs, dog_feats, dog_labels, _ = get_outputs_and_features(models_dict, extractors_dict, val_loader, DEVICE)

# ====== LATE FUSION (Dog) ======
# 1. Average Softmax: mean of the per-model class-1 probabilities, thresholded at 0.5.
late_probs = np.mean(np.vstack(list(dog_probs.values())), axis=0)
late_preds = (late_probs >= 0.5).astype(int)
print("\nLate Fusion (Average):")
print("Accuracy:", accuracy_score(dog_labels, late_preds))
print("mAP:", average_precision_score(dog_labels, late_probs))

# 2. Majority Vote on the per-model hard predictions.
indiv_preds = [ (p >= 0.5).astype(int) for p in dog_probs.values() ]
# Threshold is ceil(num_models / 2): 2 of the 3 models used here.
num_models = len(models_dict)
majority_threshold = (num_models // 2) + (num_models % 2 > 0) # Simple ceil(n/2)
maj_vote = (np.sum(np.vstack(indiv_preds), axis=0) >= majority_threshold).astype(int)
print("\nLate Fusion (Majority Vote):")
print("Accuracy:", accuracy_score(dog_labels, maj_vote))
# NOTE(review): the mAP below is computed from `late_probs` (the averaged probabilities),
# not from the votes, so it repeats the figure printed under "Late Fusion (Average)".
print("mAP:", average_precision_score(dog_labels, late_probs)) # Using average probs for mAP

# ====== EARLY FUSION (Dog) ======
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import cross_val_predict
from sklearn.pipeline import make_pipeline

# Concatenate the per-model features column-wise, in a fixed model order.
X = np.concatenate([dog_feats[k] for k in ['vit','swin','convnext']], axis=1)
y = dog_labels

# Score with 5-fold cross-validated predictions: each sample is scored by a
# scaler + classifier that never saw it. Fitting and scoring on the same samples
# reported training-set performance.
cv_model = make_pipeline(StandardScaler(), LogisticRegression(max_iter=1000))
early_probs = cross_val_predict(cv_model, X, y, cv=5, method='predict_proba')[:,1]
early_preds = (early_probs >= 0.5).astype(int)

print("\nEarly Fusion (LogReg):")
print("Accuracy:", accuracy_score(y, early_preds))
print("mAP:", average_precision_score(y, early_probs))

# Final fusion model fitted on all samples, kept under the original names
# (`scaler`, `X_scaled`, `clf`) because the prediction cell uses `scaler` and `clf`.
scaler = StandardScaler().fit(X)
X_scaled = scaler.transform(X)
clf = LogisticRegression(max_iter=1000).fit(X_scaled, y)

### Predicting Dog Images

In [None]:
import os
from PIL import Image
import numpy as np
from torchvision import transforms
import torch
import shutil
from google.colab import files
from IPython.display import display
import matplotlib.pyplot as plt

# Update these paths and variables as per your setup
# Folder scanned for images to score: the ZIP extraction target ('/content/images')
# plus the archive's inner 'Images' folder.
TEST_FOLDER = '/content/images/Images'

def predict_early_fusion(img_path, extractors_dict, scaler, clf, device, image_transform=None):
    """Return the early-fusion probability of the positive class for one image file.

    Args:
        img_path: path of the image to score.
        extractors_dict: name -> feature-extractor callable. Features are
            concatenated in the dict's insertion order, which must match the
            order used when `scaler` and `clf` were fitted.
        scaler: fitted scaler exposing `transform`.
        clf: fitted classifier exposing `predict_proba`.
        device: torch device the image tensor is moved to.
        image_transform: preprocessing applied to the PIL image. Defaults to the
            notebook-level `transform` used to extract the training features.
            (The previously referenced `test_transform` is never defined.)

    Returns:
        Probability of class index 1.
    """
    if image_transform is None:
        image_transform = transform
    # Close the file handle as soon as the pixels have been converted.
    with Image.open(img_path) as raw:
        img = raw.convert('RGB')
    img_tensor = image_transform(img).unsqueeze(0).to(device)
    feats_concat = [
        extractor(img_tensor).flatten(1).cpu().numpy()
        for extractor in extractors_dict.values()
    ]
    feats_stack = np.concatenate(feats_concat, axis=1)
    feats_stack = scaler.transform(feats_stack)
    prob = clf.predict_proba(feats_stack)[:,1][0]  # Probability of 'dog' class
    return prob

# Collect test images (any file in TEST_FOLDER with a listed image extension, case-insensitive)
test_img_paths = [os.path.join(TEST_FOLDER, f) for f in os.listdir(TEST_FOLDER)
                  if f.lower().endswith(('.jpg', '.jpeg', '.png', '.bmp', '.webp'))]

# Predict dog probabilities
# NOTE(review): `extractors_dict`, `scaler` and `clf` are whatever the most recently
# run fusion cell left behind — make sure that was the dog fusion cell.
results = []
for img_path in test_img_paths:
    prob = predict_early_fusion(img_path, extractors_dict, scaler, clf, DEVICE)
    results.append((img_path, prob))

# Sort descending
results_sorted = sorted(results, key=lambda x: x[1], reverse=True)

print("\n=== Top 5 Images Predicted as 'Dog' (Early Fusion) ===\n")
for rank, (img_path, prob) in enumerate(results_sorted[:5], start=1):
    print(f"Rank {rank}: {os.path.basename(img_path)} | Probability: {prob:.4f}")

# Plot top 5 images with scores in a grid
# (zip stops at the shorter sequence, so fewer than five results leave trailing axes empty)
top_results = results_sorted[:5]
fig, axes = plt.subplots(1, 5, figsize=(20,5))
for ax, (img_path, score) in zip(axes, top_results):
    img = Image.open(img_path)
    ax.imshow(img)
    ax.set_title(f"Score: {score:.4f}")
    ax.axis('off')
plt.tight_layout()
plt.show()

# Save top 5 images to folder and zip for download
output_dir = '/content/top5_dog_images'
os.makedirs(output_dir, exist_ok=True)

# Prefix each copy with its rank so the ordering survives in the file names.
for rank, (img_path, prob) in enumerate(results_sorted[:5], start=1):
    dest_path = os.path.join(output_dir, f'rank{rank}_{os.path.basename(img_path)}')
    shutil.copy(img_path, dest_path)

# make_archive appends '.zip' to the base name; that resulting file is what gets downloaded.
shutil.make_archive('/content/top5_dog_images_zip', 'zip', output_dir)
files.download('/content/top5_dog_images_zip.zip')


### Early and Late Fusion on Dining Table

In [None]:
import torch
import torch.nn as nn
from torchvision import models, transforms
import numpy as np
from torch.utils.data import DataLoader
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import accuracy_score, average_precision_score
from tqdm import tqdm
from PIL import Image

# ====== SETTINGS ======
# Re-declared so this cell can run on its own; same values as the notebook-level constants.
DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
IMG_DIR = '/content/VOC2008/VOCdevkit/VOC2008/JPEGImages'

# --- Dining Table Dataset Class ---
class DinningTableDataset(torch.utils.data.Dataset):
    """Binary image dataset read from a split file of `<image id> <flag>` lines.

    A flag of -1 becomes label 0; any other flag becomes label 1. Items are
    `(image, label, image id)` triples.
    """
    def __init__(self, split_file, img_dir, transform=None):
        self.img_dir = img_dir
        self.transform = transform
        self.samples = []
        with open(split_file, 'r') as f:
            for line in f:
                fields = line.split()
                if not fields:
                    # Skip blank lines instead of failing on unpacking.
                    continue
                img_id, lbl = fields
                mapped_label = 0 if int(lbl) == -1 else 1
                self.samples.append((img_id, mapped_label))
    def __len__(self):
        return len(self.samples)
    def __getitem__(self, idx):
        img_id, lbl = self.samples[idx]
        img_path = f"{self.img_dir}/{img_id}.jpg"
        try:
            img = Image.open(img_path).convert('RGB')
        except Exception as err:  # not a bare except: Ctrl-C / SystemExit must still propagate
            # Keep the best-effort black placeholder, but make the substitution visible.
            import warnings
            warnings.warn(f"Could not load {img_path} ({err!r}); using a black placeholder image.")
            img = Image.new('RGB', (224,224), (0,0,0))
        if self.transform:
            img = self.transform(img)
        return img, lbl, img_id

# --- Transforms ---
# Same resize + normalisation pipeline as the training data loaders use.
transform = transforms.Compose([
    transforms.Resize((224,224)),
    transforms.ToTensor(),
    transforms.Normalize([0.485,0.456,0.406],[0.229,0.224,0.225])
])

# Validation split only, unshuffled, so scores stay aligned with labels and image ids.
val_ds = DinningTableDataset('/content/VOC2008/VOCdevkit/VOC2008/ImageSets/Main/diningtable_val.txt', IMG_DIR, transform)
val_loader = DataLoader(val_ds, batch_size=32, shuffle=False, num_workers=2)

# ====== LOAD MODELS ======
def create_vit_model(num_classes=2):
    """Build a torchvision ViT-B/16 without preloaded weights.

    The final ``heads.head`` layer is swapped for a new linear layer with
    ``num_classes`` outputs and the same input width as the layer it replaces.
    """
    vit_net = models.vit_b_16(weights=None)
    head_in = vit_net.heads.head.in_features
    vit_net.heads.head = nn.Linear(head_in, num_classes)
    return vit_net

def create_swin_model(num_classes=2):
    """Build a torchvision Swin-B without preloaded weights.

    The ``head`` layer is swapped for a new linear layer with ``num_classes``
    outputs and the same input width as the layer it replaces.
    """
    swin_net = models.swin_b(weights=None)
    head_in = swin_net.head.in_features
    swin_net.head = nn.Linear(head_in, num_classes)
    return swin_net

vit_path = '/content/VOC2008/best_vit_dt_model.pth'
swin_path = '/content/VOC2008/best_swin_dt_model.pth'

# Instantiate both networks on DEVICE.
vit = create_vit_model().to(DEVICE)
swin = create_swin_model().to(DEVICE)

# Restore each network's weights from its checkpoint file, then switch it to
# evaluation mode.
for net, ckpt_path in ((vit, vit_path), (swin, swin_path)):
    net.load_state_dict(torch.load(ckpt_path, map_location=DEVICE))
    net.eval()

# ====== FEATURE EXTRACTORS ======
def get_feature_extractor(model, arch):
    """Return a callable that maps an image batch to pooled backbone features.

    The returned features are the tensor that the model's classification
    head would receive: the class-token embedding for ViT, and the spatially
    averaged, normalised feature map for Swin.

    Args:
        model: Network to read features from. For ``'vit'`` it must expose
            ``_process_input``, ``class_token`` and ``encoder``; for
            ``'swin'`` it must expose ``features`` and ``norm``.
        arch: ``'vit'`` or ``'swin'``; selects which extraction path is used.

    Returns:
        ``extractor(x)``: runs under ``torch.no_grad()`` and returns one
        feature row per input image.

    Raises:
        ValueError: Raised by the returned callable (not by this factory)
            when ``arch`` is neither ``'vit'`` nor ``'swin'``.
    """
    def extractor(x):
        with torch.no_grad():
            if arch == 'vit':
                # Same steps as torchvision's VisionTransformer.forward,
                # stopping just before the classification heads.
                x = model._process_input(x)
                n = x.shape[0]
                batch_class_token = model.class_token.expand(n, -1, -1)
                x = torch.cat([batch_class_token, x], dim=1)
                x = model.encoder(x)
                feats = x[:, 0]  # embedding at the class-token position
            elif arch == 'swin':
                x = model.features(x)
                x = model.norm(x)
                # torchvision's Swin keeps activations channels-last
                # (B, H, W, C) at this point, so pool over the two spatial
                # axes (1 and 2). Averaging the last two axes instead would
                # collapse the channel axis and return a (B, H) tensor.
                feats = x.mean(dim=(1, 2))
            else:
                raise ValueError(f"Unsupported architecture for feature extraction: {arch}")
        return feats
    return extractor

# name -> network, and name -> matching feature extractor (same keys, same order).
models_dict = dict(vit=vit, swin=swin)
extractors_dict = {
    arch: get_feature_extractor(net, arch)
    for arch, net in models_dict.items()
}

# ====== GET PROBS & FEATURES ======
def get_outputs_and_features(models_dict, extractors_dict, loader, device):
    """Collect per-model class-1 probabilities and extractor features over ``loader``.

    Args:
        models_dict: Mapping of model name to a network returning class logits.
        extractors_dict: Mapping of the same names to feature-extractor callables.
        loader: Iterable yielding ``(images, labels, image ids)`` batches.
        device: Device the image batch is moved to before inference.

    Returns:
        A 4-tuple ``(probs, feats, labels, img_ids)``:
        ``probs[name]`` is a 1-D array of softmax probabilities for class
        index 1, ``feats[name]`` is a 2-D array with one flattened feature
        row per sample, ``labels`` is an array of all labels, and
        ``img_ids`` is the list of ids in iteration order.
    """
    prob_chunks = {name: [] for name in models_dict}
    feat_chunks = {name: [] for name in models_dict}
    labels, img_ids = [], []

    with torch.no_grad():
        for imgs, lbls, ids in tqdm(loader, desc="Fusion Extraction"):
            imgs = imgs.to(device)
            labels.extend(lbls.numpy())
            img_ids.extend(ids)
            for name, model in models_dict.items():
                # Features first, then the full forward pass for the logits.
                batch_feats = extractors_dict[name](imgs).flatten(1)
                feat_chunks[name].append(batch_feats.cpu().numpy())
                positive = torch.softmax(model(imgs), dim=1)[:, 1]
                prob_chunks[name].append(positive.cpu().numpy())

    feats_stacked = {name: np.vstack(chunks) for name, chunks in feat_chunks.items()}
    probs_stacked = {name: np.hstack(chunks) for name, chunks in prob_chunks.items()}
    return probs_stacked, feats_stacked, np.array(labels), img_ids

# ====== EXTRACT FEATURES & PROBS ======
dt_probs, dt_feats, dt_labels, _ = get_outputs_and_features(models_dict, extractors_dict, val_loader, DEVICE)

# ====== LATE FUSION (Dining Table) ======
# Average fusion: mean of the per-model class-1 probabilities, thresholded at 0.5.
late_probs = np.mean(np.vstack(list(dt_probs.values())), axis=0)
late_preds = (late_probs >= 0.5).astype(int)
print("\nLate Fusion (Average):")
print("Accuracy:", accuracy_score(dt_labels, late_preds))
print("mAP:", average_precision_score(dt_labels, late_probs))

# Majority vote: each model votes with its own 0.5-thresholded prediction.
indiv_preds = [ (p >= 0.5).astype(int) for p in dt_probs.values() ]
num_models = len(models_dict)
# Equals ceil(num_models / 2): with an even number of models a tie counts as
# a positive vote (for two models, a single positive vote is enough).
majority_threshold = (num_models // 2) + (num_models % 2 > 0)
vote_counts = np.sum(np.vstack(indiv_preds), axis=0)
maj_vote = (vote_counts >= majority_threshold).astype(int)
# Rank samples by the fraction of models voting positive so that the reported
# mAP reflects the vote itself rather than repeating the average-fusion score.
vote_scores = vote_counts / num_models
print("\nLate Fusion (Majority Vote):")
print("Accuracy:", accuracy_score(dt_labels, maj_vote))
print("mAP:", average_precision_score(dt_labels, vote_scores))

# ====== EARLY FUSION (Dining Table) ======
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import cross_val_predict
from sklearn.pipeline import make_pipeline

# One row per sample: ViT features followed by Swin features.
X = np.concatenate([dt_feats[k] for k in ['vit','swin']], axis=1)
y = dt_labels

# Final scaler / classifier fitted on every available row; the prediction
# cell further down reuses `scaler` and `clf`.
scaler = StandardScaler().fit(X)
X_scaled = scaler.transform(X)
clf = LogisticRegression(max_iter=1000).fit(X_scaled, y)

# Scoring `clf` on the rows it was fitted on would overstate its quality, so
# the reported metrics use out-of-fold predictions instead: every sample is
# scored by a scaler + classifier pair that never saw it during fitting.
cv_model = make_pipeline(StandardScaler(), LogisticRegression(max_iter=1000))
early_probs = cross_val_predict(cv_model, X, y, cv=5, method='predict_proba')[:,1]
early_preds = (early_probs >= 0.5).astype(int)

print("\nEarly Fusion (LogReg, 5-fold CV):")
print("Accuracy:", accuracy_score(y, early_preds))
print("mAP:", average_precision_score(y, early_probs))


### Predicting Dining Table Images

In [None]:
import os
from PIL import Image
import numpy as np
from torchvision import transforms
import torch
import shutil
from google.colab import files
import matplotlib.pyplot as plt

# This cell relies on names created earlier in the notebook:
# DEVICE, models_dict, extractors_dict, scaler, clf

# Resize to 224x224, convert to a tensor, then normalise each channel.
test_transform = transforms.Compose([
    transforms.Resize((224,224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485,0.456,0.406], std=[0.229,0.224,0.225]),
])

TEST_FOLDER = '/content/images/Images'

# Keep every entry of TEST_FOLDER whose (case-insensitive) extension is one
# of the listed image suffixes.
test_img_paths = [
    os.path.join(TEST_FOLDER, fname)
    for fname in os.listdir(TEST_FOLDER)
    if fname.lower().endswith(('.jpg', '.jpeg', '.png', '.bmp', '.webp'))
]

def predict_early_fusion(img_path, models_dict, extractors_dict, scaler, clf):
    """Score a single image file with the early-fusion classifier.

    The image is loaded as RGB, passed through the module-level
    ``test_transform`` and moved to ``DEVICE``. Each extractor's flattened
    features are concatenated in ``models_dict`` iteration order, scaled
    with ``scaler`` and scored by ``clf``.

    NOTE(review): the concatenation order must match the order used when
    ``scaler`` and ``clf`` were fitted -- confirm ``models_dict`` keeps it.

    Args:
        img_path: Path of the image file to score.
        models_dict: Mapping whose keys select and order the extractors.
        extractors_dict: Mapping of the same keys to feature-extractor callables.
        scaler: Fitted object exposing ``transform``.
        clf: Fitted classifier exposing ``predict_proba``.

    Returns:
        The ``predict_proba`` value in column 1 for this image.
    """
    rgb = Image.open(img_path).convert('RGB')
    batch = test_transform(rgb).unsqueeze(0).to(DEVICE)
    per_model = [
        extractors_dict[name](batch).flatten(1).cpu().numpy()
        for name in models_dict
    ]
    fused = scaler.transform(np.concatenate(per_model, axis=1))
    return clf.predict_proba(fused)[:, 1][0]

# Score every collected test image with the early-fusion classifier.
results = [
    (img_path, predict_early_fusion(img_path, models_dict, extractors_dict, scaler, clf))
    for img_path in test_img_paths
]

# Highest probability first.
results_sorted = list(results)
results_sorted.sort(key=lambda item: item[1], reverse=True)

print("\n=== Top 5 Images Predicted as 'Dining Table' (Early Fusion) ===\n")
for rank, (img_path, early_prob) in enumerate(results_sorted[:5], start=1):
    img_name = os.path.basename(img_path)
    print(f"Rank {rank}: {img_name} | Early Fusion Probability: {early_prob:.3f}")

# Plot top 5 images with scores in a grid
top_results = results_sorted[:5]
fig, axes = plt.subplots(1, 5, figsize=(20,5))
# Hide the frame and ticks on every panel up front, so panels without an image
# (fewer than five results) stay blank instead of showing empty axes.
for ax in axes:
    ax.axis('off')
for ax, (img_path, score) in zip(axes, top_results):
    # Close each file as soon as matplotlib has taken the pixel data.
    with Image.open(img_path) as img:
        ax.imshow(img)
    ax.set_title(f"Score: {score:.4f}")
plt.tight_layout()
plt.show()

# Copy the five best-scoring images into one folder (file names prefixed with
# their rank), zip that folder, and hand the archive to the Colab download helper.
output_dir = '/content/top5_dining_table_images'
os.makedirs(output_dir, exist_ok=True)

for rank, (img_path, prob) in enumerate(results_sorted[:5], start=1):
    ranked_name = f'rank{rank}_{os.path.basename(img_path)}'
    dest_path = os.path.join(output_dir, ranked_name)
    shutil.copy(img_path, dest_path)

shutil.make_archive(
    '/content/top5_dining_table_images_zip',
    'zip',
    output_dir,
)
files.download('/content/top5_dining_table_images_zip.zip')
