In [1]:
import cv2
%matplotlib inline
import torchvision
import torchvision.datasets as dset
import torchvision.transforms as transforms
from torch.utils.data import DataLoader,Dataset
import matplotlib.pyplot as plt
import torchvision.utils
import numpy as np
import pandas as pd
import sys
import os
import random
from PIL import Image
import torch
from torch.autograd import Variable
import PIL.ImageOps    
import torch.nn as nn
from torch import optim
import torch.nn.functional as F

In [3]:
data_east = pd.read_csv('/kaggle/input/processed-handwriting/hand_east/data_east.csv')

In [None]:
data_east.head()

In [None]:
data_east['cropped'][0][1:]

In [4]:
input_dir = '/kaggle/input/processed-handwriting/'
train_dir = '/kaggle/input/processed-handwriting/hand_east/train'
val_dir = '/kaggle/input/processed-handwriting/hand_east/val/'

In [None]:
pt = input_dir + data_east['cropped'][0][1:]
img = cv2.imread(pt)
img.shape

## Creating the Siamese Dataset
**selecting 2 images at random from the dataset**

In [None]:
indices = random.choices(range(len(data_east)))
selected_tuple = data_east.loc[indices]
selected_tuple['folder']

In [5]:
class SiameseNetworkDataset(Dataset):
    
    def __init__(self,data,input_dir,transform=None):
        self.data = data
        self.transform = transform
        self.input_dir = input_dir
        
    def __getitem__(self,index):
        indices = random.choices(range(len(self.data)))
        selected_tuple = self.data.loc[indices]
        img0_path = input_dir + selected_tuple['cropped'].iloc[0][1:]
        img0_folder = selected_tuple['folder'].iloc[0]
        #we need to make sure approx 50% of images are in the same class
        should_get_same_class = random.randint(0,1) 
        if should_get_same_class:
            while True:
                #keep looping till the same class image is found
                ind = random.choice(range(len(self.data)))
                search_tuple = self.data.loc[ind]
                if search_tuple['folder']==img0_folder:
                    break
        else:
            while True:
                #keep looping till a different class image is found
                
                ind = random.choice(range(len(self.data)))
                search_tuple = self.data.loc[ind]
                if search_tuple['folder']!=img0_folder:
                    break

        img0 = cv2.imread(img0_path)
        img1_path = input_dir + search_tuple['cropped'][1:]
        img1 = cv2.imread(img1_path)
        

        if self.transform is not None:
            img0 = self.transform(img0[:,:,0])
            img1 = self.transform(img1[:,:,0])
        
        return img0, img1 , torch.from_numpy(np.array([int(should_get_same_class)],dtype=np.float32))
    
    def __len__(self):
        return len(self.data)



In [6]:
siamese_dataset = SiameseNetworkDataset(data=data_east,input_dir=input_dir,
                                        transform=transforms.Compose([transforms.ToTensor()]))

## The Siamese Network

In [7]:
class SiameseNetwork(nn.Module):
    def __init__(self):
        super(SiameseNetwork, self).__init__()
        self.cnn1 = nn.Sequential(
            nn.ReflectionPad2d(1),
            nn.Conv2d(1, 2, kernel_size=3),
            nn.ReLU(inplace=True),
            
            nn.ReflectionPad2d(1),
            nn.Conv2d(2, 4, kernel_size=3),
            nn.ReLU(inplace=True),
            nn.BatchNorm2d(4),
            
            
            nn.ReflectionPad2d(1),
            nn.Conv2d(4, 8, kernel_size=3),
            nn.ReLU(inplace=True),


            nn.ReflectionPad2d(1),
            nn.Conv2d(8, 8, kernel_size=3),
            nn.ReLU(inplace=True),
            nn.BatchNorm2d(8),


        )

        self.fc1 = nn.Sequential(
            nn.Linear(8*256*256, 1024),
            nn.ReLU(inplace=True),

            nn.Linear(1024, 512),
            nn.ReLU(inplace=True),

            nn.Linear(512, 64))

    def forward_once(self, x):
        output = self.cnn1(x)
        output = output.view(output.size()[0], -1)
        output = self.fc1(output)
        return output

    def forward(self, input1, input2):
        output1 = self.forward_once(input1)
        output2 = self.forward_once(input2)
        return output1, output2



## Contrastive Loss

In [8]:


class ContrastiveLoss(torch.nn.Module):
    """
    Contrastive loss function.
    Based on: http://yann.lecun.com/exdb/publis/pdf/hadsell-chopra-lecun-06.pdf
    """

    def __init__(self, margin=0.2):
        super(ContrastiveLoss, self).__init__()
        self.margin = margin

    def forward(self, output1, output2, label):
        cosine_similarity = F.cosine_similarity(output1, output2, dim=1)
        loss_contrastive = torch.mean((1 - label) * torch.pow(1 - cosine_similarity, 2) +
                                      (label) * torch.pow(torch.clamp(cosine_similarity - self.margin, min=0.0), 2))


        return loss_contrastive



In [9]:
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
device

device(type='cuda', index=0)

In [10]:
train_dataloader = DataLoader(siamese_dataset,
                        shuffle=True,
                        num_workers=2,
                        batch_size=64)
net = SiameseNetwork().to(device)
criterion = ContrastiveLoss()
optimizer = optim.Adam(net.parameters(),lr = 0.0005 )

In [11]:
counter = []
loss_history = [] 
iteration_number= 0

In [None]:
for epoch in range(100):
    for i, data in enumerate(train_dataloader,0):
        img0, img1 , label = data
        img0, img1 , label = img0.to(device), img1.to(device) , label.to(device)
        optimizer.zero_grad()
        output1,output2 = net(img0,img1)
        loss_contrastive = criterion(output1,output2,label)
        loss_contrastive.backward()
        optimizer.step()
        if i %10 == 0 :
            print("Epoch number {}\n Current loss {}\n".format(epoch,loss_contrastive.item()))
            iteration_number +=10
            counter.append(iteration_number)
            loss_history.append(loss_contrastive.item())
show_plot(counter,loss_history)

In [None]:
!rm -r /kaggle/working/

In [1]:
torch.save(net.state_dict(), '/kaggle/working/model-3')

In [None]:
model = SiameseNetwork()
model.load_state_dict(torch.load('/kaggle/working/model-1'))
model.eval()

In [6]:
val_comp = pd.read_csv('/kaggle/input/val-comp/val.csv')
val_comp.shape

(5770, 3)

In [None]:
val_comp.head()

In [4]:
from tqdm import tqdm

In [2]:
net.eval()

SiameseNetwork(
  (cnn1): Sequential(
    (0): ReflectionPad2d((1, 1, 1, 1))
    (1): Conv2d(1, 2, kernel_size=(3, 3), stride=(1, 1))
    (2): ReLU(inplace=True)
    (3): ReflectionPad2d((1, 1, 1, 1))
    (4): Conv2d(2, 4, kernel_size=(3, 3), stride=(1, 1))
    (5): ReLU(inplace=True)
    (6): BatchNorm2d(4, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (7): ReflectionPad2d((1, 1, 1, 1))
    (8): Conv2d(4, 8, kernel_size=(3, 3), stride=(1, 1))
    (9): ReLU(inplace=True)
    (10): ReflectionPad2d((1, 1, 1, 1))
    (11): Conv2d(8, 8, kernel_size=(3, 3), stride=(1, 1))
    (12): ReLU(inplace=True)
    (13): BatchNorm2d(8, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  )
  (fc1): Sequential(
    (0): Linear(in_features=524288, out_features=1024, bias=True)
    (1): ReLU(inplace=True)
    (2): Linear(in_features=1024, out_features=512, bias=True)
    (3): ReLU(inplace=True)
    (4): Linear(in_features=512, out_features=64, bias=True)
  )
)

In [None]:
model.to(device)

In [8]:
net.to(device)

SiameseNetwork(
  (cnn1): Sequential(
    (0): ReflectionPad2d((1, 1, 1, 1))
    (1): Conv2d(1, 2, kernel_size=(3, 3), stride=(1, 1))
    (2): ReLU(inplace=True)
    (3): ReflectionPad2d((1, 1, 1, 1))
    (4): Conv2d(2, 4, kernel_size=(3, 3), stride=(1, 1))
    (5): ReLU(inplace=True)
    (6): BatchNorm2d(4, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (7): ReflectionPad2d((1, 1, 1, 1))
    (8): Conv2d(4, 8, kernel_size=(3, 3), stride=(1, 1))
    (9): ReLU(inplace=True)
    (10): ReflectionPad2d((1, 1, 1, 1))
    (11): Conv2d(8, 8, kernel_size=(3, 3), stride=(1, 1))
    (12): ReLU(inplace=True)
    (13): BatchNorm2d(8, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  )
  (fc1): Sequential(
    (0): Linear(in_features=524288, out_features=1024, bias=True)
    (1): ReLU(inplace=True)
    (2): Linear(in_features=1024, out_features=512, bias=True)
    (3): ReLU(inplace=True)
    (4): Linear(in_features=512, out_features=64, bias=True)
  )
)

In [16]:
submi = pd.DataFrame(columns=['img1_name','img2_name','label','proba'])
for i in tqdm(range(val_comp.shape[0])):
    x1 = val_dir + val_comp['img1_name'][i]
    x2 = val_dir + val_comp['img2_name'][i]
    img1 = cv2.imread(x1)
    img2 = cv2.imread(x2)
    img1 = img1[:, :, 0]
    img2 = img2[:, :, 0]
    to_tensor = transforms.ToTensor()
    tensor1 = to_tensor(img1).unsqueeze(0).to(device)
    tensor2 = to_tensor(img2).unsqueeze(0).to(device)
    op1,op2 = net(tensor1,tensor2)
    d = F.cosine_similarity(op1, op2, dim=1)
    norm_d = (1+d.item())/2
    proba = 1/(1+np.exp(-norm_d))
    if proba>0.69:
        l = 1
    else:
        l = 0
    submi.loc[len(submi)] = [val_comp['img1_name'][i],val_comp['img2_name'][i],l,proba]

100%|██████████| 5770/5770 [01:28<00:00, 65.02it/s]


In [None]:
x1 = val_dir + val_comp['img1_name'][0]
img1 = cv2.imread(x1)
img1.shape

In [17]:
submi.groupby(submi['label']).count()

Unnamed: 0_level_0,img1_name,img2_name,proba
label,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
0,1265,1265,1265
1,4505,4505,4505


In [18]:
submi.describe()

Unnamed: 0,label,proba
count,5770.0,5770.0
mean,0.780763,0.696241
std,0.413766,0.008695
min,0.0,0.673035
25%,1.0,0.690567
50%,1.0,0.694809
75%,1.0,0.699961
max,1.0,0.731059


In [19]:
pd.DataFrame.to_csv(submi,'/kaggle/working/Falcon_m3_01.csv',index=False)