In [2]:
import os
import glob
import random
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms, models
from PIL import Image
from tqdm import tqdm

In [3]:
import os
import glob
import random
from PIL import Image
from torch.utils.data import Dataset

class TripletFaceDataset(Dataset):
    """Dataset of (anchor, positive, negative) face-image triplets.

    Expected layout::

        root_dir/
          └── <person_id>/
               ├── img1.jpg
               ├── img2.jpg
               └── distortion/
                    ├── img1_blurred.jpg
                    ├── img1_foggy.jpg
                    ├── img2_blurred.jpg
                    └── …

    * anchor   – a clean image found directly inside ``<person_id>/``
    * positive – a random file in ``distortion/`` whose name starts with the
                 anchor's stem followed by ``_``
    * negative – a random clean or distorted image of a *different* person

    Persons without a ``distortion/`` folder, and clean images without any
    matching distortion, are skipped.

    Args:
        root_dir: directory containing one sub-folder per person.
        transform: optional callable applied to every loaded RGB PIL image.
    """

    # File extensions (lower-case) accepted as clean anchor images.
    IMAGE_EXTENSIONS = ('.jpg', '.jpeg', '.png')

    def __init__(self, root_dir, transform=None):
        self.root = root_dir
        self.transform = transform

        # One entry per usable clean image: (anchor_path, [positive_paths], label)
        self.samples = []
        for person in sorted(os.listdir(root_dir)):
            pdir = os.path.join(root_dir, person)
            if not os.path.isdir(pdir):
                continue

            dist_dir = os.path.join(pdir, 'distortion')
            if not os.path.isdir(dist_dir):
                continue
            # Sorted so the sample list does not depend on filesystem order.
            distorted = sorted(glob.glob(os.path.join(dist_dir, '*.*')))

            clean_imgs = sorted(
                os.path.join(pdir, f)
                for f in os.listdir(pdir)
                if f.lower().endswith(self.IMAGE_EXTENSIONS)
                and os.path.isfile(os.path.join(pdir, f))
            )

            # e.g. "Ciro_Gomes_0001.jpg" -> distortions starting with "Ciro_Gomes_0001_"
            for anchor_path in clean_imgs:
                prefix = os.path.splitext(os.path.basename(anchor_path))[0] + '_'
                positives = [
                    d for d in distorted
                    if os.path.basename(d).startswith(prefix)
                ]
                if positives:
                    self.samples.append((anchor_path, positives, person))

        # All person labels that own at least one sample (for negative sampling).
        self.labels = sorted(set(label for _, _, label in self.samples))

        # label -> indices into self.samples, built once so __getitem__ does
        # not have to rescan the whole sample list for every triplet.
        self._indices_by_label = {}
        for index, (_, _, label) in enumerate(self.samples):
            self._indices_by_label.setdefault(label, []).append(index)

    def __len__(self):
        return len(self.samples)

    def _sample_negative_path(self, label):
        """Return a random image path belonging to any person other than ``label``.

        Raises:
            ValueError: if no other person is available to draw a negative from.
        """
        other_labels = [candidate for candidate in self.labels if candidate != label]
        if not other_labels:
            raise ValueError(
                "TripletFaceDataset needs at least two persons with samples "
                "to draw a negative example"
            )
        neg_label = random.choice(other_labels)
        neg_idx = random.choice(self._indices_by_label[neg_label])
        neg_anchor, neg_positives, _ = self.samples[neg_idx]
        return random.choice([neg_anchor] + neg_positives)

    def _load_image(self, path):
        """Open ``path`` as RGB and apply ``self.transform`` when one is set."""
        with Image.open(path) as handle:
            img = handle.convert('RGB')
        return self.transform(img) if self.transform else img

    def __getitem__(self, idx):
        anchor_path, positives, label = self.samples[idx]
        positive_path = random.choice(positives)
        negative_path = self._sample_negative_path(label)
        return (
            self._load_image(anchor_path),
            self._load_image(positive_path),
            self._load_image(negative_path),
        )


In [4]:
class FaceEncoder(nn.Module):
    """ResNet-18 trunk plus a linear projection producing unit-length embeddings.

    Args:
        embedding_dim: size of the output embedding vector.
    """

    def __init__(self, embedding_dim=128):
        super().__init__()
        # Replace the classifier with a pass-through so the trunk outputs the
        # 512 features consumed by the projection layer below.
        self.backbone = models.resnet18(pretrained=True)
        self.backbone.fc = nn.Identity()
        self.embedding = nn.Linear(512, embedding_dim)

    def forward(self, x):
        """Return L2-normalised embeddings (normalised along dim 1) for batch ``x``."""
        features = self.backbone(x)
        projected = self.embedding(features)
        return F.normalize(projected, p=2, dim=1)


In [5]:
# Preprocessing applied to every image: resize to 224x224, convert to a
# tensor, then normalise each of the 3 channels with mean 0.5 and std 0.5.
# NOTE: `transform` is a notebook-level global; get_embeddings() reads it directly.
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize([0.5]*3, [0.5]*3)
])

# Triplet dataset over the training split and a shuffled loader (batches of 16 triplets).
train_dataset = TripletFaceDataset('/kaggle/input/comsys-hackathon/Task_B/train', transform=transform)
train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True, num_workers=2)


In [6]:
# Use the GPU when one is available, otherwise fall back to the CPU.
device = 'cuda' if torch.cuda.is_available() else 'cpu'
# Embedding network (default 128-d output) trained with the triplet loss below.
model = FaceEncoder().to(device)
# Adam over all parameters, i.e. the backbone is fine-tuned together with the projection.
optimizer = torch.optim.Adam(model.parameters(), lr=1e-4)

def triplet_loss(anchor, positive, negative, margin=1.0):
    """Mean hinge triplet loss over a batch of embeddings.

    For each row computes ``max(0, d(anchor, positive) - d(anchor, negative) + margin)``
    with ``d`` being ``F.pairwise_distance``, then averages over the batch.

    Args:
        anchor, positive, negative: embedding batches of identical shape.
        margin: required gap between the negative and the positive distance.

    Returns:
        Scalar tensor holding the batch-mean loss.
    """
    d_anchor_pos = F.pairwise_distance(anchor, positive)
    d_anchor_neg = F.pairwise_distance(anchor, negative)
    hinge = F.relu(d_anchor_pos - d_anchor_neg + margin)
    return hinge.mean()

# Training loop: 10 epochs of triplet-loss fine-tuning over train_loader.
for epoch in range(10):
    model.train()
    # Running sum of per-batch losses, averaged over the number of batches when printed.
    total_loss = 0
    for anchor, positive, negative in tqdm(train_loader):
        anchor, positive, negative = anchor.to(device), positive.to(device), negative.to(device)

        # Each member of the triplet goes through the same network in its own forward pass.
        anchor_emb = model(anchor)
        pos_emb = model(positive)
        neg_emb = model(negative)

        loss = triplet_loss(anchor_emb, pos_emb, neg_emb)
        total_loss += loss.item()

        # Clear stale gradients, back-propagate, then update the weights.
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    print(f"Epoch {epoch+1}, Loss: {total_loss/len(train_loader):.4f}")


Downloading: "https://download.pytorch.org/models/resnet18-f37072fd.pth" to /root/.cache/torch/hub/checkpoints/resnet18-f37072fd.pth
100%|██████████| 44.7M/44.7M [00:00<00:00, 207MB/s]
100%|██████████| 121/121 [00:41<00:00,  2.93it/s]


Epoch 1, Loss: 0.2486


100%|██████████| 121/121 [00:38<00:00,  3.10it/s]


Epoch 2, Loss: 0.0746


100%|██████████| 121/121 [00:34<00:00,  3.49it/s]


Epoch 3, Loss: 0.0492


100%|██████████| 121/121 [00:37<00:00,  3.26it/s]


Epoch 4, Loss: 0.0444


100%|██████████| 121/121 [00:34<00:00,  3.46it/s]


Epoch 5, Loss: 0.0337


100%|██████████| 121/121 [00:35<00:00,  3.41it/s]


Epoch 6, Loss: 0.0300


100%|██████████| 121/121 [00:35<00:00,  3.45it/s]


Epoch 7, Loss: 0.0249


100%|██████████| 121/121 [00:35<00:00,  3.44it/s]


Epoch 8, Loss: 0.0196


100%|██████████| 121/121 [00:37<00:00,  3.23it/s]


Epoch 9, Loss: 0.0203


100%|██████████| 121/121 [00:35<00:00,  3.45it/s]

Epoch 10, Loss: 0.0163





In [7]:
def get_embeddings(model, dataset_dir, reference_name='001_frontal.jpg'):
    """Embed one reference image per person folder.

    Relies on the notebook-level globals ``transform`` and ``device``.

    Args:
        model: network called as ``model(batch)`` that returns embeddings;
            it is switched to eval mode.
        dataset_dir: directory containing one sub-folder per person.
        reference_name: file name looked up inside every person folder.
            Folders that do not contain it are skipped.

    Returns:
        dict mapping person folder name -> embedding tensor on the CPU with
        the batch dimension squeezed away.
    """
    model.eval()
    embeddings = {}
    with torch.no_grad():
        # Sorted so the resulting dict order does not depend on the filesystem.
        for person in sorted(os.listdir(dataset_dir)):
            img_path = os.path.join(dataset_dir, person, reference_name)
            if not os.path.isfile(img_path):
                continue
            with Image.open(img_path) as handle:
                img = handle.convert('RGB')
            batch = transform(img).unsqueeze(0).to(device)
            embeddings[person] = model(batch).cpu().squeeze(0)
    return embeddings


In [8]:
# Persist only the weights (state_dict) of the triplet-trained encoder to the working directory.
torch.save(model.state_dict(), 'triplet_face_model.pth')


In [9]:
import os, glob, torch, numpy as np
import torch.nn.functional as F
from PIL import Image


In [10]:
def build_gallery(model, val_root, transform, device):
    """Embed one clean reference image per person.

    For every sub-directory of ``val_root`` the alphabetically first image
    file (.jpg/.jpeg/.png) located directly inside it is embedded. Entries of
    ``val_root`` that are not directories, and persons without any clean
    image, are skipped.

    Args:
        model: network called as ``model(batch)`` that returns embeddings;
            it is switched to eval mode.
        val_root: directory containing one sub-folder per person.
        transform: callable turning an RGB PIL image into a tensor.
        device: device the image batch is moved to before the forward pass.

    Returns: dict person_id -> embedding tensor (D,)
    """
    image_exts = ('.jpg', '.jpeg', '.png')
    model.eval()
    gallery = {}
    with torch.no_grad():
        for person in sorted(os.listdir(val_root)):
            pdir = os.path.join(val_root, person)
            # Stray files next to the person folders would make os.listdir fail.
            if not os.path.isdir(pdir):
                continue
            # Only regular files at person/ level; the distortion/ sub-folder
            # is a directory and is therefore ignored.
            clean_imgs = sorted(
                os.path.join(pdir, f) for f in os.listdir(pdir)
                if f.lower().endswith(image_exts)
                and os.path.isfile(os.path.join(pdir, f))
            )
            if not clean_imgs:
                continue
            # Sorted above, so the chosen reference image is deterministic.
            with Image.open(clean_imgs[0]) as handle:
                img = handle.convert('RGB')
            x = transform(img).unsqueeze(0).to(device)
            gallery[person] = model(x).cpu().squeeze(0)  # (D,)
    return gallery

In [11]:
device = 'cuda' if torch.cuda.is_available() else 'cpu'
# Reload the weights saved above into the existing `model` object
# (map_location lets a GPU-saved checkpoint load on a CPU-only machine).
model.load_state_dict(torch.load('triplet_face_model.pth', map_location=device))
model.to(device)
# One reference embedding per validation identity.
gallery = build_gallery(model, '/kaggle/input/comsys-hackathon/Task_B/val', transform, device)


In [None]:
def evaluate(model, gallery, val_root, transform, device):
    """Top-1 identification accuracy of distorted queries against a gallery.

    Every file in ``<val_root>/<person>/distortion/`` (for each person in
    ``gallery``) is embedded and assigned to the gallery identity with the
    highest cosine similarity.

    Args:
        model: network called as ``model(batch)`` that returns embeddings;
            it is switched to eval mode.
        gallery: dict person_id -> 1-D embedding tensor.
        val_root: directory containing one sub-folder per person.
        transform: callable turning an RGB PIL image into a tensor.
        device: device the image batch is moved to before the forward pass.

    Returns:
        ``(overall_acc, per_type)`` where ``overall_acc`` is a fraction in
        [0, 1] (0.0 when no query image was found) and ``per_type`` maps the
        distortion name to ``[correct, total]``.
    """
    model.eval()
    total, correct = 0, 0

    # for per-distortion stats, e.g. { 'blurred': [correct, total], … }
    per_type = {}

    # Stack the gallery once so each query needs a single vectorised
    # similarity call instead of one Python-level call per identity.
    gallery_ids = list(gallery.keys())
    gallery_matrix = (
        torch.stack([gallery[pid] for pid in gallery_ids]) if gallery_ids else None
    )

    with torch.no_grad():
        for true_id in gallery_ids:
            pdir = os.path.join(val_root, true_id, 'distortion')
            for dist_path in glob.glob(os.path.join(pdir, '*.*')):

                # Distortion type from the file name only (not the whole
                # path, whose directories may themselves contain "_"):
                # "Ciro_Gomes_0001_blurred.jpg" -> "blurred"
                stem = os.path.splitext(os.path.basename(dist_path))[0]
                dtype = stem.split('_')[-1]
                stats = per_type.setdefault(dtype, [0, 0])

                # Query embedding
                with Image.open(dist_path) as handle:
                    img = handle.convert('RGB')
                x = transform(img).unsqueeze(0).to(device)
                q_emb = model(x).cpu().squeeze(0)

                # Cosine similarity against every gallery row; argmax yields
                # the best-matching identity.
                sims = F.cosine_similarity(q_emb.unsqueeze(0), gallery_matrix, dim=1)
                pred = gallery_ids[int(torch.argmax(sims))]

                # Tally
                total += 1
                stats[1] += 1
                if pred == true_id:
                    correct += 1
                    stats[0] += 1

    # Guard against an empty query set (e.g. wrong val_root) instead of
    # failing with ZeroDivisionError.
    overall_acc = correct / total if total else 0.0
    print(f"🟢 Overall Top‑1 Accuracy: {overall_acc*100:.2f}%  ({correct}/{total})")

    print("\n🔍 Accuracy by Distortion Type:")
    for dtype, (c,t) in per_type.items():
        print(f"  • {dtype:10s}: { (c/t*100) if t>0 else 0:6.2f}% ({c}/{t})")

    return overall_acc, per_type

# usage — evaluate against the same validation root the gallery was built from
evaluate(model, gallery, '/kaggle/input/comsys-hackathon/Task_B/val', transform, device)


🟢 Overall Top‑1 Accuracy: 61.31%  (1811/2954)

🔍 Accuracy by Distortion Type:
  • resized   :  61.14% (258/422)
  • blurred   :  61.85% (261/422)
  • lowlight  :  61.37% (259/422)
  • sunny     :  60.90% (257/422)
  • noisy     :  61.14% (258/422)
  • foggy     :  61.37% (259/422)
  • rainy     :  61.37% (259/422)


(0.6130670277589709,
 {'resized': [258, 422],
  'blurred': [261, 422],
  'lowlight': [259, 422],
  'sunny': [257, 422],
  'noisy': [258, 422],
  'foggy': [259, 422],
  'rainy': [259, 422]})

In [12]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import math

class ArcMarginProduct(nn.Module):
    """
    Implements the ArcFace angular margin penalty.

    The layer holds one weight vector per class. ``forward`` computes the
    cosine between each (normalised) embedding and each (normalised) class
    vector, replaces the target-class cosine ``cos(θ)`` with ``cos(θ + m)``
    and multiplies all logits by ``s``.

    Args:
        embedding_size: dimensionality of the incoming embeddings.
        num_classes: number of identities (rows of the weight matrix).
        s: scale applied to the final logits.
        m: additive angular margin in radians.
        easy_margin: when True the margin is applied only where ``cos(θ) > 0``;
            otherwise where ``cos(θ) > cos(π - m)``, with the linear fallback
            ``cos(θ) - sin(π - m) * m`` elsewhere.
    """

    # Lower bound for sin²(θ) before the square root (see forward()).
    _SIN2_FLOOR = 1e-12

    def __init__(self, embedding_size, num_classes, s=30.0, m=0.50, easy_margin=False):
        super().__init__()
        self.num_classes = num_classes
        self.embedding_size = embedding_size
        self.s = s      # norm scale
        self.m = m      # angular margin
        self.weight = nn.Parameter(torch.empty(num_classes, embedding_size))
        nn.init.xavier_uniform_(self.weight)

        self.easy_margin = easy_margin
        self.cos_m = math.cos(m)
        self.sin_m = math.sin(m)
        self.th = math.cos(math.pi - m)
        self.mm = math.sin(math.pi - m) * m

    def forward(self, embeddings, labels):
        """Return margin-adjusted, scaled logits of shape [B, num_classes].

        Args:
            embeddings: float tensor [B, embedding_size].
            labels: integer class indices [B] (must be accepted by ``F.one_hot``).
        """
        # Normalize features and weights -> cosine similarity matrix [B, C]
        cosine = F.linear(F.normalize(embeddings), F.normalize(self.weight))

        # sin(θ) = sqrt(1 - cos²(θ)). Rounding can push |cos| marginally above
        # 1, which would make the radicand negative (NaN), and the gradient of
        # sqrt is infinite at exactly 0. Flooring the radicand keeps both the
        # value and the gradient finite.
        sine = torch.sqrt(torch.clamp(1.0 - cosine.pow(2), min=self._SIN2_FLOOR))
        phi = cosine * self.cos_m - sine * self.sin_m   # cos(θ + m)

        if self.easy_margin:
            phi = torch.where(cosine > 0, phi, cosine)
        else:
            phi = torch.where(cosine > self.th, phi, cosine - self.mm)

        # Target-class positions take phi, every other position keeps cosine.
        # Selecting (instead of blending with 0/1 weights) prevents a
        # non-finite value in the unused branch from leaking into the logits.
        is_target = F.one_hot(labels, num_classes=self.num_classes).to(embeddings.device).bool()
        logits = torch.where(is_target, phi, cosine)

        return logits * self.s


In [13]:
class FaceModelWithArcFace(nn.Module):
    """ResNet-18 embedding network with an ArcFace classification head.

    Args:
        num_classes: number of identities handled by the ArcFace head.
        embedding_size: dimensionality of the embedding vector.
    """

    def __init__(self, num_classes, embedding_size=128):
        super().__init__()
        # backbone: the classifier is replaced by a pass-through so the trunk
        # outputs the 512 features consumed by the embedding layer.
        resnet = models.resnet18(pretrained=True)
        resnet.fc = nn.Identity()
        self.backbone = resnet

        # embedding layer
        self.embedding = nn.Linear(512, embedding_size)

        # ArcFace head
        self.arc_margin = ArcMarginProduct(
            embedding_size, num_classes, s=30.0, m=0.50, easy_margin=False
        )

    def forward(self, x, labels=None):
        """Embed ``x`` and, when labels are given, compute ArcFace logits.

        Args:
            x: image batch.
            labels: class indices for the batch. May be omitted at inference
                time, in which case the ArcFace head is skipped entirely.

        Returns:
            ``(logits, emb)``; ``logits`` is ``None`` when ``labels`` is ``None``.
            ``emb`` is L2-normalised along dim 1.
        """
        features = self.backbone(x)                 # [B,512]
        emb = F.normalize(self.embedding(features)) # [B,embedding_size]
        if labels is None:
            # Inference: only the embedding is needed, no margin head.
            return None, emb
        logits = self.arc_margin(emb, labels)
        return logits, emb


In [14]:
import os
from torch.utils.data import Dataset
from PIL import Image

class FaceClassificationDataset(Dataset):
    """
    Scans:
      root_dir/
        ├── personA/
        │     ├── img1.jpg
        │     ├── img2.jpg
        │     └── distortion/
        │           ├── img1_blurred.jpg
        │           └── …
        └── personB/
              └── …
    And returns (image_tensor, label_idx).

    Both the clean images and the files in ``distortion/`` are labelled with
    the index of their person folder. Class indices follow the alphabetical
    order of the folder names, and the sample list is sorted per folder so it
    is identical from run to run.

    Args:
        root_dir: directory containing one sub-folder per person.
        transform: optional callable applied to every loaded RGB PIL image.
    """

    # File extensions (lower-case) treated as images.
    IMAGE_EXTENSIONS = ('.jpg', '.jpeg', '.png')

    def __init__(self, root_dir, transform=None):
        self.transform = transform

        # build mapping person_id -> class_idx
        self.persons = sorted(d for d in os.listdir(root_dir)
                              if os.path.isdir(os.path.join(root_dir, d)))
        self.label2idx = {p: i for i, p in enumerate(self.persons)}

        # gather (image_path, label_idx): clean images first, then distortions
        self.samples = []
        for person in self.persons:
            pdir = os.path.join(root_dir, person)
            label = self.label2idx[person]
            for folder in (pdir, os.path.join(pdir, 'distortion')):
                self.samples.extend(
                    (path, label) for path in self._list_images(folder)
                )

    @classmethod
    def _list_images(cls, folder):
        """Return the sorted image file paths directly inside ``folder`` ([] if it is missing)."""
        if not os.path.isdir(folder):
            return []
        return sorted(
            os.path.join(folder, name)
            for name in os.listdir(folder)
            if name.lower().endswith(cls.IMAGE_EXTENSIONS)
            and os.path.isfile(os.path.join(folder, name))
        )

    def __len__(self):
        return len(self.samples)

    def __getitem__(self, idx):
        path, label = self.samples[idx]
        with Image.open(path) as handle:
            img = handle.convert('RGB')
        if self.transform:
            img = self.transform(img)
        return img, label


In [15]:
from torchvision import transforms
from torch.utils.data import DataLoader

# Same preprocessing as the triplet experiment: 224x224 resize, tensor
# conversion, per-channel normalisation with mean 0.5 / std 0.5.
# NOTE: this rebinds the notebook-level `transform` defined earlier.
transform = transforms.Compose([
    transforms.Resize((224,224)),
    transforms.ToTensor(),
    transforms.Normalize([0.5]*3, [0.5]*3),
])

# Classification dataset (clean + distorted images, one class per person folder).
train_cls_ds = FaceClassificationDataset('/kaggle/input/comsys-hackathon/Task_B/train', transform)
# NOTE: this rebinds `train_loader`, replacing the triplet loader created earlier.
train_loader = DataLoader(
    train_cls_ds,
    batch_size=64,
    shuffle=True,
    num_workers=4,
    pin_memory=True
)


In [16]:
device = 'cuda' if torch.cuda.is_available() else 'cpu'
# One ArcFace class per person folder found in the training set.
# NOTE: this rebinds `model`, replacing the triplet-trained FaceEncoder.
model = FaceModelWithArcFace(
    num_classes=len(train_cls_ds.persons),
    embedding_size=128
).to(device)

optimizer = torch.optim.Adam(model.parameters(), lr=1e-4)
# Cross-entropy over the margin-adjusted, scaled logits returned by the model.
criterion = torch.nn.CrossEntropyLoss()

# 10 epochs of supervised training.
for epoch in range(1, 11):
    model.train()
    # Running sum of per-batch losses, averaged over the number of batches when printed.
    total_loss = 0
    for imgs, labels in train_loader:
        imgs, labels = imgs.to(device), labels.to(device)

        # The labels are passed into the forward pass because the ArcFace head
        # applies its margin to the target-class logit.
        logits, embeddings = model(imgs, labels)
        loss = criterion(logits, labels)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        total_loss += loss.item()

    print(f"Epoch {epoch:02d} — Loss: {total_loss/len(train_loader):.4f}")


Epoch 01 — Loss: 16.0488
Epoch 02 — Loss: 4.2328
Epoch 03 — Loss: 0.4665
Epoch 04 — Loss: 0.0556
Epoch 05 — Loss: 0.0137
Epoch 06 — Loss: 0.0098
Epoch 07 — Loss: 0.0073
Epoch 08 — Loss: 0.0056
Epoch 09 — Loss: 0.0045
Epoch 10 — Loss: 0.0038


In [17]:
# Persist only the weights (state_dict) of the ArcFace model to the working directory.
torch.save(model.state_dict(), 'arcface_model.pth')

In [18]:
# instantiate the ArcFace model architecture
# NOTE(review): num_classes is hard-coded here; it has to equal the value used
# when the checkpoint was written (len(train_cls_ds.persons) in the training
# cell), otherwise load_state_dict will reject the ArcFace weight shape.
model = FaceModelWithArcFace(num_classes=877, embedding_size=128).to(device)
# now load the correct weights
model.load_state_dict(torch.load('arcface_model.pth', map_location=device))


<All keys matched successfully>

In [19]:
import os
import glob
import torch
import torch.nn.functional as F
from PIL import Image
from torchvision import transforms
# from your_model_file import FaceModelWithArcFace  # adjust import to where you defined the class

# ——————————————————————————————————————————
# 1) Config & Device
# ——————————————————————————————————————————
device    = 'cuda' if torch.cuda.is_available() else 'cpu'
val_root  = '/kaggle/input/comsys-hackathon/Task_B/val'         # path to your validation folder
model_path = 'arcface_model.pth'          # your ArcFace‐trained weights

# ——————————————————————————————————————————
# 2) Data Transform (must match training)
# ——————————————————————————————————————————
# Same steps as the training transform: 224x224 resize, tensor conversion,
# per-channel normalisation with mean 0.5 / std 0.5.
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize([0.5, 0.5, 0.5],
                         [0.5, 0.5, 0.5]),
])

# ——————————————————————————————————————————
# 3) Load ArcFace Model
# ——————————————————————————————————————————
# Both values must equal the ones the checkpoint was trained with, otherwise
# load_state_dict will reject the stored tensor shapes.
num_classes    = 877   # update if different
embedding_size = 128   # same as during training

model = FaceModelWithArcFace(num_classes, embedding_size).to(device)
model.load_state_dict(torch.load(model_path, map_location=device))
# Inference mode for every evaluation step that follows.
model.eval()

# ——————————————————————————————————————————
# 4) Build Validation Gallery
# ——————————————————————————————————————————
def build_gallery(model, val_root, transform, device):
    """Build one averaged reference embedding per person.

    Every image file (.jpg/.jpeg/.png) located directly inside a person
    folder is embedded; the embeddings are averaged and re-normalised.

    Args:
        model: network called as ``model(batch, labels)`` that returns
            ``(logits, embedding)``; it is switched to eval mode.
        val_root: directory containing one sub-folder per person.
        transform: callable turning an RGB PIL image into a tensor.
        device: device the image batch is moved to before the forward pass.

    Returns: dict person_id -> L2‐normalized averaged embedding (tensor, shape [D])
    """
    # Make sure inference-mode layers are used even if the caller forgot to
    # call model.eval() after training.
    model.eval()
    gallery = {}
    # The model signature requires labels; only the embedding is used, so a
    # single dummy label is created once and reused for every image.
    dummy_label = torch.zeros(1, dtype=torch.long, device=device)
    with torch.no_grad():
        for person in sorted(os.listdir(val_root)):
            pdir = os.path.join(val_root, person)
            if not os.path.isdir(pdir):
                continue

            # collect all "clean" images (regular files only, so sub-folders
            # such as distortion/ are ignored); sorted for a stable order
            clean_imgs = sorted(
                os.path.join(pdir, f)
                for f in os.listdir(pdir)
                if f.lower().endswith(('.jpg', '.jpeg', '.png'))
                and os.path.isfile(os.path.join(pdir, f))
            )
            if not clean_imgs:
                continue

            embs = []
            for img_path in clean_imgs:
                with Image.open(img_path) as handle:
                    img = handle.convert('RGB')
                x = transform(img).unsqueeze(0).to(device)
                _, emb = model(x, dummy_label)
                embs.append(emb.cpu())

            # average and re‐normalize
            emb_avg = torch.stack(embs, dim=0).mean(dim=0)
            gallery[person] = F.normalize(emb_avg, p=2, dim=1).squeeze(0)

    return gallery

# ——————————————————————————————————————————
# 5) Evaluate on Distortions
# ——————————————————————————————————————————
def evaluate(model, gallery, val_root, transform, device):
    """Top-1 identification accuracy of distorted queries against a gallery.

    Every file in ``<val_root>/<person>/distortion/`` (for each person in
    ``gallery``) is embedded and assigned to the gallery identity with the
    highest cosine similarity.

    Args:
        model: network called as ``model(batch, labels)`` that returns
            ``(logits, embedding)``; it is switched to eval mode.
        gallery: dict person_id -> 1-D embedding tensor.
        val_root: directory containing one sub-folder per person.
        transform: callable turning an RGB PIL image into a tensor.
        device: device the image batch is moved to before the forward pass.

    Returns:
        ``(overall_acc, per_type)`` where ``overall_acc`` is a percentage
        (0.0 when no query image was found) and ``per_type`` maps the
        distortion name to ``[correct, total]``.
    """
    model.eval()
    total = correct = 0
    per_type = {}

    # Stack the gallery once so each query needs a single vectorised
    # similarity call instead of one Python-level call per identity.
    gallery_ids = list(gallery.keys())
    gallery_matrix = (
        torch.stack([gallery[pid] for pid in gallery_ids]) if gallery_ids else None
    )
    # The model signature requires labels; only the embedding is used.
    dummy_label = torch.zeros(1, dtype=torch.long, device=device)

    with torch.no_grad():
        for true_id in gallery_ids:
            dist_dir = os.path.join(val_root, true_id, 'distortion')
            if not os.path.isdir(dist_dir):
                continue

            for dist_path in glob.glob(os.path.join(dist_dir, '*.*')):
                # extract distortion type from filename suffix
                dtype = os.path.splitext(os.path.basename(dist_path))[0].split('_')[-1]
                stats = per_type.setdefault(dtype, [0, 0])

                # compute query embedding, shape [1, D]
                with Image.open(dist_path) as handle:
                    img = handle.convert('RGB')
                x = transform(img).unsqueeze(0).to(device)
                _, q_emb = model(x, dummy_label)
                q_emb = q_emb.cpu()

                # cosine similarity vs all gallery rows; argmax yields the
                # best-matching identity
                sims = F.cosine_similarity(q_emb, gallery_matrix, dim=1)
                pred = gallery_ids[int(torch.argmax(sims))]

                # tally results
                total += 1
                stats[1] += 1
                if pred == true_id:
                    correct += 1
                    stats[0] += 1

    # Guard against an empty query set instead of failing with ZeroDivisionError.
    overall_acc = correct / total * 100 if total else 0.0
    print(f"\n🟢 Overall Top‑1 Accuracy: {overall_acc:.2f}% ({correct}/{total})\n")
    print("🔍 Accuracy by Distortion Type:")
    for dtype, (c, t) in per_type.items():
        acc = (c / t * 100) if t > 0 else 0
        print(f"  • {dtype:10s}: {acc:6.2f}% ({c}/{t})")

    return overall_acc, per_type

# ——————————————————————————————————————————
# 6) Run Evaluation
# ——————————————————————————————————————————
# Gallery: one averaged clean-image embedding per validation identity.
gallery = build_gallery(model, val_root, transform, device)
# Queries: every distorted image of those identities; as the last expression
# of the cell the returned (accuracy, per_type) tuple is also echoed.
evaluate(model, gallery, val_root, transform, device)



🟢 Overall Top‑1 Accuracy: 93.84% (2772/2954)

🔍 Accuracy by Distortion Type:
  • foggy     :  97.39% (411/422)
  • lowlight  :  96.92% (409/422)
  • noisy     :  89.57% (378/422)
  • blurred   :  95.02% (401/422)
  • rainy     :  93.84% (396/422)
  • sunny     :  87.20% (368/422)
  • resized   :  96.92% (409/422)


(93.8388625592417,
 {'foggy': [411, 422],
  'lowlight': [409, 422],
  'noisy': [378, 422],
  'blurred': [401, 422],
  'rainy': [396, 422],
  'sunny': [368, 422],
  'resized': [409, 422]})

In [20]:
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score

def evaluate_with_metrics(model, gallery, val_root, transform, device):
    """Identification metrics of distorted queries against a gallery.

    Every file in ``<val_root>/<person>/distortion/`` (for each person in
    ``gallery``) is embedded and assigned to the gallery identity with the
    highest cosine similarity. Accuracy and macro-averaged precision / recall
    / F1 are then computed with scikit-learn over all queries.

    Args:
        model: network called as ``model(batch, labels)`` that returns
            ``(logits, embedding)``; it is switched to eval mode.
        gallery: dict person_id -> 1-D embedding tensor.
        val_root: directory containing one sub-folder per person.
        transform: callable turning an RGB PIL image into a tensor.
        device: device the image batch is moved to before the forward pass.

    Returns:
        dict with keys ``accuracy`` (percentage), ``precision``, ``recall``,
        ``f1_score`` (macro averages as returned by scikit-learn) and
        ``per_type`` (distortion name -> ``[correct, total]``).
    """
    model.eval()
    per_type = {}
    y_true = []
    y_pred = []

    # Stack the gallery once so each query needs a single vectorised
    # similarity call instead of one Python-level call per identity.
    gallery_ids = list(gallery.keys())
    gallery_matrix = (
        torch.stack([gallery[pid] for pid in gallery_ids]) if gallery_ids else None
    )
    # The model signature requires labels; only the embedding is used.
    dummy_label = torch.zeros(1, dtype=torch.long, device=device)

    with torch.no_grad():
        for true_id in gallery_ids:
            dist_dir = os.path.join(val_root, true_id, 'distortion')
            if not os.path.isdir(dist_dir):
                continue

            for dist_path in glob.glob(os.path.join(dist_dir, '*.*')):
                # distortion type = last "_"-separated token of the file stem
                dtype = os.path.splitext(os.path.basename(dist_path))[0].split('_')[-1]
                stats = per_type.setdefault(dtype, [0, 0])

                # query embedding, shape [1, D]
                with Image.open(dist_path) as handle:
                    img = handle.convert('RGB')
                x = transform(img).unsqueeze(0).to(device)
                _, q_emb = model(x, dummy_label)
                q_emb = q_emb.cpu()

                sims = F.cosine_similarity(q_emb, gallery_matrix, dim=1)
                pred = gallery_ids[int(torch.argmax(sims))]

                y_true.append(true_id)
                y_pred.append(pred)

                stats[1] += 1
                if pred == true_id:
                    stats[0] += 1

    # Metrics
    acc = accuracy_score(y_true, y_pred) * 100
    prec = precision_score(y_true, y_pred, average='macro', zero_division=0)
    rec = recall_score(y_true, y_pred, average='macro', zero_division=0)
    f1 = f1_score(y_true, y_pred, average='macro', zero_division=0)

    print(f"\n🟢 Overall Accuracy: {acc:.2f}%")
    print(f"📊 Precision: {prec:.4f}")
    print(f"📈 Recall:    {rec:.4f}")
    print(f"🏅 F1 Score:  {f1:.4f}")
    print("\n🔍 Accuracy by Distortion Type:")
    for dtype, (c, t) in per_type.items():
        acc_d = (c / t * 100) if t > 0 else 0
        print(f"  • {dtype:10s}: {acc_d:6.2f}% ({c}/{t})")

    return {
        "accuracy": acc,
        "precision": prec,
        "recall": rec,
        "f1_score": f1,
        "per_type": per_type
    }


In [21]:
# Validation: reuses the `gallery` built from val_root in the previous cell.
val_metrics = evaluate_with_metrics(model, gallery, val_root, transform, device)

# Training (if needed)
# NOTE: this is the same directory the ArcFace model was trained on, and the
# training dataset included the distorted images, so these numbers are a
# sanity check rather than a measure of generalisation.
train_root = '/kaggle/input/comsys-hackathon/Task_B/train'
train_gallery = build_gallery(model, train_root, transform, device)
train_metrics = evaluate_with_metrics(model, train_gallery, train_root, transform, device)



🟢 Overall Accuracy: 93.84%
📊 Precision: 0.9353
📈 Recall:    0.9864
🏅 F1 Score:  0.9559

🔍 Accuracy by Distortion Type:
  • foggy     :  97.39% (411/422)
  • lowlight  :  96.92% (409/422)
  • noisy     :  89.57% (378/422)
  • blurred   :  95.02% (401/422)
  • rainy     :  93.84% (396/422)
  • sunny     :  87.20% (368/422)
  • resized   :  96.92% (409/422)

🟢 Overall Accuracy: 100.00%
📊 Precision: 1.0000
📈 Recall:    1.0000
🏅 F1 Score:  1.0000

🔍 Accuracy by Distortion Type:
  • blurred   : 100.00% (1926/1926)
  • resized   : 100.00% (1926/1926)
  • noisy     : 100.00% (1926/1926)
  • foggy     : 100.00% (1926/1926)
  • sunny     : 100.00% (1926/1926)
  • rainy     : 100.00% (1926/1926)
  • lowlight  : 100.00% (1926/1926)
