# 1. Setting up and getting the data

## Download dependencies

In [None]:
%pip install tensorflow tensorflow-gpu opencv-python matplotlib albumentations

## Collect images (can collect on web or use capture)

In [None]:
import os
import time
import uuid
import cv2

### Setting up image path and use camera to collect image

In [None]:
# Project root folder. Set the FACE_RECOGNITION_ROOT environment variable to run the
# notebook from another location; otherwise the original Windows path is used.
# os.path.join(..., '') guarantees the trailing separator that later cells rely on
# when they build paths with `ROOT_PATH + "data\\..."`.
ROOT_PATH = os.path.join(
    os.environ.get('FACE_RECOGNITION_ROOT', "D:\\Learning\\FPT\\AIT\\Tensorflow\\face_recognition\\"),
    ''
)

In [None]:
# Folder that holds the raw face images (captured with the camera below or collected from the web).
IMAGES_PATH = os.path.join(ROOT_PATH, 'data', 'images', 'face')
# Number of frames the optional camera-capture cell grabs.
image_number = 30

#### Use cam to collect images (optional)
If you want to use the camera to capture images for training purposes, use the cell below; otherwise skip it and continue to the labelling step.

In [None]:
# Grab `image_number` frames from the default camera and store each one as a
# uniquely named JPEG inside IMAGES_PATH. Press `q` in the preview window to stop early.
os.makedirs(IMAGES_PATH, exist_ok=True)  # cv2.imwrite does not create missing folders
cap = cv2.VideoCapture(0)
try:
    for imgnum in range(image_number):
        print('Collecting image {}'.format(imgnum))
        ret, frame = cap.read()
        if not ret:
            # No frame was delivered (camera busy or unplugged); stop instead of
            # handing None to OpenCV.
            print('Could not read a frame from the camera, stopping capture')
            break
        imgname = os.path.join(IMAGES_PATH, f'{str(uuid.uuid1())}.jpg')
        cv2.imwrite(imgname, frame)
        cv2.imshow('frame', frame)
        time.sleep(0.5)

        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
finally:
    # Always free the camera and close the preview window, even after an error.
    cap.release()
    cv2.destroyAllWindows()

### Setting up label path and label all the images

In [None]:
# Where the labelImg tool is checked out, and the raw images that will be annotated with it.
LABELIMG_PATH = os.path.join(ROOT_PATH, 'libs', 'image_label_repo')
IMAGES_PATH = os.path.join(ROOT_PATH, 'data', 'images', 'face')

In [None]:
# create a lib folder to include labelimg library
if not os.path.exists(LABELIMG_PATH):
    !mkdir {LABELIMG_PATH}
    !git clone https://github.com/tzutalin/labelImg {LABELIMG_PATH}

In [None]:
if os.name == 'posix':
    %make qt5py3
if os.name =='nt':
    %cd {LABELIMG_PATH}
    !pyrcc5 -o libs/resources.py resources.qrc

#### Use labelimg
First we need to check which directory we are currently in: if we are already inside the lib folder there is no need to change directory; otherwise we need to change to the lib folder and then execute labelImg.py.

In [None]:
pwd

In [None]:
# Change into the labelImg checkout and launch the annotation tool from there.
%cd {LABELIMG_PATH}
!python labelImg.py

## Review dataset and build image loading function

## Import tensorflow and dependencies

In [None]:
import tensorflow as tf
import cv2
import lxml
import numpy as np
from matplotlib import pyplot as plt

### Limit GPU memory growth

In [None]:
# Turn on memory growth for every visible GPU.
gpus = tf.config.list_physical_devices('GPU')
for gpu in gpus:
    tf.config.experimental.set_memory_growth(gpu, True)

In [None]:
tf.config.list_physical_devices('GPU')

In [None]:
# Build a dataset of the file names of all raw face images.
# IMAGES_PATH already points at data/images/face, so reuse it instead of
# concatenating hard-coded backslash separators onto ROOT_PATH.
images = tf.data.Dataset.list_files(os.path.join(IMAGES_PATH, '*.jpg'))

In [None]:
next_images = images.as_numpy_iterator().next();

print(next_images);

In [None]:
def load_image(x):
    """Read a JPEG file and decode it into an image tensor.

    Args:
        x: path of the JPEG file (string tensor or str).

    Returns:
        The decoded image with three colour channels.
    """
    byte_image = tf.io.read_file(x)
    # Force three channels: the model built later takes (120, 120, 3) inputs, and a
    # greyscale JPEG would otherwise decode to a single channel.
    img = tf.io.decode_jpeg(byte_image, channels=3)
    return img

In [None]:
images = images.map(load_image)

In [None]:
next_images = images.as_numpy_iterator().next();

print(next_images);

In [None]:
type(images)

In [None]:
image_generator = images.batch(4).as_numpy_iterator()

In [None]:
print(image_generator)

In [None]:
plot_img = image_generator.next()

In [None]:
# Show the images of the batch side by side, one per axis.
fig, ax = plt.subplots(ncols=4, figsize=(20,20))
for axis, image in zip(ax, plot_img):
    axis.imshow(image)
plt.show()

#### Using split-folders to split the dataset images into train, val and test

In [None]:
%pip install split-folders

In [None]:
import splitfolders

In [None]:
# Split data/images/face into train (60%), val (20%) and test (20%) folders.
input_folder = ROOT_PATH + "data\\images\\"
output_folder = ROOT_PATH + "data\\images\\"
# The split is written into the input folder itself, so running this cell a second time
# would pick up the train/val/test folders as input and split them again.
# Only split when no split exists yet.
if not os.path.isdir(os.path.join(output_folder, 'train')):
    splitfolders.ratio(input_folder, output=output_folder, seed=42, ratio=(.6,.2,.2), group_prefix=None)

#### Split the labels folder into train, test, val based on corresponding image file

In [None]:
# Move every annotation next to the image split it belongs to:
# labels/face_labels/<name>.xml -> labels/<train|val|test>/<name>.xml.
# Images without an annotation file are simply skipped.
images_list_folder = ['train', 'val', 'test']
for image_folder in images_list_folder:
    label_folder = os.path.join(ROOT_PATH, 'data', 'labels', image_folder)
    # os.replace cannot create the destination folder, so make sure it exists first.
    os.makedirs(label_folder, exist_ok=True)
    for file in os.listdir(os.path.join(ROOT_PATH, 'data', 'images', image_folder, 'face')):
        # splitext only strips the extension, so names that contain dots stay intact.
        filename = os.path.splitext(file)[0] + '.xml'
        existing_filepath = os.path.join(ROOT_PATH, 'data', 'labels', 'face_labels', filename)
        if os.path.exists(existing_filepath):
            new_filepath = os.path.join(label_folder, filename)
            os.replace(existing_filepath, new_filepath)

In [None]:
# Load one training image to try the annotation parsing and augmentation on.
# Build the path from ROOT_PATH instead of repeating the absolute location.
sample_image_path = os.path.join(ROOT_PATH, 'data', 'images', 'train', 'face', '53d2a3bb-3829-11ed-a791-086ac5699bea.jpg')
img = cv2.imread(sample_image_path)
# cv2.imread reports a missing or unreadable file by returning None instead of raising.
assert img is not None, f'Could not read {sample_image_path}'

In [None]:
img.shape

In [None]:
type(img)

#### Using xmltodict to extract data from xml file

In [None]:
%pip install xmltodict

In [None]:
import xmltodict

In [None]:
from pprint import pprint


# Read the annotation of the sample image and turn the XML into nested dictionaries.
sample_label_path = os.path.join(ROOT_PATH, 'data', 'labels', 'train', '53d2a3bb-3829-11ed-a791-086ac5699bea.xml')
with open(sample_label_path, 'r') as file:
    my_xml = file.read()
label = xmltodict.parse(my_xml)

pprint(label, indent=2)

In [None]:
label['annotation']['object']['bndbox']

In [None]:
# Pull the box corners out of the parsed annotation as [xmin, ymin, xmax, ymax] integers.
coords = [
    int(label['annotation']['object']['bndbox'][corner])
    for corner in ('xmin', 'ymin', 'xmax', 'ymax')
]

In [None]:
coords

### Augmenting images using albumentations

In [None]:
# Express the annotated box as fractions of the image size
# (x values divided by the width, y values by the height).
# The box is read from `label` again so the cell can be re-run without dividing twice,
# and the size comes from the image itself instead of assuming 640x480.
img_height, img_width = img.shape[:2]
bndbox = label['annotation']['object']['bndbox']
coords = list(np.divide(
    [int(bndbox[corner]) for corner in ('xmin', 'ymin', 'xmax', 'ymax')],
    [img_width, img_height, img_width, img_height]
))

In [None]:
import albumentations as alb

In [None]:
# Augmentation pipeline applied to an image together with its face box:
# a random 450x450 crop followed by random flips and colour/brightness changes.
# The boxes handed to it are [x_min, y_min, x_max, y_max] fractions of the image size and
# the cells below scale the returned boxes by the crop size, which is the
# 'albumentations' box format ('pascal_voc' would interpret the values as pixels).
augmentor = alb.Compose([
    alb.RandomCrop(width=450, height=450),
    alb.HorizontalFlip(p=0.5),
    alb.RandomGamma(p=0.2),
    alb.RandomBrightnessContrast(p=0.2),
    alb.RGBShift(p=0.2),
    alb.VerticalFlip(p=0.5)
], bbox_params=alb.BboxParams(format='albumentations', label_fields=['class_labels']))

In [None]:
augmented = augmentor(image=img, bboxes=[coords], class_labels=['face'])

In [None]:
augmented['class_labels']

In [None]:
cv2.rectangle(
    augmented['image'],
    tuple(np.multiply(augmented['bboxes'][0][:2], [450, 450]).astype(int)),
    tuple(np.multiply(augmented['bboxes'][0][2:], [450, 450]).astype(int)),
    (250, 0, 0), 2
)

In [None]:
# The image was loaded with cv2.imread (BGR channel order) while matplotlib expects RGB,
# so convert before displaying to get the real colours.
plt.imshow(cv2.cvtColor(augmented['image'], cv2.COLOR_BGR2RGB))

#### Now we are going to augment all the images in the dataset

In [None]:
import json

In [None]:
# since augmented style 'albumentations' requires all coords must be between 0 and 1, so we need to check if there are any number exceeds 1 or below 0
def check_bbox(bbox):
    """Clamp the first four entries of ``bbox`` into the range [0, 1].

    The sequence is modified in place; a tuple of its contents is returned as well.

    Args:
        bbox: mutable sequence holding at least four numbers.

    Returns:
        tuple: the contents of ``bbox`` after clamping.
    """
    for position in range(4):
        value = bbox[position]
        # Values below 0 become 0, values above 1 become 1, anything else is kept as is.
        bbox[position] = 0 if value < 0 else (1 if value > 1 else value)

    return tuple(bbox)

In [None]:
# Create augmented copies of every image in each split. For each copy the image is written
# to aug_data/<split>/images and a JSON annotation (image name, box, class) to
# aug_data/<split>/labels. Class 1 means the copy contains a face box, class 0 means it does not.
AUGMENTATIONS_PER_IMAGE = 60
BBOX_KEYS = ('xmin', 'ymin', 'xmax', 'ymax')

for partition in ['train', 'val', 'test']:
    source_dir = os.path.join(ROOT_PATH, 'data', 'images', partition, 'face')
    aug_image_dir = os.path.join(ROOT_PATH, 'aug_data', partition, 'images')
    aug_label_dir = os.path.join(ROOT_PATH, 'aug_data', partition, 'labels')
    # Neither cv2.imwrite nor open(..., 'w') creates missing folders.
    os.makedirs(aug_image_dir, exist_ok=True)
    os.makedirs(aug_label_dir, exist_ok=True)

    for image in os.listdir(source_dir):
        print('folder: ' + partition + 'image: ' + image)
        img = cv2.imread(os.path.join(source_dir, image))
        if img is None:
            # cv2.imread returns None for files it cannot decode (e.g. stray non-image files).
            print('skipping unreadable image: ' + image)
            continue

        image_stem = os.path.splitext(image)[0]
        label_path = os.path.join(ROOT_PATH, 'data', 'labels', partition, f'{image_stem}.xml')
        has_label = os.path.exists(label_path)

        # Tiny placeholder box for images without an annotation; the augmentor is always
        # called with one box, and such images end up with class 0 below.
        coords = [0, 0, 0.00001, 0.00001]
        if has_label:
            with open(label_path, 'r') as file:
                my_xml = file.read()

            label = xmltodict.parse(my_xml)
            annotated = label['annotation']['object']
            if isinstance(annotated, list):
                # xmltodict yields a list when the file holds several <object> entries;
                # the model predicts a single box, so use the first one.
                annotated = annotated[0]
            bndbox = annotated['bndbox']
            print('finish extract data: ' + label['annotation']['filename'])
            # Normalise by the real image size instead of assuming 640x480.
            img_height, img_width = img.shape[:2]
            coords = list(np.divide(
                [int(bndbox[key]) for key in BBOX_KEYS],
                [img_width, img_height, img_width, img_height]
            ))

        # Normalised boxes must lie inside [0, 1]; clamp once and use the clamped box.
        new_bbox = list(check_bbox(coords))

        # Create new annotation file for each augmented copy
        try:
            for x in range(AUGMENTATIONS_PER_IMAGE):
                augmented = augmentor(image=img, bboxes=[new_bbox], class_labels=['face'])
                cv2.imwrite(os.path.join(aug_image_dir, f'{image_stem}.{x}.jpg'), augmented['image'])

                annotation = {}
                annotation['image'] = image

                if has_label and len(augmented['bboxes']) > 0:
                    # Plain floats keep the box JSON-serialisable whatever container
                    # type the augmentor returns.
                    annotation['bbox'] = [float(value) for value in augmented['bboxes'][0][:4]]
                    annotation['class'] = 1
                else:
                    # No annotation, or the random crop removed the face.
                    annotation['bbox'] = [0,0,0,0]
                    annotation['class'] = 0

                with open(os.path.join(aug_label_dir, f'{image_stem}.{x}.json'), 'w') as file:
                    json.dump(annotation, file)
        except Exception as e:
            # Best effort: report which image failed and carry on with the next one.
            print(f'augmentation failed for {image}: {e}')


#### Put the new dataset into each variable

In [None]:
# Augmented training images: decode each file, resize to 120x120 and scale pixels to [0, 1].
# The path is derived from ROOT_PATH so it follows the notebook's single root setting;
# shuffle=False keeps the file order fixed.
train_images = tf.data.Dataset.list_files(os.path.join(ROOT_PATH, 'aug_data', 'train', 'images', '*.jpg'), shuffle=False)
train_images = train_images.map(load_image)
train_images = train_images.map(lambda x : tf.image.resize(x, (120, 120)))
train_images = train_images.map(lambda x : x / 255)

In [None]:
# Augmented test images: decode each file, resize to 120x120 and scale pixels to [0, 1].
# The path is derived from ROOT_PATH so it follows the notebook's single root setting;
# shuffle=False keeps the file order fixed.
test_images = tf.data.Dataset.list_files(os.path.join(ROOT_PATH, 'aug_data', 'test', 'images', '*.jpg'), shuffle=False)
test_images = test_images.map(load_image)
test_images = test_images.map(lambda x : tf.image.resize(x, (120, 120)))
test_images = test_images.map(lambda x : x / 255)

In [None]:
# Augmented validation images: decode each file, resize to 120x120 and scale pixels to [0, 1].
# The path is derived from ROOT_PATH so it follows the notebook's single root setting;
# shuffle=False keeps the file order fixed.
val_images = tf.data.Dataset.list_files(os.path.join(ROOT_PATH, 'aug_data', 'val', 'images', '*.jpg'), shuffle=False)
val_images = val_images.map(load_image)
val_images = val_images.map(lambda x : tf.image.resize(x, (120, 120)))
val_images = val_images.map(lambda x : x / 255)

In [None]:
train_images.as_numpy_iterator().next()

In [None]:
def load_labels(label_path):
    """Load one JSON annotation file.

    Args:
        label_path: object whose ``.numpy()`` method returns the path of the JSON file
            (the notebook calls this through ``tf.py_function``).

    Returns:
        tuple: ``([class], bbox)`` taken from the file's ``'class'`` and ``'bbox'`` entries.
    """
    json_path = label_path.numpy()
    with open(json_path, 'r', encoding='utf-8') as handle:
        annotation = json.load(handle)
    class_id = annotation['class']
    bbox = annotation['bbox']
    return [class_id], bbox

In [None]:
# Annotations of the augmented training images, listed in fixed (unshuffled) order.
# The path is derived from ROOT_PATH so it follows the notebook's single root setting.
# load_labels is plain Python, so it is wrapped in tf.py_function to run inside the pipeline.
train_labels = tf.data.Dataset.list_files(os.path.join(ROOT_PATH, 'aug_data', 'train', 'labels', '*.json'), shuffle=False)
train_labels = train_labels.map(lambda x : tf.py_function(load_labels, [x], [tf.uint8, tf.float16]))

In [None]:
# Annotations of the augmented test images, listed in fixed (unshuffled) order.
# The path is derived from ROOT_PATH so it follows the notebook's single root setting.
# load_labels is plain Python, so it is wrapped in tf.py_function to run inside the pipeline.
test_labels = tf.data.Dataset.list_files(os.path.join(ROOT_PATH, 'aug_data', 'test', 'labels', '*.json'), shuffle=False)
test_labels = test_labels.map(lambda x : tf.py_function(load_labels, [x], [tf.uint8, tf.float16]))

In [None]:
# Annotations of the augmented validation images, listed in fixed (unshuffled) order.
# The path is derived from ROOT_PATH so it follows the notebook's single root setting.
# load_labels is plain Python, so it is wrapped in tf.py_function to run inside the pipeline.
val_labels = tf.data.Dataset.list_files(os.path.join(ROOT_PATH, 'aug_data', 'val', 'labels', '*.json'), shuffle=False)
val_labels = val_labels.map(lambda x : tf.py_function(load_labels, [x], [tf.uint8, tf.float16]))

In [None]:
val_labels.as_numpy_iterator().next()

In [None]:
len(train_images), len(train_labels), len(test_images), len(test_labels), len(val_images), len(val_labels)

In [None]:
# Pair every training image with its label, then shuffle, batch by 8 and prefetch.
train = (
    tf.data.Dataset.zip((train_images, train_labels))
    .shuffle(5000)
    .batch(8)
    .prefetch(4)
)

In [None]:
# Pair every test image with its label, then shuffle, batch by 8 and prefetch.
test = (
    tf.data.Dataset.zip((test_images, test_labels))
    .shuffle(5000)
    .batch(8)
    .prefetch(4)
)

In [None]:
# Pair every validation image with its label, then shuffle, batch by 8 and prefetch.
val = (
    tf.data.Dataset.zip((val_images, val_labels))
    .shuffle(5000)
    .batch(8)
    .prefetch(4)
)

In [None]:
train.as_numpy_iterator().next()[1]

In [None]:
data_samples = train.as_numpy_iterator()

In [None]:
res = data_samples.next()

In [None]:
# Draw the labelled box on the first four images of the batch to sanity-check the pipeline.
fig, ax = plt.subplots(ncols=4, figsize=(20,20))
for idx in range(4): 
    # OpenCV draws in place; arrays handed out by as_numpy_iterator() may be read-only,
    # so draw on a writable copy (this also leaves the batch itself untouched).
    sample_image = res[0][idx].copy()
    sample_coords = res[1][1][idx]

    # Boxes are stored as fractions of the image, so scale them to the 120x120 pixels.
    top_left = tuple(np.multiply(sample_coords[:2], [120,120]).astype(int).tolist())
    bottom_right = tuple(np.multiply(sample_coords[2:], [120,120]).astype(int).tolist())
    cv2.rectangle(sample_image, top_left, bottom_right, (255,0,0), 2)

    ax[idx].imshow(sample_image)

### Train models

In [None]:
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Conv2D, Dense, GlobalMaxPooling2D
from tensorflow.keras.applications import VGG16

In [None]:
vgg = VGG16(include_top=False)

In [None]:
vgg.summary()

In [None]:
def build_model(): 
    """Assemble the face tracker: a VGG16 feature extractor feeding two output heads.

    Returns:
        A Keras ``Model`` that takes 120x120x3 images and outputs
        ``[class score (1 sigmoid unit), box (4 sigmoid units)]``.
    """
    image_input = Input(shape=(120,120,3))
    features = VGG16(include_top=False)(image_input)

    # Head 1: classification score
    class_pooled = GlobalMaxPooling2D()(features)
    class_hidden = Dense(2048, activation='relu')(class_pooled)
    class_output = Dense(1, activation='sigmoid')(class_hidden)

    # Head 2: bounding-box regression, four values squashed into [0, 1]
    box_pooled = GlobalMaxPooling2D()(features)
    box_hidden = Dense(2048, activation='relu')(box_pooled)
    box_output = Dense(4, activation='sigmoid')(box_hidden)

    return Model(inputs=image_input, outputs=[class_output, box_output])

In [None]:
facetracker = build_model()

In [None]:
facetracker.summary()

In [None]:
X, y = train.as_numpy_iterator().next()

In [None]:
X.shape

In [None]:
classes, coords = facetracker.predict(X)

In [None]:
classes, coords

In [None]:
# Number of batches the training dataset yields per epoch.
batches_per_epoch = len(train)
# Decay rate passed to the optimizer below. NOTE(review): assuming an inverse-time decay
# lr = lr0 / (1 + decay * step), this value leaves 75% of lr0 after one epoch of steps — confirm.
lr_decay = (1./0.75 -1)/batches_per_epoch

In [None]:
# Adam with an inverse-time learning-rate decay: lr = 0.0001 / (1 + lr_decay * step).
# The schedule object replaces the `decay=` keyword, which current Keras optimizers
# reject, while producing the same learning-rate curve.
opt = tf.keras.optimizers.Adam(
    learning_rate=tf.keras.optimizers.schedules.InverseTimeDecay(
        initial_learning_rate=0.0001,
        decay_steps=1,
        decay_rate=lr_decay
    )
)

In [None]:
def localization_loss(y_true, yhat):
    """Sum of squared errors on the box's first corner and on its width/height.

    Args:
        y_true: true boxes, indexed as ``[:, 0..3]`` = x1, y1, x2, y2.
        yhat: predicted boxes with the same layout.

    Returns:
        Scalar tensor: corner error plus size error, summed over the batch.
    """
    # Squared distance between the (x1, y1) corners
    corner_error = tf.reduce_sum(tf.square(y_true[:,:2] - yhat[:,:2]))

    # Box extents derived from the two corners
    true_width = y_true[:,2] - y_true[:,0]
    true_height = y_true[:,3] - y_true[:,1]
    pred_width = yhat[:,2] - yhat[:,0]
    pred_height = yhat[:,3] - yhat[:,1]

    size_error = tf.reduce_sum(
        tf.square(true_width - pred_width) + tf.square(true_height - pred_height)
    )

    return corner_error + size_error

In [None]:
classloss = tf.keras.losses.BinaryCrossentropy()
regressloss = localization_loss

In [None]:
localization_loss(y[1], coords)

In [None]:
classloss(y[0], classes)

In [None]:
regressloss(y[1], coords)

In [None]:
class FaceTracker(Model): 
    """Keras model wrapper that trains a two-output network with a combined loss.

    The wrapped network must return ``(classes, coords)`` for a batch of images.
    Every step computes ``localization loss + 0.5 * classification loss``.
    """

    def __init__(self, eyetracker,  **kwargs): 
        """Store the network to train.

        Args:
            eyetracker: model returning ``(classes, coords)`` for a batch of images.
            **kwargs: forwarded to ``Model.__init__``.
        """
        super().__init__(**kwargs)
        self.model = eyetracker

    def compile(self, opt, classloss, localizationloss, **kwargs):
        """Register the optimizer and the two loss functions.

        Args:
            opt: optimizer used to apply the gradients in ``train_step``.
            classloss: callable ``(y_true, y_pred)`` for the class output.
            localizationloss: callable ``(y_true, y_pred)`` for the box output.
            **kwargs: forwarded to ``Model.compile``.
        """
        super().compile(**kwargs)
        self.closs = classloss
        self.lloss = localizationloss
        self.opt = opt

    def _batch_losses(self, batch, training):
        """Run the wrapped model on one batch and return (total, class, localization) losses."""
        X, y = batch

        classes, coords = self.model(X, training=training)

        batch_classloss = self.closs(y[0], classes)
        # Cast the box labels to float32 before comparing them with the predictions.
        batch_localizationloss = self.lloss(tf.cast(y[1], tf.float32), coords)
        total_loss = batch_localizationloss+0.5*batch_classloss

        return total_loss, batch_classloss, batch_localizationloss

    def train_step(self, batch, **kwargs): 
        """Run one optimisation step on ``batch`` and return the three loss values."""
        with tf.GradientTape() as tape: 
            total_loss, batch_classloss, batch_localizationloss = self._batch_losses(batch, training=True)

        # The gradient is taken after leaving the tape context so the gradient
        # computation itself is not recorded.
        grad = tape.gradient(total_loss, self.model.trainable_variables)

        # Use the optimizer passed to compile(), not whatever global `opt` happens to exist.
        self.opt.apply_gradients(zip(grad, self.model.trainable_variables))

        return {"total_loss":total_loss, "class_loss":batch_classloss, "regress_loss":batch_localizationloss}

    def test_step(self, batch, **kwargs): 
        """Evaluate ``batch`` without updating weights and return the three loss values."""
        total_loss, batch_classloss, batch_localizationloss = self._batch_losses(batch, training=False)

        return {"total_loss":total_loss, "class_loss":batch_classloss, "regress_loss":batch_localizationloss}

    def call(self, X, **kwargs): 
        """Forward ``X`` through the wrapped model."""
        return self.model(X, **kwargs)

In [None]:
model = FaceTracker(facetracker)

In [None]:
model.compile(opt, classloss, regressloss)

In [None]:
logdir='logs'

In [None]:
tensorboard_callback = tf.keras.callbacks.TensorBoard(log_dir=logdir)

In [None]:
hist = model.fit(train, epochs=10, validation_data=val, callbacks=[tensorboard_callback])

In [None]:
hist.history

In [None]:
# One panel per tracked loss; each panel compares the training and validation curves.
fig, ax = plt.subplots(ncols=3, figsize=(20,5))

panels = [
    ('total_loss', 'loss', 'Loss'),
    ('class_loss', 'class loss', 'Classification Loss'),
    ('regress_loss', 'regress loss', 'Regression Loss'),
]
for axis, (metric, legend_name, panel_title) in zip(ax, panels):
    axis.plot(hist.history[metric], color='teal', label=legend_name)
    axis.plot(hist.history['val_' + metric], color='orange', label='val ' + legend_name)
    axis.title.set_text(panel_title)
    axis.legend()

plt.show()

In [None]:
test_data = test.as_numpy_iterator()

In [None]:
test_sample = test_data.next()

In [None]:
yhat = facetracker.predict(test_sample[0])

In [None]:
# Show the first four test images with the predicted box drawn on confident detections.
fig, ax = plt.subplots(ncols=4, figsize=(20,20))
for idx in range(4): 
    # OpenCV draws in place; arrays handed out by as_numpy_iterator() may be read-only,
    # so draw on a writable copy (this also leaves the batch itself untouched).
    sample_image = test_sample[0][idx].copy()
    sample_coords = yhat[1][idx]

    # Only draw the box when the classifier score is above 0.9.
    if yhat[0][idx] > 0.9:
        # Predicted boxes are fractions of the image, so scale them to the 120x120 pixels.
        top_left = tuple(np.multiply(sample_coords[:2], [120,120]).astype(int).tolist())
        bottom_right = tuple(np.multiply(sample_coords[2:], [120,120]).astype(int).tolist())
        cv2.rectangle(sample_image, top_left, bottom_right, (255,0,0), 2)

    ax[idx].imshow(sample_image)

### Package the model into h5 file and test

In [None]:
from tensorflow.keras.models import load_model

In [None]:
facetracker.save('facetracker.h5')

In [None]:
facetracker = load_model('facetracker.h5')

In [None]:
# Live detection: read camera frames, run the model on each one and draw the predicted
# face box with a 'face' label. Press `q` in the preview window to stop.
cap = cv2.VideoCapture(0)
try:
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            # No frame delivered (camera unplugged or stream ended); stop instead of
            # indexing into None.
            break
        frame = frame[50:500, 50:500,:]
        # The crop is smaller than 450x450 when the camera frame is smaller than 500 pixels
        # in a dimension, so scale the box by the real crop size.
        frame_height, frame_width = frame.shape[:2]
        box_scale = [frame_width, frame_height]

        # OpenCV delivers BGR; convert to RGB before resizing to the 120x120 model input.
        rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        resized = tf.image.resize(rgb, (120,120))

        yhat = facetracker.predict(np.expand_dims(resized/255,0))
        sample_coords = yhat[1][0]

        if yhat[0] > 0.5: 
            top_left = np.multiply(sample_coords[:2], box_scale).astype(int)
            bottom_right = np.multiply(sample_coords[2:], box_scale).astype(int)

            # Controls the main rectangle
            cv2.rectangle(frame, 
                          tuple(top_left.tolist()),
                          tuple(bottom_right.tolist()), 
                          (255,0,0), 2)
            # Controls the label rectangle
            cv2.rectangle(frame, 
                          tuple(np.add(top_left, [0,-30]).tolist()),
                          tuple(np.add(top_left, [80,0]).tolist()), 
                          (255,0,0), -1)

            # Controls the text rendered
            cv2.putText(frame, 'face', tuple(np.add(top_left, [0,-5]).tolist()),
                        cv2.FONT_HERSHEY_SIMPLEX, 1, (255,255,255), 2, cv2.LINE_AA)

        cv2.imshow('EyeTrack', frame)

        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
finally:
    # Always free the camera and close the preview window, even after an error.
    cap.release()
    cv2.destroyAllWindows()