##### This script will take each participant's tasks and generate both a heatmap and a saccade/fixation plot for each task attempted.

In [3]:
import os
import json
import requests
import matplotlib
from PIL import Image
from io import BytesIO
import numpy as np
import matplotlib.pyplot as plt
from matplotlib import patheffects
from copy import deepcopy
from sklearn.cluster import DBSCAN
from typing import List

matplotlib.use('Agg')

IMAGES = { }

class ExperimentTab:
    """Plain record holding everything recorded for one tab of an experiment.

    Every constructor argument is stored unchanged on the instance under the
    same name; nothing is validated or converted. All arguments default to
    ``None``.
    """

    def __init__(self, id=None, name=None, width=None, height=None, time_elapsed=None, mouseClicks=None, gazeEstimations=None):
        # Identification of the tab.
        self.id, self.name = id, name
        # Page dimensions as supplied by the caller.
        self.width, self.height = width, height
        self.time_elapsed = time_elapsed
        # Recorded interaction data.
        self.mouseClicks, self.gazeEstimations = mouseClicks, gazeEstimations

class Experiment:
    """A collection of ``ExperimentTab`` objects belonging to one experiment.

    Raises:
        ValueError: if any element of ``tabs`` is not an ``ExperimentTab``.
    """

    def __init__(self, tabs: List[ExperimentTab]):
        # Reject the whole list as soon as one foreign element is found.
        for tab in tabs:
            if not isinstance(tab, ExperimentTab):
                raise ValueError("All tabs must be instances of ExperimentTab")
        # The list object itself is kept (not copied), so callers may append
        # to ``experiment.tabs`` afterwards.
        self.tabs = tabs

def get_images(img_urls, timeout=30):
    """Download every image in ``img_urls`` into the module-level ``IMAGES`` cache.

    URLs that already hold a loaded image in ``IMAGES`` are skipped. URLs whose
    previous attempt failed (cached as ``Image.NONE``) are tried again.

    Args:
        img_urls: Iterable of image URLs to fetch.
        timeout: Seconds to wait on the server before a request is abandoned.
            Without a timeout a single stalled connection would block the
            whole batch indefinitely.

    Returns:
        ``None`` when every requested image is available, otherwise a dict of
        the form ``{"error": {"message": str, "images": [failed urls]}}``.
    """
    not_found = []
    for url in img_urls:
        # Only a successfully loaded image counts as cached; the failure
        # sentinel falls through so the download is retried.
        if IMAGES.get(url, Image.NONE) is not Image.NONE:
            continue
        print(f"Fetching {url}")
        try:
            # No stream=True: the body is needed in full anyway, and a fully
            # read response releases its connection even when the status is
            # an error.
            response = requests.get(url, timeout=timeout)
        except requests.RequestException as exc:
            # Network failures are reported like a missing image so the caller
            # can skip this experiment instead of the whole run aborting.
            print(f"Request for {url} failed: {exc}")
            response = None
        if response is None or not response.ok:
            IMAGES[url] = Image.NONE
            if url not in not_found:
                not_found.append(url)
            continue
        IMAGES[url] = Image.open(BytesIO(response.content))
    if not_found:
        return {
            "error": {
                "message": "Image/s not found in GitHub.",
                "images": not_found,
            }
        }
    return None

def clean_data(img_w, img_h, wp_w, wp_h, data):
    """Rescale gaze points from webpage coordinates to image coordinates.

    Args:
        img_w, img_h: Width and height of the target image.
        wp_w, wp_h: Width and height of the webpage the points were recorded on.
        data: Iterable of dicts with at least the keys ``'x'``, ``'y'`` and
            ``'time'``.

    Returns:
        A new list of dicts holding only ``'x'``, ``'y'`` (both multiplied by
        the image/webpage ratio of their axis) and the unchanged ``'time'``.
        The input dicts are not modified.
    """
    ratio_x = img_w / wp_w
    ratio_y = img_h / wp_h
    rescaled = []
    for sample in data:
        rescaled.append({
            'x': sample['x'] * ratio_x,
            'y': sample['y'] * ratio_y,
            'time': sample['time'],
        })
    return rescaled

def parse_experiment_data(e):
    """Build an ``Experiment`` from the decoded JSON of one experiment file.

    Each entry of ``e["data"]`` becomes one ``ExperimentTab``. Tabs are
    numbered ``T1``, ``T2``, ... in iteration order. The tab name is the entry
    key without its first three characters, with ``:``, ``/``, ``.`` and ``?``
    turned into ``-`` and ``#`` turned into ``%23``.

    Args:
        e: Dict with a ``"data"`` mapping whose values provide
            ``webpage_width``, ``webpage_height``, ``time_taken_seconds``,
            ``mouseClickLocations`` and ``coords``.

    Returns:
        The populated ``Experiment``.
    """
    # Single-pass equivalent of the per-character replacements listed above.
    substitutions = str.maketrans({":": "-", "/": "-", ".": "-", "#": "%23", "?": "-"})
    experiment = Experiment([])
    for position, (tab_key, details) in enumerate(e["data"].items(), start=1):
        experiment.tabs.append(
            ExperimentTab(
                f"T{position}",
                tab_key[3:].translate(substitutions),
                details["webpage_width"],
                details["webpage_height"],
                details["time_taken_seconds"],
                details["mouseClickLocations"],
                details["coords"],
            )
        )
    return experiment

def clean_eye_tracking_data(experiment, velocity_threshold=1500):
    """Return a deep copy of ``experiment`` with fast gaze jumps filtered out.

    For every tab, each gaze sample is compared with the sample seen just
    before it (whether or not that one was kept). A sample is kept when the
    distance travelled divided by the time difference does not exceed
    ``velocity_threshold``. The first sample is always kept. Samples with the
    same timestamp as their reference sample are dropped and do not become the
    new reference.

    Args:
        experiment: Object with a ``tabs`` sequence whose items expose
            ``gazeEstimations`` (list of dicts with ``'x'``, ``'y'``, ``'time'``).
        velocity_threshold: Largest accepted speed, in coordinate units per
            time unit of the samples.

    Returns:
        The filtered copy; the input experiment is left untouched.
    """
    filtered_experiment = deepcopy(experiment)
    for tab in filtered_experiment.tabs:
        kept = []
        reference = None
        for sample in tab.gazeEstimations:
            keep = True
            if reference is not None:
                delta_x = sample['x'] - reference['x']
                delta_y = sample['y'] - reference['y']
                delta_t = sample['time'] - reference['time']
                if delta_t == 0:
                    # No elapsed time: speed is undefined, drop the sample and
                    # keep comparing against the same reference.
                    continue
                speed = np.sqrt(delta_x**2 + delta_y**2) / delta_t
                keep = speed <= velocity_threshold
            if keep:
                kept.append(sample)
            # Rejected samples still become the reference for the next one.
            reference = sample
        tab.gazeEstimations = kept
    return filtered_experiment

def save_heatmaps(experiment: Experiment, output_dir: str):
    """Write one gaze heatmap PNG per tab of ``experiment``.

    The page screenshot is looked up in ``IMAGES``, the tab's gaze points are
    rescaled to the screenshot size, and a 100x100-bin 2D histogram of the
    points is drawn semi-transparently on top of the screenshot. Files are
    saved to ``<output_dir>/heatmaps/<tab.id>-<tab.name>.png``. Tabs whose
    screenshot is marked as missing are skipped with a message.

    Args:
        experiment: Experiment whose tabs are rendered.
        output_dir: Directory under which the ``heatmaps`` folder is created.
    """
    print("Saving heatmaps...")
    for tab in experiment.tabs:
        screenshot = IMAGES[f"https://github.com/lukeformosa/gazetimator-images/blob/main/experiment-images/{tab.name}.png?raw=true"]
        if screenshot == Image.NONE:
            print(f"Cannot continue, this image is not in GitHub: {tab.name}")
            continue

        pixels = np.array(screenshot)
        img_height, img_width = pixels.shape[:2]
        gaze = clean_data(
            img_w=img_width,
            img_h=img_height,
            wp_w=tab.width,
            wp_h=tab.height,
            data=tab.gazeEstimations,
        )

        xs = [sample['x'] for sample in gaze]
        # The plot's y axis runs upwards (extent 0..img_height), so the y
        # values are mirrored; presumably gaze y is measured from the top.
        ys = [img_height - sample['y'] for sample in gaze]

        plt.figure(figsize=(10, 8))
        plt.imshow(pixels, extent=[0, img_width, 0, img_height])

        counts, x_edges, y_edges = np.histogram2d(
            xs, ys, bins=(100, 100), range=[[0, img_width], [0, img_height]]
        )
        # histogram2d indexes its result as [x, y]; imshow wants rows = y.
        plt.imshow(
            counts.T,
            extent=[x_edges[0], x_edges[-1], y_edges[0], y_edges[-1]],
            origin='lower',
            cmap='inferno',
            alpha=0.5,
            interpolation='bicubic',
        )
        plt.axis('off')
        plt.subplots_adjust(left=0, right=1, top=1, bottom=0, wspace=0, hspace=0)

        target = os.path.join(output_dir, "heatmaps", f"{tab.id}-{tab.name}.png")
        os.makedirs(os.path.dirname(target), exist_ok=True)
        plt.savefig(target, format='png', bbox_inches='tight', pad_inches=0)
        plt.close()

def save_saccades_fixations(experiment, output_dir: str, eps=50, min_samples=5, max_circle_size=500, min_saccade_length=200):
    """Write one fixation/saccade plot PNG per tab of ``experiment``.

    For each tab the gaze samples are rescaled to the page screenshot and
    clustered with DBSCAN. Every cluster is drawn as a circle at its centroid,
    labelled with its sample count and with the span between its first and
    last timestamp. Arrows are drawn between clusters that follow each other
    in time (ordered by earliest sample) when their centroids are at least
    ``min_saccade_length`` apart. Files are saved to
    ``<output_dir>/saccades_fixations/<tab.id>-<tab.name>.png``. Tabs whose
    screenshot is marked as missing are skipped with a message; tabs without
    gaze samples produce the bare screenshot.

    Args:
        experiment: Object with a ``tabs`` sequence to render.
        output_dir: Directory under which ``saccades_fixations`` is created.
        eps: DBSCAN neighbourhood radius, in image pixels.
        min_samples: DBSCAN minimum number of samples per cluster.
        max_circle_size: Upper bound for the scatter marker size.
        min_saccade_length: Minimum centroid distance for an arrow to be drawn.
    """
    print("Saving saccades and fixations...")
    for tab in experiment.tabs:
        img = IMAGES[f"https://github.com/lukeformosa/gazetimator-images/blob/main/experiment-images/{tab.name}.png?raw=true"]
        if img == Image.NONE:
            print(f"Cannot continue, this image is not in GitHub: {tab.name}")
            continue
        img_array = np.array(img)
        img_height, img_width = img_array.shape[:2]
        cleaned_data = clean_data(img_w=img_width,
                                  img_h=img_height,
                                  wp_w=tab.width,
                                  wp_h=tab.height,
                                  data=tab.gazeEstimations)

        # Columns: x, mirrored y, time. reshape keeps the array 2-D even when
        # the tab has no samples, so the column slicing below stays valid.
        points_with_time = np.array(
            [[point['x'], img_height - point['y'], point['time']] for point in cleaned_data]
        ).reshape(-1, 3)
        points_with_time = points_with_time[points_with_time[:, 2].argsort()]
        points = points_with_time[:, :2]
        timestamps = points_with_time[:, 2]

        if len(points) > 0:
            labels = DBSCAN(eps=eps, min_samples=min_samples).fit(points).labels_
        else:
            # Nothing to cluster; DBSCAN cannot be fitted on zero samples.
            labels = np.empty(0, dtype=int)

        plt.figure(figsize=(10, 8))
        plt.imshow(img_array, extent=[0, img_width, 0, img_height])

        unique_labels = set(labels)
        unique_labels.discard(-1)  # -1 is the label DBSCAN gives to noise samples
        # Compute each cluster's membership mask once and reuse it.
        masks = {label: labels == label for label in unique_labels}
        # np.ptp instead of the ndarray.ptp method, which NumPy 2.0 removed.
        fixation_durations = {label: np.ptp(timestamps[mask]) for label, mask in masks.items()}
        cluster_centroids = {label: points[mask].mean(axis=0) for label, mask in masks.items()}
        sorted_labels = sorted(unique_labels, key=lambda label: timestamps[masks[label]].min())

        text_path_effects = [patheffects.withStroke(linewidth=3, foreground='black')]
        for label in unique_labels:
            centroid = cluster_centroids[label]
            cluster_size = int(np.count_nonzero(masks[label]))
            duration = fixation_durations[label]
            circle_size = min(100 + 10 * cluster_size, max_circle_size)
            plt.scatter(centroid[0], centroid[1], s=circle_size, c='blue', alpha=0.6, edgecolors='black')
            plt.text(centroid[0], centroid[1], str(cluster_size), color='white', fontsize=8, ha='center', va='center', weight='bold')
            # Place the duration label above the circle.
            text_offset = circle_size * 0.2
            plt.text(centroid[0], centroid[1] + text_offset, f"{duration:.2f}s", color='white', fontsize=8, ha='center', va='bottom', weight='bold', path_effects=text_path_effects)

        prev_centroid = None
        saccade_num = 1
        for label in sorted_labels:
            centroid = cluster_centroids[label]
            if prev_centroid is not None:
                saccade_length = np.linalg.norm(centroid - prev_centroid)
                if saccade_length >= min_saccade_length:
                    plt.annotate('', xy=centroid, xytext=prev_centroid, arrowprops=dict(arrowstyle='->', color='red'), annotation_clip=False)
                    mid_point = (centroid + prev_centroid) / 2
                    plt.text(mid_point[0], mid_point[1], str(saccade_num), color='yellow', fontsize=9, ha='center', va='center', bbox=dict(facecolor='black', alpha=0.5, edgecolor='none'))
                    saccade_num += 1
            # Short hops still advance the reference cluster.
            prev_centroid = centroid

        plt.axis('off')
        save_path = os.path.join(output_dir, "saccades_fixations", f"{tab.id}-{tab.name}.png")
        os.makedirs(os.path.dirname(save_path), exist_ok=True)
        plt.savefig(save_path, format='png', bbox_inches='tight', pad_inches=0)
        plt.close()

def find_json_files(parent_dir):
    """Recursively collect experiment JSON files below ``parent_dir``.

    A file qualifies when its name starts with ``e-`` and ends with ``.json``.

    Args:
        parent_dir: Directory whose tree is walked.

    Returns:
        List of matching paths, each joined from the directory reported by
        ``os.walk`` and the file name, in walk order.
    """
    return [
        os.path.join(folder, filename)
        for folder, _subdirs, filenames in os.walk(parent_dir)
        for filename in filenames
        if filename.startswith("e-") and filename.endswith(".json")
    ]

# Driver: for every experiment JSON below the parent directory, load it, clamp
# and filter the gaze data, make sure the page screenshots are cached, then
# write the heatmaps and saccade/fixation plots next to the JSON file.
parent_directory = "../../experiments/phase2/official-experiments"
json_files = find_json_files(parent_directory)

for json_file_path in json_files:
    # To regenerate the visualisations of one specific experiment only, skip
    # every path that does not contain its file name here, e.g.
    # "e-06-05-2024T20-26-11.json".
    with open(json_file_path, 'r') as f:
        data_read = json.load(f)

    # Clamp every gaze coordinate into the page rectangle
    # [0, webpage_width] x [0, webpage_height].
    for values in data_read['data'].values():
        for coord in values["coords"]:
            coord["x"] = min(max(coord["x"], 0), values["webpage_width"])
            coord["y"] = min(max(coord["y"], 0), values["webpage_height"])

    experiment = clean_eye_tracking_data(parse_experiment_data(data_read))

    img_urls = [
        f"https://github.com/lukeformosa/gazetimator-images/blob/main/experiment-images/{tab.name}.png?raw=true"
        for tab in experiment.tabs
    ]

    # An experiment is skipped entirely when any of its screenshots is missing.
    out = get_images(img_urls)
    if out is not None and "error" in out:
        print(out["error"])
        continue

    output_dir = os.path.join(os.path.dirname(json_file_path), "visualizations")
    save_heatmaps(experiment, output_dir)
    save_saccades_fixations(experiment, output_dir)
    print(f"Saved visualisations for experiment in {json_file_path}")

Fetching https://github.com/lukeformosa/gazetimator-images/blob/main/experiment-images/https---www-lombardmalta-com-.png?raw=true
Fetching https://github.com/lukeformosa/gazetimator-images/blob/main/experiment-images/https---www-lombardmalta-com-en-deposit-accounts-1.png?raw=true
Fetching https://github.com/lukeformosa/gazetimator-images/blob/main/experiment-images/https---www-lombardmalta-com-en-about-us.png?raw=true
Fetching https://github.com/lukeformosa/gazetimator-images/blob/main/experiment-images/-https---www-lombardmalta-com-en-personal-banking.png?raw=true
Fetching https://github.com/lukeformosa/gazetimator-images/blob/main/experiment-images/-https---www-lombardmalta-com-en-deposit-accounts-1.png?raw=true
Saving heatmaps...
Saving saccades and fixations...
Saved visualisations for experiment in ../../experiments/phase2/official-experiments\participant10\Lombard\FTD\e-06-05-2024T20-26-11.json
