# "The Eye Of The Typer" Dataset Rerun

Visualize the dataset using `rerun`.

In [None]:
# Load the `dotenv` IPython extension, then invoke its `%dotenv` magic.
# NOTE(review): presumably this reads a local .env file into the process
# environment (e.g. EOTT_DATASET_PATH used further down) — confirm.
%load_ext dotenv
%dotenv

In [None]:
from pathlib import Path
from pprint import pprint
from datetime import datetime, timedelta

from decord import VideoReader

import polars as pl
import rerun as rr

from eott_dataset import *


In [None]:
# Open the dataset; `ds` is reused by the cells below.
ds = EyeTyperDataset()

# Print the schema reported for the webcam and log sources.
described = ds.describe(Source.WEBCAM, Source.LOG)
for source, schema in described.items():
    print_schema(source, schema)

In [None]:
# Display the dataset timeline restricted to rows where pid == 1.
(
    ds.timeline()
    .filter(pid=1)
    .collect()
)

In [None]:
# Display the source timeline built from the TOBII and FORM scans for pid == 1.
(
    get_source_timeline(
        ds.scan(Source.TOBII),
        ds.scan(Source.FORM),
    )
    .filter(pid=1)
    .collect()
)

In [None]:
# Display the source timeline built from the DOT and FORM scans for pid == 1.
(
    get_source_timeline(
        ds.scan(Source.DOT),
        ds.scan(Source.FORM),
    )
    .filter(pid=1)
    .collect()
)

In [None]:
# Display the source timeline built from the LOG and FORM scans for pid == 1.
# The extra positional arguments "record" and "study" are passed through
# unchanged (their meaning is defined by get_source_timeline).
(
    get_source_timeline(
        ds.scan(Source.LOG),
        ds.scan(Source.FORM),
        "record",
        "study",
    )
    .filter(pid=1)
    .collect()
)

In [None]:
# Display the screen timeline built from the SCREEN and FORM scans for pid == 1.
(
    get_screen_timeline(
        ds.scan(Source.SCREEN),
        ds.scan(Source.FORM),
    )
    .filter(pid=1)
    .collect()
)

In [None]:
# Display the webcam timeline built from the WEBCAM and LOG scans for pid == 1.
(
    get_webcam_timeline(
        ds.scan(Source.WEBCAM),
        ds.scan(Source.LOG),
    )
    .filter(pid=1)
    .collect()
)

In [None]:
# NOTE(review): the EOTT_DATASET_PATH environment variable presumably has to
# be set (e.g. with `%env`) before this cell runs — confirm.
pdf = read_participant_characteristics()

# Build one Participant per row, passing the row's columns as keyword arguments.
participants = [
    Participant.from_dict(**record)
    for record in pdf.iter_rows(named=True)
]

# Leave the frame as the last expression so the notebook displays it.
pdf

In [None]:
from eott_dataset.rerun import *

# Pick the participant and the study whose recording is logged to rerun.
p = participants[0]
study_type = Study.DOT_TEST

recording_id = f"{p.participant_id}/{study_type}"
# NOTE(review): earlier cells call get_source_timeline with scanned frames
# (ds.scan(...)) and then .collect(); here it receives a participant and the
# result is iterated directly later on — confirm this call matches the current API.
timeline_df = get_source_timeline(p).filter(pl.col("study") == study_type)

# Start the recording (spawning a viewer) and attach the participant's
# attributes as a timeless text document, one "[key]\nvalue" section each.
rr.init("EOTT", recording_id=recording_id, spawn=True)
rr.log(
    "participant",
    rr.TextDocument(
        "\n".join(f"[{key}]\n{value}\n" for key, value in p.to_dict().items())
    ),
    timeless=True,
)

# Video capture handles; they stay None until a video is actually opened.
screen_cap: cv.VideoCapture | None = None
webcam_cap: cv.VideoCapture | None = None
webcam_path: Path | None = None

# Scale factors. These are defined before first use: previously screen_scale
# was read while computing screen_size but only assigned afterwards, which
# raises NameError when the cell runs in a fresh kernel.
webcam_scale = 1.0
screen_scale = 0.5

# Display size scaled by screen_scale, truncated to whole units.
screen_size: tuple[int, int] = (
    int(p.display_width * screen_scale),
    int(p.display_height * screen_scale),
)
screen_factor = 2 if p.setting is Setting.LAPTOP else 1

if p.screen_recording_path.exists():
    screen_cap = cv.VideoCapture(str(p.screen_recording_path))

# Timeless outline of the (scaled) screen, centred on half its size.
rr.log(
    "screen",
    rr.Boxes2D(
        sizes=[screen_size],
        centers=[(0.5 * screen_size[0], 0.5 * screen_size[1])],
    ),
    timeless=True,
)

# Timeless series styling for the per-eye Tobii pupil-diameter plots.
rr.log(
    "participant/pupil/left/tobii",
    rr.SeriesLine(name="left pupil diameter (tobii)", color=(255, 255, 0), width=1),
    timeless=True,
)
rr.log(
    "participant/pupil/right/tobii",
    rr.SeriesLine(name="right pupil diameter (tobii)", color=(255, 0, 255), width=1),
    timeless=True,
)

frame_index: int
webcam_index: int | None
study_name: str | None
offset_time: timedelta
source_name: Source
for log_index, (
    frame_index,
    webcam_index,
    study_name,
    source_name,
    offset_time,
) in enumerate(timeline_df.iter_rows()):
    study_name = Study(study_name) if study_name is not None else None

    rr.set_time_sequence("log_index", log_index)
    rr.set_time_seconds("capture_time", offset_time.total_seconds())

    timeline_name = f"webcam-{webcam_index}" if source_name == "webcam" else source_name
    rr.set_time_sequence(f"{timeline_name}_index", frame_index)

    match source_name:
        case "tobii":
            entry: TobiiEntry = p.tobii_gaze_predictions.row(frame_index, named=True)
            rerun_log_tobii(entry, screen=screen_size)

        case "screen" if screen_cap is not None:
            assert p.screen_offset is not None
            screen_cap = rerun_log_screen(screen_cap, position=frame_index, size=screen_size)

        case "webcam" if webcam_index is not None and study_name is not None:
            paths = p.get_webcam_video_paths(study=study_name, index=webcam_index)

            if len(paths) > 0 and (webcam_path != paths[0] or webcam_cap is None):
                webcam_path = paths[0]
                webcam_cap = cv.VideoCapture(str(webcam_path.absolute()))

            webcam_cap = rerun_log_webcam(webcam_cap, scale=webcam_scale)

        case "log":
            entry: LogEntry = p.user_interaction_logs.row(frame_index, named=True)
            rerun_log_user(entry, scale=screen_scale)

# end of logging
if screen_cap is not None:
    screen_cap.release()

if webcam_cap is not None:
    webcam_cap.release()