In [4]:
# Step 1: Install Dependencies (Colab)
!pip install torch torchvision --quiet

# Step 2: Import Libraries
import os
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision.transforms as transforms
import torchvision.models as models
from torch.utils.data import Dataset, DataLoader, random_split
from PIL import Image
import random
import numpy as np
from google.colab import drive
import zipfile
from sklearn.metrics.pairwise import cosine_similarity

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Step 3: Mount Google Drive & Extract Dataset
drive.mount('/content/drive')

dataset_zip_path = "/content/drive/My Drive/snapvision_dataset.zip"
extract_folder = "/content/snapvision_dataset"

os.makedirs(extract_folder, exist_ok=True)
with zipfile.ZipFile(dataset_zip_path, 'r') as zip_ref:
    zip_ref.extractall(extract_folder)

print(f"Dataset extracted to: {extract_folder}")

dataset_path = "/content/snapvision_dataset/uob_image_set"

# Step 4: Define Dataset Class
class FashionDataset(Dataset):
    def __init__(self, items, item_ids, transform=None, mode="train"):
        self.items = items
        self.item_ids = item_ids
        self.transform = transform
        self.mode = mode

        if mode == "train":
            self.flat_images = [(imgs, item_id) for imgs, item_id in zip(items, item_ids)]
        else:
            self.flat_images = [(img, item_id) for imgs, item_id in zip(items, item_ids) for img in imgs]

    def __len__(self):
        return len(self.flat_images)

    def __getitem__(self, idx):
        if self.mode == "train":
            images, item_id = self.flat_images[idx]  # Use actual folder name as ID

            if len(images) == 0:
              raise ValueError(f"No images found for item_id {item_id}")

            anchor_img = Image.open(random.choice(images)).convert("RGB")
            positive_img = Image.open(random.choice(images)).convert("RGB")

            negative_idx = idx
            while negative_idx == idx:
              negative_idx = random.randint(0, len(self.flat_images) - 1)

            negative_images, negative_id = self.flat_images[negative_idx]
            negative_img = Image.open(random.choice(negative_images)).convert("RGB")

            if self.transform:
                anchor_img = self.transform(anchor_img)
                positive_img = self.transform(positive_img)
                negative_img = self.transform(negative_img)

            return anchor_img, positive_img, negative_img, item_id

        else:  # Test mode
            img_path, item_id = self.flat_images[idx]
            img = Image.open(img_path).convert("RGB")

            if self.transform:
                img = self.transform(img)

            return img, item_id

# Step 5: Data Preprocessing
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.RandomHorizontalFlip(),  # Add flips
    transforms.RandomRotation(10),  # Rotate slightly
    transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2),  # Vary colors
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

# Step 6: Load Dataset & Split into Train/Validation/Test
all_items = []
item_ids = []

for clothing_item in sorted(os.listdir(dataset_path)):
    item_path = os.path.join(dataset_path, clothing_item)
    if os.path.isdir(item_path):
        images = [os.path.join(item_path, img) for img in sorted(os.listdir(item_path)) if img.endswith(".jpg")]
        if len(images) >= 2:
            all_items.append(images)
            item_ids.append(clothing_item)

# 80-10-10 Split (Clothing Item Level)
train_size = int(0.8 * len(all_items))
val_size = int(0.1 * len(all_items))
test_size = len(all_items) - train_size - val_size

# Split both `all_items` and `item_ids` consistently (instead of using random_split)
train_items, val_items, test_items = all_items[:train_size], all_items[train_size:train_size+val_size], all_items[train_size+val_size:]
train_ids, val_ids, test_ids = item_ids[:train_size], item_ids[train_size:train_size+val_size], item_ids[train_size+val_size:]

train_dataset = FashionDataset(train_items, train_ids, transform=transform, mode="train")
val_dataset = FashionDataset(val_items, val_ids, transform=transform, mode="test")
test_dataset = FashionDataset(test_items, test_ids, transform=transform, mode="test")

train_dataloader = DataLoader(train_dataset, batch_size=32, shuffle=True, num_workers=4)
val_dataloader = DataLoader(val_dataset, batch_size=32, shuffle=False, num_workers=4)
test_dataloader = DataLoader(test_dataset, batch_size=32, shuffle=False, num_workers=4)

# Step 7: Define the Embedding Model
class EmbeddingNet(nn.Module):
    def __init__(self, embedding_dim=256):  # Increased embedding dim
        super(EmbeddingNet, self).__init__()
        self.backbone = models.resnet50(pretrained=True)

        # Fine-tune the entire ResNet50 model
        # for param in self.backbone.parameters():
        #     param.requires_grad = True  # Unfreeze all layers for fine-tuning
        for param in list(self.backbone.parameters())[:-5]:
            param.requires_grad = False  # Freeze first layers, train only last 5

        # Replace classification layer with custom embedding layer
        self.backbone.fc = nn.Linear(self.backbone.fc.in_features, embedding_dim)

    def forward(self, x):
        return self.backbone(x)

# Step 8: Define Triplet Loss
class TripletLoss(nn.Module):
    def __init__(self, margin=1.0):
        super(TripletLoss, self).__init__()
        self.margin = margin
        self.loss_fn = nn.TripletMarginLoss(margin=self.margin, p=2)

    def forward(self, anchor, positive, negative):
        return self.loss_fn(anchor, positive, negative)

# Step 9: Initialize Model, Loss, and Optimizer
model = EmbeddingNet().to(device)
criterion = TripletLoss(margin=1.0)
optimizer = optim.Adam(model.parameters(), lr=0.0001, weight_decay=1e-5)

# Step 10: Training Loop with Validation
def get_hard_negative(anchor_emb, batch_emb, batch_labels, anchor_label):
    """ Find the hardest negative example in the batch """
    anchor_emb_cpu = anchor_emb.reshape(1, -1).detach().cpu().numpy()
    batch_emb_cpu = batch_emb.detach().cpu().numpy()

    similarities = cosine_similarity(anchor_emb_cpu, batch_emb_cpu)[0]

    negatives = [(similarities[i], batch_emb_cpu[i]) for i, lbl in enumerate(batch_labels) if lbl != anchor_label]

    if negatives:
        hard_negative = min(negatives, key=lambda x: x[0])[1]
        return torch.tensor(hard_negative, dtype=torch.float32).to(device)
    return None

def train(model, train_dataloader, val_dataloader, criterion, optimizer, num_epochs=10):
    model.train()
    for epoch in range(num_epochs):
        total_loss = 0.0
        for anchor, positive, negative, batch_labels in train_dataloader:
            anchor, positive, negative = anchor.to(device), positive.to(device), negative.to(device)
            batch_labels = [str(lbl) for lbl in batch_labels]

            optimizer.zero_grad()
            anchor_emb = model(anchor)
            positive_emb = model(positive)
            batch_emb = model(negative)  # Process negatives as batch

            # Replace random negative with hardest negative
            hard_negatives = []
            for i in range(anchor.shape[0]):
                hard_negative = get_hard_negative(anchor_emb[i], batch_emb, batch_labels, batch_labels[i])
                if hard_negative is not None:
                    hard_negatives.append(hard_negative)

            if hard_negatives:
                hard_negatives = torch.stack(hard_negatives)
                loss = criterion(anchor_emb, positive_emb, hard_negatives)
                loss.backward()
                optimizer.step()
                total_loss += loss.item()

        print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {total_loss/len(train_dataloader):.4f}")

# Step 11: Train the Model
train(model, train_dataloader, val_dataloader, criterion, optimizer, num_epochs=30)

# Step 12: Save the Model
torch.save(model.state_dict(), "fashion_embedding_model.pth")

# Step 13: Model Evaluation (Top-1 Accuracy)
from tqdm import tqdm

def extract_embeddings(model, dataloader):
    model.eval()
    embeddings = None
    labels = []

    with torch.no_grad():
        for images, item_ids in tqdm(dataloader, desc="Extracting embeddings"):
            images = images.to(device)
            emb = model(images).cpu().numpy()

            if embeddings is None:
                embeddings = emb
            else:
                embeddings = np.concatenate((embeddings, emb), axis=0)

            labels.extend(item_ids)

    return embeddings, np.array(labels)

def evaluate_model(model, test_dataloader):
    print("\nEvaluating Model...")

    # Extract embeddings and item IDs
    test_embeddings, test_labels = extract_embeddings(model, test_dataloader)

    # Compute cosine similarity matrix
    similarity_matrix = cosine_similarity(test_embeddings)

    # Evaluate Top-1 retrieval accuracy
    num_correct = 0
    num_samples = similarity_matrix.shape[0]

    for i in range(num_samples):
        # Find most similar item (excluding itself)
        sorted_indices = np.argsort(similarity_matrix[i])[::-1]  # Descending order
        top_match = sorted_indices[1]  # First match after self (index 0)

        # Check if retrieved image belongs to the same item
        if test_labels[i] == test_labels[top_match]:  # Compare item IDs
            num_correct += 1

    accuracy = num_correct / num_samples
    print(f"✅ Top-1 Retrieval Accuracy: {accuracy * 100:.2f}%")

# Run Evaluation
evaluate_model(model, test_dataloader)



Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
Dataset extracted to: /content/snapvision_dataset




Epoch [1/30], Loss: 0.0972




Epoch [2/30], Loss: 0.0693




Epoch [3/30], Loss: 0.0431




Epoch [4/30], Loss: 0.0463




Epoch [5/30], Loss: 0.0336




Epoch [6/30], Loss: 0.0396




Epoch [7/30], Loss: 0.0323




Epoch [8/30], Loss: 0.0377




Epoch [9/30], Loss: 0.0368




Epoch [10/30], Loss: 0.0402




Epoch [11/30], Loss: 0.0228




Epoch [12/30], Loss: 0.0430




Epoch [13/30], Loss: 0.0347




Epoch [14/30], Loss: 0.0379




Epoch [15/30], Loss: 0.0377




Epoch [16/30], Loss: 0.0334




Epoch [17/30], Loss: 0.0328




Epoch [18/30], Loss: 0.0248




Epoch [19/30], Loss: 0.0435




Epoch [20/30], Loss: 0.0338




Epoch [21/30], Loss: 0.0259




Epoch [22/30], Loss: 0.0219




Epoch [23/30], Loss: 0.0186




Epoch [24/30], Loss: 0.0229




Epoch [25/30], Loss: 0.0254




Epoch [26/30], Loss: 0.0237




Epoch [27/30], Loss: 0.0348




Epoch [28/30], Loss: 0.0302




Epoch [29/30], Loss: 0.0289




Epoch [30/30], Loss: 0.0270

Evaluating Model...


Extracting embeddings: 100%|██████████| 21/21 [00:12<00:00,  1.70it/s]

✅ Top-1 Retrieval Accuracy: 44.41%



