In [1]:
import cv2, os, time, uuid
import tensorflow as tf
import numpy as np, matplotlib.pyplot as plt, pandas as pd
import albumentations as alb
import json
import shutil

Source code: https://github.com/nicknochnack/FaceDetection/blob/main/FaceDetection.ipynb

In [2]:
# How many webcam frames the capture cell collects.
number_images = 50
# Folder that receives the captured frames.
IMAGES_PATH = os.path.join('C:/Users/naufal', 'images')

In [4]:
# Capture `number_images` frames from the default webcam (device 0), one per
# second, and store them as image_<n>.jpg inside IMAGES_PATH.
# Press 'q' in the preview window to stop early.

# cv2.imwrite returns False instead of raising when the folder is missing,
# so make sure the destination exists before capturing.
os.makedirs(IMAGES_PATH, exist_ok=True)

cap = cv2.VideoCapture(0)
try:
    for imgnum in range(number_images):
        print('Collecting image {}'.format(imgnum))
        ret, frame = cap.read()
        if not ret:
            # No frame delivered (camera busy/unplugged): `frame` is None and
            # would crash imwrite/imshow, so stop cleanly instead.
            print('Could not read frame {} from the camera, stopping'.format(imgnum))
            break
        imgname = os.path.join(IMAGES_PATH,f"image_{imgnum}.jpg")
        cv2.imwrite(imgname, frame)
        cv2.imshow('frame',frame)
        time.sleep(1)

        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
finally:
    # Always free the camera and close the preview window, even on error.
    cap.release()
    cv2.destroyAllWindows()

Collecting image 0
Collecting image 1
Collecting image 2
Collecting image 3
Collecting image 4
Collecting image 5
Collecting image 6
Collecting image 7
Collecting image 8
Collecting image 9
Collecting image 10
Collecting image 11


In [None]:
# Load MNIST, scale pixel values by 1/255 and append a trailing channel axis.
(x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data()

x_train = (x_train / 255.0)[..., np.newaxis]
x_test = (x_test / 255.0)[..., np.newaxis]

In [None]:
# Small CNN for 28x28x1 inputs: two Conv/BatchNorm/MaxPool stages followed by
# a dense head ending in a 10-way softmax.
model_1 = tf.keras.Sequential()

# Convolutional stage 1
model_1.add(tf.keras.layers.Conv2D(32, (2,2), activation = 'relu', input_shape = (28,28,1)))
model_1.add(tf.keras.layers.BatchNormalization())
model_1.add(tf.keras.layers.MaxPooling2D(2,2))

# Convolutional stage 2
model_1.add(tf.keras.layers.Conv2D(32, (2,2), activation = 'relu', strides = 1, padding = 'same'))
model_1.add(tf.keras.layers.BatchNormalization())
model_1.add(tf.keras.layers.MaxPooling2D(2,2))

# Dense classifier head
model_1.add(tf.keras.layers.Flatten())
model_1.add(tf.keras.layers.Dense(128, activation = 'relu'))
model_1.add(tf.keras.layers.Dropout(0.5))
model_1.add(tf.keras.layers.Dense(128, activation = 'relu'))
model_1.add(tf.keras.layers.Dense(10, activation = 'softmax'))

In [None]:
# Configure model_1 for training: sparse categorical cross-entropy loss,
# Adam with its default settings, and accuracy as the reported metric.
model_1.compile(loss = tf.keras.losses.SparseCategoricalCrossentropy(), optimizer = tf.keras.optimizers.Adam(), metrics = ['accuracy'])

In [None]:
# Train model_1 on the training arrays for 5 epochs; the value returned by
# fit() is kept in `history`.
history = model_1.fit(x = x_train, y = y_train, epochs = 5)

In [None]:
# Create data/<split>/images and data/<split>/labels for every split.
# os.makedirs(..., exist_ok=True) also creates the missing parent folders and
# does not fail when the cell is run a second time (os.mkdir did both).
for path in ['train','val','test']:
    for sub in ('images', 'labels'):
        os.makedirs(os.path.join("C:/Users/naufal/data/", path, sub), exist_ok=True)

In [None]:
# Ask TensorFlow to grow GPU memory on demand instead of reserving it all.
gpus = tf.config.experimental.list_physical_devices('GPU')
for gpu in gpus:
    try:
        tf.config.experimental.set_memory_growth(gpu, True)
    except RuntimeError as err:
        # Memory growth can only be changed before the GPU has been
        # initialized; earlier cells may already have used it. Report the
        # problem instead of aborting the notebook run.
        print(f'Could not enable memory growth for {gpu}: {err}')

In [None]:
# Dataset of the .jpg file paths found in the raw capture folder.
# NOTE(review): no `shuffle` argument is passed here, unlike the later
# list_files calls that pass shuffle = False — confirm the resulting order is intended.
images = tf.data.Dataset.list_files('C:/Users/naufal/images/*.jpg')
# Iterator over the dataset elements converted to numpy values.
iterator = images.as_numpy_iterator()

In [None]:
# Drain the path iterator, keeping the str() form of every element.
list_img_dir = [str(img_dir) for img_dir in iterator]

In [None]:
# Reduce every entry to its bare file name.
# Entries are str() renderings of bytes paths (text wrapped in quotes), so the
# quoted part is extracted first. Both "\\" and "/" are accepted as separators,
# and entries that are already bare (cell re-run) pass through unchanged
# instead of raising IndexError.
list_img_dir = [
    (entry.split("'")[1] if "'" in entry else entry).replace("\\", "/").split("/")[-1]
    for entry in list_img_dir
]

In [None]:
# Show how many image file names were collected.
print(len(list_img_dir))

In [None]:
def img_read(file):
    """Read the file at `file` and return its contents decoded as a JPEG image."""
    return tf.io.decode_jpeg(tf.io.read_file(file))

In [None]:
# Replace each file path in `images` with the decoded image via img_read.
# NOTE(review): this rebinds `images`, so running the cell twice would feed
# decoded images back into img_read.
images = images.map(img_read)

In [None]:
# Take the first batch of (up to) four decoded images and show them side by side.
samples = next(images.batch(4).as_numpy_iterator())
fig, ax = plt.subplots(ncols = 4, figsize = (20,4.5))
for axis, image in zip(ax, samples):
    axis.imshow(image)
plt.show()

In [None]:
# Copy the collected images into the train/val/test image folders:
# entries [0:30] -> train, [30:40] -> val, [40:50] -> test.
src_dir = "C:/Users/naufal/images"
for folder, picked in (('train', slice(None, 30)),
                       ('val', slice(30, 40)),
                       ('test', slice(40, 50))):
    for img in list_img_dir[picked]:
        shutil.copy(os.path.join(src_dir,img), os.path.join(f"C:/Users/naufal/data/{folder}/images",img))

In [None]:
# Run the `labelme` command through the shell.
# NOTE(review): presumably used to draw the annotations whose .json files the
# next cell copies from the image folder — confirm.
!labelme

In [None]:
# Copy the .json label that belongs to each image (same name stem, looked up
# in src_dir) into the matching split's labels folder. Images without a label
# file are skipped. Uses the same slices as the image split.
for folder, picked in (('train', slice(None, 30)),
                       ('val', slice(30, 40)),
                       ('test', slice(40, 50))):
    for img in list_img_dir[picked]:
        file = img.split('.')[0]+'.json'
        if not os.path.exists(os.path.join(src_dir,file)):
            continue
        shutil.copy(os.path.join(src_dir,file), os.path.join(f"C:/Users/naufal/data/{folder}/labels",file))

In [None]:
# Augmentation pipeline: a 450x450 random crop followed by random flips and
# colour/brightness/gamma perturbations. Bounding boxes are passed in the
# 'albumentations' format and their labels travel in `class_labels`.
augmentor = alb.Compose(
    [
        alb.RandomCrop(width=450, height=450),
        alb.HorizontalFlip(p=0.5),
        alb.RandomBrightnessContrast(p=0.2),
        alb.RandomGamma(p=0.2),
        alb.RGBShift(p=0.2),
        alb.VerticalFlip(p=0.5),
    ],
    bbox_params=alb.BboxParams(
        format='albumentations',
        label_fields=['class_labels'],
    ),
)

In [None]:
# Build the augmented dataset: for every image of every split, write 100
# augmented copies to aug_data/<split>/images together with one JSON
# annotation each ({'image', 'bbox', 'class'}) in aug_data/<split>/labels.
for partition in ['train','test','val']:
    src_images = os.path.join("C:/Users/naufal/data/", partition, 'images')
    src_labels = os.path.join("C:/Users/naufal/data/", partition, 'labels')
    out_images = os.path.join("C:/Users/naufal/aug_data", partition, 'images')
    out_labels = os.path.join("C:/Users/naufal/aug_data", partition, 'labels')

    # Nothing else creates the output tree; without it cv2.imwrite fails
    # silently and every annotation write raises.
    os.makedirs(out_images, exist_ok=True)
    os.makedirs(out_labels, exist_ok=True)

    for image in os.listdir(src_images):
        img = cv2.imread(os.path.join(src_images, image))
        if img is None:
            # cv2.imread returns None for unreadable / non-image files.
            print(f'Skipping unreadable image: {image}')
            continue

        stem = image.split(".")[0]
        img_h, img_w = img.shape[:2]

        # Placeholder box for unlabelled images. It must have a non-zero area:
        # a [0, 0, 0, 0] box is rejected by the bbox validation, which made
        # every unlabelled image fall into the except branch below.
        coords = [0, 0, 0.00001, 0.00001]
        has_label = False

        label_path = os.path.join(src_labels, f'{stem}.json')
        if os.path.exists(label_path):
            with open(label_path, 'r') as f:
                label = json.load(f)

            shapes = label.get('shapes') or []
            if shapes:
                (x1, y1), (x2, y2) = shapes[0]['points'][:2]
                # Order the corners (the rectangle may have been drawn in any
                # direction), normalise by the real image size instead of a
                # hard-coded 640x480, and clip to the valid [0, 1] range.
                coords = [
                    min(x1, x2) / img_w, min(y1, y2) / img_h,
                    max(x1, x2) / img_w, max(y1, y2) / img_h,
                ]
                coords = [float(v) for v in np.clip(coords, 0.0, 1.0)]
                has_label = True

        try:
            for x in range(100):
                augmented = augmentor(image=img, bboxes=[coords], class_labels=['face'])
                cv2.imwrite(os.path.join(out_images, f'{stem}.{x}.jpg'), augmented['image'])

                annotation = {}
                annotation['image'] = image

                if has_label and len(augmented['bboxes']) > 0:
                    # Plain floats keep the annotation JSON-serialisable
                    # whatever numeric type the augmentor returns.
                    annotation['bbox'] = [float(v) for v in augmented['bboxes'][0][:4]]
                    annotation['class'] = 1
                else:
                    # Unlabelled image, or the box was cropped out entirely.
                    annotation['bbox'] = [0,0,0,0]
                    annotation['class'] = 0

                with open(os.path.join(out_labels, f'{stem}.{x}.json'), 'w') as f:
                    json.dump(annotation, f)

        except Exception as e:
            # Best effort: report which image failed and move on to the next.
            print(f'Augmentation failed for {image}: {e}')

In [None]:
# Augmented training images in listing order (shuffle disabled): decoded,
# resized to 120x120 and divided by 255.
train_images = (
    tf.data.Dataset.list_files("C:/Users/naufal/aug_data/train/images/*.jpg", shuffle = False)
    .map(img_read)
    .map(lambda x : tf.image.resize(x, (120,120)) / 255.)
)

In [None]:
# Augmented validation images in listing order (shuffle disabled): decoded,
# resized to 120x120 and divided by 255.
val_images = (
    tf.data.Dataset.list_files("C:/Users/naufal/aug_data/val/images/*.jpg", shuffle = False)
    .map(img_read)
    .map(lambda x : tf.image.resize(x, (120,120)) / 255.)
)

In [None]:
# Augmented test images in listing order (shuffle disabled): decoded,
# resized to 120x120 and divided by 255.
test_images = (
    tf.data.Dataset.list_files("C:/Users/naufal/aug_data/test/images/*.jpg", shuffle = False)
    .map(img_read)
    .map(lambda x : tf.image.resize(x, (120,120)) / 255.)
)

In [None]:
def load_labels(label_path):
    """Load one annotation JSON file.

    `label_path` must expose `.numpy()` returning the path of the file.
    Returns a pair: the 'class' value wrapped in a one-element list, and the
    'bbox' value as stored in the file.
    """
    json_file = label_path.numpy()
    with open(json_file, mode='r', encoding="utf-8") as handle:
        annotation = json.load(handle)

    face_class = annotation['class']
    box = annotation['bbox']
    return [face_class], box

In [None]:
# Label datasets for each split. Files are listed with shuffle=False, the same
# setting used for the image datasets they are later zipped with. Each path is
# parsed through load_labels, wrapped in tf.py_function because it runs plain
# Python (open/json), yielding (class as uint8, bbox as float16).
train_labels = tf.data.Dataset.list_files("C:/Users/naufal/aug_data/train/labels/*.json", shuffle=False)
train_labels = train_labels.map(lambda x: tf.py_function(load_labels, [x], [tf.uint8, tf.float16]))

val_labels = tf.data.Dataset.list_files("C:/Users/naufal/aug_data/val/labels/*.json", shuffle=False)
val_labels = val_labels.map(lambda x: tf.py_function(load_labels, [x], [tf.uint8, tf.float16]))

test_labels = tf.data.Dataset.list_files("C:/Users/naufal/aug_data/test/labels/*.json", shuffle=False)
# Fixed: the result used to be assigned to the misspelled `tesl_labels`, which
# left `test_labels` as a dataset of raw file paths instead of parsed labels.
test_labels = test_labels.map(lambda x: tf.py_function(load_labels, [x], [tf.uint8, tf.float16]))

In [None]:
# Display the element count of each image dataset next to its label dataset
# (train, test, val) so the pairs can be compared.
len(train_images), len(train_labels), len(test_images), len(test_labels), len(val_images), len(val_labels)

In [None]:
# Training pipeline: pair images with labels, shuffle (buffer 1000),
# batch by 32 and prefetch 4 batches.
train = (
    tf.data.Dataset.zip((train_images, train_labels))
    .shuffle(1000)
    .batch(32)
    .prefetch(4)
)

In [None]:
# Validation pipeline: pair images with labels, shuffle (buffer 500),
# batch by 32 and prefetch 4 batches.
val = (
    tf.data.Dataset.zip((val_images, val_labels))
    .shuffle(500)
    .batch(32)
    .prefetch(4)
)

In [None]:
# Test pipeline: pair images with labels, shuffle (buffer 500),
# batch by 32 and prefetch 4 batches.
test = (
    tf.data.Dataset.zip((test_images, test_labels))
    .shuffle(500)
    .batch(32)
    .prefetch(4)
)

In [None]:
# Pull one batch out of the training pipeline for inspection.
data_samples = train.as_numpy_iterator()
res = next(data_samples)

In [None]:
# Draw the labelled bounding box on the first (up to) four images of the batch.
fig, ax = plt.subplots(ncols=4, figsize=(20,20))
for idx in range(min(4, len(res[0]))):
    # Work on a copy so the batch held in `res` is not drawn over.
    sample_image = res[0][idx].copy()
    sample_coords = res[1][1][idx]

    # Box coordinates are normalised, so scale them by the actual image size.
    # The previous fixed factor of 256 did not match the 120x120 images and
    # pushed the rectangles off target.
    img_h, img_w = sample_image.shape[:2]
    top_left = tuple(int(v) for v in np.multiply(sample_coords[:2], [img_w, img_h]))
    bottom_right = tuple(int(v) for v in np.multiply(sample_coords[2:], [img_w, img_h]))

    cv2.rectangle(sample_image, top_left, bottom_right, (255,0,0), 2)

    ax[idx].imshow(sample_image)
plt.show()

In [5]:
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Conv2D, Dense, GlobalMaxPooling2D
from tensorflow.keras.applications import VGG16
from tensorflow.keras.models import load_model

In [None]:
def build_model():
    """Build the two-headed face model on top of a VGG16 feature extractor.

    Input is a 120x120x3 image. The returned Model has two outputs:
    'face' (1 sigmoid unit) and 'bbox' (4 sigmoid units).
    """
    inputs = Input(shape=(120,120,3))
    features = VGG16(include_top=False)(inputs)

    # Head 1: face / no-face classification
    cls_pooled = GlobalMaxPooling2D()(features)
    cls_hidden = Dense(2048, activation='relu')(cls_pooled)
    face_prob = Dense(1, activation='sigmoid', name = 'face')(cls_hidden)

    # Head 2: bounding-box regression
    box_pooled = GlobalMaxPooling2D()(features)
    box_hidden = Dense(2048, activation='relu')(box_pooled)
    box_coords = Dense(4, activation='sigmoid', name = 'bbox')(box_hidden)

    return Model(inputs=inputs, outputs=[face_prob, box_coords])

In [None]:
# Instantiate the two-headed network and print its layer summary.
facetracker = build_model()
facetracker.summary()

In [None]:
def localization_loss(y_true, yhat):
    """Bounding-box loss: squared error of the first corner plus squared
    error of the box width and height, summed over the batch.

    Both arguments are indexed as [:, 0..3], read as (x1, y1, x2, y2).
    """
    # Width / height implied by the two corners of each box.
    true_w = y_true[:,2] - y_true[:,0]
    true_h = y_true[:,3] - y_true[:,1]
    pred_w = yhat[:,2] - yhat[:,0]
    pred_h = yhat[:,3] - yhat[:,1]

    corner_error = tf.reduce_sum(tf.square(y_true[:,:2] - yhat[:,:2]))
    size_error = tf.reduce_sum(tf.square(true_w - pred_w) + tf.square(true_h - pred_h))

    return corner_error + size_error

In [None]:
# Number of batches in the `train` dataset.
batches_per_epoch = len(train)
# Decay rate derived from a 0.75 factor spread over the batches of one epoch.
# NOTE(review): presumably chosen so the learning rate falls to 75% of its
# start value after one epoch — confirm against the legacy Adam `decay` semantics.
lr_decay = (1./0.75 -1)/batches_per_epoch
# Legacy Adam optimizer with a 1e-4 learning rate and the decay above.
opt = tf.keras.optimizers.legacy.Adam(learning_rate=0.0001, decay=lr_decay)
# Loss for the classification output.
classloss = tf.keras.losses.BinaryCrossentropy()
# Loss for the bounding-box output.
regressloss = localization_loss

In [None]:
# Show the number of training batches per epoch.
print(batches_per_epoch)

In [None]:
class FaceTracker(Model):
    """Keras Model wrapper that trains a two-output network (class, bbox)
    with a custom training / evaluation step.

    The combined objective is ``localization_loss + 0.5 * class_loss``.
    """

    def __init__(self, eyetracker,  **kwargs):
        """Wrap `eyetracker`, the underlying model returning (classes, coords)."""
        super().__init__(**kwargs)
        self.model = eyetracker

    def compile(self, opt, classloss, localizationloss, **kwargs):
        """Store the optimizer and the two loss callables used by the steps.

        opt: optimizer applied in train_step.
        classloss: called as classloss(y_true_class, predicted_class).
        localizationloss: called as localizationloss(y_true_bbox, predicted_bbox).
        """
        super().compile(**kwargs)
        self.closs = classloss
        self.lloss = localizationloss
        self.opt = opt

    def train_step(self, batch, **kwargs):
        """Run one optimisation step on `batch` = (X, (class_labels, bboxes))
        and return the total, class and regression losses."""
        X, y = batch

        with tf.GradientTape() as tape:
            classes, coords = self.model(X, training=True)

            batch_classloss = self.closs(y[0], classes)
            # Labels are cast to float32 before being compared with the predictions.
            batch_localizationloss = self.lloss(tf.cast(y[1], tf.float32), coords)

            total_loss = batch_localizationloss+0.5*batch_classloss

        # Computed outside the tape context so the gradient ops themselves
        # are not recorded by the tape.
        grad = tape.gradient(total_loss, self.model.trainable_variables)

        # Use the optimizer handed to compile(); this previously referenced the
        # notebook-global `opt`, ignoring whatever was passed to compile().
        self.opt.apply_gradients(zip(grad, self.model.trainable_variables))

        return {"total_loss":total_loss, "class_loss":batch_classloss, "regress_loss":batch_localizationloss}

    def test_step(self, batch, **kwargs):
        """Evaluate `batch` without updating weights; returns the same loss
        dictionary as train_step."""
        X, y = batch

        classes, coords = self.model(X, training=False)

        batch_classloss = self.closs(y[0], classes)
        batch_localizationloss = self.lloss(tf.cast(y[1], tf.float32), coords)
        total_loss = batch_localizationloss+0.5*batch_classloss

        return {"total_loss":total_loss, "class_loss":batch_classloss, "regress_loss":batch_localizationloss}

    def call(self, X, **kwargs):
        """Forward `X` (and any keyword arguments) to the wrapped model."""
        return self.model(X, **kwargs)

In [None]:
# Wrap the two-headed network in the custom training class.
model = FaceTracker(facetracker)

In [None]:
# Hand the optimizer, the classification loss and the localization loss
# (in that positional order) to FaceTracker.compile.
model.compile(opt, classloss, regressloss)

In [None]:
# Early-stopping callback monitoring 'val_total_loss' with a patience of 3.
# NOTE(review): despite its name, this variable holds an EarlyStopping
# callback, not a TensorBoard one.
tensorboard_callback = tf.keras.callbacks.EarlyStopping(monitor='val_total_loss', patience=3)

In [None]:
# Train for up to 20 epochs on `train`, validating on `val`, with the
# callback defined above; the value returned by fit() is kept in `hist`.
hist = model.fit(train, epochs=20, validation_data=val, callbacks=[tensorboard_callback])

In [None]:
# Save the underlying network (not the FaceTracker wrapper) to facetracker.h5.
facetracker.save('facetracker.h5')

In [6]:
# Load the model stored in facetracker.h5, rebinding `facetracker` to it.
facetracker = load_model('facetracker.h5')



In [8]:
# Live demo: run the model on webcam frames and draw the predicted face box.
# Press 'q' in the preview window to quit.
cap = cv2.VideoCapture(0)
try:
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            # No frame delivered: `frame` is None and cannot be sliced.
            print('Could not read a frame from the camera, stopping')
            break
        frame = frame[50:500, 50:500,:]
        # The crop can be smaller than 450x450 when the camera frame is
        # (e.g. 480 rows), so scale the boxes by the real crop size.
        frame_h, frame_w = frame.shape[:2]

        rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        resized = tf.image.resize(rgb, (120,120))

        # verbose=0 keeps predict() from printing a progress bar per frame.
        yhat = facetracker.predict(np.expand_dims(resized/255,0), verbose=0)
        sample_coords = yhat[1][0]

        if yhat[0][0][0] > 0.5:
            # Pixel corners as plain Python ints, which OpenCV always accepts.
            scale = [frame_w, frame_h]
            x1, y1 = (int(v) for v in np.multiply(sample_coords[:2], scale))
            x2, y2 = (int(v) for v in np.multiply(sample_coords[2:], scale))

            # Controls the main rectangle
            cv2.rectangle(frame, (x1, y1), (x2, y2), (255,0,0), 2)
            # Controls the label rectangle
            cv2.rectangle(frame, (x1, y1 - 30), (x1 + 80, y1), (255,0,0), -1)

            # Controls the text rendered
            cv2.putText(frame, 'face', (x1, y1 - 5),
                        cv2.FONT_HERSHEY_SIMPLEX, 1, (255,255,255), 2, cv2.LINE_AA)

        cv2.imshow('EyeTrack', frame)

        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
finally:
    # Always free the camera and close the window, even if a frame fails.
    cap.release()
    cv2.destroyAllWindows()

































