In [4]:
# ============================================================
# LS100 Cycle-Based Analysis:
#   Q1: Is HR correlated with cadence?
#   Q2: Is HR correlated with vertical "jumping" / wasted energy?
# ============================================================

# Colab-only: mount Google Drive so the /content/drive/... paths configured
# below resolve. Outside Colab this import is unavailable.
from google.colab import drive
drive.mount('/content/drive')

import os
from pathlib import Path
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import time

def safe_read_csv(path, retries=5, delay=1):
    """
    Read a CSV with pandas, retrying on failure.

    Intended to ride out Google Drive disconnects during pd.read_csv.
    Every exception is treated as potentially transient and retried.

    Parameters
    ----------
    path : str or path-like
        CSV file to read.
    retries : int
        Maximum number of read attempts.
    delay : float
        Seconds to wait between two consecutive attempts.

    Returns
    -------
    pandas.DataFrame
        The parsed CSV.

    Raises
    ------
    RuntimeError
        If every attempt fails. The last underlying exception is attached
        as ``__cause__`` so the real reason is visible in the traceback.
    """
    last_error = None
    for attempt in range(1, retries + 1):
        try:
            return pd.read_csv(path)
        except Exception as e:
            last_error = e
            print(f"Read failed ({attempt}/{retries}) for {path}: {e}")
            # No point in waiting after the final attempt: we are about to give up.
            if attempt < retries:
                time.sleep(delay)
    raise RuntimeError(f"Failed to read CSV after {retries} attempts: {path}") from last_error


# ------------------------------------------------------------
# 1. Paths
# ------------------------------------------------------------
# All inputs and outputs live under a single Drive project folder.
BASE_DIR = "/content/drive/MyDrive/Harvard/LS100"
DATA_DIR = BASE_DIR + "/data"
HR_FILE = DATA_DIR + "/LS100_Data.xlsx"
POSE_DIR = BASE_DIR + "/videos/Frame_Reduced/clips/annotated_outputs"
OUT_DIR = BASE_DIR + "/analysis"

# Create the output folder up front so later to_csv calls cannot fail on it.
os.makedirs(OUT_DIR, exist_ok=True)

# Effective fps of the frame-reduced videos (adjust if wrong).
# NOTE(review): not referenced by the code visible in this cell.
FPS = 60

# ------------------------------------------------------------
# 2. Helpers: time + clip + phase
# ------------------------------------------------------------
def time_to_seconds(x):
    """
    Convert a spreadsheet time cell to seconds.

    Supported inputs:
      * str               - parsed with pd.to_timedelta (e.g. "00:04:01")
      * timedelta-like    - anything exposing total_seconds()
                            (datetime.timedelta, pd.Timedelta)
      * time/datetime-like - anything exposing .hour/.minute/.second;
                            only the time-of-day part is used

    Returns a number of seconds, or NaN when the value cannot be interpreted
    (unparseable string, None, plain numbers, ...).
    """
    if isinstance(x, str):
        try:
            return pd.to_timedelta(x).total_seconds()
        except Exception:
            return np.nan
    # Duration cells arrive as timedelta objects, which have no .hour attribute;
    # without this branch they would be turned into NaN and silently dropped.
    if hasattr(x, "total_seconds"):
        return x.total_seconds()
    if hasattr(x, "hour"):
        # Keep sub-second precision when the object carries it.
        return x.hour * 3600 + x.minute * 60 + x.second + getattr(x, "microsecond", 0) / 1e6
    return np.nan


def parse_clip_filename(path_str):
    """
    Split a clip file name into its identifying parts.

    'Aryeh_1_clip3_pose2d_angles.csv' → ('Aryeh', '1', 3, 'mile2')

    The stem is split on '_':
      * participant - first token
      * trial       - first later token made only of digits (kept as str),
                      or None if there is none
      * clip        - integer taken from a 'clip<N>' token, or None if no
                      such token parses
      * phase       - label looked up from the clip number, 'unknown' when
                      the clip is missing or outside 1..5

    Returns
    -------
    tuple
        (participant, trial, clip, phase)
    """
    stem = Path(path_str).stem  # e.g. 'Aryeh_1_clip3_pose2d_angles'
    participant, *tokens = stem.split("_")
    trial = None
    clip = None
    for token in tokens:
        if trial is None and token.isdigit():
            trial = token
        if token.startswith("clip"):
            try:
                clip = int(token.replace("clip", ""))
            except ValueError:
                # e.g. bare 'clip' or 'clipA': not a clip number, keep the
                # previous value. Only the parse failure is swallowed.
                pass
    phase_map = {1: "warmup", 2: "mile1", 3: "mile2", 4: "mile3", 5: "cooldown"}
    phase = phase_map.get(clip, "unknown")
    return participant, trial, clip, phase


# HR windows (in seconds) from trial start, keyed by clip number:
#   clip -> (phase label, (start_s, end_s))
# get_clip_hr() selects rows with start_s <= time_s <= end_s (both ends inclusive).
# The phase labels repeat the phase_map inside parse_clip_filename(); keep the two in sync.
# NOTE(review): clips 1-4 start at "+1 s" and span 60 s inclusive, whereas the cooldown
# window starts on the minute and spans 61 s inclusive - TODO confirm this is intended.
clip_windows = {
    1: ("warmup",   (1, 60)),               # 1–60
    2: ("mile1",    (4*60 + 1, 5*60)),      # 241–300
    3: ("mile2",    (13*60 + 1, 14*60)),    # 781–840
    4: ("mile3",    (23*60 + 1, 24*60)),    # 1381–1440
    5: ("cooldown", (28*60, 29*60)),        # 1680–1740
}

# ------------------------------------------------------------
# 3. Load heart-rate data and define get_clip_hr
# ------------------------------------------------------------
# Participant -> sheet name inside the HR workbook.
HR_SHEETS = {
    "Danny": "Trial 1- Danny",
    "Aryeh": "Trial 2 - Aryeh",
    "Max":   "Trial 3 - Max",
}

# Participant -> cleaned DataFrame with columns: Time, "Heart Rate", time_s
# (sorted by time_s, rows without a usable HR value or time removed).
hr_data = {}

for person, sheet in HR_SHEETS.items():
    df = pd.read_excel(HR_FILE, sheet_name=sheet)
    # Header labels are not guaranteed to be strings (blank or numeric headers),
    # so coerce before stripping; .strip()/.lower() would otherwise raise.
    df.columns = [str(c).strip() for c in df.columns]

    # HR column: first column whose name contains 'heart'
    hr_col = next((c for c in df.columns if "heart" in c.lower()), None)
    if hr_col is None:
        raise ValueError(f"No HR column found for {person}/{sheet}")

    if "Time" not in df.columns:
        raise ValueError(f"'Time' column not found in sheet {sheet}")
    time_col = "Time"

    clean = df[[time_col, hr_col]].copy()
    clean = clean.rename(columns={hr_col: "Heart Rate"})
    # Non-numeric cells (notes, placeholders) become NaN and are dropped below,
    # instead of breaking the mean/std computed later in get_clip_hr().
    clean["Heart Rate"] = pd.to_numeric(clean["Heart Rate"], errors="coerce")
    clean["time_s"] = clean[time_col].apply(time_to_seconds)
    clean = (
        clean.dropna(subset=["Heart Rate", "time_s"])
        .sort_values("time_s")
        .reset_index(drop=True)
    )
    hr_data[person] = clean
    print(f"{person}: HR rows = {clean.shape[0]}")


def get_clip_hr(person, clip):
    """
    Heart-rate summary for one participant during one clip.

    Looks up the clip's fixed time window in ``clip_windows`` and selects the
    participant's samples in ``hr_data`` whose ``time_s`` lies inside it
    (both ends inclusive).

    Returns
    -------
    tuple
        (mean, std) of the "Heart Rate" values in the window, or (NaN, NaN)
        when the participant is unknown, the clip has no window, or no
        sample falls inside the window.
    """
    no_data = (np.nan, np.nan)

    if person not in hr_data:
        return no_data

    window = clip_windows.get(clip)
    if window is None:
        return no_data

    start_s, end_s = window[1]
    samples = hr_data[person]
    in_window = samples["time_s"].between(start_s, end_s)
    if not in_window.any():
        return no_data

    window_hr = samples.loc[in_window, "Heart Rate"]
    return window_hr.mean(), window_hr.std()


# ------------------------------------------------------------
# 4. Cycle detection from knee angle
# ------------------------------------------------------------
def detect_cycles(angle_series, smooth_window=5):
    """
    Detect cycles as consecutive local maxima of a smoothed angle trace
    (used with angle_left_knee_angle).

    The series is smoothed with a centred rolling mean (min_periods=1, so
    the edges are kept), then every interior local maximum is reported.
    A flat-topped maximum (several equal samples with lower neighbours on
    both sides) counts as one peak, located at the middle of the plateau
    (rounded down). Maxima touching the first or last sample are not
    reported because one of their sides cannot be confirmed.

    Parameters
    ----------
    angle_series : pandas.Series
        Angle per frame, in frame order.
    smooth_window : int
        Width of the rolling-mean window, in samples.

    Returns
    -------
    list of int
        Positional indices of the peaks, strictly increasing.
        A cycle is peak[i] → peak[i+1].
    """
    if len(angle_series) < 3:
        return []

    smoothed = angle_series.rolling(smooth_window, center=True, min_periods=1).mean().values
    last = len(smoothed) - 1
    peaks = []
    i = 1
    while i < last:
        if smoothed[i] > smoothed[i - 1]:
            # Rising edge found: walk across any run of equal values ...
            j = i
            while j < last and smoothed[j + 1] == smoothed[i]:
                j += 1
            # ... and accept it only if the trace then drops. A rise-flat-rise
            # "shoulder" or a plateau running into the end is not a maximum.
            if j < last and smoothed[j + 1] < smoothed[i]:
                peaks.append((i + j) // 2)
            i = j + 1
        else:
            i += 1
    return peaks


def compute_cycle_metrics_for_clip(df_clip, participant, trial, clip, phase):
    """
    Compute clip-level gait metrics from per-frame data.

    Parameters
    ----------
    df_clip : pandas.DataFrame
        Needs the columns frame, time_ms, angle_left_knee_angle and hip_y.
    participant, trial, clip, phase
        Identifiers copied unchanged into the result.

    Returns
    -------
    dict with keys
        participant, trial, clip, phase : as passed in
        n_cycles         : number of peak-to-peak cycles found by detect_cycles()
        duration_min     : time_ms span of the whole clip, in minutes
        cadence_spm      : n_cycles / duration_min
        mean_jump_height : mean over cycles of (max hip_y - min hip_y)
        wasted_energy    : population std (ddof=0) of those per-cycle ranges
      When fewer than two peaks are found, n_cycles is 0 and every numeric
      metric (including duration_min) is NaN.

    Notes
    -----
    Cycles whose hip_y range is NaN (no hip data in that segment) are left
    out of the jump-height statistics instead of turning them into NaN for
    the whole clip; they still count towards n_cycles and cadence.

    NOTE(review): cadence divides the cycles counted between the first and
    last peak by the duration of the *whole* clip, so partial cycles at the
    clip edges bias it slightly low - confirm this is acceptable.
    NOTE(review): cycles come from one knee only, so this looks like strides
    per minute rather than steps per minute - confirm what "spm" means here.
    """
    df_clip = df_clip.sort_values("frame").reset_index(drop=True)

    metrics = {
        "participant": participant,
        "trial": trial,
        "clip": clip,
        "phase": phase,
        "n_cycles": 0,
        "duration_min": np.nan,
        "cadence_spm": np.nan,
        "mean_jump_height": np.nan,
        "wasted_energy": np.nan,
    }

    peaks = detect_cycles(df_clip["angle_left_knee_angle"])
    if len(peaks) < 2:
        return metrics

    # Vertical hip excursion within each peak-to-peak cycle (end peak included).
    hip_y = df_clip["hip_y"]
    jump_heights = []
    for start, end in zip(peaks[:-1], peaks[1:]):
        seg = hip_y.iloc[start:end + 1]
        height = seg.max() - seg.min()
        if pd.notna(height):
            jump_heights.append(height)

    n_cycles = len(peaks) - 1
    metrics["n_cycles"] = n_cycles
    if jump_heights:
        metrics["mean_jump_height"] = float(np.mean(jump_heights))
        metrics["wasted_energy"] = float(np.std(jump_heights))

    # Clip duration from the time_ms span.
    t0 = df_clip["time_ms"].min()
    t1 = df_clip["time_ms"].max()
    duration_min = (t1 - t0) / 1000.0 / 60.0 if pd.notna(t0) and pd.notna(t1) else np.nan
    metrics["duration_min"] = duration_min
    # NaN or non-positive durations leave the cadence undefined.
    metrics["cadence_spm"] = n_cycles / duration_min if duration_min > 0 else np.nan

    return metrics


# ------------------------------------------------------------
# 5. Loop over all clips: merge pose2d + angles, compute metrics
# ------------------------------------------------------------
pose2d_files = sorted(Path(POSE_DIR).glob("*_pose2d.csv"))

metrics_rows = []

for pose2d_path in pose2d_files:
    # Matching angles file sits next to the pose file: <stem>_angles.csv.
    # Built from the file name only, so directory names are never rewritten.
    angles_path = pose2d_path.with_name(pose2d_path.stem + "_angles.csv")
    if not angles_path.exists():
        print(f"Skipping {pose2d_path.name}: no matching angles file")
        continue

    participant, trial, clip, phase = parse_clip_filename(pose2d_path.name)
    print(f"Processing {pose2d_path.name} → {participant}, trial {trial}, clip {clip}, phase {phase}")

    # load data
    pose_df = safe_read_csv(pose2d_path)
    ang_df  = safe_read_csv(angles_path)

    # sanity: fail with a clear message rather than a bare KeyError further down
    required_ang_cols = ["video", "frame", "time_ms", "angle_left_knee_angle"]
    for c in required_ang_cols:
        if c not in ang_df.columns:
            raise ValueError(f"{angles_path.name} missing column {c}")

    required_pose_cols = ["video", "frame", "time_ms", "landmark_name", "y"]
    for c in required_pose_cols:
        if c not in pose_df.columns:
            raise ValueError(f"{pose2d_path.name} missing column {c}")

    # get hip_y = mean of left_hip.y and right_hip.y per frame
    hips = pose_df[pose_df["landmark_name"].isin(["left_hip", "right_hip"])].copy()
    hips_wide = hips.pivot_table(
        index=["video", "frame", "time_ms"],
        columns="landmark_name",
        values="y"
    )
    hips_wide = hips_wide.reset_index()

    if "left_hip" not in hips_wide.columns or "right_hip" not in hips_wide.columns:
        print(f"Warning: hips missing for {pose2d_path.name}, skipping.")
        continue

    hips_wide["hip_y"] = hips_wide[["left_hip", "right_hip"]].mean(axis=1)

    # merge angles + hip_y
    # NOTE(review): time_ms is part of the join key; if the two CSVs format it
    # differently, rows will silently fail to match - confirm both files come
    # from the same export.
    merged = pd.merge(
        ang_df[["video", "frame", "time_ms", "angle_left_knee_angle"]],
        hips_wide[["video", "frame", "time_ms", "hip_y"]],
        on=["video", "frame", "time_ms"],
        how="inner"
    )

    if merged.empty:
        print(f"Warning: empty merged df for {pose2d_path.name}")
        continue

    row = compute_cycle_metrics_for_clip(
        merged,
        participant=participant,
        trial=trial,
        clip=clip,
        phase=phase
    )
    metrics_rows.append(row)

# Stop here with a clear message if nothing was processed (e.g. Drive not
# mounted or wrong POSE_DIR); otherwise an empty, column-less table would be
# saved and the later steps would fail on a missing column.
if not metrics_rows:
    raise RuntimeError(f"No clips were processed from {POSE_DIR}; check the Drive mount and the path")

cycle_metrics = pd.DataFrame(metrics_rows)
cycle_metrics_path = f"{OUT_DIR}/cycle_metrics.csv"
cycle_metrics.to_csv(cycle_metrics_path, index=False)
print("\nSaved cycle metrics to:", cycle_metrics_path)
print(cycle_metrics.head())


# ------------------------------------------------------------
# 6. Add HR to cycle metrics
# ------------------------------------------------------------
# Look up the (mean, std) heart rate of every clip's time window and attach
# both as new columns on cycle_metrics.
hr_means = []
hr_stds  = []

for _, r in cycle_metrics.iterrows():
    clip_id = r["clip"]
    if pd.isna(clip_id):
        # The clip number could not be parsed from the file name, so there is
        # no HR window to look up; int(None) / int(NaN) would raise here.
        m, s = np.nan, np.nan
    else:
        m, s = get_clip_hr(r["participant"], int(clip_id))
    hr_means.append(m)
    hr_stds.append(s)

cycle_metrics["HR_mean"] = hr_means
cycle_metrics["HR_std"]  = hr_stds

cycle_metrics_hr_path = f"{OUT_DIR}/cycle_metrics_with_hr.csv"
cycle_metrics.to_csv(cycle_metrics_hr_path, index=False)
print("\nSaved cycle metrics with HR to:", cycle_metrics_hr_path)
print(cycle_metrics.head())


# ------------------------------------------------------------
# 7. Correlations per phase (warmup, mile1, mile2, mile3, cooldown)
# ------------------------------------------------------------
phases = ["warmup", "mile1", "mile2", "mile3", "cooldown"]

# Output column -> clip metric correlated against HR_mean.
#   Q1:  cadence
#   Q2a: mean jump height
#   Q2b: wasted energy
corr_targets = {
    "corr_HR_cadence": "cadence_spm",
    "corr_HR_mean_jump": "mean_jump_height",
    "corr_HR_wasted_energy": "wasted_energy",
}

corr_rows = []

for ph in phases:
    # Clips of this phase that have a heart-rate value
    # (we expect 3 rows: Danny, Aryeh, Max).
    in_phase = cycle_metrics["phase"] == ph
    sub = cycle_metrics[in_phase].dropna(subset=["HR_mean"])
    if len(sub) < 2:
        print(f"Not enough data for phase {ph}")
        continue

    # NOTE(review): with only 2-3 points per phase a Pearson r is close to
    # degenerate (exactly ±1 for two points) - interpret with care.
    row = {"phase": ph}
    for out_name, feature in corr_targets.items():
        pair = sub[["HR_mean", feature]]
        row[out_name] = pair.corr(method="pearson").loc["HR_mean", feature]
    row["n_participants"] = len(sub)
    corr_rows.append(row)

correlations_by_phase = pd.DataFrame(corr_rows)
corr_path = f"{OUT_DIR}/correlations_by_phase.csv"
correlations_by_phase.to_csv(corr_path, index=False)

print("\nCorrelations by phase:")
print(correlations_by_phase)
print("\nSaved to:", corr_path)


# ------------------------------------------------------------
# 8. OPTIONAL: quick scatter plots (run manually)
# ------------------------------------------------------------
def scatter_hr_vs(feature, df, title_suffix=""):
    """
    Scatter-plot HR_mean against one clip metric, labelling each point
    with its participant.

    Parameters
    ----------
    feature : str
        Column of ``df`` shown on the x axis.
    df : pandas.DataFrame
        Needs the columns ``feature``, "HR_mean" and "participant".
    title_suffix : str
        Text appended to the plot title (e.g. the phase name).
    """
    fig, ax = plt.subplots()
    ax.scatter(df[feature], df["HR_mean"])
    # Annotate every point with the participant it belongs to.
    for x_val, hr_val, who in zip(df[feature], df["HR_mean"], df["participant"]):
        ax.text(x_val, hr_val, who)
    ax.set_xlabel(feature)
    ax.set_ylabel("HR_mean (bpm)")
    ax.set_title(f"HR vs {feature} {title_suffix}")
    ax.grid(True)
    plt.show()

# Example usage (run in separate cells if you want):
# for ph in phases:
#     sub = cycle_metrics[cycle_metrics["phase"] == ph]
#     scatter_hr_vs("cadence_spm", sub, f"({ph})")
#     scatter_hr_vs("mean_jump_height", sub, f"({ph})")
#     scatter_hr_vs("wasted_energy", sub, f"({ph})")


Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
Danny: HR rows = 2280
Aryeh: HR rows = 2284
Max: HR rows = 2286
Processing Aryeh_1_clip1_pose2d.csv → Aryeh, trial 1, clip 1, phase warmup
Processing Aryeh_1_clip2_pose2d.csv → Aryeh, trial 1, clip 2, phase mile1
Processing Aryeh_1_clip3_pose2d.csv → Aryeh, trial 1, clip 3, phase mile2
Processing Aryeh_2_clip4_pose2d.csv → Aryeh, trial 2, clip 4, phase mile3
Processing Aryeh_2_clip5_pose2d.csv → Aryeh, trial 2, clip 5, phase cooldown
Processing Danny_1_clip1_pose2d.csv → Danny, trial 1, clip 1, phase warmup
Processing Danny_1_clip2_pose2d.csv → Danny, trial 1, clip 2, phase mile1
Processing Danny_1_clip3_pose2d.csv → Danny, trial 1, clip 3, phase mile2
Processing Danny_2_clip4_pose2d.csv → Danny, trial 2, clip 4, phase mile3
Processing Danny_2_clip5_pose2d.csv → Danny, trial 2, clip 5, phase cooldown
Processing Max_1_clip1_pose2d.csv → Max, trial 1, clip 1, p

In [3]:
# Force a fresh mount of Google Drive at /content/drive.
# NOTE(review): this cell is numbered In [3] but appears after In [4], so the
# notebook was executed out of order. Presumably it was used to recover from a
# Drive disconnect - confirm, and move it to the top (or drop it) so that
# Restart & Run All works.
from google.colab import drive
drive.mount('/content/drive', force_remount=True)


Mounted at /content/drive
