In [41]:
import pandas as pd
import numpy as np
from datetime import datetime
from zoneinfo import ZoneInfo
import math
from scipy.ndimage import gaussian_filter1d


subject_id = "001"
experiment_id = "001"


In [42]:
pose1_df = pd.read_csv(f"./exported_csv/pose_df/pose1_df_id{subject_id}-{experiment_id}.csv")

In [43]:
def prepare_pose_df(df):
    monitor_width_cm = 47.6
    monitor_height_cm = 26.8
    resolution = (1920, 1080)
    viewer_distance_cm = 70.0

    cm_per_pixel_x = monitor_width_cm / resolution[0]
    cm_per_pixel_y = monitor_height_cm / resolution[1]

    df = df.copy()
    df["gx_centered"] = df["mean_x"] - 0.5
    df["gy_centered"] = df["mean_y"] - 0.5
    df["x_cm"] = df["gx_centered"] * resolution[0] * cm_per_pixel_x
    df["y_cm"] = df["gy_centered"] * resolution[1] * cm_per_pixel_y
    df["x_deg"] = np.degrees(np.arctan2(df["x_cm"], viewer_distance_cm))
    df["y_deg"] = np.degrees(np.arctan2(df["y_cm"], viewer_distance_cm))
    df["epoch_sec"] = df["epoch_time"]
    df["is_valid"] = df["validity_sum"] == 2
    return df


In [44]:
def interpolate_missing(df, time_col="epoch_sec", max_gap_ms=100):
    df = df.copy()
    df["valid"] = df["is_valid"]
    df["interp_x"] = np.nan
    df["interp_y"] = np.nan
    df.loc[df["valid"], "interp_x"] = df.loc[df["valid"], "x_deg"]
    df.loc[df["valid"], "interp_y"] = df.loc[df["valid"], "y_deg"]
    df["interp_x"] = df["interp_x"].interpolate(limit_area="inside")
    df["interp_y"] = df["interp_y"].interpolate(limit_area="inside")

    invalid_mask = ~df["valid"]
    group_id = (invalid_mask != invalid_mask.shift()).cumsum()
    invalid_blocks = df[invalid_mask].groupby(group_id)

    for _, block in invalid_blocks:
        if len(block) == 0:
            continue
        t_start = block[time_col].iloc[0]
        t_end = block[time_col].iloc[-1]
        duration_ms = (t_end - t_start) * 1000
        if duration_ms > max_gap_ms:
            df.loc[block.index, ["interp_x", "interp_y"]] = np.nan
    return df

In [45]:
def apply_gaussian_filter_by_block(df, col_x="interp_x", col_y="interp_y", sigma=1.0):
    df = df.copy()
    df["filtered_x"] = np.nan
    df["filtered_y"] = np.nan
    valid_mask = df[col_x].notna() & df[col_y].notna()
    block_id = (valid_mask != valid_mask.shift()).cumsum()
    blocks = df[valid_mask].groupby(block_id)

    for _, block in blocks:
        idx = block.index
        smoothed_x = gaussian_filter1d(block[col_x], sigma=sigma)
        smoothed_y = gaussian_filter1d(block[col_y], sigma=sigma)
        df.loc[idx, "filtered_x"] = smoothed_x
        df.loc[idx, "filtered_y"] = smoothed_y
    return df


In [46]:
def detect_fixations_ivt(df, velocity_threshold=100, duration_threshold_ms=100):
    fixations = []
    timestamps = df["epoch_sec"].to_numpy()
    xs = df["filtered_x"].to_numpy()
    ys = df["filtered_y"].to_numpy()

    delta_t = np.diff(timestamps)
    delta_x = np.diff(xs)
    delta_y = np.diff(ys)
    safe_delta_t = np.where(delta_t == 0, np.nan, delta_t)
    velocities = np.sqrt(delta_x**2 + delta_y**2) / safe_delta_t
    velocities = np.insert(velocities, 0, 0)
    velocities = np.nan_to_num(velocities, nan=0.0, posinf=0.0, neginf=0.0)

    in_fixation = False
    start_idx = 0
    for i in range(len(df)):
        if np.isnan(xs[i]) or np.isnan(ys[i]):
            if in_fixation:
                in_fixation = False
                t_start = timestamps[start_idx]
                t_end = timestamps[i - 1]
                duration = (t_end - t_start) * 1000
                if duration >= duration_threshold_ms:
                    fixations.append({
                        "start_time": t_start,
                        "end_time": t_end,
                        "duration_ms": duration,
                        "x_mean_deg": np.mean(xs[start_idx:i]),
                        "y_mean_deg": np.mean(ys[start_idx:i]),
                    })
            continue

        if velocities[i] < velocity_threshold:
            if not in_fixation:
                in_fixation = True
                start_idx = i
        else:
            if in_fixation:
                in_fixation = False
                t_start = timestamps[start_idx]
                t_end = timestamps[i - 1]
                duration = (t_end - t_start) * 1000
                if duration >= duration_threshold_ms:
                    fixations.append({
                        "start_time": t_start,
                        "end_time": t_end,
                        "duration_ms": duration,
                        "x_mean_deg": np.mean(xs[start_idx:i]),
                        "y_mean_deg": np.mean(ys[start_idx:i]),
                    })

    if in_fixation:
        t_start = timestamps[start_idx]
        t_end = timestamps[-1]
        duration = (t_end - t_start) * 1000
        if duration >= duration_threshold_ms:
            fixations.append({
                "start_time": t_start,
                "end_time": t_end,
                "duration_ms": duration,
                "x_mean_deg": np.mean(xs[start_idx:]),
                "y_mean_deg": np.mean(ys[start_idx:]),
            })

    return pd.DataFrame(fixations)


In [47]:
def process_pose_df_ivt(df):
    df_prepared = prepare_pose_df(df)
    df_interp = interpolate_missing(df_prepared, time_col="epoch_sec", max_gap_ms=100)
    df_filtered = apply_gaussian_filter_by_block(df_interp, col_x="interp_x", col_y="interp_y", sigma=1.0)
    fix_df = detect_fixations_ivt(df_filtered, velocity_threshold=100, duration_threshold_ms=100)
    return fix_df

In [48]:
fix_df = process_pose_df_ivt(pose1_df)
print(fix_df)


      start_time      end_time  duration_ms  x_mean_deg  y_mean_deg
0   1.732521e+09  1.732521e+09   731.999874    0.474628    0.598110
1   1.732521e+09  1.732521e+09   509.999990    1.031713    3.443667
2   1.732521e+09  1.732521e+09   219.000101    0.917628    2.768350
3   1.732521e+09  1.732521e+09   477.999926   -0.164774   -0.514441
4   1.732521e+09  1.732521e+09   363.999844    0.358916    0.412127
..           ...           ...          ...         ...         ...
67  1.732521e+09  1.732521e+09   654.999971   -0.805175    2.024153
68  1.732521e+09  1.732521e+09   535.000086   -0.315271    0.064781
69  1.732521e+09  1.732521e+09   174.999952    0.149287    0.636868
70  1.732521e+09  1.732521e+09   585.999966   -0.953916    0.824927
71  1.732521e+09  1.732521e+09   583.999872    0.117083    0.370749

[72 rows x 5 columns]


In [None]:
def calculate_true_precision(pose_df, fix_df):
    results = []
    for _, row in fix_df.iterrows():
        start = row["start_time"]
        end = row["end_time"]
        fixation_data = pose_df[
            (pose_df["epoch_sec"] >= start) & 
            (pose_df["epoch_sec"] <= end)
        ]
        xs = fixation_data["filtered_x"].dropna().to_numpy()
        ys = fixation_data["filtered_y"].dropna().to_numpy()
        if len(xs) > 1 and len(ys) > 1:
            std_x = np.std(xs)
            std_y = np.std(ys)
            precision_rms = np.sqrt(std_x**2 + std_y**2)
        else:
            std_x = std_y = precision_rms = np.nan
        results.append({
            "start_time": start,
            "end_time": end,
            "duration_ms": row["duration_ms"],
            "x_mean_deg": row["x_mean_deg"],
            "y_mean_deg": row["y_mean_deg"],
            "x_std_deg": std_x,
            "y_std_deg": std_y,
            "precision_rms_deg": precision_rms,
            "num_samples": len(xs)
        })
    return pd.DataFrame(results)


In [None]:
from math import atan, sqrt


def evaluate_precision(df):
    
    # validity_sum == 2 のデータのみを使う
    valid_df = df[df["validity_sum"] == 2].copy()

    std_x = valid_df["mean_x"].std()
    std_y = valid_df["mean_y"].std()
    
    precision_rms = sqrt(std_x**2 + std_y**2)
    
    precision_cm = precision_rms * diag_lefngth_cm
    precision_deg = atan(precision_cm / viewing_distance_cm)



    # サマリデータの返却
    return {
        "subject_id": subject_id,
        "task_id": experiment_id,
        "valid_data_count": len(valid_df),
        "precision_rms": precision_rms,
        "precision_cm": precision_cm,
        "precision_deg": precision_deg
    }