# Data Export
The minimal work to translate and export the data from RealEye and Tobii.

In [1]:
#|default_exp data_export

In [None]:
#| export
import polars as pl
from datetime import datetime, timedelta, timezone as UTC
from pathlib import Path
from typing import Callable, Iterable, Iterator, TypeVar, Union


## A fully typed approach to parsing and loading RealEye data

In [None]:
# Root of the dated data export, given as a relative path.
DATA_ROOT = "../../RevChemData/2025-05-14-Data_Export"

# Per-source subdirectories of the export.
TOBII_ROOT = f"{DATA_ROOT}/Tobii-All-Snapshot"
REALEYE_ROOT = f"{DATA_ROOT}/RealEye"

In [None]:
# Load only the four columns used below from the RealEye raw-gazes export;
# `test_created_at` is parsed as a datetime instead of being left to inference.
raw_gazes_csv = pl.read_csv(
    f"{REALEYE_ROOT}/raw-gazes.csv",
    columns=["participant_id", "item_id", "test_created_at", "test_raw_data"],
    schema_overrides={"test_created_at": pl.Datetime},
)

In [None]:
raw_gazes_csv.head()

participant_id,item_id,test_created_at,test_raw_data
str,str,datetime[μs],str
"""62e1d6db-e570-4194-b792-1fc4ef…","""5b8637a5-c1c7-47cc-a1b4-5abe24…",2025-02-26 20:12:00,"""[[973,475,28,0,1588,1079],[917…"
"""a19074f0-818d-42d2-8d29-290429…","""5b8637a5-c1c7-47cc-a1b4-5abe24…",2025-02-26 21:38:52,"""[[902,599,36,0,1484,1077],[938…"
"""5f08f1c2-e9f3-4adb-9fb4-a68b3a…","""5b8637a5-c1c7-47cc-a1b4-5abe24…",2024-10-23 18:37:18,"""[[1085,721,36,0,1919,593],[102…"
"""105a3ff2-f0de-4d4b-adaa-feddee…","""5b8637a5-c1c7-47cc-a1b4-5abe24…",2024-10-23 20:12:28,"""[[1054,543,51,0,897,551],[1052…"
"""7fdc2add-bd15-4a04-b77b-242e5b…","""5b8637a5-c1c7-47cc-a1b4-5abe24…",2024-10-25 20:53:25,"""[[938,648,44,0,1919,400],[982,…"


In [None]:
#| export
from dataclasses import dataclass
from RevChem.realeye import GazeInfo, iter_parse_raw_data

T_source = TypeVar("T_source")
T_out = TypeVar("T_out")


class Resettable(Iterable[T_out]):
    """Re-iterable wrapper: every `iter()` call re-runs `iter_gen` on the stored source.

    A plain generator can only be consumed once. This class keeps the source
    data and the function that turns it into items, so each new iteration
    starts again from the beginning.
    """

    def __init__(
        self, source_data: T_source, iter_gen: Callable[[T_source], Iterable[T_out]]
    ):
        self._source = source_data  # kept untouched so it can be re-read on every iteration
        self.iter_gen = iter_gen  # called once per `__iter__` with the stored source

    def __iter__(self) -> Iterator[T_out]:
        # `iter_gen` is only promised to return an Iterable (e.g. a list), but
        # `__iter__` must hand back an Iterator; `iter()` bridges the two and is
        # a no-op for generators.
        return iter(self.iter_gen(self._source))


def iter_parse_raw_to_GazeInfo(raw_data: str) -> Iterator[GazeInfo]:
    """Yield one `GazeInfo` per entry produced by `iter_parse_raw_data(raw_data)`.

    Only the first six values of each entry are passed to `GazeInfo`. Entries
    longer than six values are reported on stdout and then truncated.
    """
    for sextuple in iter_parse_raw_data(raw_data):
        length_ = len(sextuple)
        if length_ > 6:
            # A longer entry indicates that the RealEye system captured a mouse click. Nothing more.
            print(f"Got a {length_}-tuple: {sextuple = }")
        first_six = sextuple[:6]
        yield GazeInfo(*first_six)


def resettable_iter_raw(test_raw_data: str):
    """Wrap `test_raw_data` in a `Resettable` that parses it into `GazeInfo` items on each iteration."""
    return Resettable(
        source_data=test_raw_data,
        iter_gen=iter_parse_raw_to_GazeInfo,
    )


@dataclass
class RealEyeRawRow:
    """Typed view of one row of the RealEye raw-gazes table.

    The field order matches the column order used when the CSV is read
    (participant_id, item_id, test_created_at, test_raw_data), which is what
    lets `from_row_tuples` splat a row tuple straight into the constructor.
    """

    participant_id: str  # the participant being tested
    item_id: str  # the stimulus being shown
    test_created_at: datetime  # when the web browser started running (NOT THE SAME AS THE STIMULUS START TIME)
    # Passed in as the raw `test_raw_data` value; replaced in `__post_init__`
    # by a re-iterable wrapper that parses it on each iteration.
    # NOTE(review): the default of None is wrapped as-is, so iterating a row
    # built without raw data presumably fails inside the parser — confirm.
    raw_data: Iterable[GazeInfo] = None

    def __post_init__(self):
        # Swap the raw payload for a Resettable so it can be iterated more than once.
        self.raw_data = resettable_iter_raw(self.raw_data)

    @classmethod
    def from_row_tuples(cls, tuple) -> "RealEyeRawRow":
        """Alternate constructor: build a row from a positional tuple (e.g. `DataFrame.row(i)`)."""
        # NOTE(review): the parameter name shadows the builtin `tuple` inside this method.
        return cls(*tuple)

In [None]:
#| hide
def fiddle_realeye_one():
    """Smoke check: print the first parsed gaze entry of the first CSV row, if there is one."""
    first_row = RealEyeRawRow.from_row_tuples(raw_gazes_csv.row(0))
    nothing = object()  # private sentinel so an empty payload prints nothing
    first_gaze = next(iter(first_row.raw_data), nothing)
    if first_gaze is not nothing:
        print(first_gaze)

fiddle_realeye_one()

GazeInfo(gaze_point_X=973, gaze_point_Y=475, time_ms_since_start=28, scroll_offset_Y=0, mouse_pos_X=1588, mouse_pos_Y=1079)


In [None]:
#| export
def cumulative_sum(items: list[int|float]) -> list[int|float]:
    """Calculate the cumulative sum up to and including a given index"""
    csum = 0
    res = [None] * len(items)
    for i, item in enumerate(items):
        csum += item
        res[i] = csum
    return res

In [None]:

def test_cumulative_sum():
    """Check `cumulative_sum` against a few hand-computed running totals."""
    cases = (
        ([0, 1, 2, 3], [0, 1, 3, 6]),
        ([0, 2, 4, 6, 8], [0, 2, 6, 12, 20]),
        ([1, 3, 5, 7, 9], [1, 4, 9, 16, 25]),
    )
    for given, expected in cases:
        actual = cumulative_sum(given)
        assert actual == expected, f"Cumulative sum was incorrect: {actual} != {expected}"

test_cumulative_sum()

In [None]:
# | export
def raw_gazes_row_to_df(
    row: RealEyeRawRow, # typed row from the CSV. should have few, if any changes, from the raw CSV file. Used for semantic tidyness
    *,
    time_since_name: str = "time_since_start", # new name given to the column that records the time (ms) since this stimulus was shown
    x_name: str = "X", # new name given to the column that captures the X-coordinate of the GazeInfo gaze
    y_name: str = "Y", # new name given to the column that captures the Y-coordinate of the GazeInfo gaze
) -> pl.DataFrame:
    """Expand one `RealEyeRawRow` into a dataframe with one row per gaze entry.

    Each output row repeats `row.test_created_at` and carries the entry's
    `time_ms_since_start`, `gaze_point_X` and `gaze_point_Y` under the column
    names given by `time_since_name`, `x_name` and `y_name`.
    """
    records = [
        (
            row.test_created_at,
            gaze_info.time_ms_since_start,
            gaze_info.gaze_point_X,
            gaze_info.gaze_point_Y,
        )
        for gaze_info in row.raw_data
    ]
    df = pl.DataFrame(
        records,
        schema={
            "test_created_at": pl.Datetime,
            time_since_name: pl.Int32,
            # honour the caller-supplied coordinate names (previously hard-coded to "X"/"Y")
            x_name: pl.Int32,
            y_name: pl.Int32,
        },
        orient="row",
    )
    return df.with_columns( # force everything to be UTC because that's what it should be (per docs)
        pl.col("test_created_at").dt.replace_time_zone(time_zone="UTC"),
    )

In [None]:
#| export
from RevChem.common import dt_str_now, group_by

In [None]:
# Output directory: a sibling of DATA_ROOT (note the ".."), named with the
# string returned by dt_str_now() followed by "-python-outputs".
EXPORT_ROOT = Path(DATA_ROOT, "..", f"{dt_str_now()}-python-outputs").resolve()

def run_realeye_df_group_statistics(dfs: list[pl.DataFrame]):
    """Display and export how many dataframes share each `test_created_at` value.

    The dataframes are grouped by the first `test_created_at` value of each
    one. The resulting table has one row per group, and `n_rows` is the
    number of dataframes in that group (`len(group)`), not a count of gaze
    rows. The table is shown with `display` and written as CSV under
    `EXPORT_ROOT`.
    """
    grouped = group_by(lambda df: df["test_created_at"][0], dfs)
    group_statistics = pl.DataFrame(
        [[group_key, len(group)] for group_key, group in grouped.items()],
        schema=["test_created_at", "n_rows"],
        orient="row",
    )  # .sort("test_created_at")
    with pl.Config(tbl_rows=50):
        display(group_statistics)
    # EXPORT_ROOT is a freshly named directory; create it so the write below
    # does not fail when nothing has been exported into it yet.
    EXPORT_ROOT.mkdir(parents=True, exist_ok=True)
    group_statistics.write_csv(EXPORT_ROOT / f"{dt_str_now()}-row_stats.csv")

In [None]:
#| export
def realeye_timestamp_to_datetime(
    datetime_col: str = "test_created_at", # column with the recording start datetime
    timestamp_col: str = "time_ms_since_start", # the integer column representing the milliseconds since stimulus exposure
    *,
    overwrite: bool = True, # Whether the `timestamp_col` will have a datetime type in the result, or (if False) a new column is created
    additional_offset_ms: int = 0, # Additional constant offset, in milliseconds, added to every resulting timestamp
) -> pl.Expr:
    """Build the expression that turns `timestamp_col` into an increasing datetime.

    The expression adds the millisecond values of `timestamp_col`, plus
    `additional_offset_ms`, to the reference `datetime_col`. It does not touch
    any dataframe by itself; pass it to `DataFrame.with_columns(...)`.

    Returns:
        A `pl.Expr` aliased to `timestamp_col` when `overwrite` is True (so it
        replaces that column), otherwise to `f"{timestamp_col}__new_dt_time"`.
    """
    new_name = timestamp_col if overwrite else f"{timestamp_col}__new_dt_time"
    # `pl.duration` is given the column *name* here, so the per-row
    # millisecond values of that column are used.
    shifted = (
        pl.col(datetime_col)
        + pl.duration(milliseconds=timestamp_col)
        + pl.duration(milliseconds=additional_offset_ms)
    )
    return shifted.alias(new_name)

In [None]:
# | export

def correct_realeye_df_group(
    group_dfs: list[pl.DataFrame], *, time_col: str = "time_since_start"
):
    """In-place mutation to correct the dfs' timing, assuming dfs are aggregated by `test_created_at`.

    Every entry of `group_dfs` is replaced by a copy whose `time_col` is an
    absolute datetime instead of a per-dataframe millisecond offset. The
    dataframes are treated as one contiguous recording, in list order.
    Nothing is returned, and an empty list is left untouched.
    """
    if not group_dfs:
        # Nothing to correct; avoids `min()` raising on an empty sequence.
        return

    # 1. sum the last|largest millisecond offset from each of the dfs
    group_millisecond_offset_maxes = [df[time_col].max() for df in group_dfs]
    total_milliseconds_since_start = sum(group_millisecond_offset_maxes)
    # 1a. assume we have fully contiguous time series
    # 2. compute the start time: "test_created_at" - total relative milliseconds
    # Using min (not max), because the file outputs take time, even though the recording is done,
    # and the trial is started earlier than the one-ish sec it takes to create the output.
    re_recording_end = min(df["test_created_at"][0] for df in group_dfs)
    re_recording_start = re_recording_end - timedelta(
        milliseconds=total_milliseconds_since_start
    )
    # 3. roll the relative milliseconds forward for each subsequent DataFrame
    # this is like a "scan" or "cumulative sum".
    # We shift everything "left" one, because the first doesn't need anything additional.
    # The second df need only add the first, the third df only the two prior, etc.
    addend_group_millisecond_offsets = [0] + cumulative_sum(
        group_millisecond_offset_maxes[:-1]
    )
    # update the dfs (same-length list, entries reassigned by index)
    for group_member_index, (df, offset_ms) in enumerate(
        zip(group_dfs, addend_group_millisecond_offsets)
    ):
        group_dfs[group_member_index] = (
            # temporary column holding the shared recording start for the expression below
            df.with_columns(__temp_start_time=re_recording_start)
            .with_columns(
                realeye_timestamp_to_datetime(
                    datetime_col="__temp_start_time",
                    timestamp_col=time_col,
                    additional_offset_ms=offset_ms,
                )
            )
            .drop("__temp_start_time")
        )

In [None]:
# | export


# TODO: rename realeye data pipeline function
from RevChem.common import group_by, list_concat


def pipeline_raw_realeye_to_timed_dataframe(
    re_raw_df: pl.DataFrame,  # result of pl.read_csv("raw-gazes.csv").
    *,
    do_group_stats_export: bool = False,  # whether compute early stats, write them to EXPORT_ROOT and exit early. Fails if EXPORT_ROOT is undefined.
    debug: bool = False,  # whether we output the first row of each dataframe, to debug what we're looking at.
    dt_timestamp_col: str = "time_since_start",  # name for the datetime timestamp column in the output dataframes
):
    """Turn the raw RealEye table into per-trial dataframes with absolute timestamps.

    Returns a mapping from the `test_created_at` minute of a group to the
    concatenation of that group's time-corrected dataframes. When
    `do_group_stats_export` is set, only the group statistics are produced
    and the function returns None.
    """

    def _stimulus_position(re_row: RealEyeRawRow) -> int:
        # list.index(...) doubles as the ordinal position of the stimulus
        return REALEYE_ITEM_IDS.index(re_row.item_id)

    # typed rows, ordered by item_id
    typed_rows = [RealEyeRawRow.from_row_tuples(raw) for raw in re_raw_df.rows()]
    typed_rows.sort(key=_stimulus_position)

    # one dataframe per row
    frames = []
    for typed_row in typed_rows:
        frames.append(raw_gazes_row_to_df(typed_row, time_since_name=dt_timestamp_col))

    if do_group_stats_export:
        run_realeye_df_group_statistics(frames)
        return None

    if debug:
        first_rows = [frame.head(1) for frame in frames]
        display(pl.concat(first_rows))

    # Group by "creation" time, down to the second, to get a first ordering,
    # then flatten into one overall sequence whose subsequences are in order.
    # The sort makes sure test_created_at and test_created_at+1sec come out in the right order.
    by_second = group_by(lambda df: df["test_created_at"][0], frames)
    second_groups_in_order = sorted(
        by_second.values(),
        key=lambda members: members[0]["test_created_at"][0],
    )
    frames = list_concat(second_groups_in_order)

    # Group by the minute; order is retained within the group. This puts all
    # entries of a given trial in the right stimulus order even when they were
    # split across a second boundary, since results are output within a minute
    # of each other.
    by_minute = group_by(
        lambda df: df["test_created_at"][0].replace(second=0, microsecond=0), frames
    )

    # apply the timestamp correction algorithm to each group (mutates the group lists)
    for members in by_minute.values():
        correct_realeye_df_group(members, time_col=dt_timestamp_col)

    # lastly, concatenate each group now that its time column is fixed
    return {minute: pl.concat(members) for minute, members in by_minute.items()}


Got a 7-tuple: sextuple = [857, 857, 4249, 0, 944, 580, 1]
Got a 7-tuple: sextuple = [1416, 209, 27024, 0, 1919, 1037, 1]
Got a 7-tuple: sextuple = [545, 1043, 3167, 0, 665, 932, 1]
Got a 7-tuple: sextuple = [237, 267, 40087, 0, 1172, 701, 1]
Got a 7-tuple: sextuple = [804, 475, 40287, 0, 1172, 701, 1]


{datetime.datetime(2024, 10, 23, 18, 37, tzinfo=zoneinfo.ZoneInfo(key='UTC')): shape: (7_279, 4)
 ┌─────────────────────────┬─────────────────────────────┬──────┬─────┐
 │ test_created_at         ┆ time_since_start            ┆ X    ┆ Y   │
 │ ---                     ┆ ---                         ┆ ---  ┆ --- │
 │ datetime[μs, UTC]       ┆ datetime[μs, UTC]           ┆ i32  ┆ i32 │
 ╞═════════════════════════╪═════════════════════════════╪══════╪═════╡
 │ 2024-10-23 18:37:18 UTC ┆ 2024-10-23 18:33:24.015 UTC ┆ 1085 ┆ 721 │
 │ 2024-10-23 18:37:18 UTC ┆ 2024-10-23 18:33:24.042 UTC ┆ 1027 ┆ 765 │
 │ 2024-10-23 18:37:18 UTC ┆ 2024-10-23 18:33:24.075 UTC ┆ 986  ┆ 776 │
 │ 2024-10-23 18:37:18 UTC ┆ 2024-10-23 18:33:24.106 UTC ┆ 1284 ┆ 729 │
 │ 2024-10-23 18:37:18 UTC ┆ 2024-10-23 18:33:24.143 UTC ┆ 1058 ┆ 842 │
 │ …                       ┆ …                           ┆ …    ┆ …   │
 │ 2024-10-23 18:37:18 UTC ┆ 2024-10-23 18:37:17.876 UTC ┆ 847  ┆ -36 │
 │ 2024-10-23 18:37:18 UTC ┆ 2024-10-23

In [None]:

pipeline_raw_realeye_to_timed_dataframe(raw_gazes_csv)

# Fiddling with new things

## Redoing the pairing
Cleaner this time

In [None]:
# full Tobii pipeline is just a few lines of code
# TODO
from RevChem.tobii import clean_tsv_file_name, filter_tobii_dfs_by_new_years_heuristics, read_tobii_individual_tsv, tobii_timestamp_to_datetime


def pipeline_tobii_directory_to_all_dfs(
    directory_of_individual_tobii_sessions: str,
    *,
    columns_subset: list[str] = COLUMNS_TOBII,
    column_renaming: dict[str, str] | None = None,
) -> list[pl.DataFrame]:
    """Read every Tobii session file in a directory into a filtered list of dataframes.

    For each entry of the directory the file is read, its timestamp column is
    converted with `tobii_timestamp_to_datetime`, the frame is cut down to
    `columns_subset`, renamed per `column_renaming`, and tagged with a
    `source_tsv` column derived from the file name. The resulting list is
    then passed through `filter_tobii_dfs_by_new_years_heuristics`.

    `column_renaming=None` (the default) means "rename nothing".
    """
    # None instead of a shared `{}` default avoids the mutable-default pitfall.
    renaming = {} if column_renaming is None else column_renaming
    # iterdir() yields entries in arbitrary order; sort for reproducible results.
    tsv_files = sorted(Path(directory_of_individual_tobii_sessions).iterdir())
    all_tobii_dfs = [
        read_tobii_individual_tsv(tsv_file)
        .with_columns(tobii_timestamp_to_datetime(datetime_col="start_dt_utc"))[
            columns_subset
        ]
        .rename(renaming)
        .with_columns(source_tsv=pl.lit(clean_tsv_file_name(tsv_file.name)))
        for tsv_file in tsv_files
    ]
    return filter_tobii_dfs_by_new_years_heuristics(all_tobii_dfs)

In [None]:
def test_tobii_pipeline():
    """Run the Tobii pipeline on TOBII_ROOT and print the first resulting dataframe."""
    first_df = pipeline_tobii_directory_to_all_dfs(
        TOBII_ROOT,
        column_renaming=COLUMN_RENAMING_TOBII_TO_CSV,
    )[0]
    print(first_df)

test_tobii_pipeline()

1 dfs failed the 0-th criterion "len(df) >= 51k, corresponding to >= 7 minutes of recording @ 120-HZ"
The following previews show the dfs that missed the criteria: len(df) >= 51k, corresponding to >= 7 minutes of recording @ 120-HZ
shape: (2, 5)
┌────────────────────────────────┬──────┬──────┬──────────────────┬───────┐
│ timestamp                      ┆ X    ┆ Y    ┆ source_tsv       ┆ count │
│ ---                            ┆ ---  ┆ ---  ┆ ---              ┆ ---   │
│ datetime[μs, UTC]              ┆ i32  ┆ i32  ┆ str              ┆ i32   │
╞════════════════════════════════╪══════╪══════╪══════════════════╪═══════╡
│ 2025-02-27 23:36:21.203 UTC    ┆ null ┆ null ┆ 2025_2_27_Pichu2 ┆ 9934  │
│ 2025-02-27 23:36:21.325819 UTC ┆ null ┆ null ┆ 2025_2_27_Pichu2 ┆ 9934  │
└────────────────────────────────┴──────┴──────┴──────────────────┴───────┘
shape: (55_856, 4)
┌────────────────────────────────┬──────┬──────┬──────────────────────┐
│ timestamp                      ┆ X    ┆ Y    ┆ source

In [None]:
from RevChem.tobii import export_ordered_pairs, find_tobii_realeye_df_pairs


# Rebinds EXPORT_ROOT (also assigned earlier in this notebook) to a sibling of
# DATA_ROOT whose name starts with the string returned by date_str_now().
# NOTE(review): only `dt_str_now` is imported in the cells visible here —
# confirm `date_str_now` is actually in scope when this cell runs.
EXPORT_ROOT = Path(DATA_ROOT, "..", f"{date_str_now()}-python-outputs").resolve()


# new way
def run_data_export_NEW():
    """Run the whole export: build RealEye and Tobii dataframes, pair them, and write the pairs out."""
    # Not less code in itself, but the full pipeline is laid out in one place
    # rather than spread across scattered calls.
    print("RE processing...")
    realeye_by_group = pipeline_raw_realeye_to_timed_dataframe(
        raw_gazes_csv, dt_timestamp_col="timestamp"
    )
    realeye_dfs = list(realeye_by_group.values())
    print("RE done. Sample:")
    print(realeye_dfs[0].head(2))

    print("Tobii processing...")
    tobii_dfs = pipeline_tobii_directory_to_all_dfs(
        TOBII_ROOT, column_renaming=COLUMN_RENAMING_TOBII_TO_CSV
    )
    print("Tobii done. Sample:")
    print(tobii_dfs[0].head(2))

    print("\n\nPairing")
    pairs = find_tobii_realeye_df_pairs(tobii_dfs, realeye_dfs)

    print("Pairing complete. Exporting")
    reconciliation_suffix = Path(f"{dt_str_now()}-new-reconciliation")
    export_ordered_pairs(
        pairs,
        output_dir_root=EXPORT_ROOT,
        output_suffix=reconciliation_suffix,
        include_joined_output=True,
    )


run_data_export_NEW()

RE processing...
Got a 7-tuple: sextuple = [857, 857, 4249, 0, 944, 580, 1]
Got a 7-tuple: sextuple = [1416, 209, 27024, 0, 1919, 1037, 1]
Got a 7-tuple: sextuple = [545, 1043, 3167, 0, 665, 932, 1]
Got a 7-tuple: sextuple = [237, 267, 40087, 0, 1172, 701, 1]
Got a 7-tuple: sextuple = [804, 475, 40287, 0, 1172, 701, 1]
RE done. Sample:
shape: (2, 4)
┌─────────────────────────┬─────────────────────────────┬──────┬─────┐
│ test_created_at         ┆ timestamp                   ┆ X    ┆ Y   │
│ ---                     ┆ ---                         ┆ ---  ┆ --- │
│ datetime[μs, UTC]       ┆ datetime[μs, UTC]           ┆ i32  ┆ i32 │
╞═════════════════════════╪═════════════════════════════╪══════╪═════╡
│ 2024-10-23 18:37:18 UTC ┆ 2024-10-23 18:33:24.015 UTC ┆ 1085 ┆ 721 │
│ 2024-10-23 18:37:18 UTC ┆ 2024-10-23 18:33:24.042 UTC ┆ 1027 ┆ 765 │
└─────────────────────────┴─────────────────────────────┴──────┴─────┘
Tobii processing...
1 dfs failed the 0-th criterion "len(df) >= 51k, correspondi

In [None]:
EXPORT_ROOT

PosixPath('/Users/stephen/dev/RevChemData/2025-06-12-python-outputs')