In [1]:
import numpy as np
import tensorflow as tf
import matplotlib.pyplot as plt
from google.colab import drive
drive.mount('/content/drive')
!pip install mediapipe
import mediapipe as mp
import os
import json

Mounted at /content/drive
Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting mediapipe
  Downloading mediapipe-0.8.10.1-cp37-cp37m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (32.9 MB)
[K     |████████████████████████████████| 32.9 MB 1.5 MB/s 
Installing collected packages: mediapipe
Successfully installed mediapipe-0.8.10.1




# Load Trained VAE Model

In [2]:
# Location of the saved encoder on the mounted shared drive.
vae_path = '/content/drive/Shareddrives/URSI 2022/Eye Tracking ML/vae_encoder/vae_2022-07-20_15:24:39'
# Restore the saved Keras model; predict_and_normalize() later calls
# vae_encoder.predict() on the selected face landmarks.
vae_encoder = tf.keras.models.load_model(vae_path)

# Print the layer-by-layer architecture as a sanity check of what was loaded.
vae_encoder.summary()

Model: "model_5"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 vae_encoder_input (InputLay  [(None, 68, 3)]          0         
 er)                                                             
                                                                 
 vae_flatten (Flatten)       (None, 204)               0         
                                                                 
 vae_dense_1 (Dense)         (None, 200)               41000     
                                                                 
 vae_dense_2 (Dense)         (None, 100)               20100     
                                                                 
 vae_dense_3 (Dense)         (None, 50)                5050      
                                                                 
 z_mean (Dense)              (None, 6)                 306       
                                                           

# Load MediaPipe model to get the set of mesh points

In [3]:
mp_face_mesh = mp.solutions.face_mesh

# Each FACEMESH_* constant is a collection of tuples of landmark indices.
# Flatten every collection into the set of distinct indices it mentions.
left_eye_point = {idx for edge in mp_face_mesh.FACEMESH_LEFT_EYE for idx in edge}
right_eye_point = {idx for edge in mp_face_mesh.FACEMESH_RIGHT_EYE for idx in edge}
left_iris_point = {idx for edge in mp_face_mesh.FACEMESH_LEFT_IRIS for idx in edge}
right_iris_point = {idx for edge in mp_face_mesh.FACEMESH_RIGHT_IRIS for idx in edge}

face_oval_point = {idx for edge in mp_face_mesh.FACEMESH_FACE_OVAL for idx in edge}

# The landmarks fed to the encoder are both eyes plus the face oval, in
# ascending index order. (An earlier variant used the eyes and irises instead.)
keypoints = sorted(left_eye_point | right_eye_point | face_oval_point)


# Load in one JSON file as an example

In [4]:
# Folder holding the JSON recordings, and a listing of everything in it.
json_path = '/content/drive/Shareddrives/URSI 2022/Eye Tracking ML/json/'
all_json_files = os.listdir(json_path)

# Parse a single recording to work with as the running example.
with open(os.path.join(json_path, 'fwkruums.json'), 'r') as file:
    json_data = json.load(file)

# Helper Functions


In [5]:
import statistics
import math

def get_line(pt1, pt2):
  """Return [a, b] such that y = a*x + b passes through pt1 and pt2.

  Args:
    pt1: First point as an (x, y) pair.
    pt2: Second point as an (x, y) pair.

  Returns:
    A two-element list [slope, intercept].

  Raises:
    ZeroDivisionError: If both points share the same x value, because a
      vertical line has no slope-intercept form.
  """
  (xa, ya), (xb, yb) = pt1, pt2
  slope = (yb - ya) / (xb - xa)
  intercept = ya - (slope * xa)
  return [slope, intercept]

def get_intersection(l1, l2):
  """Return the point [x, y] where two slope-intercept lines cross.

  Args:
    l1: First line as [a, b], meaning y = a*x + b.
    l2: Second line in the same form.

  Returns:
    A two-element list [x, y].

  Raises:
    ZeroDivisionError: If both lines have the same slope (parallel lines).
  """
  slope1, intercept1 = l1
  slope2, intercept2 = l2
  x = (intercept2 - intercept1) / (slope1 - slope2)
  # Evaluate the first line at x to obtain the matching y.
  return [x, slope1 * x + intercept1]

def get_ccw_angle(vector):
  """Return the polar angle of a 2D vector, in radians within [0, 2*pi).

  The angle is measured from the positive x-axis to the vector, turning from
  +x toward +y. Rotating the vector by (2*pi - angle) in that same direction
  therefore lines it up with the positive x-axis.

  Args:
    vector: An (x, y) pair.

  Returns:
    The angle in radians; 0 for the zero vector, which has no direction.
  """
  x, y = vector
  if x == 0 and y == 0:
    return 0
  # atan2 handles x == 0 and picks the right quadrant on its own; the modulo
  # maps its (-pi, pi] result onto [0, 2*pi).
  return math.atan2(y, x) % (2 * math.pi)

def get_face_plane(points3d):
  """Build a 2D coordinate system attached to the face from 4 landmarks.

  The first two landmarks define the face x-axis. The origin is the point
  where that axis crosses the line through the last two landmarks. Only the
  x and y components of each landmark are used.

  The intersection is computed from the four points directly (determinant
  form) instead of from slope-intercept lines, so either line may be exactly
  vertical.

  Args:
    points3d: Sequence of exactly four (x, y, z) landmarks.

  Returns:
    A tuple (origin, rad). origin is [x, y] in the original coordinates. rad,
    in [0, 2*pi), is the angle by which the face x-axis must be rotated (in
    the +x toward +y direction) to point along the x-axis of the original
    coordinate system. Both values are the inputs rotate() needs.

  Raises:
    ZeroDivisionError: If the two lines are parallel, or if either pair of
      landmarks coincides, so that no unique intersection exists.
  """
  (x1, y1), (x2, y2), (x3, y3), (x4, y4) = [(x, y) for x, y, _ in points3d]

  # Line-line intersection through (pt1, pt2) and (pt3, pt4).
  denom = (x1 - x2) * (y3 - y4) - (y1 - y2) * (x3 - x4)
  cross12 = x1 * y2 - y1 * x2
  cross34 = x3 * y4 - y3 * x4
  origin = [
      (cross12 * (x3 - x4) - (x1 - x2) * cross34) / denom,
      (cross12 * (y3 - y4) - (y1 - y2) * cross34) / denom,
  ]

  # Direction of the face x-axis (pt1 -> pt2); negating its polar angle gives
  # the rotation that brings it back onto the +x direction.
  axis_angle = math.atan2(y2 - y1, x2 - x1)
  rad = (-axis_angle) % (2 * math.pi)
  return origin, rad

def rotate(origin, points, angle):
  """Express points in the coordinate system centred at origin and rotated by angle.

  Each point is translated so that origin becomes (0, 0) and then rotated by
  angle using the standard 2D rotation matrix. Only the first two components
  of a point are used, so both (x, y) and (x, y, z) points are accepted.

  Args:
    origin: The (x, y) origin of the target coordinate system.
    points: Iterable of points given in the original coordinates.
    angle: Rotation angle in radians, as returned by get_face_plane().

  Returns:
    A list with one [x, y] pair per input point, in input order.
  """
  ox, oy = origin
  # The angle is the same for every point, so evaluate the trig terms once.
  cos_a = math.cos(angle)
  sin_a = math.sin(angle)
  normalized_points = []
  for point in points:
    dx = point[0] - ox
    dy = point[1] - oy
    normalized_points.append([cos_a * dx - sin_a * dy, sin_a * dx + cos_a * dy])
  return normalized_points

# Define indices of corresponding features in the 478 features list
# Landmarks read from every frame as the raw iris features.
# NOTE(review): presumably the four contour points of each iris (469-472 and
# 474-477) with the two iris centres left out -- confirm against MediaPipe.
irises = [469, 470, 471, 472, 474, 475, 476, 477]
# Landmarks passed to get_face_plane(). Order matters: the first two define
# the face x-axis, the last two the line that crosses it at the origin.
face_cross = [226, 446, 9, 195]

def predict_and_normalize(videos):
  """Encode every frame of the given videos and normalize its iris landmarks.

  Frames are processed in order, video by video. For each frame the landmarks
  listed in `keypoints` are collected for the encoder, and the landmarks in
  `irises` are re-expressed in the face coordinate system derived from the
  `face_cross` landmarks.

  Args:
    videos: Iterable of dicts whose "features" entry is a list of frames.
      Each frame is indexed by landmark number and yields (x, y, z) points.

  Returns:
    A tuple (latent_features, normalized_irises): the output of
    vae_encoder.predict() for all frames, and a list holding, per frame, the
    list of normalized [x, y] iris points.
  """
  mesh_batch = []
  iris_batch = []
  every_frame = (frame for video in videos for frame in video["features"])
  for frame in every_frame:
    mesh_batch.append([frame[idx] for idx in keypoints])
    origin, angle = get_face_plane([frame[idx] for idx in face_cross])
    iris_batch.append(rotate(origin, [frame[idx] for idx in irises], angle))
  # The encoder is run once on the whole batch: it takes a 3D input of shape
  # (number of frames, number of keypoints, 3).
  return vae_encoder.predict(mesh_batch), iris_batch

# Extract the facemesh features and rewrite them as latent features + normalized iris features

In [6]:
# Extract the list of videos of the example subject. json_data is already the
# parsed content of 'fwkruums.json'; this just takes the value of the only
# key-value pair in that file.
subject_data = json_data['fwkruums']
print('There are ' + str(len(subject_data)) + ' videos for this subject')

latent_features, normalized_irises = predict_and_normalize(subject_data)

print(tf.shape(latent_features))
print(tf.shape(normalized_irises))

# latent_features (3605 x 6) and normalized_irises (3605 x 8 x 2) are parallel
# per-frame sequences. Pair them so that every element of aggregate_features
# holds all the information kept about the face in one frame.
aggregate_features = [
    [latent, iris_points]
    for latent, iris_points in zip(latent_features, normalized_irises)
]

# Build subject_data_copy: the same videos, with 'features' replaced by the
# latent features and normalized irises of their frames. The result stays a
# list of dicts because the model input is later selected by the 'phase' and
# 'block' attributes.
#
# Every video dict is copied before its 'features' entry is replaced. A plain
# list copy would share the dicts with subject_data, so the raw landmarks
# would be overwritten and this cell could not be run a second time.

# Number of frames consumed so far; advanced once per video.
frames_counter = 0

subject_data_copy = []

for source_video in subject_data:
  # Frames of this video occupy aggregate_features[head:tail].
  frames_num = len(source_video['features'])
  head = frames_counter
  tail = head + frames_num
  video = dict(source_video)
  video['features'] = aggregate_features[head:tail]
  subject_data_copy.append(video)
  frames_counter = tail

# Every encoded frame must have been handed to exactly one video.
assert frames_counter == len(aggregate_features), (
    'frame count mismatch between videos and encoded features')

There are 129 videos for this subject
tf.Tensor([3605    6], shape=(2,), dtype=int32)
tf.Tensor([3605    8    2], shape=(3,), dtype=int32)


# Extract the 90x14x(6+8x2) input tensor for the upcoming model

In [9]:
import random

def get_block_data(block_num, videos=None):
  """Return the videos whose 'block' attribute equals block_num.

  Args:
    block_num: Block identifier to select, compared with == against each
      video's 'block' value.
    videos: Videos to search. Defaults to the notebook-level
      subject_data_copy when omitted.

  Returns:
    A list of the matching videos, in their original order.
  """
  if videos is None:
    videos = subject_data_copy
  return [video for video in videos if video['block'] == block_num]

def get_ct_data(vlst):
  """Split videos into calibration videos and all the others.

  Args:
    vlst: Iterable of videos, each a dict with a 'phase' entry. In this
      notebook they all share the same 'block' attribute.

  Returns:
    A tuple (calibration_data, test_data). Videos whose 'phase' is
    'calibration' go into the first list and every other video into the
    second; input order is kept within each list.
  """
  calibration_data, test_data = [], []
  for video in vlst:
    bucket = calibration_data if video['phase'] == 'calibration' else test_data
    bucket.append(video)
  return calibration_data, test_data

# Use the calibration videos of block '0' as the reference: the order of
# their (x, y) targets is the order imposed on every block's calibration data.
block_zero = get_block_data('0')
c_zero, t_zero = get_ct_data(block_zero)
calibration_pts = [[video['x'], video['y']] for video in c_zero]

print(calibration_pts)

def sort_calibration(c_data, reference_pts=None):
  """Order calibration videos by the reference list of calibration points.

  For every reference point, in order, the first video whose 'x' and 'y'
  equal that point is appended to the result. Reference points without a
  matching video are skipped.

  Args:
    c_data: Calibration videos, each a dict with 'x' and 'y' entries.
    reference_pts: Sequence of [x, y] points fixing the output order.
      Defaults to the notebook-level calibration_pts when omitted.

  Returns:
    A list of videos arranged in reference-point order.
  """
  if reference_pts is None:
    reference_pts = calibration_pts
  sorted_data = []
  for pt in reference_pts:
    x = pt[0]
    y = pt[1]
    for video in c_data:
      if video['x'] == x and video['y'] == y:
        sorted_data.append(video)
        # Stop only once a match is found. Breaking unconditionally would
        # compare nothing but the first video against each point.
        break
  return sorted_data


# Inputs list for the deep learning model
inputs = []
# Targets list corresponding to the inputs list
targets = []

# Frames are drawn with a dedicated, seeded generator so that re-running the
# notebook yields the same inputs and the global random state is left alone.
# Change the seed to obtain a different draw.
FRAME_SAMPLING_SEED = 0
frame_rng = random.Random(FRAME_SAMPLING_SEED)

# Loop through blocks
for block_num in ['0', '1', '2']:
  block_data = get_block_data(block_num)
  c_data, t_data = get_ct_data(block_data)
  c_data = sort_calibration(c_data)
  # Loop through test videos first. The targets depend on the test videos,
  # NOT on the calibration videos.
  for t_video in t_data:
    # One randomly chosen frame of the test video comes first ...
    sample_frames = [frame_rng.choice(t_video['features'])]
    # ... with the gaze coordinate of that test video as the target ...
    target = [int(t_video['x']), int(t_video['y'])]
    # ... followed by one randomly chosen frame per calibration video, in
    # reference order.
    for c_video in c_data:
      sample_frames.append(frame_rng.choice(c_video['features']))
    inputs.append(sample_frames)
    targets.append(target)


[]
[['10', '50'], ['10', '10'], ['90', '10'], ['50', '90'], ['30', '70'], ['50', '50'], ['50', '10'], ['90', '90'], ['70', '70'], ['70', '30'], ['10', '90'], ['90', '50'], ['30', '30']]
