In [1]:
import torch
from torch import optim
import torch.nn as nn
import torchvision
import torchvision.models as models
import torchvision.transforms as transforms
import pathlib
from torch.utils import data
from siamese_dataset_example import SiamesePairDataset
import PIL.Image as Image
import matplotlib.pyplot as plt
import time
import numpy as np
import torch.nn.functional as F
import csv
import pandas
from sklearn import metrics
import itertools
from models.siamese_resnet import SiameseResnet
import log_compiler
from tqdm.autonotebook import tqdm



### Contrastive Loss
This function used to calculate the loss/cost of our input image, based on this paper by Yan Lecun http://yann.lecun.com/exdb/publis/pdf/hadsell-chopra-lecun-06.pdf. Because siamese is not classification problem rather a distance problem which means we need to compute the difference between two images hence we need another type of loss function.

In [2]:
class ContrastiveLoss(nn.Module):
    def __init__(self, device, margin=2.0):
        super(ContrastiveLoss, self).__init__()
        self.margin = margin
        self.device = device

    def forward(self, output1, output2, label):
        euclidean_distance = F.pairwise_distance(output1, output2)
        euclidean_distance = euclidean_distance.to(self.device)
        loss_contrastive = torch.mean((1-label) * torch.pow(euclidean_distance, 2) +
                                      (label) * torch.pow(torch.clamp(self.margin - euclidean_distance, min=0.0), 2))


        return loss_contrastive

### Siamesse Network class, based on Resnet18 model
We are using resnet18 model with custom fully connected layer based on our very case. But first, we need to freeze layer parameters so they dont get recalculated on the training. The only layer we are training is last layer which is the fully connected layer that has been customized to our case.

### Few setups before training

#### Setup the methods

In [3]:
def rounding(val, threshold=0.5):
    return 1 if val > threshold else 0

def log_training_result(numepoch, batchsize, lrate, accuracy, precision, f1, tp, tn, fp, fn, fc, model, name='training_logs.csv'):
    with open(name, 'r') as f:
        train_number = len(f.readlines())
    detail = log_compiler.compile(numepoch, batchsize, lrate, accuracy, precision, f1, tp, tn, fp, fn, fc, train_number, model)
    data = [batchsize,lrate,numepoch,round(accuracy, 2),round(f1, 2),round(precision, 2),tp,tn,fp,fn,f'logs/{train_number}.md']
    log_compiler.write_log_file(train_number, detail)
    with open(name, 'a') as f:
        writer = csv.writer(f)
        writer.writerow(data)
    print("data saved to %s" % name)
    
def print_scores(p, r, f1, a, batch_size):
    # just an utility printing function
    for name, scores in zip(("precision", "recall", "F1", "accuracy"), (p, r, f1, a)):
        print(f"\t{name.rjust(14, ' ')}: {sum(scores)/batch_size:.4f}")

In [4]:
def train_and_validate(epoch, trainloader, testloader, model, criterion, optimizer):
    running_loss = 0
    progress = tqdm(enumerate(trainloader, 1), desc="loss: ", total=len(trainloader))
    for steps, (imgs1, imgs2, labels) in progress:
        imgs1, imgs2, labels = imgs1.to(device), imgs2.to(device), labels.to(device)
        optimizer.zero_grad()
        oimgs1, oimgs2 = model.forward(imgs1, imgs2)
        loss = criterion(oimgs1, oimgs2, labels)
        loss.backward()
        optimizer.step()
        running_loss += loss.item()
        
        progress.set_description(f"Loss: {loss.item():.3f}")

        # only validate test data on last train iteration
        if steps == len(trainloader):
            test_loss = 0
            accuracy, precision, recall, f1 = [], [], [], []
            
            
            if torch.cuda.is_available():
                torch.cuda.empty_cache()
            
            model.eval()
            with torch.no_grad():
                for counter, (timgs1, timgs2, tlabels) in enumerate(testloader, 1):
                    timgs1, timgs2, tlabels = timgs1.to(device), timgs2.to(device), tlabels.to(device)
                    toimgs1, toimgs2 = model.forward(timgs1, timgs2)
                    batch_loss = criterion(toimgs1, toimgs2, labels)
                    test_loss += batch_loss.item()


                    predicted_label = F.pairwise_distance(toimgs1, toimgs2)
                    y_pred = list(map(rounding, predicted_label))
                    y_true = list(map(int,tlabels.view(-1).cpu().numpy()))
                    accuracy.append(metrics.accuracy_score(y_true, y_pred))
                    precision.append(metrics.precision_score(y_true, y_pred))
                    recall.append(metrics.recall_score(y_true, y_pred))
                    f1.append(metrics.f1_score(y_true, y_pred))
            
            print(f"Epoch {epoch+1}/{NUM_EPOCH}, training loss: {running_loss/len(trainloader):.3f}, validation loss: {test_loss/len(testloader):.3f}")
            print_scores(precision, recall, f1, accuracy, len(testloader))
            train_losses.append(running_loss/len(trainloader))
            test_losses.append(test_loss/len(testloader))
            running_loss = 0
            model.train()

### Hyper parameters
A few arbitrary predefined parameters.

In [5]:
BATCH_SIZE = 512
LEARNING_RATE = 0.01
NUM_EPOCH = 10
ARCHITECTURE = 'resnet50'

#### Setup data

In [6]:
train_path = 'dataset/train/yale'
valid_path = 'dataset/valid/yale'

trfrm = transforms.Compose([
    lambda img: img.convert("RGB"),
    transforms.Resize((224,224)),
    transforms.CenterCrop((224)),
    transforms.ToTensor(),
])

train_set = SiamesePairDataset(root=train_path, ext='jpg', transform=trfrm)
valid_set = SiamesePairDataset(root=valid_path, ext='jpg', transform=trfrm)
train_loader = data.DataLoader(train_set, batch_size=BATCH_SIZE, shuffle=True)
valid_loader = data.DataLoader(valid_set, batch_size=BATCH_SIZE, shuffle=True)

#### Setup the tools

In [9]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
fc = lambda in_features: nn.Sequential(
    nn.BatchNorm1d(in_features),
    nn.Dropout(.5),
    nn.Linear(in_features, in_features//2),
    nn.ReLU(inplace=True),
    
    nn.BatchNorm1d(in_features//2),
    nn.Dropout(.5),
    nn.Linear(in_features//2, in_features//4),
    nn.ReLU(inplace=True),
    
    nn.BatchNorm1d(in_features//4),
    nn.Dropout(.5),
    nn.Linear(in_features//4, 128),
)

model = SiameseResnet(architecture=ARCHITECTURE, fc_layer=fc).to(device)
criterion = ContrastiveLoss(device=device)
optimizer = optim.Adam(model.model.fc.parameters(), lr=LEARNING_RATE)
print(model.model.fc)

Sequential(
  (0): BatchNorm1d(2048, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (1): Dropout(p=0.5)
  (2): Linear(in_features=2048, out_features=1024, bias=True)
  (3): ReLU(inplace)
  (4): BatchNorm1d(1024, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (5): Dropout(p=0.5)
  (6): Linear(in_features=1024, out_features=512, bias=True)
  (7): ReLU(inplace)
  (8): BatchNorm1d(512, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (9): Dropout(p=0.5)
  (10): Linear(in_features=512, out_features=128, bias=True)
)


### Train the model

#### Train only FC layers

In [None]:
train_losses, test_losses = [], []
start_ts = time.time()
for epoch in range(NUM_EPOCH):
    train_and_validate(epoch=epoch, trainloader=train_loader, criterion=criterion, optimizer=optimizer, testloader=valid_loader, model=model)
print(f"Training time: {time.time()-start_ts}s")
plt.plot(train_losses, label='train')
plt.plot(test_losses, label='valid')
plt.legend(frameon=False)
plt.show()

HBox(children=(IntProgress(value=0, description='loss: ', max=262, style=ProgressStyle(description_width='init…

Epoch 1/10, training loss: 1.584, validation loss: 1.471
	     precision: 0.6433
	        recall: 0.0556
	            F1: 0.1018
	      accuracy: 0.5119



HBox(children=(IntProgress(value=0, description='loss: ', max=262, style=ProgressStyle(description_width='init…

### Validate the model

In [19]:
model.eval()
labels = []
preds = []
progress = tqdm(enumerate(valid_loader, 1))
len_valid = len(valid_loader)
for idx, (img1,img2,label) in progress:
    progress.set_description(f'{idx}/{len_valid}')
    img1 = img1.to(device)
    img2 = img2.to(device)
    out1, out2 = model.forward(img1,img2)
    predicted_label = F.pairwise_distance(out1, out2)
    rounded = list(map(rounding, predicted_label))
    preds.append(rounded)
    labels.append(list(map(int, label.view(-1))))
    
y_true = np.array(list(itertools.chain(*labels)))
y_pred = np.array(list(itertools.chain(*preds)))
tn, fp, fn, tp = metrics.confusion_matrix(y_true, y_pred).ravel()
total = tn+tp+fp+fn
print(f'Predicted true and actually true: {tp}'
      f'\nPredicted false and actually false: {tn}'
      f'\nPredicted true but actually false: {fp}'
      f'\nPredicted false but actually true: {fn}'
      f'\nTotal correct predictions: {tp+tn} ({(tp+tn)/total*100:.2f})'
      f'\nTotal wrong predictions: {fn+fp} ({(fn+fp)/total*100:.2f})'
      f'\nTotal: ({total})')

HBox(children=(IntProgress(value=1, bar_style='info', max=1), HTML(value='')))

In [None]:
acc = metrics.accuracy_score(y_true, y_pred)*100
f1 = metrics.f1_score(y_true, y_pred)*100
prec = metrics.precision_score(y_true, y_pred)*100
print(f'Batch Size: {BATCH_SIZE}\t Learning Rate: {LEARNING_RATE}\t NUM EPOCH: {NUM_EPOCH}')
print(f'Accuracy: {acc:.2f}\t F1: {f1:.2f}\t Precision: {prec:.2f}')

In [None]:
#for training_logs.csv purpose
log_training_result(NUM_EPOCH, BATCH_SIZE, LEARNING_RATE, acc, prec, f1, tp, tn, fp, fn, fc, ARCHITECTURE)

In [None]:
df = pandas.read_csv("training_logs.csv")
df[-15:]

### Testing the model

In [None]:
# test_set = SiamesePairDataset(root='dataset/yale', ext='jpg', transform=trfrm)
# test_loader = data.DataLoader(test_set, batch_size=4000, shuffle=True)

# if torch.cuda.is_available():
#     torch.cuda.empty_cache()

In [None]:
# model.eval()
# labels = []
# preds = []
# progress = tqdm(enumerate(test_loader), desc="Testing ")
# for idx, (img1,img2,label) in progress:
#     progress.set_description(f"Testing {idx+1}/{len(test_loader)}")
#     if torch.cuda.is_available():
#         torch.cuda.empty_cache()
#     img1 = img1.to(device)
#     img2 = img2.to(device)
#     out1, out2 = model.forward(img1,img2)
#     predicted_label = F.pairwise_distance(out1, out2)
#     rounded = list(map(rounding, predicted_label))
#     preds.append(rounded)
#     labels.append(list(map(int, label.view(-1))))

In [None]:
# y_true = np.array(list(itertools.chain(*labels)))
# y_pred = np.array(list(itertools.chain(*preds)))
# tn, fp, fn, tp = metrics.confusion_matrix(y_true, y_pred).ravel()
# total = tn+tp+fp+fn
# print("Testing results:")
# print(f'Predicted true and actually true: {tp}'
#       f'\nPredicted false and actually false: {tn}'
#       f'\nPredicted true but actually false: {fp}'
#       f'\nPredicted false but actually true: {fn}'
#       f'\nTotal correct predictions: {tp+tn} ({(tp+tn)/total*100:.2f})'
#       f'\nTotal wrong predictions: {fn+fp} ({(fn+fp)/total*100:.2f})'
#       f'\nTotal: ({total})')

In [None]:
# x0,x1,label = next(iter(test_loader))
# x0,x1,label = x0.to(device), x1.to(device), label.to(device)

# o1, o2 = model.forward(x0, x1)
# sim = F.pairwise_distance(o1,o2)

# emoji = "✅" if int(label[0].item()) == int(rounding(sim[0].item())) else "❌"
# print(f"{emoji} Truth: {int(label[0].item())}", f"Pred: {rounding(sim[0].item())}")

# topil = transforms.ToPILImage()
# plt.subplot(1,2,1)
# plt.imshow(torch.squeeze(x0[0].cpu()).permute(1,2,0))
# plt.subplot(1,2,2)
# plt.imshow(torch.squeeze(x1[0].cpu()).permute(1,2,0))

In [47]:
# models.resnet152().fc.in_features

2048