In [0]:
import os

#download face database.
if not os.path.isfile('./FaceDB.zip'):
  !gdown --id 'replace this with google drive link'
#or download it from wherever you want it

#unzip the database.
if not os.path.isdir('./FaceDB'):
  !unzip -oq FaceDB.zip -d "./FaceDB"

#create folder for trained models to be saved.
if not os.path.isdir('./saved_models'):
  os.mkdir('./saved_models')

In [0]:
import tensorflow as tf

#using tensorflow's keras implementation
#so the models can be converted to .tflite.
keras = tf.keras

print(tf.__version__)

In [0]:
from sklearn import preprocessing

#getting images from a path.
#path must have sub-folders.
#each sub-folders contains pictures of one person.
def ImgClassLists(path, imageExtension):
    imagePaths = []
    classes = []
    
    #walk the path.
    for root, _, files in sorted(os.walk(path)):
        #sort the files iterator.
        for file in sorted(files):
            if file.endswith(imageExtension):
                #get image paths.
                imagePath = os.path.join(root, file)
                imagePaths.append(imagePath)
                #for each sub-folder provide an unique class name.
                className = root[len(path) : ]
                classes.append(className)
    #turn class names into numbers.
    #from ["class-a", "class-a", "class-b", "class-b", "class-b", "class-c"]
    #to   [0        , 0        ,  1       ,  1       ,  1       ,  2       ].
    le = preprocessing.LabelEncoder()
    le.fit(classes)
    classes = le.transform(classes)
    return (imagePaths, classes)

In [0]:
#MobileNet implementation taken from the TF Keras library.

def MobileNet(shape, num_classes, alpha = 1, include_top = True, weights = None, num_inputs = None):
    model = tf.keras.applications.MobileNet(input_shape = shape, alpha = alpha, include_top = include_top, weights = weights, classes = num_classes)
    return model

In [0]:
#Keras EffNet implementation taken from
#https://github.com/arthurdouillard/keras-effnet.

from keras.models import Model
from keras.layers import *
from keras.activations import *
from keras.callbacks import *

def get_post(x_in):
    x = LeakyReLU()(x_in)
    x = BatchNormalization()(x)
    return x

def get_block(x_in, ch_in, ch_out):
    x = Conv2D(ch_in,
               kernel_size=(1, 1),
               padding='same',
               use_bias=False)(x_in)
    x = get_post(x)

    x = DepthwiseConv2D(kernel_size=(1, 3), padding='same', use_bias=False)(x)
    x = get_post(x)
    x = MaxPool2D(pool_size=(2, 1),
                  strides=(2, 1))(x) # Separable pooling

    x = DepthwiseConv2D(kernel_size=(3, 1),
                        padding='same',
                        use_bias=False)(x)
    x = get_post(x)

    x = Conv2D(ch_out,
               kernel_size=(2, 1),
               strides=(1, 2),
               padding='same',
               use_bias=False)(x)
    x = get_post(x)

    return x

def Effnet(input_shape, nb_classes, include_top=True, weights=None, num_inputs=None):
    x_in = Input(shape=input_shape)

    x = get_block(x_in, 32, 64)
    x = get_block(x, 64, 128)
    x = get_block(x, 128, 256)

    if include_top:
        x = Flatten()(x)
        x = Dense(nb_classes, activation='softmax')(x)

    model = Model(inputs=x_in, outputs=x)

    if weights is not None:
        model.load_weights(weights, by_name=True)

    return model

In [0]:
#ShuffleNet implementation taken from
#https://github.com/arthurdouillard/keras-shufflenet.
#Validation to .tflite model doesn't work, so the model is scrapped
#for .tflite conversion and integration.
'''
from keras.models import Model
from keras.layers import *
from keras.activations import *
from keras.callbacks import *
import keras.backend as K


def _stage(tensor, nb_groups, in_channels, out_channels, repeat):
    x = _shufflenet_unit(tensor, nb_groups, in_channels, out_channels, 2)

    for _ in range(repeat):
        x = _shufflenet_unit(x, nb_groups, out_channels, out_channels, 1)

    return x


def _pw_group(tensor, nb_groups, in_channels, out_channels):
    """Pointwise grouped convolution."""
    nb_chan_per_grp = in_channels // nb_groups

    pw_convs = []
    for grp in range(nb_groups):
        x = Lambda(lambda x: x[:, :, :, nb_chan_per_grp * grp: nb_chan_per_grp * (grp + 1)])(tensor)
        grp_out_chan = int(out_channels / nb_groups + 0.5)

        pw_convs.append(
            Conv2D(grp_out_chan,
                   kernel_size=(1, 1),
                   padding='same',
                   use_bias=False,
                   strides=1)(x)
        )

    return Concatenate(axis=-1)(pw_convs)


def _shuffle(x, nb_groups):
    def shuffle_layer(x):
        _, w, h, n = K.int_shape(x)
        nb_chan_per_grp = n // nb_groups

        x = K.reshape(x, (-1, w, h, nb_chan_per_grp, nb_groups))
        x = K.permute_dimensions(x, (0, 1, 2, 4, 3)) # Transpose only grps and chs
        x = K.reshape(x, (-1, w, h, n))

        return x

    return Lambda(shuffle_layer)(x)


def _shufflenet_unit(tensor, nb_groups, in_channels, out_channels, strides, shuffle=True, bottleneck=4):
    bottleneck_channels = out_channels // bottleneck

    x = _pw_group(tensor, nb_groups, in_channels, bottleneck_channels)
    x = BatchNormalization()(x)
    x = Activation('relu')(x)

    if shuffle:
        x = _shuffle(x, nb_groups)

    x = DepthwiseConv2D(kernel_size=(3, 3),
                        padding='same',
                        use_bias=False,
                        strides=strides)(x)
    x = BatchNormalization()(x)


    x = _pw_group(x, nb_groups, bottleneck_channels,
                  out_channels if strides < 2 else out_channels - in_channels)
    x = BatchNormalization()(x)

    if strides < 2:
        x = Add()([tensor, x])
    else:
        avg = AveragePooling2D(pool_size=(3, 3),
                               strides=2,
                               padding='same')(tensor)

        x = Concatenate(axis=-1)([avg, x])

    x = Activation('relu')(x)

    return x


def _info(nb_groups):
    return {
        1: [24, 144, 288, 576],
        2: [24, 200, 400, 800],
        3: [24, 240, 480, 960],
        4: [24, 272, 544, 1088],
        8: [24, 384, 768, 1536]
    }[nb_groups], [None, 3, 7, 3]


def ShuffleNet(input_shape, nb_classes, include_top=True, weights=None, nb_groups=8, num_inputs=None):
    x_in = Input(shape=input_shape)

    x = Conv2D(24,
               kernel_size=(3, 3),
               strides=2,
               use_bias=False,
               padding='same')(x_in)
    x = BatchNormalization()(x)
    x = Activation('relu')(x)

    x = MaxPooling2D(pool_size=(3, 3),
                     strides=2,
                     padding='same')(x)

    channels_list, repeat_list = _info(nb_groups)
    for i, (out_channels, repeat) in enumerate(zip(channels_list[1:], repeat_list[1:]), start=1):
        x = _stage(x, nb_groups, channels_list[i-1], out_channels, repeat)

    if include_top:
        x = GlobalAveragePooling2D()(x)
        x = Dense(nb_classes, activation='softmax')(x)

    model = Model(inputs=x_in, outputs=x)

    if weights is not None:
        model.load_weights(weights, by_name=True)

    return model
'''

In [0]:
#L-CNN implementation taken from
#https://github.com/radu-dogaru/LightWeight_Binary_CNN_and_ELM_Keras.

#Radu Dogaru and Ioana Dogaru, "BCONV-ELM: Binary Weights Convolutional Neural
#Network Simulator based on Keras/Tensorflow, for Low Complexity
#Implementations", Proceedings of the ISEEE 2019 conference, submitted.

from keras.models import Sequential
from keras.layers import Dense, Dropout, Flatten, Activation
from keras.layers import Conv2D, DepthwiseConv2D, MaxPooling2D, AveragePooling2D 

def LCNN(input_shape, num_classes, include_top=True, weights=None, num_inputs=None):
    nhid1 = 0 # hidden-1 neurons (put 0 if nhid2=0, or a desired value)
    nhid2 = 0 # hidden-2 neurons (take 0 for 0 or 1 hidden layer)
    nr_conv = 2 # 0, 1, 2 sau 3  (number of convolution layers - generally, a bigger number allows for better accuracy with the same complexity)
    filtre1=8 ; filtre2=8 ; filtre3=8  # filters (kernels) per each layer
    csize1=3; csize2=3 ; csize3=3      # convolution kernel size (square kernel) 
    psize1=4; psize2=4 ; psize3=4      # pooling size (square)
    str1=2; str2=2; str3=2             # stride pooling (downsampling rate)
    pad='same'; # padding style ('valid' is also an alternative)
    type_conv=2 # 1='depth_wise' or 2='normal' 

    model = Sequential()

    if nr_conv>=1:
        if type_conv==2:
            model.add(Conv2D(filtre1, padding=pad, kernel_size=(csize1, csize1), input_shape=input_shape))
        elif type_conv==1:
            model.add(DepthwiseConv2D(kernel_size=csize2, padding=pad, input_shape=input_shape, depth_multiplier=filtre1, use_bias=False))
        #model.add(Activation('relu'))
        model.add(MaxPooling2D(pool_size=(psize1, psize1),strides=(str1,str1),padding=pad))
        #model.add(Activation('relu'))
        if nr_conv>=2:
            if type_conv==2:
                model.add(Conv2D(filtre2, padding=pad, kernel_size=(csize2, csize2)) )
            elif type_conv==1:
                model.add(DepthwiseConv2D(kernel_size=csize2, padding=pad, depth_multiplier=filtre2, use_bias=False))
            model.add(Activation('relu'))
            model.add(MaxPooling2D(pool_size=(psize2, psize2),strides=(str2,str2),padding=pad))
            #model.add(Activation('relu'))
            if nr_conv==3:
                if type_conv==2:
                    model.add(Conv2D(filtre3, padding=pad, kernel_size=(csize3, csize3)) )
                elif type_conv==1:
                    model.add(DepthwiseConv2D(kernel_size=csize3, padding=pad, depth_multiplier=filtre3, use_bias=False))
                #model.add(Activation('relu'))
                model.add(MaxPooling2D(pool_size=(psize3, psize3),strides=(str3,str3),padding=pad))
                #model.add(Activation('relu'))
        model.add(Activation('relu'))
        model.add(Flatten())
        #model.add(Activation('relu'))
        #model.add(Dropout(0.25))
    elif nr_conv==0:
        model.add(Flatten(input_shape=input_shape))
    # ---- first fc hidden layer  
    if nhid1>0:
        model.add(Dense(nhid1, activation='relu'))
        #model.add(Dropout(0.5))
    # ---- second fc hidden layer 
    if nhid2>0:
        model.add(Dense(nhid2, activation='relu'))
    #   model.add(Dropout(0.2))
    #   output layer 
    if (nhid1+nhid2)==0:
        model.add(Dense(num_classes, activation='softmax',input_shape=(num_inputs,)))
    else: 
        model.add(Dense(num_classes, activation='softmax'))
    
    return model

In [0]:
import numpy as np
from PIL import Image
from sklearn.model_selection import train_test_split
from keras.utils import to_categorical

#get a valid image size from the supported sizes array.
#if optional argument is provided, the function returns the closest image size
#from the supported sizes array to the wanted image size.
def calc_supported_image_size(image, supported_sizes, wanted_image_size = None):
    
    if wanted_image_size is None:
        image_shape = image.shape
        wanted_image_size = int((image_shape[0] + image_shape[1]) / 2)
    
    #calculate the closest differences from the wanted image sizes to the
    #supported sizes.
    min_diff_size = abs(wanted_image_size - supported_sizes[0])
    min_diff_size_idx = 0
    for idx in range(1, len(supported_sizes)):
        temp_min_diff_size = abs(wanted_image_size - supported_sizes[idx])
        if temp_min_diff_size < min_diff_size:
            min_diff_size = temp_min_diff_size
            min_diff_size_idx = idx
    
    #return the closest size to the wanted image size from the supported
    #size array.
    return supported_sizes[min_diff_size_idx]

#split the images into test and train image sets.
def image_split(splittableImagePaths, splittableClasses, test_size = 0.25, wanted_image_size = None, supported_sizes = [128]):
  #get valid image size
  image_size = calc_supported_image_size(np.asarray(Image.open(splittableImagePaths[0])), supported_sizes = supported_sizes, wanted_image_size = wanted_image_size)

  #populate array with images that have been resized to the calculated image size.
  splittableImages = []

  #convert images to rgb for working with color images.
  #resize images to the valid image size.
  #pixel values range from 0 to 255.
  for img in splittableImagePaths:
      splittableImages.append(np.asarray(Image.open(img).convert('RGB').resize((image_size, image_size)), dtype = np.uint8))

  #stratify is used for choosing an equal amount of pictures from each class
  #for the test and train image sets respectively.
  #test size is used to choose the percentage of how many images
  #will be used for testing.
  x_train, x_test, y_train, y_test =  train_test_split(splittableImages, splittableClasses, test_size = test_size, stratify = splittableClasses)
  #converts the classes array to a binary matrix
  #that is understandable to the model algorithm.
  #example:
  #y_train          =   [0, 2, 1]
  #encoded_y_train  =  [[1, 0, 0],
  #                     [0, 0, 1],
  #                     [0, 1, 0]] 
  encoded_y_train = to_categorical(y_train)
  encoded_y_test = to_categorical(y_test)
  return np.array(x_train), np.array(x_test), np.array(encoded_y_train), np.array(encoded_y_test)

In [0]:
import matplotlib.pyplot as plt
import time as ti
from tensorflow.keras.models import load_model

#supported databases dictionary with their image extensions.
databases = {"ESSEX" : ".jpg", "JAFFE" : ".tiff", "ORL" : ".pgm"}

#supported models dictionary with their function implementations.
models = {"MobileNet" : MobileNet, "EffNet" : Effnet, "LCNN" : LCNN}

#model accuracy to be hit.
#any accuracies bigger than that will be ignored.
threshold_acc = 0.985

#go through all models.
for current_model in models:
    #go through all databases.
    for current_database in databases:

        print("----------------{} + {}----------------".format(current_database, current_model))

        test_size = 0.25

        #get image paths and classes.
        (splittableImagePaths, splittableClasses) = ImgClassLists("./FaceDB/{}/".format(current_database), databases[current_database])

        #split the images into different datasets.
        #x_train = images to use for the training process.
        #y_train = classes to use for the training process.
        #x_test = images to use for the testing process.
        #y_test = classes to use for the testing process.
        x_train, x_test, y_train, y_test = image_split(splittableImagePaths, splittableClasses, test_size = test_size)
        input_shape = np.shape(x_train)[1:4]
        num_classes = np.shape(y_train)[1]

        print("Using database:    {}".format(current_database))
        print("Image shape:       {}".format(input_shape))
        print("Number of images:  {}".format(len(splittableImagePaths)))
        print("Number of classes: {}".format(num_classes))

        #directory to save the model into.
        saved_model_path = './saved_models/'
        #model to be saved under this name.
        saved_model_name = 'model_{}_{}'.format(current_database, current_model)
        #model extension.
        saved_model_ext = '.h5'
        #converted lite model extension.
        saved_model_lite = '.tflite'

        #learning rate value.
        lr = 0.001

        #loss function.
        loss = 'categorical_crossentropy'
        
        #optimizer to be used.
        optimizer = tf.keras.optimizers.Adam(lr = lr)
        
        #metrics to be tracked.
        metrics = ['accuracy']

        #call the function to invoke chosen model.
        model = models[current_model](input_shape, num_classes, num_inputs = np.shape(x_test)[1])

        #compile model with set parameters
        model.compile(loss = loss, optimizer = optimizer, metrics = metrics)
        print("Using deep learning model: {}".format(current_model))
        print("Loss function:             {}".format(loss))
        print("Metrics array:             {}".format(metrics))
        print("Optimizer config:          {}".format(optimizer))
        print("Learning rate:             {}".format(lr))

        #train "batch_size" images at a time.
        batch_size = 5

        #how many times to repeat the training process .
        epochs = 100

        #scores array to count the accuracy value for each epoch.
        scores = np.zeros(epochs)
        best_acc = 0
        best_epoch = 0
        last_epoch = 0

        print('--- {} training images, {} test images, {} batch size ---'.format(len(x_train), len(x_test), batch_size))

        #timestamp before beginning training.
        t1 = ti.time()
        for k in range(epochs):
            print('\nEpoch {} out of {} running...'.format(k + 1, epochs))
            #train the model with the provided train data set.
            model.fit(x_train, y_train, batch_size = batch_size, verbose = 1, epochs = 1, validation_data = (x_test, y_test))
            
            #check the accuracy of the model with the test data set.
            score = model.evaluate(x_test, y_test, verbose = 0)

            #remember best accuracy.
            scores[k] = score[1]
            last_epoch = k + 1
            if score[1] > best_acc:
                best_acc = score[1]
                best_epoch = last_epoch
                print('Improved accuracy on epoch {} : {}%'.format(k + 1, best_acc * 100))
                #remember best weight values from best accuracy.
                best_weights = model.get_weights()
                if best_acc > threshold_acc:
                    print('Threshold accuracy surpassed. Stopping training at epoch {} out of {}.'.format(k + 1, epochs))
                    break
        #timestamp after finishing training.
        t2 = ti.time()

        print('\nBest accuracy on epoch {} out of {} : {}%'.format(best_epoch, epochs, best_acc * 100))
        print('Fitting took {} seconds.'.format(t2 - t1))

        #set the best weights for the model.
        model.set_weights(best_weights)

        #save the trained model as a .h5 file.
        model.save(saved_model_path + saved_model_name + saved_model_ext)

        #plot the scores array for each database and model.
        plt.figure()
        plt.title("{} + {}".format(current_database, current_model))
        plt.xlim(0, last_epoch)
        plt.ylim(0, 1)
        plt.xlabel('Epochs')
        plt.ylabel('Accuracy (0 - min, 1 - max)')
        plt.plot(scores[:last_epoch])

        print('\nEvaluating {}\n'.format(saved_model_name + saved_model_ext))
        t1 = ti.time()
        #evaluate the trained model.
        score = model.evaluate(x_test, y_test, verbose = 1)
        t2 = ti.time()
        print('\nTest accuracy: {}%'.format(score[1] * 100))
        print('Test duration : {} seconds'.format(t2 - t1))
        print('Latency (per input sample) : {} ms.'.format(1000*(t2-t1)/np.shape(x_test)[0]))

        print('Converting model to .tflite...')
        #loading the model from a filepath.
        model = load_model(saved_model_path + saved_model_name + saved_model_ext)
        
        #initializing tflite converter.
        tflite_converter = tf.lite.TFLiteConverter.from_keras_model(model)
        
        #the tflite model can use the post training quantize
        #further into development.
        tflite_converter.post_training_quantize=True
        
        #convert the .h5 keras file to a .tflite file.
        tflite_model = tflite_converter.convert()

        #save the model.
        open(saved_model_path + saved_model_name + saved_model_lite, "wb").write(tflite_model)

        print('Validating .tflite model...')
        #checking if the conversion was done successfully.
        interpreter = tf.lite.Interpreter(model_path = saved_model_path + saved_model_name + saved_model_lite)
        interpreter.allocate_tensors()

        input_details = interpreter.get_input_details()
        output_details = interpreter.get_output_details()

        input_shape = input_details[0]['shape']
        input_data = np.array(np.random.random_sample(input_shape), dtype = np.float32)

        interpreter.set_tensor(input_details[0]['index'], input_data)
        interpreter.invoke()

        output_data = interpreter.get_tensor(output_details[0]['index'])
        print(output_data)
    model.summary()

In [0]:
#zip the saved models folder.
!zip -r saved_models.zip ./saved_models/

In [0]:
#download the zip.
from google.colab import files
files.download("saved_models.zip")

In [0]:
#generating labels file.
#labels file is used in the .tflite integration process.
#labels file is used to get the classes from the image databases
#and for each of them to be given a confidence score.
databases = {"ESSEX" : ".jpg", "JAFFE" : ".tiff", "ORL" : ".pgm"}
for db in databases:
    #get the classes from the image folder.
    classes = []
    path = "./FaceDB/{}/".format(db)
    for root, _, files in sorted(os.walk(path)):
        #sort the files iterator.
        for file in sorted(files):
            if file.endswith(databases[db]):
                #class name is based on the folder it's contained in.
                className = root[len(path) : ]
                classes.append(className)
    #get only unique entries from the classes array.
    unique = []
    for cls in classes:
        if cls not in unique:
            unique.append(cls)
    #save the unique class entries in a text file.
    with open('labels_{}.txt'.format(db), 'w') as f:
        f.write('\n'.join(unique))