In [45]:
import warnings
warnings.filterwarnings("ignore", category=RuntimeWarning)
import os
import re
import cmlreaders as cml
from cmldask import CMLDask as da
from dask.distributed import wait, as_completed
import numpy as np
import matplotlib.pyplot as plt
import matplotlib as mpl
import matplotlib.colors as mcolors
import seaborn as sns
import pandas as pd
import xarray as xr
import scipy as scp
from scipy import stats
from ptsa.data.timeseries import *
from statsmodels.stats.multitest import multipletests
import pyedflib
from mne_bids import get_entity_vals
from ReportRawEEG import *
pd.options.display.max_rows = 100
pd.options.display.max_columns = 50
%matplotlib inline
import mne
from mne_bids import BIDSPath, read_raw_bids

In [46]:
# ValueCourier helpers: copy per-trial fields between event types so every row of a trial carries them
def fix_evs_bids(full_evs):
    """Back-fill per-trial fields across event types in a BIDS events table.

    Rows are matched on the ``trial`` column and typed by ``trial_type``:

    * ``storepointtype`` and ``recalled`` are copied from ``WORD`` rows onto
      the ``VALUE_RECALL``, ``REC_WORD`` and ``REC_WORD_VV`` rows of the same
      trial.
    * ``actualvalue`` and ``valuerecall`` are copied from ``VALUE_RECALL`` rows
      onto the ``WORD``, ``REC_WORD`` and ``REC_WORD_VV`` rows of the same
      trial.

    Rows whose trial has no matching source row are left untouched. When a
    trial has several source rows, the last one wins.

    Parameters
    ----------
    full_evs : pd.DataFrame
        Events table containing ``trial_type``, ``trial`` and the four value
        columns listed above.

    Returns
    -------
    pd.DataFrame
        The same object that was passed in; it is modified in place.

    Raises
    ------
    KeyError
        If one of the columns named above is missing.
    """
    type_col = "trial_type"

    def _trial_lookups(source_type, columns):
        # One trial -> value dict per column, built from rows of `source_type`.
        by_trial = full_evs[full_evs[type_col] == source_type].set_index("trial")
        return [(col, by_trial[col].to_dict()) for col in columns]

    def _apply(lookups, target_types):
        # Only the trial column is needed, so iterate over it directly
        # instead of materialising a Series per row with iterrows().
        target_trials = full_evs.loc[full_evs[type_col].isin(target_types), "trial"]
        for idx, trial in target_trials.items():
            for col, lookup in lookups:
                if trial in lookup:
                    full_evs.at[idx, col] = lookup[trial]

    # Build every lookup before mutating so both passes read the original values.
    from_word = _trial_lookups("WORD", ("storepointtype", "recalled"))
    from_value_recall = _trial_lookups("VALUE_RECALL", ("actualvalue", "valuerecall"))

    # WORD --> storepointtype, recalled --> VALUE_RECALL, REC_WORD, REC_WORD_VV
    _apply(from_word, ("VALUE_RECALL", "REC_WORD", "REC_WORD_VV"))
    # VALUE_RECALL --> actualvalue, valuerecall --> WORD, REC_WORD, REC_WORD_VV
    _apply(from_value_recall, ("WORD", "REC_WORD", "REC_WORD_VV"))

    return full_evs

def fix_evs_cml(full_evs):
    """Back-fill per-trial fields across event types in a CMLReader events table.

    Rows are matched on the ``trial`` column and typed by the ``type`` column:

    * ``storepointtype`` and ``recalled`` are copied from ``WORD`` rows onto
      the ``VALUE_RECALL``, ``REC_WORD`` and ``REC_WORD_VV`` rows of the same
      trial.
    * ``actualvalue`` and ``valuerecall`` are copied from ``VALUE_RECALL`` rows
      onto the ``WORD``, ``REC_WORD`` and ``REC_WORD_VV`` rows of the same
      trial.

    Rows whose trial has no matching source row are left untouched. When a
    trial has several source rows, the last one wins.

    Parameters
    ----------
    full_evs : pd.DataFrame
        Events table containing ``type``, ``trial`` and the four value columns
        listed above.

    Returns
    -------
    pd.DataFrame
        The same object that was passed in; it is modified in place.

    Raises
    ------
    KeyError
        If one of the columns named above is missing.
    """
    # Bracket access: "type" is an ordinary column label and should not rely
    # on attribute-style lookup.
    type_col = "type"

    def _trial_lookups(source_type, columns):
        # One trial -> value dict per column, built from rows of `source_type`.
        by_trial = full_evs[full_evs[type_col] == source_type].set_index("trial")
        return [(col, by_trial[col].to_dict()) for col in columns]

    def _apply(lookups, target_types):
        # Only the trial column is needed, so iterate over it directly
        # instead of materialising a Series per row with iterrows().
        target_trials = full_evs.loc[full_evs[type_col].isin(target_types), "trial"]
        for idx, trial in target_trials.items():
            for col, lookup in lookups:
                if trial in lookup:
                    full_evs.at[idx, col] = lookup[trial]

    # Build every lookup before mutating so both passes read the original values.
    from_word = _trial_lookups("WORD", ("storepointtype", "recalled"))
    from_value_recall = _trial_lookups("VALUE_RECALL", ("actualvalue", "valuerecall"))

    # WORD --> storepointtype, recalled --> VALUE_RECALL, REC_WORD, REC_WORD_VV
    _apply(from_word, ("VALUE_RECALL", "REC_WORD", "REC_WORD_VV"))
    # VALUE_RECALL --> actualvalue, valuerecall --> WORD, REC_WORD, REC_WORD_VV
    _apply(from_value_recall, ("WORD", "REC_WORD", "REC_WORD_VV"))

    return full_evs

In [47]:

### ACROSS MULTIPLE SUBJECTS AND SESSIONS
# Root directory of the BIDS dataset; handed to the processing functions as `bids_root`.
bids_root = "/home1/maint/LTP_BIDS/"
# Values of the "subject" entity found under `bids_root` (via mne_bids.get_entity_vals).
subjects = get_entity_vals(bids_root, "subject")


# subject
def process_raw_signals(sub, exp, sess, bids_root, out_path): # entire signal, not epoched
    """Compare the full (un-epoched) recording of one session between CMLReader and BIDS.

    The session is loaded once through ``cml.CMLReader`` and once through
    ``mne_bids.read_raw_bids``. The BIDS recording is wrapped in an
    ``xr.DataArray`` with dims ``("event", "channel", "time")`` and a single
    event, then both are passed to ``compare_eeg_sources``.

    Parameters
    ----------
    sub, exp, sess
        Subject, experiment and session identifying the recording. ``exp`` is
        lower-cased to form the BIDS task label and ``sess`` is stringified for
        the BIDS session label.
    bids_root : str
        Root directory of the BIDS dataset.
    out_path : str
        Directory that receives ``df_raw_*``, ``df_raw_summary_*`` and
        ``df_time_*`` CSV files. It is created if missing; a trailing slash is
        no longer required.

    Returns
    -------
    dict
        Whatever ``compare_eeg_sources`` returned; the keys ``df_raw``,
        ``df_raw_summary`` and ``df_time`` are written to CSV.
    """
    ### load cml
    reader = cml.CMLReader(subject=sub, experiment=exp, session=sess)
    eeg_cml = reader.load_eeg().to_ptsa()

    ### load bdf
    # BIDS
    bids_path = BIDSPath(
        subject=sub,
        session=str(sess),
        task=exp.lower(),
        datatype="eeg",
        root=bids_root,
    )

    raw = read_raw_bids(
        bids_path,
        verbose=True,
    )

    eeg_bids = xr.DataArray(
        raw.get_data()[None, :, :],                           # -> (1, n_channels, n_times)
        dims=("event", "channel", "time"),         # match eeg_cml dim names
        coords={
            "event": [0],                          # singleton event index
            "channel": raw.ch_names,
            "time": raw.times * 1000,              # raw.times scaled by 1000 (s -> ms)
            "samplerate": raw.info["sfreq"],                    # scalar coord (optional)
        },
        name="eeg",
    )

    ## load pyedf
    # cml_bdf_path  = f"/protocols/ltp/subjects/{sub}/experiments/{exp}/sessions/{sess}/ephys/current_processed/{sub}_session_{sess}.bdf"
    # eeg_pyedflib = load_bdf_as_xarray(cml_bdf_path)

    # compare
    results = compare_eeg_sources(
        eeg_dict={"BIDS": eeg_bids, "CMLReader": eeg_cml},
        subject=sub,
        experiment=exp,
        session=sess,
        options=["strip_metadata", "compare_raw_signals", "compare_time_coords"]
    )

    # Build paths with os.path.join (plain string concatenation silently
    # produced "dirdf_raw_..." when out_path lacked a trailing slash) and make
    # sure the directory exists, as the other processing functions do.
    if out_path:
        os.makedirs(out_path, exist_ok=True)
    for key in ("df_raw", "df_raw_summary", "df_time"):
        results[key].to_csv(
            os.path.join(out_path, f"{key}_{sub}_{exp}_{sess}.csv"),
            index=False,
        )
    return results


In [48]:
def _all_exist(paths):
    return all(os.path.exists(p) for p in paths)

def load_bids_events(sub, exp, sess, bids_root):
    """
    Load the BIDS events table for one session, trying the eeg datatype first,
    then the beh datatype.

    Parameters:
    -----------
    sub, exp, sess
        Subject, experiment and session. ``exp`` is lower-cased to form the
        BIDS task label and ``sess`` is stringified for the session label.
    bids_root : str
        Root directory of the BIDS dataset.

    Returns:
    --------
    pd.DataFrame or None
        Events dataframe if found, None otherwise. Load failures are printed,
        not raised.
    """
    exp_lower = exp.lower()

    # Try eeg datatype first: <eeg dir>/<basename>_events.tsv
    try:
        bids_path = BIDSPath(
            subject=sub,
            session=str(sess),
            task=exp_lower,
            datatype="eeg",
            root=bids_root,
        )
        events_path = os.path.join(bids_path.directory, bids_path.basename + "_events.tsv")
        evs_bids = pd.read_csv(events_path, sep="\t")
        print(f"Loaded events from eeg datatype: {events_path}")
        return evs_bids
    except Exception as e:
        print(f"Failed to load {sub} | {exp} | {sess} events from eeg folder: {e}")

    # Try beh datatype as fallback. The datatype must be "beh" here: with
    # datatype="eeg" the path pointed back into the eeg folder, so the
    # fallback never looked at the location its messages describe.
    try:
        bids_path = BIDSPath(
            subject=sub,
            session=str(sess),
            task=exp_lower,
            datatype="beh",
            suffix="beh",
            extension=".tsv",
            root=bids_root
        )
        evs_bids = pd.read_csv(bids_path.fpath, sep="\t")
        print(f"Loaded events from beh datatype: {bids_path.fpath}")
        return evs_bids
    except Exception as e:
        print(f"Failed to load {sub} | {exp} | {sess} events from beh folder: {e}")

    return None

def process_events(sub, exp, sess, evs_types, bids_root, out_path, *, skip_if_exists=True):
    """Compare the behavioural events of one session between CMLReader and BIDS.

    Parameters
    ----------
    sub, exp, sess
        Subject, experiment and session to process.
    evs_types : iterable of str, str, or None
        Event types to keep. ``None`` keeps every type present in the CML
        events. A single string is treated as one event type.
    bids_root : str
        Root directory of the BIDS dataset.
    out_path : str
        Directory for ``df_behavior_summary_{sub}_{exp}_{sess}.csv``; created
        if missing.
    skip_if_exists : bool, keyword-only
        When True and the summary CSV already exists, nothing is recomputed.

    Returns
    -------
    dict
        Either the result of ``compare_behavioral`` or, when the session is
        not processed, ``{"skipped": True, "reason": ...}`` with extra detail
        keys depending on the reason.
    """
    os.makedirs(out_path, exist_ok=True)
    out_behavior_summary = os.path.join(out_path, f"df_behavior_summary_{sub}_{exp}_{sess}.csv")

    expected = [out_behavior_summary]
    if skip_if_exists and _all_exist(expected):
        return {"skipped": True, "reason": "outputs_exist", "paths": expected}

    # Load CML events
    try:
        cmlreader = cml.CMLReader(subject=sub, experiment=exp, session=sess)
        evs_cml = cmlreader.load('events')
    except Exception as e:
        print(f"Failed to load CML events for {sub} | {exp} | {sess}: {e}")
        return {"skipped": True, "reason": "cml_load_failed", "error": str(e)}

    # A bare string would be split into a set of single characters by set().
    if isinstance(evs_types, str):
        evs_types = [evs_types]
    evs_types_set = set(evs_types) if evs_types is not None else set(evs_cml["type"].unique())

    if exp == "ValueCourier":
        evs_cml = fix_evs_cml(evs_cml)

    filtered_evs_cml = evs_cml[evs_cml["type"].isin(evs_types_set)]

    # Load BIDS events
    evs_bids = load_bids_events(sub, exp, sess, bids_root)

    if evs_bids is None:
        print(f"Skipping {sub} | {exp} | {sess}: BIDS events file not found")
        return {"skipped": True, "reason": "bids_events_not_found"}

    # Check for required columns before fix_evs_bids, which itself reads
    # `trial_type` and would raise on a table that lacks it.
    required_cols = {'sample', 'onset', 'trial_type'}
    missing_cols = required_cols - set(evs_bids.columns)
    if missing_cols:
        print(f"Skipping {sub} | {exp} | {sess}: BIDS events missing required columns: {missing_cols}")
        print(f"Available columns: {list(evs_bids.columns)}")
        return {"skipped": True, "reason": "missing_columns", "missing": list(missing_cols)}

    if exp == "ValueCourier":
        evs_bids = fix_evs_bids(evs_bids)

    filtered_evs_bids = evs_bids[evs_bids["trial_type"].isin(evs_types_set)]

    if filtered_evs_bids.empty:
        print(f"Skipping {sub} | {exp} | {sess}: No events match the requested types")
        return {"skipped": True, "reason": "no_matching_events"}

    # Compare behavioral data
    try:
        results = compare_behavioral(
            filtered_evs_cml, "CMLReader",
            filtered_evs_bids, "OpenBIDS",
            options=[
                "compare_onset_as_diff",
                "tolerant_numeric",
                "return_col_summary",
                "return_mismatches",
            ],
            drop_cols=[],
        )

        # out_path was created at the top; reuse the path computed there so
        # the skip check and the write can never disagree.
        results["df_behavior_summary"].to_csv(out_behavior_summary, index=False)

        print(f"Successfully processed {sub} | {exp} | {sess}")
        return results

    except Exception as e:
        print(f"Failed to compare events for {sub} | {exp} | {sess}: {e}")
        import traceback
        traceback.print_exc()
        return {"skipped": True, "reason": "comparison_failed", "error": str(e)}

In [49]:
# process_events("LTP606", "ValueCourier", 0, None, "/home1/maint/LTP_BIDS/", "raw_results/")

In [50]:
from mne_bids import BIDSPath
import os
import gc
import numpy as np
import pandas as pd
import mne
from ReportRawEEG import *

def _all_exist(paths):
    return all(os.path.exists(p) for p in paths)

def _dedupe_events_by_sample(df: pd.DataFrame, sample_col: str, *, keep="first") -> pd.DataFrame:
    if sample_col not in df.columns:
        raise ValueError(f"Expected column '{sample_col}' in events df. Columns={list(df.columns)[:20]}")
    df2 = df.copy()
    df2[sample_col] = pd.to_numeric(df2[sample_col], errors="coerce")
    df2 = df2.dropna(subset=[sample_col])
    df2 = df2.sort_values(sample_col, kind="mergesort")
    df2 = df2[~df2[sample_col].duplicated(keep=keep)]
    return df2

def _as_list(x):
    if x is None:
        return None
    if isinstance(x, (list, tuple, set, np.ndarray, pd.Index)):
        return list(x)
    return [x]

def process_epoched_signals_by_type(
    sub,
    exp,
    sess,
    evs_types,
    tmin,
    tmax,
    bids_root,
    out_path,
    *,
    skip_if_exists=True,
    keep="first",
    verbose=False,
):
    """
    Run epoch+compare separately for each event type, append results across types,
    save and return the appended DataFrames.

    Parameters
    ----------
    sub, exp, sess
        Subject, experiment and session to process. ``exp`` is lower-cased to
        form the BIDS task label.
    evs_types : iterable, scalar, or None
        Event types to run. ``None`` runs every type found in the CML events.
    tmin, tmax
        Epoch window. Passed unchanged to ``CMLReader.load_eeg`` as
        ``rel_start`` / ``rel_stop`` and divided by 1000 for ``mne.Epochs``
        (i.e. expected in milliseconds).
    bids_root : str
        Root directory of the BIDS dataset.
    out_path : str
        Directory for the three aggregated CSVs; created if missing.
    skip_if_exists : bool, keyword-only
        When True and all three CSVs already exist, nothing is recomputed.
    keep : keyword-only
        Which duplicate to keep when de-duplicating CML events by ``eegoffset``.
    verbose : bool, keyword-only
        Print one line per event type, and one line per failing type.

    Returns
    -------
    dict
        ``{"skipped": True, ...}`` when outputs already exist; otherwise the
        concatenated ``df_raw``, ``df_raw_summary`` and ``df_time`` frames, a
        ``per_type_status`` frame (event_type, status, detail) and the output
        ``paths``.

    Raises
    ------
    ValueError
        If no event types remain to run.

    Notes
    -----
    Failures inside a single event type are recorded in ``per_type_status``
    and do not abort the session. The CSVs are written even when every type
    was skipped or failed, so a later call with ``skip_if_exists=True`` will
    skip the session.
    """
    os.makedirs(out_path, exist_ok=True)

    # aggregated outputs (ONE set per sub/exp/sess)
    out_raw = os.path.join(out_path, f"df_raw_{sub}_{exp}_{sess}.csv")
    out_raw_summary = os.path.join(out_path, f"df_raw_summary_{sub}_{exp}_{sess}.csv")
    out_time = os.path.join(out_path, f"df_time_{sub}_{exp}_{sess}.csv")
    expected = [out_raw, out_raw_summary, out_time]

    if skip_if_exists and _all_exist(expected):
        print("Files exist: skipped")
        return {"skipped": True, "reason": "outputs_exist", "paths": expected}

    # --------------------------
    # CML: load events once
    # --------------------------
    cmlreader = cml.CMLReader(subject=sub, experiment=exp, session=sess)
    evs_cml = cmlreader.load("events")

    # decide which types to run
    if evs_types is None:
        types_to_run = sorted(pd.unique(evs_cml["type"]))
    else:
        types_to_run = sorted(set(_as_list(evs_types)))

    if len(types_to_run) == 0:
        raise ValueError("types_to_run is empty.")

    # --------------------------
    # BIDS: load raw + annotations once
    # --------------------------
    task = exp.lower()
    bids_path = BIDSPath(
        subject=sub,
        session=str(sess),
        task=task,
        datatype="eeg",
        root=bids_root,
    )

    raw_bids = read_raw_bids(bids_path)
    raw_bids.set_channel_types({
        "EXG1": "eog", "EXG2": "eog", "EXG3": "eog", "EXG4": "eog",
        "EXG5": "misc", "EXG6": "misc", "EXG7": "misc", "EXG8": "misc",
    })

    events_all, event_id_all = mne.events_from_annotations(raw_bids)
    sfreq = float(raw_bids.info["sfreq"])

    # collect per-type outputs
    all_raw = []
    all_raw_summary = []
    all_time = []

    # optional bookkeeping
    per_type_status = []

    for etype in types_to_run:
        if verbose:
            print(f"[{sub} | {exp} | {sess}] type={etype}")

        # Bind the heavy per-type objects up front so the `finally` clause can
        # always release them, whichever statement the iteration stops at.
        eeg_cml = epochs_bids = eeg_bids = res = events_filt = meta = None

        try:
            # --------------------------
            # CML: filter to this type + dedupe by eegoffset, then epoch
            # --------------------------
            evs_cml_t = evs_cml[evs_cml["type"] == etype].copy()
            if evs_cml_t.empty:
                per_type_status.append((etype, "skip", "no_cml_events"))
                continue

            evs_cml_t = _dedupe_events_by_sample(evs_cml_t, "eegoffset", keep=keep)

            eeg_cml = cmlreader.load_eeg(evs_cml_t, rel_start=tmin, rel_stop=tmax).to_ptsa()

            # --------------------------
            # BIDS: filter annotation labels/codes for this type, dedupe by sample, epoch
            # --------------------------
            if etype not in event_id_all:
                per_type_status.append((etype, "skip", "etype_not_in_annotations"))
                continue  # `finally` frees the CML epochs

            filtered_event_id = {etype: event_id_all[etype]}
            code = filtered_event_id[etype]

            events_filt = events_all[events_all[:, 2] == code]
            if len(events_filt) == 0:
                per_type_status.append((etype, "skip", "no_bids_events"))
                continue  # `finally` frees the CML epochs

            # dedupe by sample (keep the first occurrence, preserve order)
            _, first_idx = np.unique(events_filt[:, 0], return_index=True)
            events_filt = events_filt[np.sort(first_idx)]

            epochs_bids = mne.Epochs(
                raw_bids,
                events=events_filt,
                event_id=filtered_event_id,
                tmin=tmin / 1000.0,
                tmax=tmax / 1000.0,
                baseline=None,
                preload=True,
            )

            picks_eeg = mne.pick_types(epochs_bids.info, eeg=True, eog=False, misc=False)
            epochs_bids = epochs_bids.pick(picks_eeg)

            # metadata aligned to events_filt
            meta = pd.DataFrame({
                "sample": events_filt[:, 0].astype(int),
                "trial_type": [etype] * len(events_filt),
            })
            meta["onset"] = meta["sample"] / sfreq

            eeg_bids = TimeSeries.from_mne_epochs(epochs_bids, meta)
            eeg_bids = eeg_bids.assign_coords(time=eeg_bids["time"] * 1000.0)
            eeg_bids["time"].attrs["units"] = "ms"

            # --------------------------
            # Compare
            # --------------------------
            res = compare_eeg_sources(
                eeg_dict={"BIDS": eeg_bids, "CMLReader": eeg_cml},
                subject=sub,
                experiment=exp,
                session=sess,
                options=["strip_metadata", "compare_raw_signals", "compare_time_coords"],
            )

            # append dfs; add event type column so you can stratify later
            if res.get("df_raw") is not None and not res["df_raw"].empty:
                df = res["df_raw"].copy()
                df["event_type"] = etype
                all_raw.append(df)

            if res.get("df_raw_summary") is not None and not res["df_raw_summary"].empty:
                df = res["df_raw_summary"].copy()
                df["event_type"] = etype
                all_raw_summary.append(df)

            if res.get("df_time") is not None and not res["df_time"].empty:
                df = res["df_time"].copy()
                df["event_type"] = etype
                all_time.append(df)

            per_type_status.append((etype, "ok", ""))

        except Exception as e:
            per_type_status.append((etype, "fail", repr(e)))
            if verbose:
                print(f"[{sub} | {exp} | {sess}] type={etype} failed: {e!r}")

        finally:
            # Free big objects per type. Rebinding the names drops the
            # references; `del locals()[name]` only edits a dict snapshot and
            # never unbinds a function local, so it released nothing.
            eeg_cml = epochs_bids = eeg_bids = res = events_filt = meta = None
            gc.collect()

    # done with BIDS raw
    try:
        raw_bids.close()
    except Exception:
        # best-effort close; the reference is dropped right below regardless
        pass
    del raw_bids
    gc.collect()

    # concatenate and save
    df_raw_all = pd.concat(all_raw, ignore_index=True) if all_raw else pd.DataFrame()
    df_raw_summary_all = pd.concat(all_raw_summary, ignore_index=True) if all_raw_summary else pd.DataFrame()
    df_time_all = pd.concat(all_time, ignore_index=True) if all_time else pd.DataFrame()

    df_raw_all.to_csv(out_raw, index=False)
    df_raw_summary_all.to_csv(out_raw_summary, index=False)
    df_time_all.to_csv(out_time, index=False)

    return {
        "df_raw": df_raw_all,
        "df_raw_summary": df_raw_summary_all,
        "df_time": df_time_all,
        "per_type_status": pd.DataFrame(per_type_status, columns=["event_type", "status", "detail"]),
        "paths": expected,
    }


In [51]:
# Create a Dask client through cmldask's SLURM helper; `client` is the handle
# the (currently commented-out) `client.submit(...)` calls below would use.
# NOTE(review): `local_directory` is passed with a literal "~" while
# `log_directory` goes through os.path.expanduser — confirm the helper expands "~".
client = da.new_dask_client_slurm(
    job_name="raw_signals",
    memory_per_job="100GB",
    max_n_jobs=20,
    queue="RAM",
    local_directory="~/scratch",
    log_directory=os.path.expanduser("~/log_directory")
)

Unique port for zrentala is 51618
{'dashboard_address': ':51618'}
To view the dashboard, run: 
`ssh -fN zrentala@rhino2.psych.upenn.edu -L 8000:192.168.86.140:38422` in your local computer's terminal (NOT rhino) 
and then navigate to localhost:8000 in your browser


Perhaps you already have a cluster running?
Hosting the HTTP server on port 38422 instead


In [52]:
# Build `df_subset`: the data-index rows (one per session) for at most
# `max_subjects` subjects of each experiment in `experiments`.
max_subjects = 10  # cap on subjects kept per experiment
# experiments = ["ValueCourier", "ltpFR", "ltpFR2", "VFFR"]
experiments = ["VFFR", "ValueCourier"]

subjects_to_exclude = {"LTP001", "LTP9992", "LTP9993"}  # <-- your list here

# Full data index as returned by cmlreaders.
df = cml.get_data_index()

# Keep only rows belonging to the experiments of interest.
df_exp = df[df["experiment"].isin(experiments)].copy()

# remove excluded subjects up front
df_exp = df_exp[~df_exp["subject"].isin(subjects_to_exclude)].copy()

dfs = []  # one filtered frame per experiment, concatenated below

for exp in experiments:
    df_this = df_exp[df_exp["experiment"] == exp]

    # First `max_subjects` subject codes in sorted order.
    # NOTE: this rebinds the notebook-level `subjects` assigned in an earlier cell.
    subjects = (
        df_this["subject"]
        .drop_duplicates()
        .sort_values()      # deterministic
        .head(max_subjects)
    )

    # All sessions of the selected subjects for this experiment.
    df_keep = df_this[df_this["subject"].isin(subjects)].copy()
    dfs.append(df_keep)

df_subset = pd.concat(dfs, ignore_index=True)
df_subset

Unnamed: 0,Recognition,all_events,contacts,experiment,import_type,localization,math_events,montage,original_experiment,original_session,pairs,ps4_events,session,subject,subject_alias,system_version,task_events
0,,protocols/ltp/subjects/LTP229/experiments/VFFR...,,VFFR,build,0,,0,,0,,,0,LTP229,LTP229,,protocols/ltp/subjects/LTP229/experiments/VFFR...
1,,protocols/ltp/subjects/LTP229/experiments/VFFR...,,VFFR,build,0,,0,,1,,,1,LTP229,LTP229,,protocols/ltp/subjects/LTP229/experiments/VFFR...
2,,protocols/ltp/subjects/LTP229/experiments/VFFR...,,VFFR,build,0,,0,,2,,,2,LTP229,LTP229,,protocols/ltp/subjects/LTP229/experiments/VFFR...
3,,protocols/ltp/subjects/LTP229/experiments/VFFR...,,VFFR,build,0,,0,,3,,,3,LTP229,LTP229,,protocols/ltp/subjects/LTP229/experiments/VFFR...
4,,protocols/ltp/subjects/LTP229/experiments/VFFR...,,VFFR,build,0,,0,,4,,,4,LTP229,LTP229,,protocols/ltp/subjects/LTP229/experiments/VFFR...
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
109,,protocols/ltp/subjects/LTP610/experiments/Valu...,,ValueCourier,build,0,,0,,4,,,4,LTP610,LTP610,,protocols/ltp/subjects/LTP610/experiments/Valu...
110,,protocols/ltp/subjects/LTP610/experiments/Valu...,,ValueCourier,build,0,,0,,5,,,5,LTP610,LTP610,,protocols/ltp/subjects/LTP610/experiments/Valu...
111,,protocols/ltp/subjects/LTP612/experiments/Valu...,,ValueCourier,build,0,,0,,0,,,0,LTP612,LTP612,,protocols/ltp/subjects/LTP612/experiments/Valu...
112,,protocols/ltp/subjects/LTP613/experiments/Valu...,,ValueCourier,build,0,,0,,0,,,0,LTP613,LTP613,,protocols/ltp/subjects/LTP613/experiments/Valu...


In [54]:
# get futures
# As written, this cell processes every session of `df_subset` serially in the
# notebook kernel; the Dask submission is commented out, so `futures_eeg` and
# `future_meta` stay empty.
bids_root = "/home1/maint/LTP_BIDS/"
subjects = get_entity_vals(bids_root, "subject")
out_path = "raw_results_type/"
futures = []
REL_START, REL_STOP = 200, 1000
BUFFER_MS = 1000
# evs_type = ["WORD"]
evs_type = None  # None -> run every event type found in the CML events
# Epoch window handed to process_epoched_signals_by_type: BUFFER_MS before the
# event up to REL_STOP + BUFFER_MS after it.
tmin = (-BUFFER_MS)
tmax = ((REL_STOP + BUFFER_MS))
future_meta = {}
futures_eeg = []
for i, row in df_subset.iterrows():
    sub = row["subject"]
    exp = row["experiment"]
    sess = row["session"]
    try:
        process_epoched_signals_by_type(sub, exp, sess, evs_type, tmin, tmax, bids_root, out_path, verbose=True)
    except Exception as e:
        # Keep going with the next session, but say which one failed; the bare
        # exception text alone cannot be traced back to a session.
        print(f"[FAIL] {sub} | {exp} | {sess} -> {e!r}")
#     fut = client.submit(
#         process_epoched_signals_by_type,
#         sub, exp, sess, evs_type, tmin, tmax, bids_root, out_path
#     )

#     futures_eeg.append(fut)
#     future_meta[fut.key] = (sub, exp, sess)
    # if i < 15:
    #     break

# for sub in subjects:
#     subject_root = os.path.join(bids_root, f"sub-{sub}")
#     experiments = get_entity_vals(subject_root, "experiment")
#     print(experiments)
#     sessions = get_entity_vals(subject_root, "session")
#     # futures.extend([client.submit(process_raw_signals, sub, "ValueCourier", sess, bids_root,out_path) for sess in sessions])
#     futures.extend([client.submit(process_epoched_signals, sub, exp, sess, evs_type, tmin, tmax, bids_root, out_path)for sess in sessions for exp in experiments])
#     futures.extend([client.submit(process_events, sub, exp, sess, evs_type, bids_root, out_path) for sess in sessions])
#     break
#     # process_epoched_signals(sub, exp, sess, evs_types, tmin, tmax, bids_root, out_path)

In [55]:
# run futures
# Gather results from `futures_eeg` as they complete and collect the non-empty
# frames into three lists. `futures_eeg` is only populated if the
# `client.submit(...)` lines in the previous cell are uncommented.
from dask.distributed import as_completed

all_df_raw = []
all_df_raw_summary = []
all_df_time = []

n_done, n_fail = 0, 0  # counters reported in the [DONE] / [FAIL] lines

for fut in as_completed(futures_eeg):
    # Map the future back to its session; falls back to placeholders if the key is unknown.
    sub, exp, sess = future_meta.get(fut.key, ("<unknown>", "<unknown>", "<unknown>"))

    try:
        out = fut.result()  # re-raises an exception raised by the task

        # Sessions whose outputs already existed return a {"skipped": True, ...} dict.
        if out.get("skipped"):
            print(f"[SKIP] {sub} | {exp} | {sess}")
            continue

        if out["df_raw"] is not None and not out["df_raw"].empty:
            all_df_raw.append(out["df_raw"])

        if out["df_raw_summary"] is not None and not out["df_raw_summary"].empty:
            all_df_raw_summary.append(out["df_raw_summary"])

        if out["df_time"] is not None and not out["df_time"].empty:
            all_df_time.append(out["df_time"])

        n_done += 1
        print(f"[DONE] {sub} | {exp} | {sess}  ({n_done})")

    except Exception as e:
        # One failed session must not stop collection of the others.
        n_fail += 1
        print(f"[FAIL] {sub} | {exp} | {sess}  -> {e}  ({n_fail})")

KeyboardInterrupt: 

In [None]:
# Behavioural comparison for every session in `df_subset`, run serially in the
# kernel. `futures_beh` stays empty while the client.submit line is commented out.
futures_beh = []
evs_type = None  # None -> process_events keeps every event type in the CML events
for i, row in df_subset.iterrows():
    sub = row["subject"]
    exp = row["experiment"]
    sess = row["session"]
    # Return value (result or "skipped" dict) is discarded; the summary CSV is written to out_path.
    process_events(sub, exp, sess, evs_type, bids_root, out_path)
    # futures_beh.append(client.submit(process_events, sub, exp, sess, evs_type, bids_root, out_path))

In [None]:
import os
import pandas as pd

out_path = "raw_results_type/"

df_raw_filenames = []
df_raw_summary_filenames = []
df_time_filenames = []
df_behavior_summary_filenames = []

# Filename prefix -> destination list. Order matters: 'df_raw_summary_' must be
# tested before 'df_raw_', which is a prefix of it.
_prefix_to_bucket = (
    ('df_raw_summary_', df_raw_summary_filenames),
    ('df_raw_', df_raw_filenames),
    ('df_time_', df_time_filenames),
    ('df_behavior_summary_', df_behavior_summary_filenames),
)

# Walk out_path recursively and file every matching .csv under its category.
for dirpath, _, filenames in os.walk(out_path):
    for f in filenames:
        full_path = os.path.join(dirpath, f)
        if not f.endswith('.csv'):
            continue
        for prefix, bucket in _prefix_to_bucket:
            if f.startswith(prefix):
                bucket.append(full_path)
                break

def load_and_concat(file_list, remove_duplicates=True):
    """
    Read every CSV in `file_list` and stack the non-empty ones into one frame.

    Files that are empty, unreadable or contain no rows are skipped with a
    printed warning instead of raising.

    Parameters:
    -----------
    file_list : list
        Paths of the CSV files to read
    remove_duplicates : bool
        Drop fully identical rows from the stacked frame (default: True)

    Returns:
    --------
    pd.DataFrame
        The stacked rows (index restarts at 0 before duplicates are dropped),
        or an empty DataFrame when nothing could be loaded
    """
    if not file_list:
        return pd.DataFrame()  # nothing to read

    frames = []
    for csv_path in file_list:
        try:
            frame = pd.read_csv(csv_path)
        except pd.errors.EmptyDataError:
            print(f"Warning: Skipping empty/malformed file: {csv_path}")
            continue
        except Exception as err:
            print(f"Warning: Error reading {csv_path}: {err}")
            continue

        if frame.empty:
            print(f"Warning: Skipping empty file: {csv_path}")
        else:
            frames.append(frame)

    if not frames:
        print("Warning: No valid CSV files found to concatenate")
        return pd.DataFrame()

    combined = pd.concat(frames, ignore_index=True)
    if not remove_duplicates:
        return combined

    deduped = combined.drop_duplicates()
    n_removed = len(combined) - len(deduped)
    if n_removed > 0:
        print(f"Removed {n_removed} duplicate rows")
    return deduped

def delete_source_files(file_list, delete_files=False):
    """
    Remove the given files from disk, but only when explicitly asked to.

    Parameters:
    -----------
    file_list : list
        Paths of the files to remove
    delete_files : bool
        Safety switch; nothing is deleted unless this is True (default: False)

    Each removal is reported with a printed line; a failure to remove one file
    is printed and does not stop the remaining removals.
    """
    if not delete_files or not file_list:
        return

    for path in file_list:
        try:
            os.remove(path)
            print(f"Deleted: {path}")
        except Exception as err:
            print(f"Error deleting {path}: {err}")

# Configuration: whether the per-session CSVs are deleted after concatenation.
# Currently ENABLED — set to False to keep the source files.
# NOTE(review): deletion below is unconditional on the file lists, so a CSV that
# load_and_concat skipped with a warning is deleted as well — confirm this is intended.
DELETE_SOURCE_FILES = True

# Create the 4 distinct DataFrames
print("Processing df_raw...")
df_raw_all = load_and_concat(df_raw_filenames)

print("Processing df_raw_summary...")
df_raw_summary_all = load_and_concat(df_raw_summary_filenames)

print("Processing df_time...")
df_time_all = load_and_concat(df_time_filenames)

print("Processing df_behavior_summary...")
df_behavior_summary_all = load_and_concat(df_behavior_summary_filenames)

# Save the concatenated files (relative paths: written to the current working
# directory, overwriting any file of the same name)
print("\nSaving concatenated files...")
df_raw_all.to_csv("df_raw_all.csv", index=False)
df_raw_summary_all.to_csv("df_raw_summary_all.csv", index=False)
df_time_all.to_csv("df_time_all.csv", index=False)
df_behavior_summary_all.to_csv("df_behavior_summary_all.csv", index=False)

print("Concatenation complete!")

# Delete source files if configured
if DELETE_SOURCE_FILES:
    print("\nDeleting source files...")
    delete_source_files(df_raw_filenames, DELETE_SOURCE_FILES)
    delete_source_files(df_raw_summary_filenames, DELETE_SOURCE_FILES)
    delete_source_files(df_time_filenames, DELETE_SOURCE_FILES)
    delete_source_files(df_behavior_summary_filenames, DELETE_SOURCE_FILES)
    print("Source file deletion complete!")
else:
    print("\nSource files retained (set DELETE_SOURCE_FILES=True to delete)")

# Print summary statistics (row counts of the four concatenated frames)
print("\n" + "="*50)
print("Summary:")
print(f"df_raw_all: {len(df_raw_all)} rows")
print(f"df_raw_summary_all: {len(df_raw_summary_all)} rows")
print(f"df_time_all: {len(df_time_all)} rows")
print(f"df_behavior_summary_all: {len(df_behavior_summary_all)} rows")
print("="*50)

In [None]:
# Reload the four concatenated comparison tables from the CSVs in the current
# working directory, so the plotting cells below can run without redoing the comparison.
df_raw_all = pd.read_csv("df_raw_all.csv")
df_raw_summary_all = pd.read_csv("df_raw_summary_all.csv")
df_time_all = pd.read_csv("df_time_all.csv")
df_behavior_summary_all = pd.read_csv("df_behavior_summary_all.csv")

In [None]:
# plot mean and std difference
# Time-coordinate comparison: passes the `mean_abs_time_diff` and `std_time_diff`
# columns of df_time_all to plot_comp_results (not defined in this notebook —
# presumably provided by the ReportRawEEG star import; confirm).
plot_comp_results(df_time_all, "mean_abs_time_diff", "std_time_diff", col_label="Mean Abs Time Diff")

In [None]:
# plot mse
# Time-coordinate comparison: the `mse_time` column of df_time_all.
plot_comp_results(df_time_all, "mse_time", col_label="MSE Time")

In [None]:
# Raw-signal comparison: the `mean_abs_diff` and `std_diff` columns of df_raw_summary_all.
plot_comp_results(df_raw_summary_all, "mean_abs_diff", "std_diff", col_label="Mean Abs Signal Diff")

In [None]:
# plot mse
# Raw-signal comparison: the `mse` column of df_raw_summary_all.
plot_comp_results(df_raw_summary_all, "mse", col_label="MSE Raw Signal")

In [None]:
# plot n_channels diff
# Two plots from df_raw_summary_all: the `n_exact_diff_channels` column, then
# the `n_close_diff_channels` column.
plot_comp_results(df_raw_summary_all, "n_exact_diff_channels")
plot_comp_results(df_raw_summary_all, "n_close_diff_channels")

In [None]:
# Behavioural comparison: the `n_differing_columns` column of df_behavior_summary_all.
plot_comp_results(df_behavior_summary_all, "n_differing_columns")