## LSTM fixation algo

In [3]:
# imports
import numpy as np
import pandas as pd
%matplotlib widget
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d.art3d import Poly3DCollection
import matplotlib.cm as cm
from matplotlib.animation import FuncAnimation
import itertools
import random
import pygwalker as pyg
import seaborn as sns
import os
import sys
module_path = os.path.abspath(os.path.join('..'))
if module_path not in sys.path:
    sys.path.append(module_path)
    
from src.d03_processing.BlinkProcessor import BlinkProcessor

from src.d00_utils.TaskObjects import *
from src.d03_processing.fixations.FixationProcessor import FixationProcessor
from src.d03_processing.TimepointProcessor import TimepointProcessor
from src.d01_data.fetch.fetch_timepoints import fetch_timepoints
from src.d01_data.fetch.fetch_viewings import fetch_viewings
from src.d01_data.fetch.fetch_trials import fetch_trials
from src.d03_processing.aoi import collision_sphere_radius
from src.d03_processing.feature_extract.to_viewing import to_viewing
from src.d03_processing.fixations.FixAlgos import *
from src.d03_processing.feature_calculate.viewing_compare_calcs import ea_td

In [None]:
# get data
# get viewings
n = 1
all_trials = pd.read_csv("C:\\Users\\Luke\\Documents\\AlloEye\\data\\feature_saves\\all_real_trials.csv")
n_trials = len(all_trials)
print(n_trials)
r_inds = np.random.randint(0, n_trials, n)
rand_trials = list(all_trials.trial_id.to_numpy()[r_inds])

# # temp while bug fixed
# for t in rand_trials:
#     if '21r2' in t:
#         rand_trials.remove(t)
        
viewings = []
for t in rand_trials:
    viewings.append(f"{t}_enc")
    viewings.append(f"{t}_ret")

viewings = ['alloeye_10r1_16_enc']
timepoints = fetch_timepoints("all", viewing_id=viewings)
print(timepoints.shape)


In [None]:
# clean data

# impute blinks

p_tps = []
n_viewings = len(viewings)

# preprocess - all but filtering noise
for i in range(n_viewings):
    # ind = random.randint(0, len(viewings)-1)
    viewing = viewings[i]
    # viewing = "alloeye_52r2_17_ret"

    # print(viewing)
    tps = timepoints[timepoints.viewing_id == viewing].reset_index(drop=True)
    if tps is None or len(tps) < 2:
        p_tps.append(None)
        continue
    # print(viewings[i])
    # print(tps.shape)
    # check for time split error (see function for details)
    tps = timestamp_split_correction(tps)
    tps = SignalProcessor.sandwiched_gp_filter(tps.copy(deep=True))
    tps = BlinkProcessor(tps.copy(deep=True), max_blink_duration=1000, d_impute_threshold=0.16, impute_buffer_duration=8).timepoints
    if b_tps is None:
        print(f"{viewing} all blinks?")
        p_tps.append(None)
        continue
        # missingness removal - e.g. mark as NaNs
    tps = BlinkProcessor.remove_missing(tps)
    # NB: noise not filtered
    p_tps.append(tps)


In [4]:
def angle_between(v1: np.array, v2: np.array) -> float:
    """Compute the angle theta between vectors v1 and v2.

    The scalar product of v1 and v2 is defined as:
      dot(v1,v2) = mag(v1) * mag(v2) * cos(theta)

    where dot() is a function which computes the dot product and mag()
    is a function which computes the magnitude of the given vector.

    Args:
        v1: vector with dim (m x n)
        v2: with dim (m x n)

    Returns:
        theta: angle between vectors v1 and v2 in degrees.
    """
    norms = np.linalg.norm(v1) * np.linalg.norm(v2)
    cos_theta = np.dot(v1, v2) / norms
    with warnings.catch_warnings():  # catch nan warnings
        warnings.simplefilter("ignore", category=DeprecationWarning)
    theta = np.arccos(np.clip(cos_theta, -1, 1))
    return np.rad2deg(theta)

def theta(timepoints, cols):
    df = timepoints.copy()
    x, y, z = cols
        # creating new shifted columns
    df['x_shift'] = df['x'].shift(-1)
    df['y_shift'] = df['y'].shift(-1)
    df['z_shift'] = df['z'].shift(-1)

    # calculate the norms of the current and shifted vectors
    df['norm_current'] = np.sqrt(df['x']**2 + df['y']**2 + df['z']**2)
    df['norm_shift'] = np.sqrt(df['x_shift']**2 + df['y_shift']**2 + df['z_shift']**2)

    # calculate the dot product of the current and shifted vectors
    df['dot_product'] = df['x']*df['x_shift'] + df['y']*df['y_shift'] + df['z']*df['z_shift']

    # calculate the cosine of the angle
    df['cos_theta'] = df['dot_product'] / (df['norm_current'] * df['norm_shift'])

    # to avoid numerical errors that might push the value of cos_theta outside the interval [-1, 1]
    df['cos_theta'] = df['cos_theta'].clip(-1, 1)

    # calculate the angle in radians
    df['theta_rad'] = np.arccos(df['cos_theta'])

    # if you want the angle in degrees, you can convert it as follows
    df['theta_deg'] = np.degrees(df['theta_rad'])

    # You can now drop the auxiliary columns if you want:
    df = df.drop(columns=['x_shift', 'y_shift', 'z_shift', 'norm_current', 'norm_shift', 'dot_product', 'cos_theta', 'theta_rad'])
    
    timepoints['theta'] = df['theta_deg']

In [1]:
# prepare data
def prepare_data(timepoints):
# normalise timestamps
    tps = timepoints.copy().sort_values(by='eye_timestamp_ms').reset_index(drop=True)
    t = timepoints.eye_timestamp_ms.to_numpy()
    t -= t[0]
    
# dt - np.diff
    dt = np.concatenate([0, np.diff(t)])

# angular shift - change in angle (keep signed?)
# need to do for left and right!
    l_angles = theta(tps, ('left_gaze_direction_x', 'left_gaze_direction_y', 'left_gaze_direction_z')).theta.to_numpy()
    r_angles = theta(tps, ('right_gaze_direction_x', 'right_gaze_direction_y', 'right_gaze_direction_z')).theta.to_numpy()
    
# angular speed - angular shift/dt
    l_speed = l_angles / dt
    r_speed = r_angles / dt

# acc - angular speed / dt
    l_acc = np.diff(l_speed) / dt[1:]
    r_acc = np.diff(r_speed) / dt[1:]
    
# camera distance - dxyz norm
    tps['x_shift'] = tps['camera_x'].shift(-1)
    tps['y_shift'] = tps['camera_y'].shift(-1)
    tps['z_shift'] = tps['camera_z'].shift(-1)
    cam_dist = np.sqrt((tps['x_shift'] - tps['camera_x'])**2 + (tps['y_shift'] - tps['camera_y'])**2 + (tps['z_shift'] - tps['camera_z'])**2)
    tps = tps.drop(columns=['x_shift', 'y_shift', 'z_shift'])

# camera speed - cam_dist / dt
    cam_speed = cam_dist / dt

# camera rotation distance - angular shift
    cam_angles = theta(tps, ('cam_rotation_x', 'cam_rotation_y', 'cam_rotation_z')).theta.to_numpy()

# camera rotation speed - angular shift/dt
    cam_angle_speed = cam_angles / dt



In [None]:
# remove nan rows