In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms as T
import matplotlib.pyplot as plt
import PIL

%matplotlib inline

device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"using device: {device}")

using device: cuda


In [2]:
import glob
import random
from PIL import Image
from pathlib import Path
import numpy as np

# prepare data
class FaceDataset(Dataset):
    def __init__(self, folder, trns=None):
        super().__init__()
        self.folder = folder
        self.trns = trns
        self.items = glob.glob(f"{self.folder}/*/*")

    def __len__(self):
        return len(self.items)

    def choose(self, items):
        ix = random.randint(0, len(items)-1)
        return items[ix]
    
    def __getitem__(self, ix):
        imga_path = self.items[ix]
        person = Path(imga_path).parent.name
        same_person = random.randint(0,1)
        if same_person:
            imgb_path = self.choose(glob.glob(f"{self.folder}/{person}/*"))
        else:
            while True:
                imgb_path = self.choose(self.items)
                if Path(imgb_path).parent.name != person:
                    break
        
        imga = Image.open(imga_path)
        imgb = Image.open(imgb_path)
        if self.trns:
            imga = self.trns(imga)
            imgb = self.trns(imgb)

        return imga, imgb, 1-same_person

train_trns = T.Compose([
    T.RandomHorizontalFlip(),
    T.RandomAffine(5, (0.01, 0.2), scale=(0.9, 1.1)),
    T.Resize((100, 100)),
    T.ToTensor(),
    T.Normalize((0.5), (0.5))
])
val_trns = T.Compose([
    T.Resize((100, 100)),
    T.ToTensor(),
    T.Normalize((0.5), (0.5))
])

train_ds = FaceDataset(r"E:\datasets\faces\training", trns=train_trns)
val_ds = FaceDataset(r"E:\datasets\faces\testing", trns=val_trns)
train_dl = DataLoader(train_ds, batch_size=32, shuffle=True)
val_dl = DataLoader(val_ds, batch_size=16, shuffle=False)

print(f"train dataset: {len(train_ds)}, test dataset: {len(val_ds)}")


train dataset: 370, test dataset: 30


In [6]:
# prepare the model
import torch.nn.functional as F

def convBlock(in_channels, out_channels):
    return nn.Sequential(
        #nn.Dropout(0.2),
        nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1),
        nn.BatchNorm2d(out_channels),
        nn.ReLU()
    )

class SiameseNetwork(nn.Module):
    def __init__(self):
        super().__init__()
        self.features = nn.Sequential(
            convBlock(1, 4),
            convBlock(4, 8),
            convBlock(8, 16),
            convBlock(16, 32),
            convBlock(32, 64),
            nn.Flatten(),
            nn.Dropout(0.2),
            nn.Linear(64*100*100, 1024),
            nn.ReLU(),
            nn.Linear(1024, 512),
            nn.ReLU(),
            nn.Linear(512, 32)
        )


    def forward(self, imgas, imgbs):
        fmapas = self.features(imgas)
        fmapbs = self.features(imgbs)
        return fmapas, fmapbs


class ContrastiveLoss(nn.Module):
    def __init__(self, margin=2):
        super().__init__()
        self.margin = margin
    
    def forward(self, features_a, features_b, label):
        distance = F.pairwise_distance(features_a, features_b, keepdim=True)
        loss_contrastiv = torch.mean(
            (1-label)*(torch.pow(distance, 2)) +    # same person
            label * (torch.pow(torch.clamp(self.margin - distance, min=0.0), 2))    # different person 
        )
        acc = ((distance > 0.55) == label).float().mean()
        return loss_contrastiv, acc

model = SiameseNetwork().to(device)
lossfn = ContrastiveLoss()
optimizer = optim.SGD(model.parameters(), lr=1e-3)
scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', patience=5, threshold=1e-6, min_lr=1e-6, threshold_mode="abs", verbose=True, factor=0.25)

In [9]:
# training utility

def train_batch(model, imgas, imgbs, labels, optimizer, lossfn):
    model.train()
    imgas, imgbs, labels = imgas.to(device), imgbs.to(device), labels.to(device)
    features_a, features_b = model(imgas, imgbs)
    loss, acc = lossfn(features_a, features_b, labels)
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()
    return loss.cpu().item(), acc.cpu().item()

@torch.no_grad()
def val_batch(model, imgas, imgbs, labels, lossfn):
    model.eval()
    imgas, imgbs, labels = imgas.to(device), imgbs.to(device), labels.to(device)
    features_a, features_b = model(imgas, imgbs)
    loss, acc = lossfn(features_a, features_b, labels)
    return loss.cpu().item(), acc.cpu().item()


In [10]:
import time

for epoch in range(200):
    start = time.time()
    train_losses = []
    train_acces = []
    val_losses = []
    val_acces = []
    for imgas, imgbs, labels in train_dl:
        loss, acc = train_batch(model, imgas, imgbs, labels, optimizer, lossfn)
        train_losses.append(loss)
        train_acces.append(acc)
    for imgas, imgbs, labels in val_dl:
        loss, acc = val_batch(model, imgas, imgbs, labels, lossfn)
        val_losses.append(loss)
        val_acces.append(acc)

    train_loss = np.mean(train_losses)
    train_acc = np.mean(train_acces)
    val_loss = np.mean(val_losses)
    val_acc = np.mean(val_acces)
    scheduler.step(train_loss)
    end = time.time()
    print(f"epoch {epoch} consumes {end - start}s, train loss: {train_loss:.2f}, train acc: {train_acc:.2f}, test loss: {val_loss:.2f}, test acc: {val_acc:.2f}")


epoch 0 consumes 2.007722854614258s, train loss: 1.11, train acc: 0.51, test loss: 1.94, test acc: 0.46
epoch 1 consumes 1.3383915424346924s, train loss: 1.10, train acc: 0.50, test loss: 1.47, test acc: 0.54
epoch 2 consumes 1.3388473987579346s, train loss: 1.09, train acc: 0.50, test loss: 1.29, test acc: 0.52
epoch 3 consumes 1.3382158279418945s, train loss: 1.07, train acc: 0.54, test loss: 1.19, test acc: 0.55
epoch 4 consumes 1.3406031131744385s, train loss: 1.05, train acc: 0.50, test loss: 1.67, test acc: 0.41
epoch 5 consumes 1.341254711151123s, train loss: 1.04, train acc: 0.48, test loss: 1.33, test acc: 0.55
epoch 6 consumes 1.3364505767822266s, train loss: 1.05, train acc: 0.48, test loss: 1.30, test acc: 0.53
epoch 7 consumes 1.3367242813110352s, train loss: 1.05, train acc: 0.52, test loss: 1.10, test acc: 0.58
epoch 8 consumes 1.3393776416778564s, train loss: 1.03, train acc: 0.55, test loss: 1.09, test acc: 0.54
epoch 9 consumes 1.3365466594696045s, train loss: 1.04, t