In [None]:
import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import warnings

from scipy import stats
from scipy.stats import mannwhitneyu
from scipy.spatial.distance import cdist

from sklearn.preprocessing import StandardScaler
from sklearn.manifold import TSNE
from sklearn.decomposition import PCA
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis
from sklearn.neighbors import KNeighborsClassifier
from sklearn.svm import SVC
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.model_selection import LeaveOneGroupOut, learning_curve
from sklearn.metrics import accuracy_score, f1_score, roc_auc_score, confusion_matrix
from sklearn.base import clone


In [None]:
# Root directories for this analysis. They are empty strings here, so every
# os.path.join built from them resolves relative to the current working directory.
DATA_FINAL_DIR = ""
SESSION_DATA_DIR = ""
VIZ_DIR = ""
# Folder holding the per-signal feature CSVs (features_eda.csv, features_hr.csv, ...).
FEATURE_DIR = os.path.join(DATA_FINAL_DIR, "Extracted_Features_Final")
# Folder that receives the saved figures; created up front so saving cannot fail
# on a missing directory.
FIG_DIR = os.path.join(VIZ_DIR, "Bilateral_Analysis_Figures")
os.makedirs(FIG_DIR, exist_ok=True)

In [None]:
# One feature table per signal, read in the order EDA, HR, TEMP, ACC.
df_eda, df_hr, df_temp, df_acc = (
    pd.read_csv(os.path.join(FEATURE_DIR, f"features_{signal_key}.csv"))
    for signal_key in ("eda", "hr", "temp", "acc")
)

In [None]:
# Handedness answers come from two session sheets that share the same columns.
df_hands1 = pd.read_csv(os.path.join(SESSION_DATA_DIR, "hands1.csv"))
df_hands2 = pd.read_csv(os.path.join(SESSION_DATA_DIR, "hands2.csv"))

hands1 = df_hands1.loc[:, ["Molly ID", "dominant"]].copy()
hands2 = df_hands2.loc[:, ["Molly ID", "dominant"]].copy()

# Stack both sheets and switch to the column names used by the feature tables.
df_handedness = pd.concat([hands1, hands2], ignore_index=True).rename(
    columns={"Molly ID": "participant_id", "dominant": "dominant_hand"}
)

In [None]:
def standardize_handedness(x):
    """Reduce a free-text handedness answer to a canonical label.

    Returns "left", "right" or "excluded" when the lower-cased, stripped text
    contains "left", "right" or "no participant" (checked in that order), and
    "unknown" for missing values or any other text.
    """
    if pd.isna(x):
        return "unknown"

    text = str(x).lower().strip()
    # Order matters: the first keyword found in the text decides the label.
    keyword_labels = (
        ("left", "left"),
        ("right", "right"),
        ("no participant", "excluded"),
    )
    for keyword, label in keyword_labels:
        if keyword in text:
            return label
    return "unknown"


# Replace the raw answers with the canonical labels, then show what we ended up with.
df_handedness["dominant_hand"] = df_handedness["dominant_hand"].map(
    standardize_handedness
)

print(df_handedness["dominant_hand"].value_counts())
print("handedness:")
print(df_handedness.to_string(index=False))

In [None]:
def map_phase_category(phase):
    """Map a raw protocol phase name to its coarse category.

    The three stress tasks collapse into "Stress"; any phase that is not
    listed below is reported as "Unknown".
    """
    category_members = (
        ("Baseline", ("Baseline",)),
        ("Stress", ("Descriptive_Stress", "Stroop_Stress", "Math_Stress")),
        ("Intervention", ("MollyIntervention",)),
        ("Relaxation", ("Post-Relaxation",)),
    )
    for category, members in category_members:
        if phase in members:
            return category
    return "Unknown"

In [None]:
# Tag every window of every signal table with its coarse phase category.
for df in (df_eda, df_hr, df_temp, df_acc):
    df["phase_category"] = df["phase"].map(map_phase_category)

# participant_id -> canonical dominant hand; a later row wins if an id repeats.
handedness_dict = {
    participant: hand
    for participant, hand in zip(
        df_handedness["participant_id"], df_handedness["dominant_hand"]
    )
}

In [None]:
def map_wrist_type(row, handedness_dict):
    """Label one feature row as "dominant", "non-dominant" or "unknown".

    The row's ``side`` ("RIGHT" / "LEFT") is compared with the participant's
    handedness looked up in ``handedness_dict``. Participants that are absent
    from the dict, or whose handedness is neither "right" nor "left", yield
    "unknown".
    """
    participant = row["participant_id"]
    side = row["side"]

    dominant_side_by_hand = {"right": "RIGHT", "left": "LEFT"}
    dominant_side = dominant_side_by_hand.get(
        handedness_dict.get(participant, "unknown")
    )
    if dominant_side is None:
        return "unknown"
    if side == dominant_side:
        return "dominant"
    return "non-dominant"


# Attach the participant's handedness and the resulting wrist label to each table.
for df in (df_eda, df_hr, df_temp, df_acc):
    df["handedness"] = df["participant_id"].map(handedness_dict)
    df["wrist_type"] = df.apply(map_wrist_type, axis=1, args=(handedness_dict,))

# Keep only participants with a usable handedness; the copies detach the
# filtered tables from the unfiltered originals.
valid_handedness = ["right", "left"]
df_eda, df_hr, df_temp, df_acc = (
    table[table["handedness"].isin(valid_handedness)].copy()
    for table in (df_eda, df_hr, df_temp, df_acc)
)

In [None]:
# Spot-check the side -> wrist_type mapping on the EDA table.
sample = df_eda.loc[
    :, ["participant_id", "side", "handedness", "wrist_type"]
].drop_duplicates()
print(sample.head(10).to_string(index=False))

# Window counts per wrist type and per phase category.
for summary_col in ("wrist_type", "phase_category"):
    print(df_eda[summary_col].value_counts())

# Feature columns are identified by their signal prefix.
eda_feature_cols = [name for name in df_eda.columns if name.startswith("eda_")]
hr_feature_cols = [name for name in df_hr.columns if name.startswith("hr_")]
temp_feature_cols = [name for name in df_temp.columns if name.startswith("temp_")]
acc_feature_cols = [name for name in df_acc.columns if name.startswith("acc_")]

print(
    f"Feature counts: EDA={len(eda_feature_cols)}, HR={len(hr_feature_cols)}, "
    f"TEMP={len(temp_feature_cols)}, ACC={len(acc_feature_cols)}"
)

In [None]:
# Hand-picked features per signal for the dominant vs non-dominant comparison.
key_features_map = {
    signal_label: {"df": table, "features": picked}
    for signal_label, table, picked in (
        (
            "EDA",
            df_eda,
            [
                "eda_scl_mean",
                "eda_scr_rate",
                "eda_phasic_mean",
                "eda_scr_count",
                "eda_slope",
            ],
        ),
        ("HR", df_hr, ["hr_mean", "hr_std", "hr_rmssd", "hr_pnn50"]),
        ("TEMP", df_temp, ["temp_mean", "temp_slope", "temp_std"]),
        (
            "ACC",
            df_acc,
            ["acc_enmo_mean", "acc_magnitude_mean", "acc_activity_level"],
        ),
    )
}

In [None]:
# Overall dominant vs non-dominant comparison: for each key feature, run a
# two-sided Mann-Whitney U test on the pooled windows of both wrist types and
# record the means, their difference (non-dominant minus dominant) and the p-value.
bilateral_results = []

for signal, config in key_features_map.items():
    df = config["df"]
    features = config["features"]

    for feature in features:
        nondom_data = df.loc[df["wrist_type"] == "non-dominant", feature].dropna()
        dom_data = df.loc[df["wrist_type"] == "dominant", feature].dropna()

        # The test needs at least one observation on each side.
        if nondom_data.empty or dom_data.empty:
            continue

        stat, p_value = mannwhitneyu(nondom_data, dom_data, alternative="two-sided")
        # Marker column: more dashes mean a smaller p-value; empty means p >= 0.05.
        if p_value < 0.001:
            sig = "---"
        elif p_value < 0.01:
            sig = "--"
        elif p_value < 0.05:
            sig = "-"
        else:
            sig = ""

        nondom_mean = nondom_data.mean()
        dom_mean = dom_data.mean()
        diff = nondom_mean - dom_mean
        # Relative difference with respect to the dominant mean; reported as 0
        # when that mean is exactly 0 to avoid dividing by zero.
        diff_pct = (diff / abs(dom_mean)) * 100 if dom_mean != 0 else 0

        bilateral_results.append(
            {
                "signal": signal,
                "feature": feature,
                "nondom_mean": nondom_mean,
                "dom_mean": dom_mean,
                "diff (ND-D)": diff,
                "diff_pct": diff_pct,
                "p_value": p_value,
                "significant": sig,
            }
        )

        print(
            f"  {feature:25s} NON-DOM={nondom_mean:10.4f}, DOM={dom_mean:10.4f}, diff={diff:+10.4f} ({diff_pct:+6.1f}%), p={p_value:.2e} {sig}"
        )

df_bilateral = pd.DataFrame(bilateral_results)

# With no collected rows the frame has no "significant" column, so only filter
# when there is something to filter.
if df_bilateral.empty:
    sig_features = df_bilateral
else:
    sig_features = df_bilateral[df_bilateral["significant"] != ""]

if len(sig_features) > 0:
    print(f"{len(sig_features)} big diffrences")
    for _, row in sig_features.iterrows():
        # The difference is stored under "diff (ND-D)"; there is no "diff" column.
        higher = "NON-DOM" if row["diff (ND-D)"] > 0 else "DOM"
        print(
            f"  {row['feature']}: {higher} higherby {abs(row['diff_pct']):.1f}% (p={row['p_value']:.2e})"
        )
else:
    print("insignificant")


In [None]:
# Display / iteration order of the coarse phase categories.
phase_order = ["Baseline", "Stress", "Intervention", "Relaxation"]

# Stress-related feature -> table that holds it (insertion order is EDA, HR, TEMP, ACC).
stress_features_map = {
    **dict.fromkeys(["eda_scl_mean", "eda_phasic_mean", "eda_scr_rate"], df_eda),
    **dict.fromkeys(["hr_mean", "hr_rmssd"], df_hr),
    **dict.fromkeys(["temp_mean", "temp_slope"], df_temp),
    **dict.fromkeys(["acc_enmo_mean"], df_acc),
}

In [None]:
# Per-phase dominant vs non-dominant comparison for every stress-related feature.
# The list used to stay empty, which left the by-phase summary without any rows;
# the row layout below ("feature", "phase", "diff_ND_D", ...) follows the columns
# that the following cells read.
# NOTE(review): reconstructed to mirror the overall comparison above (two-sided
# Mann-Whitney U, difference = non-dominant mean - dominant mean) - confirm
# against the intended analysis.
bilateral_phase_results = []

for feature, feature_df in stress_features_map.items():
    for phase in phase_order:
        phase_rows = feature_df[feature_df["phase_category"] == phase]
        nondom_values = phase_rows.loc[
            phase_rows["wrist_type"] == "non-dominant", feature
        ].dropna()
        dom_values = phase_rows.loc[
            phase_rows["wrist_type"] == "dominant", feature
        ].dropna()

        # Skip phases that lack data for either wrist; the summary cell reports
        # such features as missing.
        if nondom_values.empty or dom_values.empty:
            continue

        _, phase_p_value = mannwhitneyu(
            nondom_values, dom_values, alternative="two-sided"
        )
        bilateral_phase_results.append(
            {
                "feature": feature,
                "phase": phase,
                "nondom_mean": nondom_values.mean(),
                "dom_mean": dom_values.mean(),
                "diff_ND_D": nondom_values.mean() - dom_values.mean(),
                "p_value": phase_p_value,
            }
        )

In [None]:
df_bilateral_phase = pd.DataFrame(bilateral_phase_results)
if df_bilateral_phase.empty:
    # An empty result list produces a frame without columns, and the summary
    # cell would then fail with a KeyError on "feature". Give the empty frame
    # the columns that cell reads so it reports the data as missing instead.
    df_bilateral_phase = pd.DataFrame(columns=["feature", "phase", "diff_ND_D"])

In [None]:
# Compare the wrist asymmetry (non-dominant minus dominant) at baseline with the
# asymmetry under stress, one line per feature.
for feature in stress_features_map:
    feat_data = df_bilateral_phase.loc[df_bilateral_phase["feature"] == feature]

    baseline_row = feat_data.loc[feat_data["phase"] == "Baseline"]
    stress_row = feat_data.loc[feat_data["phase"] == "Stress"]

    if len(baseline_row) == 0 or len(stress_row) == 0:
        print(f"  {feature:20s}: data missing")
        continue

    baseline_diff = baseline_row["diff_ND_D"].values[0]
    stress_diff = stress_row["diff_ND_D"].values[0]
    change = stress_diff - baseline_diff

    # "increased" means the absolute asymmetry is larger under stress.
    if abs(stress_diff) > abs(baseline_diff):
        change_direction = "increased"
    else:
        change_direction = "decreased"

    print(
        f"  {feature:20s}:bseline diff={baseline_diff:+.4f}, stress diff={stress_diff:+.4f}, change={change:+.4f} (asymmetry {change_direction})"
    )

# Persist the by-phase table next to the feature CSVs.
df_bilateral_phase.to_csv(
    os.path.join(FEATURE_DIR, "bilateral_dominant_vs_nondominant_by_phase.csv"),
    index=False,
)

In [None]:
# 2 x 4 grid of axes, flattened so panels can be addressed with a single index.
fig, axes = plt.subplots(nrows=2, ncols=4, figsize=(18, 10))
axes = axes.flatten()

phase_order = ["Baseline", "Stress", "Intervention", "Relaxation"]
# One fixed colour per phase category, in phase_order.
phase_colors = dict(zip(phase_order, ("#2ecc71", "#e74c3c", "#3498db", "#9b59b6")))

In [None]:
# Stress-related feature -> table that holds it (same content as the earlier definition).
stress_features_map = {
    feature_name: table
    for table, feature_names in (
        (df_eda, ("eda_scl_mean", "eda_phasic_mean", "eda_scr_rate")),
        (df_hr, ("hr_mean", "hr_rmssd")),
        (df_temp, ("temp_mean", "temp_slope")),
        (df_acc, ("acc_enmo_mean",)),
    )
    for feature_name in feature_names
}

In [None]:
def plot_tsne_comparison(df, feature_cols, signal_name, perplexity=30):
    """Draw three t-SNE panels (non-dominant, dominant, both wrists) for one signal.

    Only rows whose ``phase_category`` is "Baseline" or "Stress" are used, and
    rows with a missing value in any of ``feature_cols`` are dropped. Every
    panel is standardized and embedded independently.

    Args:
        df: Feature table with ``wrist_type`` and ``phase_category`` columns.
        feature_cols: Names of the feature columns to embed.
        signal_name: Label used in the figure title.
        perplexity: Upper bound for the t-SNE perplexity. It is lowered to
            ``n_samples - 1`` on small panels because t-SNE rejects a
            perplexity that is not smaller than the number of samples.

    Returns:
        The matplotlib Figure holding the three panels.
    """
    phases = ["Baseline", "Stress"]
    fig, axes = plt.subplots(1, 3, figsize=(18, 5))

    def _embed(features):
        """Standardize and project to 2-D; None when there are fewer than 2 rows."""
        n_rows = len(features)
        if n_rows < 2:
            return None
        scaled = StandardScaler().fit_transform(features)
        tsne = TSNE(
            n_components=2,
            random_state=42,
            perplexity=min(perplexity, n_rows - 1),
            max_iter=1000,
        )
        return tsne.fit_transform(scaled)

    def _note_too_few(ax):
        """Annotate a panel that could not be embedded."""
        ax.text(
            0.5,
            0.5,
            "not enough samples",
            ha="center",
            va="center",
            transform=ax.transAxes,
        )

    in_phase = df["phase_category"].isin(phases)

    # Panels 1-2: one embedding per wrist type, coloured by phase.
    colors = {"Baseline": "#2ecc71", "Stress": "#e74c3c"}
    for ax, wrist_type in zip(axes[:2], ["non-dominant", "dominant"]):
        df_subset = df[(df["wrist_type"] == wrist_type) & in_phase]
        X = df_subset[feature_cols].dropna()
        y = df_subset.loc[X.index, "phase_category"]

        ax.set_xlabel("tsne 1")
        ax.set_ylabel("tsne 2")
        ax.set_title(
            f"{wrist_type.upper()}\n(n={len(X)})", fontsize=11, fontweight="bold"
        )
        ax.grid(True, alpha=0.3)

        X_tsne = _embed(X)
        if X_tsne is None:
            _note_too_few(ax)
            continue

        for phase in phases:
            # Plain boolean array: the embedding is a NumPy array without an index.
            mask = (y == phase).to_numpy()
            ax.scatter(
                X_tsne[mask, 0],
                X_tsne[mask, 1],
                c=colors[phase],
                label=phase,
                alpha=0.6,
                s=30,
                edgecolor="white",
            )
        ax.legend(loc="upper right")

    # Panel 3: both wrists in one embedding; colour = wrist, marker = phase.
    ax3 = axes[2]
    df_both = df[in_phase]
    X_both = df_both[feature_cols].dropna()
    wrist_labels = df_both.loc[X_both.index, "wrist_type"]
    phase_labels = df_both.loc[X_both.index, "phase_category"]

    ax3.set_xlabel("tsne1")
    ax3.set_ylabel("tsne2")
    ax3.set_title("bilateral", fontsize=11, fontweight="bold")
    ax3.grid(True, alpha=0.3)

    X_tsne_both = _embed(X_both)
    if X_tsne_both is None:
        _note_too_few(ax3)
    else:
        markers = {"Baseline": "o", "Stress": "^"}
        colors_wrist = {"non-dominant": "#3498db", "dominant": "#e74c3c"}
        for wrist in ["non-dominant", "dominant"]:
            for phase in phases:
                mask = ((wrist_labels == wrist) & (phase_labels == phase)).to_numpy()
                ax3.scatter(
                    X_tsne_both[mask, 0],
                    X_tsne_both[mask, 1],
                    c=colors_wrist[wrist],
                    marker=markers[phase],
                    label=f"{wrist[:3].upper()}-{phase[:4]}",
                    alpha=0.5,
                    s=30,
                    edgecolor="white",
                )
        ax3.legend(loc="upper right", fontsize=8)

    fig.suptitle(
        f"tsne for {signal_name}",
        fontsize=12,
        fontweight="bold",
    )
    fig.tight_layout()
    return fig

# This cell runs before the modelling section that also assigns `signal_configs`,
# so the list is built here as well; without it a fresh Restart & Run All stops
# with a NameError on the loop below.
signal_configs = [
    ("EDA", df_eda, eda_feature_cols),
    ("HR", df_hr, hr_feature_cols),
    ("TEMP", df_temp, temp_feature_cols),
    ("ACC", df_acc, acc_feature_cols),
]

for signal_name, df, feature_cols in signal_configs:
    fig = plot_tsne_comparison(df, feature_cols, signal_name)
    # Save through the figure handle so the right figure is written even if
    # another one becomes current.
    fig.savefig(
        os.path.join(FIG_DIR, f"tsne_{signal_name.lower()}_dominant_comparison.png"),
        dpi=150,
        bbox_inches="tight",
    )
    plt.show()
    # Release the figure; four open figures per run otherwise accumulate in memory.
    plt.close(fig)
    print(f"Saved: tsne_{signal_name.lower()}_dominant_comparison.png")

In [None]:
# Candidate classifiers, keyed by the short names used in the result tables.
# Each entry is an unfitted template: the evaluation cells clone() it before fitting.
models = {
    "KNN": KNeighborsClassifier(
        n_neighbors=5,
        weights="uniform",
        metric="euclidean",
        n_jobs=-1,
    ),
    "SVM": SVC(
        kernel="rbf",
        C=1.0,
        gamma="scale",
        probability=True,  # evaluate_loso uses predict_proba for the AUC when the model offers it
        random_state=42,
    ),
    # NOTE(review): despite the key, this is scikit-learn's GradientBoostingClassifier,
    # not the xgboost library. The key is kept because later cells look it up by name.
    "XGBoost": GradientBoostingClassifier(
        n_estimators=100,
        learning_rate=0.1,
        max_depth=3,
        min_samples_split=2,
        min_samples_leaf=1,
        subsample=1.0,
        random_state=42,
    ),
}

In [None]:
def prepare_ml_data(
    df, feature_cols, wrist_type=None, phase_pair=("Baseline", "Stress")
):
    """Build (X, y, groups) arrays for a two-phase classification task.

    Rows are restricted to the two phase categories in ``phase_pair`` and,
    when ``wrist_type`` is given, to that wrist type. Rows with a missing
    value in any feature column are dropped.

    Returns:
        X: feature matrix (one row per kept window).
        y: 1 where the phase category equals ``phase_pair[1]``, else 0.
        groups: participant id of each row, for leave-one-participant-out splits.
    """
    keep = df["phase_category"].isin(phase_pair)
    if wrist_type:
        keep = keep & (df["wrist_type"] == wrist_type)
    selected = df.loc[keep]

    # Drop windows that lack any of the requested features.
    complete = selected[feature_cols].notna().all(axis=1)
    selected = selected.loc[complete]

    features = selected[feature_cols].values
    labels = (selected["phase_category"] == phase_pair[1]).astype(int).values
    participants = selected["participant_id"].values
    return features, labels, participants

In [None]:
def evaluate_loso(X, y, groups, model):
    """Leave-one-group-out evaluation of a binary (0/1) classifier.

    For every group (participant) the model is fitted on all other groups and
    evaluated on the held-out one. Features are standardized inside each fold
    with statistics from the training rows only. Predictions of all folds are
    pooled before the overall metrics are computed.

    Args:
        X: 2-D feature array.
        y: 1-D array of 0/1 labels.
        groups: 1-D array giving the group (participant) of each row.
        model: Estimator with ``fit`` / ``predict``. It is fitted in place on
            every fold, so pass a clone if the original must stay unfitted.

    Returns:
        Dict with pooled ``accuracy``, ``f1_macro`` and ``auc`` (NaN when the
        model has no ``predict_proba`` or only one class occurs in ``y``),
        the pooled ``y_true`` / ``y_pred`` / ``y_prob`` lists, a 2x2
        ``confusion_matrix`` ordered as labels [0, 1], and the per-group
        ``participant_results``.
    """
    logo = LeaveOneGroupOut()

    y_true_all = []
    y_pred_all = []
    y_prob_all = []
    participant_results = []

    for train_idx, test_idx in logo.split(X, y, groups):
        X_train, X_test = X[train_idx], X[test_idx]
        y_train, y_test = y[train_idx], y[test_idx]
        test_participant = groups[test_idx][0]

        # Fit the scaler on the training fold only so the held-out participant
        # does not leak into the normalization.
        scaler = StandardScaler()
        X_train_scaled = scaler.fit_transform(X_train)
        X_test_scaled = scaler.transform(X_test)

        model.fit(X_train_scaled, y_train)
        y_pred = model.predict(X_test_scaled)

        y_true_all.extend(y_test)
        y_pred_all.extend(y_pred)

        if hasattr(model, "predict_proba"):
            # Column 1 is taken as the probability of the positive class.
            y_prob = model.predict_proba(X_test_scaled)[:, 1]
            y_prob_all.extend(y_prob)

        participant_results.append(
            {
                "participant": test_participant,
                "accuracy": accuracy_score(y_test, y_pred),
                "n_samples": len(y_test),
            }
        )

    acc = accuracy_score(y_true_all, y_pred_all)
    f1 = f1_score(y_true_all, y_pred_all, average="macro")

    # ROC AUC is undefined when only one class is present, and roc_auc_score
    # raises in that case, so report NaN instead.
    if len(y_prob_all) > 0 and len(np.unique(y_true_all)) > 1:
        auc = roc_auc_score(y_true_all, y_prob_all)
    else:
        auc = np.nan

    # Fix the label order so the matrix is always 2x2; callers unpack it as
    # tn, fp, fn, tp, which fails on a 1x1 matrix when a class never shows up.
    cm = confusion_matrix(y_true_all, y_pred_all, labels=[0, 1])

    return {
        "accuracy": acc,
        "f1_macro": f1,
        "auc": auc,
        "y_true": y_true_all,
        "y_pred": y_pred_all,
        "y_prob": y_prob_all,
        "confusion_matrix": cm,
        "participant_results": participant_results,
    }

In [None]:
# (signal label, feature table, feature column names) for each of the four signals.
signal_configs = list(
    zip(
        ("EDA", "HR", "TEMP", "ACC"),
        (df_eda, df_hr, df_temp, df_acc),
        (eda_feature_cols, hr_feature_cols, temp_feature_cols, acc_feature_cols),
    )
)

In [None]:
# Experiment 1: Baseline vs Stress classification per signal, per wrist type and
# per model, evaluated leave-one-participant-out.
results_exp1 = []

for signal_name, df, feature_cols in signal_configs:
    print(f"{signal_name}: ({len(feature_cols)} features)")

    for wrist in ("non-dominant", "dominant"):
        X, y, groups = prepare_ml_data(
            df, feature_cols, wrist_type=wrist, phase_pair=("Baseline", "Stress")
        )

        n_participants = len(np.unique(groups))
        n_samples = len(y)
        n_baseline = sum(y == 0)
        n_stress = sum(y == 1)

        for model_name, model in models.items():
            # Evaluate a fresh copy so the template in `models` stays unfitted.
            model_clone = clone(model)
            result = evaluate_loso(X, y, groups, model_clone)

            row = {"signal": signal_name, "wrist_type": wrist, "model": model_name}
            for metric_key in ("accuracy", "f1_macro", "auc"):
                row[metric_key] = result[metric_key]
            row["n_samples"] = n_samples
            row["n_participants"] = n_participants
            row["n_features"] = len(feature_cols)
            results_exp1.append(row)

            # Sensitivity / specificity from the pooled confusion matrix
            # (0 when the corresponding class has no samples).
            tn, fp, fn, tp = result["confusion_matrix"].ravel()
            if (tp + fn) > 0:
                sensitivity = tp / (tp + fn)
            else:
                sensitivity = 0
            if (tn + fp) > 0:
                specificity = tn / (tn + fp)
            else:
                specificity = 0

            print(
                f"    {model_name:10s}: Acc={result['accuracy']:.3f}, "
                f"F1={result['f1_macro']:.3f}, AUC={result['auc']:.3f} "
                f"(Sens={sensitivity:.2f}, Spec={specificity:.2f})"
            )

df_results_exp1 = pd.DataFrame(results_exp1)

In [None]:
# One table per metric: rows are (signal, model), columns are the two wrist
# types plus their difference and the better wrist.
for metric in ["accuracy", "f1_macro", "auc"]:
    pivot = df_results_exp1.pivot_table(
        index=["signal", "model"], columns="wrist_type", values=metric
    )
    pivot["Diff"] = pivot["non-dominant"] - pivot["dominant"]
    # The difference column is named "Diff"; looking up "Diff (ND-D)" raised a
    # KeyError. Ties and NaN differences fall to "DOMINANT", as before.
    pivot["Better"] = np.where(pivot["Diff"] > 0, "NON-DOM", "DOMINANT")
    print(pivot.round(3).to_string())





#---------------------------------
# Mean accuracy over the three models, per signal and wrist type.
for signal_name in ["EDA", "HR", "TEMP", "ACC"]:
    signal_data = df_results_exp1[df_results_exp1["signal"] == signal_name]

    nondom_acc = signal_data.loc[
        signal_data["wrist_type"] == "non-dominant", "accuracy"
    ].mean()
    dom_acc = signal_data.loc[signal_data["wrist_type"] == "dominant", "accuracy"].mean()

    # Ties count as DOMINANT.
    winner = "NON-DOMINANT" if nondom_acc > dom_acc else "DOMINANT"
    diff = nondom_acc - dom_acc

    # The arrow was stored as mis-decoded UTF-8 ("â†’"); restored to the intended "→".
    print(
        f"  {signal_name}: NON-DOM={nondom_acc:.3f}, DOM={dom_acc:.3f}, Diff={diff:+.3f} → {winner}"
    )

In [None]:
# For every signal, check whether all three models favour the same wrist
# according to the pooled accuracies of experiment 1.
for signal_name in ("EDA", "HR", "TEMP", "ACC"):
    signal_data = df_results_exp1.loc[df_results_exp1["signal"] == signal_name]

    model_winners = []
    for model_name in ("KNN", "SVM", "XGBoost"):
        model_data = signal_data.loc[signal_data["model"] == model_name]
        nondom_acc = model_data.loc[
            model_data["wrist_type"] == "non-dominant", "accuracy"
        ].values[0]
        dom_acc = model_data.loc[
            model_data["wrist_type"] == "dominant", "accuracy"
        ].values[0]

        # Ties go to the dominant wrist.
        if nondom_acc > dom_acc:
            model_winners.append("ND")
        else:
            model_winners.append("D")

    status = "DISAGREE" if len(set(model_winners)) != 1 else "AGREE"
    print(
        f"  {signal_name}: {status} (KNN={model_winners[0]}, SVM={model_winners[1]}, XGB={model_winners[2]})"
    )

In [None]:
def get_per_participant_accuracies_single_signal(df, feature_cols, wrist_type, model):
    """Leave-one-participant-out accuracy of ``model`` for one signal and wrist.

    Uses the default Baseline-vs-Stress task of ``prepare_ml_data``. A fresh
    clone of ``model`` is fitted in every fold on features standardized with
    the training rows only.

    Returns:
        Dict mapping each held-out participant id to the accuracy on that
        participant's windows.
    """
    X, y, groups = prepare_ml_data(df, feature_cols, wrist_type=wrist_type)

    participant_accs = {}
    for train_idx, test_idx in LeaveOneGroupOut().split(X, y, groups):
        held_out = groups[test_idx][0]

        scaler = StandardScaler().fit(X[train_idx])
        fold_model = clone(model)
        fold_model.fit(scaler.transform(X[train_idx]), y[train_idx])
        predictions = fold_model.predict(scaler.transform(X[test_idx]))

        participant_accs[held_out] = accuracy_score(y[test_idx], predictions)

    return participant_accs

In [None]:
# Model agreement based on the mean of per-participant accuracies: for each
# signal, every model votes for the wrist with the higher mean accuracy.
model_agreement_results = []

for signal_name, df, feature_cols in signal_configs:
    # The original f-string opened with "(" instead of "{", which is a syntax
    # error (single "}") and kept the whole cell from running.
    print(f"{signal_name}:")

    model_winners = {}
    model_diffs = {}

    for model_name, model in models.items():
        nd_accs = get_per_participant_accuracies_single_signal(
            df, feature_cols, "non-dominant", model
        )
        d_accs = get_per_participant_accuracies_single_signal(
            df, feature_cols, "dominant", model
        )

        nd_mean = np.mean(list(nd_accs.values()))
        d_mean = np.mean(list(d_accs.values()))
        diff = nd_mean - d_mean

        # Ties go to the dominant wrist.
        model_winners[model_name] = "ND" if diff > 0 else "D"
        model_diffs[model_name] = diff

    unique_winners = set(model_winners.values())
    if len(unique_winners) == 1:
        agreement = "FULL AGREEMENT"
        agreed_winner = list(unique_winners)[0]
    else:
        agreement = "DISAGREEMENT"
        agreed_winner = "Mixed"

    model_agreement_results.append(
        {
            "signal": signal_name,
            "KNN_winner": model_winners["KNN"],
            "SVM_winner": model_winners["SVM"],
            "XGBoost_winner": model_winners["XGBoost"],
            "KNN_diff": model_diffs["KNN"],
            "SVM_diff": model_diffs["SVM"],
            "XGBoost_diff": model_diffs["XGBoost"],
            "agreement": agreement,
            "consensus_winner": agreed_winner,
        }
    )

df_model_agreement = pd.DataFrame(model_agreement_results)
print(
    df_model_agreement[
        [
            "signal",
            "KNN_winner",
            "SVM_winner",
            "XGBoost_winner",
            "agreement",
            "consensus_winner",
        ]
    ].to_string(index=False)
)

In [None]:
def add_window_index(df):
    """Return a copy of ``df`` with a ``window_idx`` column.

    ``window_idx`` numbers the rows 0, 1, 2, ... in their current order within
    each (participant_id, phase, side) group. The input frame is not modified.
    """
    position_in_group = df.groupby(["participant_id", "phase", "side"]).cumcount()
    return df.assign(window_idx=position_in_group)

# Indexed copies of the four signal tables; window_idx is the key used to line
# up windows across signals in the combined-feature experiments.
df_eda_idx, df_hr_idx, df_temp_idx, df_acc_idx = (
    add_window_index(table) for table in (df_eda, df_hr, df_temp, df_acc)
)

print("\nWindow indices added for signal alignment.")

In [None]:
# Re-read the raw feature tables and report how many non-metadata columns each
# one holds. Paths go through FEATURE_DIR like the first load, so the cell
# follows the configured data directory instead of a hard-coded relative path.
# (pandas is already imported in the imports cell.)
eda_features = pd.read_csv(os.path.join(FEATURE_DIR, "features_eda.csv"))
hr_features = pd.read_csv(os.path.join(FEATURE_DIR, "features_hr.csv"))
temp_features = pd.read_csv(os.path.join(FEATURE_DIR, "features_temp.csv"))
acc_features = pd.read_csv(os.path.join(FEATURE_DIR, "features_acc.csv"))
metadata_cols = ["participant_id", "timestamp", "phase", "window_start", "window_end"]

# Loop names are distinct from `df` / `feature_cols` so this report does not
# rebind variables used by the analysis cells.
for name, raw_table in [
    ("EDA", eda_features),
    ("HR", hr_features),
    ("TEMP", temp_features),
    ("ACC", acc_features),
]:
    non_metadata_cols = [c for c in raw_table.columns if c not in metadata_cols]
    print(f"{name}: {len(non_metadata_cols)} features")

In [None]:
def prepare_combined_features_single_wrist(
    filter_col, filter_val, phase_pair=("Baseline", "Stress")
):
    """Build (X, y, groups, feature names) from all four signals for one wrist.

    Each indexed signal table is restricted to rows where ``filter_col`` equals
    ``filter_val`` and the phase category is in ``phase_pair``. The tables are
    then inner-joined on participant, phase category, phase, side and window
    index, so only windows present in every signal survive. Rows with any
    missing feature are dropped.

    Returns:
        X: combined feature matrix (EDA, HR, TEMP, ACC columns in that order).
        y: 1 where the phase category equals ``phase_pair[1]``, else 0.
        groups: participant id per row.
        all_feature_cols: column names matching the columns of X.
    """
    merge_cols = ["participant_id", "phase_category", "phase", "side", "window_idx"]
    signal_sources = (
        (df_eda_idx, eda_feature_cols),
        (df_hr_idx, hr_feature_cols),
        (df_temp_idx, temp_feature_cols),
        (df_acc_idx, acc_feature_cols),
    )

    df_merged = None
    all_feature_cols = []
    for table, signal_cols in signal_sources:
        wanted = (table[filter_col] == filter_val) & (
            table["phase_category"].isin(phase_pair)
        )
        part = table.loc[wanted, merge_cols + signal_cols]
        if df_merged is None:
            df_merged = part
        else:
            df_merged = df_merged.merge(part, on=merge_cols, how="inner")
        all_feature_cols = all_feature_cols + signal_cols

    # Keep only windows with a complete feature vector.
    complete = df_merged[all_feature_cols].notna().all(axis=1)
    usable = df_merged.loc[complete]

    X = usable[all_feature_cols]
    y = (usable["phase_category"] == phase_pair[1]).astype(int)
    groups = usable["participant_id"]

    return X.values, y.values, groups.values, all_feature_cols

In [None]:
# Experiment 2a: all signals combined, dominant vs non-dominant wrist.
results_2a = []

for wrist_type in ("non-dominant", "dominant"):
    X, y, groups, feature_cols_all = prepare_combined_features_single_wrist(
        "wrist_type", wrist_type
    )
    n_features = len(feature_cols_all)

    print(
        f"\n{wrist_type.upper()}: {len(y)} samples, {len(np.unique(groups))} participants, {n_features} features"
    )

    for model_name, model in models.items():
        print(f"  Running {model_name}...", end=" ", flush=True)
        model_clone = clone(model)
        result = evaluate_loso(X, y, groups, model_clone)

        row = {"comparison": "DOM_vs_NONDOM", "wrist": wrist_type, "model": model_name}
        for metric_key in ("accuracy", "f1_macro", "auc"):
            row[metric_key] = result[metric_key]
        row["n_features"] = n_features
        row["n_samples"] = len(y)
        results_2a.append(row)

        # Sensitivity / specificity of the pooled predictions (0 when the
        # class has no samples).
        tn, fp, fn, tp = result["confusion_matrix"].ravel()
        if (tp + fn) > 0:
            sens = tp / (tp + fn)
        else:
            sens = 0
        if (tn + fp) > 0:
            spec = tn / (tn + fp)
        else:
            spec = 0

In [None]:
# Per-model accuracy of the two wrists and the resulting winner (ties -> D).
dom_nondom_results = [r for r in results_2a if r["comparison"] == "DOM_vs_NONDOM"]
nd_results = [r for r in dom_nondom_results if r["wrist"] == "non-dominant"]
d_results = [r for r in dom_nondom_results if r["wrist"] == "dominant"]

for model_name in ("KNN", "SVM", "XGBoost"):
    nd_acc = [r["accuracy"] for r in nd_results if r["model"] == model_name][0]
    d_acc = [r["accuracy"] for r in d_results if r["model"] == model_name][0]
    diff = nd_acc - d_acc
    if diff > 0:
        winner = "ND"
    else:
        winner = "D"
    print(
        f"  {model_name:10s}: ND={nd_acc:.3f}, D={d_acc:.3f}, Diff={diff:+.3f} -> {winner}"
    )

# Mean accuracy across models for each wrist.
nd_avg = np.mean([r["accuracy"] for r in nd_results])
d_avg = np.mean([r["accuracy"] for r in d_results])

In [None]:
def prepare_bilateral_features(phase_pair=("Baseline", "Stress")):
    """Build (X, y, groups, feature names) with both wrists side by side.

    For each wrist type the four indexed signal tables are restricted to the
    phase categories in ``phase_pair``, their feature columns get a ``_ND``
    (non-dominant) or ``_D`` (dominant) suffix, and the tables are inner-joined
    on participant, phase category, phase and window index. The two per-wrist
    tables are then inner-joined on the same keys, so a row holds the
    non-dominant and dominant features of one window. Rows with any missing
    feature are dropped.

    Returns:
        X: feature matrix, all ``_ND`` columns followed by all ``_D`` columns.
        y: 1 where the phase category equals ``phase_pair[1]``, else 0.
        groups: participant id per row.
        bilateral_feature_cols: column names matching the columns of X.
    """
    merge_cols_nd = ["participant_id", "phase_category", "phase", "window_idx"]
    signal_sources = (
        (df_eda_idx, eda_feature_cols),
        (df_hr_idx, hr_feature_cols),
        (df_temp_idx, temp_feature_cols),
        (df_acc_idx, acc_feature_cols),
    )

    def _merge_one_wrist(wrist_label, suffix):
        """Join all signals of one wrist; returns (merged table, suffixed names)."""
        merged = None
        suffixed_cols = []
        for table, signal_cols in signal_sources:
            rename_map = {c: f"{c}_{suffix}" for c in signal_cols}
            wanted = (table["wrist_type"] == wrist_label) & (
                table["phase_category"].isin(phase_pair)
            )
            renamed = table[wanted].rename(columns=rename_map)
            new_names = list(rename_map.values())
            part = renamed[merge_cols_nd + new_names]
            if merged is None:
                merged = part
            else:
                merged = merged.merge(part, on=merge_cols_nd, how="inner")
            suffixed_cols = suffixed_cols + new_names
        return merged, suffixed_cols

    df_nd_merged, nd_feature_cols = _merge_one_wrist("non-dominant", "ND")
    df_d_merged, d_feature_cols = _merge_one_wrist("dominant", "D")

    # Pair up the two wrists window by window.
    df_bilateral = df_nd_merged.merge(df_d_merged, on=merge_cols_nd, how="inner")
    bilateral_feature_cols = nd_feature_cols + d_feature_cols

    # Keep only windows with a complete bilateral feature vector.
    complete = df_bilateral[bilateral_feature_cols].notna().all(axis=1)
    usable = df_bilateral.loc[complete]

    X = usable[bilateral_feature_cols]
    y = (usable["phase_category"] == phase_pair[1]).astype(int)
    groups = usable["participant_id"]

    return X.values, y.values, groups.values, bilateral_feature_cols

In [None]:
# Build the paired dominant / non-dominant dataset used by the bilateral models.
bilateral_dataset = prepare_bilateral_features()
X_bilateral, y_bilateral, groups_bilateral, bilateral_cols = bilateral_dataset

In [None]:


# Experiment 2b, part 1: every model on the bilateral (both wrists) feature set.
results_2b = []

for model_name, model in models.items():
    print(f"  Running {model_name}...", end=" ", flush=True)
    model_clone = clone(model)
    result = evaluate_loso(X_bilateral, y_bilateral, groups_bilateral, model_clone)

    row = {"config": "bilateral", "model": model_name}
    for metric_key in ("accuracy", "f1_macro", "auc"):
        row[metric_key] = result[metric_key]
    row["n_features"] = len(bilateral_cols)
    row["n_samples"] = len(y_bilateral)
    results_2b.append(row)

    # Sensitivity / specificity of the pooled predictions (0 when the class
    # has no samples).
    tn, fp, fn, tp = result["confusion_matrix"].ravel()
    if (tp + fn) > 0:
        sens = tp / (tp + fn)
    else:
        sens = 0
    if (tn + fp) > 0:
        spec = tn / (tn + fp)
    else:
        spec = 0

In [None]:
# Experiment 2b, part 2: single-wrist baselines evaluated the same way, appended
# to results_2b so they can be compared with the bilateral rows.
for wrist_type in ["non-dominant", "dominant"]:
    X, y, groups, wrist_feature_cols = prepare_combined_features_single_wrist(
        "wrist_type", wrist_type
    )
    print(f"\n  {wrist_type.upper()}: {len(y)} samples")

    for model_name, model in models.items():
        model_clone = clone(model)
        result = evaluate_loso(X, y, groups, model_clone)

        results_2b.append(
            {
                "config": wrist_type,
                "model": model_name,
                "accuracy": result["accuracy"],
                "f1_macro": result["f1_macro"],
                "auc": result["auc"],
                # Taken from the returned column list rather than a hard-coded
                # count, so it stays correct when the feature set changes.
                "n_features": len(wrist_feature_cols),
                "n_samples": len(y),
            }
        )
        print(f"Acc={result['accuracy']:.3f}")

df_results_2b = pd.DataFrame(results_2b)

In [None]:
def _accuracy_2b(config, model_name):
    """First recorded accuracy in df_results_2b for one (config, model) pair."""
    selector = (df_results_2b["config"] == config) & (
        df_results_2b["model"] == model_name
    )
    return df_results_2b.loc[selector, "accuracy"].values[0]


# Per model: both single wrists, the bilateral result, and the bilateral gain
# over the better single wrist (within +/-0.005 counts as SAME).
for model_name in ("KNN", "SVM", "XGBoost"):
    nd_acc = _accuracy_2b("non-dominant", model_name)
    d_acc = _accuracy_2b("dominant", model_name)
    bi_acc = _accuracy_2b("bilateral", model_name)

    best_single = max(nd_acc, d_acc)
    best_wrist = "ND" if nd_acc > d_acc else "D"
    improvement = bi_acc - best_single
    if improvement > 0.005:
        status = "BETTER"
    elif improvement < -0.005:
        status = "WORSE"
    else:
        status = "SAME"

    print(
        f"{model_name:<12} {nd_acc:<10.3f} {d_acc:<10.3f} {bi_acc:<10.3f} "
        f"{best_single:.3f} ({best_wrist})    {improvement:+.3f} ({status})"
    )

# Averages over the three models.
nd_avg = df_results_2b.loc[df_results_2b["config"] == "non-dominant", "accuracy"].mean()
d_avg = df_results_2b.loc[df_results_2b["config"] == "dominant", "accuracy"].mean()
bi_avg = df_results_2b.loc[df_results_2b["config"] == "bilateral", "accuracy"].mean()
best_single_avg = max(nd_avg, d_avg)

print(
    f"{'Average':<12} {nd_avg:<10.3f} {d_avg:<10.3f} {bi_avg:<10.3f} "
    f"{best_single_avg:.3f}        {bi_avg - best_single_avg:+.3f}"
)

# Number of models for which bilateral strictly beats both single wrists.
bilateral_wins = 0
for model_name in ("KNN", "SVM", "XGBoost"):
    nd_acc = _accuracy_2b("non-dominant", model_name)
    d_acc = _accuracy_2b("dominant", model_name)
    bi_acc = _accuracy_2b("bilateral", model_name)
    if bi_acc > max(nd_acc, d_acc):
        bilateral_wins += 1


In [None]:
# Correlation between the non-dominant and dominant version of every feature,
# computed over the paired bilateral windows.
X_bilateral, y_bilateral, groups_bilateral, bilateral_cols = (
    prepare_bilateral_features()
)
df_bilateral_features = pd.DataFrame(X_bilateral, columns=bilateral_cols)

# Feature prefix -> signal label; anything without these prefixes counts as ACC.
prefix_to_signal = (("eda_", "EDA"), ("hr_", "HR"), ("temp_", "TEMP"))
all_base_features = (
    eda_feature_cols + hr_feature_cols + temp_feature_cols + acc_feature_cols
)

bilateral_correlations = []
for base_feature in all_base_features:
    nd_col = f"{base_feature}_ND"
    d_col = f"{base_feature}_D"

    paired_cols = df_bilateral_features.columns
    if nd_col not in paired_cols or d_col not in paired_cols:
        continue

    corr = df_bilateral_features[nd_col].corr(df_bilateral_features[d_col])
    signal = next(
        (label for prefix, label in prefix_to_signal if base_feature.startswith(prefix)),
        "ACC",
    )
    bilateral_correlations.append(
        {"feature": base_feature, "signal": signal, "correlation": corr}
    )

df_bilateral_corr = pd.DataFrame(bilateral_correlations)

# Summary of the correlations per signal.
signal_corr = df_bilateral_corr.groupby("signal")["correlation"].agg(
    ["mean", "std", "min", "max"]
)
print(signal_corr.round(3))

# Verbal rating of the mean correlation: > 0.8 high, > 0.5 moderate, else low.
for signal in ("EDA", "HR", "TEMP", "ACC"):
    mean_corr = signal_corr.loc[signal, "mean"]
    interp = "high" if mean_corr > 0.8 else ("modrate" if mean_corr > 0.5 else "low")
    print(f"  {signal}: r={mean_corr:.3f} - {interp}")

# The ten features whose two wrists agree the least.
low_corr = df_bilateral_corr.nsmallest(10, "correlation")
for _, row in low_corr.iterrows():
    print(f"  {row['feature']:30s} [{row['signal']:4s}]: r={row['correlation']:.3f}")
