In [1]:
import os
import tensorflow as tf
import io
import numpy as np
import tensorflow_addons as tfa

 The versions of TensorFlow you are currently using is 2.9.1 and is not supported. 
Some things might work, some things might not.
If you were to encounter a bug, do not file an issue.
If you want to make sure you're using a tested and supported configuration, either change the TensorFlow version or the TensorFlow Addons's version. 
You can find the compatibility matrix in TensorFlow Addon's readme:
https://github.com/tensorflow/addons


In [2]:
#Define variables

batch_size = 32
img_height = 256
img_width = 256
img_channels = 3

In [3]:
#Normalize pixel function

@tf.autograph.experimental.do_not_convert
def _normalize_img(img, label):
    image = tf.cast(img/255. ,tf.float32)
    return image, label

In [4]:
#Update Data Directory Name

data_dir = 'data_folder/all_data_processed' #Change this to lfw & pubfig dataset

In [5]:
#Load Dataset

lfw_pubfig_ds = tf.keras.utils.image_dataset_from_directory(
  data_dir,
  shuffle=True,
  seed=42,
  image_size=(img_height, img_width),
  batch_size=batch_size
)

lfw_pubfig_ds_norm = lfw_pubfig_ds.map(_normalize_img)

Found 22483 files belonging to 1751 classes.


In [6]:
#Set Dataset Split 0.7/0.15/0.15

train_split = 0.7
val_split = 0.15
test_split = 0.15

In [7]:
#Split Dataset

ds_size = len(list(lfw_pubfig_ds_norm))
train_size = int(train_split * ds_size)
val_size = int(val_split * ds_size)
test_size = int(test_split * ds_size)

train_ds = lfw_pubfig_ds_norm.take(train_size)    
val_ds = lfw_pubfig_ds_norm.skip(train_size).take(val_size)
test_ds = lfw_pubfig_ds_norm.skip(train_size + val_size).take(test_size)

In [8]:
#Print Dataset Sizes

print(len(list(train_ds)) * batch_size, train_ds.element_spec) # Default Batches of 32
print(len(list(val_ds)) * batch_size, val_ds.element_spec) # Default Batches of 32
print(len(list(test_ds)) * batch_size, test_ds.element_spec) # Default Batches of 32

15744 (TensorSpec(shape=(None, 256, 256, 3), dtype=tf.float32, name=None), TensorSpec(shape=(None,), dtype=tf.int32, name=None))
3360 (TensorSpec(shape=(None, 256, 256, 3), dtype=tf.float32, name=None), TensorSpec(shape=(None,), dtype=tf.int32, name=None))
3360 (TensorSpec(shape=(None, 256, 256, 3), dtype=tf.float32, name=None), TensorSpec(shape=(None,), dtype=tf.int32, name=None))


In [9]:
#Create the Baseline Model Class

class BaselineModel(tf.keras.Model):
    def __init__(self):
        super(BaselineModel, self).__init__()
        
        self.input_layer = tf.keras.layers.Input((img_height, img_width, img_channels))
        self.conv1 = tf.keras.layers.Conv2D(filters=64, kernel_size=7, padding='same', activation='relu')
        self.mpool1 = tf.keras.layers.MaxPooling2D(pool_size=2)
        
        self.flat1 = tf.keras.layers.Flatten()
        self.fc1 = tf.keras.layers.Dense(128, activation=None)

        self.output_layer = tf.keras.layers.Lambda(lambda x: tf.math.l2_normalize(x, axis=1))
        
    def call(self, inputs):
        x = self.conv1(inputs)
        x = self.mpool1(x)
        x = self.flat1(x)
        x = self.fc1(x)
        out = self.output_layer(x)
        
        return out
    
    def model(self):
        return tf.keras.Model(inputs=self.input_layer, outputs=self.call(self.input_layer))
        

In [45]:
#Create the Advanced Model Class (Insert Model Architecture)

class AdvancedModel(tf.keras.Model):
    def __init__(self):
        super(AdvancedModel, self).__init__()
        
        self.input_layer = tf.keras.layers.Input((img_height, img_width, img_channels))
        self.c1 = tf.keras.layers.Conv2D(filters=64, kernel_size=5, strides=(2, 2), padding='same', activation='relu')
        self.mp1 = tf.keras.layers.MaxPooling2D(pool_size=2, padding='valid', strides=(2, 2))
        self.d1 = tf.keras.layers.Dropout(0.3)
        
        self.c2 = tf.keras.layers.Conv2D(filters=128, kernel_size=3, strides=(2, 2), padding='same', activation='relu')
        self.mp2 = tf.keras.layers.MaxPooling2D(pool_size=2, padding='valid', strides=(2, 2))
        self.d2 = tf.keras.layers.Dropout(0.3)
        
        self.c3 = tf.keras.layers.Conv2D(filters=256, kernel_size=3, strides=(2, 2), padding='same', activation='relu')
        self.mp3 = tf.keras.layers.MaxPooling2D(pool_size=2, padding='valid', strides=(2, 2))
        
        self.flat1 = tf.keras.layers.Flatten()
        self.fc1 = tf.keras.layers.Dense(256, activation=None)

        self.output_layer = tf.keras.layers.Lambda(lambda x: tf.math.l2_normalize(x, axis=1))
        
    def call(self, inputs):
        x = self.c1(inputs)
        x = self.mp1(x)
        x = self.d1(x)
        
        x = self.c2(x)
        x = self.mp2(x)
        x = self.d2(x)
        
        x = self.c3(x)
        x = self.mp3(x)
        
        x = self.flat1(x)
        x = self.fc1(x)
        
        out = self.output_layer(x)
        
        return out
    
    def model(self):
        return tf.keras.Model(inputs=self.input_layer, outputs=self.call(self.input_layer))

In [46]:
#Get Model

#Uncomment for Baseline Model
# model_name = 'baseline'
# model = BaselineModel()

#Uncomment for Advanced Model
model_name = 'advanced'
model = AdvancedModel()

model.model().summary()

Model: "model_13"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 input_15 (InputLayer)       [(None, 256, 256, 3)]     0         
                                                                 
 conv2d_40 (Conv2D)          (None, 128, 128, 64)      4864      
                                                                 
 max_pooling2d_39 (MaxPoolin  (None, 64, 64, 64)       0         
 g2D)                                                            
                                                                 
 dropout_26 (Dropout)        (None, 64, 64, 64)        0         
                                                                 
 conv2d_41 (Conv2D)          (None, 32, 32, 128)       73856     
                                                                 
 max_pooling2d_40 (MaxPoolin  (None, 16, 16, 128)      0         
 g2D)                                                     

In [47]:
#Compliel Model with Adam Optimizer and Triplet Loss

model.compile(
    optimizer=tf.keras.optimizers.Adam(1e-3),
    loss=tfa.losses.TripletHardLoss())

In [48]:
#Train Model
EPOCHS = 50

history = model.fit(
        x=train_ds,
        epochs=EPOCHS,
        validation_data=val_ds)

Epoch 1/50
Epoch 2/50
Epoch 3/50
Epoch 4/50
Epoch 5/50
Epoch 6/50
Epoch 7/50
Epoch 8/50
Epoch 9/50
Epoch 10/50
Epoch 11/50
Epoch 12/50
Epoch 13/50
Epoch 14/50
Epoch 15/50
Epoch 16/50
Epoch 17/50
Epoch 18/50
Epoch 19/50
Epoch 20/50
Epoch 21/50
Epoch 22/50
Epoch 23/50
Epoch 24/50
Epoch 25/50
Epoch 26/50
Epoch 27/50
Epoch 28/50
Epoch 29/50
Epoch 30/50
Epoch 31/50
Epoch 32/50
Epoch 33/50
Epoch 34/50
Epoch 35/50
Epoch 36/50
Epoch 37/50
Epoch 38/50
Epoch 39/50
Epoch 40/50
Epoch 41/50
Epoch 42/50
Epoch 43/50
Epoch 44/50
Epoch 45/50
Epoch 46/50
Epoch 47/50
Epoch 48/50
Epoch 49/50
Epoch 50/50


In [49]:
#Save Model
model.save('saved_model/' + model_name + '_model')



INFO:tensorflow:Assets written to: saved_model/advanced_model\assets


INFO:tensorflow:Assets written to: saved_model/advanced_model\assets


In [50]:
#Save training data
import pandas as pd

# convert the history.history dict to a pandas DataFrame:   
hist_df = pd.DataFrame(history.history) 

# or save to csv: 
hist_csv_file = 'saved_model/' + model_name + '_history.csv'
with open(hist_csv_file, mode='w') as f:
    hist_df.to_csv(f)

In [51]:
#Constuct a database of known image encodings from the training/verification dataset as a reference to evaluate the test dataset

pubfig_db_dict = {}
for images, labels in train_ds.take(len(list(train_ds))):
    for i in range(labels.shape[0]):
        if int(labels[i]) not in pubfig_db_dict:
            pubfig_db_dict[int(labels[i])] = model.predict(np.expand_dims(images[i], axis=0))
            
for images, labels in val_ds.take(len(list(val_ds))):
    for i in range(labels.shape[0]):
        if int(labels[i]) not in pubfig_db_dict:
            pubfig_db_dict[int(labels[i])] = model.predict(np.expand_dims(images[i], axis=0))

























In [52]:
#For Each Image in the test set, compute the closest image embedding
#If it is closer than the threshold, classify the image as that class
#If it is further than the threshold, the test image is not in the referance classes

test_num = 0
test_correct = 0
total_distance = 0

threshold = 1.3

test_samples = len(list(test_ds))

for images, labels in test_ds.take(test_samples):
    for i in range(labels.shape[0]):
        #Establish Target and test encoding
        test_label = int(labels[i])
        test_encoding = model.predict(np.expand_dims(images[i], axis=0))
        
        # Initialize "min_dist" to a large value, say 100 (≈1 line)
        min_dist = 100
        min_label = 0
        
        #Loop through all images in dataset and find closests
        for key in pubfig_db_dict.keys():
            db_enc = pubfig_db_dict[key]
            
            #compute L2 distance
            dist = np.linalg.norm(db_enc - test_encoding)
            
            #set new label if smallest distance seen
            if dist < min_dist:
                min_dist = dist
                min_label = key
                
            if key == test_label:
                total_distance += dist
        
        test_num += 1
        if min_dist < threshold:
            if min_label == test_label:
                test_correct += 1
        elif test_label not in pubfig_db_dict:
            test_correct += 1













































In [53]:
#Print Facial Recognition Accuracy and Average Distance from correct class

print(test_correct/test_num)
print(total_distance/test_num)

0.013392857142857142
1.0615082841810017
