In [8]:
import torch

# Prefer the GPU when CUDA is usable; otherwise run everything on the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
print("Using device:", device)


Using device: cpu


In [2]:
from sklearn.datasets import fetch_lfw_people

# Fetch every identity that has at least two pictures, keeping the original
# resolution (resize=1.0) and downloading the data when it is missing.
lfw_people = fetch_lfw_people(
    min_faces_per_person=2,
    resize=1.0,
    download_if_missing=True,
)

# Summarise the dataset: array layout (n_samples, height, width), number of
# distinct identities and total number of images.
for caption, value in (
    ("Shape:", lfw_people.images.shape),
    ("Number of people:", len(lfw_people.target_names)),
    ("Total images:", len(lfw_people.images)),
):
    print(caption, value)


Shape: (9164, 125, 94)
Number of people: 1680
Total images: 9164


In [3]:
import numpy as np
from sklearn.model_selection import train_test_split

# Pull the image array, the per-image labels and the label -> name lookup
X, y = lfw_people.images, lfw_people.target
target_names = lfw_people.target_names

# Split at the identity level so that no person appears in both subsets
unique_ids = np.unique(y)
train_ids, test_ids = train_test_split(unique_ids, test_size=0.3, random_state=42)

# Boolean masks selecting the samples that belong to each group of identities
train_mask = np.isin(y, train_ids)
test_mask = np.isin(y, test_ids)

# Materialise the train and test sets from the masks
X_train, y_train = X[train_mask], y[train_mask]
X_test, y_test = X[test_mask], y[test_mask]


In [4]:
# Show the dimensions of each split: images then labels, train before test.
for split_array in (X_train, y_train, X_test, y_test):
    print(split_array.shape)

(6617, 125, 94)
(6617,)
(2547, 125, 94)
(2547,)


In [None]:
# Build explicit image pairs: label 1 for two images of the same person,
# label 0 for images of two different people.
pairs = []
labels = []

# How many different-person pairs to draw for every identity
number_of_negatives_per_person = 3

# Computed once up front instead of inside the loops: the distinct identities
# and, for each identity, the positions of its images inside X_train.
train_person_ids = np.unique(y_train)
indices_by_person = {
    person_id: np.flatnonzero(y_train == person_id)
    for person_id in train_person_ids
}

# For each person
for position, person_id in enumerate(train_person_ids):
    # Get all images of that person
    person_images = X_train[indices_by_person[person_id]]

    # Create positive pairs (same person): every unordered pair of images
    for i in range(len(person_images)):
        for j in range(i + 1, len(person_images)):
            pairs.append([person_images[i], person_images[j]])
            labels.append(1)

    # Create negative pairs (different people)
    other_person_ids = np.delete(train_person_ids, position)
    if len(other_person_ids) == 0:
        # Only one identity exists, so no different-person pair can be formed.
        continue

    for _ in range(number_of_negatives_per_person):
        other_person_id = np.random.choice(other_person_ids)
        # First image of the other person, read directly by index rather than
        # copying all of that person's images just to take element 0.
        other_person_image = X_train[indices_by_person[other_person_id][0]]

        pairs.append([person_images[0], other_person_image])
        labels.append(0)


In [None]:
from torch.utils.data import Dataset

class SiameseDataset(Dataset):
    """Dataset that yields randomly sampled image pairs for Siamese training.

    Each item is ``(img1, img2, label)`` where ``label`` is ``1.0`` when both
    images show the same person and ``0.0`` otherwise. Pairs are drawn at
    random on every access, so the ``idx`` passed to ``__getitem__`` is
    ignored; ``__len__`` only determines how many pairs make up one epoch.

    Args:
        images: Indexable collection of images (e.g. an ``(N, H, W)`` array).
        labels: Person identifier of each image, same length as ``images``.
            Identifiers must be hashable (ints, NumPy scalars, strings).
        transform: Optional callable applied to each image of the pair. When
            omitted, images are converted to ``float32`` tensors and 2-D
            images receive a leading channel dimension, giving ``(1, H, W)``.
    """

    def __init__(self, images, labels, transform=None):
        self.images = images
        self.labels = labels
        self.transform = transform

        # Group image indices by person
        self.person_to_images = {}
        for idx, label in enumerate(labels):
            self.person_to_images.setdefault(label, []).append(idx)

        self.person_ids = list(self.person_to_images.keys())

    @staticmethod
    def _random_choice(candidates):
        """Return one uniformly sampled element of a non-empty sequence."""
        return candidates[torch.randint(len(candidates), (1,)).item()]

    @staticmethod
    def _to_tensor(image):
        """Convert an image to a float32 tensor, adding a channel dim if 2-D."""
        tensor = torch.as_tensor(image, dtype=torch.float32)
        if tensor.dim() == 2:
            tensor = tensor.unsqueeze(0)
        return tensor

    def __getitem__(self, idx):
        """Sample a random pair.

        Returns:
            Tuple ``(img1, img2, label)`` with ``label`` a float32 scalar
            tensor: 1 for a same-person pair, 0 for a different-person pair.

        Raises:
            ValueError: If the dataset holds fewer than two identities, in
                which case a different-person pair cannot be formed.
        """
        if len(self.person_ids) < 2:
            raise ValueError(
                "SiameseDataset needs at least two distinct identities to "
                "sample negative pairs"
            )

        # Randomly decide: positive or negative pair
        should_get_same_class = torch.rand(1).item() < 0.5

        img1_idx = torch.randint(len(self.images), (1,)).item()
        img1 = self.images[img1_idx]
        label1 = self.labels[img1_idx]

        if should_get_same_class:
            # Positive pair: prefer a different image of the same person and
            # fall back to the image itself only when it is the sole one.
            same_person = self.person_to_images[label1]
            alternatives = [i for i in same_person if i != img1_idx]
            img2_idx = self._random_choice(alternatives or same_person)
            label = 1
        else:
            # Negative pair: redraw until the identity differs from label1
            different_label = label1
            while different_label == label1:
                different_label = self._random_choice(self.person_ids)
            img2_idx = self._random_choice(self.person_to_images[different_label])
            label = 0

        img2 = self.images[img2_idx]

        # Apply the user transform, or the default tensor conversion
        if self.transform:
            img1 = self.transform(img1)
            img2 = self.transform(img2)
        else:
            img1 = self._to_tensor(img1)
            img2 = self._to_tensor(img2)

        return img1, img2, torch.tensor(label, dtype=torch.float32)

    def __len__(self):
        """Number of pairs drawn per epoch (one per stored image)."""
        return len(self.images)


In [None]:
from torch.utils.data import DataLoader

# Wrap the training arrays (assumed to be numpy arrays) in the pair dataset
train_dataset = SiameseDataset(X_train, y_train)

# 32 pairs per batch, shuffling enabled, two loader workers
loader_options = dict(batch_size=32, shuffle=True, num_workers=2)
train_loader = DataLoader(train_dataset, **loader_options)


In [None]:
import torch.nn as nn

class SiameseNetwork(nn.Module):
    """Siamese CNN that scores whether two grayscale images match.

    Both images go through the same convolutional encoder to produce a
    128-dimensional embedding. The element-wise absolute difference of the two
    embeddings is mapped by a linear layer to a single raw logit per pair
    (no sigmoid is applied here).

    The encoder ends with an adaptive average pool to a fixed 11 x 9 grid, so
    the first linear layer always receives ``64 * 11 * 9`` features whatever
    the input resolution is. For inputs whose feature map is already 11 x 9
    the pool leaves the values unchanged.
    """

    def __init__(self):
        super().__init__()

        # CNN to extract features from an image
        self.cnn = nn.Sequential(
            nn.Conv2d(1, 32, kernel_size=5),    # (batch, 32, H-4, W-4)
            nn.ReLU(),
            nn.MaxPool2d(2),                    # (batch, 32, (H-4)//2, (W-4)//2)

            nn.Conv2d(32, 64, kernel_size=5),   # (batch, 64, (H-4)//2 - 4, (W-4)//2 - 4)
            nn.ReLU(),
            nn.MaxPool2d(2)                     # (batch, 64, ((H-4)//2 - 4)//2, ((W-4)//2 - 4)//2)
        )

        # The map size after the CNN depends on the input resolution
        # (e.g. 125 x 94 -> 28 x 20, 62 x 47 -> 12 x 8). Resample it to a
        # fixed 11 x 9 grid so the Linear layer below always fits.
        self.pool = nn.AdaptiveAvgPool2d((11, 9))

        # Embedding head: flattened (64, 11, 9) features -> 128-d embedding
        self.fc1 = nn.Sequential(
            nn.Linear(64 * 11 * 9, 256),
            nn.ReLU(),
            nn.Linear(256, 128)
        )

        # Final layer to compute similarity
        self.fc2 = nn.Sequential(
            nn.Linear(128, 1)
        )

    def forward_once(self, x):
        """Embed one batch of images.

        Args:
            x: Tensor of shape ``(batch, 1, H, W)``. A 3-D tensor is treated
                as ``(batch, H, W)`` and given a channel dimension.

        Returns:
            Tensor of shape ``(batch, 128)``.
        """
        if x.dim() == 3:
            x = x.unsqueeze(1)

        x = self.cnn(x)
        x = self.pool(x)
        x = torch.flatten(x, start_dim=1)  # (batch, 64 * 11 * 9)
        return self.fc1(x)

    def forward(self, input1, input2):
        """Return one raw similarity logit per pair, shape ``(batch, 1)``."""
        # Get embeddings
        output1 = self.forward_once(input1)
        output2 = self.forward_once(input2)

        # Absolute difference between embeddings
        distance = torch.abs(output1 - output2)

        # Pass through final layer
        return self.fc2(distance)


In [None]:
import torch
import torch.nn as nn
import torch.optim as optim

# Model, Loss, Optimizer
model = SiameseNetwork().to(device)
# BCEWithLogitsLoss takes raw logits, so the model output is used without a sigmoid
criterion = nn.BCEWithLogitsLoss()
optimizer = optim.Adam(model.parameters(), lr=1e-3)

num_epochs = 20

for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0
    num_batches = 0

    for img1, img2, label in train_loader:
        img1, img2, label = img1.to(device), img2.to(device), label.to(device)

        optimizer.zero_grad()

        # Forward pass: (batch, 1) -> (batch,). Only the trailing dimension is
        # squeezed so a final batch holding a single pair keeps its batch
        # dimension and still matches the shape of `label`.
        output = model(img1, img2).squeeze(1)

        # Loss
        loss = criterion(output, label)

        # Backward pass
        loss.backward()

        # Update weights
        optimizer.step()

        running_loss += loss.item()
        num_batches += 1

    # Mean of the per-batch losses; guarded so an empty loader reports 0
    # instead of raising ZeroDivisionError.
    epoch_loss = running_loss / max(num_batches, 1)
    print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {epoch_loss:.4f}")


Saved 9164 images to 'lfw_train/'
