In [1]:
import os
import tensorflow as tf
import cv2
import json
import numpy as np
from matplotlib import pyplot as plt
import time
import uuid
import albumentations as alb
from keras.models import Sequential, Model
from keras.layers import Input, Conv2D, Reshape, Dropout
from keras.applications import ResNet152V2
from keras.models import load_model

# Creating preparing and augmenting data

In [2]:
# Path where images will be saved and how many of them
IMAGES_PATH = os.path.join('data','images')
number_of_images = 30

In [4]:
# Function to take the photos
def take_photos(number_of_images):
    capture =cv2.VideoCapture(0) # Choose the device
    for n in range(number_of_images):
        print("Getting image {}".format(n))
        ret,frame = capture.read()
        image_name = os.path.join(IMAGES_PATH,f'{str(uuid.uuid1())}.jpg')
        cv2.imwrite(image_name, frame)
        cv2.imshow('frame',frame)
        time.sleep(0.5)

        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
    capture.release()
    cv2.destroyAllWindows()

In [None]:
take_photos(number_of_images)

In [None]:
# Launch the app to label the data
!labelme

In [3]:
# Reading images
def  load_image(path):
    byte_image = tf.io.read_file(path)
    image = tf.io.decode_jpeg(byte_image)
    return image

In [None]:
images = tf.data.Dataset.list_files('data\\images\\*.jpg',shuffle=False)
images = images.map(load_image)

In [None]:
# Viewing example images
image_generator = images.batch(4).as_numpy_iterator()
plot_images = image_generator.next()

fig,ax = plt.subplots(ncols=4, figsize=(20,20))
for idx, image in enumerate(plot_images):
    ax[idx].imshow(image)
plt.show()

In [5]:
# Calculating how many images for train, test and val
print({'Test': 3 * number_of_images  *.7}) 
print({'Train': 3 * number_of_images  *.15})
print({'Val': 3 * number_of_images  *.15})  

{'Test': 62.99999999999999}
{'Train': 13.5}
{'Val': 13.5}


In [7]:
# Moving labels after manually moving images to set folders
for folder in ['train','test','val']:
    for file in os.listdir(os.path.join('data',folder, 'images')):

        name = file.split('.')[0]+'.json'
        existing_filepath = os.path.join('data','labels', name)
        
        if os.path.exists(existing_filepath):
            new_filepath = os.path.join('data',folder,'labels',name)
            os.replace(existing_filepath, new_filepath)

In [6]:
# Creating augmentor for data augmentation
augmentor = alb.Compose([alb.RandomCrop(width=450,height=450),
                         alb.HorizontalFlip(p=0.5),
                         alb.RandomBrightnessContrast(p=0.3),
                         alb.RandomGamma(p=0.3),
                         alb.RGBShift(p=0.3),
                         alb.VerticalFlip(p=0.5)],
                         keypoint_params=alb.KeypointParams(format='xy', 
                                                    label_fields=['class_labels']))

In [29]:
for partition in ['train', 'test', 'val']: 
    for file in os.listdir(os.path.join('data', partition, 'images')):
        image = cv2.imread(os.path.join('data', partition, 'images', file))

        classes = [0,0]
        coordinates = [0,0,0.00001,0.00001]
        label_path = os.path.join('data', partition, 'labels', f'{file.split(".")[0]}.json')
        if os.path.exists(label_path):
            with open(label_path, 'r') as f:
                label = json.load(f)
    
            if label['shapes'][0]['label']=='LeftEye': 
                classes[0] = 1
                coordinates[0] = np.squeeze(label['shapes'][0]['points'])[0]
                coordinates[1] = np.squeeze(label['shapes'][0]['points'])[1]

            if label['shapes'][0]['label']=='RightEye':
                classes[1] = 1
                coordinates[2] = np.squeeze(label['shapes'][0]['points'])[0]
                coordinates[3] = np.squeeze(label['shapes'][0]['points'])[1]

            if len(label['shapes']) > 1:     
                if label['shapes'][1]['label'] =='LeftEye': 
                    classes[0] = 1 
                    coordinates[0] = np.squeeze(label['shapes'][1]['points'])[0]
                    coordinates[1] = np.squeeze(label['shapes'][1]['points'])[1]

                if label['shapes'][1]['label'] =='RightEye': 
                    classes[1] = 1
                    coordinates[2] = np.squeeze(label['shapes'][1]['points'])[0]
                    coordinates[3] = np.squeeze(label['shapes'][1]['points'])[1]
            
            np.divide(coordinates, [640,480,640,480])
                
        try: 
            for x in range(120):
                keypoints = [(coordinates[:2]), (coordinates[2:])]
                augmented = augmentor(image=image, keypoints=keypoints, class_labels=['LeftEye','RightEye'])
                cv2.imwrite(os.path.join('aug_data', partition, 'images', f'{file.split(".")[0]}.{x}.jpg'), augmented['image'])

                annotation = {}
                annotation['image'] = file
                annotation['class'] = [0,0]
                annotation['keypoints'] = [0,0,0,0]

                if os.path.exists(label_path):
                    if len(augmented['keypoints']) > 0: 
                        for id, cl in enumerate(augmented['class_labels']):
                            if cl == 'LeftEye': 
                                annotation['class'][0] = 1 
                                annotation['keypoints'][0] = augmented['keypoints'][id][0]
                                annotation['keypoints'][1] = augmented['keypoints'][id][1]
                            if cl == 'RightEye': 
                                annotation['class'][1] = 1 
                                annotation['keypoints'][2] = augmented['keypoints'][id][0]
                                annotation['keypoints'][3] = augmented['keypoints'][id][1]
                                
                annotation['keypoints'] = list(np.divide(annotation['keypoints'], [450,450,450,450]))


                with open(os.path.join('aug_data', partition, 'labels', f'{file.split(".")[0]}.{x}.json'), 'w') as f:
                    json.dump(annotation, f)

        except Exception as e:
            print(e)

In [4]:
# Resizing and normalizing images
def map_and_load_images(path):
    image_set = tf.data.Dataset.list_files(path, shuffle=False)
    image_set = image_set.map(load_image, num_parallel_calls=tf.data.experimental.AUTOTUNE)
    image_set = image_set.map(lambda x: tf.image.resize(x, (250,250)), num_parallel_calls=tf.data.experimental.AUTOTUNE)
    image_set = image_set.map(lambda x: x/255, num_parallel_calls=tf.data.experimental.AUTOTUNE)
    return image_set

In [5]:
def load_labels(label_path):
    with open(label_path.numpy(), 'r', encoding="utf-8") as f:
        label = json.load(f)
    return [label['keypoints']]

In [6]:
# Loading labels
def map_and_load_labels(path):
    label_set = tf.data.Dataset.list_files(path, shuffle=False)
    label_set = label_set.map(lambda x: tf.py_function(load_labels, [x], [tf.float16]))
    return label_set

In [7]:
# Creating sets
def create_set(images, labels, n_shuffle):
    set = tf.data.Dataset.zip((images, labels))
    set = set.shuffle(n_shuffle)
    set = set.batch(16, drop_remainder=True)
    set = set.prefetch(4)
    return set

In [8]:
train_images = map_and_load_images('aug_data\\train\\images\\*.jpg')
test_images = map_and_load_images('aug_data\\test\\images\\*.jpg')
val_images = map_and_load_images('aug_data\\val\\images\\*.jpg')

In [9]:
train_labels = map_and_load_labels('aug_data\\train\\labels\\*.json')
test_labels = map_and_load_labels('aug_data\\test\\labels\\*.json')
val_labels = map_and_load_labels('aug_data\\val\\labels\\*.json')

In [10]:
train = create_set(train_images, train_labels, 7800)
test = create_set(test_images, test_labels, 1800)
val = create_set(val_images, val_labels, 1800)

In [12]:
data_samples = train.as_numpy_iterator()

In [13]:
res = data_samples.next()

In [85]:
res[0].shape

(16, 250, 250, 3)

In [None]:
fig, ax = plt.subplots(ncols=4, figsize=(20,20))
for idx in range(4): 
    sample_image = res[0][idx].copy()
    sample_coords = res[1][0][idx]
    
    cv2.circle(sample_image, tuple(np.multiply(sample_coords[:2], [250,250]).astype(int)), 2, (255,0,0), -1)
    cv2.circle(sample_image, tuple(np.multiply(sample_coords[2:], [250,250]).astype(int)), 2, (255,0,0), -1)
    
    ax[idx].imshow(sample_image)

# Creating, building and training the model

In [34]:
iristracker_model = Sequential([
    Input(shape=(250,250,3)), 
    ResNet152V2(include_top=False, input_shape=(250,250,3)),
    Conv2D(512, 3, padding='same', activation='relu'),
    Conv2D(512, 3, padding='same', activation='relu'),
    Conv2D(256, 3, 2, padding='same', activation='relu'),
    Conv2D(256, 2, 2, activation='relu'),
    Dropout(0.05),
    Conv2D(4, 2, 2),
    Reshape((4,))
])

In [75]:
optimizer = tf.keras.optimizers.Adam(learning_rate=0.001)
loss = tf.keras.losses.MeanSquaredError()

In [None]:
iristracker_model.compile(optimizer, loss)

In [76]:
# Do not run this cell, only for testing purposes

class Iristracker(Model): 
    def __init__(self, iristracker_model,  **kwargs): 
        super().__init__(**kwargs)
        self.model = iristracker_model

    def compile(self, optimizer, custom_mean_squared_error, **kwargs):
        super().compile(**kwargs)
        self.custom_mean_squared_error = custom_mean_squared_error
        self.optimizer = optimizer
    

    @tf.function
    def train_step(self, batch, **kwargs): 
        
        X, y = batch
        y[0].set_shape((16, 4)) # Required to run

 
        with tf.GradientTape() as tape: 
            coordinates = self.model(X, training=True)

            loss = self.custom_mean_squared_error(tf.cast(y[0], tf.float32), coordinates) 
            
            grad = tape.gradient(loss, self.model.trainable_variables)
        
        optimizer.apply_gradients(zip(grad, self.model.trainable_variables))
        
        return {"loss":loss}
    
    @tf.function
    def test_step(self, batch, **kwargs): 
        X, y = batch
        y[0].set_shape((16, 4))

        coordinates = self.model(X, training=False)
        

        loss = self.custom_mean_squared_error(tf.cast(y[0], tf.float32), coordinates)

        
        return {"total_loss":loss}
        
    def call(self, X, **kwargs): 
        return self.model(X, **kwargs)
model = Iristracker(iristracker_model)
model.compile(optimizer,loss)

In [81]:
hist = model.fit(train, epochs=2, validation_data=val)

Epoch 1/2
[1m472/472[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2779s[0m 6s/step - loss: 9.4979 - val_loss: 0.0000e+00
Epoch 2/2
[1m472/472[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2779s[0m 6s/step - loss: 6.3614 - val_loss: 0.0000e+00


In [None]:
plt.plot(hist.history['loss'], color='green', label='loss')
plt.plot(hist.history['val_loss'], color='blue', label='val loss')
plt.suptitle('Loss')
plt.legend()
plt.show()

# Testing the model

In [11]:
test_data = test.as_numpy_iterator()

In [None]:
test_sample = test_data.next()

In [18]:
yhat = model.predict(test_sample[0])

In [None]:
fig, ax = plt.subplots(ncols=4, figsize=(20,20))
for id in range(4): 
    sample_image = test_sample[0][id]
    sample_coords = yhat[id]
    
    cv2.circle(sample_image, tuple(np.multiply(sample_coords[:2], [250,250]).astype(int)), 2, (255,0,0), -1)
    cv2.circle(sample_image, tuple(np.multiply(sample_coords[2:], [250,250]).astype(int)), 2, (0,255,0), -1)
    
    ax[id].imshow(sample_image)

In [None]:
model.save('iristracker_resnet_v1.0.h5')

In [None]:
cap = cv2.VideoCapture(0)
while cap.isOpened():
    _ , frame = cap.read()
    
    frame = frame[50:500,50:500,:] 
    rgb_img = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    resized = cv2.resize(rgb_img, (250,250))
    
    yhat = model.predict(np.expand_dims(resized/255,0))
    sample_coords = yhat[0,:4]
    
    cv2.circle(frame, tuple(np.multiply(sample_coords[:2], [450,450]).astype(int)), 2, (255,0,0), -1)
    cv2.circle(frame, tuple(np.multiply(sample_coords[2:], [450,450]).astype(int)), 2, (0,255,0), -1)
    
    cv2.imshow('EyeTrack', frame)
    
    if cv2.waitKey(1) & 0xFF == ord('q'):
        break
cap.release()
cv2.destroyAllWindows()