<a href="https://colab.research.google.com/github/aswit3/SigNet_Pytorch/blob/master/SigNet_Pytorch.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Import all the necessary Library 
import torchvision
import torch.utils.data as utils
from torchvision import datasets
import torchvision.transforms as transforms
from torch.utils.data import DataLoader,Dataset
from torch.autograd import Variable
import matplotlib.pyplot as plt
import torchvision.utils
import numpy as np
import time
import copy
from torch.optim import lr_scheduler
import os
from PIL import Image
import torch
from torch.autograd import Variable
import PIL.ImageOps    
import torch.nn as nn
from torch import optim
import torch.nn.functional as F
import pandas as pd 
import matplotlib.pyplot as plt
import shutil

In [None]:
class SiameseNetworkDataset():
    
    def __init__(self,training_csv=None,transform=None):
        self.training_df= training_csv
        self.training_df.columns =["image1","image2","label"]
        self.transform = transform
        self.training_dir = "/home/dell/SigNet_Pytorch/"

    def __getitem__(self,index):    
        # getting the image path
        image1_path=self.training_df.iat[index,0].replace("./", self.training_dir)
        image2_path=self.training_df.iat[index,1].replace("./", self.training_dir)
                
        # Loading the image
        img0 = Image.open(image1_path)
        img1 = Image.open(image2_path)
        img0 = img0.convert("L")
        img1 = img1.convert("L")
        
        # Apply image transformations
        if self.transform is not None:
            img0 = self.transform(img0)
            img1 = self.transform(img1)
        
        return img0, img1 , torch.from_numpy(np.array([int(self.training_df.iat[index,2])],dtype=np.float32))
    
    def __len__(self):
        return len(self.training_df)

In [None]:
training_csv =pd.read_csv("chineese_train.csv")

In [None]:
val_csv = pd.read_csv("chineese_test.csv")

In [None]:
from sklearn.model_selection import train_test_split
val_csv, test_csv = train_test_split(val_csv, test_size=0.5, random_state=42)

In [None]:
training_csv = training_csv.head(100)
val_csv = val_csv.head(10)
test_csv = val_csv.head(10)

In [None]:
# Load the the dataset from raw image folders
train_siamese_dataset = SiameseNetworkDataset(training_csv,
                                        transform=transforms.Compose([transforms.Resize((105,105)),
                                                                      transforms.ToTensor()
                                                                      ])
                                       )

In [None]:
# Load the the dataset from raw image folders
val_siamese_dataset = SiameseNetworkDataset(val_csv,
                                        transform=transforms.Compose([transforms.Resize((105,105)),
                                                                      transforms.ToTensor()
                                                                      ])
                                       )

In [None]:
# Load the the dataset from raw image folders
test_siamese_dataset = SiameseNetworkDataset(test_csv,
                                        transform=transforms.Compose([transforms.Resize((105,105)),
                                                                      transforms.ToTensor()
                                                                      ])
                                       )

## Siamese Network Definition

In [None]:
class SiameseNetwork(nn.Module):
    def __init__(self):
        super(SiameseNetwork, self).__init__()
        
        # Setting up the Sequential of CNN Layers
        self.cnn1 = nn.Sequential(
            
            nn.Conv2d(1, 96, kernel_size=11,stride=1),
            nn.ReLU(inplace=True),
            nn.LocalResponseNorm(5,alpha=0.0001,beta=0.75,k=2),
            nn.MaxPool2d(3, stride=2),
            
            nn.Conv2d(96, 256, kernel_size=5,stride=1,padding=2),
            nn.ReLU(inplace=True),
            nn.LocalResponseNorm(5,alpha=0.0001,beta=0.75,k=2),
            nn.MaxPool2d(3, stride=2),
            nn.Dropout2d(p=0.3),

            nn.Conv2d(256,384 , kernel_size=3,stride=1,padding=1),
            nn.ReLU(inplace=True),
            nn.Conv2d(384,256 , kernel_size=3,stride=1,padding=1),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(3, stride=2),
            nn.Dropout2d(p=0.3),

        )
        
        # Defining the fully connected layers
        self.fc1 = nn.Sequential(
            nn.Linear(30976, 1024),
            nn.ReLU(inplace=True),
            nn.Dropout2d(p=0.5),
            
            nn.Linear(1024, 128),
            nn.ReLU(inplace=True),
            
            nn.Linear(128,2))
        
  
  
    def forward_once(self, x):
        # Forward pass 
        output = self.cnn1(x)
        output = output.view(output.size()[0], -1)
        output = self.fc1(output)
        return output

    def forward(self, input1, input2):
        # forward pass of input 1
        output1 = self.forward_once(input1)
        # forward pass of input 2
        output2 = self.forward_once(input2)
        return output1, output2


### Loss Function

In [None]:
class ContrastiveLoss(torch.nn.Module):
    """
    Contrastive loss function.
    Based on: http://yann.lecun.com/exdb/publis/pdf/hadsell-chopra-lecun-06.pdf
    """

    def __init__(self, margin=2.0):
        super(ContrastiveLoss, self).__init__()
        self.margin = margin

    def forward(self, output1, output2, label):
        euclidean_distance = F.pairwise_distance(output1, output2)
        loss_contrastive = torch.mean((1-label) * torch.pow(euclidean_distance, 2) +
                                      (label) * torch.pow(torch.clamp(self.margin - euclidean_distance, min=0.0), 2))


        return loss_contrastive, euclidean_distance

### Train the Model

In [None]:
class Config():
    train_batch_size = 1
    val_batch_size = 1
    test_batch_size = 1
    train_number_epochs = 5
    patience = 3
    checkpoint_dir ="./checkpoints/"

In [None]:
# Load the dataset as pytorch tensors using dataloader
train_dataloader = DataLoader(train_siamese_dataset,
                        shuffle=True,
                        num_workers=8,
                        batch_size=Config.train_batch_size)

In [None]:
val_dataloader = DataLoader(val_siamese_dataset,
                        shuffle=True,
                        num_workers=8,
                        batch_size=Config.val_batch_size)

In [None]:
test_dataloader = DataLoader(test_siamese_dataset,
                        shuffle=True,
                        num_workers=8,
                        batch_size=Config.test_batch_size)

In [None]:
# Check whether you have GPU is loaded or not
if torch.cuda.is_available():
    print('Yes')

In [None]:
# Declare Siamese Network
net = SiameseNetwork()
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

if torch.cuda.device_count() > 1:
    print("Let's use", torch.cuda.device_count(), "GPUs!")
    net = nn.DataParallel(net)

net.to(device)

# Decalre Loss Function
criterion = ContrastiveLoss()
# Declare Optimizer
optimizer = optim.RMSprop(net.parameters(), lr=1e-4, alpha=0.99, eps=1e-8, weight_decay=0.0005, momentum=0.9)

In [None]:
def save_ckp(state, is_best, checkpoint_dir):
    f_path = checkpoint_dir+'checkpoint.pt'
    torch.save(state, f_path)
    if is_best:
        best_fpath = checkpoint_dir+'best_model.pt'
        shutil.copyfile(f_path, best_fpath)

In [None]:
def load_ckp(checkpoint_fpath, model, optimizer):
    checkpoint = torch.load(checkpoint_fpath)
    model.load_state_dict(checkpoint['state_dict'])
    optimizer.load_state_dict(checkpoint['optimizer'])
    return model, optimizer, checkpoint['epoch']

In [None]:
import numpy as np
import torch

class EarlyStopping:
    """Early stops the training if validation loss doesn't improve after a given patience."""
    def __init__(self, patience=7, verbose=False, delta=0, path='checkpoint.pt', trace_func=print):
        """
        Args:
            patience (int): How long to wait after last time validation loss improved.
                            Default: 7
            verbose (bool): If True, prints a message for each validation loss improvement. 
                            Default: False
            delta (float): Minimum change in the monitored quantity to qualify as an improvement.
                            Default: 0
            path (str): Path for the checkpoint to be saved to.
                            Default: 'checkpoint.pt'
            trace_func (function): trace print function.
                            Default: print            
        """
        self.patience = patience
        self.verbose = verbose
        self.counter = 0
        self.best_score = None
        self.early_stop = False
        self.val_loss_min = np.Inf
        self.delta = delta
        self.path = path
        self.trace_func = trace_func
    def __call__(self, val_loss, model, optimizer, epoch, checkpoint_dir):

        score = -val_loss

        if self.best_score is None:
            self.best_score = score
            self.save_checkpoint(val_loss, model, optimizer, epoch, checkpoint_dir)
        elif score < self.best_score + self.delta:
            self.counter += 1
            self.trace_func(f'EarlyStopping counter: {self.counter} out of {self.patience}')
            if self.counter >= self.patience:
                self.early_stop = True
        else:
            self.best_score = score
            self.save_checkpoint(val_loss, model, optimizer, epoch, checkpoint_dir)
            self.counter = 0

    def save_checkpoint(self, val_loss, model, optimizer, epoch, checkpoint_dir):
        '''Saves model when validation loss decrease.'''
        if self.verbose:
            self.trace_func(f'Validation loss decreased ({self.val_loss_min:.6f} --> {val_loss:.6f}).  Saving model ...')
        
        is_best = True
        checkpoint = {
            'epoch': epoch + 1,
            'state_dict': model.state_dict(),
            'optimizer': optimizer.state_dict()
        }
        save_ckp(checkpoint, is_best, checkpoint_dir)
        
        torch.save(model, self.path) # model.state_dict()
        self.val_loss_min = val_loss

In [None]:
def train(net, optimizer, epoch_start):     
    train_losses = []
    valid_losses = []
    avg_train_losses = []
    avg_valid_losses = [] 
    
    # initialize the early_stopping object
    early_stopping = EarlyStopping(patience=Config.patience, verbose=True)
    
    for epoch in range(epoch_start,Config.train_number_epochs):
        for batch_idx, (img0, img1, label) in enumerate(train_dataloader):
            img0, img1 , label = img0.to(device), img1.to(device) , label.to(device)
            
            optimizer.zero_grad()
            
            output1,output2 = net(img0,img1)
            loss_contrastive, _ = criterion(output1,output2,label)
            
            loss_contrastive.backward()
            optimizer.step()
            
            train_losses.append(loss_contrastive.item())            
                        
            if batch_idx % 10 == 0:
                print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
                    epoch, batch_idx * len(img0), len(train_dataloader.dataset),
                    100. * batch_idx / len(train_dataloader), loss_contrastive.item()))

        with torch.no_grad():
            for data in val_dataloader:
                img0, img1 , label = data
                img0, img1 , label = img0.to(device), img1.to(device) , label.to(device)

                net.eval()
                
                output1,output2 = net(img0,img1)
                validation_loss, _ = criterion(output1,output2,label)
                
                valid_losses.append(validation_loss.item())

        train_loss = np.average(train_losses)
        valid_loss = np.average(valid_losses)
        avg_train_losses.append(train_loss)
        avg_valid_losses.append(valid_loss)     
                     
        print(f'Average Train_loss: {train_loss:.5f} ' + f'Average Validation_loss: {valid_loss:.5f}')
        
        train_losses = []
        valid_losses = []

        early_stopping(valid_loss, net, optimizer, epoch, Config.checkpoint_dir)
        
        print("\n")
        if early_stopping.early_stop:
            print("Early stopping")
            break

    checkpoint = {
        'epoch': epoch + 1,
        'state_dict': net.state_dict(),
        'optimizer': optimizer.state_dict()
    }

    is_best = False
    save_ckp(checkpoint, is_best, Config.checkpoint_dir)
    print("Model Saved Successfully")
    return net

In [None]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

Resume_training = True
# Resume_training = False

if Resume_training:
    best_fpath =  Config.checkpoint_dir+'best_model.pt'
    net, optimizer, start_epoch = load_ckp(best_fpath, net, optimizer)
    print(start_epoch)
    model = train(net, optimizer, start_epoch)
else:
    # Train the model
    model = train(net, optimizer, 0)

3
Average Train_loss: 1.19972 Average Validation_loss: 0.00569
Validation loss decreased (inf --> 0.005693).  Saving model ...


Average Train_loss: 1.04869 Average Validation_loss: 0.16915
EarlyStopping counter: 1 out of 3


Model Saved Successfully


In [None]:
def compute_accuracy_roc(predictions, labels):
    '''Compute ROC accuracy with a range of thresholds on distances.
    '''
    dmax = np.max(predictions)
    dmin = np.min(predictions)
    nsame = np.sum(labels == 1)
    ndiff = np.sum(labels == 0)
   
    step = 0.01
    max_acc = 0
    best_thresh = -1
   
    for d in np.arange(dmin, dmax+step, step):
        idx1 = predictions.ravel() <= d
        idx2 = predictions.ravel() > d
       
        tpr = float(np.sum(labels[idx1] == 1)) / nsame       
        tnr = float(np.sum(labels[idx2] == 0)) / ndiff
        acc = 0.5 * (tpr + tnr)       
#       print ('ROC', acc, tpr, tnr)
       
        if (acc > max_acc):
            max_acc, best_thresh = acc, d
           
    return max_acc, best_thresh

In [None]:
# initialize lists to monitor test loss and accuracy
test_loss = 0.0
accuracy=0
counter=0
correct=0

pred, tr_y = [], []

model.eval() # prep model for evaluation

for data in test_dataloader:
    img0, img1 , label = data
    img0, img1 , label = img0.to(device), img1.to(device) , label.to(device)

    output1,output2 = model(img0,img1)
    loss, distance = criterion(output1,output2,label)
    
    test_loss += loss.item()    

    res=torch.abs(output1.to(device) - output2.to(device))
    label=label[0].tolist()
    label=int(label[0])
    
    tr_y.append(label)
    pred.append(distance.item())
    
    result = torch.max(res,1)[1].item()
    
    if label == result:
        correct=correct+1
    counter=counter+1

# calculate and print avg test loss
test_loss = test_loss/len(test_dataloader.dataset)
print('Test Loss: {:.6f}\n'.format(test_loss))
accuracy=(correct/len(test_dataloader))*100
print("Accuracy:{}%".format(accuracy))
tr_acc, threshold = compute_accuracy_roc(np.array(pred), np.array(tr_y))
print("Accuracy =", tr_acc, "Threshold =", threshold)