# Proyecto 4: Faces in The Wild

**Integrantes**:
- Lucas Carranza
- David Herencia
- Kalos Lazo
- Lenin Chavez

# **Triplet Neural Network**

---
## **1. Libraries**

In [14]:
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
import matplotlib.pyplot as plt
import torchvision
import math
import os
import random

from PIL import Image
from torch.optim.lr_scheduler import StepLR, ReduceLROnPlateau
from torch.utils.data import Dataset, DataLoader, random_split, SubsetRandomSampler
from torchvision import transforms as transforms, datasets, models
from torchvision.models import alexnet, AlexNet_Weights
from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay, classification_report
from torch.optim.lr_scheduler import ReduceLROnPlateau

In [15]:
# Run on the first CUDA GPU when PyTorch can see one; otherwise stay on the CPU.
device = torch.device('cuda:0') if torch.cuda.is_available() else torch.device('cpu')
print(f"Device: {device}")

Device: cpu


---
## **2. Dataset**

In [13]:
def generate_triplets(df):
    """Build (anchor, positive, negative) triplets from labelled image pairs.

    Every row labelled ``'same'`` supplies an anchor/positive pair. The
    negative is drawn at random from the images that appear in ``'diff'``
    rows, and is never the anchor or the positive image itself.

    Args:
        df: DataFrame with the columns ``image1``, ``image2`` and ``label``,
            where ``label`` is either ``'same'`` or ``'diff'``.

    Returns:
        DataFrame with the columns ``anchor``, ``positive`` and ``negative``,
        one row per ``'same'`` pair for which a valid negative exists. The
        three columns are present even when no triplet could be built.

    Raises:
        KeyError: If a required column is missing or a label other than
            ``'same'`` / ``'diff'`` is found.
    """
    columns = ['anchor', 'positive', 'negative']

    # Group the image pairs by their label.
    pairs = {'same': [], 'diff': []}
    for label, image1, image2 in zip(df['label'], df['image1'], df['image2']):
        pairs[label].append((image1, image2))

    # Picking a 'diff' pair uniformly and then one of its two images is the
    # same as picking uniformly from the flattened list of those images.
    negative_pool = [image for pair in pairs['diff'] for image in pair]
    if not negative_pool:
        return pd.DataFrame(columns=columns)

    triplets = []
    for anchor, positive in pairs['same']:
        negative = random.choice(negative_pool)
        if negative in (anchor, positive):
            # A negative identical to the anchor or positive makes a
            # degenerate triplet, so redraw among the remaining images.
            candidates = [
                image for image in negative_pool
                if image not in (anchor, positive)
            ]
            if not candidates:
                continue  # no valid negative exists for this pair
            negative = random.choice(candidates)

        triplets.append({'anchor': anchor, 'positive': positive, 'negative': negative})

    return pd.DataFrame(triplets, columns=columns)

# Load the training pairs and split the "<image1>_<image2>" key into two id columns.
train_df = pd.read_csv("./train.csv")
train_df[['image1', 'image2']] = train_df['image1_image2'].str.split('_', expand=True)
train_df = train_df.drop(columns=['image1_image2'])

# NOTE(review): labels are assigned purely by row position (even rows 'same',
# odd rows 'diff'), overwriting any existing 'label' column. This assumes the
# CSV rows strictly alternate between the two classes; confirm against the data.
train_df['label'] = ['diff' if i % 2 else 'same' for i in range(len(train_df))]

triplet_df = generate_triplets(train_df)
print(train_df)
print(triplet_df)

     anchor positive negative
0      1485     4047       84
1     10796     3696     8731
2      7003     2539     1093
3      8884     9008    10113
4      5320    11038    12340
...     ...      ...      ...
1095  11571     6594     5398
1096   1766     9118    12368
1097   2613    12250     7879
1098  10580     4391      618
1099   4677     2901    10154

[1100 rows x 3 columns]


---
## **3. Custom dataset**

In [None]:
class TripletImageDataset(Dataset):
    """Dataset that yields an (anchor, positive, negative) image triple per row.

    Each row of ``df`` holds image identifiers under the columns ``anchor``,
    ``positive`` and ``negative``. The image for an identifier is read from
    the file ``<identifier>.png`` inside ``root_dir`` and converted to RGB.
    """

    def __init__(self, df, root_dir, transform=None):
        """Store the triplet table, the image directory and the optional transform."""
        self.df = df
        self.root_dir = root_dir
        self.transform = transform

    def __len__(self):
        """Return the number of triplets."""
        return len(self.df)

    def _load_image(self, image_id):
        """Open ``<image_id>.png`` as RGB and apply the transform when one is set."""
        path = os.path.join(self.root_dir, f"{image_id}.png")
        image = Image.open(path).convert('RGB')
        if self.transform:
            image = self.transform(image)
        return image

    def __getitem__(self, idx):
        """Return the anchor, positive and negative images of row ``idx`` (positional)."""
        row = self.df.iloc[idx]
        return tuple(
            self._load_image(row[role])
            for role in ('anchor', 'positive', 'negative')
        )

In [None]:
# Directory that holds the "<id>.png" images referenced by the triplets.
data_root = './images'

# Resize every image to 256x256 and convert it to a tensor.
transform = transforms.Compose([
    transforms.Resize((256, 256)),
    transforms.ToTensor(),
])

# Hold out 20% of the triplets for validation (fixed seed for a repeatable split).
train_df, validation_df = train_test_split(triplet_df, test_size=0.2, random_state=42)

train_dataset = TripletImageDataset(train_df, data_root, transform=transform)
validation_dataset = TripletImageDataset(validation_df, data_root, transform=transform)

# Only the training batches are shuffled.
train_dataloader = DataLoader(train_dataset, batch_size=64, shuffle=True)
validation_dataloader = DataLoader(validation_dataset, batch_size=64, shuffle=False)

---
## **4. Implementation**

In [None]:
class TripletNetwork(nn.Module):
    """Embedding network for triplet training on top of a pretrained AlexNet.

    The AlexNet model without its last child module acts as the feature
    extractor. A small fully connected head maps the flattened features
    (256 * 6 * 6 values) to a 128-dimensional embedding. The same weights
    embed the anchor, positive and negative inputs.
    """

    def __init__(self):
        super().__init__()
        # `pretrained=True` is deprecated in torchvision; the explicit weights
        # enum requests the same ImageNet checkpoint.
        backbone = models.alexnet(weights=models.AlexNet_Weights.IMAGENET1K_V1)
        # Keep every child module except the final one (the classification head).
        self.feature_extractor = nn.Sequential(*list(backbone.children())[:-1])
        self.fc = nn.Sequential(
            nn.Linear(256 * 6 * 6, 512),
            nn.BatchNorm1d(512),
            nn.ReLU(),
            nn.Dropout(0.20),
            nn.Linear(512, 128),
            nn.BatchNorm1d(128),
        )

    def _embed(self, images):
        """Map a batch of images to embeddings with the shared backbone and head."""
        features = self.feature_extractor(images)
        # Flatten everything except the batch dimension before the linear head.
        return self.fc(torch.flatten(features, start_dim=1))

    def forward(self, anchor, positive, negative):
        """Embed the three batches with shared weights.

        Args:
            anchor: Batch of anchor images.
            positive: Batch of positive images.
            negative: Batch of negative images.

        Returns:
            Tuple ``(anchor, positive, negative)`` of embedding batches.
        """
        return self._embed(anchor), self._embed(positive), self._embed(negative)

# Pick the compute device and move a fresh network onto it.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
model = TripletNetwork().to(device)

# Triplet margin loss (margin 0.5), optimised with Adam.
loss_fn = nn.TripletMarginLoss(margin=0.5)
optimizer = optim.Adam(model.parameters(), lr=1e-4)

# Multiply the learning rate by 0.5 on every scheduler step.
scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=1, gamma=0.5)

---
## **5. Training**

In [None]:
def train_model(model, train_loader, loss_fn, optimizer, num_epochs=25,
                lr_scheduler=None, device=None):
    """Train a triplet model for ``num_epochs`` passes over ``train_loader``.

    The loss of every tenth step (steps 1, 11, 21, ...) is printed.

    Args:
        model: Module called as ``model(anchor, positive, negative)`` that
            returns the three embedding batches.
        train_loader: Iterable of ``(anchor, positive, negative)`` tensor batches.
        loss_fn: Callable that takes the three embeddings and returns a scalar loss.
        optimizer: Optimizer that updates the parameters of ``model``.
        num_epochs: Number of passes over ``train_loader``.
        lr_scheduler: Scheduler stepped once at the end of every epoch. When
            omitted, the module-level ``scheduler`` is used, which is what the
            function relied on before this parameter existed.
        device: Device the batches are moved to. When omitted, the device of
            the model's first parameter is used (CPU if it has none).

    Raises:
        NameError: If ``lr_scheduler`` is omitted and no module-level
            ``scheduler`` exists.
    """
    if lr_scheduler is None:
        # Backward-compatible fallback to the notebook-level scheduler.
        lr_scheduler = scheduler
    if device is None:
        first_param = next(model.parameters(), None)
        device = first_param.device if first_param is not None else torch.device('cpu')

    model.train()
    for epoch in range(num_epochs):
        for i, (anchor, positive, negative) in enumerate(train_loader):
            anchor = anchor.to(device)
            positive = positive.to(device)
            negative = negative.to(device)

            anchor_out, positive_out, negative_out = model(anchor, positive, negative)
            loss = loss_fn(anchor_out, positive_out, negative_out)

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            if i % 10 == 0:
                print(f'Epoch {epoch+1}, Step {i+1}, Loss: {loss.item()}')
        # One scheduler step per epoch, after all optimizer steps of that epoch.
        lr_scheduler.step()

# Fit the network on the training triplets for 25 epochs.
train_model(
    model,
    train_dataloader,
    loss_fn,
    optimizer,
    num_epochs=25,
)

---
## **6. Metrics**

In [None]:
def extract_features_and_distances(model, dataloader, device=None):
    """Compute anchor-positive and anchor-negative embedding distances.

    The model is switched to evaluation mode and run without gradient
    tracking. Distances are Euclidean (L2) norms of the embedding differences.

    Args:
        model: Module called as ``model(anchors, positives, negatives)`` that
            returns the three embedding batches.
        dataloader: Iterable of ``(anchors, positives, negatives)`` tensor batches.
        device: Device the batches are moved to. When omitted, the device of
            the model's first parameter is used (CPU if it has none).

    Returns:
        Tuple ``(pos_distances, neg_distances)`` of 1-D NumPy arrays holding
        one distance per sample, in loader order. Both are empty when the
        loader yields no batches.
    """
    if device is None:
        first_param = next(model.parameters(), None)
        device = first_param.device if first_param is not None else torch.device('cpu')

    model.eval()
    pos_batches = []
    neg_batches = []
    with torch.no_grad():
        for anchors, positives, negatives in dataloader:
            anchor_out, positive_out, negative_out = model(
                anchors.to(device), positives.to(device), negatives.to(device)
            )

            # torch.norm is deprecated; vector_norm gives the same per-row L2 norm.
            pos_dist = torch.linalg.vector_norm(anchor_out - positive_out, ord=2, dim=1)
            neg_dist = torch.linalg.vector_norm(anchor_out - negative_out, ord=2, dim=1)
            pos_batches.append(pos_dist.cpu().numpy())
            neg_batches.append(neg_dist.cpu().numpy())

    if not pos_batches:
        return np.array([]), np.array([])
    return np.concatenate(pos_batches), np.concatenate(neg_batches)

def calculate_metrics(pos_distances, neg_distances, threshold):
    """Score a distance threshold as a same/different classifier.

    A distance strictly below ``threshold`` is predicted as "same". Positive
    pairs are the "same" class and negative pairs the "different" class.

    Args:
        pos_distances: Distances between anchors and positives.
        neg_distances: Distances between anchors and negatives.
        threshold: Decision threshold on the distance.

    Returns:
        Tuple ``(accuracy, precision, recall, f1)`` of floats. Any metric
        whose denominator is zero (including accuracy when both inputs are
        empty) is reported as ``0.0``.
    """
    pos = np.asarray(pos_distances)
    neg = np.asarray(neg_distances)

    # Both comparisons are counted explicitly so that NaN distances, which
    # satisfy neither, are left out of every cell of the confusion matrix.
    tp = int(np.count_nonzero(pos < threshold))
    fn = int(np.count_nonzero(pos >= threshold))
    tn = int(np.count_nonzero(neg >= threshold))
    fp = int(np.count_nonzero(neg < threshold))

    total = tp + tn + fp + fn
    accuracy = (tp + tn) / total if total > 0 else 0.0
    precision = tp / (tp + fp) if tp + fp > 0 else 0.0
    recall = tp / (tp + fn) if tp + fn > 0 else 0.0
    f1 = 2 * (precision * recall) / (precision + recall) if precision + recall > 0 else 0.0

    return accuracy, precision, recall, f1
    
# NOTE(review): the threshold is tuned and the metrics are reported on the
# training loader; validation_dataloader is built earlier but never used.
# Confirm whether the held-out split was meant to be used here.
pos_distances, neg_distances = extract_features_and_distances(model, train_dataloader)

# Sweep 100 evenly spaced thresholds across the observed distance range.
thresholds = np.linspace(
    min(np.min(pos_distances), np.min(neg_distances)),
    max(np.max(pos_distances), np.max(neg_distances)),
    100,
)
metrics = [calculate_metrics(pos_distances, neg_distances, t) for t in thresholds]

# Keep the threshold with the highest F1 score (index 3 of each metrics tuple).
best_idx = np.nanargmax([scores[3] for scores in metrics])
best_threshold = thresholds[best_idx]
best_accuracy, best_precision, best_recall, best_f1 = metrics[best_idx]

print(f"Best threshold: {best_threshold:.2f}")
print(f"Accuracy: {best_accuracy:.2f}, Precision: {best_precision:.2f}, Recall: {best_recall:.2f}, F1 Score: {best_f1:.2f}")

---
## **7. Submission**

In [None]:
# Load the test pairs and split the "<image1>_<image2>" key into its two image ids.
test_df = pd.read_csv("../input/dataset/test.csv")
test_df[['anchor', 'positive']] = test_df['image1_image2'].str.split('_', expand=True)

# NOTE(review): TestTripletDataset is not defined in the visible part of this
# file; confirm it exists before this cell runs.
test_dataset = TestTripletDataset(test_df, data_root, transform=transform)
test_dataloader = DataLoader(test_dataset, batch_size=64, shuffle=False)

def evaluate_and_generate_submission(model, dataloader, threshold=0.5,
                                     pairs_df=None,
                                     output_path='submission.csv',
                                     device=None):
    """Predict same/diff for every test pair and write the submission CSV.

    A pair is labelled ``"same"`` when the Euclidean distance between its two
    embeddings is strictly below ``threshold``, otherwise ``"diff"``.

    Args:
        model: Module called as ``model(anchors, positives, negatives)`` that
            returns the three embedding batches.
        dataloader: Iterable of batches ``(anchors, positives)`` or
            ``(anchors, positives, negatives)``. Only the first two entries
            influence the predictions.
        threshold: Distance threshold below which a pair counts as "same".
        pairs_df: DataFrame whose ``image1_image2`` column identifies the
            pairs, in loader order. When omitted, the module-level ``test_df``
            is used, which is what the function relied on before this
            parameter existed.
        output_path: Destination of the CSV file.
        device: Device the batches are moved to. When omitted, the device of
            the model's first parameter is used (CPU if it has none).

    Raises:
        NameError: If ``pairs_df`` is omitted and no module-level ``test_df``
            exists.
        ValueError: If the number of predictions differs from the number of
            rows in ``pairs_df`` (raised by pandas).
    """
    if pairs_df is None:
        # Backward-compatible fallback to the notebook-level test frame.
        pairs_df = test_df
    if device is None:
        first_param = next(model.parameters(), None)
        device = first_param.device if first_param is not None else torch.device('cpu')

    model.eval()
    predictions = []
    with torch.no_grad():
        for batch in dataloader:
            anchors, positives, *extra = batch
            anchors = anchors.to(device)
            positives = positives.to(device)
            # The model's forward takes three inputs; when the loader has no
            # negatives, the positives are reused as a placeholder whose
            # output is discarded.
            negatives = extra[0].to(device) if extra else positives

            anchor_out, positive_out, _ = model(anchors, positives, negatives)

            pos_dist = torch.linalg.vector_norm(anchor_out - positive_out, ord=2, dim=1)
            # Compare the whole batch at once and move it to the host in one go.
            is_same = (pos_dist < threshold).cpu().tolist()
            predictions.extend("same" if same else "diff" for same in is_same)

    submission_df = pd.DataFrame({
        'image1_image2': pairs_df['image1_image2'],
        'label': predictions
    })
    submission_df.to_csv(output_path, index=False)
    print(f"Results saved to {output_path}")

# Label the test pairs with the threshold selected in the metrics step.
evaluate_and_generate_submission(
    model,
    test_dataloader,
    threshold=best_threshold,
)