# CS153 Computer Vision Eye-Tracking Dataset RF Model

## Setting Up Environment

### Downloading Necessary Dependencies

Run the following cell only the first time you run the notebook. It installs the dependencies required by the models in this notebook.

In [None]:
# Update pip
!python -m pip install -U pip
# Install scikit-image
!python -m pip install -U scikit-image
!pip install -U scikit-learn
!python -m pip install openpyxl==3.0
!pip install -q DataSynthesizer
!pip install optuna
# Current stable release for CPU and GPU
!pip install tensorflow
!pip install torch torchvision torchaudio
!pip3 install opencv-python
!pip install dlib
!pip install face_recognition
!pip install deepface
!pip install imutils

To create the directory structure that the later cells write their output to, run the following cells.

In [None]:
# Create Necessary Parent Directories
import os

output_data = './output_data'
open_cv_figures = './open_cv_figures'

# exist_ok=True makes re-running the cell safe when the directory already exists
os.makedirs(output_data, exist_ok=True)

In [None]:
os.makedirs(open_cv_figures, exist_ok=True)

### Pandas Data Cleaning

Import the packages used in this notebook and print their versions, so that dependency issues in the Conda environment are easier to track down.

In [None]:
from deepface import DeepFace

import matplotlib as mpl
import matplotlib.pyplot as plt
import matplotlib.image as mpimg
%matplotlib inline

import numpy as np
import optuna
import os
import pandas as pd
from pathlib import Path
from PIL import Image

import seaborn as sns
import skimage as skim
from skimage import transform as sktsf
import sklearn
from sklearn import tree
from sklearn import ensemble
from sklearn.impute import SimpleImputer
from sklearn.metrics import accuracy_score, confusion_matrix, classification_report
from sklearn.model_selection import StratifiedKFold, train_test_split, cross_val_score
import sys
sys.path.append('models')

import tensorflow as tf
import torch as t
from torchvision import transforms as tvtsf

# import the necessary packages
from imutils.video import VideoStream
import argparse
import imutils
import time
import cv2

!python --version
print("Matplotlib: ", mpl.__version__)
print("Numpy: ", np.__version__)
print("Optuna: ", optuna.__version__)
print("pandas: ", pd.__version__)
print("PIL: ", Image.__version__)
print("Seaborn: ", sns.__version__)
print("Scikit-image: ", skim.__version__)
print("Scikit-learn: ", sklearn.__version__)
print("TensorFlow: ", tf.__version__)
print("PyTorch: ", t.__version__)

In [None]:
tf.test.gpu_device_name()

## Dataset We're Using

#### Gaze Data for the Analysis of Attention in Feature Films
The data set contains gaze information in response to carefully curated film clips, which have been augmented via the annotation of selected high-level features. This includes:
- 15 film clips, of duration 1–4 minutes each, selected to be representative of a variety of genres, temporal pacing, and visual styles
- Recorded gaze behavior for 21 viewers watching those clips
- Frame by frame annotations of high-level cinematographic features

This data set is easily extendable, either with additional hand annotations or with existing methods from machine vision. The dataset's documentation describes the selection of candidate films, the identification of clips from within these films, and the high-level image features that have been annotated.

In [None]:
# Convert the space-separated .txt files into .csv files so that we can read them in as Pandas dataframes
import csv

def txt_to_csv(txt_path, csv_path):
    # newline="" stops the csv module from writing blank lines between rows on Windows
    with open(txt_path) as fin, open(csv_path, "w", newline="") as fout:
        writer = csv.writer(fout)
        for line in fin:
            writer.writerow(line.split())

txt_to_csv("./input_data/all_gazepoints_max_invalid_per_subject10%.txt",
           "./input_data/all_gazepoints_max_invalid_per_subject10%.csv")
txt_to_csv("./input_data/features_by_frame_all_clips.txt",
           "./input_data/features_by_frame_all_clips.csv")
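Pandas can also read whitespace-delimited text directly, which would skip the intermediate .csv files. A minimal sketch, assuming each .txt file starts with a single header row (as the `pd.read_csv` calls below already rely on); the sample rows are hypothetical stand-ins, not values from the dataset:

```python
import io

import pandas as pd

# Hypothetical stand-in for a space-separated .txt file; the real files have more columns.
sample_txt = io.StringIO(
    "film frame_num face faces\n"
    "argo 1 1 0\n"
    "argo 2 0 0\n"
    "argo 3 0 1\n"
)

# sep=r"\s+" treats any run of spaces or tabs as one delimiter, like line.split()
df = pd.read_csv(sample_txt, sep=r"\s+")
print(df.shape)  # (3, 4)
```

On the real data this would be, for example, `pd.read_csv("./input_data/features_by_frame_all_clips.txt", sep=r"\s+")`.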

## Data Preparation

In [None]:
# Convert gazepoint data into a Pandas dataframe
gazepoint_df = pd.read_csv("./input_data/all_gazepoints_max_invalid_per_subject10%.csv",encoding='utf-8')
gazepoint_df.head()

In [None]:
# Convert feature-by-frame data into a Pandas dataframe
feature_df = pd.read_csv("./input_data/features_by_frame_all_clips.csv",encoding='utf-8')
feature_df.head()

In [None]:
# First, let us remove all columns from both of the dataframes that are not relevant to our research
gazepoint_df = gazepoint_df.drop(columns=["subject", "eyetracker_valid", "in_frame", "subject_valid_for_clip"])
# pandas 2.x no longer accepts the axis as a positional argument, so pass columns= explicitly
feature_df = feature_df.drop(columns=feature_df.columns.difference(['frame_num', 'shot_num', "face", "faces", "film"]))
gazepoint_df.head()

In [None]:
# Let us remove all entries in the feature_df that do not contain at least one face
mask = (feature_df.face==1) | (feature_df.faces==1)
feature_df = feature_df[mask]
feature_df.head()

Similar to creating the parent directories before, use the following code to create a sub-directory for each film.

In [None]:
# Create Necessary Sub Directories
films = feature_df.film.unique()
root_output_data_path = "./output_data"
root_open_cv_figures_path = "./open_cv_figures"
for film in films:
    # Create each directory independently, so an existing output_data/<film>
    # does not prevent open_cv_figures/<film> from being created
    os.makedirs(os.path.join(root_output_data_path, film), exist_ok=True)
    os.makedirs(os.path.join(root_open_cv_figures_path, film), exist_ok=True)

Now we know the frame numbers (`frame_num`) in each movie where a face appears. For each movie, the following cell goes through the frame images, finds the image file that matches each `frame_num`, stores its path in a new `file_path` column of `feature_df`, and copies the image into `./output_data/<film>`.

In [None]:
import shutil

def get_frame_num(image):
    """Extract the frame number from a file name such as 'argo_00001.png'."""
    file_name = Path(image).stem
    try:
        return int(file_name[file_name.rindex('_')+1:])
    except ValueError as e:
        print(f"The folder contains a non-image: {image} ({e})")
        return None

# go through the clips in ./frames and create one dictionary per movie where key = frame_num, and value = file name
full_movie_dict = {}
for movie in os.listdir("./frames"):
    if not os.path.isdir(f"./frames/{movie}"):
        continue  # skip stray files such as .DS_Store
    frame_num_dict = {}
    for image_filePath in os.listdir(f"./frames/{movie}"):
        frame_num = get_frame_num(image_filePath)
        if frame_num is not None:
            frame_num_dict[frame_num] = image_filePath
    full_movie_dict[movie] = frame_num_dict

feature_df = feature_df.reset_index(drop=True)
# Start every row without a path, then fill in matches one row at a time with .at
# (assigning to feature_df["file_path"] would overwrite the whole column)
feature_df["file_path"] = None
for index, row in feature_df.iterrows():
    film = row['film']
    df_frame_num = row['frame_num']
    if df_frame_num in full_movie_dict.get(film, {}):
        corr_filePath = f"./frames/{film}/{full_movie_dict[film][df_frame_num]}"
        feature_df.at[index, "file_path"] = corr_filePath
        shutil.copy(corr_filePath, f"./output_data/{film}")

feature_df.head()
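An alternative to the `iterrows` loop above is to build a table of every frame image once and join it onto `feature_df` with `merge`, which fills in each row's path without iterating over the dataframe. A sketch, assuming frames live in `frames/<film>/<film>_<frame number>.<ext>` as the cell above expects; `frame_index` and `attach_file_paths` are hypothetical helper names, and the copy into `./output_data` is left out:

```python
from pathlib import Path

import pandas as pd


def frame_index(frames_root):
    """Table of (film, frame_num, file_path) for every frame image under frames_root.

    Assumes the layout frames_root/<film>/<name>_<frame number>.<ext>;
    anything else (stray files, names without a trailing number) is skipped.
    """
    records = []
    for film_dir in sorted(Path(frames_root).iterdir()):
        if not film_dir.is_dir():
            continue  # e.g. .DS_Store at the top level
        for image_path in film_dir.iterdir():
            _, sep, suffix = image_path.stem.rpartition("_")
            if not sep or not suffix.isdigit():
                continue  # not a numbered frame
            records.append({"film": film_dir.name,
                            "frame_num": int(suffix),
                            "file_path": str(image_path)})
    table = pd.DataFrame(records, columns=["film", "frame_num", "file_path"])
    return table.astype({"frame_num": "int64"})


def attach_file_paths(feature_df, frames_root):
    """Left-join frame paths onto feature_df; rows without an image get NaN."""
    return feature_df.merge(frame_index(frames_root), on=["film", "frame_num"], how="left")
```

With `feature_df = attach_file_paths(feature_df, "./frames")`, rows with no matching image keep `NaN` in `file_path`, so later cells can filter them with `feature_df["file_path"].notna()`.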

## Deep Face Recognition Model

Although the DeepFace library wraps several state-of-the-art face recognition models, `DeepFace.find` only compares a target image against every image in a database folder and returns the closest matches by similarity, as shown below in the example using my face and the film frames from the movie Argo.

In [None]:
models = ["VGG-Face", "Facenet", "Facenet512", "OpenFace", "DeepFace", "DeepID", "ArcFace", "Dlib"]
backends = ['opencv', 'ssd', 'dlib', 'mtcnn', 'retinaface', 'mediapipe']
deepFace = DeepFace.find(img_path = "../Profilepicture.jpg", db_path = "./frames/argo/", enforce_detection = False, model_name = models[2], detector_backend=backends[0])

In [None]:
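# Recent deepface releases return a list of DataFrames (one per face found in img_path);
# older releases return a single DataFrame
result_df = deepFace[0] if isinstance(deepFace, list) else deepFace
result_df.head()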
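To make "similarity" concrete: `DeepFace.find` turns each face into an embedding vector with the chosen model and, as far as we understand its defaults, ranks the database images by cosine distance to the target, keeping those below a model-specific threshold. The sketch below reproduces only that ranking step on hand-made vectors; the embeddings, names, and threshold are hypothetical, and real embeddings would come from the model (Facenet512 above):

```python
import numpy as np


def cosine_distance(a, b):
    """1 minus the cosine similarity of two embedding vectors."""
    a = np.asarray(a, dtype=float)
    b = np.asarray(b, dtype=float)
    return 1.0 - float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)))


def rank_by_similarity(target, database, threshold):
    """Return (name, distance) pairs within threshold, closest first.

    database maps an identifier (e.g. a frame file name) to its embedding;
    threshold is a hypothetical cut-off, since real ones depend on the model.
    """
    scored = [(name, cosine_distance(target, emb)) for name, emb in database.items()]
    matches = [(name, dist) for name, dist in scored if dist <= threshold]
    return sorted(matches, key=lambda pair: pair[1])


# Hand-made 2-D "embeddings", for illustration only.
database = {"frame_a": [1.0, 0.0], "frame_b": [0.0, 1.0], "frame_c": [1.0, 1.0]}
# frame_a comes first, then frame_c; frame_b is beyond the threshold
print(rank_by_similarity([1.0, 0.0], database, threshold=0.4))
```

Nothing in this step says where a face is inside a frame, which is why `find` on its own does not label frames for face presence.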

## OpenCV Face Recognition

A simple face detector built on OpenCV's pretrained Haar cascade, used without any tuning for our data. It only detects faces reliably in high-quality pictures, such as the example image of my face below; it does not work for the movie frames, whose image quality is not high enough for it to detect faces.

### Initial Model With Poor Accuracy

In [None]:
# Let's look at some of the frames
def plt_img_by_title(title):
    _, _, files = next(os.walk(f'./frames/{title}/'))
    file_num = len(files)
    for i in range(1, file_num, 500):
        # frame files are zero-padded to five digits, e.g. argo_00501.png
        img = mpimg.imread(f'./frames/{title}/{title}_{i:05d}.png')
        imgplot = plt.imshow(img)
        plt.show()

plt_img_by_title('argo')

Define a FaceDetector class that wraps cv2's CascadeClassifier to return the rectangle coordinates of the faces detected in a given image.

In [None]:
import cv2
class FaceDetector():

    def __init__(self,faceCascadePath):
        self.faceCascade=cv2.CascadeClassifier(faceCascadePath)


    def detect(self, image, scaleFactor=1.1,
               minNeighbors=5,
               minSize=(5,5)):
        
        # Return the (x, y, w, h) rectangle of every face detected in the image
        rects=self.faceCascade.detectMultiScale(image,
                                                scaleFactor=scaleFactor,
                                                minNeighbors=minNeighbors,
                                                minSize=minSize)
        return rects
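`scaleFactor` and `minSize` control how many window sizes `detectMultiScale` searches, which matters when comparing the `scaleFactor=1.9` and `scaleFactor=1.05` settings used in this notebook. The sketch below lists the face sizes a multi-scale search would visit under a simplified model (an assumption, not OpenCV's exact implementation): start from the cascade's training window, which we take to be 24x24 for `haarcascade_frontalface_default.xml`, and grow it by `scaleFactor` until it no longer fits the frame. The 500x281 frame size is a hypothetical example:

```python
def pyramid_window_sizes(image_size, base_window=(24, 24), scale_factor=1.1, min_size=(0, 0)):
    """Approximate the face sizes a multi-scale cascade search examines.

    Simplified model: the window starts at base_window and is enlarged by
    scale_factor at each step until it no longer fits inside image_size
    (width, height). Windows smaller than min_size are skipped.
    """
    if scale_factor <= 1:
        raise ValueError("scale_factor must be greater than 1")
    img_w, img_h = image_size
    sizes = []
    factor = 1.0
    while True:
        w = round(base_window[0] * factor)
        h = round(base_window[1] * factor)
        if w > img_w or h > img_h:
            break
        if w >= min_size[0] and h >= min_size[1]:
            sizes.append((w, h))
        factor *= scale_factor
    return sizes


coarse = pyramid_window_sizes((500, 281), scale_factor=1.9, min_size=(30, 30))
fine = pyramid_window_sizes((500, 281), scale_factor=1.05, min_size=(30, 30))
print(coarse)  # [(46, 46), (87, 87), (165, 165)]
print(len(fine), "window sizes with scaleFactor=1.05")
```

A smaller `scaleFactor` visits many more sizes, so it is slower but less likely to miss a face whose size falls between two steps.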

In [None]:
# Frontal-face Haar cascade; the opencv-python package ships its cascade files under cv2.data.haarcascades
frontal_cascade_path = cv2.data.haarcascades + "haarcascade_frontalface_default.xml"

#Detector object created
fd=FaceDetector(frontal_cascade_path)

Helper functions for reading images with cv2 and saving them, with any detected faces drawn on, through Matplotlib.

In [None]:
import numpy as np
def get_my_image(image):
    image = cv2.imread(image)
    return np.copy(image)

def save_image(film, image, image_name):
    plt.figure(figsize=(18,15))
    # convert OpenCV's BGR channel order to RGB before plotting
    plt.imshow(cv2.cvtColor(image, cv2.COLOR_BGR2RGB))
    plt.xticks([])
    plt.yticks([])
    plt.savefig(f"./open_cv_figures/{film}/{image_name}")
    # close the figure so that looping over many frames does not keep every figure in memory
    plt.close()

In [None]:
def detect_face(image, scaleFactor, minNeighbors, minSize, film, image_name):
    # faces are detected on the grayscale image
    image_gray=cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)

    faces=fd.detect(image_gray,
                   scaleFactor=scaleFactor,
                   minNeighbors=minNeighbors,
                   minSize=minSize)

    for x, y, w, h in faces:
        # draw the detected faces on the color image
        cv2.rectangle(image,(x,y),(x+w, y+h),(127, 255,0),3)

    save_image(film, image, image_name)

In [None]:
# feature_df = feature_df.reset_index()
# for index, row in feature_df.iterrows():
#     film = row['film']
#     file_path = row['file_path']
#     if file_path is not None:
#         image_name = os.path.basename(file_path)
#         detect_face(image=get_my_image(file_path), 
#                     scaleFactor=1.9, 
#                     minNeighbors=3, 
#                     minSize=(30,30),
#                     film=film,
#                     image_name=image_name)

### A More Sophisticated Model That Uses cv2.CascadeClassifier()

The following cell runs the face detector on every frame that has a matching image in the frames directory (this assumes the cells above have run and `feature_df` has a `file_path` column), then saves each annotated frame to `open_cv_figures/<film>`.

In [None]:
feature_df = feature_df.reset_index(drop=True)
# load the haar cascade face detector once, outside the loop
detector = cv2.CascadeClassifier(cv2.data.haarcascades + "haarcascade_frontalface_default.xml")
for index, row in feature_df.iterrows():
    file_path = row['file_path']
    if pd.notna(file_path):
        # load the input image from disk, resize it, and convert it to grayscale
        image = cv2.imread(file_path)
        image = imutils.resize(image, width=500)
        film_name = Path(file_path).parts[1]
        image_name = Path(file_path).parts[2]
        gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)

        # detect faces in the input image using the haar cascade face detector
        rects = detector.detectMultiScale(gray, scaleFactor=1.05,
                                          minNeighbors=7, minSize=(30, 30),
                                          flags=cv2.CASCADE_SCALE_IMAGE)
        # loop over the bounding boxes
        for (x, y, w, h) in rects:
            # draw the face bounding box on the image
            cv2.rectangle(image, (x, y), (x + w, y + h), (0, 255, 0), 2)
        # save the annotated image as open_cv_figures/<film>/<frame file name>
        cv2.imwrite(f"./open_cv_figures/{film_name}/{image_name}", image)
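The cascade returns boxes as `(x, y, w, h)` in the coordinates of the 500-pixel-wide resized frame, while the FCHD-style `EyeGazeDataset` class below works with boxes ordered `(ymin, xmin, ymax, xmax)`. If we wanted to reuse these detections as boxes for that class, a conversion could look like this sketch; `rects_to_fchd_boxes` is a hypothetical helper, and it assumes the resize kept the aspect ratio, as `imutils.resize` does when only `width` is given:

```python
import numpy as np


def rects_to_fchd_boxes(rects, resize_ratio=1.0):
    """Convert OpenCV (x, y, w, h) rectangles to (ymin, xmin, ymax, xmax) boxes.

    resize_ratio = original frame width / detection frame width; multiplying
    by it maps boxes from the resized frame back to the original frame.
    """
    # detectMultiScale returns an empty tuple when it finds no faces
    rects = np.asarray(rects, dtype=np.float32).reshape(-1, 4)
    x, y, w, h = rects.T
    boxes = np.stack([y, x, y + h, x + w], axis=1)
    return boxes * resize_ratio


# A hypothetical detection at x=10, y=20 of size 30x40 in a frame shrunk by half.
print(rects_to_fchd_boxes([[10, 20, 30, 40]], resize_ratio=2.0))
```

In the loop above, the ratio would be the frame's width before the `imutils.resize` call divided by 500.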


## FCHD Image Manipulation Classes

Functions necessary to preprocess the image data so that it can be rescaled and normalized for PyTorch.

In [None]:
from types import SimpleNamespace

# Hypothetical placeholder: FCHD reads `caffe_pretrain` from its own config object `opt`,
# which this notebook never defines. Set it to True only when using Caffe-pretrained weights.
opt = SimpleNamespace(caffe_pretrain=False)

def inverse_normalize(img):
    if opt.caffe_pretrain:
        img = img + (np.array([122.7717, 115.9465, 102.9801]).reshape(3, 1, 1))
        return img[::-1, :, :]  # BGR -> RGB
    # approximate inverse of the PyTorch normalization, using one mean/std for all channels
    return (img * 0.225 + 0.45).clip(min=0, max=1) * 255

def pytorch_normalize(img):
    normalize = tvtsf.Normalize(mean=[0.485, 0.456, 0.406],
                                std=[0.229, 0.224, 0.225])
    img = normalize(t.from_numpy(img))
    return img.numpy()

def caffe_normalize(img):
    img = img[[2, 1, 0], :, :]  # RGB -> BGR
    img = img * 255
    mean = np.array([122.7717, 115.9465, 102.9801]).reshape(3, 1, 1)
    img = (img - mean).astype(np.float32, copy=True)
    return img

def preprocess(img, min_size=600, max_size=1000):
    """ Function to preprocess the input image.

    Scales the input image so that its shorter side equals min_size,
    unless that would make its longer side exceed max_size, in which
    case the longer side is scaled to max_size instead.
    Also normalizes the input image.

    Args:
        img: Input image (C, H, W array with values in [0, 255]) that is to be preprocessed.
        min_size: target size for the shorter side of the image.
        max_size: upper bound for the longer side of the image.
    """
    C, H, W = img.shape
    scale1 = min_size / min(H, W)
    scale2 = max_size / max(H, W)
    scale = min(scale1, scale2)
    img = img / 255.
    # round to whole pixels; afterwards the shorter side is at most min_size
    # and the longer side is at most max_size
    img = sktsf.resize(img, (C, round(H * scale), round(W * scale)), mode='reflect')
    if opt.caffe_pretrain:
        normalize = caffe_normalize
    else:
        normalize = pytorch_normalize
    return normalize(img)
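To see what `preprocess` does numerically: a 640x480 frame (the size `EyeGazeDataset.read_image` below produces) has `scale1 = 600/480 = 1.25` and `scale2 = 1000/640 = 1.5625`, so `scale = 1.25` and the frame becomes 800x600. The sketch below reproduces that arithmetic and the per-channel normalization applied through `tvtsf.Normalize`, in plain NumPy. It also shows that an exact inverse needs the per-channel statistics, whereas `inverse_normalize` uses the single values 0.225 and 0.45 and so only approximately undoes the normalization. The helper names are hypothetical:

```python
import numpy as np

# ImageNet statistics, as passed to tvtsf.Normalize above
IMAGENET_MEAN = np.array([0.485, 0.456, 0.406]).reshape(3, 1, 1)
IMAGENET_STD = np.array([0.229, 0.224, 0.225]).reshape(3, 1, 1)


def preprocess_scale(height, width, min_size=600, max_size=1000):
    """Scale factor and output (height, width) chosen by preprocess() for an H x W image."""
    scale = min(min_size / min(height, width), max_size / max(height, width))
    return scale, round(height * scale), round(width * scale)


def normalize(img):
    """Per-channel (x - mean) / std on a C x H x W array with values in [0, 1]."""
    return (img - IMAGENET_MEAN) / IMAGENET_STD


def denormalize(img):
    """Exact inverse of normalize(), rescaled to [0, 255]."""
    return np.clip(img * IMAGENET_STD + IMAGENET_MEAN, 0, 1) * 255


print(preprocess_scale(480, 640))   # (1.25, 600, 800)
print(preprocess_scale(800, 1920))  # here max_size wins: the width becomes 1000
```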

A dataset class that loads the frame images and rescales their bounding boxes so that they can serve as inputs to the FCHD model.

In [None]:
class EyeGazeDataset:
    """Wraps a list of frame records, each with `path`, `n_boxs` and `bboxs` attributes,
    where `bboxs` is an (n_boxs, 4) array of (ymin, xmin, ymax, xmax) boxes."""

    def __init__(self, dl):
        self.datalist = dl

    def get_example(self, idx):
        data_obj = self.datalist[idx]
        img_path = data_obj.path
        n_boxs = data_obj.n_boxs
        # Work on a float copy so repeated calls do not rescale the stored boxes again
        bboxs = np.array(data_obj.bboxs, dtype=np.float32)
        img, scale_w, scale_h = self.read_image(img_path)
        for i in range(n_boxs):
            # rows are (ymin, xmin, ymax, xmax): y coordinates follow the
            # height scale, x coordinates follow the width scale
            bboxs[i, 0] = bboxs[i, 0] * scale_h
            bboxs[i, 1] = bboxs[i, 1] * scale_w
            bboxs[i, 2] = bboxs[i, 2] * scale_h
            bboxs[i, 3] = bboxs[i, 3] * scale_w
        return img, bboxs, n_boxs

    def read_image(self, path, dtype=np.float32):
        f = Image.open(path)
        W_o, H_o = f.size
        # Image.LANCZOS is the filter formerly aliased as Image.ANTIALIAS (removed in Pillow 10)
        f = f.resize((640, 480), Image.LANCZOS)
        W_n, H_n = f.size
        scale_w = W_n / W_o
        scale_h = H_n / H_o
        # convert() returns a new image, so the result must be assigned;
        # this also drops any alpha channel from PNG frames
        f = f.convert('RGB')
        # Convert to a numpy array and transpose it to C, H, W
        img = np.asarray(f, dtype=dtype)
        return img.transpose((2, 0, 1)), scale_w, scale_h
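`EyeGazeDataset` expects each item in `dl` to expose `path`, `n_boxs`, and `bboxs`, but this notebook never defines that record type. One possible shape for it, as a sketch: `FrameAnnotation` and `scale_bboxs` are hypothetical names, and the box order follows the FCHD `(ymin, xmin, ymax, xmax)` convention used in `get_example`:

```python
from dataclasses import dataclass

import numpy as np


@dataclass
class FrameAnnotation:
    """Hypothetical record for one frame: image path plus face boxes.

    bboxs holds one (ymin, xmin, ymax, xmax) row per face, in pixels of the
    original, un-resized frame.
    """
    path: str
    bboxs: np.ndarray

    @property
    def n_boxs(self):
        return len(self.bboxs)


def scale_bboxs(bboxs, scale_h, scale_w):
    """Return a rescaled float copy of an (n, 4) box array; the input is untouched."""
    scaled = np.array(bboxs, dtype=np.float32).reshape(-1, 4)  # np.array copies
    scaled[:, [0, 2]] *= scale_h  # y coordinates
    scaled[:, [1, 3]] *= scale_w  # x coordinates
    return scaled


ann = FrameAnnotation("./frames/argo/argo_00001.png", np.array([[100, 200, 300, 400]]))
print(ann.n_boxs, scale_bboxs(ann.bboxs, scale_h=0.5, scale_w=0.25))
```

With such records, `EyeGazeDataset([ann])` has every attribute `get_example` reads. Returning a new array rather than editing `bboxs` in place means fetching the same example twice cannot rescale its boxes twice.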

In [None]:

feature_df = pd.read_csv("./input_data/features_by_frame_all_clips.csv",encoding='utf-8')
NUM_ROWS, NUM_COLS = feature_df.shape[0], feature_df.shape[1]
Y_col = 'face'
X_cols = feature_df.loc[:, feature_df.columns != Y_col].columns

X_train, X_val, y_train, y_val = \
train_test_split(feature_df[X_cols], feature_df[Y_col],test_size=0.3, random_state=42)

X_train_op, X_val_op, y_train_op, y_val_op = X_train.copy(), X_val.copy(), y_train.copy(), y_val.copy()

print("X_train.shape = ", X_train.shape, " \t y_train.shape = ", y_train.shape)
print("X_val.shape = ", X_val.shape, " \t y_val.shape = ", y_val.shape)

The following k-fold cross-validation fails because the cells above do not handle the NaN values in the features. `X_cols` also still includes the non-numeric `film` column, which the random forest cannot use as-is.

In [None]:
best_accuracy = 0
best_depth = 1
best_num_trees = 42

for ntrees in range(50,450,100):  
    for d in range(1,20): 
        rforest_model = ensemble.RandomForestClassifier(max_depth=d, 
                                                        n_estimators=ntrees,
                                                        random_state=42)
        cv_scores = cross_val_score( rforest_model, X_train, y_train, cv=5 )
        average_cv_accuracy = cv_scores.mean()  
        if average_cv_accuracy >= best_accuracy: 
            best_accuracy = average_cv_accuracy
            best_depth = d
            best_num_trees = ntrees
        print(f"depth: {d:2d} num_trees : {ntrees:3d} average_cv_accuracy: {average_cv_accuracy:7.4f}")

print()
print(f"best_depth: {best_depth}, best_num_trees: {best_num_trees}, best_accuracy: {best_accuracy:.4f}")

rforest_model_cv = ensemble.RandomForestClassifier(max_depth=best_depth, n_estimators=best_num_trees,
                                                   random_state=42)

rforest_model_cv.fit(X_train, y_train)

y_pred = rforest_model_cv.predict(X_val)

trainaccuracy_random_forest = rforest_model_cv.score(X_train, y_train)
print('Train accuracy: {}'.format(trainaccuracy_random_forest))

accuracy_random_forest = accuracy_score(y_val, y_pred)
print('Validation accuracy: {}'.format(accuracy_random_forest))

print(classification_report(y_val, y_pred))
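The cross-validation above fails on NaN values. One common fix is to put a `SimpleImputer` (already imported at the top of the notebook) in a pipeline with the forest, so missing values are filled using statistics from each training fold only. A minimal sketch, assuming median imputation is acceptable for these features and that non-numeric columns such as `film` are simply dropped (one-hot encoding them would be an alternative); the helper name is hypothetical:

```python
import numpy as np
import pandas as pd
from sklearn.ensemble import RandomForestClassifier
from sklearn.impute import SimpleImputer
from sklearn.model_selection import cross_val_score
from sklearn.pipeline import make_pipeline


def cv_accuracy_with_imputation(X, y, max_depth=5, n_estimators=100, cv=5):
    """Cross-validated accuracy of a random forest after median imputation."""
    X_numeric = X.select_dtypes(include="number")  # drops string columns like 'film'
    model = make_pipeline(
        SimpleImputer(strategy="median"),  # refit on the training part of every fold
        RandomForestClassifier(max_depth=max_depth, n_estimators=n_estimators,
                               random_state=42),
    )
    return cross_val_score(model, X_numeric, y, cv=cv)
```

Calling `cv_accuracy_with_imputation(X_train, y_train, max_depth=d, n_estimators=ntrees)` inside the grid-search loop would take the place of the `cross_val_score` call.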

In [None]:
labels = sorted(np.unique(np.concatenate([y_val, y_pred])))
result = confusion_matrix(y_val, y_pred, labels=labels)
print("Confusion Matrix:")
print(result)
# rows of the confusion matrix are true labels and columns are predicted labels
ax = sns.heatmap(
    result,
    annot=True,
    xticklabels=labels,
    yticklabels=labels
)
ax.set(xlabel='predicted label', ylabel='true label')
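A quick check of `confusion_matrix` orientation on hypothetical toy labels: rows are true labels and columns are predicted labels, so in a heatmap the x-axis shows predictions and the y-axis shows the truth:

```python
from sklearn.metrics import confusion_matrix

y_true = [0, 0, 1, 1]
y_pred = [0, 1, 1, 1]

# Row 0 (true 0): one predicted 0, one predicted 1. Row 1 (true 1): both predicted 1.
cm = confusion_matrix(y_true, y_pred, labels=[0, 1])
print(cm)
# [[1 1]
#  [0 2]]
```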

In [None]:
def objective(trial):
    criterion = trial.suggest_categorical('criterion', ['gini', 'entropy'])
    # use real booleans: the strings 'True' and 'False' are both truthy
    bootstrap = trial.suggest_categorical('bootstrap', [True, False])
    max_depth = trial.suggest_int('max_depth', 1, 1000)
    # 'auto' was removed in newer scikit-learn; for classifiers it meant 'sqrt'
    max_features = trial.suggest_categorical('max_features', ['sqrt', 'log2'])
    # scikit-learn requires max_leaf_nodes >= 2
    max_leaf_nodes = trial.suggest_int('max_leaf_nodes', 2, 1000)
    n_estimators = trial.suggest_int('n_estimators', 1, 1000)
    min_samples_split = trial.suggest_int('min_samples_split', 2, 5)
    min_samples_leaf = trial.suggest_int('min_samples_leaf', 1, 10)

    clf = ensemble.RandomForestClassifier(
        bootstrap=bootstrap, criterion=criterion,
        max_depth=max_depth, max_features=max_features,
        max_leaf_nodes=max_leaf_nodes, n_estimators=n_estimators,
        min_samples_split=min_samples_split, min_samples_leaf=min_samples_leaf,
        n_jobs=2)

    # accuracy, not r2, is the appropriate score for a classifier
    score = cross_val_score(clf, X_train_op, y_train_op, cv=5, scoring="accuracy")
    return score.mean()

study = optuna.create_study(direction='maximize')
study.optimize(objective, n_trials=100)

trial = study.best_trial
print('Best CV accuracy: {}'.format(trial.value))

# the parameter names suggested above match RandomForestClassifier's keyword arguments
rforest_model_post_optuna = ensemble.RandomForestClassifier(**trial.params, n_jobs=2)

rforest_model_post_optuna.fit(X_train_op, y_train_op)

y_pred_op = rforest_model_post_optuna.predict(X_val_op)

trainaccuracy_random_forest_op = rforest_model_post_optuna.score(X_train_op, y_train_op)
print('Train accuracy: {}'.format(trainaccuracy_random_forest_op))

accuracy_random_forest_op = accuracy_score(y_val_op, y_pred_op)
print('Validation accuracy: {}'.format(accuracy_random_forest_op))

print(classification_report(y_val_op, y_pred_op))

In [None]:
optuna.visualization.plot_optimization_history(study)

In [None]:
optuna.visualization.plot_slice(study)

In [None]:
if accuracy_random_forest_op > accuracy_random_forest:
    print("Optuna had the higher prediction accuracy")
    feature_importances = rforest_model_post_optuna.feature_importances_
else:
    print("Cross Validation had the higher prediction accuracy")
    feature_importances = rforest_model_cv.feature_importances_

# importances follow the column order of X_train, not feature_df
# (feature_df also contains the target column)
feature_importances_dict = {}

for i, importance in enumerate(feature_importances):
    perc = importance * 100
    feature_importances_dict[X_train.columns[i]] = perc

sorted_dict = {}
sorted_keys = sorted(feature_importances_dict, key=feature_importances_dict.get)
for w in sorted_keys:
    sorted_dict[w] = feature_importances_dict[w]

for keys in sorted_dict.keys():
    print(f"Feature name: {keys:>12s} contributes {sorted_dict[keys]:>7.2f}% to the output results")
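`feature_importances_` follows the column order of the data the model was fit on, here `X_train`. A compact alternative to the dictionary sorting above keeps names and importances together in a pandas Series; `importance_table` is a hypothetical helper, and the toy data below only illustrates the call:

```python
import numpy as np
import pandas as pd
from sklearn.ensemble import RandomForestClassifier


def importance_table(model, feature_names):
    """Importances in percent, indexed by feature name, most important first."""
    importances = pd.Series(model.feature_importances_, index=feature_names) * 100
    return importances.sort_values(ascending=False)


# Toy data (hypothetical): the label depends only on "signal".
rng = np.random.default_rng(0)
X = pd.DataFrame({"signal": rng.random(200), "noise": rng.random(200)})
y = (X["signal"] > 0.5).astype(int)
model = RandomForestClassifier(n_estimators=20, max_features=None, random_state=0).fit(X, y)
print(importance_table(model, X.columns))
```

On the notebook's models, `importance_table(rforest_model_cv, X_train.columns)` would give the same percentages, sorted from most to least important.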