# In [None]:
# =============================================
# Task B: Face Recognition - Train & Save
# =============================================

# 1. Mount Drive and Import Libraries
from google.colab import drive
drive.mount('/content/drive')

import os
import torch
import torch.nn as nn
import torchvision
from torchvision import transforms, models
from torch.utils.data import Dataset, DataLoader
from sklearn.metrics import f1_score, accuracy_score
from sklearn.utils.class_weight import compute_class_weight
from PIL import Image
import numpy as np
from tqdm import tqdm
import random

# 2. Set Paths and Parameters
# The train and val splits live side by side under the dataset root on Drive.
dataset_path = "/content/drive/MyDrive/Task_B"
train_path = os.path.join(dataset_path, "train")
val_path = os.path.join(dataset_path, "val")
BATCH_SIZE = 32   # samples per DataLoader batch
NUM_WORKERS = 4   # DataLoader worker processes
IMG_SIZE = 224    # side length (pixels) of the square model input
EPOCHS = 10       # full passes over the training set
SEED = 42
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"

# Seed every RNG the pipeline can draw from, not just torch's: Python's
# `random` and NumPy have independent generators, and leaving them unseeded
# makes runs irreproducible even with a fixed torch seed.
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)

# 3. Custom Dataset Loader
class FaceDataset(Dataset):
    """Image dataset laid out as one sub-directory per person.

    Every directory directly under ``root_dir`` is one class; class indices
    are assigned in sorted directory-name order. Images are collected
    recursively from each person's directory.

    Attributes:
        root_dir: Directory that was scanned.
        transform: Optional callable applied to each PIL image on access.
        samples: List of ``(image_path, class_index)`` tuples.
        labels: Class index of every sample, in the same order as ``samples``
            (kept separately for computing class weights).
        class_to_idx: Mapping from person directory name to class index.
        idx_to_class: Inverse of ``class_to_idx``.
    """

    # Matched case-insensitively. The leading dot matters: without it a file
    # merely *ending* in the letters "jpg" would be treated as an image.
    IMAGE_EXTENSIONS = ('.jpg', '.jpeg', '.png')

    def __init__(self, root_dir, transform=None):
        """Scan ``root_dir`` and index every image file found below it.

        Args:
            root_dir: Directory containing one sub-directory per person.
            transform: Optional callable applied to each image in
                ``__getitem__``.
        """
        self.root_dir = root_dir
        self.transform = transform
        self.samples = []
        self.class_to_idx = {}
        self.idx_to_class = {}
        self.labels = []

        # Only directories are classes; a stray file next to the person
        # folders must not become an empty class and shift the indices.
        persons = sorted(
            entry for entry in os.listdir(root_dir)
            if os.path.isdir(os.path.join(root_dir, entry))
        )
        for idx, person in enumerate(persons):
            self.class_to_idx[person] = idx
            self.idx_to_class[idx] = person

            person_folder = os.path.join(root_dir, person)
            for root, dirs, files in os.walk(person_folder):
                # os.walk yields entries in arbitrary filesystem order; sort
                # both levels so the sample order is the same on every run.
                dirs.sort()
                for file in sorted(files):
                    if file.lower().endswith(self.IMAGE_EXTENSIONS):
                        self.samples.append((os.path.join(root, file), idx))
                        self.labels.append(idx)  # For computing class weights

    def __len__(self):
        """Return the number of indexed images."""
        return len(self.samples)

    def __getitem__(self, idx):
        """Load sample ``idx`` and return ``(image, class_index)``.

        The image is decoded as RGB and passed through ``self.transform``
        when one was supplied.
        """
        path, label = self.samples[idx]
        # convert() forces the pixel data to load, so the underlying file can
        # be closed right away instead of leaking a handle per sample.
        with Image.open(path) as img:
            image = img.convert('RGB')
        if self.transform is not None:
            image = self.transform(image)
        return image, label

# 4. Transforms
# Validation pipeline: deterministic resize to the model input size, then
# tensor conversion and per-channel normalisation with mean 0.5 / std 0.5.
val_transforms = transforms.Compose([
    transforms.Resize(size=(IMG_SIZE, IMG_SIZE)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5]),
])

# Training pipeline: random crop/flip/colour augmentation followed by the
# same tensor conversion and normalisation as validation.
train_transforms = transforms.Compose([
    transforms.RandomResizedCrop(size=IMG_SIZE),
    transforms.RandomHorizontalFlip(),
    transforms.ColorJitter(brightness=0.4, contrast=0.4),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5]),
])

# 5. Load Data
train_dataset = FaceDataset(train_path, transform=train_transforms)
val_dataset = FaceDataset(val_path, transform=val_transforms)
num_classes = len(train_dataset.class_to_idx)

# Each FaceDataset numbers identities from its own folder listing, so the same
# person can receive different indices in the two splits whenever the folder
# sets differ. The model is trained in the *training* index space, so the
# validation labels must be expressed in that space too.
unknown_identities = sorted(
    set(val_dataset.class_to_idx) - set(train_dataset.class_to_idx)
)
if unknown_identities:
    # Nothing can be remapped safely here; keep the original behaviour but
    # make the problem visible instead of silently reporting wrong metrics.
    print(
        f"WARNING: {len(unknown_identities)} validation identities are absent "
        "from the training set; validation labels do not match the model's "
        "class indices and the reported metrics will be unreliable."
    )
elif val_dataset.class_to_idx != train_dataset.class_to_idx:
    # Validation identities are a subset of the training ones: rewrite each
    # validation label via its person name into the training index.
    val_dataset.samples = [
        (path, train_dataset.class_to_idx[val_dataset.idx_to_class[label]])
        for path, label in val_dataset.samples
    ]
    val_dataset.labels = [label for _, label in val_dataset.samples]
    val_dataset.class_to_idx = dict(train_dataset.class_to_idx)
    val_dataset.idx_to_class = dict(train_dataset.idx_to_class)

# Shuffle only the training data; validation order is kept fixed.
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True, num_workers=NUM_WORKERS)
val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=False, num_workers=NUM_WORKERS)

# 6. Compute Class Weights (for imbalance handling)
# CrossEntropyLoss needs exactly one weight per output class. Computing the
# weights only over the labels that actually occur yields a shorter vector
# whenever some class has no training images, which then fails inside the
# loss. Build a full-length vector and fill in the balanced weights for the
# classes that are present; the placeholder 1.0 for absent classes is never
# used because no target carries such a label.
train_labels_np = np.asarray(train_dataset.labels, dtype=np.int64)
present_classes = np.unique(train_labels_np)
class_weights_np = np.ones(num_classes, dtype=np.float64)
class_weights_np[present_classes] = compute_class_weight(class_weight='balanced',
                                                         classes=present_classes,
                                                         y=train_labels_np)
class_weights = torch.tensor(class_weights_np, dtype=torch.float).to(DEVICE)

# 7. Build Model
# Pretrained ResNet-50 whose final fully connected layer is replaced by a
# fresh linear head with one output per identity.
model = models.resnet50(pretrained=True)
model.fc = nn.Linear(
    in_features=model.fc.in_features,
    out_features=num_classes,
)
model = model.to(DEVICE)

# 8. Loss, Optimizer, Scheduler
# Class-weighted cross entropy, Adam over all parameters, and a step schedule
# that multiplies the learning rate by `gamma` every `step_size` scheduler steps.
criterion = nn.CrossEntropyLoss(weight=class_weights)
optimizer = torch.optim.Adam(
    model.parameters(),
    lr=1e-4,
)
scheduler = torch.optim.lr_scheduler.StepLR(
    optimizer,
    step_size=5,
    gamma=0.5,
)

# 9. Train and Validate
def evaluate(model, loader):
    """Run ``model`` over ``loader`` and score its predictions.

    The model is put in eval mode and gradients are disabled for the pass.

    Args:
        model: Network that returns per-class scores for a batch of images.
        loader: Iterable yielding ``(images, targets)`` batches.

    Returns:
        Tuple ``(accuracy, macro_f1)`` computed over all batches.
    """
    model.eval()
    y_pred, y_true = [], []
    with torch.no_grad():
        for batch_images, batch_targets in loader:
            batch_images = batch_images.to(DEVICE)
            batch_targets = batch_targets.to(DEVICE)
            scores = model(batch_images)
            # Predicted class = index of the highest score in each row.
            batch_pred = scores.argmax(dim=1)
            y_pred.extend(batch_pred.cpu().numpy())
            y_true.extend(batch_targets.cpu().numpy())
    accuracy = accuracy_score(y_true, y_pred)
    macro_f1 = f1_score(y_true, y_pred, average='macro')
    return accuracy, macro_f1

# Start below any attainable score so the first epoch always writes a
# checkpoint. Starting at 0 with a strict ">" would leave no model file on
# disk at all if the validation macro-F1 stayed at 0.0 for the whole run.
best_f1 = float("-inf")
for epoch in range(EPOCHS):
    model.train()  # evaluate() leaves the model in eval mode
    running_loss = 0.0  # sum of per-batch losses over this epoch
    for images, targets in tqdm(train_loader):
        images, targets = images.to(DEVICE), targets.to(DEVICE)

        optimizer.zero_grad()
        outputs = model(images)
        loss = criterion(outputs, targets)
        loss.backward()
        optimizer.step()
        running_loss += loss.item()

    # Advance the learning-rate schedule once per epoch, then validate.
    scheduler.step()
    acc, f1 = evaluate(model, val_loader)
    print(f"\nEpoch {epoch+1}/{EPOCHS}, Loss: {running_loss:.4f}, Val Acc: {acc:.4f}, Macro F1: {f1:.4f}")

    # Keep only the weights of the epoch with the best validation macro-F1.
    if f1 > best_f1:
        best_f1 = f1
        torch.save(model.state_dict(), "model_b.pth")

print("Training Completed. Best Macro-F1:", best_f1)
