In [17]:
# PyTorch
import torch
import os
import torch.nn as nn
import torch.nn.functional as F
from torch import optim
from torch.utils.data import DataLoader,Dataset
from torchvision.models import *

import random
from collections import OrderedDict
import numpy as np
import matplotlib.pyplot as plt
from glob import glob
import cv2
import pandas as pd
import pywt
import warnings
warnings.filterwarnings("ignore")

class PairedDataset(Dataset):#Get two images and whether they are related.
    
    def __init__(self, features, names):
        self.features = features
        self.full_names = names
        self.value_cnts = pd.value_counts(self.full_names)
        self.names = np.unique(names)
        
    def get_pair(self, positive):
        pair = []
        if positive:
            while True:
                value = random.choice(self.names)
                if self.value_cnts[value]>=2:
                    id = [value, value]
                    break
            label = 1
        else:
            label = 0
            while True:
                id = [random.choice(self.names), random.choice(self.names)]
                if id[0] != id[1]:
                    break
        
        for i in range(2):
            if i==1:
              while True:
                indx = np.random.choice(np.where(self.full_names==id[i])[0])  
                if indx!=pair[0]:
                    pair.append(indx)
                    break
            else:
                pair.append(np.random.choice(np.where(self.full_names==id[i])[0]))
        # print(pair)
        # print(self.features.shapes)
        f1 =  torch.from_numpy(self.features[pair[0], :].reshape(1,-1))
        f2 =  torch.from_numpy(self.features[pair[1], :].reshape(1,-1))
            
            
        return f1, f2, torch.tensor(label).long()
  
    def __getitem__(self,index):
        #we need to make sure approx 50% of images are in the same class
        should_get_same_class = random.randint(0,1) 
        f1, f2, label = self.get_pair(should_get_same_class)
        return f1, f2, label 

    def __len__(self):
        return self.features.shape[0]#essential for choose the num of data in one epoch

In [3]:
class SB_Block(nn.Module):
    '''
    Siamese Convolutional Block
    '''
    def __init__(self):
        
        super(SB_Block, self).__init__()
        self.min = self.Mini_Block()
    
    def Mini_Block(self):
        '''
        Mini block in simaese convolutions
        '''
        return nn.Sequential(OrderedDict([
          ('linear1',  nn.Linear(240,128)),
          ('relu1', nn.ReLU(inplace=True)),
          ('linear2',  nn.Linear(128,128)),
          ('relu2', nn.ReLU(inplace=True)),
        ]))
    
    def forward(self, input1, input2):
        '''
        Forward pass of a single SB block
        '''
        # print(input1.shape)
        f11 = self.min(input1)
        # print(f"f11 - {f11.shape}")
        f12 = self.min(input2)
        # print(f"f12 - {f11.shape}")
        diff = f11-f12
        # print(f"diff shape - {diff.shape}")
        return diff
    
class network(nn.Module):
    '''
    the complete network implementation for our network
    '''
    def __init__(self):
        
        super(network, self).__init__()
        self.SB = SB_Block()
        self.output = nn.Linear(128,2)
        
    def forward(self, f1, f2):
        # print(f"f1 - {f1.shape}, f2 - {f2.shape}")
        fc = self.SB(f1, f2)
        # print(f"fc - {fc.shape}")
        out = self.output(fc)
        # print("OUT SHAPE", out.shape)
        out = out.squeeze(dim = 1)
        return out

## Ignore below few cells, as it run for only once, and then stored

In [6]:
## loading selected features from evolutionary algorithm
selected_features = []
for line in open("../config/evol_chkpts/best.txt"):
    if line.strip():
        selected_features.append(int(line))
selected_features = np.array(selected_features, dtype=bool)

In [12]:
## loading wavelet eigen vals, train_wavelet dataset and test_wavelet dataset

eigen_vecs = pd.read_csv("../data/eigen_vecs_wavelet.csv")
train_data = pd.read_csv("../data/wavelet_std_features.csv")
test_data = pd.read_csv("../data/wavelet_std_features_test.csv")

In [8]:
np.sum(selected_features)

240

In [13]:
train_names = train_data["names"]
test_names = test_data["names"]
test_data = test_data[test_data.columns[:-1]].values
train_data = train_data[train_data.columns[:-1]].values
eigen_vecs = eigen_vecs.values

In [14]:
selected_eigens = eigen_vecs[:, selected_features]


In [15]:
train_features = (selected_eigens.T@train_data.T).T
test_features = (selected_eigens.T@test_data.T).T

In [16]:
import gc
del train_data, test_data, selected_eigens, eigen_vecs
gc.collect()

train_features = train_features.astype("float32")
test_features = test_features.astype("float32")

In [17]:
train_names, test_names = np.array(train_names), np.array(test_names)
np.save("../data/train_names.npy", train_names)
np.save("../data/test_names.npy", test_names)
np.save("../data/train_features.npy", train_features)
np.save("../data/test_features.npy", test_features)

### RUN FROM HERE


## For Training 

In [3]:
train_names = np.load("../data/train_names.npy", allow_pickle = True)
test_names = np.load("../data/test_names.npy", allow_pickle = True)
train_features = np.load("../data/train_features.npy")
test_features = np.load("../data/test_features.npy")

In [19]:
trainset = PairedDataset(train_features , train_names)
train_loader = DataLoader(trainset,
                        shuffle=True,#whether randomly shuffle data in each epoch, but cannot let data in one batch in order.
                        num_workers=8, 
                        batch_size = 32)

valset = PairedDataset(test_features , test_names)
validation_loader = DataLoader(valset,
                        shuffle=True,#whether randomly shuffle data in each epoch, but cannot let data in one batch in order.
                        num_workers = 8, 
                        batch_size =32)

model = network()
device = torch.device('cuda:0')
model = model.to(device)

criterion = nn.CrossEntropyLoss() # computes softmax and then the cross entropy
optimizer = optim.SGD(model.parameters(), lr=0.01, momentum=0.9, weight_decay = 0.0005)

lambda1 = lambda epoch: 0.01 * ((0.0001 * epoch + 1) ** -0.75)
exp_lr_scheduler = torch.optim.lr_scheduler.LambdaLR(optimizer, lr_lambda=lambda1)

epochs = 30
iteration = 0

if os.path.exists('../config/model_chkpts/logs'):
    checkpoint = torch.load('../model_chkpts/logs')
    model.load_state_dict(checkpoint['model_state_dict'])
    optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
    exp_lr_scheduler.load_state_dict(checkpoint['scheduler_state_dict'])
    iteration = checkpoint['iteration']
    print('Restore model at iteration - ', iteration)

for i in range(epochs):
    val_acc = []
    val_loss = []
    train_acc = []
    train_loss = []
    
    
    model.train() # setting for training
    for batch_idx, data in enumerate(train_loader):
        
        
        img0, img1 , labels = data #img=tensor[batch_size,channels,width,length], label=tensor[batch_size,label]
        img0, img1 , labels = img0.to(device), img1.to(device) , labels.to(device)#move to GPU
        optimizer.zero_grad()
        
        logits = model(img0, img1)
        # print(logits.shape, labels.shape)
        loss = criterion(logits, labels)
        loss.backward()
        optimizer.step()
        train_loss.append(loss.item())
        _, preds = torch.max(logits, 1)
        total = len(labels)
        correct = torch.sum(preds==labels)
        train_acc.append(correct.item()/total)
        iteration +=1
        # print('Iteration: {}, learning rate: {:.7f}, Loss: {:.4f}, Accuracy:{:.3f}'.format(iteration, 
        #                                                                                optimizer.param_groups[0]["lr"], 
        #                                                                                loss.item(), correct.item()/total))
        exp_lr_scheduler.step()
        # if iteration%100==0:
        #     print(iteration)
        if iteration%5 == 0:
            torch.save({
                'iteration': iteration,
                'model_state_dict': model.state_dict(),
                'optimizer_state_dict': optimizer.state_dict(),
                'scheduler_state_dict': exp_lr_scheduler.state_dict(),
                }, '../config/model_chkpts/logs')
        
    model.eval() # setting for training
    
    for batch_idx, data in enumerate(validation_loader):
        # if iteration%10==0:
        #     print(iteration)
        img0, img1 , labels = data #img=tensor[batch_size,channels,width,length], label=tensor[batch_size,label]
        img0, img1 , labels = img0.to(device), img1.to(device) , labels.to(device)#move to GPU
        
        logits = model(img0, img1)
        loss = F.cross_entropy(logits, labels, reduction = 'sum')/len(img0)

        val_loss.append(loss.item())
        _, preds = torch.max(logits, 1)
        total = len(labels)
        correct = torch.sum(preds==labels)
        val_acc.append(correct.item()/total)  
    
    print('Epoch: {}, Loss: {:.4f}, Accuracy:{:.3f}, Val Loss: {:.4f}, Val Accuracy: {:.3f}'.format(
        i+1, np.mean(train_loss), np.mean(train_acc),np.mean(val_loss), np.mean(val_acc)))

## For Demo

In [16]:
image_path1 = "/media/amshra267/New Volume/CV/Soft-Computing/lfwcrop_color/sample_faces/Aaron_Eckhart_0001.ppm"
image_path2 = "/media/amshra267/New Volume/CV/Soft-Computing/lfwcrop_color/sample_faces/Abdul_Majeed_Shobokshi_0001.ppm"

img1, (_, _, _) = pywt.dwt2(cv2.cvtColor(cv2.imread(image_path1), cv2.COLOR_BGR2GRAY), "db4")
img2, (_, _, _) = pywt.dwt2(cv2.cvtColor(cv2.imread(image_path2), cv2.COLOR_BGR2GRAY), "db4")


## loading selected features from evolutionary algorithm
selected_features = []
for line in open("../config/evol_chkpts/best.txt"):
    if line.strip():
        selected_features.append(int(line))
selected_features = np.array(selected_features, dtype=bool)

## loading wavelet pca mean from evolutionary algorithm
wave_mean = []
for line in open("../config/wavelet_mean.txt"):
    if line.strip():
        wave_mean.append(float(line))
wave_mean = np.array(wave_mean).reshape(1,-1)

thresh = 0.4
eigen_vecs = pd.read_csv("../data/eigen_vecs_wavelet.csv").values
selected_eigens = eigen_vecs[:, selected_features]


## image preprocessing
img1 = img1.reshape(1,-1)
img2 = img2.reshape(1,-1)

img1 = (img1-wave_mean)/255
img2 = (img2-wave_mean)/255

img1 = ((selected_eigens.T@img1.T)).T
img2 = ((selected_eigens.T@img2.T)).T
img1 = torch.from_numpy(img1)
img2 = torch.from_numpy(img2)

## loading saved model

model = network()
device = torch.device('cuda:0')
model = model.to(device)

if os.path.exists('../../config/model_chkpts/logs'):
    checkpoint = torch.load('../model_chkpts/logs')
    model.load_state_dict(checkpoint['model_state_dict'])
    optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
    exp_lr_scheduler.load_state_dict(checkpoint['scheduler_state_dict'])
    iteration = checkpoint['iteration']
    print('Restore model at iteration - ', iteration)
    

## prediction
img1, img2 = img1.to(device).float(), img2.to(device).float()
output = F.softmax(model(img1, img2))[0][0]
if output>thresh:
    print("DIfferent Face")
else:
    print("Same Face")



DIfferent Face
