In [None]:
# Links
# Triplet Loss: https://www.tensorflow.org/addons/tutorials/losses_triplet
# Triplet loss more info: https://www.tensorflow.org/addons/api_docs/python/tfa/losses/TripletSemiHardLoss
# FaceNet: https://arxiv.org/pdf/1503.03832.pdf
# Inception Cell: https://www.jeremyjordan.me/content/images/2018/04/Screen-Shot-2018-04-17-at-10.12.35-AM.png
# Kaggle Dataset: https://www.kaggle.com/atulanandjha/lfwpeople

# Open an example of a triple

# Import

In [1]:
import tensorflow as tf
import numpy as np
import cv2

import os
from shutil import copyfile

# Preprocess Data

In [None]:
src_dir = r'C:\Users\kwens\Documents\Python Scripts\TensorFlow\Local Datasets\face-lfw\lfw_funneled'

# Directories holding several images (candidate anchor/positive pairs) and
# directories holding exactly one image (candidate negatives), keyed by
# directory path with the list of file names as value.
same_paths = {}
diff_paths = {}

for path_name, _, file_names in os.walk(src_dir):
    file_count = len(file_names)

    if file_count > 1:
        same_paths[path_name] = file_names
    elif file_count == 1:
        diff_paths[path_name] = file_names

In [None]:
def filter_dict(path, dictionary, option):
    """Keep only the images under ``path`` in which exactly one face is detected.

    ``dictionary[path]`` (a list of file names) is filtered in place, so any
    other reference to that list sees the result.

    Args:
        path: Directory whose file list should be filtered; a key of ``dictionary``.
        dictionary: Mapping of directory path -> list of image file names.
        option: ``'same_paths'`` additionally trims the list to an even length
            (images are later consumed in pairs) and requires at least two
            images; any other value only requires at least one image.

    Returns:
        ``True`` if enough images remain for ``path``, ``False`` otherwise.

    Note:
        Relies on the module-level ``face_cascade`` classifier, which must be
        created before this function is called.
    """
    kept = []

    # Build a new list instead of deleting while iterating: removing items
    # from the list being looped over makes the loop skip the element that
    # follows each deletion, so some images were never checked.
    for file in list(dictionary[path]):
        img = cv2.imread(os.path.join(path, file))
        if img is None:
            # Unreadable / non-image file: treat as "no usable face".
            continue

        gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
        detected_faces = face_cascade.detectMultiScale(image=gray)

        if len(detected_faces) == 1:
            kept.append(file)

    # Slice-assign so the original list object is updated in place.
    dictionary[path][:] = kept

    if option == 'same_paths':
        # Pairs are formed from consecutive entries, so drop a dangling last one.
        if len(dictionary[path]) % 2 == 1:
            del dictionary[path][-1]

        return len(dictionary[path]) >= 2

    return len(dictionary[path]) >= 1


# Run the face filter over every directory and keep only the directories for
# which enough usable images remain. `filter_dict` prunes each file list in
# place, so the value is looked up only after the filter has run.
same_paths = {
    person_dir: same_paths[person_dir]
    for person_dir in same_paths
    if filter_dict(person_dir, same_paths, 'same_paths')
}

diff_paths = {
    person_dir: diff_paths[person_dir]
    for person_dir in diff_paths
    if filter_dict(person_dir, diff_paths, 'diff_paths')
}

In [None]:
face_cascade = cv2.CascadeClassifier(cv2.data.haarcascades+"haarcascade_frontalface_default.xml")
tar_dir = r'C:\Users\kwens\Documents\Python Scripts\TensorFlow\Local Datasets\face-lfw\detectable\0_null'


def _load_face_crop(image_path):
    """Return the single face found in ``image_path`` as a 128x128 crop.

    Returns ``None`` when the file cannot be read or when the cascade does not
    detect exactly one face.
    """
    img = cv2.imread(image_path)
    if img is None:
        return None

    gray_img = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    detected_faces = face_cascade.detectMultiScale(image=gray_img)

    if len(detected_faces) != 1:
        return None

    # The cascade reports boxes as (x, y, width, height).
    offset_width, offset_height, target_width, target_height = detected_faces[0]
    face = tf.image.crop_to_bounding_box(img, offset_height, offset_width, target_height, target_width)

    return tf.keras.preprocessing.image.smart_resize(face, (128, 128))


# Each triplet takes its negative from the next single-image identity.
# (`diff_iter` was previously used without ever being created.)
diff_iter = iter(diff_paths)

os.makedirs(tar_dir, exist_ok=True)

index = 0
negatives_exhausted = False

for path in same_paths:
    if negatives_exhausted:
        break

    for i in range(len(same_paths[path])//2):
        anchor_name = same_paths[path][2*i]
        positive_name = same_paths[path][2*i+1]

        img = _load_face_crop(os.path.join(path, anchor_name))
        if img is None:
            continue

        img_1 = _load_face_crop(os.path.join(path, positive_name))
        if img_1 is None:
            continue

        # Only consume a negative once the anchor/positive pair is usable.
        diff_path = next(diff_iter, None)
        if diff_path is None:
            # No negatives left: stop cleanly instead of raising StopIteration.
            negatives_exhausted = True
            break

        negative_name = diff_paths[diff_path][0]
        img_2 = _load_face_crop(os.path.join(diff_path, negative_name))
        if img_2 is None:
            continue

        # os.mkdir (not exist_ok) on purpose: a pre-existing triplet folder
        # means stale output is present, and mixing old and new files would
        # leave more than three images per folder.
        tar_path = os.path.join(tar_dir, str(index))
        os.mkdir(tar_path)

        # The loader reads each folder in alphanumeric file order and the
        # first/second/third image is taken as anchor/positive/negative, so
        # the role is encoded in a sortable prefix. Without it, a negative
        # whose name sorts before the anchor's would silently swap roles.
        cv2.imwrite(os.path.join(tar_path, '0_' + anchor_name), img.numpy())
        cv2.imwrite(os.path.join(tar_path, '1_' + positive_name), img_1.numpy())
        cv2.imwrite(os.path.join(tar_path, '2_' + negative_name), img_2.numpy())

        index += 1

# Form tf.data.Dataset

In [None]:
path = r'C:\Users\kwens\Documents\Python Scripts\TensorFlow\Local Datasets\face-lfw\detectable\0_null'

# Every sub-folder of `path` holds the three images of one triplet. With
# shuffling disabled and a batch size of 3, each batch is intended to be
# exactly one triplet folder.
train_ds = tf.keras.preprocessing.image_dataset_from_directory(
    directory=path,
    image_size=(128, 128),
    batch_size=3,
    shuffle=False,
)  # len(train_ds) --> 2845

In [None]:
@tf.function
def to_triplets(images, labels):
    """Split one batch of three images into a grayscale (anchor, positive, negative) tuple.

    Args:
        images: Batch of exactly three RGB images; values are divided by 255
            before the grayscale conversion.
        labels: Directory-derived labels from the loader; unused.

    Returns:
        Tuple ``(anchor, positive, negative)`` of single-channel images.

    Note:
        Assumes the batch is ordered anchor, positive, negative, i.e. that the
        order of the files within each triplet folder reflects their role --
        TODO confirm against the export step.
    """
    # Scale and convert the whole batch once, then split it. tf.unstack with an
    # explicit `num` works even when the batch dimension is not statically
    # known, unlike tuple-unpacking the tensor (which iterates over it).
    gray = tf.image.rgb_to_grayscale(images/255.)
    anchor, positive, negative = tf.unstack(gray, num=3, axis=0)

    # Return a tuple (not a list) so tf.data keeps three separate components.
    return anchor, positive, negative

batch_size = 64

# triplet tuples -> shuffled -> batches of `batch_size` triplets -> prefetched
train_ds = (
    train_ds
    .map(to_triplets)
    .shuffle(1000)
    .batch(batch_size)
    .prefetch(tf.data.experimental.AUTOTUNE)
)

# Define Model and Loss Function

In [None]:
from tensorflow.keras.layers import Concatenate, Conv2D, Dense, Dropout, GlobalAveragePooling2D, MaxPool2D

# Modified GoogLeNet or Inception v1
#
# "Going Deeper with Convolutions"
# Christian Szegedy et al.
#
# https://arxiv.org/pdf/1409.4842.pdf

class InceptionCell(tf.keras.layers.Layer):
    """Inception-style block with four parallel branches concatenated on the channel axis.

    Branch widths are derived from ``output_dim``:
      * 1x1 convolution                          -> output_dim // 4
      * 1x1 reduce -> 3x3 convolution            -> output_dim // 2
      * 1x1 reduce -> 5x5 convolution            -> output_dim // 8
      * 3x3 max-pool (stride 1) -> 1x1 convolution -> output_dim // 8

    The concatenated output has exactly ``output_dim`` channels when
    ``output_dim`` is divisible by 8. All branches use stride 1, so the
    spatial size of the input is preserved.

    Args:
        output_dim: Target number of output channels.
        **kwargs: Forwarded to ``tf.keras.layers.Layer`` (e.g. ``name``).
    """

    def __init__(self, output_dim, **kwargs):
        # Do not pass `self` to the base constructor: it would be taken as the
        # first positional parameter of Layer.__init__ instead of being ignored.
        super().__init__(**kwargs)

        # Branch 1: plain 1x1 convolution.
        self.reduce_11 = Conv2D( output_dim//4, (1, 1), strides=(1, 1), activation='relu' )

        # Branch 2: 1x1 channel reduction followed by a 3x3 convolution.
        self.reduce_12 = Conv2D( (output_dim*3)//8, (1, 1), strides=(1, 1), activation='relu' )
        self.conv_12 = Conv2D( output_dim//2, (3, 3), strides=(1, 1), activation='relu', padding='same' )

        # Branch 3: 1x1 channel reduction followed by a 5x5 convolution.
        self.reduce_13 = Conv2D( output_dim//16, (1, 1), strides=(1, 1), activation='relu' )
        self.conv_13 = Conv2D( output_dim//8, (5, 5), strides=(1, 1), activation='relu', padding='same' )

        # Branch 4: 3x3 max-pool followed by a 1x1 convolution.
        self.maxpool_14 = MaxPool2D( (3, 3), strides=(1, 1), padding='same' )
        self.reduce_14 = Conv2D( output_dim//8, (1, 1), strides=(1, 1), activation='relu' )

        self.concat_1 = Concatenate()

    @tf.function
    def call(self, inputs):
        """Run the four branches on ``inputs`` and concatenate their outputs."""
        x = inputs

        x_11 = self.reduce_11(x)
        x_12 = self.reduce_12(x); x_12 = self.conv_12(x_12)
        x_13 = self.reduce_13(x); x_13 = self.conv_13(x_13)
        x_14 = self.maxpool_14(x); x_14 = self.reduce_14(x_14)

        x = self.concat_1([x_11, x_12, x_13, x_14])

        return x

# Modified GoogLeNet or Inception v1
#
# "Going Deeper with Convolutions"
# Christian Szegedy et al.
#
# https://arxiv.org/pdf/1409.4842.pdf

class GoogLeNet(tf.keras.Model):
    """Reduced GoogLeNet / Inception-v1 style network producing a 128-d embedding.

    Layout: convolutional stem -> 7 ``InceptionCell`` blocks with two
    interleaved max-pool reductions -> global average pooling -> dropout ->
    two dense layers. The final dense layer has no activation; L2
    normalization of the embedding is left to the caller.

    Args:
        **kwargs: Forwarded to ``tf.keras.Model`` (e.g. ``name``).
    """

    def __init__(self, **kwargs):
        # Do not pass `self` to the base constructor as an extra positional
        # argument.
        super().__init__(**kwargs)

        # Stem
        # Input shape as fed by this notebook: (None, 128, 128, 1) grayscale.
        # The shape comments below give the output of each layer for that input.
        self.conv_s1 = Conv2D(64, (7, 7), strides=(2, 2), activation='relu', padding='same')  # (None, 64, 64, 64)
        self.maxpool_s1 = MaxPool2D((3, 3), strides=(2, 2), padding='same')                   # (None, 32, 32, 64)
        self.conv_s2 = Conv2D(128, (3, 3), strides=(1, 1), activation='relu', padding='same') # (None, 32, 32, 128)
        self.maxpool_s2 = MaxPool2D((3, 3), strides=(2, 2), padding='same')                   # (None, 16, 16, 128)

        # Inception Cell 1
        self.icell_1 = InceptionCell(256)                                    # (None, 16, 16, 256)

        # Inception Cell 2
        self.icell_2 = InceptionCell(416)                                    # (None, 16, 16, 416)

        # MaxPooling 1
        self.maxreduce_1 = MaxPool2D((3, 3), strides=(2, 2), padding='same') # (None, 8, 8, 416)

        # Inception Cell 3
        self.icell_3 = InceptionCell(512)                                    # (None, 8, 8, 512)

        # Inception Cell 4
        self.icell_4 = InceptionCell(512)                                    # (None, 8, 8, 512)

        # Inception Cell 5
        self.icell_5 = InceptionCell(832)                                    # (None, 8, 8, 832)

        # MaxPooling 2
        self.maxreduce_2 = MaxPool2D((3, 3), strides=(2, 2), padding='same') # (None, 4, 4, 832)

        # Inception Cell 6
        self.icell_6 = InceptionCell(1024)                                   # (None, 4, 4, 1024)

        # Inception Cell 7
        self.icell_7 = InceptionCell(1024)                                   # (None, 4, 4, 1024)

        # GlobalAverage
        self.globalav = GlobalAveragePooling2D()                             # (None, 1024)

        # Head
        self.dropout = Dropout(0.3)
        self.dense_h1 = Dense(1024, activation='relu')                       # (None, 1024)
        self.dense_h2 = Dense(128)                                           # (None, 128)

    @tf.function
    def call(self, inputs, training=True):
        """Compute embeddings for a batch of images.

        Args:
            inputs: Image batch of shape (batch, height, width, channels).
            training: Forwarded to the dropout layer. Defaults to ``True``, so
                inference code must pass ``training=False`` explicitly.

        Returns:
            Tensor of shape (batch, 128) with the un-normalized embeddings.
        """
        x = inputs

        # Stem
        x = self.conv_s1(x)
        x = self.maxpool_s1(x)
        x = self.conv_s2(x)
        x = self.maxpool_s2(x)

        # Inception stages separated by spatial reductions
        x = self.icell_1(x)
        x = self.icell_2(x)

        x = self.maxreduce_1(x)

        x = self.icell_3(x)
        x = self.icell_4(x)
        x = self.icell_5(x)

        x = self.maxreduce_2(x)

        x = self.icell_6(x)
        x = self.icell_7(x)

        x = self.globalav(x)

        # Head
        x = self.dropout(x, training=training)
        x = self.dense_h1(x)
        x = self.dense_h2(x)

        return x

# "FaceNet: A Unified Embedding for Face Recognition and Clustering"
# Florian Schroff, Dmitry Kalenichenko, and James Philbin
#
# https://arxiv.org/pdf/1503.03832.pdf

class TripletLoss():
    """Triplet margin loss on L2-normalized embeddings (FaceNet formulation)."""

    @tf.function
    def __call__(self, inputs, alpha):
        """Return the mean hinge loss over a batch of triplets.

        Args:
            inputs: ``(anchor, positive, negative)`` embedding batches.
            alpha: Margin added to the positive/negative distance gap.

        Returns:
            Mean over the batch axis of
            ``max(||a - p||^2 - ||a - n||^2 + alpha, 0)``.
        """
        raw_anchor, raw_positive, raw_negative = inputs

        # Project every embedding onto the unit hypersphere first.
        unit_anchor = tf.math.l2_normalize(raw_anchor, axis=-1)
        unit_positive = tf.math.l2_normalize(raw_positive, axis=-1)
        unit_negative = tf.math.l2_normalize(raw_negative, axis=-1)

        # Squared Euclidean distances along the embedding axis.
        positive_dist = tf.math.reduce_sum((unit_anchor-unit_positive)**2, -1)
        negative_dist = tf.math.reduce_sum((unit_anchor-unit_negative)**2, -1)

        # Hinge: only triplets violating the margin contribute.
        hinge = tf.math.maximum(positive_dist-negative_dist+alpha, 0.)

        return tf.math.reduce_mean(hinge, 0)

# Train Model

In [None]:
# Training hyper-parameters
epochs = 25
optimizer = tf.keras.optimizers.SGD(learning_rate=0.05, momentum=0.9)

# Embedding network and the objective it is trained with
embedder = GoogLeNet()
loss_fn = TripletLoss()

@tf.function
def train_step(batch):
    """Run one optimization step on a batch of triplets and return its loss.

    ``batch`` is an ``(anchor, positive, negative)`` tuple of image batches.
    All three are embedded with the same network, the triplet loss (margin
    0.2) is computed, and the gradients are applied to the embedder weights.
    """
    anchor_images, positive_images, negative_images = batch

    with tf.GradientTape() as tape:
        embeddings = (
            embedder(anchor_images, training=True),
            embedder(positive_images, training=True),
            embedder(negative_images, training=True),
        )
        loss = loss_fn(embeddings, 0.2)

    weights = embedder.trainable_weights
    gradients = tape.gradient(loss, weights)
    optimizer.apply_gradients(zip(gradients, weights))

    return loss

for epoch in range(epochs):
    print('Epoch {} Begin.\n'.format(epoch))

    for step, batch in enumerate(train_ds):
        loss = train_step(batch)

        # Progress message after every tenth step (step is zero-based).
        if step % 10 == 9:
            print('Step {} Complete.'.format(step))

    # The reported loss is the one from the final step of the epoch.
    print(
        '\nEpoch {} (Step {}) Training Loss: {}'.format(epoch, step, loss),
        '\nEpoch {} Complete.'.format(epoch),
        sep='\n',
    )
    print('-'*100, '\n')

In [None]:
# Persist the trained embedder weights.
save_path = r'C:\Users\kwens\Documents\Python Scripts\TensorFlow\Archive\State of My Art\weights_v2.h5'
embedder.save_weights(filepath=save_path)

# Load Model

In [2]:
from tensorflow.keras.layers import Concatenate, Conv2D, Dense, Dropout, GlobalAveragePooling2D, MaxPool2D

# Modified GoogLeNet or Inception v1
#
# "Going Deeper with Convolutions"
# Christian Szegedy et al.
#
# https://arxiv.org/pdf/1409.4842.pdf

class InceptionCell(tf.keras.layers.Layer):
    """Inception-style block with four parallel branches concatenated on the channel axis.

    Branch widths are derived from ``output_dim``:
      * 1x1 convolution                          -> output_dim // 4
      * 1x1 reduce -> 3x3 convolution            -> output_dim // 2
      * 1x1 reduce -> 5x5 convolution            -> output_dim // 8
      * 3x3 max-pool (stride 1) -> 1x1 convolution -> output_dim // 8

    The concatenated output has exactly ``output_dim`` channels when
    ``output_dim`` is divisible by 8. All branches use stride 1, so the
    spatial size of the input is preserved.

    Args:
        output_dim: Target number of output channels.
        **kwargs: Forwarded to ``tf.keras.layers.Layer`` (e.g. ``name``).
    """

    def __init__(self, output_dim, **kwargs):
        # Do not pass `self` to the base constructor: it would be taken as the
        # first positional parameter of Layer.__init__ instead of being ignored.
        super().__init__(**kwargs)

        # Branch 1: plain 1x1 convolution.
        self.reduce_11 = Conv2D( output_dim//4, (1, 1), strides=(1, 1), activation='relu' )

        # Branch 2: 1x1 channel reduction followed by a 3x3 convolution.
        self.reduce_12 = Conv2D( (output_dim*3)//8, (1, 1), strides=(1, 1), activation='relu' )
        self.conv_12 = Conv2D( output_dim//2, (3, 3), strides=(1, 1), activation='relu', padding='same' )

        # Branch 3: 1x1 channel reduction followed by a 5x5 convolution.
        self.reduce_13 = Conv2D( output_dim//16, (1, 1), strides=(1, 1), activation='relu' )
        self.conv_13 = Conv2D( output_dim//8, (5, 5), strides=(1, 1), activation='relu', padding='same' )

        # Branch 4: 3x3 max-pool followed by a 1x1 convolution.
        self.maxpool_14 = MaxPool2D( (3, 3), strides=(1, 1), padding='same' )
        self.reduce_14 = Conv2D( output_dim//8, (1, 1), strides=(1, 1), activation='relu' )

        self.concat_1 = Concatenate()

    @tf.function
    def call(self, inputs):
        """Run the four branches on ``inputs`` and concatenate their outputs."""
        x = inputs

        x_11 = self.reduce_11(x)
        x_12 = self.reduce_12(x); x_12 = self.conv_12(x_12)
        x_13 = self.reduce_13(x); x_13 = self.conv_13(x_13)
        x_14 = self.maxpool_14(x); x_14 = self.reduce_14(x_14)

        x = self.concat_1([x_11, x_12, x_13, x_14])

        return x

# Modified GoogLeNet or Inception v1
#
# "Going Deeper with Convolutions"
# Christian Szegedy et al.
#
# https://arxiv.org/pdf/1409.4842.pdf

class GoogLeNet(tf.keras.Model):
    """Reduced GoogLeNet / Inception-v1 style network producing a 128-d embedding.

    Layout: convolutional stem -> 7 ``InceptionCell`` blocks with two
    interleaved max-pool reductions -> global average pooling -> dropout ->
    two dense layers. The final dense layer has no activation; L2
    normalization of the embedding is left to the caller.

    Args:
        **kwargs: Forwarded to ``tf.keras.Model`` (e.g. ``name``).
    """

    def __init__(self, **kwargs):
        # Do not pass `self` to the base constructor as an extra positional
        # argument.
        super().__init__(**kwargs)

        # Stem
        # Input shape as fed by this notebook: (None, 128, 128, 1) grayscale.
        # The shape comments below give the output of each layer for that input.
        self.conv_s1 = Conv2D(64, (7, 7), strides=(2, 2), activation='relu', padding='same')  # (None, 64, 64, 64)
        self.maxpool_s1 = MaxPool2D((3, 3), strides=(2, 2), padding='same')                   # (None, 32, 32, 64)
        self.conv_s2 = Conv2D(128, (3, 3), strides=(1, 1), activation='relu', padding='same') # (None, 32, 32, 128)
        self.maxpool_s2 = MaxPool2D((3, 3), strides=(2, 2), padding='same')                   # (None, 16, 16, 128)

        # Inception Cell 1
        self.icell_1 = InceptionCell(256)                                    # (None, 16, 16, 256)

        # Inception Cell 2
        self.icell_2 = InceptionCell(416)                                    # (None, 16, 16, 416)

        # MaxPooling 1
        self.maxreduce_1 = MaxPool2D((3, 3), strides=(2, 2), padding='same') # (None, 8, 8, 416)

        # Inception Cell 3
        self.icell_3 = InceptionCell(512)                                    # (None, 8, 8, 512)

        # Inception Cell 4
        self.icell_4 = InceptionCell(512)                                    # (None, 8, 8, 512)

        # Inception Cell 5
        self.icell_5 = InceptionCell(832)                                    # (None, 8, 8, 832)

        # MaxPooling 2
        self.maxreduce_2 = MaxPool2D((3, 3), strides=(2, 2), padding='same') # (None, 4, 4, 832)

        # Inception Cell 6
        self.icell_6 = InceptionCell(1024)                                   # (None, 4, 4, 1024)

        # Inception Cell 7
        self.icell_7 = InceptionCell(1024)                                   # (None, 4, 4, 1024)

        # GlobalAverage
        self.globalav = GlobalAveragePooling2D()                             # (None, 1024)

        # Head
        self.dropout = Dropout(0.3)
        self.dense_h1 = Dense(1024, activation='relu')                       # (None, 1024)
        self.dense_h2 = Dense(128)                                           # (None, 128)

    @tf.function
    def call(self, inputs, training=True):
        """Compute embeddings for a batch of images.

        Args:
            inputs: Image batch of shape (batch, height, width, channels).
            training: Forwarded to the dropout layer. Defaults to ``True``, so
                inference code must pass ``training=False`` explicitly.

        Returns:
            Tensor of shape (batch, 128) with the un-normalized embeddings.
        """
        x = inputs

        # Stem
        x = self.conv_s1(x)
        x = self.maxpool_s1(x)
        x = self.conv_s2(x)
        x = self.maxpool_s2(x)

        # Inception stages separated by spatial reductions
        x = self.icell_1(x)
        x = self.icell_2(x)

        x = self.maxreduce_1(x)

        x = self.icell_3(x)
        x = self.icell_4(x)
        x = self.icell_5(x)

        x = self.maxreduce_2(x)

        x = self.icell_6(x)
        x = self.icell_7(x)

        x = self.globalav(x)

        # Head
        x = self.dropout(x, training=training)
        x = self.dense_h1(x)
        x = self.dense_h2(x)

        return x

In [3]:
load_path = r'C:\Users\kwens\Documents\Python Scripts\TensorFlow\Archive\State of My Art\weights_v2.h5'

embedder = GoogLeNet()
# Call the model once on an all-zero 1x128x128x1 input before loading, so
# that the stored weights have something to be restored into.
embedder(np.zeros(shape=(1, 128, 128, 1)))

embedder.load_weights(load_path)

# Build Application

In [8]:
def euclid_dist(v1, v2):
    """Euclidean distance between ``v1`` and ``v2`` after L2-normalizing both.

    Normalization and the distance are taken along the last axis.
    """
    unit_1 = tf.math.l2_normalize(v1, -1)
    unit_2 = tf.math.l2_normalize(v2, -1)

    squared_dist = tf.math.reduce_sum((unit_1-unit_2)**2, -1)

    return tf.math.sqrt(squared_dist)

def preprocess_frame(frame):
    """Detect the single face in a BGR frame and return it as a model input.

    Args:
        frame: BGR image as returned by ``cv2.VideoCapture.read``; any
            resolution is accepted.

    Returns:
        ``(cut, offset_width, offset_height, target_width, target_height)``
        where ``cut`` is a ``(1, 128, 128, 1)`` grayscale tensor and the other
        values are the face bounding box (x, y, width, height).
        When the cascade does not find exactly one face, the sentinel
        ``(True, None, None, None, None)`` is returned; callers test
        ``cut is True``.

    Note:
        Uses the module-level ``face_cascade`` classifier.
    """
    cut = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    detected_faces = face_cascade.detectMultiScale(image=cut)

    if len(detected_faces) != 1:
        return True, None, None, None, None

    offset_width, offset_height, target_width, target_height = detected_faces[0]

    # Add the channel axis from the frame's own shape instead of assuming a
    # fixed 480x640 camera resolution.
    cut = tf.image.crop_to_bounding_box(
        cut[..., np.newaxis],
        offset_height, offset_width,
        target_height, target_width
    )
    cut = tf.keras.preprocessing.image.smart_resize(cut, (128, 128))
    cut = tf.reshape(cut, (1, 128, 128, 1))

    # NOTE(review): the training pipeline divides pixel values by 255 before
    # feeding the network, whereas no such scaling is applied here. The
    # distance threshold used by the application was presumably tuned against
    # this unscaled input -- confirm before changing either side.
    return cut, offset_width, offset_height, target_width, target_height

face_cascade = cv2.CascadeClassifier(cv2.data.haarcascades+"haarcascade_frontalface_default.xml")

# Number of frames sampled while recording the anchor face.
ANCHOR_FRAMES = 100

# Decision threshold on euclid_dist(anchor, current).
# For same faces:      d(f1, f2) approx. 0.0027
# For different faces: d(f1, f2) approx. 0.005
MATCH_THRESHOLD = 0.004

# Box colours (BGR).
COLOR_PREVIEW = (255, 0, 0)    # face found, no anchor recorded yet
COLOR_CAPTURE = (255, 255, 0)  # frame used for the anchor embedding
COLOR_MATCH = (0, 255, 0)      # current face matches the anchor
COLOR_NO_MATCH = (0, 0, 255)   # current face differs from the anchor


def _grab_frame(camera):
    """Read one frame, raising a clear error if the camera delivers none."""
    ok, frame = camera.read()
    if not ok or frame is None:
        raise RuntimeError('Could not read a frame from the camera.')
    return frame


def _show(frame, box=None, color=None):
    """Display ``frame`` (optionally with a face box) and return the pressed key code."""
    if box is not None:
        x, y, width, height = box
        frame = cv2.rectangle(frame, (x, y), (x+width, y+height), color, 1)

    cv2.imshow('', frame)

    return cv2.waitKey(10)


# Controls: 'a' records the anchor face (once), 'q' quits.
features_list = []
key = None
proceed = False  # True once an anchor embedding is available
skip = False     # True once the anchor has been recorded; further 'a' presses are ignored

cam = cv2.VideoCapture(0)

try:
    while True:
        if key == ord('q'): # Quit
            break

        if key == ord('a'): # Anchor
            if skip:
                key = _show(_grab_frame(cam))
                continue

            for _ in range(ANCHOR_FRAMES):
                frame = _grab_frame(cam)

                cut, offset_width, offset_height, target_width, target_height = preprocess_frame(frame)

                if cut is True:
                    # No single face in this frame: show it and try the next one.
                    key = _show(frame)
                    continue

                features_list.append(embedder(cut, training=False))

                key = _show(
                    frame,
                    (offset_width, offset_height, target_width, target_height),
                    COLOR_CAPTURE
                )

            # Only finish the capture if at least one face was embedded;
            # averaging an empty list would yield NaN features. Otherwise the
            # user can simply press 'a' again.
            if features_list:
                features = np.mean(features_list, axis=0)

                proceed = True
                skip = True

            continue

        frame = _grab_frame(cam)

        if proceed:
            # Recognition mode: compare the current face with the anchor.
            cut, offset_width, offset_height, target_width, target_height = preprocess_frame(frame)

            if cut is True:
                key = _show(frame)
                continue

            current_features = embedder(cut, training=False)

            if euclid_dist(features, current_features) <= MATCH_THRESHOLD:
                color = COLOR_MATCH
            else:
                color = COLOR_NO_MATCH

            key = _show(
                frame,
                (offset_width, offset_height, target_width, target_height),
                color
            )

        else:
            # Preview mode: just outline the detected face.
            gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
            detected_faces = face_cascade.detectMultiScale(image=gray)

            if len(detected_faces) != 1:
                key = _show(frame)
                continue

            key = _show(frame, tuple(detected_faces[0]), COLOR_PREVIEW)

finally:
    # Always free the camera and close the window, even after an error.
    cam.release()
    cv2.destroyAllWindows()