In [None]:
# Colab-only setup: mount Google Drive inside the VM and make the project
# folder the working directory for the rest of the notebook.
from google.colab import drive
import os
# Mounts the Drive file system at /content/drive.
drive.mount('/content/drive')
os.chdir('/content/drive/MyDrive/242B_final_project') # customize this line to your working directory

Mounted at /content/drive


In [None]:
# Directory prefix for the saved .npy arrays. The load cell concatenates it
# directly with the file names (f'{path}name.npy'), so keep the trailing slash.
path = "/content/drive/MyDrive/242B_final_project/np_save/"

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
%matplotlib inline

from skimage.transform import resize
import seaborn as sns
import torch
import torch.nn as nn
import torchvision.models as models
from torch.nn import functional as F
from torchvision.models import resnet50

import PIL.Image
from torchvision import transforms
from torch.utils.data import Dataset
from torch.utils.data import DataLoader
import random
from torch.optim import Adam

In [None]:
# Load the pre-built matched (anchor/positive) pair arrays and their labels
# for the train and test splits from the `path` directory.
# NOTE(review): allow_pickle=True lets np.load unpickle object arrays, which can
# execute arbitrary code if a file is untrusted. Only load files you produced
# yourself, or re-save the arrays in a non-object dtype so the flag can be dropped.
train_matched_pairs = np.load(f'{path}train_matched_pairs.npy', allow_pickle=True)
train_matched_labels = np.load(f'{path}train_matched_labels.npy', allow_pickle=True)
test_matched_pairs = np.load(f'{path}test_matched_pairs.npy', allow_pickle=True)
test_matched_labels = np.load(f'{path}test_matched_labels.npy', allow_pickle=True)

# Same layout for the mismatched pairs; these are the pool negatives are drawn from.
# NOTE(review): none of the *_labels arrays are referenced by the cells visible
# here -- confirm they are still needed.
train_mismatched_pairs = np.load(f'{path}train_mismatched_pairs.npy', allow_pickle=True)
train_mismatched_labels = np.load(f'{path}train_mismatched_labels.npy', allow_pickle=True)
test_mismatched_pairs = np.load(f'{path}test_mismatched_pairs.npy', allow_pickle=True)
test_mismatched_labels = np.load(f'{path}test_mismatched_labels.npy', allow_pickle=True)

# Triplet Data Loader


In [None]:
class TripletDataset(Dataset):
    """Serve (anchor, positive, negative) triplets built from two pair collections.

    The anchor and positive come from the matched pair at the requested index.
    The negative is the second element of a mismatched pair picked uniformly at
    random on every access, so the same index can yield different negatives.
    """

    def __init__(self, matched_pairs, mismatched_pairs):
        """Keep references to both pair collections (nothing is copied)."""
        self.mismatched_pairs = mismatched_pairs
        self.matched_pairs = matched_pairs

    def __len__(self):
        """One triplet is served per matched pair."""
        return len(self.matched_pairs)

    def __getitem__(self, idx):
        """Return the triplet for matched pair ``idx`` with a freshly sampled negative."""
        matched = self.matched_pairs[idx]
        anchor, positive = matched
        # Only the second element of the sampled mismatched pair is used.
        sampled = random.choice(self.mismatched_pairs)
        _unused, negative = sampled
        return (anchor, positive, negative)


In [None]:
# Mini-batch size shared by both loaders.
BATCH_SIZE = 32

# Wrap the loaded pair arrays so that each item is an (anchor, positive, negative) triplet.
train_dataset = TripletDataset(train_matched_pairs, train_mismatched_pairs)
test_dataset = TripletDataset(test_matched_pairs, test_mismatched_pairs)

# Shuffle only the training stream. Evaluation metrics are aggregated over the
# whole split, so shuffling the test loader adds nothing and only makes the
# batch order non-reproducible. (Negatives are still sampled at random by
# TripletDataset on every access.)
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False)


# Triplet Model

In [None]:
def triplet_loss(anchor, positive, negative, margin=1.0):
    """Hinge-style triplet loss on squared Euclidean distances.

    For each row, computes ``max(0, d(anchor, positive) - d(anchor, negative) + margin)``
    where ``d`` is the sum of squared differences along dimension 1 (no square
    root is taken), then averages over the batch.

    Args:
        anchor, positive, negative: embedding tensors reduced along dim 1.
        margin: required gap between the negative and positive distances.

    Returns:
        Scalar tensor holding the batch-mean loss.
    """
    pos_diff = anchor - positive
    neg_diff = anchor - negative
    sq_dist_pos = pos_diff.pow(2).sum(dim=1)
    sq_dist_neg = neg_diff.pow(2).sum(dim=1)
    # Positive where the negative is not at least `margin` farther than the positive.
    violation = sq_dist_pos - sq_dist_neg + margin
    per_sample = F.relu(violation)
    return per_sample.mean()


In [None]:
def triplet_accuracy(anchor_out, positive_out, negative_out, margin=1.0):
    """Percentage of triplets whose negative is farther than the positive by more than ``margin``.

    Distances are squared Euclidean (sum of squared differences along dim 1,
    no square root). A triplet counts as correct only when
    ``d(anchor, positive) + margin < d(anchor, negative)`` holds strictly.

    Returns:
        A Python float in the range 0-100.
    """
    pos_gap = anchor_out - positive_out
    neg_gap = anchor_out - negative_out
    sq_dist_pos = pos_gap.pow(2).sum(dim=1)
    sq_dist_neg = neg_gap.pow(2).sum(dim=1)
    satisfied = sq_dist_neg > sq_dist_pos + margin
    fraction = satisfied.float().mean().item()
    return fraction * 100


In [None]:
class TripletNetwork(nn.Module):
    """Small CNN that embeds anchor, positive and negative images with shared weights.

    The encoder is three conv -> ReLU -> 2x2 max-pool stages (16, 32, 64 channels)
    followed by two fully connected layers. All three inputs go through the same
    ``base_model``, so they share every parameter.

    Args:
        embedding_dim: size of the output embedding (default 128).
        input_size: height/width of the square 3-channel input images
            (default 224, which gives the original 64 * 28 * 28 flatten size).

    Raises:
        ValueError: if ``input_size`` is too small to survive three 2x2 poolings.
    """

    def __init__(self, embedding_dim=128, input_size=224):
        super().__init__()
        # Each of the three stride-2 max-pools floor-halves the spatial size,
        # which is equivalent to a single floor division by 8.
        feature_size = input_size // 8
        if feature_size < 1:
            raise ValueError(
                f"input_size must be at least 8 to survive three 2x2 poolings, got {input_size}"
            )
        self.base_model = nn.Sequential(
            nn.Conv2d(3, 16, 3, padding=1), nn.ReLU(), nn.MaxPool2d(2),
            nn.Conv2d(16, 32, 3, padding=1), nn.ReLU(), nn.MaxPool2d(2),
            nn.Conv2d(32, 64, 3, padding=1), nn.ReLU(), nn.MaxPool2d(2),
            nn.Flatten(),
            nn.Linear(64 * feature_size * feature_size, 500), nn.ReLU(),
            nn.Linear(500, embedding_dim)
        )

    def forward(self, anchor, positive, negative):
        """Embed the three image batches and return their embeddings in the same order.

        No per-call logging is done here: ``forward`` runs once per batch, so a
        debug print would flood the notebook output during training.
        """
        anchor_output = self.base_model(anchor)
        positive_output = self.base_model(positive)
        negative_output = self.base_model(negative)
        return anchor_output, positive_output, negative_output

In [None]:
def _run_triplet_epoch(model, loss_func, loader, device, margin, optimizer=None):
    """Run one pass over ``loader`` and return ``(mean batch loss, accuracy in percent)``.

    When ``optimizer`` is given the model is put in training mode and updated
    after every batch; otherwise the model is put in evaluation mode and no
    gradients are tracked. Both metrics are ``nan`` if the loader yields nothing.
    """
    is_training = optimizer is not None
    model.train(is_training)  # train(False) is equivalent to eval()

    loss_sum = 0.0
    num_batches = 0
    num_correct = 0
    num_samples = 0

    with torch.set_grad_enabled(is_training):
        for anchor, positive, negative in loader:
            anchor, positive, negative = anchor.to(device), positive.to(device), negative.to(device)
            if is_training:
                optimizer.zero_grad()

            anchor_out, positive_out, negative_out = model(anchor, positive, negative)
            loss = loss_func(anchor_out, positive_out, negative_out, margin)
            if is_training:
                loss.backward()
                optimizer.step()

            loss_sum += loss.item()
            num_batches += 1

            # A triplet is correct when the negative is farther than the positive
            # by more than the margin (squared Euclidean distances).
            with torch.no_grad():
                positive_dist = (anchor_out - positive_out).pow(2).sum(1)
                negative_dist = (anchor_out - negative_out).pow(2).sum(1)
                num_correct += (positive_dist + margin < negative_dist).sum().item()
            # Count what was actually seen instead of trusting len(loader.dataset),
            # which is wrong when batches are dropped and fails on plain iterables.
            num_samples += anchor_out.size(0)

    mean_loss = loss_sum / num_batches if num_batches else float("nan")
    accuracy = 100 * num_correct / num_samples if num_samples else float("nan")
    return mean_loss, accuracy


def train_triplet_model(model, loss_func, optimizer, num_epochs, train_loader, test_loader, margin=1.0):
    """Train a triplet model and report train/test metrics after every epoch.

    Args:
        model: module called as ``model(anchor, positive, negative)`` that returns
            the three embeddings.
        loss_func: callable ``loss_func(anchor_out, positive_out, negative_out, margin)``
            returning a scalar loss tensor.
        optimizer: optimizer over ``model``'s parameters.
        num_epochs: number of passes over ``train_loader``.
        train_loader, test_loader: iterables of (anchor, positive, negative) batches.
        margin: margin passed to ``loss_func`` and used for the accuracy criterion.

    Returns:
        The same ``model`` object, moved to the selected device and left in eval mode.

    The printed losses are averages over batches (not sums), so they stay
    comparable between splits of different size and across batch sizes.
    """
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.to(device)

    for epoch in range(num_epochs):
        train_loss, train_accuracy = _run_triplet_epoch(
            model, loss_func, train_loader, device, margin, optimizer=optimizer
        )
        test_loss, test_accuracy = _run_triplet_epoch(
            model, loss_func, test_loader, device, margin
        )

        print(f'Epoch {epoch+1}/{num_epochs}, Training Loss: {train_loss:.4f}, Training Accuracy: {train_accuracy:.2f}%, Test Loss: {test_loss:.4f}, Test Accuracy: {test_accuracy:.2f}%')

    return model


In [None]:
# Build a fresh network, attach an Adam optimizer and train for 10 epochs.
# The call is the last expression of the cell, so the returned model's repr is displayed.
model = TripletNetwork()
optimizer = Adam(model.parameters(), lr=1e-3)
train_triplet_model(
    model,
    triplet_loss,
    optimizer,
    num_epochs=10,
    train_loader=train_loader,
    test_loader=test_loader,
)

Anchor shape: torch.Size([32, 3, 224, 224])
Anchor shape: torch.Size([32, 3, 224, 224])
Anchor shape: torch.Size([32, 3, 224, 224])
Anchor shape: torch.Size([32, 3, 224, 224])
Anchor shape: torch.Size([32, 3, 224, 224])
Anchor shape: torch.Size([32, 3, 224, 224])
Anchor shape: torch.Size([32, 3, 224, 224])
Anchor shape: torch.Size([32, 3, 224, 224])
Anchor shape: torch.Size([32, 3, 224, 224])
Anchor shape: torch.Size([32, 3, 224, 224])
Anchor shape: torch.Size([32, 3, 224, 224])
Anchor shape: torch.Size([32, 3, 224, 224])
Anchor shape: torch.Size([32, 3, 224, 224])
Anchor shape: torch.Size([32, 3, 224, 224])
Anchor shape: torch.Size([32, 3, 224, 224])
Anchor shape: torch.Size([32, 3, 224, 224])
Anchor shape: torch.Size([32, 3, 224, 224])
Anchor shape: torch.Size([32, 3, 224, 224])
Anchor shape: torch.Size([32, 3, 224, 224])
Anchor shape: torch.Size([32, 3, 224, 224])
Anchor shape: torch.Size([32, 3, 224, 224])
Anchor shape: torch.Size([32, 3, 224, 224])
Anchor shape: torch.Size([32, 3,

TripletNetwork(
  (base_model): Sequential(
    (0): Conv2d(3, 16, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (1): ReLU()
    (2): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (3): Conv2d(16, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (4): ReLU()
    (5): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (6): Conv2d(32, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (7): ReLU()
    (8): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (9): Flatten(start_dim=1, end_dim=-1)
    (10): Linear(in_features=50176, out_features=500, bias=True)
    (11): ReLU()
    (12): Linear(in_features=500, out_features=128, bias=True)
  )
)