In [10]:
import numpy as np 
import pandas as pd 
import matplotlib.pyplot as plt 
import seaborn as sns 
import os
import sys 
from warnings import filterwarnings as filt

# Silence every Python warning for the rest of the notebook session.
filt('ignore')
# Default figure size and a dark theme for all matplotlib figures drawn below.
plt.rcParams['figure.figsize'] = (12, 6)
plt.style.use('dark_background')

In [11]:
# Root of the dataset input; the image folder and the pairs CSV are both resolved relative to it.
base_path   = "../input/lfwdatasetsupdated/"
# Directory handed to ImgDataset / read helpers as the image root.
images_path = os.path.join(base_path, "face_images", "lfw-dataset", 'lfw-deepfunneled', 'lfw-deepfunneled')
# CSV read into `df` in the next cell.
pairs_path  = os.path.join(base_path, "updated_pairs.csv")

In [12]:
# Load the pairs table; later cells read the columns name1, imgnum1, name2, imgnum2 and match from it.
df = pd.read_csv(pairs_path)
df.head()

In [13]:
# (rows, columns) of the raw pairs table.
df.shape

In [14]:
# Shape after dropping exact duplicate rows; compare with the previous cell to see how many duplicates exist.
df.drop_duplicates().shape

In [15]:
import tensorflow as tf
import albumentations as A
import torch
import cv2 as cv

In [16]:
def read_img(base_path, img_name):
    """Read the image file ``img_name`` located under ``base_path``.

    The two parts are joined with ``os.path.join`` and the file is decoded
    with ``plt.imread``; the decoded array is returned unchanged.
    """
    return plt.imread(os.path.join(base_path, img_name))


def load_model(mname):
    """Load a custom YOLOv5 model through ``torch.hub``.

    Parameters
    ----------
    mname : str
        File name of the weights inside ``../input/face-detection-model``.

    Returns
    -------
    The object returned by ``torch.hub.load('ultralytics/yolov5', 'custom', ...)``,
    placed on CUDA when ``torch.cuda.is_available()`` and on the CPU otherwise.
    """
    base_path = '../input/face-detection-model'
    mpath = os.path.join(base_path, mname)
    # 'cuda' is the torch device type for GPUs; 'gpu' is rejected by torch.device.
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    return torch.hub.load('ultralytics/yolov5', 'custom', mpath, device = device)

def _build_alb_pipeline():
    """Create the augmentation pipeline shared by ``alb_transform`` and ``alb_transform_batch``."""
    return A.Compose([
        # The probability is passed by keyword: in albumentations releases whose first
        # positional parameter is `always_apply`, a bare 0.5 is truthy and flips every image.
        A.HorizontalFlip(p = 0.5),
        A.Rotate(limit = 35, border_mode = cv.BORDER_CONSTANT),
        A.GaussianBlur(blur_limit = (3, 3)),
        # NOTE(review): positional 10 is kept from the original; which parameter it binds to
        # depends on the installed albumentations version - confirm.
        A.CoarseDropout(10)
    ])


# Built once when the cell runs instead of on every call.
_ALB_PIPELINE = _build_alb_pipeline()


def alb_transform(xs):
    """Augment a single image.

    Parameters
    ----------
    xs : array
        One image, passed to the pipeline as ``image = xs``.

    Returns
    -------
    The augmented image (the ``'image'`` entry of the pipeline output).
    """
    return _ALB_PIPELINE(image = xs)['image']


def alb_transform_batch(xs):
    """Augment every image of an iterable independently.

    Parameters
    ----------
    xs : iterable of arrays
        Images; each one is augmented with its own random draw.

    Returns
    -------
    numpy.ndarray built from the list of augmented images.
    """
    return np.array([_ALB_PIPELINE(image = x)['image'] for x in xs])

class GenerateIdx:
    """Random train/test partition of the positions ``0 .. n-1``.

    Parameters
    ----------
    n : int
        Number of positions to split.
    test_size : float
        Fraction of positions assigned to the test split.
    seed : int or None
        Passed to ``np.random.seed``; note that this re-seeds NumPy's global generator.

    Attributes
    ----------
    train_idx : list of int
        ``int(n - n * test_size)`` positions drawn without replacement (random order).
    test_idx : list of int
        The remaining positions, in ascending order.
    """
    def __init__(self, n, test_size, seed):
        np.random.seed(seed)
        idx = list(range(n))
        train_size = int(n - (n * test_size))
        self.train_idx = np.random.choice(idx, train_size, replace = False).tolist()
        # Set membership keeps the complement linear in n; scanning the list made it quadratic.
        chosen = set(self.train_idx)
        self.test_idx  = [i for i in idx if i not in chosen]
        
def add_zeros(x, units = '000'):
    """Prefix ``x`` with ``units`` and keep only the last four characters.

    With the default ``units`` this left-pads short numbers with zeros
    (``1 -> '0001'``); values longer than four characters are cut down to
    their last four.
    """
    padded = '{}{}'.format(units, x)
    return padded[-4:]

In [17]:
class ImgDataset:
    """Builds ``tf.data`` datasets of (anchor, verification, target) face pairs.

    Parameters
    ----------
    df : pandas.DataFrame
        Pairs table with the columns ``name1``, ``imgnum1``, ``name2``, ``imgnum2`` and ``match``.
    img_path : str
        Root folder; images are looked up as ``<img_path>/<name>/<name>_<4-char number>.jpg``.
    batch : int
        Batch size of the returned datasets.
    img_size : tuple
        Size passed to ``cv.resize``.
    transform : callable or falsy
        Augmentation run through ``tf.numpy_function``; a falsy value disables augmentation.
        The default ``True`` is kept for compatibility but is not callable, so a function must
        be passed before loading with augmentation.
    subset : str
        ``'train'`` or ``'validation'`` selects the matching part of the split; any other value
        keeps every row.
    test_size : float
        Fraction of rows assigned to the validation part.
    seed : int
        Seeds both the shuffle and the split.
    shuffle : bool
        Shuffle the rows before splitting.
    sanity_check : bool
        Keep only the first five rows (``DataFrame.head()``).
    """
    def __init__(self, df, img_path, batch = 16, img_size = (64, 64), transform = True, subset = 'train', 
                 test_size = 0.2, seed = 0, shuffle = True, sanity_check = False):
        self.data = df.copy()
        if shuffle:
            # Seeded shuffle: a 'train' and a 'validation' instance built from the same frame
            # must see the same row order, otherwise their index-based subsets can overlap.
            self.data = self.data.sample(frac = 1.0, random_state = seed).reset_index(drop = True)
        idx = GenerateIdx(self.data.shape[0], test_size, seed)
        if subset == 'train':
            self.data = self.data.iloc[idx.train_idx, :]
        elif subset == 'validation':
            self.data = self.data.iloc[idx.test_idx, :]
            
        if sanity_check:
            self.data = self.data.head()
            
        self.img_path = img_path 
        self.batch = batch
        self.transform = transform 
        self.img_size = img_size
        # Face detector used by crop_faces; assign one (e.g. from load_model) before cropping.
        self.model = None
            
    def load_dataset(self):
        """Read every image eagerly and return ``(images1, images2, targets)`` batched datasets.

        Augmentation (when enabled) is mapped over whole batches, so ``transform`` receives a
        batch of images here. Images are divided by 255 whether or not augmentation is enabled,
        matching ``load_dataset_V2``.
        """
        images1 = []
        images2 = []
        targets = []
        
        for _, rdf in self.data.iterrows():
            img1, img2 = self.get_image_path(rdf, self.img_path)
            images1.append(self.read_preprocess(img1))
            images2.append(self.read_preprocess(img2))
            targets.append(float(rdf['match']))
        
        print(f'Length of anchor images       : {len(images1)}')
        print(f'Length of verification images : {len(images2)}')
        print(f'Length of targets images      : {len(targets)}')
            
        images1 = tf.data.Dataset.from_tensor_slices(images1).batch(self.batch)
        images2 = tf.data.Dataset.from_tensor_slices(images2).batch(self.batch)
        targets = tf.data.Dataset.from_tensor_slices(targets).batch(self.batch)
        
        if self.transform:
            # An optional `.map(self.tf_crop_imgs)` step can be inserted after augmentation.
            images1 = images1.map(self.tf_augment)
            images2 = images2.map(self.tf_augment)

        # Previously skipped when transform was falsy, which left un-augmented data in 0..255.
        images1 = images1.map(self.normalize)
        images2 = images2.map(self.normalize)
            
        return images1, images2, targets
    
    def load_dataset_V2(self):
        """Return lazily-read ``(images1, images2, targets)`` batched datasets.

        Only (name, number) pairs are stored up front; files are decoded inside the pipeline.
        Decoded images are cached *before* augmentation so that random augmentations are drawn
        again on every pass instead of being frozen by the cache.
        """
        # astype(str) keeps both columns a homogeneous string tensor (read_images decodes bytes).
        images1 = self.data[['name1', 'imgnum1']].astype(str).values
        images2 = self.data[['name2', 'imgnum2']].astype(str).values
        targets = self.data['match'].astype(float).values
        
        print(f'Length of anchor images       : {images1.shape[0]}')
        print(f'Length of verification images : {images2.shape[0]}')
        print(f'Length of targets images      : {targets.shape[0]}')
        
        images1 = self._image_pipeline(images1)
        images2 = self._image_pipeline(images2)
        targets = tf.data.Dataset.from_tensor_slices(targets).cache().batch(self.batch)
            
        return images1, images2, targets

    def _image_pipeline(self, pairs):
        """read -> cache -> (augment) -> normalize -> batch for one side of the pairs."""
        dataset = tf.data.Dataset.from_tensor_slices(pairs).map(self.tf_read_img).cache()
        if self.transform:
            dataset = dataset.map(self.tf_augment)
        return dataset.map(self.normalize).batch(self.batch)
            
    def read_preprocess(self, img):
        """Read the file ``img``, resize it to ``self.img_size`` and force three channels."""
        image = plt.imread(img)
        image = cv.resize(image, self.img_size)
        # Grayscale images are 2-D (no channel axis), so `shape[-1]` would be the width there.
        if image.ndim == 2 or image.shape[-1] == 1:
            image = cv.cvtColor(image, cv.COLOR_GRAY2RGB)
        elif image.shape[-1] == 4:
            image = cv.cvtColor(image, cv.COLOR_RGBA2RGB)
            
        return image
    
    def read_images(self, data):
        """Decode one ``(name, number)`` pair of byte strings and load the matching image."""
        name, num = data
        path  = self.join_paths(self.img_path, name.decode('utf-8'), num.decode('utf-8'))
        return self.read_preprocess(path)
    
    def read_images_batches(self, data):
        """Load the image of every ``(name, number)`` pair in ``data`` into one array."""
        return np.array([self.read_images(d) for d in data])
    
    def tf_read_img(self, data):
        """``tf.numpy_function`` wrapper around ``read_images`` (declares uint8 output)."""
        return tf.numpy_function(func = self.read_images, inp = [data], Tout = tf.uint8)
    
    def tf_augment(self, image):
        """``tf.numpy_function`` wrapper around ``self.transform`` (declares uint8 output).

        Raises
        ------
        TypeError
            If ``self.transform`` is not callable (for instance the default ``True``).
        """
        if not callable(self.transform):
            raise TypeError(
                "`transform` must be a callable (or a falsy value to disable augmentation), "
                f"got {self.transform!r}"
            )
        return tf.numpy_function(func = self.transform, inp = [image], Tout = tf.uint8)
    
    def tf_crop_imgs(self, image):
        """``tf.numpy_function`` wrapper around ``crop_faces`` (declares uint8 output)."""
        return tf.numpy_function(func = self.crop_faces, inp = [image], Tout = tf.uint8)
        
    def crop_faces(self, x):
        """Crop each image of the batch ``x`` to its first detected box and resize it.

        The detector is ``self.model``; its ``.xyxy`` result is indexed per image and the first
        row is read as ``x1, y1, x2, y2``. Images without a detection (or with an empty crop)
        are resized as a whole instead of raising.

        Raises
        ------
        RuntimeError
            If no detector has been assigned to ``self.model``.
        """
        if self.model is None:
            raise RuntimeError("crop_faces needs a detector: assign one to `self.model` first")

        x = [i for i in x]
        pred = self.model(x).xyxy
        imgs = []
        
        for image, boxes in zip(x, pred):
            crop = image
            if len(boxes) > 0:
                p = boxes[0]
                candidate = image[int(p[1]) : int(p[3]), int(p[0]) : int(p[2])]
                if candidate.size > 0:
                    crop = candidate
            imgs.append(cv.resize(crop, self.img_size))
        
        return np.array(imgs)
            
    def __len__(self):
        """Number of pairs (rows) in the selected subset."""
        return self.data.shape[0]
    
    @staticmethod
    def get_image_path(rdf, base_path):
        """Return the ``(anchor_path, verification_path)`` tuple for one row of the pairs table."""
        name1 = rdf['name1']
        name2 = rdf['name2']
        num1  = str(rdf['imgnum1'])
        num2  = str(rdf['imgnum2'])
        
        # Static method: there is no `self`, so the helper is reached through the class.
        return (
                  ImgDataset.join_paths(base_path, name1, num1),
                  ImgDataset.join_paths(base_path, name2, num2)
                )
    
    @staticmethod
    def join_paths(base_path, name, num):
        """Build ``<base_path>/<name>/<name>_<add_zeros(num)>.jpg``."""
        return os.path.join(base_path, name, f"{name}_{add_zeros(num)}.jpg")
    
    @staticmethod
    def normalize(x):
        """Divide ``x`` by 255."""
        img = x / 255
        return img
    
    
from sklearn.model_selection import train_test_split
def sample_split(df, y, frac = 0.2, save_idx = False, delim = ', ', seed = None):
    """Draw a stratified sample of ``df`` and return it together with the remainder.

    Parameters
    ----------
    df : pandas.DataFrame
        Source frame.
    y : str
        Name of the column to stratify on; it ends up as the last column of both outputs.
    frac : float
        Passed to ``train_test_split`` as ``test_size`` (size of the sample).
    save_idx : bool
        When true, the sample's index labels are written to ``idx_batch.txt`` joined by ``delim``.
    delim : str
        Separator used in the saved index file.
    seed : int or None
        Optional random state for the split and both shuffles; ``None`` keeps the unseeded
        behaviour.

    Returns
    -------
    (sample_df, remainder_df) : tuple of pandas.DataFrame
        Both shuffled. Index labels stay attached to their rows, so the labels written with
        ``save_idx`` keep identifying the sampled rows across chained calls.
    """
    feature_cols = [c for c in df.columns if c != y]
    features = df[feature_cols]
    target = df[y]
    x_rest, x_sample, y_rest, y_sample = train_test_split(
        features, target, test_size = frac, stratify = target, random_state = seed
    )

    print(f'Split size  :==> {frac}')
    print(f'Sample size :==> {x_sample.shape[0]}')
    print()
    
    if save_idx:
        filename = 'idx_batch.txt'
        with open(filename, 'w') as file:
            file.write(delim.join(map(str, x_sample.index.values)))
        
        print(f'Saved the idx to filename : {filename}')
    
    # The index is deliberately not overwritten after shuffling: re-assigning the pre-shuffle
    # index would pair each label with a different row.
    sample_df = pd.concat([x_sample, y_sample], axis = 1).sample(frac = 1.0, random_state = seed)
    remainder_df = pd.concat([x_rest, y_rest], axis = 1).sample(frac = 1.0, random_state = seed)
    return sample_df, remainder_df

def remove_prev_batch_df_from_file(df, filename = 'idx_batch.txt', delim = ', '):
    """Drop the rows of ``df`` whose index labels are listed in ``filename``.

    Parameters
    ----------
    df : pandas.DataFrame
        Frame to filter.
    filename : str
        Text file holding integer index labels separated by ``delim``.
    delim : str
        Separator used in the file.

    Returns
    -------
    pandas.DataFrame
        The rows of ``df`` whose label is not in the file, in their original order. An empty
        file removes nothing.
    """
    with open(filename, 'r') as file:
        raw = file.read()
        
    used = {int(token) for token in raw.split(delim) if token.strip()}
    # Label-based selection: the saved values are index labels, so positional `.iloc` is only
    # correct for an untouched 0..n-1 index and fails (or picks wrong rows) otherwise.
    return df.loc[~df.index.isin(list(used)), :]

def remove_prev_batch_df(df, idx = None):
    """Drop the rows of ``df`` whose index labels appear in ``idx``.

    Parameters
    ----------
    df : pandas.DataFrame
        Frame to filter.
    idx : iterable of labels, optional
        Index labels to remove; ``None`` (the default) removes nothing.

    Returns
    -------
    pandas.DataFrame
        Remaining rows in their original order.
    """
    labels = [] if idx is None else list(idx)
    # Labels are matched with `.loc`/`isin`; feeding them to `.iloc` treats them as positions.
    return df.loc[~df.index.isin(labels), :]

In [18]:
# Rows covered by 8 chunks of 10.5% of df each, next to the total row count
# (0.105 and 8 presumably mirror the split fraction and chunk count used in the batching cell below - confirm).
(df.shape[0] * 0.105) * 8, df.shape[0]

In [19]:
# df.shape[0] * (103193 / (df.shape[0] - df1.shape[0]))

In [20]:
# Row count for a 0.1173... fraction of df.
# NOTE(review): the origin of this constant is not shown; it is close to 0.105 / 0.895, i.e. it looks like
# the fraction of the remaining rows that one 10.5% chunk represents after the first chunk is removed - confirm.
df.shape[0] * 0.11731849926727808

In [21]:
# training batches 
# Columns stored as strings in every chunk.
str_cols = ['imgnum1', 'imgnum2', 'match']

# First chunk: a stratified 10.5% sample of the full table.
split_frac = 0.105
ndf = df.copy()
df1, ndf = sample_split(ndf, 'match', split_frac)
df1[str_cols] = df1[str_cols].astype(str)
training_batches_dfs = [df1]

# Seven more chunks of (about) the same size, each drawn from what is still unassigned.
for i in range(7):
    n = df1.shape[0]
    d = df.shape[0] - sum(chunk.shape[0] for chunk in training_batches_dfs)
    split_frac = n / d
    dfn, ndf = sample_split(ndf, 'match', split_frac)
    dfn[str_cols] = dfn[str_cols].astype(str)
    training_batches_dfs.append(dfn)

# Whatever is left is divided evenly into validation and test.
validation_df, test_df = sample_split(ndf, 'match', 0.5)
for part in (validation_df, test_df):
    part[str_cols] = part[str_cols].astype(str)

In [13]:
# Class balance of the `match` label in every training chunk (2 x 4 grid, one panel per chunk).
plt.figure(1, figsize = (20, 15))
for i, batch_df in enumerate(training_batches_dfs):
    plt.subplot(2, 4, i + 1)
    # Pass the column by keyword: newer seaborn releases treat the first positional argument as `data`.
    sns.countplot(x = batch_df['match'])
    plt.title(f'Training Batch : {i + 1}')
# Called after the axes exist; before any subplot is created it has nothing to arrange.
plt.tight_layout()

In [22]:
def generate_train_data_generator(training_batches_dfs, SIZE, batch):
    """Lazily yield one zipped training dataset per chunk frame.

    For every frame in ``training_batches_dfs`` an ``ImgDataset`` is created over the global
    ``images_path`` with ``alb_transform`` as augmentation, square images of side ``SIZE``,
    no hold-out (``test_size = 0.0``) and seed 123. Its ``load_dataset_V2`` result is zipped
    into a single dataset with a prefetch of 1 and yielded.
    """
    for batch_no, chunk_df in enumerate(training_batches_dfs, start = 1):
        print(f'Processing training batch : {batch_no}')
        dataset = ImgDataset(chunk_df, images_path, batch, img_size = (SIZE, SIZE), test_size = 0.0,
                             seed = 123, transform = alb_transform)
        parts = dataset.load_dataset_V2()
        yield tf.data.Dataset.zip(parts).prefetch(1)

In [23]:
# training data 
SIZE = 64 * 2
batch = 16
# Generator: each training chunk's dataset is only built when the training loop asks for it.
tf_train_data = generate_train_data_generator(training_batches_dfs, SIZE, batch)

# validation data (no augmentation) - required by the evaluation cell, which calls
# inference_model(model, val_data); the pipeline is lazy, files are read when it is iterated.
val_dataset = ImgDataset(validation_df, images_path, batch, img_size = (SIZE, SIZE), test_size = 0.0, 
                         seed = 123, transform = None) 
val_data = tf.data.Dataset.zip(val_dataset.load_dataset_V2()).prefetch(tf.data.AUTOTUNE)

# # testing data  
# test_data = ImgDataset(test_df, images_path, batch, img_size = (SIZE, SIZE), test_size = 0.0, 
#                       seed = 123, transform = None) 
# test_data = test_data.load_dataset_V2()
# test_data = tf.data.Dataset.zip(test_data).prefetch(tf.data.AUTOTUNE)

In [24]:
def get_row(num, col):
    """Number of grid rows needed to place ``num`` items in ``col`` columns (ceiling division)."""
    return -(-num // col)

def _plot_pair_side(data, side, heading, num, row, col):
    """Draw up to ``num`` images taken from element ``side`` of every batch in ``data``."""
    plt.figure(figsize = (col * 4, row * 4))
    print(heading.center(60, '='))
    print()

    shown = 0
    for td in data:
        matches = td[2]
        for img, match in zip(td[side], matches):
            if shown >= num:
                break
            # Running counter across batches: restarting at 1 for every batch would overwrite
            # the same panels and never reach `num` when `num` exceeds the batch size.
            shown += 1
            plt.subplot(row, col, shown)
            plt.imshow(img)
            plt.title(f"{'same person' if match == 1 else 'not a same person'}")
            plt.axis(False)
        if shown >= num:
            break

    plt.show()


def plot_tf_image(data, num, col = 4):
    """Show the first ``num`` anchor images and then the first ``num`` verification images.

    Parameters
    ----------
    data : iterable of (anchors, verifications, targets) batches
        The dataset to preview; it is iterated twice (once per figure).
    num : int
        Number of images per figure.
    col : int
        Number of grid columns; the row count is derived with ``get_row``.

    Each panel is titled from the pair's target (``1`` means same person).
    """
    row = get_row(num, col)
    # Uses the `data` argument; the previous body iterated a global named `train_data`.
    _plot_pair_side(data, 0, 'Anchor images', num, row, col)
    _plot_pair_side(data, 1, 'Verification images', num, row, col)

In [None]:
# `tf_train_data` is a generator and cannot be indexed; build the first chunk's dataset from a
# separate generator so the training generator is not consumed by this preview.
train_data = next(generate_train_data_generator(training_batches_dfs[:1], SIZE, batch))
plot_tf_image(train_data, 8, 4)

In [25]:
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Dense, Layer, Input, GlobalAveragePooling2D
from tensorflow.keras.applications.densenet import DenseNet121
from tensorflow.keras import backend as k

class Distance(Layer):
    """Keras layer returning the Euclidean distance between two batches of embeddings."""
    def __init__(self, **kwargs):
        # Forward name/dtype/etc. to the base layer instead of silently discarding them.
        super().__init__(**kwargs)
        
    def euclidean_dist(self, vectors):
        """Return ``sqrt(max(sum((a - b)^2, axis=1), epsilon))`` with the reduced axis kept."""
        vec1, vec2 = vectors
        dist = k.sum(k.square(vec1 - vec2), axis = 1, keepdims = True)
        # The epsilon floor keeps the square root away from exactly zero.
        return k.sqrt(k.maximum(dist, k.epsilon()))
    
    def call(self, anchor, verification):
        """Distance between the ``anchor`` and ``verification`` embeddings."""
        return self.euclidean_dist([anchor, verification])

class SimeaseNet:
    """Wraps an encoder into a two-input Siamese classifier.

    Parameters
    ----------
    encoding_model : Keras model
        Shared encoder applied to both inputs; it is renamed to ``"Encoder"``.
    training : bool
        Mode applied at the end of ``build_model``.

    Layers of the encoder that are frozen when the wrapper is created are remembered and stay
    frozen in training mode, so a frozen pretrained backbone is not silently unfrozen.
    """
    def __init__(self, encoding_model, training = True):
        self.encoding_model = encoding_model
        self.encoding_model._name = "Encoder"
        self.training = training
        # Set by build_model(); the mode switches tolerate it being absent.
        self.model = None
        self._frozen_layers = [
            layer for layer in getattr(encoding_model, 'layers', []) if not layer.trainable
        ]
        
    def encode(self, x):
        """Apply the shared encoder to ``x``."""
        return self.encoding_model(x)

    def _set_trainable(self, flag):
        """Apply ``flag`` to the Siamese model (if built) and the encoder, keeping frozen layers frozen."""
        self.training = flag
        if self.model is not None:
            self.model.trainable = flag
        self.encoding_model.trainable = flag
        if flag:
            # Setting `trainable` on a model propagates to its layers; restore the initial freeze.
            for layer in self._frozen_layers:
                layer.trainable = False
    
    def enable_training_mode(self):
        """Make the model trainable, except for encoder layers that were frozen at construction."""
        self._set_trainable(True)
        
        print('Set the encoding and siamese model to training mode ...')
        
    def enable_inference_mode(self):
        """Freeze the Siamese model and the encoder."""
        self._set_trainable(False)
        
        print('Set the encoding and siamese model to inference mode ...')
    
    def build_model(self):
        """Assemble and return the Siamese model.

        Both inputs go through the shared encoder, their Euclidean distance feeds a single
        sigmoid unit, and the mode chosen at construction is applied.
        """
        dist    = Distance()
        dist._name = 'Distance'

        # Use the encoder's own input shape; fall back to the notebook-level SIZE otherwise.
        encoder_shape = getattr(self.encoding_model, 'input_shape', None)
        if isinstance(encoder_shape, tuple) and len(encoder_shape) > 1:
            input_shape = tuple(encoder_shape[1:])
        else:
            input_shape = (SIZE, SIZE, 3)
        
        input1  = Input(shape = input_shape, name = 'anchor')
        input2  = Input(shape = input_shape, name = 'verification')
        feat1   = self.encode(input1)
        feat2   = self.encode(input2)
        d       = dist(feat1, feat2)
        output  = Dense(1, activation = 'sigmoid', name = 'classifier')(d)
        self.model   = Model(inputs = [input1, input2], outputs = output)
        self.model._name = 'SiameseNet'
        
        if self.training:
            self.enable_training_mode()
        else:
            self.enable_inference_mode()
            
        return self.model
    
def base_model(ip, verbose = True):
    """Build the embedding network: frozen ImageNet DenseNet121 plus a small trainable head.

    Parameters
    ----------
    ip : tuple
        Input shape handed to ``DenseNet121``.
    verbose : bool
        Print the model summary when true.

    Returns
    -------
    Keras ``Model`` mapping an image to a 128-dimensional vector
    (global average pooling -> Dense(512, relu) -> Dense(128)).
    """
    backbone = DenseNet121(include_top = False, weights = 'imagenet', input_shape = ip)
    # Keep the pretrained convolutional weights fixed; only the head below is trainable.
    for layer in backbone.layers:
        layer.trainable = False
    
    x = GlobalAveragePooling2D()(backbone.output)
    x = Dense(512, activation = 'relu')(x)
    x = Dense(128)(x)
    
    model = Model(inputs = backbone.input, outputs = x)
    if verbose:
        # summary() prints by itself and returns None, so wrapping it in print() adds a stray "None".
        model.summary()
        
    return model

def checkpoint(opt, model):
    """Save the optimizer and the model as a TensorFlow checkpoint.

    A ``tf.train.Checkpoint`` tracking ``opt`` (as ``opt``) and ``model`` (as ``siamese_model``)
    is created and saved with the prefix ``./training_checkpoints/ckpt``.
    """
    ckpt_path = os.path.join("./training_checkpoints", 'ckpt')
    tf.train.Checkpoint(opt = opt, siamese_model = model).save(file_prefix = ckpt_path)
    
    
@tf.function
def train_on_batch(batch, model, opt, loss_func, verbose = True, metric = None):
    """Run one optimisation step and return the batch loss.

    Parameters
    ----------
    batch : tuple
        ``(anchor, verification, target)``; the first two entries are the model inputs.
    model, opt, loss_func
        Model, optimizer and loss used for the step.
    verbose : bool
        Kept for call compatibility; not used inside the traced step (a Python ``print`` here
        would only run while tracing and show a symbolic tensor, not the loss).
    metric : Keras metric, optional
        When given, it is updated with the targets and predictions of this batch.
    """
    X = batch[:2]
    y = batch[2]
    with tf.GradientTape() as tape: 
        yhat = model(X, training = True)
        loss = loss_func(y, yhat)
    
    grad = tape.gradient(loss, model.trainable_variables)
    opt.apply_gradients(zip(grad, model.trainable_variables))

    if metric is not None:
        # Same (batch, 1) layout on both sides so the metric is averaged per sample.
        metric.update_state(tf.reshape(y, (-1, 1)), tf.reshape(yhat, (-1, 1)))
    
    return loss

def train_model(data, model, opt, loss_func, epochs = 5, save_model = 0, verbose = True):
    """Train ``model`` on ``data`` for ``epochs`` epochs.

    Parameters
    ----------
    data : dataset of (anchor, verification, target) batches
        Must support ``len()`` (used for the progress bar).
    model, opt, loss_func
        Forwarded to ``train_on_batch``.
    epochs : int
        Number of passes over ``data``.
    save_model : int
        When greater than 0, a checkpoint is written every ``save_model`` epochs.
    verbose : bool
        Forwarded to ``train_on_batch``.

    Returns
    -------
    (model, opt, history)
        ``history`` holds one mean loss and one binary accuracy (threshold 0.5) per epoch
        under ``'loss'`` and ``'accuracy'``.
    """
    history = {
        'loss' : [],
        'accuracy' : []
    }
    accuracy = tf.keras.metrics.BinaryAccuracy()
    n_batches = len(data)

    for epoch in range(1, epochs + 1):
        print(f'Epoch : {epoch} / {epochs} ', end = ' ')
        progbar = tf.keras.utils.Progbar(n_batches)
        accuracy.reset_state()
        batch_losses = []
        for idx, batch in enumerate(data):
            loss = train_on_batch(batch, model, opt, loss_func, verbose = verbose, metric = accuracy)
            batch_losses.append(float(loss))
            progbar.update(idx + 1, values = [('loss', batch_losses[-1])])
            
        if save_model > 0:
            if epoch % save_model == 0:
                checkpoint(opt, model)
                
        # An empty dataset yields NaN instead of a numpy warning about an empty mean.
        history['loss'].append(float(np.mean(batch_losses)) if batch_losses else float('nan'))
        history['accuracy'].append(float(accuracy.result()))
        
    return model, opt, history

In [26]:
# Encoder input shape: square RGB images of side SIZE.
ip = (SIZE, SIZE, 3)
encoding_model = base_model(ip, verbose = False)
# Wrap the encoder and keep only the assembled Keras model under the name `model`.
model = SimeaseNet(encoding_model).build_model()
model.summary()

In [None]:
from tensorflow.keras.optimizers import Adagrad, Adam
from tensorflow.keras.losses import BinaryCrossentropy
import gc

EPOCHS = 2
# One optimizer and one loss object for all chunks: train_on_batch is a tf.function, and a new
# optimizer per chunk would need to create its variables inside a function that has already
# been traced. Sharing it also keeps Adam's running statistics from chunk to chunk.
optimizer = Adam(learning_rate = 0.001)
loss_func = BinaryCrossentropy()

# History accumulated over every chunk (previously only the last chunk's history survived).
his = {'loss' : [], 'accuracy' : []}
for train_data in tf_train_data:
    gc.collect()
    model, opt, chunk_his = train_model(train_data, model, optimizer, loss_func, EPOCHS, 2)
    for key, values in chunk_his.items():
        his.setdefault(key, []).extend(values)

In [None]:
def plot_history(his):
    """Plot the ``'loss'`` and ``'accuracy'`` series of a history dict on figure 1."""
    curves = [his['loss'], his['accuracy']]
    plt.figure(1)
    for series in curves:
        plt.plot(series)
    plt.title('Loss and Accuracy Over Epoch')
    plt.legend(['loss', 'accuracy'])
    
def plot_image(img):
    """Display ``img`` with the axes hidden."""
    plt.imshow(img)
    plt.axis(False)   # hide ticks and frame
    plt.show()

In [None]:
# Loss / accuracy curves from the history dict `his` produced by the training cell.
plot_history(his)

In [None]:
from tensorflow.keras.metrics import Precision, Recall
from sklearn.metrics import confusion_matrix, f1_score, roc_auc_score, roc_curve

def plot_metrics(history):
    """Print precision, recall and F1 and draw the confusion matrix.

    Parameters
    ----------
    history : dict
        ``history['true']`` and ``history['pred']`` hold the binary targets and predictions;
        nested lists (one-element rows) are flattened first.
    """
    y_true = np.ravel(history['true'])
    y_pred = np.ravel(history['pred'])

    precision = Precision()(y_true, y_pred).numpy()
    recall    = Recall()(y_true, y_pred).numpy()
    f1score   = f1_score(y_true, y_pred)
    conf_mat  = confusion_matrix(y_true, y_pred)
    
    print(' Metrics Report '.center(70, '='))
    print()
    print(f'Precision :==> {precision}')
    print(f'Recall    :==> {recall}')
    print(f'F1 score  :==> {f1score}')
    print()
    # 'hot_r' is the reversed 'hot' colormap ('hotr' is not a registered name);
    # the cells are integer counts, hence the 'd' format.
    sns.heatmap(conf_mat, fmt = 'd', annot = True, cmap = 'hot_r')
    plt.show()


def inference_model(model, val_data, thresh = 0.5, show_metrics = True):
    """Predict every batch of ``val_data`` and collect targets and thresholded predictions.

    Parameters
    ----------
    model : object with ``predict``
        Called as ``model.predict([anchor, verification], verbose = 0)``.
    val_data : dataset
        Must provide ``as_numpy_iterator()`` yielding ``(anchor, verification, target)``.
    thresh : float
        Scores greater than or equal to ``thresh`` are predicted as 1.
    show_metrics : bool
        Call ``plot_metrics`` on the collected history when true.

    Returns
    -------
    dict
        ``{'true': [...], 'pred': [...]}`` as flat lists of equal length.
    """
    history = {
        'true'      : [],
        'pred'      : [],
    }
    
    for anchor, verification, y in val_data.as_numpy_iterator():
        scores = np.asarray(model.predict([anchor, verification], verbose = 0))
        # ravel(): the model emits one column per sample; without it 'pred' becomes [[0], [1], ...].
        pred = (scores >= thresh).astype(int).ravel()
        history['true'] += np.ravel(y).tolist()
        history['pred'] += pred.tolist()
        
    if show_metrics:
        plot_metrics(history)
        
    return history

def plot_comparision(data):
    """Placeholder: not implemented yet; ignores ``data`` and returns ``None``."""
    return None

In [None]:
# Evaluate the model on `val_data` with inference_model's defaults (threshold 0.5, metrics report shown).
ihis = inference_model(model, val_data)