In [None]:
import math
import os
import random
import shutil
import time
import warnings

import torch
import torch.nn as nn
import torch.nn.parallel
import torch.backends.cudnn as cudnn
import torch.distributed as dist
import torch.optim
import torch.multiprocessing as mp
import torch.utils.data
import torch.utils.data.distributed
import torchvision
import torchvision.transforms as transforms
import torchvision.datasets as datasets
import torchvision.models as models

import simsiam.loader
import simsiam.builder
import simsiam.resnet18
import simsiam.resnet18_signatures
import simsiam.builder_resnet18
import simsiam.builder_resnet18_2

import numpy as np
import matplotlib.pyplot as plt
from os import listdir
from PIL import Image

In [None]:

# Collect every training signature path (genuine + forged), skipping
# directory entries whose name starts with '.'.
files_org = listdir("signatures/org/train")
files_forg = listdir("signatures/forg/train")
files = ['signatures/org/train/' + s for s in files_org if s[0] != '.'] + ['signatures/forg/train/' + s for s in files_forg if s[0] != '.']

# Decode each file to grayscale, then replicate it to three channels.
# `images` is kept in memory because the training dataset is built from it.
images = []
for name in files:
    # convert() returns an independent, fully decoded copy, so the file handle
    # can be released as soon as the conversion is done.
    with Image.open(name) as im:
        images.append(im.convert("L").convert("RGB"))

rez = transforms.Resize((32, 32))
tens = transforms.ToTensor()

# Build the batch tensor in one go; growing it with torch.cat inside a loop
# re-copies the whole tensor on every iteration.
tensors = torch.stack([tens(rez(image)) for image in images])
print(tensors.size())

# Global statistics over every pixel and channel of the resized training set.
m1 = torch.mean(tensors)
print(m1)
(std, mean) = torch.std_mean(tensors)  # std_mean returns (std, mean), in that order
print(std)
print(mean)

# Values recorded in the original notebook (presumably from earlier runs):
#   resize 32x32   -> mean 0.9495, std 0.0535
#   resize 256x256 -> mean 0.9495, std 0.0822

In [None]:
# Deterministic preview pipeline: to-tensor -> binarize -> resize to 32x32.
# NOTE(review): ThresholdTransform is defined in a later cell of this notebook;
# on a fresh kernel that cell has to be executed before this one.
augmentation = [transforms.ToTensor(), ThresholdTransform(), transforms.Resize((32, 32))]

# Preview one genuine signature ...
im = Image.open("signatures/org/train/original_1_2.png").convert("L").convert("RGB")
temp = transforms.Compose(augmentation)
nim = temp(im)
plt.figure(1)
# imshow wants channels last, hence the (C, H, W) -> (H, W, C) permutation.
plt.imshow(nim.permute(1, 2, 0))

# ... and one forged signature, through the very same pipeline.
im = Image.open("signatures/forg/train/forgeries_1_19.png").convert("L").convert("RGB")
temp = transforms.Compose(augmentation)
nim = temp(im)
plt.figure(2)
plt.imshow(nim.permute(1, 2, 0))

In [None]:
class SignatureDataset():
    """Map-style dataset over an in-memory sequence of images.

    Indexing returns ``transform(images[index])``; no label is returned.
    The length is the number of stored images.
    """

    def __init__(self, images, transform):
        # Keep references only; nothing is copied or transformed up front.
        self.transform = transform
        self.images = images

    def __len__(self):
        return len(self.images)

    def __getitem__(self, index):
        # The transform is applied lazily, on every access.
        return self.transform(self.images[index])

class ThresholdTransform(object):
    """Binarize a tensor against a fixed cut-off.

    Elements strictly greater than ``thr`` become 1 and all others become 0;
    the result has the same dtype as the input.

    Args:
        thr: Cut-off value. Defaults to 0.85, the value this notebook
            previously had hard-coded.
    """

    def __init__(self, thr=0.85):
        self.thr = thr

    def __call__(self, x):
        # The comparison yields a bool tensor; cast back so that downstream
        # transforms keep receiving the dtype they were given.
        return (x > self.thr).to(x.dtype)

    def __repr__(self):
        return '{}(thr={})'.format(self.__class__.__name__, self.thr)

In [None]:
# Backbone from the project's resnet18 module; its layers are printed for inspection.
model = simsiam.resnet18.resnet18()
for child in model.children():
    for layer in child.modules():
        print(layer)
# Wrap the backbone in the SimSiam builder.
# NOTE(review): 2048 and 512 are presumably the projection / predictor
# dimensions -- confirm against simsiam.builder_resnet18_2.SimSiam.
model = simsiam.builder_resnet18_2.SimSiam(
        model, 2048, 512)

# Hyper-parameters.
lr = 0.03
# batch_size = 512, workers = 32
batch_size = 256
workers = 6
momentum = 0.9
weight_decay = 0.0005
start_epoch = 0
epochs = 100

# Base learning rate scaled linearly with the batch size (reference batch: 256).
init_lr = lr * batch_size / 256
criterion = nn.CosineSimilarity(dim=1)

# Two parameter groups: adjust_learning_rate() decays the encoder's learning
# rate but keeps the predictor's fixed, keyed on the 'fix_lr' flag.
optim_params = [{'params': model.encoder.parameters(), 'fix_lr': False},
                        {'params': model.predictor.parameters(), 'fix_lr': True}]

optimizer = torch.optim.SGD(optim_params, init_lr,
                                momentum=momentum,
                                weight_decay=weight_decay)


data_storage = "./data"

normalize = transforms.Normalize(mean=[0.5, 0.5, 0.5],
                                     std=[1, 1, 1])

# Stochastic augmentation pipeline; TwoCropsTransform (below) wraps it to
# produce the two views fed to the model.
augmentation = [
    transforms.ToTensor(),
    ThresholdTransform(),
    transforms.Resize((32, 32)),
    transforms.RandomResizedCrop(28, scale=(0.2, 1.)),
    transforms.RandomApply([transforms.ColorJitter(0.4, 0.4, 0.4, 0.1)], p=0.8),
    transforms.RandomGrayscale(p=0.2),
    #transforms.RandomApply([simsiam.loader.GaussianBlur([.1, 2.])], p=0.5),
    transforms.RandomHorizontalFlip(),
    normalize
    ]

dataset = SignatureDataset(images, transform=simsiam.loader.TwoCropsTransform(transforms.Compose(augmentation)))

# Use the GPU when one is present, otherwise fall back to the CPU instead of
# failing on the first .to(device) call.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Pinned host memory only pays off for host-to-GPU copies, so enable it only
# when training on CUDA.
train_loader = torch.utils.data.DataLoader(dataset, batch_size = batch_size, shuffle= True,
        num_workers=workers, pin_memory=(device.type == 'cuda'), drop_last=True)

print(device)
model.to(device)


In [None]:
def train(train_loader, model, criterion, optimizer, epoch, gpu = None):
    """Run one SimSiam training epoch over ``train_loader``.

    Args:
        train_loader: Iterable yielding a two-element sequence of batches
            (the two augmented views of the same samples).
        model: Called as ``model(x1=..., x2=...)``; must return
            ``(p1, p2, z1, z2)``.
        criterion: Similarity function; the loss is the negated average of
            ``criterion(p1, z2).mean()`` and ``criterion(p2, z1).mean()``.
        optimizer: Optimizer stepped once per batch.
        epoch: Epoch index, used only in the progress prefix.
        gpu: Optional CUDA device index. When given, both views are first
            copied to that device with ``non_blocking=True``.

    Note:
        Both views are finally moved to the notebook-level ``device``, which
        must be defined before this function is called.
    """
    batch_time = AverageMeter('Time', ':6.3f')
    data_time = AverageMeter('Data', ':6.3f')
    losses = AverageMeter('Loss', ':.4f')
    progress = ProgressMeter(
        len(train_loader),
        [batch_time, data_time, losses],
        prefix="Epoch: [{}]".format(epoch))

    # switch to train mode
    model.train()

    end = time.time()

    for i, images in enumerate(train_loader):
        # measure data loading time
        data_time.update(time.time() - end)

        view1, view2 = images[0], images[1]
        if gpu is not None:
            # Use the `gpu` argument here: no `args` object exists in this
            # notebook, so the former `args.gpu` raised NameError.
            view1 = view1.cuda(gpu, non_blocking=True)
            view2 = view2.cuda(gpu, non_blocking=True)

        view1, view2 = view1.to(device), view2.to(device)

        # compute output and loss
        p1, p2, z1, z2 = model(x1=view1, x2=view2)
        loss = -(criterion(p1, z2).mean() + criterion(p2, z1).mean()) * 0.5

        # Weight the running average by the number of samples in the batch.
        losses.update(loss.item(), view1.size(0))

        # compute gradient and do SGD step
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # measure elapsed time
        batch_time.update(time.time() - end)
        end = time.time()

        if i % 10 == 0:
            progress.display(i)


def save_checkpoint(state, is_best, filename='checkpoint.pth.tar'):
    """Serialize ``state`` to ``filename`` with ``torch.save``.

    When ``is_best`` is truthy the written file is additionally copied to
    ``model_best.pth.tar`` in the current working directory.
    """
    torch.save(state, filename)
    if not is_best:
        return
    shutil.copyfile(filename, 'model_best.pth.tar')


class AverageMeter(object):
    """Track the latest value and the weighted running mean of a metric.

    Attributes:
        name: Label used when the meter is printed.
        fmt: Format spec (e.g. ``':.4f'``) applied to both ``val`` and ``avg``.
        val: Most recent value passed to ``update``.
        sum: Weighted sum of all values seen since the last ``reset``.
        count: Total weight seen since the last ``reset``.
        avg: ``sum / count``.
    """

    def __init__(self, name, fmt=':f'):
        self.name = name
        self.fmt = fmt
        self.reset()

    def reset(self):
        """Zero every statistic."""
        self.val = self.avg = self.sum = self.count = 0

    def update(self, val, n=1):
        """Record ``val`` with weight ``n`` and refresh the running mean."""
        self.val = val
        self.sum = self.sum + val * n
        self.count = self.count + n
        self.avg = self.sum / self.count

    def __str__(self):
        # Build '{name} {val<fmt>} ({avg<fmt>})', then fill it from the
        # instance attributes.
        template = '{{name}} {{val{0}}} ({{avg{0}}})'.format(self.fmt)
        return template.format(**vars(self))


class ProgressMeter(object):
    """Print one tab-separated progress line: prefix, batch counter, meters.

    Args:
        num_batches: Total number of batches; sets the counter width and the
            denominator shown in ``[current/total]``.
        meters: Objects whose ``str()`` is appended to each line.
        prefix: Text placed directly before the batch counter.
    """

    def __init__(self, num_batches, meters, prefix=""):
        self.batch_fmtstr = self._get_batch_fmtstr(num_batches)
        self.meters = meters
        self.prefix = prefix

    def display(self, batch):
        """Print the progress line for batch index ``batch``."""
        parts = [self.prefix + self.batch_fmtstr.format(batch)]
        parts.extend(str(meter) for meter in self.meters)
        print('\t'.join(parts))

    def _get_batch_fmtstr(self, num_batches):
        # Pad the current index to the width of the total so columns line up,
        # e.g. 100 batches -> '[{:3d}/100]'.
        width = len(str(num_batches // 1))
        slot = '{:%dd}' % width
        return '[%s/%s]' % (slot, slot.format(num_batches))


def adjust_learning_rate(optimizer, init_lr, epoch, epochs):
    """Apply a half-cosine learning-rate schedule to ``optimizer`` in place.

    Parameter groups carrying a truthy ``'fix_lr'`` entry are pinned to
    ``init_lr``; every other group gets
    ``init_lr * 0.5 * (1 + cos(pi * epoch / epochs))``.
    """
    cur_lr = init_lr * 0.5 * (1. + math.cos(math.pi * epoch / epochs))
    for group in optimizer.param_groups:
        # A missing 'fix_lr' key counts as "not fixed".
        group['lr'] = init_lr if group.get('fix_lr') else cur_lr

In [None]:
# Pre-training driver (the original note says this was run for 100 epochs).
# Each epoch first refreshes the learning rates, then makes one pass over the
# training loader.
# To start from previously saved weights instead, uncomment:
#model.load_state_dict(torch.load('nets/simsiam_resnet18_v2.pt'))
for epoch in range(start_epoch, epochs):
    adjust_learning_rate(optimizer=optimizer, init_lr=init_lr, epoch=epoch, epochs=epochs)
    train(train_loader=train_loader, model=model, criterion=criterion,
          optimizer=optimizer, epoch=epoch)

In [None]:
# Persist the trained weights. Create the target directory first so the save
# does not fail on a fresh checkout where 'nets/' does not exist yet.
os.makedirs('nets', exist_ok=True)
torch.save(model.state_dict(), 'nets/simsiam_signatures_100.pt')

In [None]:
# Histogram of the per-sample symmetric similarity between the two augmented
# views of every training batch (this is the training loss without the
# negation and without the batch mean).
#
# The values are collected by appending. The previous fixed-size buffer was
# filled through `res[i*60:i*60-1] = ...`, a slice whose stop precedes its
# start, which inserted/overwrote at the wrong positions and left the
# histogram polluted with wrong entries.
res = []
# Feed the inputs to whichever device the model currently lives on.
model_device = next(model.parameters()).device
# NOTE(review): the model's train/eval mode is left untouched here, exactly as
# before -- confirm whether model.eval() is wanted for this diagnostic.
with torch.no_grad():  # inference only: no autograd graph is needed
    for views in train_loader:
        p1, p2, z1, z2 = model(x1=views[0].to(model_device), x2=views[1].to(model_device))
        similarity = (criterion(p1, z2) + criterion(p2, z1)) * 0.5
        res.extend(similarity.tolist())

plt.hist(res, 40, (-1, 1))
plt.show()

In [None]:
#Trained on signatures
# Validation: choose the cosine-similarity threshold that maximizes the
# balanced accuracy of "same batch of genuine signatures" (positive pairs)
# versus "genuine vs. forged batch at the same position" (negative pairs).
model = simsiam.resnet18.resnet18()

model = simsiam.builder_resnet18_2.SimSiam(
        model, 2048, 512)
# map_location='cpu' lets a checkpoint written from a GPU model be loaded on a
# machine without CUDA; the model in this cell stays on the CPU anyway.
model.load_state_dict(torch.load('nets/simsiam_signatures_100.pt', map_location='cpu'))
model.eval()

normalize = transforms.Normalize(mean=[0.5, 0.5, 0.5],
                                     std=[1, 1, 1])
# Deterministic evaluation pipeline (no random augmentation).
augmentation = [
    transforms.ToTensor(),
    ThresholdTransform(),
    transforms.Resize((32, 32)),
    transforms.CenterCrop(28),
    normalize
]
# dim=0: each call compares two single embedding vectors.
cos = nn.CosineSimilarity(dim=0)
files_org = listdir("signatures/org/val")
files_forg = listdir("signatures/forg/val")
# Sorted so that batch k of both loaders is built from the k-th slice of files.
files_org.sort()
files_forg.sort()
images_org = []
images_forg = []

for name in files_org:
    if name[0] != '.':
        # convert() yields a decoded copy, so the file handle can be closed.
        with Image.open("signatures/org/val/" + name) as im:
            images_org.append(im.convert("L").convert("RGB"))

for name in files_forg:
    if name[0] != '.':
        with Image.open("signatures/forg/val/" + name) as im:
            images_forg.append(im.convert("L").convert("RGB"))

dataset_org = SignatureDataset(images_org, transform=transforms.Compose(augmentation))

train_loader_org = torch.utils.data.DataLoader(dataset_org, batch_size = 24, shuffle= False,
        num_workers=4, pin_memory=True, drop_last=True)

dataset_forg = SignatureDataset(images_forg, transform=transforms.Compose(augmentation))

train_loader_forg = torch.utils.data.DataLoader(dataset_forg, batch_size = 24, shuffle= False,
        num_workers=4, pin_memory=True, drop_last=True)

# The embeddings do not depend on the threshold, so every pair is scored
# exactly once here rather than once per threshold step. Scores are appended,
# so the number of batches no longer has to match a pre-sized buffer.
sl1 = []  # similarities of positive pairs (i < j within one genuine batch)
sl2 = []  # similarities of negative pairs (genuine x forged, same batch position)
with torch.no_grad():
    for batch_org in train_loader_org:
        res1 = model.forward_lat(batch_org).detach()
        batch_len = len(res1)
        for i in range(batch_len):
            for j in range(i + 1, batch_len):
                sl1.append(cos(res1[i], res1[j]).item())

    # zip stops at the shorter loader instead of assuming five batches each.
    for batch_org, batch_forg in zip(train_loader_org, train_loader_forg):
        res1 = model.forward_lat(batch_org).detach()
        res2 = model.forward_lat(batch_forg).detach()
        for i in range(len(res1)):
            for j in range(len(res2)):
                sl2.append(cos(res1[i], res2[j]).item())

count_org = len(sl1)
count_forg = len(sl2)
sum_org = sum(sl1)
sum_forg = sum(sl2)
if count_org == 0 or count_forg == 0:
    raise ValueError("validation set produced no signature pairs; "
                     "each class needs at least one full batch of 24 images")

# Sweep thresholds 0.00 .. 0.99 and keep the first one with the highest
# balanced accuracy.
best_threshold = -1
best_acc = -1
for itera in range(0, 100):
    threshold = itera/100
    # A positive pair is correct at or above the threshold, a negative pair
    # strictly below it.
    corr_org = sum(1 for score in sl1 if score >= threshold)
    corr_forg = sum(1 for score in sl2 if score < threshold)
    acc = (corr_org/count_org+corr_forg/count_forg)/2
    if acc > best_acc:
        best_acc = acc
        best_threshold = threshold
print(best_acc)
print(best_threshold)


In [None]:
#Testing on signatures
# Test-set evaluation at a fixed similarity threshold: prints the fraction of
# positive pairs scored at or above it and of negative pairs scored below it.
model = simsiam.resnet18.resnet18()

model = simsiam.builder_resnet18_2.SimSiam(
        model, 2048, 512)
# map_location='cpu' lets a checkpoint written from a GPU model be loaded on a
# machine without CUDA; the model in this cell stays on the CPU anyway.
model.load_state_dict(torch.load('nets/simsiam_signatures_100.pt', map_location='cpu'))
model.eval()

normalize = transforms.Normalize(mean=[0.5, 0.5, 0.5],
                                     std=[1, 1, 1])
# Deterministic evaluation pipeline (no random augmentation).
augmentation = [
    transforms.ToTensor(),
    ThresholdTransform(),
    transforms.Resize((32, 32)),
    transforms.CenterCrop(28),
    normalize
]
# dim=0: each call compares two single embedding vectors.
cos = nn.CosineSimilarity(dim=0)
files_org = listdir("signatures/org/test")
files_forg = listdir("signatures/forg/test")
# Sorted so that batch k of both loaders is built from the k-th slice of files.
files_org.sort()
files_forg.sort()
images_org = []
images_forg = []

for name in files_org:
    if name[0] != '.':
        # convert() yields a decoded copy, so the file handle can be closed.
        with Image.open("signatures/org/test/" + name) as im:
            images_org.append(im.convert("L").convert("RGB"))

for name in files_forg:
    if name[0] != '.':
        with Image.open("signatures/forg/test/" + name) as im:
            images_forg.append(im.convert("L").convert("RGB"))

dataset_org = SignatureDataset(images_org, transform=transforms.Compose(augmentation))

train_loader_org = torch.utils.data.DataLoader(dataset_org, batch_size = 24, shuffle= False,
        num_workers=4, pin_memory=True, drop_last=True)

dataset_forg = SignatureDataset(images_forg, transform=transforms.Compose(augmentation))

train_loader_forg = torch.utils.data.DataLoader(dataset_forg, batch_size = 24, shuffle= False,
        num_workers=4, pin_memory=True, drop_last=True)

# NOTE(review): these two names are not used by this cell; resetting them
# overwrites whatever a preceding threshold sweep stored -- confirm they are
# still wanted here.
best_threshold = -1
best_acc = -1
# Fixed decision threshold (presumably the value found on the validation
# split -- confirm).
threshold = 0.47

# Scores are appended instead of written into pre-sized buffers, so the number
# of batches no longer has to match a hard-coded size.
sl1 = []  # similarities of positive pairs (i < j within one genuine batch)
sl2 = []  # similarities of negative pairs (genuine x forged, same batch position)
with torch.no_grad():
    for batch_org in train_loader_org:
        res1 = model.forward_lat(batch_org).detach()
        batch_len = len(res1)
        for i in range(batch_len):
            for j in range(i + 1, batch_len):
                sl1.append(cos(res1[i], res1[j]).item())

    # zip stops at the shorter loader instead of assuming five batches each.
    for batch_org, batch_forg in zip(train_loader_org, train_loader_forg):
        res1 = model.forward_lat(batch_org).detach()
        res2 = model.forward_lat(batch_forg).detach()
        for i in range(len(res1)):
            for j in range(len(res2)):
                sl2.append(cos(res1[i], res2[j]).item())

count_org = len(sl1)
count_forg = len(sl2)
sum_org = sum(sl1)
sum_forg = sum(sl2)
if count_org == 0 or count_forg == 0:
    raise ValueError("test set produced no signature pairs; "
                     "each class needs at least one full batch of 24 images")

# A positive pair is correct at or above the threshold, a negative pair
# strictly below it.
corr_org = sum(1 for score in sl1 if score >= threshold)
corr_forg = sum(1 for score in sl2 if score < threshold)

print(corr_org/count_org)
print(corr_forg/count_forg)

In [None]:
# Side-by-side histograms of the pair similarities held in sl1 (positive
# pairs) and sl2 (negative pairs): 80 bins over [-1, 1], fixed y-limits.
f, (ax1, ax2) = plt.subplots(nrows=1, ncols=2)
f.set_size_inches(18.5 / 1.5, 10.5 / 1.5)

ax1.hist(sl1, bins=80, range=(-1, 1))
ax2.hist(sl2, bins=80, range=(-1, 1))

ax1.set_title('Distribution of similarity for positive pairs')
ax2.set_title('Distribution of similarity for negative pairs')

ax1.set_ylim([0, 450])
ax2.set_ylim([0, 940])

In [None]:
# One histogram per score list (40 bins over [0, 1]), then the mean of each.
for fig_no, scores in ((1, sl1), (2, sl2)):
    plt.figure(fig_no)
    plt.hist(scores, bins=40, range=(0, 1))
    plt.show()

for scores in (sl1, sl2):
    print(sum(scores) / len(scores))