In [25]:
import glob
from pathlib import Path
import pandas as pd
import numpy as np

# Running log of every soft-failure message raised so far.
SOFT_FAILS: list = []

def soft(msg) -> None:
    """Report a non-fatal QC finding.

    The message is echoed to stdout straight away and then recorded in the
    module-level ``SOFT_FAILS`` list so it can be listed again later.
    """
    print(msg)
    SOFT_FAILS.append(msg)

def hard(msg):
    """Abort the QC run: always raises ``AssertionError`` carrying ``msg``."""
    failure = AssertionError(msg)
    raise failure

In [26]:
# Notebook path: Behavioural-Biometrics-Analysis/notebooks
# BASE is the directory one level above the current working directory.
BASE = Path.cwd().parents[0]
SESSIONS_DIR = BASE.joinpath("data", "raw", "sessions")

# Stop immediately if the raw sessions folder is not where it is expected.
assert SESSIONS_DIR.exists(), f"sessions folder not found: {SESSIONS_DIR}"

# "*" matches every directory directly under SESSIONS_DIR; each one is
# searched for the two per-session CSV files.
AUTH_GLOB = str(SESSIONS_DIR.joinpath("*", "auth_windows.csv"))
EVENTS_GLOB = str(SESSIONS_DIR.joinpath("*", "events.csv"))

def load_many(pattern, kind):
    """Read every CSV matching ``pattern`` and stack them into one DataFrame.

    Parameters
    ----------
    pattern : str
        Glob pattern passed to ``glob.glob``.
    kind : str
        Label used only in the failure message when nothing matches.

    Returns
    -------
    tuple
        ``(combined, files)``: the row-wise concatenation of all matched files
        (re-indexed from 0), where a ``_path`` column records the file each
        row came from, and the sorted list of matched paths.

    Raises
    ------
    AssertionError
        Raised through ``hard`` when no file matches ``pattern``.
    """
    files = sorted(glob.glob(pattern))
    if len(files) == 0:
        hard(f"[HARD FAIL] No {kind} files found with pattern: {pattern}")

    # Tag each frame with its source path before stacking.
    frames = [pd.read_csv(path).assign(_path=path) for path in files]

    return pd.concat(frames, ignore_index=True), files

# Load both per-session tables across every matched session folder.
auth, auth_files = load_many(AUTH_GLOB, "auth_windows")
events, event_files = load_many(EVENTS_GLOB, "events")

# Size report: number of files read and total rows per table.
print(f"auth files: {len(auth_files)} rows: {len(auth)}")
print(f"events files: {len(event_files)} rows: {len(events)}")

# Peek at the first three rows of each table.
display(auth.head(3))
display(events.head(3))

auth files: 1 rows: 7
events files: 1 rows: 663


Unnamed: 0,sessionId,participantId,sessionIndex,timeBucket,fatigue,inputDevice,typing_ikt_global_mean,typing_ikt_global_std,typing_ikt_global_iqr,typing_ikt_global_p95,...,tap_miss_rate_pct,tap_drift_rt,tap_error_recovery_miss_median,coupling_var_ikt,coupling_var_rt,coupling_var_ratio,windowIndex,windowStartMs,windowEndMs,_path
0,1ea8d3a866aa482f951c9734e8232bd4,p1,12,morning,4,touch,298.0,352.0,125.0,1157.0,...,0,,,123654.566205,,,0,537629,567629,/Users/will/Documents/Behavioural-Biometrics-D...
1,1ea8d3a866aa482f951c9734e8232bd4,p1,12,morning,4,touch,277.0,345.0,125.0,1183.0,...,0,,,119231.298447,,,1,552629,582629,/Users/will/Documents/Behavioural-Biometrics-D...
2,1ea8d3a866aa482f951c9734e8232bd4,p1,12,morning,4,touch,307.0,384.0,113.0,1220.0,...,0,,,147385.915125,,,2,567629,597629,/Users/will/Documents/Behavioural-Biometrics-D...


Unnamed: 0,sessionId,participantId,t,ms,dt,tISO,alcohol,elapsedMs,fatigue,inLen,...,timeBucket,vibration,wordDiff,wordId,wordLen,x,xPct,y,yPct,_path
0,1ea8d3a866aa482f951c9734e8232bd4,p1,session_start,537629,,2026-02-03T16:04:03.915Z,no,,4.0,,...,morning,none,,,,,,,,/Users/will/Documents/Behavioural-Biometrics-D...
1,1ea8d3a866aa482f951c9734e8232bd4,p1,word_shown,537629,0.0,2026-02-03T16:04:03.915Z,,,,,...,,,,364.0,5.0,,,,,/Users/will/Documents/Behavioural-Biometrics-D...
2,1ea8d3a866aa482f951c9734e8232bd4,p1,typing_reaction,538737,1108.0,2026-02-03T16:04:05.023Z,,,,,...,,,,,,,,,,/Users/will/Documents/Behavioural-Biometrics-D...


In [27]:
def session_id_from_path(p):
    """Return the name of the directory that directly contains file ``p``.

    The notebook uses this directory name as the session id.
    """
    containing_dir = Path(p).parent
    return containing_dir.name

# Session ids (containing-folder names) seen for each file type.
auth_sids = set(map(session_id_from_path, auth_files))
event_sids = set(map(session_id_from_path, event_files))

# Sessions that have one of the two files but not the other.
only_auth = sorted(auth_sids.difference(event_sids))
only_events = sorted(event_sids.difference(auth_sids))

if len(only_auth) > 0:
    soft(f"[SOFT FAIL] Sessions with auth_windows but missing events.csv: {only_auth}")
if len(only_events) > 0:
    soft(f"[SOFT FAIL] Sessions with events.csv but missing auth_windows.csv: {only_events}")

print("Paired sessions:", len(auth_sids.intersection(event_sids)))

Paired sessions: 1


In [28]:
# Columns that must be present in the combined auth_windows table.
REQUIRED_AUTH = [
    "sessionId",
    "participantId",
    "windowIndex",
    "windowStartMs",
    "windowEndMs",
]
# Columns that must be present in the combined events table.
REQUIRED_EVENTS = [
    "sessionId",
    "participantId",
    "t",
    "ms",
    "dt",
    "tISO",
]

def require_cols(df, cols, name):
    """Hard-fail unless every column named in ``cols`` exists in ``df``.

    Parameters
    ----------
    df : pandas.DataFrame
        Table to inspect.
    cols : iterable of str
        Column names that must be present.
    name : str
        Table label used in the failure message.

    Raises
    ------
    AssertionError
        Raised through ``hard``, listing all absent columns at once.
    """
    present = df.columns
    absent = [col for col in cols if col not in present]
    if len(absent) > 0:
        hard(f"[HARD FAIL] {name} missing columns: {absent}")

# Schema check: both tables must carry their required columns.
require_cols(auth, REQUIRED_AUTH, "auth_windows")
require_cols(events, REQUIRED_EVENTS, "events")

# Identifier columns must be fully populated; auth_windows is checked before
# events for each identifier.
for c in ["sessionId","participantId"]:
    for table_name, table in (("auth_windows", auth), ("events", events)):
        if table[c].isna().any():
            hard(f"[HARD FAIL] {table_name} missing {c}")

print(f"Participants: {auth['participantId'].nunique()}")
print(f"Sessions: {auth['sessionId'].nunique()}")

Participants: 1
Sessions: 1


In [29]:
# Uniqueness: at most one row per (participant, session, window).
if auth.duplicated(["participantId","sessionId","windowIndex"]).any():
    hard("[HARD FAIL] Duplicate window rows")

# Completeness of the window geometry columns.
# Any comparison against NaN evaluates to False, so a missing bound would
# pass the time-sanity check below without being noticed; reject it here.
na_window_cols = [
    col for col in ["windowIndex","windowStartMs","windowEndMs"]
    if auth[col].isna().any()
]
if na_window_cols:
    hard(f"[HARD FAIL] auth_windows has missing values in: {na_window_cols}")

# Time sanity: every window must end strictly after it starts.
if (auth["windowEndMs"] <= auth["windowStartMs"]).any():
    hard("[HARD FAIL] windowEndMs <= windowStartMs")

# Window counts: number of distinct window indices per session.
wc = auth.groupby(["participantId","sessionId"])["windowIndex"].nunique()
print(wc.describe())

# The offending (participantId, sessionId) keys are written into the message
# itself so that the list of soft failures stays informative on its own.
too_few = wc[wc < 6]
if len(too_few):
    soft(f"[SOFT FAIL] Incomplete sessions (<6 windows): {too_few.index.tolist()}")
    display(too_few)

too_many = wc[wc > 8]
if len(too_many):
    soft(f"[SOFT FAIL] Too many windows (>8 windows): {too_many.index.tolist()}")
    display(too_many)

count    1.0
mean     7.0
std      NaN
min      7.0
25%      7.0
50%      7.0
75%      7.0
max      7.0
Name: windowIndex, dtype: float64


In [None]:
def window_stats(df):
    """Summarise window length and spacing for one group of window rows.

    Rows are ordered by ``windowIndex`` first. ``windowStartMs`` and
    ``windowEndMs`` are divided by 1000 to report seconds.

    Returns
    -------
    pandas.Series
        ``len_med``  - median of (end - start) / 1000.
        ``step_med`` - median difference between consecutive starts / 1000,
        or NaN when there are fewer than two rows.
    """
    ordered = df.sort_values("windowIndex")
    start_ms = ordered["windowStartMs"].to_numpy()
    end_ms = ordered["windowEndMs"].to_numpy()

    length_s = (end_ms - start_ms) / 1000

    # A step needs at least two windows to be defined.
    if len(start_ms) > 1:
        step_s = np.diff(start_ms) / 1000
    else:
        step_s = np.array([])

    step_med = np.median(step_s) if len(step_s) else np.nan
    return pd.Series({"len_med": np.median(length_s), "step_med": step_med})

# Per-session window geometry (median window length and step, in seconds).
# Only the three columns window_stats reads are handed to apply(): letting
# apply() operate on the grouping columns is deprecated in recent pandas and
# emits a warning on every run. Selecting columns explicitly works on both
# older and newer pandas versions and leaves the result unchanged.
ws = (
    auth.groupby(["participantId","sessionId"], group_keys=False)
        [["windowIndex","windowStartMs","windowEndMs"]]
        .apply(window_stats)
        .reset_index()
)
display(ws)

# Median window length outside 25-35 s is a hard failure.
if ((ws["len_med"] < 25) | (ws["len_med"] > 35)).any():
    hard("[HARD FAIL] Window length far from 30s")

# Median step outside 10-20 s is only a warning; step_med is NaN for
# single-window sessions, which compares False and is therefore not flagged.
if ((ws["step_med"] < 10) | (ws["step_med"] > 20)).any():
    soft("[SOFT FAIL] Window step far from 15s")

  ws = auth.groupby(["participantId","sessionId"]).apply(window_stats).reset_index()


Unnamed: 0,participantId,sessionId,len_med,step_med
0,p1,1ea8d3a866aa482f951c9734e8232bd4,30.0,15.0


In [31]:
# Raw IKTs (ms)
RAW_IKT_MS = [
    "typing_ikt_global_mean",
    "typing_ikt_global_std",
    "typing_ikt_global_iqr",
    "typing_ikt_global_p95",
    "typing_ikt_within_mean",
    "typing_ikt_within_std",
    "typing_ikt_within_iqr",
    "typing_ikt_within_p95",
]

for c in RAW_IKT_MS:
    if c not in auth.columns:
        continue
    # std / iqr columns use a lower floor (20) than the other statistics (40);
    # the ceiling is 2000 for all of them.
    lo = 40
    if "std" in c or "iqr" in c:
        lo = 20
    if ((auth[c] < lo) | (auth[c] > 2000)).any():
        soft(f"[SOFT FAIL] Implausible raw IKT in {c}")

# Drift (delta) features: flag when the absolute value exceeds the column's limit.
for drift_col, limit in (("typing_drift_ikt", 1500), ("tap_drift_rt", 1000)):
    if drift_col in auth.columns and (auth[drift_col].abs() > limit).any():
        soft(f"[SOFT FAIL] Large |{drift_col}|")

# Reaction times
if "tap_rt_mean" in auth.columns:
    if ((auth["tap_rt_mean"] < 80) | (auth["tap_rt_mean"] > 1500)).any():
        soft("[SOFT FAIL] Implausible tap_rt_mean")

# Coupling / variance features: negative values are a hard failure.
for c in ["coupling_var_ikt","coupling_var_rt","coupling_var_ratio"]:
    if c not in auth.columns:
        continue
    if (auth[c] < 0).any():
        hard(f"[HARD FAIL] {c} < 0")

# Percentages: every column whose name ends in "_pct" must stay within [0, 100].
pct_cols = [c for c in auth.columns if c.endswith("_pct")]
for c in pct_cols:
    if ((auth[c] < 0) | (auth[c] > 100)).any():
        hard(f"[HARD FAIL] % out of bounds in {c}")

In [32]:
# Columns are read with bracket indexing so that short names such as "dt" and
# "t" can never be confused with DataFrame attributes.
if (events["ms"] < 0).any():
    hard("[HARD FAIL] Negative event ms")

if (events["dt"].dropna() < 0).any():
    hard("[HARD FAIL] Negative dt")

print("Event types:")
display(events["t"].value_counts())

# Count of events per (session, event type); absent combinations become 0.
ev = events.groupby(["sessionId","t"]).size().unstack(fill_value=0)
ev["n_key"] = ev.get("key", 0)
ev["n_tap"] = ev.get("tap_hit", 0) + ev.get("tap_miss", 0)

# Sessions with fewer than 30 key events or fewer than 30 taps (hits + misses).
bad = ev[(ev["n_key"] < 30) | (ev["n_tap"] < 30)]
if len(bad) > 0:
    soft("[SOFT FAIL] Low engagement sessions:")
    display(bad[["n_key","n_tap"]])

Event types:


t
key                219
before_input       192
target_move         94
tap_hit             93
word_shown          28
typing_submit       27
tap_miss             6
typing_reaction      1
session_start        1
typing_end           1
tapping_end          1
Name: count, dtype: int64

In [33]:
# Fraction of missing values per numeric auth_windows column.
num = auth.select_dtypes(include=[np.number])
miss = num.isna().mean()

# Show the 15 columns with the highest missing fraction.
display(miss.sort_values(ascending=False).head(15))

# Columns listed here trigger a warning when they are missing in fewer than
# 10% of rows.
expected_na = [
    "typing_error_recovery_wrong_median",
    "tap_error_recovery_miss_median",
    "typing_drift_ikt",
    "tap_drift_rt",
]

for c in expected_na:
    if c not in miss.index:
        continue
    if miss[c] < 0.1:
        soft(f"[SOFT FAIL] {c} rarely missing")

tap_error_recovery_miss_median        1.000000
typing_error_recovery_wrong_median    1.000000
coupling_var_ratio                    0.857143
typing_drift_ikt                      0.571429
tap_drift_rt                          0.571429
typing_ikt_within_std                 0.428571
typing_ikt_global_clipped_pct         0.428571
typing_ikt_within_iqr                 0.428571
typing_ikt_global_std                 0.428571
typing_ikt_global_mean                0.428571
typing_ikt_global_p95                 0.428571
typing_ikt_global_iqr                 0.428571
coupling_var_rt                       0.428571
coupling_var_ikt                      0.428571
typing_ikt_within_clipped_pct         0.428571
dtype: float64

In [34]:
# Final verdict: clean when no soft failure was recorded, otherwise list each
# recorded message.
if not SOFT_FAILS:
    print("\n--- QC VERDICT: PASS (CLEAN) ---")
else:
    print("\n--- QC VERDICT: PASS (WITH WARNINGS) ---")
    for m in SOFT_FAILS:
        print("-", m)


--- QC VERDICT: PASS (CLEAN) ---


In [35]:
# Per-session QC summary scaffold (fills as you scale)
# One row per (participantId, sessionId): distinct window count, earliest
# window start, latest window end, and the span between them in seconds.
summary = (
    auth.groupby(["participantId","sessionId"])
        .agg(
            n_windows=("windowIndex","nunique"),
            start_ms=("windowStartMs","min"),
            end_ms=("windowEndMs","max"),
        )
        .reset_index()
        .assign(duration_s=lambda d: (d["end_ms"] - d["start_ms"]) / 1000)
)

display(summary.sort_values(["participantId","sessionId"]).head(30))

Unnamed: 0,participantId,sessionId,n_windows,start_ms,end_ms,duration_s
0,p1,1ea8d3a866aa482f951c9734e8232bd4,7,537629,657629,120.0
