In [None]:
from google.colab import drive
# Mount Google Drive at /content/drive; every dataset path used below lives under
# /content/drive/MyDrive/Capstone/dataset.
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
# !wget -O /content/drive/MyDrive/Capstone/dataset/gaze.zip "https://zenodo.org/records/14794876/files/gaze.zip?download=1"
# !wget -O /content/drive/MyDrive/Capstone/dataset/pupil.zip "https://zenodo.org/records/14794876/files/pupil.zip?download=1"
# !wget -O /content/drive/MyDrive/Capstone/dataset/videostream.zip "https://zenodo.org/records/14794876/files/videostream.zip?download=1"
# !wget -O /content/drive/MyDrive/Capstone/dataset/dataset_description.json "https://zenodo.org/records/14794876/files/dataset_description.json?download=1"
# !wget -O /content/drive/MyDrive/Capstone/dataset/core.zip "https://zenodo.org/records/14794876/files/core.zip?download=1"

In [None]:
# !unzip -q /content/drive/MyDrive/Capstone/dataset/gaze.zip -d /content/drive/MyDrive/Capstone/dataset/gaze
# !unzip -q /content/drive/MyDrive/Capstone/dataset/pupil.zip -d /content/drive/MyDrive/Capstone/dataset/pupil
# !unzip -q /content/drive/MyDrive/Capstone/dataset/videostream.zip -d /content/drive/MyDrive/Capstone/dataset/videostream
# !unzip -q /content/drive/MyDrive/Capstone/dataset/core.zip -d /content/drive/MyDrive/Capstone/dataset/core

In [None]:
from tqdm.auto import tqdm
import re, glob, numpy as np, pandas as pd
from pathlib import Path
from scipy import interpolate
import matplotlib.pyplot as plt
import cv2
import os
from PIL import Image
import glob

0. 메타 인덱스 만들기

In [None]:
# === 1) Base paths (dataset root inside Google Drive) ===
BASE_DIR = Path("/content/drive/MyDrive/Capstone/dataset")

# Raw per-modality input roots
GAZE_ROOT, PUPIL_ROOT, VIDEO_ROOT, EVENTS_ROOT = (
    BASE_DIR / folder for folder in ("gaze", "pupil", "videostream", "events")
)

# Manifest CSVs written by the scanning step
OUT_GAZE, OUT_PUPIL, OUT_VIDEO, OUT_EVENTS = (
    BASE_DIR / f"manifest_{folder}.csv" for folder in ("gaze", "pupil", "videostream", "events")
)

In [None]:
# === 2) 구현: 스캐닝 & CSV 저장 ===
import re
import pandas as pd
from typing import Optional, Tuple, Dict, List

# Extracts the sub-<label>, task-<label> and run-<digits> entities (in that order)
# from anywhere in a file name; matching is case-insensitive.
FNAME_RE = re.compile(
    r"sub-(?P<sub>[^/_\s]+).*?task-(?P<task>[^/_\s]+).*?run-(?P<run>\d+)",
    re.IGNORECASE,
)

def parse_stem(name: str) -> Optional[Tuple[str, str, int]]:
    """Parse (sub, task, run) out of a file name.

    Args:
        name: file name (or any string) containing sub-/task-/run- entities.

    Returns:
        (sub, task, run) with run converted to int, or None when the pattern is absent.
    """
    found = FNAME_RE.search(name)
    if found is None:
        return None
    sub_label, task_label, run_digits = found.group("sub", "task", "run")
    return sub_label, task_label, int(run_digits)

def find_dir_subject(p: Path) -> Optional[str]:
    """Return the label of the nearest ancestor directory named ``sub-<label>``.

    Ancestors are checked from the closest parent upward; a bare ``sub-`` folder
    (empty label) is ignored. Returns None when no such ancestor exists.
    """
    prefix = "sub-"
    return next(
        (
            ancestor.name[len(prefix):]
            for ancestor in p.parents
            if ancestor.name.startswith(prefix) and len(ancestor.name) > len(prefix)
        ),
        None,
    )

def scan_by_keyword(root: Path, include_keywords: List[str]) -> Dict[Tuple[str, str, int], Path]:
    """Recursively index ``*.tsv`` / ``*.tsv.gz`` files under `root` by (sub, task, run).

    A file is kept when its lower-cased name contains any of `include_keywords`
    (compared case-insensitively) and `parse_stem` can extract sub/task/run from it.
    The subject label comes from the nearest ``sub-*`` ancestor directory when
    present, otherwise from the file name.

    Duplicate keys: paths are visited in sorted order (``*.tsv`` before
    ``*.tsv.gz``), the first path for a key wins, and every ignored duplicate is
    reported with a ``[DUP]`` line. The result is therefore reproducible, not
    dependent on filesystem listing order.

    Args:
        root: directory to scan; a missing root yields an empty mapping.
        include_keywords: substrings, at least one of which must occur in the file name.

    Returns:
        {(sub, task, run): path}
    """
    mapping: Dict[Tuple[str, str, int], Path] = {}
    if not root.exists():
        print(f"[WARN] Root not found: {root}")
        return mapping

    keywords = [kw.lower() for kw in include_keywords]
    scanned, passed_kw, parsed_ok, duplicates = 0, 0, 0, 0
    for pat in ("*.tsv", "*.tsv.gz"):
        # rglob order is filesystem-dependent; sort so duplicate resolution is stable.
        for p in sorted(root.rglob(pat)):
            scanned += 1
            name_low = p.name.lower()
            if not any(kw in name_low for kw in keywords):
                continue
            passed_kw += 1

            parsed = parse_stem(p.name)
            if not parsed:
                print(f"[PARSE_FAIL] {p.name}")
                continue
            parsed_ok += 1

            sub_file, task, run = parsed
            sub_dir = find_dir_subject(p) or sub_file
            key = (sub_dir, task, run)
            if key in mapping:
                duplicates += 1
                print(f"[DUP] {key} already mapped to {mapping[key]}; ignoring {p}")
                continue
            mapping[key] = p

    print(f"[SCAN] root={root}")
    print(f"  scanned={scanned}, passed_keyword={passed_kw}, parsed_ok={parsed_ok}, mapped={len(mapping)}")
    if duplicates:
        print(f"  duplicates_ignored={duplicates}")
    return mapping

def build_manifest(mapping: Dict[Tuple[str, str, int], Path], modality: str) -> pd.DataFrame:
    """Turn a {(sub, task, run): path} mapping into a manifest DataFrame.

    Each row also records the subject label found in the directory tree
    (``dir_sub``) and in the file name (``fname_sub``). ``sub_mismatch`` is True
    only when both labels are present and disagree.

    Args:
        mapping: output of ``scan_by_keyword``.
        modality: names the path column ``"<modality>_path"``.

    Returns:
        Manifest sorted by (sub, task, run). An empty mapping still yields a frame
        with the full column header (and an ``[INFO]`` message is printed).
    """
    path_col = f"{modality}_path"
    columns = ["sub", "task", "run", path_col, "dir_sub", "fname_sub", "sub_mismatch", "key"]

    def _row(sub, task, run, path):
        parsed = parse_stem(path.name)
        fname_sub = "" if not parsed else parsed[0]
        dir_sub = find_dir_subject(path) or ""
        return {
            "sub": sub,
            "task": task,
            "run": run,
            path_col: str(path),
            "dir_sub": dir_sub,
            "fname_sub": fname_sub,
            "sub_mismatch": bool(dir_sub and fname_sub and dir_sub != fname_sub),
            "key": f"sub-{sub}_task-{task}_run-{run}",
        }

    records = [_row(*ident, path) for ident, path in sorted(mapping.items(), key=lambda item: item[0])]
    manifest = pd.DataFrame(records, columns=columns)
    if manifest.empty:
        print(f"[INFO] Empty manifest for modality={modality} (no rows).")
        return manifest
    return manifest.sort_values(["sub", "task", "run"]).reset_index(drop=True)

def save_csv(df: pd.DataFrame, out_path: Path):
    """Write `df` to `out_path` as CSV (no index), creating parent folders first."""
    target_dir = out_path.parent
    target_dir.mkdir(exist_ok=True, parents=True)
    df.to_csv(out_path, index=False)
    row_count = len(df)
    print(f"[OK] saved: {out_path} (rows={row_count})")

def build_and_save_all():
    """Scan every modality root, build its manifest, save it as CSV and preview it.

    Stages run in this order: all scans, then all manifest builds, then all
    saves, then a ``head(10)`` preview of each non-empty manifest.

    Returns:
        (df_gaze, df_pupil, df_video, df_events)
    """
    # Keyword lists are deliberately broad to tolerate naming variations.
    specs = [
        dict(label="GAZE",   root=GAZE_ROOT,   keywords=["gaze_physio", "gaze"],                  modality="gaze",   out=OUT_GAZE),
        dict(label="PUPIL",  root=PUPIL_ROOT,  keywords=["pupil_physio", "pupil"],                modality="pupil",  out=OUT_PUPIL),
        dict(label="VIDEO",  root=VIDEO_ROOT,  keywords=["videostream", "video_physio", "video"], modality="video",  out=OUT_VIDEO),
        dict(label="EVENTS", root=EVENTS_ROOT, keywords=["events", "event"],                      modality="events", out=OUT_EVENTS),
    ]

    mappings = [scan_by_keyword(spec["root"], spec["keywords"]) for spec in specs]
    manifests = [build_manifest(mp, spec["modality"]) for mp, spec in zip(mappings, specs)]

    for manifest, spec in zip(manifests, specs):
        save_csv(manifest, spec["out"])

    # Quick preview of each manifest
    for manifest, spec in zip(manifests, specs):
        print(f"\n[{spec['label']}] rows: {len(manifest)}")
        if not manifest.empty:
            display(manifest.head(10))
    return tuple(manifests)

df_gaze, df_pupil, df_video, df_events = build_and_save_all()

[SCAN] root=/content/drive/MyDrive/Capstone/dataset/gaze
  scanned=273, passed_keyword=273, parsed_ok=273, mapped=273
[SCAN] root=/content/drive/MyDrive/Capstone/dataset/pupil
  scanned=273, passed_keyword=273, parsed_ok=273, mapped=273
[SCAN] root=/content/drive/MyDrive/Capstone/dataset/videostream
  scanned=265, passed_keyword=265, parsed_ok=265, mapped=265
[SCAN] root=/content/drive/MyDrive/Capstone/dataset/events
  scanned=547, passed_keyword=273, parsed_ok=273, mapped=273
[OK] saved: /content/drive/MyDrive/Capstone/dataset/manifest_gaze.csv (rows=273)
[OK] saved: /content/drive/MyDrive/Capstone/dataset/manifest_pupil.csv (rows=273)
[OK] saved: /content/drive/MyDrive/Capstone/dataset/manifest_videostream.csv (rows=265)
[OK] saved: /content/drive/MyDrive/Capstone/dataset/manifest_events.csv (rows=273)

[GAZE] rows: 273


Unnamed: 0,sub,task,run,gaze_path,dir_sub,fname_sub,sub_mismatch,key
0,acl,fer,0,/content/drive/MyDrive/Capstone/dataset/gaze/s...,acl,acl,False,sub-acl_task-fer_run-0
1,acl,fer,1,/content/drive/MyDrive/Capstone/dataset/gaze/s...,acl,acl,False,sub-acl_task-fer_run-1
2,acl,fer,2,/content/drive/MyDrive/Capstone/dataset/gaze/s...,acl,acl,False,sub-acl_task-fer_run-2
3,acl,fer,3,/content/drive/MyDrive/Capstone/dataset/gaze/s...,acl,acl,False,sub-acl_task-fer_run-3
4,adr,fer,0,/content/drive/MyDrive/Capstone/dataset/gaze/s...,adr,adr,False,sub-adr_task-fer_run-0
5,adr,fer,1,/content/drive/MyDrive/Capstone/dataset/gaze/s...,adr,adr,False,sub-adr_task-fer_run-1
6,adr,fer,2,/content/drive/MyDrive/Capstone/dataset/gaze/s...,adr,adr,False,sub-adr_task-fer_run-2
7,aerj,fer,0,/content/drive/MyDrive/Capstone/dataset/gaze/s...,aerj,aerj,False,sub-aerj_task-fer_run-0
8,aerj,fer,1,/content/drive/MyDrive/Capstone/dataset/gaze/s...,aerj,aerj,False,sub-aerj_task-fer_run-1
9,aerj,fer,2,/content/drive/MyDrive/Capstone/dataset/gaze/s...,aerj,aerj,False,sub-aerj_task-fer_run-2



[PUPIL] rows: 273


Unnamed: 0,sub,task,run,pupil_path,dir_sub,fname_sub,sub_mismatch,key
0,acl,fer,0,/content/drive/MyDrive/Capstone/dataset/pupil/...,acl,acl,False,sub-acl_task-fer_run-0
1,acl,fer,1,/content/drive/MyDrive/Capstone/dataset/pupil/...,acl,acl,False,sub-acl_task-fer_run-1
2,acl,fer,2,/content/drive/MyDrive/Capstone/dataset/pupil/...,acl,acl,False,sub-acl_task-fer_run-2
3,acl,fer,3,/content/drive/MyDrive/Capstone/dataset/pupil/...,acl,acl,False,sub-acl_task-fer_run-3
4,adr,fer,0,/content/drive/MyDrive/Capstone/dataset/pupil/...,adr,adr,False,sub-adr_task-fer_run-0
5,adr,fer,1,/content/drive/MyDrive/Capstone/dataset/pupil/...,adr,adr,False,sub-adr_task-fer_run-1
6,adr,fer,2,/content/drive/MyDrive/Capstone/dataset/pupil/...,adr,adr,False,sub-adr_task-fer_run-2
7,aerj,fer,0,/content/drive/MyDrive/Capstone/dataset/pupil/...,aerj,aerj,False,sub-aerj_task-fer_run-0
8,aerj,fer,1,/content/drive/MyDrive/Capstone/dataset/pupil/...,aerj,aerj,False,sub-aerj_task-fer_run-1
9,aerj,fer,2,/content/drive/MyDrive/Capstone/dataset/pupil/...,aerj,aerj,False,sub-aerj_task-fer_run-2



[VIDEO] rows: 265


Unnamed: 0,sub,task,run,video_path,dir_sub,fname_sub,sub_mismatch,key
0,acl,fer,0,/content/drive/MyDrive/Capstone/dataset/videos...,acl,acl,False,sub-acl_task-fer_run-0
1,acl,fer,1,/content/drive/MyDrive/Capstone/dataset/videos...,acl,acl,False,sub-acl_task-fer_run-1
2,acl,fer,2,/content/drive/MyDrive/Capstone/dataset/videos...,acl,acl,False,sub-acl_task-fer_run-2
3,acl,fer,3,/content/drive/MyDrive/Capstone/dataset/videos...,acl,acl,False,sub-acl_task-fer_run-3
4,adr,fer,0,/content/drive/MyDrive/Capstone/dataset/videos...,adr,adr,False,sub-adr_task-fer_run-0
5,adr,fer,1,/content/drive/MyDrive/Capstone/dataset/videos...,adr,adr,False,sub-adr_task-fer_run-1
6,adr,fer,2,/content/drive/MyDrive/Capstone/dataset/videos...,adr,adr,False,sub-adr_task-fer_run-2
7,aerj,fer,0,/content/drive/MyDrive/Capstone/dataset/videos...,aerj,aerj,False,sub-aerj_task-fer_run-0
8,aerj,fer,1,/content/drive/MyDrive/Capstone/dataset/videos...,aerj,aerj,False,sub-aerj_task-fer_run-1
9,aerj,fer,2,/content/drive/MyDrive/Capstone/dataset/videos...,aerj,aerj,False,sub-aerj_task-fer_run-2



[EVENTS] rows: 273


Unnamed: 0,sub,task,run,events_path,dir_sub,fname_sub,sub_mismatch,key
0,acl,fer,0,/content/drive/MyDrive/Capstone/dataset/events...,acl,acl,False,sub-acl_task-fer_run-0
1,acl,fer,1,/content/drive/MyDrive/Capstone/dataset/events...,acl,acl,False,sub-acl_task-fer_run-1
2,acl,fer,2,/content/drive/MyDrive/Capstone/dataset/events...,acl,acl,False,sub-acl_task-fer_run-2
3,acl,fer,3,/content/drive/MyDrive/Capstone/dataset/events...,acl,acl,False,sub-acl_task-fer_run-3
4,adr,fer,0,/content/drive/MyDrive/Capstone/dataset/events...,adr,adr,False,sub-adr_task-fer_run-0
5,adr,fer,1,/content/drive/MyDrive/Capstone/dataset/events...,adr,adr,False,sub-adr_task-fer_run-1
6,adr,fer,2,/content/drive/MyDrive/Capstone/dataset/events...,adr,adr,False,sub-adr_task-fer_run-2
7,aerj,fer,0,/content/drive/MyDrive/Capstone/dataset/events...,aerj,aerj,False,sub-aerj_task-fer_run-0
8,aerj,fer,1,/content/drive/MyDrive/Capstone/dataset/events...,aerj,aerj,False,sub-aerj_task-fer_run-1
9,aerj,fer,2,/content/drive/MyDrive/Capstone/dataset/events...,aerj,aerj,False,sub-aerj_task-fer_run-2


1. 교집합 manifest 만들기

In [None]:
# 1) Load the per-modality manifests written by build_and_save_all().
#    IDs are read as strings so labels such as "001" keep their zero padding
#    (otherwise `sub` would be parsed as an int and stop matching `key`).
MERGE_KEYS = ["sub", "task", "run", "key"]
_ID_DTYPES = {"sub": str, "task": str, "key": str}


def _normalize_manifest(frame: pd.DataFrame) -> pd.DataFrame:
    """Coerce the merge-key columns to stable types and drop duplicate keys."""
    return (
        frame.astype({"sub": str, "task": str, "run": int, "key": str})
        .drop_duplicates(subset=MERGE_KEYS)
        .reset_index(drop=True)
    )


g = _normalize_manifest(pd.read_csv(OUT_GAZE, dtype=_ID_DTYPES))
p = _normalize_manifest(pd.read_csv(OUT_PUPIL, dtype=_ID_DTYPES))
v = _normalize_manifest(pd.read_csv(OUT_VIDEO, dtype=_ID_DTYPES))
e = _normalize_manifest(pd.read_csv(OUT_EVENTS, dtype=_ID_DTYPES))

# 2) Inner joins on the full key
# (A) gaze + pupil + events
gp = (
    g[MERGE_KEYS + ["gaze_path"]]
    .merge(p[MERGE_KEYS + ["pupil_path"]], on=MERGE_KEYS, how="inner")
    .merge(e[MERGE_KEYS + ["events_path"]], on=MERGE_KEYS, how="inner")
    .sort_values(["sub", "task", "run"])
)
gp_out = BASE_DIR / "manifest_align_gp_events.csv"
gp.to_csv(gp_out, index=False)

# (B) video + events
ve = (
    v[MERGE_KEYS + ["video_path"]]
    .merge(e[MERGE_KEYS + ["events_path"]], on=MERGE_KEYS, how="inner")
    .sort_values(["sub", "task", "run"])
)
ve_out = BASE_DIR / "manifest_align_video_events.csv"
ve.to_csv(ve_out, index=False)

# (C) cross-modal: gaze + pupil + video + events (optional)
gpve = (
    gp[MERGE_KEYS + ["gaze_path", "pupil_path", "events_path"]]
    .merge(v[MERGE_KEYS + ["video_path"]], on=MERGE_KEYS, how="inner")
    .sort_values(["sub", "task", "run"])
)
gpve_out = BASE_DIR / "manifest_align_crossmodal.csv"
gpve.to_csv(gpve_out, index=False)

print("[OK] saved:")
print(" -", gp_out, f"(rows={len(gp)})")
print(" -", ve_out, f"(rows={len(ve)})")
print(" -", gpve_out, f"(rows={len(gpve)})")

# 3) Quick summary
print("\n[Counts]")
print("gaze:", len(g), "pupil:", len(p), "video:", len(v), "events:", len(e))
print("g∩p∩events:", len(gp), "video∩events:", len(ve), "g∩p∩v∩events:", len(gpve))

# 4) (optional) Diagnostics: keys present in the gaze+pupil+events set but with no video
keys_gp = set(gp[MERGE_KEYS].itertuples(index=False, name=None))
keys_v = set(v[MERGE_KEYS].itertuples(index=False, name=None))
missing_video_keys = sorted(keys_gp - keys_v)

print("\n[Examples missing VIDEO among GP+Events]:", len(missing_video_keys))
print(missing_video_keys[:10])  # preview only a few

[OK] saved:
 - /content/drive/MyDrive/Capstone/dataset/manifest_align_gp_events.csv (rows=273)
 - /content/drive/MyDrive/Capstone/dataset/manifest_align_video_events.csv (rows=265)
 - /content/drive/MyDrive/Capstone/dataset/manifest_align_crossmodal.csv (rows=265)

[Counts]
gaze: 273 pupil: 273 video: 265 events: 273
g∩p∩events: 273 video∩events: 265 g∩p∩v∩events: 265

[Examples missing VIDEO among GP+Events]: 8
[('agk', 'fer', 0, 'sub-agk_task-fer_run-0'), ('bcz', 'fer', 0, 'sub-bcz_task-fer_run-0'), ('bdn', 'fer', 0, 'sub-bdn_task-fer_run-0'), ('bxn', 'fer', 0, 'sub-bxn_task-fer_run-0'), ('ehk', 'fer', 0, 'sub-ehk_task-fer_run-0'), ('ksu', 'fer', 0, 'sub-ksu_task-fer_run-0'), ('mdt', 'fer', 0, 'sub-mdt_task-fer_run-0'), ('ubc', 'fer', 0, 'sub-ubc_task-fer_run-0')]


2. 각 tsv 파일을 청크 단위로 읽어 필요 컬럼만 추려서 타입 좁히기

In [None]:
BASE = Path("/content/drive/MyDrive/Capstone/dataset")

# Per-modality manifests produced by the scanning step
MANIFEST_G, MANIFEST_P, MANIFEST_V = (
    BASE / f"manifest_{stem}.csv" for stem in ("gaze", "pupil", "videostream")
)

# Root for intermediate Parquet outputs
OUT_ROOT = BASE.joinpath("intermediate", "parquet")
OUT_ROOT.mkdir(exist_ok=True, parents=True)

In [None]:
import pyarrow as pa
import pyarrow.parquet as pq
from typing import Optional, Tuple, List, Dict

NUMERIC_NAME_RE = re.compile(r"^[+-]?\d+(\.\d+)?(\.\d+)?$")  # '1', '1.0', '1.0.2' 같은 중복명 방지용

def _all_numeric_like(cols: List[str]) -> bool:
    return all(isinstance(c, str) and NUMERIC_NAME_RE.match(c) for c in cols)

def _dedupe(cols: List[str]) -> List[str]:
    seen, out = {}, []
    for c in cols:
        base = str(c)
        if base not in seen:
            seen[base] = 0; out.append(base)
        else:
            seen[base] += 1; out.append(f"{base}__{seen[base]}")
    return out

def _infer_time_col_from_sample(df: pd.DataFrame) -> str:
    best, best_step = None, np.inf
    for c in df.columns:
        s = pd.to_numeric(df[c], errors="coerce")
        if s.isna().mean() > 0.2:  # 결측 너무 많으면 제외
            continue
        vals = s.dropna().to_numpy()
        if len(vals) < 10:  # 샘플 너무 작으면 제외
            continue
        dif = np.diff(vals)
        if dif.size == 0:
            continue
        if np.nanpercentile(dif, 5) < -1e-6:  # 하위 5%가 음수면 단조 증가 아님
            continue
        if (vals.max() - vals.min()) < 0.05:
            continue
        med_step = float(np.nanmedian(dif))
        if med_step <= 0:
            continue
        if med_step < best_step:
            best_step, best = med_step, c

    if best is None:
        # 최후의 보루: 숫자 비율 높은 첫 컬럼, 없다면 첫 컬럼
        for c in df.columns:
            s = pd.to_numeric(df[c], errors="coerce")
            if s.notna().mean() > 0.8:
                return c
        return df.columns[0]
    return best


def detect_schema(tsv_path: str, modality: str, sample_rows: int = 2000) -> Dict:
    """Inspect the start of a TSV to decide header mode, time column and kept columns.

    Steps:
      1. Read one row with ``header=0``. If every resulting "column name" looks
         numeric (``_all_numeric_like``), the file is treated as headerless and
         its columns are named ``c0 .. c{n-1}``.
      2. Read up to `sample_rows` rows and pick the time column with
         ``_infer_time_col_from_sample``.
      3. Keep columns whose name matches a modality-specific regex. If none
         match (always the case for synthetic ``c<i>`` names), keep columns that
         are more than 50% numeric in the sample.

    Args:
        tsv_path: path (str or Path) to a .tsv / .tsv.gz file.
        modality: "gaze", "pupil" or "video" select a name pattern. Any other
            value uses an empty pattern, which matches every column.
        sample_rows: number of rows read for inference.

    Returns:
        dict with ``header`` (0 or None), ``names`` (synthetic names when
        headerless, else None), ``time_col``, and ``usecols`` (time column first).
    """
    read_opts = dict(sep="\t", engine="python", on_bad_lines="skip", encoding="utf-8")

    # 1) Does the first line look like data rather than a header?
    head0 = pd.read_csv(tsv_path, nrows=1, header=0, **read_opts)
    header_is_numeric = _all_numeric_like([str(c) for c in head0.columns])

    if header_is_numeric:
        df_sample = pd.read_csv(tsv_path, nrows=sample_rows, header=None, **read_opts)
        names = [f"c{i}" for i in range(df_sample.shape[1])]
        header = None
    else:
        df_sample = pd.read_csv(tsv_path, nrows=sample_rows, header=0, **read_opts)
        names = _dedupe([str(c) for c in df_sample.columns])
        header = 0
    df_sample.columns = names

    # 2) Time axis
    time_col = _infer_time_col_from_sample(df_sample)

    # 3) Modality-relevant columns
    pats = {
        "gaze":  r"(gaze|x|y|theta|phi|conf|valid|blink)",
        "pupil": r"(pupil|diam|radius|area|size|conf|valid|left|right)",
        "video": r"(face|landmark|pose|angle|au|prob|score|^x_|^y_|^z_|frame|roi)"
    }
    rx = re.compile(pats.get(modality, ""), re.I)
    others = [c for c in df_sample.columns if c != time_col and rx.search(c)]
    if not others:
        # No name matched: keep mostly-numeric columns instead.
        others = [
            c for c in df_sample.columns
            if c != time_col and pd.to_numeric(df_sample[c], errors="coerce").notna().mean() > 0.5
        ]

    # Final usecols: time column first, order preserved, no repeats.
    usecols = list(dict.fromkeys([time_col] + [x for x in others if x != time_col]))

    # Path(...).name works for both str and pathlib.Path inputs.
    file_name = Path(tsv_path).name
    print(f"[detect_schema] header={header} names={len(names)} time_col={time_col} usecols={len(usecols)} (path={file_name})")
    return {"header": header, "names": names if header is None else None, "time_col": time_col, "usecols": usecols}

In [None]:
def downcast_chunk(df: pd.DataFrame, time_col: str) -> pd.DataFrame:
    """Coerce a chunk to numeric columns with compact dtypes.

    - `time_col` is coerced to numeric but not downcast.
    - Every other column is coerced to numeric. Columns that are entirely NaN
      are dropped; float columns are downcast with ``downcast="float"`` and the
      rest with ``downcast="integer"``.
    - Rows whose time value is NaN are dropped.

    The input frame is not modified. A new frame is built from the converted
    columns, which avoids SettingWithCopyWarning when `df` is a slice and avoids
    fragmenting the frame through repeated dtype-changing column assignments.
    Column order is preserved, minus dropped columns.
    """
    converted = {}
    for c in df.columns:
        s = pd.to_numeric(df[c], errors="coerce")
        if c == time_col:
            converted[c] = s
            continue
        if s.isna().all():
            continue
        if pd.api.types.is_float_dtype(s):
            converted[c] = pd.to_numeric(s, errors="coerce", downcast="float")
        else:
            converted[c] = pd.to_numeric(s, errors="coerce", downcast="integer")
    out = pd.DataFrame(converted, index=df.index)
    return out.dropna(subset=[time_col])

def add_prefix_and_time(df: pd.DataFrame, modality: str, time_col: str) -> pd.DataFrame:
    """Rename `time_col` to ``time`` and prefix every other column with ``<modality>_``.

    Columns that already start with the prefix (case-insensitive) keep their
    name. Returns a renamed frame; `df` itself is not modified.
    """
    renamed = df if time_col == "time" else df.rename(columns={time_col: "time"})
    prefix = f"{modality.lower()}_"
    mapping = {
        col: prefix + col
        for col in renamed.columns
        if col != "time" and not col.lower().startswith(prefix)
    }
    return renamed.rename(columns=mapping) if mapping else renamed

def tsv_to_parquet_stream(tsv_path: str, out_parquet: Path, modality: str, chunksize: int = 200_000) -> Dict:
    """Stream a (possibly gzipped) TSV into a single Parquet file, chunk by chunk.

    ``detect_schema`` decides the layout once: header mode, time column and kept
    columns. Each chunk is then restricted to those columns (time first),
    numerically downcast, renamed (``time`` + ``<modality>_`` prefix) and appended
    through one ``ParquetWriter``.

    Args:
        tsv_path: source .tsv / .tsv.gz path.
        out_parquet: destination Parquet path (str or Path); parent dirs are created.
        modality: used for the column prefix and the schema heuristics.
        chunksize: rows per ``read_csv`` chunk.

    Returns:
        dict with ok / rows / tmin / tmax / out / time_col_detected / usecols / header_mode.
        ``ok`` is False when no chunk produced rows (nothing was written).

    Raises:
        Any read or write error is re-raised after the partially written output
        file is closed and deleted, so a truncated Parquet never stays on disk.
        This includes the ValueError ``ParquetWriter.write_table`` raises when a
        later chunk's schema differs from the first one.
    """
    sch = detect_schema(tsv_path, modality=modality, sample_rows=2000)
    header, names, time_col, usecols = sch["header"], sch["names"], sch["time_col"], sch["usecols"]

    out_parquet = Path(out_parquet)
    out_parquet.parent.mkdir(parents=True, exist_ok=True)
    writer, total_rows = None, 0
    tmin, tmax = np.inf, -np.inf

    read_kwargs = dict(
        sep="\t", usecols=usecols, chunksize=chunksize, header=header,
        engine="python", on_bad_lines="skip", encoding="utf-8",
    )
    if header is None and names is not None:
        read_kwargs["names"] = names

    try:
        for chunk in pd.read_csv(tsv_path, **read_kwargs):
            # The chunk's columns may differ from the sample's; keep only the
            # expected ones, in usecols order (time column first).
            common = [c for c in usecols if c in chunk.columns]
            if not common or time_col not in common:
                continue
            # reindex() returns a fresh frame rather than a slice of `chunk`, so
            # later column assignments are not done on a view.
            chunk = chunk.reindex(columns=common)

            chunk = downcast_chunk(chunk, time_col=time_col)
            if chunk.empty:
                continue

            chunk = add_prefix_and_time(chunk, modality=modality, time_col=time_col)

            tmin = min(tmin, float(chunk["time"].min()))
            tmax = max(tmax, float(chunk["time"].max()))
            total_rows += len(chunk)

            tbl = pa.Table.from_pandas(chunk, preserve_index=False)
            if writer is None:
                writer = pq.ParquetWriter(out_parquet, tbl.schema, compression="snappy")
            # NOTE(review): dtypes are chosen per chunk (downcast / all-NaN column
            # drop), so a later chunk can disagree with the first chunk's schema;
            # write_table raises ValueError in that case.
            writer.write_table(tbl)
    except BaseException:
        # Never leave a truncated (or footer-less) Parquet file behind.
        if writer is not None:
            try:
                writer.close()
            finally:
                out_parquet.unlink(missing_ok=True)
        raise

    if writer is not None:
        writer.close()

    return {
        "ok": writer is not None,
        "rows": int(total_rows),
        "tmin": None if not np.isfinite(tmin) else float(tmin),
        "tmax": None if not np.isfinite(tmax) else float(tmax),
        "out": str(out_parquet),
        "time_col_detected": time_col,
        "usecols": usecols,
        "header_mode": header,
    }


In [None]:
# Smoke test: convert one gaze recording end-to-end before processing the full manifest.
sample_tsv = "/content/drive/MyDrive/Capstone/dataset/gaze/sub-acl/beh/sub-acl_task-fer_run-0_recording-gaze_physio.tsv.gz"
out_pq = Path("/content/drive/MyDrive/Capstone/dataset/intermediate/parquet") / "gaze" / "test.parquet"

info = tsv_to_parquet_stream(
    sample_tsv,
    out_pq,
    modality="gaze",
    chunksize=100_000,
)
print(info)

[detect_schema] header=None names=17 time_col=c1 usecols=17 (path=sub-acl_task-fer_run-0_recording-gaze_physio.tsv.gz)
{'ok': True, 'rows': 11569, 'tmin': 0.0, 'tmax': 187.1905999999999, 'out': '/content/drive/MyDrive/Capstone/dataset/intermediate/parquet/gaze/test.parquet', 'time_col_detected': 'c1', 'usecols': ['c1', 'c0', 'c2', 'c3', 'c4', 'c5', 'c6', 'c7', 'c8', 'c9', 'c10', 'c11', 'c12', 'c13', 'c14', 'c15', 'c16'], 'header_mode': None}


In [None]:
def process_manifest_to_parquet(manifest_csv: Path, modality: str, path_col: str, base_out: Path) -> pd.DataFrame:
    """Convert every TSV listed in a manifest CSV to Parquet and save a conversion index.

    Outputs go to ``base_out/<modality>/sub-<sub>/task-<task>/<key>_<modality>.parquet``.
    One index row per attempted file is written to
    ``base_out/parquet_index_<modality>.csv``. A failing file is printed and
    recorded with ``ok=False`` plus an ``error`` message, and processing continues
    with the next file.

    Args:
        manifest_csv: manifest with sub / task / run / key columns and `path_col`.
        modality: modality name used for the folder, file suffix and column prefix.
        path_col: manifest column holding the source TSV path.
        base_out: root directory for the Parquet outputs and the index CSV.

    Returns:
        The index DataFrame, sorted by (sub, task, run); empty if no file was attempted.
    """
    # Read IDs as strings so labels such as "001" keep their zero padding.
    dfm = pd.read_csv(manifest_csv, dtype={"sub": str, "task": str, "key": str})
    tag = modality.upper()
    recs: List[Dict] = []
    for _, r in dfm.iterrows():
        src = r[path_col]
        if not isinstance(src, str) or not src:
            continue
        sub, task, run, key = r["sub"], r["task"], int(r["run"]), r["key"]
        out_path = base_out / modality / f"sub-{sub}" / f"task-{task}" / f"{key}_{modality}.parquet"
        ids = {"sub": sub, "task": task, "run": run, "key": key, "src": src}

        try:
            info = tsv_to_parquet_stream(src, out_path, modality=modality, chunksize=200_000)
        except Exception as exc:
            # Best-effort batch: make the failure visible and keep it in the index.
            print(f"[{tag}][FAIL] {key}: {exc!r}")
            recs.append({
                "ok": False, "rows": 0, "tmin": None, "tmax": None,
                "out": str(out_path),
                "time_col_detected": None,
                "usecols": None,
                "header_mode": None,
                **ids,
                "error": str(exc),
            })
            continue

        info.update(ids)
        recs.append(info)
        print(f"[{tag}] {key} → rows={info['rows']}  out={out_path.name}")

    log = pd.DataFrame(recs)
    if not log.empty:
        # An empty frame has no sub/task/run columns, and sorting it would raise KeyError.
        log = log.sort_values(["sub", "task", "run"]).reset_index(drop=True)
    log_path = base_out / f"parquet_index_{modality}.csv"
    log.to_csv(log_path, index=False)
    print(f"[OK] {modality} parquet index saved → {log_path}")
    return log

# Run the conversion for each modality
log_g = process_manifest_to_parquet(MANIFEST_G, "gaze", "gaze_path", OUT_ROOT)
log_p = process_manifest_to_parquet(MANIFEST_P, "pupil", "pupil_path", OUT_ROOT)
log_v = process_manifest_to_parquet(MANIFEST_V, "video", "video_path", OUT_ROOT)


[detect_schema] header=None names=17 time_col=c1 usecols=17 (path=sub-acl_task-fer_run-0_recording-gaze_physio.tsv.gz)
[GAZE] sub-acl_task-fer_run-0 → rows=11569  out=sub-acl_task-fer_run-0_gaze.parquet
[detect_schema] header=None names=17 time_col=c1 usecols=17 (path=sub-acl_task-fer_run-1_recording-gaze_physio.tsv.gz)
[GAZE] sub-acl_task-fer_run-1 → rows=66809  out=sub-acl_task-fer_run-1_gaze.parquet
[detect_schema] header=None names=17 time_col=c1 usecols=17 (path=sub-acl_task-fer_run-2_recording-gaze_physio.tsv.gz)
[GAZE] sub-acl_task-fer_run-2 → rows=59651  out=sub-acl_task-fer_run-2_gaze.parquet
[detect_schema] header=None names=17 time_col=c1 usecols=17 (path=sub-acl_task-fer_run-3_recording-gaze_physio.tsv.gz)
[GAZE] sub-acl_task-fer_run-3 → rows=53510  out=sub-acl_task-fer_run-3_gaze.parquet
[detect_schema] header=None names=17 time_col=c5 usecols=17 (path=sub-adr_task-fer_run-0_recording-gaze_physio.tsv.gz)
[GAZE] sub-adr_task-fer_run-0 → rows=15685  out=sub-adr_task-fer_run-

In [None]:
# 결과 점검
idx_g = pd.read_csv(OUT_ROOT / "parquet_index_gaze.csv")
idx_p = pd.read_csv(OUT_ROOT / "parquet_index_pupil.csv")
idx_v = pd.read_csv(OUT_ROOT / "parquet_index_video.csv")

print("[gaze] files:", len(idx_g), "ok:", idx_g["ok"].sum(), "rows sum:", int(idx_g["rows"].sum()))
print("[pupil] files:", len(idx_p), "ok:", idx_p["ok"].sum(), "rows sum:", int(idx_p["rows"].sum()))
print("[video] files:", len(idx_v), "ok:", idx_v["ok"].sum(), "rows sum:", int(idx_v["rows"].sum()))

[gaze] files: 273 ok: 273 rows sum: 26260454
[pupil] files: 273 ok: 273 rows sum: 26260454
[video] files: 265 ok: 265 rows sum: 7772426
sample: /content/drive/MyDrive/Capstone/dataset/intermediate/parquet/gaze/sub-acl/task-fer/sub-acl_task-fer_run-0_gaze.parquet
11569 17 ['time', 'gaze_c0', 'gaze_c2', 'gaze_c3', 'gaze_c4', 'gaze_c5', 'gaze_c6', 'gaze_c7']


Unnamed: 0,time,gaze_c0,gaze_c2,gaze_c3,gaze_c4,gaze_c5,gaze_c6,gaze_c7,gaze_c8,gaze_c9,gaze_c10,gaze_c11,gaze_c12,gaze_c13,gaze_c14,gaze_c15,gaze_c16
0,0.0,0.0,0.51117,0.48432,2245.168213,2.65234,2834.0,1.0,0.54816,0.48367,1.0,0.54061,0.47649,1.0,0.54439,0.48008,1.0
1,0.0164,0.016183,0.51224,0.48416,2245.168213,2.6687,2834.0,1.0,0.54816,0.48367,1.0,0.54295,0.46922,1.0,0.54556,0.47644,1.0
2,0.0324,0.032366,0.51217,0.48443,2245.168213,2.68481,2834.0,1.0,0.51973,0.51671,1.0,0.50585,0.4422,1.0,0.51279,0.47946,1.0
3,0.0485,0.048549,0.51209,0.4847,2245.168213,2.70093,2834.0,1.0,0.50982,0.50804,1.0,0.49013,0.43694,1.0,0.49997,0.47249,1.0
4,0.0647,0.064731,0.512,0.48459,2245.168213,2.71704,2834.0,1.0,0.50593,0.49329,1.0,0.49212,0.42916,1.0,0.49902,0.46122,1.0


In [None]:
# 한 파일 열어보기 (예시)
sample_path = idx_v[idx_v["ok"]].iloc[0]["out"]
print("sample:", sample_path)

import pyarrow.parquet as pq
tbl = pq.read_table(sample_path)
print(tbl.num_rows, tbl.num_columns, tbl.column_names[:8])
tbl.to_pandas().head()

sample: /content/drive/MyDrive/Capstone/dataset/intermediate/parquet/video/sub-acl/task-fer/sub-acl_task-fer_run-0_video.parquet
7004 715 ['time', 'video_c0', 'video_c1', 'video_c2', 'video_c4', 'video_c5', 'video_c6', 'video_c7']


Unnamed: 0,time,video_c0,video_c1,video_c2,video_c4,video_c5,video_c6,video_c7,video_c8,video_c9,...,video_c705,video_c706,video_c707,video_c708,video_c709,video_c710,video_c711,video_c712,video_c713,video_c714
0,0.025,0.0,23774.0,0,0.98,1,0.178098,0.29189,-0.939724,0.043614,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
1,0.05,0.026725,23775.0,0,0.98,1,0.212237,0.329441,-0.920013,0.03522,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
2,0.075,0.053449,23776.0,0,0.98,1,0.188878,0.309838,-0.93184,0.023445,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
3,0.1,0.080174,23777.0,0,0.98,1,0.181394,0.315983,-0.931263,0.031402,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
4,0.125,0.106899,23778.0,0,0.98,1,0.166685,0.294618,-0.940965,0.032511,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0


3. events 정리 (run 단위)

In [None]:
BASE = Path("/content/drive/MyDrive/Capstone/dataset")

# Input manifests created earlier in this notebook
MANIFEST_EVENTS = BASE / "manifest_events.csv"
ALIGN_GP, ALIGN_VE = (
    BASE / "manifest_align_gp_events.csv",     # gaze+pupil+events intersection (optional)
    BASE / "manifest_align_video_events.csv",  # video+events intersection (optional)
)

# Output folder for cleaned events
OUT_EVENTS_DIR = BASE.joinpath("intermediate", "events_clean")
OUT_EVENTS_DIR.mkdir(exist_ok=True, parents=True)

In [None]:
from typing import Union, List

def resolve_existing_tsv(p: Union[str, Path]) -> Path:
    """Return an existing TSV path for `p`, tolerating compression and suffix drift.

    Resolution order:
      1. `p` itself (``~`` expanded) if it exists.
      2. The other compression variant: ``x.tsv.gz`` -> ``x.tsv`` or ``x.tsv`` -> ``x.tsv.gz``.
      3. Any ``<stem>*.tsv*`` in the same folder. The shortest path wins, and
         ties go to the most recently modified file.

    Raises:
        FileNotFoundError: when none of the above yields a file.
    """
    requested = Path(str(p)).expanduser()
    if requested.exists():
        return requested

    alternative = None
    if requested.suffixes[-2:] == [".tsv", ".gz"]:
        alternative = requested.with_suffix("")
    elif requested.suffix == ".tsv":
        alternative = requested.with_suffix(".tsv.gz")
    if alternative is not None and alternative.exists():
        return alternative

    folder = requested.parent
    if folder.exists():
        stem = requested.stem
        if stem.endswith(".tsv"):
            stem = stem[:-4]
        matches = sorted(
            folder.glob(stem + "*.tsv*"),
            key=lambda cand: (len(str(cand)), -cand.stat().st_mtime),
        )
        if matches:
            return matches[0]
    raise FileNotFoundError(f"Cannot resolve path: {p}")

In [None]:
def _lower_map(cols):
    """원래 컬럼명을 소문자 사전으로 맵핑 {lower: original}."""
    return {c.lower(): c for c in cols}

def _pick_first(d, candidates):
    """후보 중 존재하는 첫 컬럼명을 원본 이름으로 반환."""
    for k in candidates:
        if k in d: return d[k]
    return None

def read_events_raw(ev_path: str, nrows: int | None = None) -> pd.DataFrame:
    # C 엔진 → 실패 시 python 엔진
    try:
        return pd.read_csv(ev_path, sep="\t", nrows=nrows, engine="c", encoding="utf-8")
    except Exception:
        return pd.read_csv(ev_path, sep="\t", nrows=nrows, engine="python", on_bad_lines="skip", encoding="utf-8")

def standardize_events(ev_path: str) -> pd.DataFrame:
    """Read one events.tsv(.gz) and reduce it to (trial_id, t_start, t_end, condition).

    Columns are matched case-insensitively against candidate name lists:
      - t_start: first matching start column, otherwise the file's first column.
      - t_end: an explicit end column; otherwise onset + duration; otherwise NaN.
      - trial_id: an id column if it contains numbers; otherwise 0..N-1. A purely
        textual id column would coerce to all-NaN and, through the dropna below,
        silently discard every event.
      - condition: a label column cast to str (missing -> "nan"); otherwise "nan".
    Rows whose trial_id or t_start are not numeric are dropped.

    Raises:
        FileNotFoundError: if the path cannot be resolved.
        ValueError: if the events table is empty.
    """
    ev_path = str(resolve_existing_tsv(ev_path))
    df = read_events_raw(ev_path)
    if df.empty:
        raise ValueError(f"Empty events: {ev_path}")
    d = _lower_map(df.columns)

    # Candidate column names (lower case)
    start_candidates = ["t_start","start","onset","tstart","trial_start","trial_onset"]
    end_candidates   = ["t_end","end","offset","tend","trial_end","trial_offset"]
    dur_candidates   = ["duration","dur","trial_duration"]
    id_candidates    = ["trial_id","trial","trl","id","event_id","trialindex","trial_index","index"]
    cond_candidates  = ["condition","trial_type","event","label","stim","stimulus","emotion","cond","type"]

    start_col = _pick_first(d, start_candidates)
    end_col   = _pick_first(d, end_candidates)
    dur_col   = _pick_first(d, dur_candidates)
    id_col    = _pick_first(d, id_candidates)
    cond_col  = _pick_first(d, cond_candidates)

    # A start column is required; fall back to the first column (exceptional case).
    if start_col is None:
        start_col = list(df.columns)[0]
    t_start = pd.to_numeric(df[start_col], errors="coerce")

    # End: explicit column, else onset + duration, else NaN.
    if end_col is not None:
        t_end = pd.to_numeric(df[end_col], errors="coerce")
    elif dur_col is not None:
        t_end = t_start + pd.to_numeric(df[dur_col], errors="coerce")
    else:
        t_end = np.nan

    # trial_id: use the id column only when it actually holds numbers.
    trial_id = pd.to_numeric(df[id_col], errors="coerce") if id_col is not None else None
    if trial_id is None or trial_id.isna().all():
        if id_col is not None:
            print(f"[WARN] non-numeric id column '{id_col}' in {ev_path}; using row order as trial_id")
        trial_id = pd.Series(np.arange(len(df), dtype=np.int64), index=df.index)

    if cond_col is not None:
        condition = df[cond_col].astype(str)
    else:
        condition = pd.Series(np.nan, index=df.index).astype(str)

    out = pd.DataFrame({
        "trial_id": trial_id,
        "t_start": t_start,
        "t_end": t_end,
        "condition": condition,
    })

    # Rows without a usable id/start cannot be placed in time. A missing t_end
    # is kept here; interval cleaning decides what to do with it.
    out = out.dropna(subset=["trial_id", "t_start"])
    out["trial_id"] = out["trial_id"].astype(np.int64, errors="ignore")
    return out

def clean_intervals(ev_df: pd.DataFrame, clip_overlap: bool = True, eps: float = 1e-6,
                    reassign_ids: bool = False, min_duration: float = 0.0) -> pd.DataFrame:
    """
    Sanitize standardized event intervals.

    Steps:
    - drop rows with NaN t_start/t_end, and rows where either value is negative
    - sort by t_start; where t_end < t_start, set t_end = t_start
    - if clip_overlap: t_end = min(t_end, next.t_start - eps) so consecutive intervals do not overlap
    - add duration = t_end - t_start; when min_duration > 0, drop shorter rows
    - if reassign_ids: renumber trial_id as 0..N-1 in time order
    - map the strings 'nan' / 'None' in `condition` back to NaN

    Args:
        ev_df: frame with at least trial_id, t_start, t_end, condition columns.
        clip_overlap: clip each interval so it ends `eps` before the next one starts.
        eps: gap left between clipped neighbours (seconds).
        reassign_ids: renumber trial_id 0..N-1 after cleaning.
        min_duration: minimum kept duration in seconds (0 keeps everything).

    Returns:
        New frame with columns [trial_id, t_start, t_end, duration, condition].

    Warns:
        UserWarning when trial_id values are duplicated and reassign_ids is False.
        Per-trial outputs downstream are named by trial_id, so duplicates collide there.
    """
    import warnings

    df = ev_df.copy()
    # Intervals without an end cannot be sliced later -> drop
    df = df.dropna(subset=["t_start","t_end"])
    # Drop negative times
    df = df[(df["t_start"] >= 0) & (df["t_end"] >= 0)]
    # Time order
    df = df.sort_values("t_start").reset_index(drop=True)
    # Fix inverted intervals
    inverted = df["t_end"] < df["t_start"]
    df.loc[inverted, "t_end"] = df.loc[inverted, "t_start"]
    # Remove overlaps with the next interval
    if clip_overlap and len(df) > 1:
        next_start = df["t_start"].shift(-1)
        need_clip = (df["t_end"] > (next_start - eps)) & next_start.notna()
        df.loc[need_clip, "t_end"] = (next_start - eps)
        # Clipping against a tied next start can push t_end below t_start again -> re-fix
        df.loc[df["t_end"] < df["t_start"], "t_end"] = df["t_start"]

    df["duration"] = df["t_end"] - df["t_start"]
    if min_duration > 0:
        df = df[df["duration"] >= min_duration].reset_index(drop=True)

    if reassign_ids:
        # Renumber to guarantee unique, dense ids
        df["trial_id"] = np.arange(len(df), dtype=np.int64)
    else:
        n_dup = int(df["trial_id"].duplicated().sum())
        if n_dup:
            warnings.warn(
                f"clean_intervals: {n_dup} duplicate trial_id value(s) kept; per-trial outputs "
                f"named by trial_id will collide (pass reassign_ids=True to renumber)",
                stacklevel=2,
            )

    # 'nan' / 'None' strings (from astype(str) upstream) -> real NaN
    df["condition"] = df["condition"].replace({"nan": np.nan, "None": np.nan})
    return df[["trial_id","t_start","t_end","duration","condition"]]


In [None]:
def load_restrict_keys() -> set[str]:
    """
    Collect run keys (as strings) from the 'key' column of ALIGN_GP and ALIGN_VE.

    Missing files are ignored, so the result is an empty set when neither exists.
    """
    collected: set[str] = set()
    for csv_path in (ALIGN_GP, ALIGN_VE):
        if not csv_path.exists():
            continue
        key_column = pd.read_csv(csv_path, usecols=["key"])["key"]
        collected |= set(key_column.astype(str))
    return collected

def process_events_manifest(manifest_events_csv: Path,
                            out_dir: Path,
                            restrict_keys: set[str] | None = None,
                            reassign_ids: bool = False,
                            min_duration: float = 0.0) -> pd.DataFrame:
    me = pd.read_csv(manifest_events_csv)
    if restrict_keys is not None:
        me = me[me["key"].astype(str).isin(restrict_keys)].reset_index(drop=True)
    me = me.drop_duplicates(subset=["sub","task","run","key","events_path"]).reset_index(drop=True)

    recs = []
    for _, r in me.iterrows():
        sub, task, run, key, ev_path = r["sub"], r["task"], int(r["run"]), r["key"], r["events_path"]
        try:
            raw = standardize_events(ev_path)
            clean = clean_intervals(raw, clip_overlap=True, eps=1e-6,
                                    reassign_ids=reassign_ids, min_duration=min_duration)

            out_run_dir = out_dir / f"sub-{sub}" / f"task-{task}"
            out_run_dir.mkdir(parents=True, exist_ok=True)
            out_pq = out_run_dir / f"{key}_events.parquet"

            # 저장
            import pyarrow as pa, pyarrow.parquet as pq
            pq.write_table(pa.Table.from_pandas(clean, preserve_index=False), out_pq)

            recs.append({
                "ok": True, "sub": sub, "task": task, "run": run, "key": key,
                "events_path": ev_path,
                "n_raw": len(raw), "n_clean": len(clean),
                "min_start": float(clean["t_start"].min()) if not clean.empty else None,
                "max_end": float(clean["t_end"].max()) if not clean.empty else None,
                "out": str(out_pq)
            })
            print(f"[EV] {key} → raw={len(raw)} clean={len(clean)} saved: {out_pq.name}")
        except Exception as e:
            recs.append({
                "ok": False, "sub": sub, "task": task, "run": run, "key": key,
                "events_path": ev_path, "error": str(e)
            })
            print(f"[ERR:EV] {key} → {e}")

    log = pd.DataFrame(recs).sort_values(["sub","task","run"]).reset_index(drop=True)
    log_path = out_dir / "events_clean_index.csv"
    log.to_csv(log_path, index=False)
    print(f"[OK] events clean index saved → {log_path}")
    print(log["ok"].value_counts(dropna=False))
    return log


In [None]:
restrict = load_restrict_keys()   # empty set when neither alignment CSV exists
log_events = process_events_manifest(
    manifest_events_csv = MANIFEST_EVENTS,
    out_dir            = OUT_EVENTS_DIR,
    restrict_keys      = restrict,      # pass None to process every events file (an empty set processes none)
    reassign_ids       = False,         # True renumbers trial_id as 0..N-1 per run
    min_duration       = 0.0            # threshold in seconds (e.g. 0.1) to drop very short trials
)


[EV] sub-acl_task-fer_run-0 → raw=32 clean=32 saved: sub-acl_task-fer_run-0_events.parquet
[EV] sub-acl_task-fer_run-1 → raw=211 clean=211 saved: sub-acl_task-fer_run-1_events.parquet
[EV] sub-acl_task-fer_run-2 → raw=207 clean=207 saved: sub-acl_task-fer_run-2_events.parquet
[EV] sub-acl_task-fer_run-3 → raw=207 clean=207 saved: sub-acl_task-fer_run-3_events.parquet
[EV] sub-adr_task-fer_run-0 → raw=32 clean=32 saved: sub-adr_task-fer_run-0_events.parquet
[EV] sub-adr_task-fer_run-1 → raw=213 clean=213 saved: sub-adr_task-fer_run-1_events.parquet
[EV] sub-adr_task-fer_run-2 → raw=178 clean=178 saved: sub-adr_task-fer_run-2_events.parquet
[EV] sub-aerj_task-fer_run-0 → raw=28 clean=28 saved: sub-aerj_task-fer_run-0_events.parquet
[EV] sub-aerj_task-fer_run-1 → raw=197 clean=197 saved: sub-aerj_task-fer_run-1_events.parquet
[EV] sub-aerj_task-fer_run-2 → raw=207 clean=207 saved: sub-aerj_task-fer_run-2_events.parquet
[EV] sub-aerj_task-fer_run-3 → raw=208 clean=208 saved: sub-aerj_task-

In [None]:
import pyarrow.parquet as pq
# Quick check: count the successfully cleaned runs and preview the first one.
if (OUT_EVENTS_DIR / "events_clean_index.csv").exists():
    idx = pd.read_csv(OUT_EVENTS_DIR / "events_clean_index.csv")
    ok = idx[idx["ok"]]
    print("cleaned runs:", len(ok))
    if len(ok) > 0:
        df = pq.read_table(ok.iloc[0]["out"]).to_pandas()
        display(df.head())


cleaned runs: 273


Unnamed: 0,trial_id,t_start,t_end,duration,condition
0,1,0.0,0.001044,0.001044,sad
1,1,0.001045,0.714467,0.713422,sad
2,1,0.936941,36.421777,35.484836,sad
3,1,36.434549,37.143108,0.708558,sad
4,1,37.729786,41.567859,3.838073,sad


3. Trial 슬라이스 & 40Hz 리샘플링

In [None]:
import pyarrow as pa, pyarrow.parquet as pq
import pyarrow.dataset as ds

BASE = Path("/content/drive/MyDrive/Capstone/dataset")
PQ_ROOT = BASE / "intermediate" / "parquet"         # Parquet files produced by the earlier conversion step
EV_CLEAN_DIR = BASE / "intermediate" / "events_clean"
OUT40 = BASE / "processed40"
(OUT40 / "gp").mkdir(parents=True, exist_ok=True)
(OUT40 / "video").mkdir(parents=True, exist_ok=True)

# Conversion logs (index of Parquet paths per run key)
idx_g = pd.read_csv(PQ_ROOT / "parquet_index_gaze.csv")    # 'out' column holds the Parquet path
idx_p = pd.read_csv(PQ_ROOT / "parquet_index_pupil.csv")
idx_v = pd.read_csv(PQ_ROOT / "parquet_index_video.csv")
ev_idx = pd.read_csv(EV_CLEAN_DIR / "events_clean_index.csv")  # 'out' column holds the cleaned-events Parquet path

# key -> parquet path (if a key repeats, the last row wins); events use only ok rows
g_map = dict(zip(idx_g["key"].astype(str), idx_g["out"].astype(str)))
p_map = dict(zip(idx_p["key"].astype(str), idx_p["out"].astype(str)))
v_map = dict(zip(idx_v["key"].astype(str), idx_v["out"].astype(str)))
e_map = dict(zip(ev_idx[ev_idx["ok"]]["key"].astype(str), ev_idx[ev_idx["ok"]]["out"].astype(str)))

In [None]:
# 슬라이스 + 리샘플 함수
from fnmatch import fnmatch

def read_parquet_time_range(parquet_path: str, t0: float, t1: float, cols: list[str] | None = None):
    """
    Load only the rows whose 'time' lies in the closed interval [t0, t1].

    The filter is pushed down to pyarrow.dataset so rows outside the window are not
    materialized. `cols` optionally restricts the columns read (None = all columns).
    Returns a pandas DataFrame.
    """
    time_field = ds.field("time")
    window = (time_field >= t0) & (time_field <= t1)
    column_kwargs = {} if cols is None else {"columns": cols}
    parquet_ds = ds.dataset(parquet_path, format="parquet")
    return parquet_ds.to_table(filter=window, **column_kwargs).to_pandas()

def make_time_grid(t0: float, t1: float, hz: float) -> np.ndarray:
    """
    Uniform time grid t0, t0 + 1/hz, ... covering the CLOSED interval [t0, t1].

    t1 itself is included when (t1 - t0) is a whole number of steps, so t1 == t0
    yields [t0] and t1 < t0 yields an empty grid.

    Raises:
        ValueError: if hz is not positive.
    """
    if hz <= 0:
        raise ValueError(f"hz must be positive, got {hz}")
    step = 1.0 / hz
    # Small tolerance (in units of steps) so float error such as
    # (0.7 - 0.3) / 0.025 == 15.999999999999998 does not drop the last grid point.
    n = int(np.floor((t1 - t0) / step + 1e-9)) + 1
    return t0 + np.arange(max(n, 0), dtype="float64") * step

def _is_near_binary(s: pd.Series, tol=0.95):
    v = pd.to_numeric(s, errors="coerce").dropna()
    if v.empty:
        return False
    return (v.isin([0, 1]).mean() >= tol)

def _is_integerish_monotone(s: pd.Series):
    v = pd.to_numeric(s, errors="coerce").dropna().to_numpy()
    if v.size < 2:
        return False
    # 거의 정수?
    integerish = np.all(np.isfinite(v) & (np.abs(v - np.round(v)) < 1e-6))
    if not integerish:
        return False
    # 단조 증가(약간의 잡음 허용)
    dif = np.diff(v)
    return np.nanpercentile(dif, 5) >= -1e-6

def _choose_method(col: str, s: pd.Series, overrides: dict[str, str]):
    # 1) 와일드카드 override 우선
    for pattern, m in overrides.items():
        if fnmatch(col, pattern):   # m in {"linear","nearest","ffill"}
            return m
    # 2) 휴리스틱
    if _is_near_binary(s):         # 플래그/유효성
        return "ffill"
    if _is_integerish_monotone(s): # 프레임 인덱스류
        return "ffill"
    return "linear"                # 기본은 연속값 가정


def resample_to_grid(df: pd.DataFrame,
                     grid: np.ndarray,
                     nearest_tol: float = 0.05,
                     ffill_tol: float = 0.2,
                     method_overrides: dict[str,str] = None):
    """
    Resample an irregularly sampled frame onto a fixed time grid, column by column.

    df: ['time', feat1, feat2, ...]  (time: float seconds)
    grid: target time points in seconds, e.g. make_time_grid(t0, t1, 40)
    nearest_tol: max distance (s) for the "nearest" method; farther grid points get NaN
    ffill_tol: max age (s) of the previous sample for "ffill"; older gaps become NaN
    method_overrides: fnmatch pattern -> method, e.g. {'*_valid*': 'ffill', 'pupil_c4': 'linear'}

    The method per column comes from _choose_method ("linear", "nearest", "ffill";
    anything else falls back to linear). Linear uses np.interp, which holds the first/last
    observed value outside the observed time range and applies no gap tolerance.

    Returns a frame with 'time' (== grid) followed by every feature column of `df`
    as float32. When `df` has no rows the feature columns are still emitted (all NaN),
    so empty and non-empty slices of the same source share one schema.
    """
    if method_overrides is None:
        method_overrides = {}

    feature_cols = [c for c in df.columns if c != "time"]

    if df.empty:
        # Keep the schema stable: every feature column, all NaN.
        empty_out = {"time": grid}
        empty_out.update({c: np.full(len(grid), np.nan, dtype="float32") for c in feature_cols})
        return pd.DataFrame(empty_out)

    # Rows without a timestamp cannot be placed on the grid (merge_asof also rejects null keys)
    df = df.dropna(subset=["time"]).sort_values("time").drop_duplicates(subset=["time"])
    t_src = df["time"].to_numpy(dtype="float64")  # ascending after the sort above
    grid_df = pd.DataFrame({"time": grid})
    out = {"time": grid}

    for col in feature_cols:
        s = pd.to_numeric(df[col], errors="coerce")
        method = _choose_method(col, s, method_overrides)

        # Valid samples only (order preserved, so x stays sorted)
        mask = s.notna().to_numpy()
        x = t_src[mask]
        y = s[mask].to_numpy(dtype="float64")

        if len(x) == 0:
            out[col] = np.full_like(grid, np.nan, dtype="float64")
        elif method in ("nearest", "ffill"):
            # nearest: closest sample within nearest_tol
            # ffill:   previous sample, if at most ffill_tol old
            direction, tol = ("nearest", nearest_tol) if method == "nearest" else ("backward", ffill_tol)
            src = pd.DataFrame({"time": x, col: y})
            tmp = pd.merge_asof(grid_df, src, on="time", direction=direction, tolerance=tol)
            out[col] = tmp[col].to_numpy(dtype="float64")
        elif len(np.unique(x)) < 2:
            # Linear interpolation needs at least two distinct time points
            out[col] = np.full_like(grid, np.nan, dtype="float64")
        else:
            out[col] = np.interp(grid, x, y)

    # Downcast features to float32 to save space; 'time' stays float64
    return pd.DataFrame(out).astype({c: "float32" for c in feature_cols})

def save_parquet(df: pd.DataFrame, out_path: Path):
    """Write `df` to `out_path` as Parquet without the pandas index, creating parent directories first."""
    target_dir = out_path.parent
    target_dir.mkdir(parents=True, exist_ok=True)
    arrow_table = pa.Table.from_pandas(df, preserve_index=False)
    pq.write_table(arrow_table, out_path)


In [None]:
def process_gp_trials(target_hz: float = 40.0, force: bool = False):
    """
    Cut gaze + pupil recordings into event intervals and resample both onto one uniform grid.

    For every run in e_map that also has gaze (g_map) and pupil (p_map) Parquet files,
    each cleaned event [t_start, t_end] is sliced from both sources, resampled with
    resample_to_grid onto make_time_grid(t_start, t_end, target_hz), merged on 'time'
    and written to OUT40/gp/sub-<sub>/task-<task>/<key>_trial-<tid>_gp40.parquet.

    Args:
        target_hz: output sampling rate in Hz.
        force: when False, an event whose output file already exists is logged with
            skipped=True and not recomputed.

    Returns:
        Log DataFrame (also written to OUT40/gp/gp40_index.csv), one row per event.
    """
    # Flag / frame / index-like columns are forward-filled instead of interpolated.
    overrides = {"*_valid*": "ffill", "*_frame*": "ffill", "*_idx*": "ffill"}
    logs = []
    for key, ev_pq in e_map.items():
        g_pq = g_map.get(key, None)
        p_pq = p_map.get(key, None)
        if g_pq is None or p_pq is None:
            continue  # need both gaze and pupil for this run

        # key: 'sub-xxx_task-yyy_run-z'. Split each part on its first '-' only so a
        # value that itself contains '-' does not break dict().
        parts = dict(s.split("-", 1) for s in key.split("_"))
        sub, task, run = parts["sub"], parts["task"], int(parts["run"])

        ev = pq.read_table(ev_pq).to_pandas()  # columns: trial_id, t_start, t_end, duration, condition
        n_dup = int(ev["trial_id"].duplicated().sum())
        if n_dup:
            # Output files are named by trial_id, so a repeated id maps to the same file:
            # later events are logged as skipped (force=False) or overwrite it (force=True).
            print(f"[WARN:GP40] {key}: {n_dup} events reuse an earlier trial_id (same output file)")
        for _, r in ev.iterrows():
            tid = int(r["trial_id"]); t0 = float(r["t_start"]); t1 = float(r["t_end"])
            grid = make_time_grid(t0, t1, target_hz)

            out_dir = OUT40 / "gp" / f"sub-{sub}" / f"task-{task}"
            out_path = out_dir / f"{key}_trial-{tid:03d}_gp40.parquet"
            if out_path.exists() and not force:
                logs.append({"key": key, "trial_id": tid, "out": str(out_path), "skipped": True})
                continue

            # Slice + resample each modality onto the same grid
            g_df = read_parquet_time_range(g_pq, t0, t1)
            g_df = resample_to_grid(g_df, grid, nearest_tol=0.05, ffill_tol=0.2,
                                    method_overrides=overrides)

            p_df = read_parquet_time_range(p_pq, t0, t1)
            p_df = resample_to_grid(p_df, grid, nearest_tol=0.05, ffill_tol=0.2,
                                    method_overrides=overrides)

            # Both frames carry time == grid, so an inner join on 'time' keeps every row
            merged = g_df.merge(p_df, on="time", how="inner")

            save_parquet(merged, out_path)
            logs.append({"key": key, "trial_id": tid, "rows": len(merged), "out": str(out_path)})
        print(f"[GP40] {key}: {len(ev)} trials")
    log_df = pd.DataFrame(logs)
    log_df.to_csv(OUT40 / "gp" / "gp40_index.csv", index=False)
    print("[OK] gp40 saved →", OUT40 / "gp")
    return log_df

gp40_idx = process_gp_trials(target_hz=40.0, force=False)


[GP40] sub-acl_task-fer_run-0: 32 trials
[GP40] sub-acl_task-fer_run-1: 211 trials
[GP40] sub-acl_task-fer_run-2: 207 trials
[GP40] sub-acl_task-fer_run-3: 207 trials
[GP40] sub-adr_task-fer_run-0: 32 trials
[GP40] sub-adr_task-fer_run-1: 213 trials
[GP40] sub-adr_task-fer_run-2: 178 trials
[GP40] sub-aerj_task-fer_run-0: 28 trials
[GP40] sub-aerj_task-fer_run-1: 197 trials
[GP40] sub-aerj_task-fer_run-2: 207 trials
[GP40] sub-aerj_task-fer_run-3: 208 trials
[GP40] sub-afri_task-fer_run-0: 30 trials
[GP40] sub-afri_task-fer_run-1: 198 trials
[GP40] sub-afri_task-fer_run-2: 213 trials
[GP40] sub-afri_task-fer_run-3: 206 trials
[GP40] sub-agk_task-fer_run-0: 32 trials
[GP40] sub-agk_task-fer_run-1: 212 trials
[GP40] sub-agk_task-fer_run-2: 206 trials
[GP40] sub-agk_task-fer_run-3: 201 trials
[GP40] sub-almn_task-fer_run-0: 32 trials
[GP40] sub-almn_task-fer_run-1: 205 trials
[GP40] sub-almn_task-fer_run-2: 213 trials
[GP40] sub-almn_task-fer_run-3: 209 trials
[GP40] sub-bcxz_task-fer_run

In [None]:
def process_video_trials(target_hz: float = 40.0, force: bool = False):
    """
    Cut each run's videostream recording into event intervals and resample it onto a uniform grid.

    For every run in e_map with a video Parquet file (v_map), each cleaned event
    [t_start, t_end] is sliced, resampled with resample_to_grid onto
    make_time_grid(t_start, t_end, target_hz) and written to
    OUT40/video/sub-<sub>/task-<task>/<key>_trial-<tid>_video40.parquet.

    Args:
        target_hz: output sampling rate in Hz.
        force: when False, an event whose output file already exists is logged with
            skipped=True and not recomputed.

    Returns:
        Log DataFrame (also written to OUT40/video/video40_index.csv), one row per event.
    """
    # Flag / detection / frame-index-like columns are forward-filled instead of interpolated.
    overrides_video = {
        "*_valid*": "ffill",
        "*_detect*": "ffill",
        "*_frame*":  "ffill",
        "*_idx*":    "ffill",
    }
    logs = []
    for key, ev_pq in e_map.items():
        v_pq = v_map.get(key, None)
        if v_pq is None:
            continue  # no video recording for this run

        # Split on the first '-' only so values containing '-' do not break dict()
        parts = dict(s.split("-", 1) for s in key.split("_"))
        sub, task, run = parts["sub"], parts["task"], int(parts["run"])

        ev = pq.read_table(ev_pq).to_pandas()
        n_dup = int(ev["trial_id"].duplicated().sum())
        if n_dup:
            # Output files are named by trial_id, so a repeated id maps to the same file:
            # later events are logged as skipped (force=False) or overwrite it (force=True).
            print(f"[WARN:VID40] {key}: {n_dup} events reuse an earlier trial_id (same output file)")
        for _, r in ev.iterrows():
            tid = int(r["trial_id"]); t0 = float(r["t_start"]); t1 = float(r["t_end"])
            grid = make_time_grid(t0, t1, target_hz)

            out_dir = OUT40 / "video" / f"sub-{sub}" / f"task-{task}"
            out_path = out_dir / f"{key}_trial-{tid:03d}_video40.parquet"
            if out_path.exists() and not force:
                logs.append({"key": key, "trial_id": tid, "out": str(out_path), "skipped": True})
                continue

            v_df = read_parquet_time_range(v_pq, t0, t1)
            v_df = resample_to_grid(
                v_df, grid,
                nearest_tol=0.05,   # nearest-neighbour radius: 50 ms
                ffill_tol=0.2,      # forward-fill at most 0.2 s (= 8 frames @ 40 Hz)
                method_overrides=overrides_video
            )

            save_parquet(v_df, out_path)
            logs.append({"key": key, "trial_id": tid, "rows": len(v_df), "out": str(out_path)})
        print(f"[VID40] {key}: {len(ev)} trials")
    log_df = pd.DataFrame(logs)
    log_df.to_csv(OUT40 / "video" / "video40_index.csv", index=False)
    print("[OK] video40 saved →", OUT40 / "video")
    return log_df

video40_idx = process_video_trials(target_hz=40.0, force=False)


[VID40] sub-acl_task-fer_run-0: 32 trials
[VID40] sub-acl_task-fer_run-1: 211 trials
[VID40] sub-acl_task-fer_run-2: 207 trials
[VID40] sub-acl_task-fer_run-3: 207 trials
[VID40] sub-adr_task-fer_run-0: 32 trials
[VID40] sub-adr_task-fer_run-1: 213 trials
[VID40] sub-adr_task-fer_run-2: 178 trials
[VID40] sub-aerj_task-fer_run-0: 28 trials
[VID40] sub-aerj_task-fer_run-1: 197 trials
[VID40] sub-aerj_task-fer_run-2: 207 trials
[VID40] sub-aerj_task-fer_run-3: 208 trials
[VID40] sub-afri_task-fer_run-0: 30 trials
[VID40] sub-afri_task-fer_run-1: 198 trials
[VID40] sub-afri_task-fer_run-2: 213 trials
[VID40] sub-afri_task-fer_run-3: 206 trials
[VID40] sub-agk_task-fer_run-1: 212 trials
[VID40] sub-agk_task-fer_run-2: 206 trials
[VID40] sub-agk_task-fer_run-3: 201 trials
[VID40] sub-almn_task-fer_run-0: 32 trials
[VID40] sub-almn_task-fer_run-1: 205 trials
[VID40] sub-almn_task-fer_run-2: 213 trials
[VID40] sub-almn_task-fer_run-3: 209 trials
[VID40] sub-bcxz_task-fer_run-0: 32 trials
[VID

In [None]:
# Open one GP trial written by the last run and sanity-check its grid rate.
gp40_log = pd.read_csv(OUT40 / "gp" / "gp40_index.csv")
# "skipped" only exists when at least one trial was skipped, so guard the filter.
fresh = gp40_log[gp40_log["skipped"] != True] if "skipped" in gp40_log.columns else gp40_log
# A one-row trial (zero-length event) has no time deltas, so its rate would be NaN;
# prefer a trial with at least two rows when one is available.
if "rows" in fresh.columns and (fresh["rows"] > 1).any():
    fresh = fresh[fresh["rows"] > 1]
ex = fresh.iloc[0]["out"]
import pyarrow.parquet as pq
df = pq.read_table(ex).to_pandas()
print(df.shape, "has time:", "time" in df.columns)
if len(df) > 1:
    print("grid Hz ≈", 1.0 / np.median(np.diff(df["time"].values)))
else:
    print("grid Hz ≈ n/a (fewer than 2 rows)")
df.head()

(1, 38) has time: True
grid Hz ≈ nan


  return _methods._mean(a, axis=axis, dtype=dtype,
  ret = ret.dtype.type(ret / rcount)


Unnamed: 0,time,gaze_c1,gaze_c2,gaze_c3,gaze_c4,gaze_c5,gaze_c6,gaze_c7,gaze_c8,gaze_c9,...,pupil_c12,pupil_c13,pupil_c14,pupil_c15,pupil_c16,pupil_c17,pupil_c18,pupil_c19,pupil_c20,pupil_c21
0,259.940531,,,,,,,,,,...,,,,,,,,,,


4. 데이터 전처리 결과 확인 및 (key, trial) 페어 인덱스로 매핑

In [None]:
# === Intersection pair index (gp40 ∩ video40) + size summary ===
BASE = Path("/content/drive/MyDrive/Capstone/dataset")
GP_IDX = BASE / "processed40" / "gp" / "gp40_index.csv"
V_IDX  = BASE / "processed40" / "video" / "video40_index.csv"
OUT    = BASE / "processed40"

# Load the per-trial indexes written by the gp40 / video40 processing cells
gp_idx = pd.read_csv(GP_IDX)
vid_idx = pd.read_csv(V_IDX)
# Skipped rows are intentionally kept here: a skipped entry still points at an output
# file that already exists on disk (written by an earlier run).
# gp_idx = gp_idx[gp_idx.get("skipped") != True]  # <-- disabled (see note above)
# vid_idx = vid_idx[vid_idx.get("skipped") != True]  # <-- disabled (see note above)

# (key, trial_id) -> parquet path; rows repeating a (key, trial_id) collapse to one entry (last wins)
gp_map  = {(str(r["key"]), int(r["trial_id"])): str(r["out"]) for _, r in gp_idx.iterrows()}
vid_map = {(str(r["key"]), int(r["trial_id"])): str(r["out"]) for _, r in vid_idx.iterrows()}

# Intersection: only trials available in both modalities
pairs = sorted(set(gp_map.keys()) & set(vid_map.keys()))

def parse_key(key: str):
    """
    Split a run key such as 'sub-acl_task-fer_run-0' into (sub, task, run:int).

    Each '_'-separated part is split on its FIRST '-' only, so a value that itself
    contains '-' stays intact instead of making dict() raise.

    Raises:
        KeyError: if sub/task/run is missing.
        ValueError: if a part has no '-' or run is not an integer.
    """
    parts = dict(s.split("-", 1) for s in key.split("_"))
    return parts["sub"], parts["task"], int(parts["run"])

rows = []
target_hz = 40.0  # resampling rate used by the processed40 step

for key, tid in pairs:
    gp_path = gp_map[(key, tid)]
    vd_path = vid_map[(key, tid)]

    # Read Parquet metadata only (fast; no column data is loaded)
    pgp = pq.ParquetFile(gp_path)
    pvd = pq.ParquetFile(vd_path)

    n_steps_gp = pgp.metadata.num_rows
    n_steps_vd = pvd.metadata.num_rows
    # Duration estimate (s): both files were resampled onto the same uniform grid, so (n-1)/Hz
    # (for the exact t_end - t_start, read only the 'time' column and take last - first)
    duration_s = (min(n_steps_gp, n_steps_vd) - 1) / target_hz if min(n_steps_gp, n_steps_vd) > 1 else 0.0

    # Count features per prefix from the schema's column names only
    gp_cols = pgp.schema_arrow.names
    vd_cols = pvd.schema_arrow.names
    gaze_dim  = sum(c.startswith("gaze_")  for c in gp_cols)
    pupil_dim = sum(c.startswith("pupil_") for c in gp_cols)
    video_dim = sum(c.startswith("video_") for c in vd_cols)

    sub, task, run = parse_key(key)
    rows.append({
        "key": key, "trial_id": tid,
        "sub": sub, "task": task, "run": run,
        "gp_path": gp_path, "video_path": vd_path,
        "n_steps_gp": n_steps_gp, "n_steps_video": n_steps_vd,
        "duration_sec": duration_s,
        "gaze_dim": gaze_dim, "pupil_dim": pupil_dim, "video_dim": video_dim,
        "steps_match": (n_steps_gp == n_steps_vd),
    })

pair_df = pd.DataFrame(rows)
pair_path = OUT / "pair_index_gp_video40.csv"
pair_df.to_csv(pair_path, index=False)
print(f"[OK] pair_index saved → {pair_path}  (rows={len(pair_df)})")

# ====== Summary report ======
if len(pair_df):
    total_trials = len(pair_df)
    total_frames = int(pair_df[["n_steps_gp","n_steps_video"]].min(axis=1).sum())
    total_hours  = pair_df["duration_sec"].sum() / 3600.0
    dur_stats = pair_df["duration_sec"].describe(percentiles=[.25,.5,.75]).to_dict()

    print("\n=== Overall ===")
    print(f"Trials (intersection): {total_trials}")
    print(f"Total frames (@40Hz): {total_frames:,}")
    print(f"Total hours (@40Hz):  {total_hours:.2f} h")
    print("Duration (sec) stats:", {k: round(v,2) for k,v in dur_stats.items()})

    # Step-count mismatches (expected ~0 because both modalities share the grid)
    mis = pair_df[~pair_df["steps_match"]]
    print(f"Step-length mismatch trials: {len(mis)}")

    # Distribution by subject / task
    by_sub  = pair_df.groupby("sub").size().sort_values(ascending=False)
    by_task = pair_df.groupby("task").size().sort_values(ascending=False)
    print("\nTop subjects by #trials:")
    print(by_sub.head(10))
    print("\nTop tasks by #trials:")
    print(by_task.head(10))

    # Feature-dimension summary (dims are counted per trial file and can differ between trials)
    print("\nFeature dims (across trials):")
    for name, col in [("gaze_dim","gaze_dim"), ("pupil_dim","pupil_dim"), ("video_dim","video_dim")]:
        s = pair_df[col]
        print(f"  {name}: min={int(s.min())}, max={int(s.max())}, median={int(s.median())}")
else:
    print("No intersection trials found.")


[OK] pair_index saved → /content/drive/MyDrive/Capstone/dataset/processed40/pair_index_gp_video40.csv  (rows=5781)

=== Overall ===
Trials (intersection): 5781
Total frames (@40Hz): 745,086
Total hours (@40Hz):  5.13 h
Duration (sec) stats: {'count': 5781.0, 'mean': 3.2, 'std': 6.64, 'min': 0.0, '25%': 0.0, '50%': 0.0, '75%': 0.93, 'max': 76.17}
Step-length mismatch trials: 0

Top subjects by #trials:
sub
acl     88
aerj    88
afri    88
almn    88
bcxz    88
cztf    88
cxy     88
blgv    88
ziym    88
zig     88
dtype: int64

Top tasks by #trials:
task
fer    5781
dtype: int64

Feature dims (across trials):
  gaze_dim: min=0, max=16, median=0
  pupil_dim: min=0, max=21, median=21
  video_dim: min=0, max=714, median=0


In [None]:
BASE = Path("/content/drive/MyDrive/Capstone/dataset")
PROC40   = BASE / "processed40"
GP_IDX   = PROC40 / "gp" / "gp40_index.csv"
VID_IDX  = PROC40 / "video" / "video40_index.csv"
EV_DIR   = BASE / "intermediate" / "events_clean"
EV_IDX   = EV_DIR / "events_clean_index.csv"

# 1) Load the per-trial indexes (gp40, video40) and the cleaned-events run index
gp_idx  = pd.read_csv(GP_IDX)
vid_idx = pd.read_csv(VID_IDX)
ev_idx  = pd.read_csv(EV_IDX)

# Keep only entries that were not skipped. The "skipped" column exists only when at least
# one trial was skipped (a fresh run logs no such key), so check for it before filtering;
# `idx.get("skipped") != True` would be a scalar True there and raise KeyError.
gp_idx  = (gp_idx[gp_idx["skipped"] != True] if "skipped" in gp_idx.columns else gp_idx).copy()
vid_idx = (vid_idx[vid_idx["skipped"] != True] if "skipped" in vid_idx.columns else vid_idx).copy()
ev_ok   = ev_idx[ev_idx["ok"]].copy()

# 2) Intersect on (key, trial_id); a (key, trial_id) repeated in either index multiplies rows
pairs = pd.merge(
    gp_idx[["key","trial_id","out"]].rename(columns={"out":"gp_path"}),
    vid_idx[["key","trial_id","out"]].rename(columns={"out":"video_path"}),
    on=["key","trial_id"],
    how="inner"
)

# 3) Denominators from the cleaned events: every event row counts as one trial
total_trials = 0
total_duration = 0.0
dur_map = {}  # (key, trial_id) -> duration_sec; if a trial_id repeats within a run, the last event wins

for _, r in ev_ok.iterrows():
    key = str(r["key"])
    ev_pq = r["out"]
    tb = pq.read_table(ev_pq, columns=["trial_id","t_start","t_end"])
    df = tb.to_pandas()
    df["duration"] = df["t_end"] - df["t_start"]
    total_trials += len(df)
    total_duration += float(df["duration"].sum())
    for tid, dur in zip(df["trial_id"].astype(int), df["duration"].astype(float)):
        dur_map[(key, tid)] = dur

# 4) Numerators: duration of each intersecting trial (NaN if no cleaned event matches).
# A list comprehension (instead of DataFrame.apply(axis=1)) also works when `pairs` is empty.
pairs["duration_sec"] = [float(dur_map.get((str(k), int(t)), np.nan))
                         for k, t in zip(pairs["key"], pairs["trial_id"])]
usable_trials   = len(pairs)
usable_duration = float(pairs["duration_sec"].dropna().sum())

# 5) Run coverage: runs with cleaned events that also have both gp and video outputs
runs_all   = set(ev_ok["key"].astype(str))
runs_gp    = set(gp_idx["key"].astype(str))
runs_vid   = set(vid_idx["key"].astype(str))
runs_both  = runs_all & runs_gp & runs_vid
run_cov_pct = (len(runs_both) / len(runs_all) * 100.0) if runs_all else 0.0

# 6) Coverage ratios
trial_cov_pct    = (usable_trials / total_trials * 100.0) if total_trials else 0.0
duration_cov_pct = (usable_duration / total_duration * 100.0) if total_duration else 0.0

# 7) Save the intersection pair index for later training-time lookup.
# NOTE(review): this writes the same path as the earlier pair-index summary cell and
# replaces it with a smaller column set (key, trial_id, gp_path, video_path, duration_sec).
pairs_out = PROC40 / "pair_index_gp_video40.csv"
pairs.to_csv(pairs_out, index=False)

# 8) Summary output
print("=== Coverage Summary (intersection of gp40 & video40) ===")
print(f"Runs with events (cleaned):     {len(runs_all)}")
print(f"Runs usable (gp & video both):  {len(runs_both)}  ({run_cov_pct:.1f}%)")
print()
print(f"Trials total (events):          {total_trials}")
print(f"Trials usable (intersection):   {usable_trials}  ({trial_cov_pct:.1f}%)")
print()
print(f"Duration total (events):        {total_duration/3600.0:.2f} h")
print(f"Duration usable (intersection): {usable_duration/3600.0:.2f} h  ({duration_cov_pct:.1f}%)")
print()
print(f"[OK] pair_index saved → {pairs_out}")


=== Coverage Summary (intersection of gp40 & video40) ===
Runs with events (cleaned):     273
Runs usable (gp & video both):  224  (82.1%)

Trials total (events):          43448
Trials usable (intersection):   4822  (11.1%)

Duration total (events):        60.54 h
Duration usable (intersection): 10.61 h  (17.5%)

[OK] pair_index saved → /content/drive/MyDrive/Capstone/dataset/processed40/pair_index_gp_video40.csv


In [None]:
# Compare the (key, trial_id) sets of the gp40 index, the video40 index and the cleaned events.
# Uses gp_idx / vid_idx / ev_ok as left by the coverage cell above.
gp_set  = set(zip(gp_idx["key"].astype(str), gp_idx["trial_id"].astype(int)))
vd_set  = set(zip(vid_idx["key"].astype(str), vid_idx["trial_id"].astype(int)))
# One (key, trial_id) per cleaned event row; reads only the trial_id column of every events Parquet.
ev_set  = set( (k, int(tid)) for _,row in ev_ok.iterrows()
               for k, tid in pq.read_table(row["out"], columns=["trial_id"]).to_pandas()
                                 .assign(key=str(row["key"]))[["key","trial_id"]].itertuples(index=False) )

print("ev only:",  len(ev_set - gp_set - vd_set))   # events with neither a gp nor a video output
print("gp only:",  len(gp_set - vd_set))
print("video only:", len(vd_set - gp_set))
print("both:",     len(gp_set & vd_set))

ev only: 16
gp only: 15
video only: 959
both: 4822


------------- 전처리 시행착오 ------------

In [None]:
# Path to one raw gaze recording (any .tsv.gz file can be used)
file_path = "/content/drive/MyDrive/Capstone/dataset/gaze/sub-acl/beh/sub-acl_task-fer_run-0_recording-gaze_physio.tsv.gz"

# Tab-separated + gzip-compressed -> sep='\t', compression='gzip'
# NOTE(review): the printed column names ('0.0', '0.51117', ...) are data values, i.e. the
# file has no header row and its first sample was consumed as the header; header=None
# would be needed to keep that sample.
df_gaze = pd.read_csv(file_path, sep='\t', compression='gzip')

# print("Shape:", df_gaze.shape)
# print("Columns:", df_gaze.columns.tolist())
# df_gaze.head()

Shape: (11568, 17)
Columns: ['0.0', '0.0.1', '0.51117', '0.48432', '2245.1682', '2.65234', '2834.0', '1.0', '0.54816', '0.48367', '1.0.1', '0.54061', '0.47649', '1.0.2', '0.54439', '0.48008', '1.0.3']


Unnamed: 0,0.0,0.0.1,0.51117,0.48432,2245.1682,2.65234,2834.0,1.0,0.54816,0.48367,1.0.1,0.54061,0.47649,1.0.2,0.54439,0.48008,1.0.3
0,0.016183,0.0164,0.51224,0.48416,2245.1682,2.6687,2834.0,1.0,0.54816,0.48367,1.0,0.54295,0.46922,1.0,0.54556,0.47644,1.0
1,0.032366,0.0324,0.51217,0.48443,2245.1682,2.68481,2834.0,1.0,0.51973,0.51671,1.0,0.50585,0.4422,1.0,0.51279,0.47946,1.0
2,0.048549,0.0485,0.51209,0.4847,2245.1682,2.70093,2834.0,1.0,0.50982,0.50804,1.0,0.49013,0.43694,1.0,0.49997,0.47249,1.0
3,0.064731,0.0647,0.512,0.48459,2245.1682,2.71704,2834.0,1.0,0.50593,0.49329,1.0,0.49212,0.42916,1.0,0.49902,0.46122,1.0
4,0.080914,0.081,0.51325,0.48376,2245.1682,2.7334,2834.0,1.0,0.54647,0.4836,1.0,0.54867,0.44067,1.0,0.54757,0.46214,1.0


In [None]:
# Raw pupil recording for the same run (presumably headerless like the gaze file; verify)
file_path = "/content/drive/MyDrive/Capstone/dataset/pupil/sub-acl/beh/sub-acl_task-fer_run-0_recording-pupil_physio.tsv.gz"

df_pupil = pd.read_csv(file_path, sep='\t', compression='gzip')

# print("Shape:", df_pupil.shape)
# print("Columns:", df_pupil.columns.tolist())
# df_pupil.head()

In [None]:
video_path = "/content/drive/MyDrive/Capstone/dataset/videostream/sub-acl/beh/sub-acl_task-fer_run-0_recording-videostream_physio.tsv.gz"

# Read the gzip-compressed TSV
df_video = pd.read_csv(video_path, sep="\t", compression="gzip")

# print("Shape:", df_video.shape)
# print("Columns:", df_video.columns.tolist()[:20])  # first 20 columns only
# print(df_video.head())

In [None]:
# Raw events table for the same run (plain TSV, read with its header row)
events_path = "/content/drive/MyDrive/Capstone/dataset/core/sub-acl/sub-acl_task-fer_run-0_events.tsv"

df_events = pd.read_csv(events_path, sep="\t")

# print("Shape:", df_events.shape)
# print("Columns:", df_events.columns.tolist())
# print(df_events.head())

In [None]:
# ============ Raw data directories ============ #
ROOT = "/content/drive/MyDrive/Capstone/dataset"
GAZE_DIR, PUPIL_DIR = f"{ROOT}/gaze", f"{ROOT}/pupil"
VS_DIR, CORE_DIR    = f"{ROOT}/videostream", f"{ROOT}/core"

In [None]:
# Checkpoint directories (created if missing) and final output paths
CKPT_GP = Path("/content/drive/MyDrive/Capstone/checkpoints/gp");   CKPT_GP.mkdir(parents=True, exist_ok=True)
CKPT_V  = Path("/content/drive/MyDrive/Capstone/checkpoints/video"); CKPT_V.mkdir(parents=True, exist_ok=True)
OUT_GP  = Path("/content/drive/MyDrive/Capstone/df_sync_gp.parquet")
OUT_V   = Path("/content/drive/MyDrive/Capstone/df_sync_video.parquet")

In [None]:
# ============ Extract the (sub, task, run) key from a file name ============ #
PAT_PHYSIO = re.compile(r"sub-(?P<sub>[^_/]+).*task-(?P<task>[^_/]+).*run-(?P<run>\d+).*_physio\.tsv\.gz$")
PAT_EVENTS = re.compile(r"sub-(?P<sub>[^_/]+)_task-(?P<task>[^_/]+)_run-(?P<run>\d+)_events\.tsv(\.gz)?$")

def key_from_path(path, is_events=False):
    """
    Return (sub, task, run:int) parsed from `path`, or None if it does not match.

    Physio recordings (*_physio.tsv.gz) use PAT_PHYSIO; events files
    (*_events.tsv[.gz], selected with is_events=True) use PAT_EVENTS.
    """
    regex = PAT_EVENTS if is_events else PAT_PHYSIO
    found = regex.search(str(path))
    if found is None:
        return None
    return found["sub"], found["task"], int(found["run"])

In [None]:
# ============ Standardize the time column (seconds) ============ #
KNOWN_TIME_NAMES = ["time","timestamp","frame_time","ts","onset"]

def standardize_time(df: pd.DataFrame) -> pd.DataFrame:
    """
    Return a copy of `df` with a numeric 'time' column (seconds) inserted first.

    The source column is the first of KNOWN_TIME_NAMES present in `df`; failing that,
    the first numeric column. If its maximum exceeds 1e4 the values are treated as
    milliseconds and divided by 1000. An existing 'time' column is replaced; other
    columns (including the source column) are kept.

    Raises:
        ValueError: when no known time column exists and `df` has no numeric column.
    """
    source_col = next((name for name in KNOWN_TIME_NAMES if name in df.columns), None)
    if source_col is None:
        numeric_cols = [c for c in df.columns if pd.api.types.is_numeric_dtype(df[c])]
        if not numeric_cols:
            raise ValueError("시간축 후보(수치형 컬럼)가 없습니다.")
        source_col = numeric_cols[0]

    seconds = pd.to_numeric(df[source_col], errors="coerce")
    peak = seconds.dropna().max()
    if float(peak or 0) > 1e4:  # heuristic: ms -> s
        seconds = seconds / 1000.0

    result = df.copy()
    if "time" in result.columns:
        result = result.drop(columns=["time"])
    result.insert(0, "time", seconds.values)
    return result

In [None]:
# ============ Loaders ============ #
def load_physio(path: str, header: int | None = None) -> pd.DataFrame:
    """
    Read a BIDS physio recording (*_physio.tsv.gz) and add a standardized 'time' column.

    BIDS physio TSV files have no header row (column names live in the JSON sidecar).
    Reading them with pandas' default header=0 silently turns the first sample into
    column names. By default the file is therefore read with header=None and the
    columns are named c1..cN. Pass header=0 for files that do contain a header row.

    The time axis is then chosen by standardize_time.
    """
    raw = pd.read_csv(path, sep="\t", compression="gzip", header=header)
    if header is None:
        # String names (not 0..N-1 ints) keep later renaming and Parquet export simple
        raw.columns = [f"c{i}" for i in range(1, raw.shape[1] + 1)]
    return standardize_time(raw)

def load_events(path: str) -> pd.DataFrame:
    """
    Read an events TSV (optionally compressed) and add interval columns.

    t_start = onset, t_end = onset + duration (both coerced to numeric), and
    trial_id = 0..N-1 in file order. These three columns come first, followed by
    every other column of the file. The file must contain onset and duration columns
    (checked with an assert).
    """
    events = pd.read_csv(path, sep="\t", compression="infer")
    assert {"onset","duration"}.issubset(events.columns), "events에 onset/duration 필요"
    onset = pd.to_numeric(events["onset"], errors="coerce")
    length = pd.to_numeric(events["duration"], errors="coerce")
    events = events.assign(
        t_start=onset,
        t_end=onset + length,
        trial_id=np.arange(len(events), dtype=int),
    )
    front = ["trial_id", "t_start", "t_end"]
    return events[front + [c for c in events.columns if c not in front]]

In [None]:
# ============ 40Hz 격자 & 최근접 매핑 ============ #
def snap_to_grid(df: pd.DataFrame, t0: float, t1: float, hz: int = 40, prefix: str|None = None, tol: float|None = None) -> pd.DataFrame:
    seg = df[(df["time"] >= t0) & (df["time"] < t1)].sort_values("time")
    grid = pd.DataFrame({"time": np.arange(t0, max(t0, t1), 1.0/hz)})
    if seg.empty:
        if prefix:
            new_cols = ["time"] + [f"{prefix}{c}" for c in df.columns if c != "time"]
            grid = grid.reindex(columns=new_cols)  # NaN 자동 채움(성능 경고 회피)
        return grid
    tol = (0.5/hz) if tol is None else tol
    merged = pd.merge_asof(grid, seg, on="time", direction="nearest", tolerance=tol)
    if prefix:
        merged = merged.rename(columns={c:f"{prefix}{c}" for c in merged.columns if c!="time"})
    return merged

In [None]:
# Build (sub, task, run) -> file-path indexes per modality.
# Paths are sorted so the index (and which file wins a key collision) is reproducible;
# raw glob/set order depends on the filesystem and hash seed.
gaze_files   = sorted(glob.glob(f"{GAZE_DIR}/**/*gaze*_physio.tsv.gz", recursive=True))
pupil_files  = sorted(glob.glob(f"{PUPIL_DIR}/**/*pupil*_physio.tsv.gz", recursive=True))
# "*video*" also matches every "*videostream*" file, so one pattern covers both variants.
vs_files     = sorted(glob.glob(f"{VS_DIR}/**/*video*_physio.tsv.gz", recursive=True))
events_files = sorted(glob.glob(f"{CORE_DIR}/**/*_events.tsv*", recursive=True))

def _index_by_key(paths, is_events=False, label=""):
    """Map each parseable path to its (sub, task, run) key, reporting keys claimed by >1 file."""
    index, collisions = {}, {}
    for p in paths:
        key = key_from_path(p, is_events)  # parse each path once
        if key is None:
            continue
        if key in index:
            collisions.setdefault(key, [index[key]]).append(p)
        index[key] = p  # last path (in sorted order) wins, like a dict comprehension would
    if collisions:
        example_key, example_paths = next(iter(collisions.items()))
        print(f"[WARN] {label}: {len(collisions)} key(s) matched multiple files; "
              f"kept the last of each. e.g. {example_key}: {example_paths}")
    return index

gaze_map   = _index_by_key(gaze_files, label="gaze")
pupil_map  = _index_by_key(pupil_files, label="pupil")
video_map  = _index_by_key(vs_files, label="video")
events_map = _index_by_key(events_files, is_events=True, label="events")
all_keys   = sorted(events_map)

print(f"[INDEX] runs with events: {len(all_keys)}  | gaze:{len(gaze_map)} pupil:{len(pupil_map)} video:{len(video_map)}")

[INDEX] runs with events: 273  | gaze:273 pupil:273 video:265


In [None]:
def count_total_trials(events_map):
    """Return the total number of data rows across every events file in ``events_map``.

    ``events_map`` maps run keys to events-file paths. Each file is read in full with
    ``pd.read_csv(sep="\\t", compression="infer")``, and its row count (header excluded) is
    added to the total. An empty mapping gives 0.

    NOTE(review): every events row is counted as a trial — confirm the events files contain
    no non-trial rows (e.g. block or instruction markers).
    """
    return sum(
        pd.read_csv(ev_path, sep="\t", compression="infer").shape[0]
        for ev_path in events_map.values()
    )

# Total events-file row count over every indexed run.
TOTAL_TRIALS: int = count_total_trials(events_map)
print(TOTAL_TRIALS)

43448
