# Import libraries

In [None]:
## import matplotlib.pyplot as plt
import os
import pandas as pd
import io
import cv2
import numpy as np
from os import listdir
from os.path import isfile, join

import sklearn
from sklearn.model_selection import train_test_split
from sklearn.utils import shuffle
from sklearn.preprocessing import OneHotEncoder
from sklearn import svm
from sklearn.metrics import accuracy_score
from sklearn.model_selection import GridSearchCV

#import keras
import tensorflow as tf
from tensorflow.keras.applications.vgg16 import VGG16
from tensorflow.keras.applications.inception_v3 import InceptionV3
from tensorflow.keras.applications import ResNet50

from tensorflow.keras import layers
from tensorflow.keras.models import Model
from tensorflow.keras.models import Sequential, load_model
from tensorflow.keras.layers import Dense, Dropout, Activation, Flatten, Conv2D, MaxPool2D, Input, AveragePooling2D
from skimage.feature import hog
from skimage import data, exposure

import tensorflow_addons as tfa
import random
from tqdm import tqdm

In [None]:
# Load the pre-trained Keras FaceNet model from its saved HDF5 file.
# NOTE: this import rebinds `load_model`, which was already imported from
# tensorflow.keras.models in the imports cell.
from keras.models import load_model
# load the model (`model` is a notebook-level global; the triplet-loss section rebinds it later)
model = load_model('../input/facenet-model/facenet_keras.h5')
# print the model's symbolic input and output tensors
print(model.inputs)
print(model.outputs)

# Preprocessing functions

In [None]:
class DataGenerator(tf.keras.utils.Sequence):
    """
    Generate batches of (anchor, positive, negative) image triplets from disk.

    Images are read lazily from their paths, so it can be used when the
    data-set does not fit in memory.
    ...

    Attributes
    ----------
    dataset: dictionary
        Its keys are the labels (one sub-directory per person) and its values
        are the image file names found for each label.
    dataset_path: str
        The path of the data-set
    shuffle: bool
        True if the labels are re-ordered by on_epoch_end()
    batch_size: int
        The number of people (hence triplets) per batch
    no_of_people: int
        The number of labels (in our case people are the labels)

    Methods
    -------
    curate_dataset(dataset_path)
        Create the data-set dictionary
    on_epoch_end()
        Shuffle the labels if shuffle=True
    get_image(person, index)
        Read, resize and pre-process the image (given at 'index' in the label 'person')
    """

    # Same extensions get_data() accepts, matched case-insensitively on the suffix.
    _IMAGE_EXTENSIONS = ('.jpeg', '.jpg', '.png')

    def __init__(self, dataset_path, batch_size=5, shuffle=True):
        """
        class initialization

        param: dataset_path: The path of the data-set
        param: batch_size: The size of the batch
        param: shuffle: True if we want to shuffle the data and vice-versa
        """

        super().__init__()
        self.dataset = self.curate_dataset(dataset_path)
        self.dataset_path = dataset_path
        self.shuffle = shuffle
        self.batch_size = batch_size
        self.no_of_people = len(self.dataset)
        self.on_epoch_end()
        print(self.dataset.keys())

    def __getitem__(self, index):
        """
        Generate the batch

        One triplet is built per person of the batch: the anchor and the
        positive are two images of that person (the same image when the person
        has only one), the negative is an image of a different person.

        param: index: the index of the batch
        :return: list [A, P, N] of arrays reshaped to (-1, 100, 100, 1)
        :raises ValueError: if the data-set holds fewer than two people
        """

        people = list(self.dataset)
        if len(people) < 2:
            # Without a second person no negative exists; the previous
            # rejection-sampling loop would spin forever here.
            raise ValueError("At least two people are required to build triplets.")

        start = index * self.batch_size
        anchors, positives, negatives = [], [], []

        for person in people[start: start + self.batch_size]:
            image_count = len(self.dataset[person])
            anchor_index = random.randrange(image_count)

            # Prefer a positive that differs from the anchor when possible.
            other_indices = [i for i in range(image_count) if i != anchor_index]
            positive_index = random.choice(other_indices) if other_indices else anchor_index

            negative_person = random.choice([other for other in people if other != person])
            negative_index = random.randrange(len(self.dataset[negative_person]))

            anchors.append(self.get_image(person, anchor_index))
            positives.append(self.get_image(person, positive_index))
            negatives.append(self.get_image(negative_person, negative_index))

        A = np.asarray(anchors).reshape(-1, 100, 100, 1)
        P = np.asarray(positives).reshape(-1, 100, 100, 1)
        N = np.asarray(negatives).reshape(-1, 100, 100, 1)
        return [A, P, N]

    def __len__(self):
        """
        Number of full batches; a trailing partial batch is dropped by the
        floor division.
        """
        return self.no_of_people // self.batch_size

    def curate_dataset(self, dataset_path):
        """
        create the data-set dictionary. Its keys are the labels and its values
        are the file names of the images of each label.

        Entries of dataset_path that are not directories are skipped, and so
        are directories that contain no image.

        param: dataset_path: the path of the data-set
        :return: dictionary {label: [image file names]}
        """

        dataset = {}
        for person in listdir(dataset_path):
            # os.path.join works whether or not dataset_path ends with a separator.
            person_dir = os.path.join(dataset_path, person)
            if not os.path.isdir(person_dir):
                continue
            files = [f for f in listdir(person_dir)
                     if f.lower().endswith(self._IMAGE_EXTENSIONS)]
            if files:
                dataset[person] = files
        return dataset

    def on_epoch_end(self):
        """
        shuffle the labels if shuffle=True
        """

        if self.shuffle:
            keys = list(self.dataset)
            random.shuffle(keys)
            # Rebuild the dictionary so its iteration order follows the shuffle.
            self.dataset = {key: self.dataset[key] for key in keys}

    def get_image(self, person, index):
        """
        read, resize and pre-process the image

        The image is resized to 100x100, converted to grayscale and
        standardized to zero mean and unit standard deviation.

        param: person: the label (celebrity name)
        param: index: the image of index "index" in the list dataset[person]
        :return: pre-processed image
        :raises OSError: if the file cannot be read as an image
        """

        path = os.path.join(self.dataset_path, person, self.dataset[person][index])
        image = cv2.imread(path)
        if image is None:
            # cv2.imread signals failure by returning None instead of raising.
            raise OSError(f"cannot read image file: {path}")
        image = np.asarray(image, dtype=np.float32)
        image = cv2.resize(image, (100, 100), interpolation=cv2.INTER_AREA)
        image = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
        image = image.astype('float32')
        mean, std = image.mean(), image.std()
        if std == 0:
            # A constant image would otherwise divide by zero and yield NaNs.
            std = 1.0
        return (image - mean) / std

In [None]:
def _is_image_file(filename):
    """Return True when *filename* ends with a supported image extension (case-insensitive)."""
    return filename.lower().endswith(('.jpeg', '.jpg', '.png'))


def _load_image(image_path):
    """Read one image as a standardized 100x100 grayscale float32 array, or None if unreadable."""
    image = cv2.imread(image_path)
    if image is None:
        # cv2.imread signals failure by returning None instead of raising.
        return None
    image = np.asarray(image, dtype=np.float32)
    image = cv2.resize(image, (100, 100), interpolation=cv2.INTER_AREA)
    image = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
    image = image.astype('float32')
    mean, std = image.mean(), image.std()
    if std == 0:
        # A constant image would otherwise divide by zero and yield NaNs.
        std = 1.0
    return (image - mean) / std


def get_data(dataset_path):
    """
    Extract images with their labels and names

    Two layouts are supported:
    - one sub-directory per label: the label is the sub-directory name;
    - image files directly inside dataset_path: the label is the file name.

    The layout is decided per entry with os.path.isdir. The previous version
    used a bare ``except`` for this, which also hid image-loading errors and
    silently returned a truncated data-set. Unreadable image files are now
    skipped with a warning.

    param: dataset_path: the path of the dataset
    :return: a tuple of images, labels and names in an array format
    """
    import warnings

    images = []
    labels = []
    names = []

    for entry in listdir(dataset_path):
        entry_path = os.path.join(dataset_path, entry)
        if os.path.isdir(entry_path):
            candidates = [(os.path.join(entry_path, f), entry, f)
                          for f in listdir(entry_path)]
        else:
            candidates = [(entry_path, entry, entry)]

        for image_path, label, name in candidates:
            if not _is_image_file(name):
                continue
            image = _load_image(image_path)
            if image is None:
                warnings.warn(f"Skipping unreadable image: {image_path}")
                continue
            images.append(image)
            labels.append(label)
            names.append(name)

    return np.array(images), np.array(labels), np.array(names)

In [None]:
from numpy.linalg import norm
from sklearn.ensemble import IsolationForest
from sklearn.neighbors import LocalOutlierFactor
from sklearn.linear_model import SGDOneClassSVM
from sklearn.covariance import EllipticEnvelope
def get_clean_data(labels, embeds, names, threshold=9, method='distance'):
    """
    This function cleans the data-set by removing per-label outliers

    :param labels: the images labels
    :param embeds: mapping label -> embeddings of the images of that label
    :param names: mapping label -> file names of the images of that label
    :param threshold: according to this threshold we classify each image
        ('distance': maximum distance to the label centroid, exclusive;
        'Gauss': minimum density, exclusive; ignored by 'one_class_svm')
    :param method: the method used to clean the data, one of 'distance',
        'Gauss' or 'one_class_svm'
    :return: a tuple of the outliers, cleaned embedding and labels, each one in an array format
        (the outliers are a dictionary label -> array of file names)
    :raises ValueError: if method is not supported
    """

    supported = ('distance', 'Gauss', 'one_class_svm')
    if method not in supported:
        # Previously an unknown method surfaced as an UnboundLocalError.
        raise ValueError(f"Unknown method {method!r}; expected one of {supported}")

    outliers = {}
    clean_embed = []
    clean_labels = []
    for k in set(labels):
        group = np.asarray(embeds[k])
        group_names = np.asarray(names[k])
        if len(group) == 0:
            outliers[k] = group_names
            continue

        # Build one boolean "keep" mask per label and reuse it everywhere.
        if method == 'distance':
            center = group.mean(axis=0)
            offsets = (group - center).reshape(len(group), -1)
            keep = norm(offsets, axis=1) < threshold
        elif method == 'Gauss':
            m = np.mean(group, axis=0)
            v = np.var(group, axis=0)
            keep = multivariateGaussian(group, m, v) > threshold
        else:  # 'one_class_svm': fit_predict labels inliers 1 and outliers -1
            keep = SGDOneClassSVM(random_state=0).fit_predict(group) == 1

        kept = group[keep]
        outliers[k] = group_names[~keep]
        clean_embed.extend(kept)
        clean_labels.extend([k] * len(kept))

    return outliers, np.array(clean_embed), np.array(clean_labels)

In [None]:
def get_embeddings(model, images):
    """
    Compute one embedding per image

    Each image is wrapped in a batch of size one and sent through
    model.predict on its own; the first row of every prediction is kept.

    param: model: the embedding model (FaceNet)
    param: images: an iterable of images
    :return: an array of embeddings
    """
    per_image = [
        model.predict(np.expand_dims(image, axis=0))[0]
        for image in images
    ]
    return np.array(per_image)

In [None]:
def multivariateGaussian(X, mu, sigma):
    """
    Compute the multivariate Gaussian density with a diagonal covariance

    The density is evaluated in log-space and exponentiated at the end. The
    earlier form took det(diag(sigma)) ** 0.5 directly, which underflows to 0
    (or overflows) for high-dimensional inputs and produced inf/nan densities.

    param: X: the input data, one sample per row
    param: mu: the mean, one value per feature
    param: sigma: the per-feature variances (the diagonal of the covariance
        matrix), not standard deviations
    :return: the density of every row of X, as a 1-D array
    """
    mu = np.asarray(mu, dtype=float)
    variances = np.asarray(sigma, dtype=float)
    centered = np.asarray(X, dtype=float) - mu
    k = len(mu)

    # Inverse of the diagonal covariance; a zero variance maps to 0, as the
    # pseudo-inverse used previously did.
    inv_variances = np.divide(1.0, variances, out=np.zeros_like(variances),
                              where=variances != 0)
    mahalanobis = np.sum(centered * centered * inv_variances, axis=1)

    with np.errstate(divide='ignore', invalid='ignore'):
        # log(det) of a diagonal matrix is the sum of the logs of its entries.
        log_det = np.sum(np.log(variances))

    log_prob = -0.5 * (k * np.log(2 * np.pi) + log_det + mahalanobis)
    return np.exp(log_prob)

# Our model

In [None]:
#pip install tensorflow_addons

In [None]:
# Image shape passed to the models: 100x100 pixels, one (grayscale) channel.
input_shape= (100, 100, 1)

def create_encoder(input_shape):
    """
    Build the convolutional encoder

    Five stages of two 3x3 ReLU convolutions each (32, 64, 128, 256 and 512
    filters). The first four stages end with 2x2 max pooling, the last one
    with 2x2 average pooling followed by dropout (0.4). The result is
    flattened and projected by a 256-unit ReLU dense layer.

    param: input_shape: a tuple that represents the shape of the images
    :return: the encoder architecture
    """
    conv_options = dict(kernel_size=(3, 3), activation='relu', padding='Same')
    pool_options = dict(pool_size=(2, 2), strides=(2, 2))
    stage_filters = (32, 64, 128, 256, 512)

    model = Sequential()
    for stage, filters in enumerate(stage_filters):
        if stage == 0:
            # Only the very first layer declares the input shape.
            model.add(Conv2D(filters=filters, input_shape=input_shape, **conv_options))
        else:
            model.add(Conv2D(filters=filters, **conv_options))
        model.add(Conv2D(filters=filters, **conv_options))

        is_last_stage = stage == len(stage_filters) - 1
        pooling_layer = AveragePooling2D if is_last_stage else MaxPool2D
        model.add(pooling_layer(**pool_options))

    model.add(Dropout(0.4))
    model.add(Flatten())
    model.add(Dense(256, activation="relu"))
    return model

# Build an encoder for 100x100 single-channel images and print its layer summary.
encoder = create_encoder((100,100,1))
encoder.summary()


In [None]:
# Module-level settings read by create_classifier() as globals.
dropout_rate = 0.5  # rate of the Dropout layers placed after each hidden Dense layer
input_shape=(100,100,1)  # same value as the input_shape set earlier in the notebook
hidden_units = 512  # width of each hidden Dense layer

def create_classifier(encoder, trainable=True, num_classes=None):
    """
    Classifier architecture

    Stacks two hidden Dense+Dropout blocks and a softmax output on top of the
    encoder, then compiles the model with Adam and sparse categorical
    cross-entropy. The layer sizes, dropout rate and input shape are read from
    the module-level `hidden_units`, `dropout_rate` and `input_shape`.

    Side effect: the `trainable` flag of every encoder layer is overwritten
    in place.

    param: encoder : the encoder model
    param: trainable: True if you want to retrain the encoder, False otherwise
    param: num_classes: the number of output classes; when None, the
        module-level `num_classes` variable is used (previous behaviour)
    :return: model: the whole encoder-classifier model
    :raises NameError: if num_classes is neither passed nor defined at module level
    """
    if num_classes is None:
        # Resolve the class count before touching the encoder so that a
        # missing value fails early with an actionable message.
        try:
            num_classes = globals()["num_classes"]
        except KeyError:
            raise NameError(
                "num_classes is not defined: pass num_classes=... or set the "
                "module-level variable before calling create_classifier"
            ) from None

    for layer in encoder.layers:
        layer.trainable = trainable

    inputs = Input(shape=input_shape)
    features = encoder(inputs)
    features = Dense(hidden_units, activation="relu")(features)
    features = Dropout(dropout_rate)(features)
    features = Dense(hidden_units, activation="relu")(features)
    features = Dropout(dropout_rate)(features)
    outputs = Dense(num_classes, activation="softmax")(features)

    model = tf.keras.Model(inputs=inputs, outputs=outputs, name="our_classifier")
    model.compile(
        # NOTE: 10e-5 equals 1e-4.
        optimizer=tf.keras.optimizers.Adam(10e-5),
        loss=tf.keras.losses.SparseCategoricalCrossentropy(),
        metrics=[tf.keras.metrics.SparseCategoricalAccuracy()],
    )
    return model
    

In [None]:
# Build the classifier on top of the encoder (encoder layers stay trainable) and print its summary.
# NOTE(review): create_classifier reads the global `num_classes`, which is only
# assigned in a later cell — this cell fails if run before that one; confirm the intended order.
classifier = create_classifier(encoder, trainable=True)
classifier.summary()

# Contrastive loss

In [None]:
import keras
class SupervisedContrastiveLoss(keras.losses.Loss):
    """
    Supervised contrastive loss built on the n-pairs loss
    ...

    Attribute
    ---------
    temperature: number
        Divisor applied to the pairwise similarities before they are passed
        to the n-pairs loss
    """
    def __init__(self, temperature=1, name=None):
        """
        Class initialization

        param: temperature: the divisor applied to the similarity logits
        param: name: optional name forwarded to the Keras Loss base class
        """
        super().__init__(name=name)
        self.temperature = temperature

    def __call__(self, labels, feature_vectors, sample_weight=None):
        """
        The call function

        param: labels: images labels
        param: feature_vectors: images embeddings
        param: sample_weight: accepted for signature compatibility; not used
        :return: the loss function value
        """

        # Unit-length embeddings make the dot products cosine similarities.
        unit_vectors = tf.math.l2_normalize(feature_vectors, axis=1)
        # Similarity of every embedding of the batch with every other one.
        similarities = tf.matmul(unit_vectors, tf.transpose(unit_vectors))
        scaled_logits = tf.divide(similarities, self.temperature)
        return tfa.losses.npairs_loss(tf.squeeze(labels), scaled_logits)


# def add_projection_head(encoder):
#     inputs = keras.Input(shape=input_shape)
#     features = encoder(inputs)
#     outputs = layers.Dense(projection_units, activation="relu")(features)
#     model = keras.Model(
#         inputs=inputs, outputs=outputs, name="cifar-encoder_with_projection-head"
#     )
#     return model

In [None]:
# Build a fresh encoder for the contrastive-loss experiment (rebinds the global `encoder`).
input_shape= (100, 100, 1)
encoder = create_encoder(input_shape)
# encoder_with_projection_head = add_projection_head(encoder)

In [None]:
# Load the pre-processed images, their labels and file names used to train the encoder
# (train and test splits of the 475-person data-set).
images_train, labels_train, names_train = get_data("../input/475-dataset/475_dataset/train/")
images_test, labels_test, names_test = get_data("../input/475-dataset/475_dataset/test/")

In [None]:
from sklearn import preprocessing
# Convert the string labels of the training set to integer class ids.
# Only labels_train is transformed in this cell; labels_test keeps its string labels.
le = preprocessing.LabelEncoder()
le.fit(labels_train)
labels_train = le.transform(labels_train)

In [None]:
# Add the trailing channel axis expected by the encoder input: (n, 100, 100, 1).
images_train = images_train.reshape(-1, 100, 100,1)

In [None]:
# from tensorflow.keras.preprocessing.image import ImageDataGenerator

# # data augmentation
# datagen = ImageDataGenerator(
#     featurewise_center=True,
#     featurewise_std_normalization=True,
#     rotation_range=20,
#     width_shift_range=0.2,
#     height_shift_range=0.2,
#     horizontal_flip=True,
#     validation_split=0.2,
#     brightness_range = [0.8, 1.2],
#     zoom_range = [1-0.1, 1+0.1],
#     fill_mode='constant'
#         )
# datagen.fit(images_train)

In [None]:
# visualization: display nine augmented variants of one training image in a 3x3 grid
# The top-of-notebook matplotlib import is commented out, so import it here.
import matplotlib.pyplot as plt

# NOTE(review): `datagen` is only created in the commented-out augmentation
# cell; that cell must be restored and run before this one.
image = plt.imread("../input/dataset2/dataset/train/Ariana Grande/Ariana Grande1.jpeg")
iterator = datagen.flow(np.array([image]), batch_size=9)
for j in range(9):
    plt.subplot(330 + 1 + j)
    chunk = next(iterator)
    sub_img = chunk[0].astype('uint8')
    plt.imshow(sub_img)
plt.show()

In [None]:
# Compile the encoder for contrastive training: Adam optimizer and the
# supervised contrastive loss with temperature 0.03.
encoder.compile(
    optimizer=tf.keras.optimizers.Adam(10e-5),  # NOTE: 10e-5 equals 1e-4
    loss=SupervisedContrastiveLoss(0.03),
)

In [None]:
# Load the pre-processed images, labels and file names used to train and test the classifier.
images_train_clf, labels_train_clf, names_train_clf = get_data("../input/dataset2/dataset/train/")
images_test_clf, labels_test_clf, names_test_clf = get_data("../input/dataset2/dataset/test/")

In [None]:
# Mark every encoder layer as trainable before fitting.
# NOTE(review): this runs after encoder.compile(...); presumably a no-op because
# freshly built layers are trainable by default — confirm, or recompile afterwards.
for layer in encoder.layers:
    layer.trainable = True

In [None]:
# Train the encoder with the contrastive loss: 600 epochs, 30 images per batch.
encoder.fit(images_train, labels_train, batch_size=30, epochs=600)

In [None]:
# Add the trailing channel axis expected by the classifier input: (n, 100, 100, 1).
images_train_clf = images_train_clf.reshape(-1, 100, 100,1)
images_test_clf = images_test_clf.reshape(-1, 100, 100,1)

In [None]:
from sklearn import preprocessing
# Convert the classifier's string labels to integer class ids.
# The encoder is fitted on the training labels only and reused for the test labels
# (rebinds the global `le`).
le = preprocessing.LabelEncoder()
le.fit(labels_train_clf)
labels_train_clf = le.transform(labels_train_clf)
labels_test_clf = le.transform(labels_test_clf)

In [None]:
# Train and test a classifier on top of the contrastive-trained encoder.
# num_classes is read as a global by create_classifier.
num_classes = len(set(labels_train_clf))
# trainable=False freezes the encoder layers; only the new dense head is trained.
classifier = create_classifier(encoder, trainable=False)

history = classifier.fit(x=images_train_clf, y=labels_train_clf, batch_size=30, epochs=30)
# evaluate() returns [loss, sparse categorical accuracy]; keep the accuracy.
accuracy = classifier.evaluate(images_test_clf, labels_test_clf)[1]
print(f"Test accuracy: {round(accuracy * 100, 2)}%")

# Triplet loss

In [None]:
class SiameseNetwork(tf.keras.Model):
    """
    Siamese network that embeds triplets of images with one shared model
    ...

    Attributes
    ----------
    vgg_face: the embedding model shared by the three branches; the class
        only ever calls it as vgg_face(images)

    Methods
    -------
    call(inputs)
        Return the l2-normalized embeddings of the anchor, the positive and the negative images
    get_features(inputs)
        Return the l2-normalized embeddings of a batch of images (inference mode)
    """

    def __init__(self, vgg_face):
        """
        Class initialization

        param: vgg_face:  the model used for Siamese network
        """

        super().__init__()
        self.vgg_face = vgg_face

    def _normalized_embedding(self, images, scope_name):
        """
        Embed one branch of the triplet inside its own name scope

        param: images: the batch of images of the branch
        param: scope_name: the name of the TensorFlow name scope of the branch
        :return: the l2-normalized embeddings of the batch
        """
        with tf.name_scope(scope_name):
            return tf.math.l2_normalize(self.vgg_face(images), axis=-1)

    @tf.function
    def call(self, inputs):
        """
        Embed the anchor, the positive and the negative images with the shared model

        param: inputs: list of the anchor, the positive and the negative images
        :return: list of the l2-normalized embeddings of the anchor, the positive and the negative images
        """

        anchor_images, positive_images, negative_images = inputs
        anchor_features = self._normalized_embedding(anchor_images, "Anchor")
        positive_features = self._normalized_embedding(positive_images, "Positive")
        negative_features = self._normalized_embedding(negative_images, "Negative")
        return [anchor_features, positive_features, negative_features]

    @tf.function
    def get_features(self, inputs):
        """
        Embed a batch of images with the shared model called with training=False

        param: inputs: the batch of images
        :return: the l2-normalized embeddings of the batch
        """
        raw_features = self.vgg_face(inputs, training=False)
        return tf.math.l2_normalize(raw_features, axis=-1)

In [None]:
K = tf.keras.backend
def loss_function(x, alpha = 0.2):
    """
    Triplet loss

    For every triplet the loss is
    max(||anchor - positive||^2 - ||anchor - negative||^2 + alpha, 0),
    averaged over the batch.

    param: x: list of the embeddings of the anchor, the positive and the negative images
    param: alpha: the fixed margin loss
    :return: the value of the loss function
    """

    def squared_distance(left, right):
        # Squared Euclidean distance, one value per row.
        return K.sum(K.square(left - right), axis=1)

    anchor, positive, negative = x
    margin_violation = (
        squared_distance(anchor, positive)
        - squared_distance(anchor, negative)
        + alpha
    )
    # Triplets that already respect the margin contribute zero.
    return K.mean(K.maximum(margin_violation, 0.0))

In [None]:
# Optimizer shared by every call to train().
optimizer = tf.keras.optimizers.Adam(learning_rate=5e-5)


def train(X):
    """
    Run one optimization step on a batch of triplets

    The global `model` embeds the triplets, the triplet loss (alpha=0.5) is
    computed on the embeddings, and the global `optimizer` updates the
    trainable variables of the global `encoder`.

    param: X: list of the anchor, the positive and the negative images
    :return: the value of the loss function for this batch
    """

    with tf.GradientTape() as tape:
        loss = loss_function(model(X), alpha=0.5)

    weights = encoder.trainable_variables
    gradients = tape.gradient(loss, weights)
    optimizer.apply_gradients(zip(gradients, weights))
    return loss

In [None]:
# Build a fresh encoder and wrap it in the siamese network.
# Both `encoder` and `model` are globals read by train(); they are rebound here.
encoder = create_encoder((100, 100, 1))
model = SiameseNetwork(encoder)

In [None]:
# Mark every encoder layer as trainable so train() updates all of them.
for layer in encoder.layers:
    layer.trainable = True

In [None]:
# Stream the triplets from disk batch by batch instead of loading the whole data-set.
data_generator = DataGenerator(dataset_path='../input/475-dataset/475_dataset/train/', batch_size=30)

# Train the model
losses = []
accuracy = []
epochs = 400
no_of_batches = len(data_generator)
if no_of_batches == 0:
    # len() is no_of_people // batch_size, so this means fewer people than one batch.
    raise ValueError("The data-set holds fewer people than one batch; lower batch_size.")

for i in range(1, epochs + 1):
    loss = 0
    with tqdm(total=no_of_batches) as pbar:
        pbar.set_description_str("Epoch " + str(i) + "/" + str(epochs))

        for j in range(no_of_batches):
            data = data_generator[j]
            temp = train(data)
            loss += temp

            pbar.update()
            pbar.set_postfix_str("Loss :" + str(temp.numpy()))

        # Mean loss over the batches of this epoch.
        loss /= no_of_batches
        losses.append(loss.numpy())
        pbar.set_postfix_str("Loss :" + str(loss.numpy()))

    # A manual loop does not trigger Keras' epoch callbacks, so reshuffle
    # explicitly; otherwise the people left out by the floor division in
    # len(data_generator) would never be used as anchors.
    data_generator.on_epoch_end()

In [None]:
# Stream the triplets from disk batch by batch instead of loading the whole data-set.
data_generator = DataGenerator(dataset_path='../input/475-dataset/475_dataset/train/', batch_size=20)

# Train the model (continues from the current encoder weights with a smaller batch)
losses = []
accuracy = []
epochs = 800
no_of_batches = len(data_generator)
if no_of_batches == 0:
    # len() is no_of_people // batch_size, so this means fewer people than one batch.
    raise ValueError("The data-set holds fewer people than one batch; lower batch_size.")

for i in range(1, epochs + 1):
    loss = 0
    with tqdm(total=no_of_batches) as pbar:
        pbar.set_description_str("Epoch " + str(i) + "/" + str(epochs))

        for j in range(no_of_batches):
            data = data_generator[j]
            temp = train(data)
            loss += temp

            pbar.update()
            pbar.set_postfix_str("Loss :" + str(temp.numpy()))

        # Mean loss over the batches of this epoch.
        loss /= no_of_batches
        losses.append(loss.numpy())
        pbar.set_postfix_str("Loss :" + str(loss.numpy()))

    # A manual loop does not trigger Keras' epoch callbacks, so reshuffle
    # explicitly; otherwise the people left out by the floor division in
    # len(data_generator) would never be used as anchors.
    data_generator.on_epoch_end()

In [None]:
# Build a classifier on top of the triplet-trained encoder.
# num_classes is read as a global by create_classifier; trainable=False freezes the encoder layers.
num_classes = len(set(labels_train_clf))
classifier = create_classifier(encoder, trainable=False)

In [None]:
# Train the classifier head for 200 epochs, then measure it on the test split.
history = classifier.fit(x=images_train_clf, y=labels_train_clf, batch_size=30, epochs=200)

# evaluate() returns [loss, sparse categorical accuracy]; keep the accuracy.
accuracy = classifier.evaluate(images_test_clf, labels_test_clf)[1]
print(f"Test accuracy: {round(accuracy * 100, 2)}%")

# Visualization

In [None]:
# Embed the encoder's training images with the current encoder.
# The whole array is passed through the encoder in a single call.
imgs = images_train.reshape(-1, 100, 100,1)
encoded_images = np.array(encoder(imgs))

In [None]:
from sklearn.decomposition import PCA
# The top-of-notebook matplotlib import is commented out, so import it here.
import matplotlib.pyplot as plt

# Project the embeddings onto their first two principal components and plot them.
pca = PCA(n_components=2)
enc = pca.fit_transform(encoded_images)
plt.scatter(enc[:, 0], enc[:, 1])