In [None]:
import tensorflow as tf
import json
import numpy as np
from matplotlib import pyplot as plt
import os

In [None]:
# Turn on memory growth for every visible GPU so TensorFlow allocates GPU memory
# incrementally instead of claiming it all at start-up.
gpus = tf.config.experimental.list_physical_devices('GPU')
for device in gpus:
    tf.config.experimental.set_memory_growth(device, True)

In [None]:
# Display the GPUs TensorFlow can see (an empty list means none are visible).
tf.config.list_physical_devices('GPU')

## Load Image into TF Data Pipeline

In [None]:
# Locate the raw image folder relative to the working directory and build an
# unshuffled dataset of the JPEG file paths it contains.
current_folder = os.path.abspath('')
IMAGES_PATH = os.path.join(current_folder, 'data', 'images')
print(IMAGES_PATH)

jpeg_pattern = f'{IMAGES_PATH}/*.jpg'
images = tf.data.Dataset.list_files(jpeg_pattern, shuffle=False)

In [None]:
# Peek at the first element of the dataset (a file path at this stage).
next(images.as_numpy_iterator())

In [None]:
def load_image(x):
    """Read a JPEG file from disk and decode it into an RGB image tensor.

    Args:
        x: Path to a JPEG file (a string or scalar string tensor).

    Returns:
        The decoded image tensor with three channels.
    """
    byte_img = tf.io.read_file(x)
    # Force three channels: without this a grayscale JPEG decodes to a single
    # channel, which cannot be batched with RGB images or fed to the
    # (120, 120, 3) model input used later in the notebook.
    img = tf.io.decode_jpeg(byte_img, channels=3)
    return img

In [None]:
# Replace each file path in the dataset with its decoded image tensor.
# NOTE: this rebinds `images`, so re-running this cell without first re-running
# the list_files cell would apply load_image to already-decoded images.
images = images.map(load_image)

In [None]:
# Peek at the first element again (now a decoded image array).
next(images.as_numpy_iterator())

In [None]:
# Pull one batch of up to four decoded images and show them side by side.
image_generator = images.batch(4).as_numpy_iterator()
plot_images = image_generator.next()
fig, ax = plt.subplots(ncols=4, figsize=(20,20))
for axis, picture in zip(ax, plot_images):
    axis.imshow(picture)
plt.show()

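## Partition Unaugmented Data
The images are expected to already be split into `data/train/images`, `data/test/images` and `data/val/images`; the cell below moves each image's matching label file from `data/labels` into the corresponding partition's `labels` folder.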

In [None]:
# For every image already placed in data/<partition>/images, move its label file
# data/labels/<stem>.json into data/<partition>/labels.
current_folder = os.path.abspath('')
for folder in ['train','test','val']:
    # Build the destination from the same absolute root as the source (the
    # original mixed an absolute source with a cwd-relative destination) and
    # make sure it exists so os.replace cannot fail on a missing directory.
    labels_dir = os.path.join(current_folder, 'data', folder, 'labels')
    os.makedirs(labels_dir, exist_ok=True)

    for file in os.listdir(os.path.join(current_folder,'data', folder, 'images')):
        # splitext strips only the final extension, so stems containing dots survive.
        filename = os.path.splitext(file)[0] + '.json'
        existing_filepath = os.path.join(current_folder,'data','labels', filename)
        # Images with no label file under data/labels are simply skipped.
        if os.path.exists(existing_filepath):
            new_filepath = os.path.join(labels_dir, filename)
            os.replace(existing_filepath, new_filepath)

## Apply Image Augmentation on Images and Labels using Albumentations
This part sets up the Albumentations pipeline and tests it on one image.

In [None]:
import albumentations as alb
import cv2

In [None]:
# Augmentation pipeline: a random 540x540 crop followed by random flips and
# colour/brightness/gamma perturbations. Bounding boxes are passed in the
# 'albumentations' format with their labels in the `class_labels` field.
augmentation_steps = [
    alb.RandomCrop(width=540, height=540),
    alb.HorizontalFlip(p=0.5),
    alb.RandomBrightnessContrast(p=0.2),
    alb.RandomGamma(p=0.2),
    alb.RGBShift(p=0.2),
    alb.VerticalFlip(p=0.5),
]
bbox_settings = alb.BboxParams(format='albumentations',
                               label_fields=['class_labels'])
augmentor = alb.Compose(augmentation_steps, bbox_params=bbox_settings)

In [None]:
# Load one training image with OpenCV and show its array shape.
sample_image_path = os.path.join('data', 'train', 'images',
                                 '8d8023dc-2eb0-11ee-b1eb-166359f0a21b.jpg')
img = cv2.imread(sample_image_path)
img.shape

In [None]:
# Load the label JSON belonging to the sample image and show the corner points
# of its first annotated shape.
sample_label_path = os.path.join('data', 'train', 'labels',
                                 '8d8023dc-2eb0-11ee-b1eb-166359f0a21b.json')
with open(sample_label_path, 'r') as label_file:
    label = json.load(label_file)
label['shapes'][0]['points']

In [None]:
# Flatten the first shape's two corner points into [x1, y1, x2, y2] (pixel units)
# and display the result.
corner_points = label['shapes'][0]['points']
coords = [
    corner_points[0][0],
    corner_points[0][1],
    corner_points[1][0],
    corner_points[1][1],
]
coords

In [None]:
# Normalize the box corners to [0, 1] by the image's own width and height.
# The values are re-read from `label` (instead of dividing `coords` in place) so
# the cell gives the same result when re-run, and the image size comes from
# `img.shape` rather than a hard-coded 1280x720.
img_h, img_w = img.shape[:2]
corner_points = label['shapes'][0]['points']
pixel_coords = [corner_points[0][0], corner_points[0][1],
                corner_points[1][0], corner_points[1][1]]
coords = list(np.divide(pixel_coords, [img_w, img_h, img_w, img_h]))
coords

In [None]:
# Run the pipeline once on the sample image, passing its normalized box and the
# single class label 'face'.
augmented = augmentor(image=img, bboxes=[coords], class_labels=['face'])

In [None]:
# Bounding boxes returned by the pipeline for the augmented image.
augmented['bboxes']

In [None]:
# Values from index 2 onward of the first returned box (used below as the second corner).
augmented['bboxes'][0][2:]

In [None]:
# Preview the augmented sample with its bounding box.
# cv2.imread returns BGR while matplotlib expects RGB, so convert first; cvtColor
# also returns a new array, so `augmented['image']` itself is left untouched and
# re-running the cell does not accumulate drawings.
preview = cv2.cvtColor(augmented['image'], cv2.COLOR_BGR2RGB)
preview_h, preview_w = preview.shape[:2]

# The crop may have removed the box entirely; only draw when one is left.
if len(augmented['bboxes']) > 0:
    box = augmented['bboxes'][0]
    cv2.rectangle(preview,
                  tuple(np.multiply(box[:2], [preview_w, preview_h]).astype(int).tolist()),
                  tuple(np.multiply(box[2:4], [preview_w, preview_h]).astype(int).tolist()),
                  (255,0,0), 2)

plt.imshow(preview)

## Using the Albumentations Pipeline to Augment the Data

In [None]:
# Create 50 augmented copies of every image in each partition. For each copy the
# image is written to aug_data/<partition>/images/<stem>.<n>.jpg and an annotation
# ({'image', 'bbox', 'class'}) to aug_data/<partition>/labels/<stem>.<n>.json.
current_folder = os.path.abspath('')

for partition in ['train','test','val']:
    src_images_dir = os.path.join(current_folder, 'data', partition, 'images')
    src_labels_dir = os.path.join(current_folder, 'data', partition, 'labels')
    out_images_dir = os.path.join(current_folder, 'aug_data', partition, 'images')
    out_labels_dir = os.path.join(current_folder, 'aug_data', partition, 'labels')
    # Create the output folders up front so the writes below cannot fail on a
    # missing directory (cv2.imwrite would otherwise just return False).
    os.makedirs(out_images_dir, exist_ok=True)
    os.makedirs(out_labels_dir, exist_ok=True)

    for image in os.listdir(src_images_dir):
        stem = os.path.splitext(image)[0]
        img = cv2.imread(os.path.join(src_images_dir, image))
        # cv2.imread returns None for unreadable / non-image files.
        if img is None:
            print(f'Skipping unreadable image: {image}')
            continue
        img_h, img_w = img.shape[:2]

        # Tiny placeholder box, used when the image has no label file so the
        # pipeline is still called with one bbox.
        coords = [0,0,0.00001,0.00001]
        label_path = os.path.join(src_labels_dir, f'{stem}.json')
        has_label = os.path.exists(label_path)
        if has_label:
            with open(label_path, 'r') as f:
                label = json.load(f)

            points = label['shapes'][0]['points']
            # The two corners may have been drawn in any order; sort so the box
            # is always (x_min, y_min, x_max, y_max).
            x_min, x_max = sorted((points[0][0], points[1][0]))
            y_min, y_max = sorted((points[0][1], points[1][1]))
            # Normalize by this image's size and clip to [0, 1] so a point placed
            # slightly outside the frame does not make the pipeline reject the box.
            normalized = np.divide([x_min, y_min, x_max, y_max],
                                   [img_w, img_h, img_w, img_h])
            coords = [float(v) for v in np.clip(normalized, 0.0, 1.0)]

        try:
            for x in range(50):
                augmented = augmentor(image=img, bboxes=[coords], class_labels=['face'])
                cv2.imwrite(os.path.join(out_images_dir, f'{stem}.{x}.jpg'), augmented['image'])

                annotation = {}
                annotation['image'] = image

                if has_label and len(augmented['bboxes']) > 0:
                    # Plain floats keep the annotation JSON-serializable whatever
                    # numeric type the pipeline returns.
                    annotation['bbox'] = [float(v) for v in augmented['bboxes'][0][:4]]
                    annotation['class'] = 1
                else:
                    # No label, or the face was cropped out of this copy.
                    annotation['bbox'] = [0,0,0,0]
                    annotation['class'] = 0

                with open(os.path.join(out_labels_dir, f'{stem}.{x}.json'), 'w') as f:
                    json.dump(annotation, f)

        except Exception as e:
            # Best effort: report which image failed and carry on with the rest.
            print(f'Augmentation failed for {image}: {e}')

Notes:
- `tf.data.Dataset.list_files('aug_data/train/images/*.jpg', shuffle=False)` loads all the file names in the directory into a dataset.
- `train_images.map(load_image)` applies the `load_image` function to each element of the dataset.
- `train_images.map(lambda x: tf.image.resize(x, (120,120)))` applies a lambda function where  `tf.image.resize()` is called on each image and resizes them to 120x120.
- `train_images.map(lambda x: x/255)` applies a lambda function where each image is divided by 255 to scale the pixel values from the [0, 255] range down to [0, 1], which keeps the model inputs in a small, consistent range for training.

In [None]:
# Training images: list files -> decode -> resize to 120x120 -> scale by 1/255.
train_images = (
    tf.data.Dataset.list_files('aug_data/train/images/*.jpg', shuffle=False)
    .map(load_image)
    .map(lambda x: tf.image.resize(x, (120,120)))
    .map(lambda x: x/255)
)

In [None]:
# Test images: list files -> decode -> resize to 120x120 -> scale by 1/255.
test_images = (
    tf.data.Dataset.list_files('aug_data/test/images/*.jpg', shuffle=False)
    .map(load_image)
    .map(lambda x: tf.image.resize(x, (120,120)))
    .map(lambda x: x/255)
)

In [None]:
# Validation images: list files -> decode -> resize to 120x120 -> scale by 1/255.
val_images = (
    tf.data.Dataset.list_files('aug_data/val/images/*.jpg', shuffle=False)
    .map(load_image)
    .map(lambda x: tf.image.resize(x, (120,120)))
    .map(lambda x: x/255)
)

In [None]:
# Peek at the first preprocessed training image.
next(train_images.as_numpy_iterator())

## Prepare Labels

In [None]:
def load_labels(label_path):
    """Read one annotation JSON file and return its class and bounding box.

    Args:
        label_path: Object whose ``.numpy()`` method yields the path of the
            JSON file (this is what ``tf.py_function`` passes in).

    Returns:
        A tuple ``([class], bbox)`` built from the file's ``'class'`` and
        ``'bbox'`` entries.
    """
    json_path = label_path.numpy()
    with open(json_path, 'r', encoding = "utf-8") as handle:
        annotation = json.load(handle)

    class_id = [annotation['class']]
    return class_id, annotation['bbox']

In [None]:
# Training labels: list the JSON files, then parse each into (class, bbox) through
# tf.py_function, since load_labels runs plain Python file I/O.
train_labels = (
    tf.data.Dataset.list_files('aug_data/train/labels/*.json', shuffle=False)
    .map(lambda x: tf.py_function(load_labels, [x], [tf.uint8, tf.float16]))
)

In [None]:
# Test labels: list the JSON files, then parse each into (class, bbox) through
# tf.py_function, since load_labels runs plain Python file I/O.
test_labels = (
    tf.data.Dataset.list_files('aug_data/test/labels/*.json', shuffle=False)
    .map(lambda x: tf.py_function(load_labels, [x], [tf.uint8, tf.float16]))
)

In [None]:
# Validation labels: list the JSON files, then parse each into (class, bbox) through
# tf.py_function, since load_labels runs plain Python file I/O.
val_labels = (
    tf.data.Dataset.list_files('aug_data/val/labels/*.json', shuffle=False)
    .map(lambda x: tf.py_function(load_labels, [x], [tf.uint8, tf.float16]))
)

In [None]:
# Sanity check: element counts of each image/label dataset. The image and label
# counts of a partition should match, because the datasets are zipped
# position-by-position in the next cells.
len(train_images), len(train_labels), len(test_images), len(test_labels), len(val_images), len(val_labels)

Notes:
- `tf.data.Dataset.zip()` pairs up the elements of the given datasets by position, so each element of the result is an `(image, label)` tuple: the first image with the first label, the second image with the second label, etc.
- `shuffle(buffer_size)` randomizes the order of the dataset's elements by sampling from a buffer that holds `buffer_size` elements at a time.
- `batch()` function combines consecutive elements of a dataset into batches.
- `prefetch()` function allows later elements to be prepared while the current element is being processed. It can be used to overlap the preprocessing and model execution of a training dataset.

In [None]:
# Pair training images with labels, then shuffle (buffer 5000), batch by 8 and
# prefetch 4 batches.
train = (
    tf.data.Dataset.zip((train_images, train_labels))
    .shuffle(5000)
    .batch(8)
    .prefetch(4)
)

In [None]:
# Label part (classes, bounding boxes) of one training batch.
next(train.as_numpy_iterator())[1]

In [None]:
# Pair test images with labels, then shuffle (buffer 1300), batch by 8 and
# prefetch 4 batches.
test = (
    tf.data.Dataset.zip((test_images, test_labels))
    .shuffle(1300)
    .batch(8)
    .prefetch(4)
)

In [None]:
# Pair validation images with labels, then shuffle (buffer 1000), batch by 8 and
# prefetch 4 batches.
val = (
    tf.data.Dataset.zip((val_images, val_labels))
    .shuffle(1000)
    .batch(8)
    .prefetch(4)
)

In [None]:
# Iterator that yields training batches as NumPy arrays.
data_samples = train.as_numpy_iterator()

In [None]:
# Fetch the next training batch: res[0] holds the images, res[1] the (classes, boxes) labels.
res = next(data_samples)

In [None]:
# Show the first four images of the sampled training batch with their labelled boxes.
fig, ax = plt.subplots(ncols=4, figsize=(20,20))
for idx in range(4):
    # Draw on a copy so the batch held in `res` is not modified, which keeps
    # the cell re-runnable and avoids writing into the iterator's own array.
    sample_image = res[0][idx].copy()
    sample_coords = res[1][1][idx]

    # The images were scaled to [0, 1] floats, so red is (1.0, 0, 0); using 255
    # puts out-of-range values in the image that imshow then has to clip.
    cv2.rectangle(sample_image,
                  tuple(np.multiply(sample_coords[:2], [120,120]).astype(int).tolist()),
                  tuple(np.multiply(sample_coords[2:], [120,120]).astype(int).tolist()),
                        (1.0,0,0), 2)

    ax[idx].imshow(sample_image)

## Build the Deep Learning Model using the Functional API

Notes:
- Our problem is both a classification problem as we are trying to determine whether the object is a 'face', and it is also a regression problem because we are trying to predict the coordinates of the bounding box.
- Keras is a higher level API that runs on top of TensorFlow. It is easier to use and has a simpler syntax while still allowing for full customization.
- VGG16 is a convolutional neural network model that is pre-trained. We will cut off the last few layers of the model as it is meant to do classification but we need BOTH classification and regression (for localization).

In [None]:
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Conv2D, Dense, GlobalMaxPooling2D
from tensorflow.keras.applications import VGG16

In [None]:
# Instantiate VGG16 without its top layers, for inspection only: build_model()
# below creates its own VGG16 instance and does not use this variable.
vgg = VGG16(include_top=False)

In [None]:
# Print the layer-by-layer summary of the VGG16 base.
vgg.summary()

In [None]:
def build_model():
    """Build the two-headed face tracker on top of a VGG16 feature extractor.

    The network takes a (120, 120, 3) image and produces two outputs:
    a single sigmoid value (face / no face) and four sigmoid values
    (bounding-box coordinates).

    Returns:
        A Keras ``Model`` with outputs ``[classification, bounding_box]``.
    """
    image_input = Input(shape=(120,120,3))
    features = VGG16(include_top=False)(image_input)

    # Classification head: pooled features -> 2048 ReLU units -> 1 sigmoid unit.
    class_pooled = GlobalMaxPooling2D()(features)
    class_hidden = Dense(2048, activation='relu')(class_pooled)
    class_output = Dense(1, activation='sigmoid')(class_hidden)

    # Regression head: pooled features -> 2048 ReLU units -> 4 sigmoid units.
    box_pooled = GlobalMaxPooling2D()(features)
    box_hidden = Dense(2048, activation='relu')(box_pooled)
    box_output = Dense(4, activation='sigmoid')(box_hidden)

    return Model(inputs=image_input, outputs=[class_output, box_output])

In [None]:
# Instantiate the two-headed network.
facetracker = build_model()

In [None]:
# Print the layer-by-layer summary of the full model.
facetracker.summary()

#### Example

In [None]:
# Take one training batch: X holds the images, y the (classes, boxes) labels.
X, y = next(train.as_numpy_iterator())

In [None]:
# Forward pass of the not-yet-trained model on the sample batch.
# NOTE: this rebinds `coords`, which earlier held the sample label's box.
classes, coords = facetracker.predict(X)

In [None]:
# Display the predicted class scores and box coordinates.
classes, coords

## Define Losses and Optimizers

We're using Adam as our optimizer.
- The optimizer decides how the gradients computed by back-propagation are applied to update the weights of the neural network.

In [None]:
# Number of batches in one pass over the training set; used below as the decay period.
batches_per_epoch = len(train)

# Deprecated approach, kept for reference only: passing `decay` directly to Adam.
# lr_decay = (1./0.75 -1)/batches_per_epoch
# opt = tf.keras.optimizers.Adam(learning_rate=0.0001, decay=lr_decay)


# Exponential schedule: with decay_steps set to one epoch's worth of batches and
# decay_rate=0.75, the learning rate after each full epoch is 75% of what it was
# one epoch earlier. Lowering the rate as training proceeds keeps the later
# updates small so the gradients do not blow up.

lr_schedule = tf.keras.optimizers.schedules.ExponentialDecay(
    initial_learning_rate=0.0001,
    decay_steps=batches_per_epoch,
    decay_rate=0.75)

In [None]:
# Adam optimizer whose learning rate follows `lr_schedule`.
opt = tf.keras.optimizers.Adam(learning_rate=lr_schedule)

#### Create Localization Loss and Classification Loss

Notes:
- Classification loss: binary cross-entropy (a.k.a. logarithmic loss) between the true class and the predicted face probability.
- Regression loss: a custom localization loss made of two parts:
  1. the squared difference between the actual and the predicted coordinates of the box's first corner (the first two bounding-box values);
  2. the squared difference between the actual and predicted box width, plus the squared difference between the actual and predicted box height
      - the actual height & width are computed from the true bounding-box coordinates
      - the predicted height & width are computed from the predicted bounding-box coordinates

In [None]:
def localization_loss(y_true, yhat):
    """Sum-of-squares localization loss over a batch of bounding boxes.

    Both arguments are indexed as ``[:, 0..3]``; columns 0-1 are treated as one
    corner and columns 2-3 as the opposite corner.

    Args:
        y_true: Ground-truth boxes.
        yhat: Predicted boxes.

    Returns:
        Scalar tensor: squared error of the first corner plus squared error of
        the box width and height, summed over the batch.
    """
    # Squared error on the first corner (columns 0 and 1).
    corner_error = tf.reduce_sum(tf.square(y_true[:,:2] - yhat[:,:2]))

    # Box extents derived from the two corners.
    true_height = y_true[:,3] - y_true[:,1]
    true_width = y_true[:,2] - y_true[:,0]
    pred_height = yhat[:,3] - yhat[:,1]
    pred_width = yhat[:,2] - yhat[:,0]

    # Squared error on width and height.
    size_error = tf.reduce_sum(
        tf.square(true_width - pred_width) + tf.square(true_height - pred_height)
    )

    return corner_error + size_error

In [None]:
# Loss for the classification head (face / no face).
classloss = tf.keras.losses.BinaryCrossentropy()
# Loss for the bounding-box head; an alias of localization_loss.
regressloss = localization_loss

##### Example

In [None]:
# Localization loss between the sample batch's true boxes and the predicted boxes.
localization_loss(y[1], coords)

In [None]:
# Binary cross-entropy between the sample batch's true classes and the predicted scores.
classloss(y[0], classes)

In [None]:
# Same computation through the `regressloss` alias (identical to calling localization_loss).
regressloss(y[1], coords)

## Train Neural Network

In [None]:
class FaceTracker(Model):
    """Keras model wrapper that trains the two-headed face tracker with a combined loss.

    The wrapped network returns ``(classes, coords)``. Training minimizes
    ``localization_loss + 0.5 * classification_loss``.
    """

    def __init__(self, facetracker,  **kwargs):
        """Wrap an existing network.

        Args:
            facetracker: Model returning ``(classes, coords)`` for a batch of images.
            **kwargs: Forwarded to the Keras ``Model`` constructor.
        """
        super().__init__(**kwargs)
        self.model = facetracker

    def compile(self, opt, classloss, localizationloss, **kwargs):
        """Store the optimizer and the two loss functions used by the custom steps.

        Args:
            opt: Optimizer used to apply gradients in ``train_step``.
            classloss: Loss callable for the classification output.
            localizationloss: Loss callable for the bounding-box output.
            **kwargs: Forwarded to the Keras ``Model.compile``.
        """
        super().compile(**kwargs)
        self.opt = opt
        self.closs = classloss
        self.lloss = localizationloss

    def _compute_losses(self, batch, training):
        """Run the wrapped model on a batch and return (total, class, localization) losses."""
        X, y = batch

        classes, coords = self.model(X, training=training)

        batch_classloss = self.closs(y[0], classes)
        # Box labels are cast to float32 before being compared with the predictions.
        batch_localizationloss = self.lloss(tf.cast(y[1], tf.float32), coords)

        # The 0.5 weight is a tunable hyperparameter controlling how much the
        # classification loss contributes relative to the localization loss.
        total_loss = batch_localizationloss + 0.5 * batch_classloss
        return total_loss, batch_classloss, batch_localizationloss

    def train_step(self, batch, **kwargs):
        """Run one optimization step on a batch and return the loss values.

        Args:
            batch: Tuple ``(X, y)`` where ``y[0]`` holds classes and ``y[1]`` boxes.

        Returns:
            Dict with ``total_loss``, ``class_loss`` and ``regress_loss``.
        """
        with tf.GradientTape() as tape:
            total_loss, batch_classloss, batch_localizationloss = self._compute_losses(
                batch, training=True)

        # Compute the gradients after leaving the tape context so the gradient
        # computation itself is not recorded.
        grad = tape.gradient(total_loss, self.model.trainable_variables)

        # Use the optimizer given to compile(); the original relied on a global
        # `opt` that merely happened to exist in the notebook.
        self.opt.apply_gradients(zip(grad, self.model.trainable_variables))

        return {"total_loss":total_loss, "class_loss":batch_classloss, "regress_loss":batch_localizationloss}

    def test_step(self, batch, **kwargs):
        """Evaluate the losses on a batch without updating any weights.

        Args:
            batch: Tuple ``(X, y)`` where ``y[0]`` holds classes and ``y[1]`` boxes.

        Returns:
            Dict with ``total_loss``, ``class_loss`` and ``regress_loss``.
        """
        total_loss, batch_classloss, batch_localizationloss = self._compute_losses(
            batch, training=False)

        return {"total_loss":total_loss, "class_loss":batch_classloss, "regress_loss":batch_localizationloss}

    def call(self, X, **kwargs):
        """Forward ``X`` (and any keyword arguments) to the wrapped model."""
        return self.model(X, **kwargs)

In [None]:
# Wrap the functional network in the custom training class.
model = FaceTracker(facetracker)

In [None]:
# Register the optimizer, the classification loss and the localization loss.
model.compile(opt, classloss, regressloss)

#### Training

In [None]:
# Directory passed to the TensorBoard callback as its log location.
logdir='logs'

In [None]:
# TensorBoard callback that logs to `logdir` during training.
tensorboard_callback = tf.keras.callbacks.TensorBoard(log_dir=logdir)

In [None]:
# Train for 10 epochs, validating on `val`; `hist` keeps the per-epoch loss history.
hist = model.fit(train, epochs=10, validation_data=val, callbacks=[tensorboard_callback])

## Plotting Performance

In [None]:
# Display the recorded training and validation losses.
hist.history

In [None]:
# Plot training vs. validation curves for the total, classification and
# regression losses, one panel each.
fig, ax = plt.subplots(ncols=3, figsize=(20,5))

# (history key, training legend label, validation legend label, panel title)
panels = [
    ('total_loss', 'loss', 'val loss', 'Loss'),
    ('class_loss', 'class loss', 'val class loss', 'Classification Loss'),
    ('regress_loss', 'regress loss', 'val regress loss', 'Regression Loss'),
]

for axis, (metric, train_label, val_label, title) in zip(ax, panels):
    axis.plot(hist.history[metric], color='teal', label=train_label)
    axis.plot(hist.history[f'val_{metric}'], color='orange', label=val_label)
    axis.title.set_text(title)
    axis.legend()

plt.show()

## Making Predictions / Testing

In [None]:
# Iterator that yields test batches as NumPy arrays.
test_data = test.as_numpy_iterator()

In [None]:
# Fetch one test batch: test_sample[0] holds the images, test_sample[1] the labels.
test_sample = next(test_data)

In [None]:
# Predict on the sampled test batch. This call was missing: `yhat` was not defined
# anywhere before this cell, so the cell failed on a fresh Restart & Run All.
# yhat[0] holds the class scores and yhat[1] the box coordinates.
yhat = facetracker.predict(test_sample[0])

# Show the first four test images, drawing the predicted box where the face
# score exceeds 0.9.
fig, ax = plt.subplots(ncols=4, figsize=(20,20))
for idx in range(4):
    # Draw on a copy so the batch held in `test_sample` is not modified.
    sample_image = test_sample[0][idx].copy()
    sample_coords = yhat[1][idx]

    if yhat[0][idx] > 0.9:
        # The images were scaled to [0, 1] floats, so red is (1.0, 0, 0); using
        # 255 puts out-of-range values in the image that imshow then has to clip.
        cv2.rectangle(sample_image,
                      tuple(np.multiply(sample_coords[:2], [120,120]).astype(int).tolist()),
                      tuple(np.multiply(sample_coords[2:], [120,120]).astype(int).tolist()),
                            (1.0,0,0), 2)

    ax[idx].imshow(sample_image)

## Saving the Model

In [None]:
from tensorflow.keras.models import load_model

In [None]:
# Save the functional network (not the FaceTracker wrapper) to 'facetracker.h5'.
facetracker.save('facetracker.h5')

In [None]:
# Reload the saved network from disk; this rebinds `facetracker` to the loaded model.
facetracker = load_model('facetracker.h5')

## Real-Time Test

In [None]:
# Real-time demo: read frames from capture device 1, run the face tracker on each
# frame and draw the predicted box when the face score exceeds 0.9.
# Press 'q' in the preview window to stop.
cap = cv2.VideoCapture(1)
try:
    while cap.isOpened():
        ret, frame = cap.read()
        # Stop cleanly when no frame is delivered instead of crashing on the
        # slice below with `frame` set to None.
        if not ret or frame is None:
            break
        frame = frame[50:500, 50:500,:]
        # Scale the normalized predictions by the actual crop size. The crop is
        # at most 450x450, so the previous hard-coded 540 misplaced the box.
        frame_h, frame_w = frame.shape[:2]

        rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        resized = tf.image.resize(rgb, (120,120))

        # verbose=0 avoids printing a progress bar for every single frame.
        yhat = facetracker.predict(np.expand_dims(resized/255,0), verbose=0)
        sample_coords = yhat[1][0]

        # yhat[0] holds the class scores for the one-image batch; index down to
        # the scalar rather than relying on the truth value of an array.
        if yhat[0][0][0] > 0.9:
            top_left = np.multiply(sample_coords[:2], [frame_w, frame_h]).astype(int)
            bottom_right = np.multiply(sample_coords[2:], [frame_w, frame_h]).astype(int)

            # Controls the main rectangle
            cv2.rectangle(frame,
                          tuple(top_left.tolist()),
                          tuple(bottom_right.tolist()),
                          (255,0,0), 2)
            # Controls the label rectangle
            cv2.rectangle(frame,
                          tuple(np.add(top_left, [0,-30]).tolist()),
                          tuple(np.add(top_left, [80,0]).tolist()),
                          (255,0,0), -1)

            # Controls the text rendered
            cv2.putText(frame, 'face', tuple(np.add(top_left, [0,-5]).tolist()),
                        cv2.FONT_HERSHEY_SIMPLEX, 1, (255,255,255), 2, cv2.LINE_AA)

        cv2.imshow('EyeTrack', frame)

        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
finally:
    # Always release the camera and close the window, even if a frame raised.
    cap.release()
    cv2.destroyAllWindows()