# 1. Setup and Getting Data

### 1.1 Install Detpendencies and Setup

In [None]:
!pip install labelme tensorflow opencv-python matplotlib albumentations #tensorflow-gpu

Collecting labelme
  Using cached labelme-5.5.0.tar.gz (1.4 MB)
  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting imgviz>=1.7.5 (from labelme)
  Downloading imgviz-1.7.6-py3-none-any.whl.metadata (6.2 kB)
Collecting onnxruntime!=1.16.0,>=1.14.1 (from labelme)
  Downloading onnxruntime-1.20.1-cp310-cp310-manylinux_2_27_x86_64.manylinux_2_28_x86_64.whl.metadata (4.5 kB)
Collecting qtpy!=1.11.2 (from labelme)
  Downloading QtPy-2.4.2-py3-none-any.whl.metadata (12 kB)
Collecting PyQt5!=5.15.3,!=5.15.4 (from labelme)
  Downloading PyQt5-5.15.11-cp38-abi3-manylinux_2_17_x86_64.whl.metadata (2.1 kB)
Collecting coloredlogs (from onnxruntime!=1.16.0,>=1.14.1->labelme)
  Downloading coloredlogs-15.0.1-py2.py3-none-any.whl.metadata (12 kB)
Collecting PyQt5-sip<13,>=12.15 (from PyQt5!=5.15.3,!=5.15.4->labelme)
  Downloading PyQt5_sip-12.16.0-cp310-cp310-manylinux_2_5_x86_64.manylinux1_x86_64.whl.metadata (421 bytes)
Collecting PyQt5-Qt5<5.16.0,>=5.15.2 (from PyQt5!=5.15.3,!=5.15.4->l

### 1.2 Collect Images Using OpenCV

In [None]:
import os # deal with file paths
import time # time between frames/images
import uuid # uniform unique identifier
import cv2 # computer vision

In [None]:
IMAGES_PATH = os.path.join('data', 'images')
number_images = 20

In [None]:
cap = cv2.VideoCapture(1)
for imgnum in range(number_images):
  print('Collecting image {}'.format(imgnum))
  ret, frame = cap.read()
  imgname = os.path.join(IMAGES_PATH, f'{str(uuid.uuid1())}.jpg')
  cv2.imwrite(imgname, frame)
  cv2.imshow('frame', frame)
  time.sleep(0.5)

  if cv2.waitKey(1) & 0xFF == ord('q'):
    break

cap.release()
cv2.destroyAllWindows()

### 1.3 Annotate Images with LabelMe

In [None]:
!labelme # choose images dir, output dir, save auto, draw boxes

# 2. Review Dataset and Build Image Loading Function

### 2.1 Import TF and Deps

In [None]:
import tensorfor as tf
import json
import numpy as np
import matplotlib.pyplot as plt

### 2.2 Limit GPU Memory Growth

In [None]:
# Avoid OOM errors by setting GPU Memory Consumption Growth
gpus = tf.config.experimental.list_physical_devices('GPU')
for gpu in gpus:
  tf.config.experimental.set_memory_growth(gpu, True)

In [None]:
tf.config.list_physical_devices('GPU')

### 2.3 Load Image Into TF Data Pipeline

In [None]:
images = tf.data.Dataset.list_files('data\\images\\*.jpg', shufle=False)

In [None]:
# check if the previous lines work
images.as_numpy_iterator().next()

In [None]:
def load_image(x):
  byte_img = tf.io.read_file(x)
  img = tf.io.decode_jpeg(byte_img)
  return img

In [None]:
images = images.map(load_image)

In [None]:
images.as_numpy_iterator().next()

### 2.4 View Raw Image with Matplotlib

In [None]:
image_generator = images.batch(4).as_numpy_iterator()

In [None]:
plot_images = image_generator.next()

In [None]:
fig, ax = plt.subplots(ncols=4, figsize=(20,20))
for idx, image in enumerate(plot_images):
  ax[idx].imshow(image)
plt.show()

# 3. Partition Unaugmentaed Data

### 3.1 Manually split data into train, validation and test

In [None]:
 # create 3 folders (train, val, test) in each folder create 2 folders (images, labels)
 # put images inside images folders with ratios (0.7, 0.15, 0.15)

### 3.2 Move the Matching Labels

In [None]:
for folder in ['train', 'test', 'val']:
  for file in os.listdir(os.path.join('data', folder, 'images')):
    file_name = file.split('.')[0]+'json'
    existing_filepath = os.path.join('data', 'labels', file_name)
    if os.path.exists(existing_file_path):
      new_filepath = os.path.join('data', folder, 'labels', file_name)
      os.replace(existing_filepath, new_filepath)

# 4. Apply Image Augmentation on Imagesand Labels Using Albumentations

### 4.1 Setup Albumentations Transform Pipeline

In [None]:
import albumentations as alb

In [None]:
augemtor = alb.Compose([
    alb.RandomCrop(wieth=450, height=450),
    alb.HorizontalFlip(p=0.5),
    alb.RandomBirghtnessContrast(p=0.2),
    alb.RandomGamma(p=0.2),
    alb.RGBShift(p=0.2),
    alb.VerticalFlip(0.5)
], bbox_params=alb.BboxParams(format='albumentaitons', label_fields = ['class_labels']))

### 4.2 Load a Test Image and Annotation with OpenCV and JSON

In [None]:
img = cv2.imread(os.path.join('data', 'train', 'images', 'captured_image_name.jpg'))

In [None]:
with open(os.path.join('data', 'train', 'labels', 'label_me_name.json'), 'r') as f:
  label = json.load(f)

### 4.3 Extract Coordinates and Rescale to Match Image Resolution

In [None]:
coords = [0, 0, 0, 0]
coors[0] = label['shapes'][0]['points'][0][0]
coors[1] = label['shapes'][0]['points'][0][1]
coors[2] = label['shapes'][0]['points'][1][0]
coors[3] = label['shapes'][0]['points'][1][1]

In [None]:
coords

In [None]:
coords = list(np.divide(coords, [640, 480, 640, 480])) # divide by image dimensions

In [None]:
coords

### 4.4 Apply Augmentaions and View Results

In [None]:
augmented = augmentor(image=img, bbox=[coords], class_labels=['face'])

In [None]:
cv2.rectange(augmented['image'],
             tuple(np.multiply(augmented['bboxes'][0][:2], [450,450]).astype(int)),
             tuple(np.multiply(augmented['bboxes'][0][2:], [450,450]).astype(int)),
             (255, 0, 0), 2)

plt.imshow(augmented['images'])

# 5. Build and Run Augmentation Pipeline

### 5.1 Run Augmentation Pipeline

In [None]:
for partition in ['train', 'test', 'val']:
  for image in os.listdir(os.path.join('data', partition, 'images')):
    img = cv2.imread(os.path.join('data', partition, 'images', image))

    coords = [0, 0, 0.00001, 0.00001]
    label_path = os.path.join('data', partition, 'labels', f'{image.split('.'[0])}.json')
    if of.path.exists(label_path):
      with open(label, 'r') as f:
        label = json.load(f)

      coors[0] = label['shapes'][0]['points'][0][0]
      coors[1] = label['shapes'][0]['points'][0][1]
      coors[2] = label['shapes'][0]['points'][1][0]
      coors[3] = label['shapes'][0]['points'][1][1]

    try:
      for x in range(60):
        augmented = augmentor(image=img, bboxes=[coords], class_labels=['face'])
        cv2.imwrite(os.path.join('aug_data', partition, 'images', f'{image.split('.')[0]}.{x}.jpg'), augmented['image'])

        annotation = {}
        annotation['image'] = image

        if os.path.exists(label_path):
          if len(augmented['bboxes']) == 0:
            annotation['bbox'] = [0,0,0,0]
            annotation['class'] = 0
          else:
            annotation['bbox'] = augmented['bboxes'][0]
            annotation['class'] = 1
        else:
          annotations['bbox'] = [0,0,0,0]
          annotation['class'] = 0

        with open(os.path.join('aug_data', partition, 'labels', f'{image.split('.')[0]}.{x}.json') 'w') as f:
          json.dump(annotation, f)

      except Exception as e:
        print(e)

### 5.2 Load Augmented Images to Tensorflow Dataset

In [None]:
train_images = tf.data.Dataset.list_files('aug_data\\train\\images\\*.jpg', suffle=False)
train_images = train_images.map(load_image)
train_images = train_images.map(lambda x: tf.image.resize(x, (120,120)))
train_images = train_images.map(lambda x: x/255)

In [None]:
test_images = tf.data.Dataset.list_files('aug_data\\test\\images\\*.jpg', suffle=False)
test_images = train_images.map(load_image)
test_images = train_images.map(lambda x: tf.image.resize(x, (120,120)))
test_images = train_images.map(lambda x: x/255)

In [None]:
val_images = tf.data.Dataset.list_files('aug_data\\val\\images\\*.jpg', suffle=False)
val_images = train_images.map(load_image)
val_images = train_images.map(lambda x: tf.image.resize(x, (120,120)))
val_images = train_images.map(lambda x: x/255)

# 6. Prepare Labels

### 6.1 Build Label Loading Function

In [None]:
def load_labels(label_path):
  with open(label_path.numpy(), 'r', encoding = 'utf-8') as f:
    label = json.load(f)
  return [label['class']], label['bbox']


### 6.2 Load Labels to Tensorflow Dataset

In [None]:
train_labels = tf.data.Dataset.list_files('aug_data\\train\\labels\\*.json', suffle=False)
train_labels = train_labels.map(lambda x: tf.py_function(load_labels, [x], [tf.unit8, tf.float16]))

In [None]:
test_labels = tf.data.Dataset.list_files('aug_data\\test\\labels\\*.json', suffle=False)
test_labels = train_labels.map(lambda x: tf.py_function(load_labels, [x], [tf.unit8, tf.float16]))

In [None]:
val_labels = tf.data.Dataset.list_files('aug_data\\val\\labels\\*.json', suffle=False)
val_labels = train_labels.map(lambda x: tf.py_function(load_labels, [x], [tf.unit8, tf.float16]))

# 7. Combine Label and Image Sample

### 7.1 Check Partition Lengths

In [None]:
len(train_images), len(train_labels), len(test_images), len(test_labels), len(val_images), len(val_labels)

### 7.2 Create Final Dataset (Images / Labels)

In [None]:
train = tf.data.Dataset.zip((train_images, train_labels))
train = train.shuffle(5000) # bigger than the size above
train = train.batch(8)
train = train.prefetch(4)

In [None]:
test = tf.data.Dataset.zip((test_images, test_labels))
test = test.shuffle(1300)
test = test.batch(8)
test = test.prefetch(4)

In [None]:
val = tf.data.Dataset.zip((val_images, train_labels))
val = val.shuffle(1000)
val = val.batch(8)
val = val.prefetch(4)

### 7.3 View Images and Annotations

In [None]:
 data_samples = train.as_numpy_iterator()

In [None]:
res = data_samples.next()

In [None]:
fig, ax = plt.subplots(ncols=4, figsize=(20,20))
for idx in range(4):
  sample_image = res[0][idx]
  sample_coords = res[1][1][idx]

cv2.rectangle(sample_image,
              tuple(np.multiply(sample_coords[:2], [120,120]).astype(int)),
              tuple(np.multiply(sample_coords[2:], [120,120]).astype(int)),
              (255,0,0), 2)

  ax[idx].imshow(sample_image)

# 8. Build Deep Learning Using the Funcitonal API

### 8.1 Import Layers and Base Network

In [None]:
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Conv2D, Dense, GlobalMaxPooling2D
from tensorflow.keras.applications import VGG16

### 8.2 Download VGG16

In [None]:
vgg = VGG16(include_top=False)

In [None]:
vgg.summary()

### 8.3 Build instance of Network

In [None]:
def build_model():
  input_layer = Input(shape=(120,120,3))

  vgg = VGG16(include_top=False)(input_layer)

  f1 = GlobalMaxPooling2D()(vgg)
  class1 = Dense(2048, activation='relu')(f1)
  class2 = Dense(1, activation='sigmoid')(class1)

  f2 = GlobalMaxPooling2D()(vgg)
  regress1 = Dense(2048, activation='relu')(f2)
  regress2 = Dense(1, activation='sigmoid')(regress1)

  facetracker = Model(inputs=input_layer, outputs[class2, regress2])

  return facetracker

### 8.4 Test out Neural Network

In [None]:
facetracker = build_model()

In [None]:
facetracker.summary()

In [None]:
X, y = train.as_numpy_iterator().next()

In [None]:
X.shape

In [None]:
classes, coords = facetracker.predict(X)

In [None]:
classes, coords

# 9. Define Loss and Optimizers

### 9.1 Define Optimizer and LR

In [None]:
batch_per_epoch = len(train) #690
lr_decay = (1./0.75 - 1)/batch_per_epoch

In [None]:
opt = tf.keras.optimizers.Adam(learning_rate=0.00001, decay=lr_decay)

### 9.2 Create Localization Loss and Classification Loss

In [None]:
def localization_loss(y_true, yhat):
  delta_coord = tf.reduce_sum(tf.square(y_true[:,:2] - yhat[:,:2]))

  h_true = y_true[:,3] - y_true[:,1]
  w_true = y_ture[:,2] - y_true[:,0]

  h_pred = yhat[:,3] - yhat[:,1]
  w_pred = yhat[:,2] - yhat[:,0]

  delta_size = tf.reduce_sum(tf.square(w_true - w_pred) + tf.square(h_ture, h_pred))

  return delta_coord + delta_size

In [None]:
classloss = tf.keras.losses.BinaryCrossentropy()
regressloss = localization_loss

### 9.3 Teset out Loss Meertics

In [None]:
localizaiton_loss(y[1], coords) # to get the value add .numpy() at the end

In [None]:
classloss(y[0], classes)

In [None]:
regressloss(y[1], coords)

# 10. Train Neural Network

### 10.1 Create Custom Model Class

In [None]:
class FaceTracker(Model):
  def __init__(self, eyetracker, **kwargs):
    super().__init__(**kwargs)
    self.model = eyetracker

  def compile(self, opt, classloss, localizationloss, **kwargs):
    super().compile(**kwargs)
    self.closs = classloss
    self.lloss = localizationloss
    self.opt = opt

  def train_step(self, batch, **kwargs):
    X, y = batch

    with tf.GradientTape() as tape:
      classes, coords = self.model(X, training=True)

      batch_classloss = self.closs(y[0], classes)
      batch_localizationloss = self.lloss(tf.cast(y[1], tf.float32), coords)

      total_loss = batch_localizationloss + 0.5 * batch_classloss

      gard = tape.gradient(total_loss, self.model.trainable_variables)

    opt.apply_gradients(zip(gard, self.model.trainable_variables))

    return {"total_loss": total_loss, "class_loss": batch_classloss, "regress_loss": batch_localizationloss}

  def test_step(self, batch, **kwargs):
    X, y = batch

    classes, coords = self.model(X, training=False)

    batch_classloss = self.closs(y[0], classes)
    batch_localizationloss = self.lloss(tf.cast(y[1], tf.float32), coords)
    total_loss = batch_localizationloss + 0.5 * batch_classloss

    return {"total_loss": total_loss, "class_loss": batch_classloss, "regress_loss": batch_localizationloss}

  def call(self, X, **kwargs): # to implement .predict()
    return self.model(X, **kwargs)

In [None]:
model = FaceTracker(facetracker)

In [None]:
model.compile(opt, classloss, regressloss)

### 10.2 Train

In [None]:
logdir = 'logs'

In [None]:
tensorboard_callback = tf.keras.callbacks.TensorBoard(log_dir=logdir)

In [None]:
hist = model.fit(train.take(100), epoch=40, validation_data=val, callbacks=[tensorboard_callbac])

### 10.3 Plot Performance

In [None]:
hist.history

In [None]:
fig, ax = plt.subplots(ncols=3, figsize=(20,5))

ax[0].plot(hist.history['total_loss'], color='teal', label='loss')
ax[0].plot(hist.history['val_total_loss'], color='orange', label='val_loss')
ax[0].title.set_text('Loss')
ax[0].legend()

ax[1].plot(hist.history['class_loss'], color='teal', label='class loss')
ax[1].plot(hist.history['val_class_loss'], color='orange', label='val class loss')
ax[1].title.set_text('Classification Loss')
ax[1].legend()

ax[2].plot(hist.history['regress_loss'], color='teal', label='regress loss')
ax[2].plot(hist.history['val_regress_loss'], color='orange', label='val regress loss')
ax[2].title.set_text('Regression Loss')
ax[2].legend()

plt.show()

# 11. Make Predictions

### 11.1 Make Predictions on Test Set

In [None]:
test_data = test.as_numpy_iterator()

In [None]:
test_sample = test_data.next()

In [None]:
yhat = facetracker.predict(test_sample[0])

In [None]:
fig, ax = plt.subplots(ncols=4, figsize=(20,20))
for idx in range(4):
  smaple_image = test_sample[0][idx]
  smaple_coords = yhat[1][idx]

  if yhat[0][idx] > 0.5:
    cv2.rectangle(sample_image,
    tuple(np.multiply(sample_coords[:2], [120,120]).astype(int)),
    tuple(np.multiply(sample_coords[2:], [120,120]).astype(int)),
    (255,0,0), 2)

  ax[idx].imshow(sample_image)

### 11.2 Save the Model

In [None]:
from tensorflow.keras.models import load_model

In [None]:
facetracer.save('facetracker.h5')

In [None]:
facetracker = load_model('facetracker.h5')

### 11.3 Real Time Detection

In [None]:
cap = cv2.VideoCapture(0)

while cap.isOpened():
  _ , frame = cap.read()
  frame = frame[50:500, 50:500,:]

  rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
  resized = tf.image.resize(rgb, (120,120))

  yhat = facetracker.predict(np.expand_dims(resized/255,0))
  sample_coords = yhat[1][0]

  if yhat[0] > 0.5:
    cv2.rectangle(frame,
                  tuple(np.multiply(sample_coords[:2], [450,450]).astype(int)),
                  tuple(np.multiply(sample_coords[2:], [450,450]).astype(int)),
                  (255,0,0), 2)
    cv2.rectangle(frame,
                  tuple(np.add(np.multiply(sample_coords[:2], [450,450]).astype(int), [0, -30]))
                  tuple(np.add(np.multiply(sample_coords[2:], [450,450]).astype(int), [80, 0])),
                  (255,0,0), 2)

    cv2.putText(frame, 'face', tuple(np.add(np.multiply(sample_coords[:2], [450,450]).astype(int), [0, -5])),
                cv2.FONT_HERSHEY_SIMPLEX, 1, (255,255,255), 2, cv2.LINE_AA)

    cv2.imshow('EyeTrack', frame)

    if cv2.waitKey(1) & 0xFF == ord('q'):
      break

cap.release()
cv2.destroyAllWindows()
