In [None]:
import numpy as np
import pandas as pd
import csv
import os
import glob
import random
import itertools
from typing import List, Tuple

from PIL import Image
import matplotlib.pyplot as plt
from PIL import ImageOps

import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import torchvision.transforms as transforms
from sklearn.model_selection import train_test_split

import shutil

import time

In [None]:
def make_partition(signers: List[int],pairGenuineGenuine: List[Tuple[int, int]],pairGenuineForged: List[Tuple[int, int]]):
    """Build the labelled pair list for a group of signers.

    For every signer, all genuine-genuine pairs are emitted with label 1,
    followed by an equally sized random subset of the genuine-forged pairs
    with label 0, so each signer contributes a balanced set of samples.

    Args:
        signers: signer ids to generate samples for.
        pairGenuineGenuine: index pairs (genuine, genuine).
        pairGenuineForged: index pairs (genuine, forged); must hold at least
            as many pairs as ``pairGenuineGenuine`` (``random.sample``
            raises ``ValueError`` otherwise).

    Returns:
        A list of ``(signer_id, first_index, second_index, label)`` tuples.
    """
    positives_per_signer = len(pairGenuineGenuine)
    samples = []
    for signer_id in signers:
        # Positive pairs: two genuine signatures of the same signer (y = 1).
        samples += [(signer_id, *pair, 1) for pair in pairGenuineGenuine]

        # Negative pairs: genuine vs. forged, sub-sampled to match the
        # number of positive pairs (y = 0).
        forged_subset = random.sample(pairGenuineForged, positives_per_signer)
        samples += [(signer_id, *pair, 0) for pair in forged_subset]
    return samples

In [None]:
def prepare_CEDAR(M: int, K: int, random_state=0, data_dir='/content/drive/MyDrive/SigNet Implementation/offlineSignatureVerification/data/CEDAR'):
    """Create ``train.csv`` and ``test.csv`` pair lists for the CEDAR dataset.

    Signers ``1..K`` are split into ``M`` training signers and ``K - M`` test
    signers. For each split, ``make_partition`` produces balanced
    genuine-genuine (label 1) and genuine-forged (label 0) pairs, which are
    written as rows of ``image_path_1, image_path_2, label``.

    Args:
        M: number of signers used for training (must satisfy ``0 < M < K``).
        K: total number of signers.
        random_state: seed for both the signer split and the sampling of
            forged pairs, so the generated files are reproducible.
        data_dir: dataset root containing ``full_org`` and ``full_forg``;
            the CSV files are written into this directory.

    Raises:
        ValueError: if ``M`` and ``K`` do not leave at least one signer on
            each side of the split.
    """
    if not 0 < M < K:
        raise ValueError(f'expected 0 < M < K, got M={M}, K={K}')

    def get_path(row):
        """Turn ``(signer_id, idx1, idx2, y)`` into ``(path1, path2, y)``."""
        signer_id, x1, x2, y = row
        # The first image of a pair is always a genuine signature.
        first = os.path.join(data_dir, 'full_org', f'original_{signer_id}_{x1}.png')
        if y == 1:
            second = os.path.join(data_dir, 'full_org', f'original_{signer_id}_{x2}.png')
        else:
            second = os.path.join(data_dir, 'full_forg', f'forgeries_{signer_id}_{x2}.png')
        return first, second, y  # drop signer_id

    def write_split(file_name, split_signers):
        """Generate the pairs for ``split_signers`` and write them as CSV."""
        rows = [get_path(sample) for sample in
                make_partition(split_signers, pairGenuineGenuine, pairGenuineForged)]
        # newline='' is what the csv module requires to avoid doubled line
        # endings on platforms that translate '\n'.
        with open(os.path.join(data_dir, file_name), 'wt', newline='') as f:
            csv.writer(f).writerows(rows)

    random.seed(random_state)
    signers = list(range(1, K+1))
    num_genuine_sign = 24
    num_forged_sign = 24

    # Pass random_state through so the signer split itself is reproducible
    # instead of depending on the global NumPy RNG state.
    train_signers, test_signers = train_test_split(signers, test_size=K-M, random_state=random_state)
    pairGenuineGenuine = list(itertools.combinations(range(1, num_genuine_sign+1), 2))
    pairGenuineForged = list(itertools.product(range(1, num_genuine_sign+1), range(1, num_forged_sign+1)))

    # Order matters: the train split consumes the `random` stream first.
    write_split('train.csv', train_signers)
    write_split('test.csv', test_signers)
        
# Seed every random number generator the notebook relies on, not just NumPy:
# `random` drives the pair sampling, and torch drives weight initialisation,
# dropout and DataLoader shuffling.
seed = 2021
random.seed(seed)
np.random.seed(seed)
torch.manual_seed(seed)

In [None]:
class SignDataset(Dataset):
    """Dataset of signature image pairs listed in ``train.csv`` / ``test.csv``.

    Each CSV row (no header) holds ``first_image_path, second_image_path,
    label``. Items are returned as ``(image_1, image_2, label)`` with both
    images converted to 8-bit grayscale and, if given, passed through
    ``image_transform``.
    """

    def __init__(self, is_train: bool, data_dir: str, image_transform=None):
        """Load the pair list for the requested split from ``data_dir``."""
        self.image_transform = image_transform
        csv_name = 'train.csv' if is_train else 'test.csv'
        self.df = pd.read_csv(os.path.join(data_dir, csv_name), header=None)

    def __len__(self):
        """Number of pairs in the split."""
        return len(self.df)

    def __getitem__(self, index):
        """Return the ``index``-th pair as ``(image_1, image_2, label)``."""
        first_path, second_path, label = self.df.iloc[index]

        # 'L' = single-channel grayscale.
        pair = [Image.open(path).convert('L') for path in (first_path, second_path)]

        if self.image_transform:
            pair = [self.image_transform(image) for image in pair]

        return pair[0], pair[1], label


In [None]:
class SigNetModel(nn.Module):
    """Siamese embedding network for signature pairs.

    A single convolutional stack (``self.features``) maps a 1-channel
    155x220 image to a 128-dimensional embedding. ``forward`` applies the
    same stack, and therefore the same weights, to both images of a pair.

    NOTE(review): only the first convolution is followed by a ReLU; the later
    convolution/linear layers are stacked without an activation in between.
    This is left unchanged because inserting modules would shift the
    ``nn.Sequential`` indices and make existing checkpoints unloadable --
    confirm whether the missing activations are intended.
    """

    def __init__(self):
        super().__init__()

        self.features = nn.Sequential(
            #input size = [155, 220, 1]
            nn.Conv2d(in_channels = 1, out_channels = 96, kernel_size = 11), # size = [145,210,96]
            nn.ReLU(),
            nn.LocalResponseNorm(size=5, k=2, alpha=1e-4, beta=0.75),
            nn.MaxPool2d(2, stride=2), # size = [72, 105,96]

            nn.Conv2d(in_channels = 96, out_channels = 256, kernel_size = 5, padding=2, padding_mode='zeros'), # size = [72, 105,256]
            nn.LocalResponseNorm(size=5, k=2, alpha=1e-4, beta=0.75),
            nn.MaxPool2d(2, stride=2), # size = [36, 52,256]
            nn.Dropout2d(p=0.3),

            nn.Conv2d(in_channels = 256, out_channels = 384,kernel_size = 3, stride=1, padding=1, padding_mode='zeros'),
            nn.Conv2d(in_channels = 384, out_channels = 256,kernel_size = 3, stride=1, padding=1, padding_mode='zeros'),
            nn.MaxPool2d(2, stride=2), # size = [18, 26,256]
            nn.Dropout2d(p=0.3),

            nn.Flatten(1, -1), # 18*26*256
            nn.Linear(18*26*256, 1024),
            # The activations are 2-D (batch, features) here, so use plain
            # element-wise Dropout: Dropout2d is meant for feature maps and
            # is deprecated for 2-D input. The module keeps the same index,
            # so state_dict keys are unchanged.
            nn.Dropout(p=0.5),
            nn.Linear(1024, 128),
        )

    def forward(self, x1, x2):
        """Embed both images of a pair with the shared feature stack.

        Returns:
            Tuple ``(embedding_1, embedding_2)``.
        """
        x1 = self.features(x1)
        x2 = self.features(x2)
        return x1, x2
        

In [None]:
# Single definition of the dataset location, shared by the CSV generation and
# both datasets so the three cannot drift apart.
DATA_DIR = "/content/drive/MyDrive/SigNet Implementation/offlineSignatureVerification/data/CEDAR"

# Preprocessing applied to every signature image: resize to the 155x220 input
# size the model comments refer to, invert the grayscale image, convert to a
# tensor.
image_transform = transforms.Compose([
        transforms.Resize((155, 220)),
        ImageOps.invert,
        transforms.ToTensor(),

    ])

# (Re)generate train.csv / test.csv: 50 of the 55 signers go to training.
prepare_CEDAR(M = 50,K = 55, data_dir = DATA_DIR)

train_data = SignDataset(is_train = True, data_dir = DATA_DIR, image_transform =  image_transform)
test_data = SignDataset(is_train = False, data_dir = DATA_DIR, image_transform =  image_transform)

# Only the training loader is shuffled; evaluation keeps the CSV order.
loaders = {
    'train_loader' : DataLoader(train_data, batch_size=32, shuffle=True, num_workers=2, pin_memory=True),
    'test_loader'  : DataLoader(test_data, batch_size=32, shuffle=False, num_workers=2, pin_memory=True),
}

In [None]:
class ContrastiveLoss(nn.Module):
    """Contrastive loss on the Euclidean distance between two embeddings.

    Label convention (the one used by ``make_partition`` and ``accuracy``):
    ``y == 1`` is a genuine-genuine pair that should end up close together,
    ``y == 0`` is a genuine-forged pair that should be at least ``margin``
    apart::

        loss = alpha * y * d**2 + beta * (1 - y) * max(0, margin - d)**2

    The previous implementation gated the two terms the other way round,
    i.e. it pulled forged pairs together and pushed genuine pairs apart,
    which contradicts how ``accuracy`` interprets the distances.

    Args:
        alpha: weight of the attraction term (pairs with ``y == 1``).
        beta: weight of the repulsion term (pairs with ``y == 0``).
        margin: distance beyond which a ``y == 0`` pair stops contributing.
    """

    def __init__(self, alpha, beta, margin):
        super().__init__()
        self.alpha = alpha
        self.beta = beta
        self.margin = margin

    def forward(self, x1, x2, y):
        """Return the mean loss over the batch as a float tensor.

        ``x1`` and ``x2`` are the two batches of embeddings and ``y`` holds
        one 0/1 label per pair.
        """
        distance = torch.pairwise_distance(x1, x2, p=2)
        # Hinge: only pairs closer than the margin are penalised.
        hinge = torch.clamp(self.margin - distance, min=0)
        loss = self.alpha * y * distance**2 + \
               self.beta * (1 - y) * hinge**2
        return torch.mean(loss, dtype=torch.float)


In [None]:
def accuracy(distances, y, step=0.01):
    """Best balanced accuracy over a sweep of distance thresholds.

    A pair is predicted "same signer" when its distance is ``<= threshold``.
    Thresholds run from the smallest to the largest distance in increments
    of ``step``; for each one the mean of the true-positive rate and the
    true-negative rate is computed and the maximum is returned.

    Args:
        distances: 1-D tensor of pair distances.
        y: 1-D tensor of labels, ``1`` meaning a same-signer (genuine) pair.
        step: threshold increment.

    Returns:
        0-dim float tensor with the best balanced accuracy. If one of the
        two classes is absent the rates are NaN and 0 is returned.

    Raises:
        ValueError: if ``distances`` is empty.
    """
    if distances.numel() == 0:
        raise ValueError('accuracy() needs at least one distance')

    same_id = (y == 1)
    diff_id = ~same_id
    # The class sizes do not depend on the threshold: compute them once.
    num_same = same_id.sum().float()
    num_diff = diff_id.sum().float()

    # Tensor reductions instead of the builtins min()/max(), which would
    # iterate over the tensor one element at a time in Python.
    min_threshold_d = distances.min().item()
    max_threshold_d = distances.max().item()

    max_acc = torch.zeros((), dtype=torch.float, device=distances.device)
    for threshold_d in torch.arange(min_threshold_d, max_threshold_d+step, step):
        true_positive = (distances <= threshold_d) & same_id
        true_positive_rate = true_positive.sum().float() / num_same
        true_negative = (distances > threshold_d) & diff_id
        true_negative_rate = true_negative.sum().float() / num_diff

        acc = 0.5 * (true_negative_rate + true_positive_rate)
        # NaN compares False, so an undefined rate never replaces the best.
        if acc > max_acc:
            max_acc = acc
    return max_acc

@torch.no_grad()
def eval(model, criterion, dataloader, log_interval=40):
    """Evaluate ``model`` on ``dataloader`` without tracking gradients.

    Note: this function intentionally keeps its original name and therefore
    shadows the builtin ``eval`` inside the notebook.

    Args:
        model: network returning a pair of embeddings for ``(x1, x2)``.
        criterion: loss called as ``criterion(emb1, emb2, y)``.
        dataloader: yields ``(x1, x2, y)`` batches.
        log_interval: print the running mean loss every this many batches
            (and after the last batch).

    Returns:
        Tuple ``(mean_loss, max_accuracy)`` where ``max_accuracy`` is the
        result of ``accuracy`` on all pair distances.

    Raises:
        ValueError: if the dataloader yields no samples.
    """
    model.eval()
    # Run on whatever device the model lives on; fall back to the previous
    # hard-coded 'cuda' for a model without parameters.
    first_param = next(model.parameters(), None)
    device = first_param.device if first_param is not None else torch.device('cuda')

    running_loss = 0
    number_samples = 0
    num_batches = len(dataloader)

    batch_distances = []
    batch_labels = []

    for batch_idx, (x1, x2, y) in enumerate(dataloader):
        x1, x2, y = x1.to(device), x2.to(device), y.to(device)

        x1, x2 = model(x1, x2)
        loss = criterion(x1, x2, y)
        batch_distances.append(torch.pairwise_distance(x1, x2, 2).cpu())
        batch_labels.append(y.cpu())

        # Weight each batch loss by its size so the mean is per sample.
        number_samples += len(x1)
        running_loss += loss.item() * len(x1)

        # Honour the log_interval argument (it used to be ignored in favour
        # of a hard-coded 40).
        if (batch_idx + 1) % log_interval == 0 or batch_idx == num_batches - 1:
            print('{}/{}: Loss: {:.4f}'.format(batch_idx+1, num_batches, running_loss / number_samples))

    if number_samples == 0:
        raise ValueError('dataloader produced no samples to evaluate')

    distances = torch.cat(batch_distances)
    y = torch.cat(batch_labels)
    max_accuracy = accuracy(distances, y)
    print(f'Max accuracy: {max_accuracy}')
    return running_loss / number_samples, max_accuracy


In [None]:
# Fresh model, loss, optimiser and learning-rate schedule for a new run.
model = SigNetModel().to("cuda")
criterion = ContrastiveLoss(alpha=1, beta=1, margin=1).to("cuda")
optimizer = optim.RMSprop(
    model.parameters(),
    lr=1e-5,
    eps=1e-8,
    weight_decay=5e-4,
    momentum=0.9,
)
# Multiply the learning rate by 0.1 every 5 scheduler steps.
scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=5, gamma=0.1)
num_epochs = 20
model.train()
print(model)

SigNetModel(
  (features): Sequential(
    (0): Conv2d(1, 96, kernel_size=(11, 11), stride=(1, 1))
    (1): ReLU()
    (2): LocalResponseNorm(5, alpha=0.0001, beta=0.75, k=2)
    (3): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (4): Conv2d(96, 256, kernel_size=(5, 5), stride=(1, 1), padding=(2, 2))
    (5): LocalResponseNorm(5, alpha=0.0001, beta=0.75, k=2)
    (6): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (7): Dropout2d(p=0.3, inplace=False)
    (8): Conv2d(256, 384, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (9): Conv2d(384, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (10): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (11): Dropout2d(p=0.3, inplace=False)
    (12): Flatten(start_dim=1, end_dim=-1)
    (13): Linear(in_features=119808, out_features=1024, bias=True)
    (14): Dropout2d(p=0.5, inplace=False)
    (15): Linear(in_features=1024, out_features

In [None]:
def saveCheckpoint(state, isBest, checkpointPath, bestModelPath):
    """Save ``state`` to ``checkpointPath`` and optionally mark it as best.

    Args:
        state: object to serialise with ``torch.save``.
        isBest: when true, the freshly written checkpoint is also copied to
            ``bestModelPath``.
        checkpointPath: destination of the checkpoint file.
        bestModelPath: destination of the copy made when ``isBest`` is true.
    """
    targets = (checkpointPath, bestModelPath) if isBest else (checkpointPath,)
    for target in targets:
        # Create missing directories up front instead of failing in the
        # middle of a run; a bare file name has no directory to create.
        parent = os.path.dirname(target)
        if parent:
            os.makedirs(parent, exist_ok=True)

    torch.save(state, checkpointPath)
    if isBest:
        shutil.copyfile(checkpointPath, bestModelPath)

def loadCheckpoint(checkpointFPath, model, optimizer, scheduler):
    """Restore model, optimizer and scheduler state from a checkpoint file.

    The checkpoint is expected to be a dict with the keys ``'model'``,
    ``'optim'``, ``'scheduler'`` and ``'epoch'``, as written by
    ``trainModel``.

    Args:
        checkpointFPath: path of the checkpoint file.
        model: module whose ``load_state_dict`` receives ``'model'``.
        optimizer: optimizer whose ``load_state_dict`` receives ``'optim'``.
        scheduler: scheduler whose ``load_state_dict`` receives
            ``'scheduler'``.

    Returns:
        Tuple ``(model, optimizer, epoch, scheduler)`` -- note the order:
        the stored epoch comes third.
    """
    # Deserialise onto the CPU so a checkpoint written on a GPU can also be
    # opened on a machine without one; load_state_dict then copies the
    # values into the already-placed parameters and optimizer state.
    checkpoint = torch.load(checkpointFPath, map_location='cpu')
    model.load_state_dict(checkpoint['model'])
    optimizer.load_state_dict(checkpoint['optim'])
    scheduler.load_state_dict(checkpoint['scheduler'])
    return model, optimizer, checkpoint['epoch'], scheduler

In [None]:
def trainInternal(model, optimizer, criterion, dataloader, log_interval=50):
    """Train ``model`` for one pass over ``dataloader``.

    Args:
        model: network returning a pair of embeddings for ``(x1, x2)``.
        optimizer: optimizer stepping the model parameters.
        criterion: loss called as ``criterion(emb1, emb2, y)``.
        dataloader: yields ``(x1, x2, y)`` batches.
        log_interval: print the mean loss of the batches seen since the
            previous print every this many batches (and after the last one).

    Returns:
        None. Progress is reported through ``print``.
    """
    model.train()
    # Run on whatever device the model lives on; fall back to the previous
    # hard-coded 'cuda' for a model without parameters.
    first_param = next(model.parameters(), None)
    device = first_param.device if first_param is not None else torch.device('cuda')

    running_loss = 0
    number_samples = 0
    num_batches = len(dataloader)

    for batch_idx, (x1, x2, y) in enumerate(dataloader):
        x1, x2, y = x1.to(device), x2.to(device), y.to(device)

        optimizer.zero_grad()
        x1, x2 = model(x1, x2)
        loss = criterion(x1, x2, y)
        loss.backward()
        optimizer.step()

        # Weight each batch loss by its size so the mean is per sample.
        number_samples += len(x1)
        running_loss += loss.item() * len(x1)
        if (batch_idx + 1) % log_interval == 0 or batch_idx == num_batches - 1:
            print('{}/{}: Loss: {:.4f}'.format(batch_idx+1, num_batches, running_loss / number_samples))
            # Start a new window: each printed value covers only the
            # batches since the previous print.
            running_loss = 0
            number_samples = 0

In [None]:
def trainModel(start_epochs, num_epochs, loaders, model, optimizer, criterion, checkpointPath, bestModelPath,
               scheduler=None,
               snapshotDir='/content/drive/MyDrive/SigNet Implementation/offlineSignatureVerification/PYTORCHCHECKPOINT'):
    """Run the train/evaluate/checkpoint loop for a range of epochs.

    For every epoch in ``range(start_epochs, num_epochs)`` the model is
    trained on ``loaders['train_loader']``, evaluated on
    ``loaders['test_loader']``, the scheduler is stepped, and the state is
    written to ``checkpointPath`` plus a per-epoch snapshot in
    ``snapshotDir``. The checkpoint is additionally copied to
    ``bestModelPath`` whenever the evaluation accuracy improves on the best
    value seen during this call (the best value is not carried over when
    training is resumed).

    Args:
        start_epochs: first epoch index (use the checkpoint's ``'epoch'``
            value when resuming).
        num_epochs: epoch index at which to stop (exclusive).
        loaders: dict with ``'train_loader'`` and ``'test_loader'``.
        model, optimizer, criterion: training objects.
        checkpointPath: file overwritten with the latest state every epoch.
        bestModelPath: file receiving a copy of the best checkpoint.
        scheduler: learning-rate scheduler to step once per epoch. When
            omitted, the notebook-level variable ``scheduler`` is used, which
            is what this function silently relied on before.
        snapshotDir: directory for the per-epoch snapshot files.

    Returns:
        The trained ``model``.

    Raises:
        NameError: if no scheduler is passed and no global ``scheduler``
            exists.
    """
    if scheduler is None:
        # Backward compatibility with callers that do not pass a scheduler.
        scheduler = globals().get('scheduler')
        if scheduler is None:
            raise NameError("no scheduler given and no global 'scheduler' is defined")

    # Make sure every output location exists before spending time training.
    for directory in (os.path.dirname(checkpointPath), os.path.dirname(bestModelPath), snapshotDir):
        if directory:
            os.makedirs(directory, exist_ok=True)

    best_acc = float('-inf')

    for epoch in range(start_epochs, num_epochs):
        print('Epoch {}/{}'.format(epoch, num_epochs))
        print('Training', '-'*20)

        trainInternal(model, optimizer, criterion, loaders['train_loader'], log_interval=50)
        print('Evaluating', '-'*20)
        loss, acc = eval(model, criterion, loaders['test_loader'])
        scheduler.step()

        to_save = {
            # Stored as the next epoch to run, so a resumed run continues
            # right after the one that just finished.
            'epoch': epoch + 1,
            'model': model.state_dict(),
            'scheduler': scheduler.state_dict(),
            'optim': optimizer.state_dict(),
        }

        # Track the best evaluation accuracy so bestModelPath is actually
        # written (isBest used to be hard-coded to False).
        is_best = float(acc) > best_acc
        if is_best:
            best_acc = float(acc)

        print('Saving checkpoint..')
        saveCheckpoint(to_save, is_best, checkpointPath, bestModelPath)
        torch.save(to_save, os.path.join(snapshotDir, 'epoch_{}_loss_{:.3f}_acc_{:.3f}.pt'.format(epoch, loss, acc)))

    return model




In [None]:
# Train from scratch for epochs 0..19 and report the wall-clock duration.
checkpoint_path = "/content/drive/MyDrive/SigNet Implementation/offlineSignatureVerification/CHECKPOINT/currentCheckPoint.pt"
best_model_path = "/content/drive/MyDrive/SigNet Implementation/offlineSignatureVerification/best_model/bestModel.pt"

model.train()
start_time = time.time()
trained_model = trainModel(0, 20, loaders, model, optimizer, criterion, checkpoint_path, best_model_path)
end_time = time.time()
print("Model trained....")
# Elapsed seconds divided by 60, i.e. the value printed is in minutes.
elapsed_minutes = (end_time-start_time)/60
print("Training Time: {}".format(elapsed_minutes))


Epoch 0/20
Training --------------------
50/863: Loss: 0.3896
100/863: Loss: 0.2086
150/863: Loss: 0.1999
200/863: Loss: 0.1960
250/863: Loss: 0.1926
300/863: Loss: 0.1957
350/863: Loss: 0.1980
400/863: Loss: 0.1970
450/863: Loss: 0.1878
500/863: Loss: 0.1891
550/863: Loss: 0.1914
600/863: Loss: 0.1875
650/863: Loss: 0.1906
700/863: Loss: 0.1950
750/863: Loss: 0.1888
800/863: Loss: 0.1902
850/863: Loss: 0.1854
863/863: Loss: 0.1918
Evaluating --------------------
40/87: Loss: 0.5482
80/87: Loss: 0.5197
87/87: Loss: 0.4829
Max accuracy: 1.0
Saving checkpoint..
Epoch 1/20
Training --------------------
50/863: Loss: 0.1886
100/863: Loss: 0.1909
150/863: Loss: 0.1858
200/863: Loss: 0.1929
250/863: Loss: 0.1863
300/863: Loss: 0.1854
350/863: Loss: 0.1910
400/863: Loss: 0.1831
450/863: Loss: 0.1855
500/863: Loss: 0.1840
550/863: Loss: 0.1827
600/863: Loss: 0.1880
650/863: Loss: 0.1831
700/863: Loss: 0.1844
750/863: Loss: 0.1843
800/863: Loss: 0.1832
850/863: Loss: 0.1793
863/863: Loss: 0.183

In [None]:
# Resume model training: rebuild the training objects with the same
# hyper-parameters as the initial run, then overwrite their state from the
# latest checkpoint.

model = SigNetModel().to("cuda")
optimizer = optim.RMSprop(model.parameters(), lr=1e-5, eps=1e-8, weight_decay=5e-4, momentum=0.9)
scheduler = optim.lr_scheduler.StepLR(optimizer, 5, 0.1)
# Load from the directory the training cell writes to ('CHECKPOINT'); this
# cell previously pointed at a differently-cased 'checkpoint' directory,
# which is a different path on a case-sensitive file system.
checkpointFPath = '/content/drive/MyDrive/SigNet Implementation/offlineSignatureVerification/CHECKPOINT/currentCheckPoint.pt'
model, optimizer, start_epoch, scheduler = loadCheckpoint(checkpointFPath, model, optimizer, scheduler)

print("model = ", model)
print("optimizer = ", optimizer)
print("start_epoch = ", start_epoch)
print("scheduler = ", scheduler)

In [None]:
# Continue training from the restored epoch up to epoch 20.
start_time3 = time.time()
# `trainModel` is the training entry point defined in this notebook; the
# previous call used the undefined name `train` and raised a NameError.
# The checkpoint path uses the same 'CHECKPOINT' directory as the initial
# training cell so that both runs keep updating one checkpoint file.
trained_model = trainModel(start_epoch, 20, loaders, model, optimizer, criterion, "/content/drive/MyDrive/SigNet Implementation/offlineSignatureVerification/CHECKPOINT/currentCheckPoint.pt", "/content/drive/MyDrive/SigNet Implementation/offlineSignatureVerification/best_model/bestModel.pt")
end_time3 = time.time()
print("Model trained....")
# Elapsed seconds divided by 60, i.e. the value printed is in minutes.
print("Training Time: {}".format((end_time3-start_time3)/60))

In [None]:
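# NOTE: legacy inline training/evaluation loop, kept commented out for reference only.
# It refers to `train_loader` / `test_loader`, which are not defined as variables in this
# notebook (the loaders live in the `loaders` dict); use trainModel() instead.
# start_time = time.time()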
# for epoch in range(num_epochs):
#     print('Epoch {}/{}'.format(epoch, num_epochs))
#     print('Training', '-'*20)
    
#     model.train()
    
#     running_loss = 0
#     number_samples = 0
    
#     for batch_idx, (x1, x2, y) in enumerate(train_loader):
#         x1, x2, y = x1.to('cuda'), x2.to('cuda'), y.to('cuda')

#         optimizer.zero_grad()
#         x1, x2 = model(x1, x2)
#         loss = criterion(x1, x2, y)
#         loss.backward()
#         optimizer.step()

#         number_samples += len(x1)
#         running_loss += loss.item() * len(x1)
#         if (batch_idx + 1) % 40 == 0 or batch_idx == len(train_loader) - 1:
#             print('{}/{}: Loss: {:.4f}'.format(batch_idx+1, len(train_loader), running_loss / number_samples))
#             running_loss = 0
#             number_samples = 0
#     scheduler.step()
#     loss, acc = eval(model, criterion, test_loader)
#     to_save = {
#             'model': model.state_dict(),
#             'scheduler': scheduler.state_dict(),
#             'optim': optimizer.state_dict(),
#         }
#     print('Saving checkpoint..')
#     torch.save(to_save, 'checkpoints/epoch_{}_loss_{:.3f}_acc_{:.3f}.pt'.format(epoch, loss, acc))


# end_time = time.time()
# print("Training Time: {}".format((end_time-start_time)/60))

# start_time1 = time.time()
# print('Evaluating', '-'*20)
# loss, acc = eval(model, criterion, test_loader)
# end_time1 = time.time()

# print("Evaluation Time: {}".format((end_time1-start_time1)/60))

    