In [1]:
import tensorflow as tf
from tensorflow.keras import layers
from tensorflow.keras import Model
import numpy as np 
import matplotlib.pyplot as plt
import json
import os 
from PIL import Image

In [2]:
# Display the installed TensorFlow version so the run environment is recorded with the notebook.
tf.__version__

'2.9.0'

In [9]:
# Sentinel that lets tf.data pick parallelism / prefetch buffer sizes at runtime.
# `tf.data.AUTOTUNE` is the non-experimental name of the same constant.
AUTO = tf.data.AUTOTUNE

# TFRecord inputs: one file per split, all under the same directory.
DATA_DIR = '../datasets/gazetrack_tfrec'
TRAINING_FILENAMES = f'{DATA_DIR}/train.tfrec'
VALID_FILENAMES = f'{DATA_DIR}/val.tfrec'
TEST_FILENAMES = f'{DATA_DIR}/test.tfrec'

# Number of samples per batch produced by the input pipelines.
BATCH_SIZE = 256

# Seed value held in a tf.Variable.
# NOTE(review): not referenced by the cells visible here — confirm where it is consumed.
SEED = tf.Variable(256)

In [4]:
def parse_tfrecord_fn(example):
    """Deserialize one serialized `tf.train.Example` and decode its JPEG image.

    Args:
        example: a serialized Example, as yielded by `tf.data.TFRecordDataset`.

    Returns:
        dict mapping every feature name in the schema below to its parsed
        scalar tensor, except "image", which is replaced by the decoded
        3-channel image tensor.
    """
    # (dtype, feature names) groups, listed in the order the schema declares them.
    schema_groups = (
        (tf.string, ("image", "path", "device")),
        (tf.int64, (
            "screen_h", "screen_w",
            "face_valid",
            "face_x", "face_y", "face_w", "face_h",
            "leye_x", "leye_y", "leye_w", "leye_h",
            "reye_x", "reye_y", "reye_w", "reye_h",
        )),
        (tf.float32, ("dot_xcam", "dot_y_cam", "dot_x_pix", "dot_y_pix")),
        (tf.int64, (
            "reye_x1", "reye_y1", "reye_x2", "reye_y2",
            "leye_x1", "leye_y1", "leye_x2", "leye_y2",
        )),
    )

    # Every feature is a single scalar value (shape []).
    feature_description = {
        name: tf.io.FixedLenFeature([], dtype)
        for dtype, names in schema_groups
        for name in names
    }

    parsed = tf.io.parse_single_example(example, feature_description)
    # The image is stored as encoded JPEG bytes; force 3 output channels.
    parsed["image"] = tf.io.decode_jpeg(parsed["image"], channels=3)
    return parsed

In [5]:
def augmentation(image, training = True):
    """Resize, optionally random-crop, rescale and normalize an image.

    Both modes return a 128x128 image so train and eval inputs share a shape:
      * training: resize to 138x138, then take a random 128x128 crop
        (RandomCrop seed fixed at 256).
      * otherwise: resize directly to 128x128 (no randomness).

    Afterwards pixel values are scaled by 1/255 and normalized per channel
    with the fixed mean / variance below.

    Args:
        image: image tensor accepted by the Keras preprocessing layers
            (channels last, 3 channels to match the normalization constants).
        training: whether to apply the random-crop augmentation.

    Returns:
        The preprocessed image tensor.
    """
    if training:
        # Over-size by 10 px so the random crop has room to jitter.
        spatial_layers = [
            layers.Resizing(128 + 10, 128 + 10),
            layers.RandomCrop(128, 128, seed=256),
        ]
    else:
        # Previously this branch only resized to 138x138, yielding a different
        # output size than the training branch.
        spatial_layers = [layers.Resizing(128, 128)]

    aug = tf.keras.Sequential(
        spatial_layers
        + [
            layers.Rescaling(1./255),
            layers.Normalization(mean = (0.3741, 0.4076, 0.5425),
                                 variance = (0.0004, 0.0004, 0.0004)),
        ]
    )

    return aug(image)

In [6]:
def prepare_sample(features, training=True):
    """Convert one parsed record into the tensors consumed downstream.

    Args:
        features: dict as returned by `parse_tfrecord_fn` (decoded "image"
            plus the scalar box / keypoint / dot / screen features).
        training: forwarded to `augmentation` for both eye crops. Defaults to
            True, which is what this function always used before.

    Returns:
        Tuple `(l_eye, r_eye, kps, out, screen_w, screen_h)`:
          * l_eye: left-eye crop, flipped left-right, then augmented.
          * r_eye: right-eye crop, augmented.
          * kps: the eight leye/reye x1,y1,x2,y2 values, x divided by the
            image width and y by the image height (float64).
          * out: [dot_xcam, dot_y_cam] (float32).
          * screen_w, screen_h: passed through unchanged.
    """
    image = features['image']

    # Decoded images are laid out [height, width, channels]: axis 0 is the
    # height and axis 1 the width. (These two were previously swapped, which
    # normalized x by the height and y by the width.)
    image_shape = tf.shape(image)
    h = tf.cast(image_shape[0], tf.int64)
    w = tf.cast(image_shape[1], tf.int64)

    screen_w, screen_h = features['screen_w'], features['screen_h']

    # int64 / int64 true division yields float64.
    kps = [features['leye_x1']/w, features['leye_y1']/h, features['leye_x2']/w, features['leye_y2']/h,
           features['reye_x1']/w, features['reye_y1']/h, features['reye_x2']/w, features['reye_y2']/h]

    def _box_int32(prefix):
        # Fetch the x, y, w, h box features for `prefix` as int32 scalars.
        return [tf.cast(features[f'{prefix}_{field}'], tf.int32) for field in ('x', 'y', 'w', 'h')]

    lx, ly, lw, lh = _box_int32('leye')
    rx, ry, rw, rh = _box_int32('reye')

    # crop_to_bounding_box takes (offset_height, offset_width, target_height,
    # target_width). Boxes are used exactly as stored, without clamping.
    l_eye = tf.image.crop_to_bounding_box(image, ly, lx, lh, lw)
    r_eye = tf.image.crop_to_bounding_box(image, ry, rx, rh, rw)

    # Mirror the left-eye crop horizontally.
    l_eye = tf.image.flip_left_right(l_eye)

    # float32 features, so `out` is float32.
    out = [features['dot_xcam'], features['dot_y_cam']]

    l_eye = augmentation(l_eye, training=training)
    r_eye = augmentation(r_eye, training=training)

    return l_eye, r_eye, kps, out, screen_w, screen_h

In [7]:
# @tf.function
def get_batched_dataset(filenames, batch_size):
    """Build the batched input pipeline for the given TFRecord file(s).

    Stages, in order: read records, parse, prepare samples, shuffle with a
    buffer of `batch_size * 10` elements, batch, prefetch.

    Args:
        filenames: TFRecord path(s) passed to `tf.data.TFRecordDataset`.
        batch_size: number of samples per batch.

    Returns:
        A `tf.data.Dataset` yielding batches of `prepare_sample` outputs.
    """
    # Give up deterministic element order in exchange for throughput.
    pipeline_options = tf.data.Options()
    pipeline_options.deterministic = False

    records = tf.data.TFRecordDataset(filenames, num_parallel_reads=AUTO)
    records = records.with_options(pipeline_options)

    samples = records.map(parse_tfrecord_fn, num_parallel_calls=AUTO)
    samples = samples.map(prepare_sample, num_parallel_calls=AUTO)

    shuffle_buffer = batch_size*10
    batches = samples.shuffle(shuffle_buffer).batch(batch_size)

    return batches.prefetch(buffer_size=AUTO)

In [10]:
def _count_records(tfrec_path):
    """Count the records in a TFRecord file by iterating over it once."""
    total = 0
    for _ in tf.data.TFRecordDataset(tfrec_path):
        total += 1
    return total


# One batched pipeline per split.
train_dataset = get_batched_dataset(TRAINING_FILENAMES, BATCH_SIZE)
valid_dataset = get_batched_dataset(VALID_FILENAMES, BATCH_SIZE)
test_dataset = get_batched_dataset(TEST_FILENAMES, BATCH_SIZE)

# Each count is a full pass over the raw file, so this cell is slow on large splits.
train_len = _count_records(TRAINING_FILENAMES)
val_len = _count_records(VALID_FILENAMES)
test_len = _count_records(TEST_FILENAMES)

print(f"No. of train samples: {train_len}")
print(f"No. of val samples: {val_len}")
print(f"No. of test samples: {test_len}")

No. of train samples: 398654
No. of val samples: 43458
No. of test samples: 59563
