In [None]:
from datetime import datetime, time
from pathlib import Path

import numpy as np
import pandas as pd
from tqdm.auto import tqdm

In [None]:
DATA_DIR = Path("./pistachio_1_data")
DEVICE_DATA_DIR = DATA_DIR / "SMRawDec27"

# Cohort table; its ID column becomes the dyad index of dyads_df below.
cohort_df = pd.read_csv(DATA_DIR / "cohort_analysis.csv")
# sheet_name=None returns a {sheet name: DataFrame} dict. The workbook is expected
# to hold exactly one sheet, "PDI start dates".
pdi_sheets = pd.read_excel(DATA_DIR / "Artificial_PDI_12-27.xlsx", sheet_name=None)
assert len(pdi_sheets) == 1
pdi_start_dates_df = pdi_sheets["PDI start dates"]

# Left join: every cohort row is kept, even without a PDI start date.
dyads_df = cohort_df.merge(pdi_start_dates_df, on="ID", how="left").set_index("ID")

In [None]:
from hrv import daily_hrv_sdann_sleep
from util import FORMAT_DAY, FORMAT_MIN, FORMAT_SEC

# Sleep detail rows timestamped at or after this time of day are credited to the
# night that ends on the following calendar date (see summarize_sleep_details).
NIGHTTIME_CUTOFF = time(hour=18, minute=0)
# Look-ahead windows, in minutes, for the tantrum_within_{N}m labels.
TANTRUM_INTERVAL_MINUTES = [15, 30, 45, 60]


def make_dyad_df(dyad: int) -> pd.DataFrame:
    """Build the per-epoch feature table for one dyad.

    Starts from the child's Garmin "Epoch" export (one row per ActivityDateTime)
    and adds, for each epoch:

    - seconds spent SEDENTARY / ACTIVE / HIGHLY_ACTIVE (from "EpochLog");
    - mean/std/max/min heart rate over the 10, 30 and 60 minutes before the epoch;
    - the mean StressDetails level (-1 readings excluded) of the 15-minute bin
      that starts at the epoch timestamp;
    - the daily output of ``daily_hrv_sdann_sleep`` for the epoch's date;
    - daily "Stress" summaries for the 1 to 5 days before the epoch's date;
    - sleep-stage totals for the nights whose waking date is 0 to 4 days before
      the epoch's date;
    - ``tantrum_within_{N}m`` flags (a tantrum starts within N minutes after the
      epoch) for N in TANTRUM_INTERVAL_MINUTES;
    - every column of the dyad's row in ``dyads_df`` (constant across rows).

    Args:
        dyad: Numeric dyad ID (an index label of ``dyads_df``). It is zero-padded
            to three digits to locate the dyad's files.

    Returns:
        One row per epoch, with the epoch timestamp in an ``ActivityDateTime`` column.
    """
    dyad_code = f"{dyad:03d}"

    ## Activity
    # Data every 15 minutes
    epoch_df = get_child_garmin_df(dyad_code, "Epoch")
    epoch_log_df = get_child_garmin_df(dyad_code, "EpochLog")

    # Add total activity seconds by intensity to each epoch
    epoch_df_ext = epoch_df.copy().set_index("ActivityDateTime")
    for intensity in ["SEDENTARY", "ACTIVE", "HIGHLY_ACTIVE"]:
        epoch_df_ext = add_activity_intensity_to_epoch_df(
            epoch_df_ext, epoch_log_df, intensity
        )

    ## Sleep
    # Daily summaries of sleep (e.g., total REM for a night)
    sleep_df = get_child_garmin_df(dyad_code, "Sleep")
    sleep_df["CalendarDate"] = pd.to_datetime(
        sleep_df["CalendarDate"], format=FORMAT_DAY
    )
    # Each detected sleep stage with its own row w/start time, duration
    sleep_details_df = get_child_garmin_df(dyad_code, "SleepDetails")
    # Per-night stage totals, with nights split at NIGHTTIME_CUTOFF instead of midnight
    sleep_summary_df = summarize_sleep_details(
        sleep_details_df, nighttime_cutoff=NIGHTTIME_CUTOFF
    )

    ## Heart rate
    hr_df = get_child_garmin_df(dyad_code, "HeartRate")
    hr_df["ActivityTime"] = pd.to_datetime(hr_df["ActivityTime"], format=FORMAT_SEC)
    # most_recent_hrs binary-searches ActivityTime and needs ascending order.
    # Nothing guarantees that for the raw CSV, so give the lookups a sorted copy
    # and leave hr_df as read for the HRV computation below.
    hr_df_sorted = hr_df.sort_values("ActivityTime", kind="stable", ignore_index=True)

    ## Stress
    # Daily stress summary, re-indexed by calendar date
    stress_df = get_child_garmin_df(dyad_code, "Stress")
    stress_df["ActivityDateTime"] = pd.to_datetime(
        stress_df["ActivityDateTime"], format=FORMAT_MIN
    )
    stress_df = stress_df.set_index(stress_df["ActivityDateTime"].dt.date).drop(
        ["ActivityDateTime"], axis=1
    )

    # High resolution stress (every 3 minutes)
    stress_details_df = get_child_garmin_df(dyad_code, "StressDetails")
    # Rollup stress_details_df to 15-minute bins, averaging StressLevelValue (ignoring -1)
    stress_details_df["ActivityDate"] = pd.to_datetime(
        stress_details_df["ActivityDate"]
    )
    stress_details_df = stress_details_df[stress_details_df["StressLevelValue"] != -1]
    stress_details_summary_df = (
        stress_details_df.set_index("ActivityDate")
        .resample("15min")["StressLevelValue"]
        .mean()
        .to_frame()
        .rename(columns={"StressLevelValue": "StressLevelValueAverage"})
    )

    ## EMA logs
    ilumivu_dfs = get_ilumivu_dfs(dyad_code)
    tantrums_df = tantrum_onsets_from_ilumivu_dfs(*ilumivu_dfs)

    ## Now combine all the dataframes
    combined_df = epoch_df_ext.copy()

    # Heart-rate statistics over the window [epoch - lookback, epoch).
    # Parse the epoch labels once instead of once per row per window.
    epoch_times = pd.to_datetime(epoch_df_ext.index, format=FORMAT_MIN)
    for interval in ["10m", "30m", "60m"]:
        lookback = pd.Timedelta(interval)
        recent_hrs_by_time = pd.Series(
            [
                most_recent_hrs(hr_df_sorted, epoch_time, lookback=lookback)
                for epoch_time in epoch_times
            ],
            index=epoch_df_ext.index,
        )
        combined_df[f"hr_moving_avg_{interval}"] = recent_hrs_by_time.map(np.mean)
        combined_df[f"hr_moving_std_{interval}"] = recent_hrs_by_time.map(np.std)
        combined_df[f"hr_moving_max_{interval}"] = recent_hrs_by_time.map(np.max)
        combined_df[f"hr_moving_min_{interval}"] = recent_hrs_by_time.map(np.min)

    combined_df["ActivityDateTimeDt"] = pd.to_datetime(
        combined_df.index, format=FORMAT_MIN
    )
    combined_df = combined_df.join(
        stress_details_summary_df, on="ActivityDateTimeDt", how="left"
    )
    combined_df = combined_df.drop(columns=["ActivityDateTimeDt"])

    hrv_df = daily_hrv_sdann_sleep(hr_df, sleep_df)
    combined_df["ActivityDate"] = pd.to_datetime(
        combined_df.index, format=FORMAT_MIN
    ).date
    combined_df = combined_df.join(
        hrv_df,
        on="ActivityDate",
        how="left",
    )
    combined_df = combined_df.drop(columns=["ActivityDate"])

    # Look at stress over the past 1 to 5 days
    stress_lookback_days = 5
    for day in range(1, stress_lookback_days + 1):
        temp_col = f"StressDate_T-{day}"
        temp_df = stress_df.copy().add_suffix(f"_T-{day}")
        combined_df[temp_col] = pd.to_datetime(combined_df.index).date - pd.Timedelta(
            days=day
        )
        combined_df = combined_df.join(temp_df, on=temp_col, how="left")
        combined_df = combined_df.drop(columns=[temp_col])

    # Look at sleep for the nights ending 0 to 4 days ago
    # (sleep from the previous night is marked by the waking date)
    sleep_lookback_days = 5
    for day in range(sleep_lookback_days):
        temp_col = f"SleepNightT_{day}"
        temp_df = sleep_summary_df.copy().add_suffix(f"_T-{day}")
        combined_df[temp_col] = pd.to_datetime(combined_df.index).date - pd.Timedelta(
            days=day
        )
        combined_df = combined_df.join(temp_df, on=temp_col, how="left")
        combined_df = combined_df.drop(columns=[temp_col])

    activity_times = pd.to_datetime(combined_df.index)
    tantrum_starts = tantrums_df.sort_values().to_numpy()
    for interval in TANTRUM_INTERVAL_MINUTES:
        combined_df[f"tantrum_within_{interval}m"] = activity_times.map(
            lambda x: has_tantrum_within_period_minutes(tantrum_starts, x, interval)
        )

    # Broadcast the dyad's cohort attributes onto every epoch row
    for col in dyads_df.columns:
        combined_df[col] = dyads_df.loc[dyad, col]

    combined_df = combined_df.reset_index().rename(
        columns={"index": "ActivityDateTime"}
    )
    return combined_df


def get_child_garmin_df(dyad: str, csv_name: str) -> pd.DataFrame:
    """Read one of the child's Garmin CSV exports for a dyad.

    Files live in ``DEVICE_DATA_DIR/<dyad>_C/Garmin`` and are named like
    ``pistachio003_c_garminActivity_20220325_20220721.csv``. Exactly one file
    must match; zero or several matches raise ValueError from the unpacking.
    """
    garmin_dir = DEVICE_DATA_DIR / f"{dyad}_C" / "Garmin"
    # The underscores on both sides matter: some export names are prefixes of
    # others (e.g. garminEpoch vs. garminEpochLog, garminSleep vs. garminSleepDetails).
    pattern = "*_garmin" + csv_name + "_*.csv"
    (only_match,) = [*garmin_dir.glob(pattern)]
    return pd.read_csv(only_match)


def summarize_sleep_details(
    sleep_details_df: pd.DataFrame, nighttime_cutoff: time
) -> pd.DataFrame:
    """Summarize sleep details similar to the garminSleep CSV, but using a different cutoff than midnight.

    Each detail row is assigned to a "night" keyed by its waking date:
    - rows timestamped before ``nighttime_cutoff`` belong to the night ending that day;
    - rows at or after the cutoff belong to the night ending the next day.

    Args:
        sleep_details_df: Rows with ``ActivityDateTime``, ``SleepStage`` and
            ``Duration`` columns. It is not modified.
        nighttime_cutoff: Time of day at which a new night starts.

    Returns:
        A DataFrame indexed by night date (index name ``"SleepNightT-1"``), with
        one column per SleepStage holding the summed Duration (0 where a stage
        did not occur that night).
    """
    sleep_date_col = "SleepNightT-1"
    timestamps = pd.to_datetime(sleep_details_df["ActivityDateTime"], format="mixed")
    night_dates = timestamps.map(
        lambda dt: dt.date()
        if dt.time() < nighttime_cutoff
        else dt.date() + pd.Timedelta(days=1)
    )
    # Work on a derived frame so the caller's DataFrame is left untouched.
    sleep_summary = (
        sleep_details_df.assign(**{sleep_date_col: night_dates})
        .groupby([sleep_date_col, "SleepStage"])["Duration"]
        .sum()
        .unstack(fill_value=0)
    )
    return sleep_summary


def add_activity_intensity_to_epoch_df(
    epoch_df_ext: pd.DataFrame,
    epoch_log_df: pd.DataFrame,
    intensity: str,
) -> pd.DataFrame:
    """Add an ``activity_seconds_<intensity>`` column to an epoch table.

    Sums ``ActiveTimeInSeconds`` over the ``epoch_log_df`` rows whose ``Intensity``
    equals ``intensity``, grouped by ``ActivityDateTime``. The totals are aligned
    onto ``epoch_df_ext``'s index, which should hold the same ActivityDateTime
    labels. Epochs with no matching log rows get 0. This includes epochs missing
    from the log entirely, and every epoch if the intensity never occurs.

    Note: ``epoch_df_ext`` is modified in place and also returned.
    """
    is_intensity = epoch_log_df["Intensity"] == intensity
    seconds_by_epoch = (
        epoch_log_df.loc[is_intensity]
        .groupby("ActivityDateTime")["ActiveTimeInSeconds"]
        .sum()
    )
    col_name = f"activity_seconds_{intensity.lower()}"
    # Column assignment aligns on the index. Unlike .loc with a label list, missing
    # epochs become NaN (filled with 0 below) instead of raising KeyError.
    epoch_df_ext[col_name] = seconds_by_epoch
    epoch_df_ext[col_name] = epoch_df_ext[col_name].fillna(0)
    return epoch_df_ext


def get_ilumivu_df(subject_dir: Path) -> pd.DataFrame | None:
    ilumivu_dir = [
        d
        for d in subject_dir.iterdir()
        # It looks like all filenames start with "Ilumivu" but some actually start with ZERO WIDTH SPACE?!?
        if d.is_dir() and ("Ilumivu" in d.name or "Illumivu" in d.name)
    ]
    if not ilumivu_dir:
        return None

    [ilumivu_dir] = ilumivu_dir
    for f in ilumivu_dir.iterdir():
        if f.name.endswith(".csv"):
            return pd.read_csv(f)

    # Some folders are empty
    return None


def get_ilumivu_dfs(dyad: str) -> tuple[pd.DataFrame, pd.DataFrame | None]:
    """Load the parent and child Ilumivu EMA exports for one dyad.

    Ilumivu data for the child is only available if the dyad is in the Active (AI) arm.

    Args:
        dyad: Zero-padded dyad code, e.g. "003".

    Returns:
        ``(parent_df, child_df)``. ``parent_df`` is never None: dyad "049", the
        only dyad allowed to lack a parent export, gets an empty frame with the
        expected columns. ``child_df`` is None when no child export was found.
        That is asserted for every dyad outside the Active arm.
    """
    parent_df = get_ilumivu_df(DEVICE_DATA_DIR / f"{dyad}_P")
    if parent_df is None:
        assert dyad == "049"
        expected_columns = [
            "survey_name",
            "mobile_code",
            "instance_id",
            "instance_date",
            "item_id",
            "question_code",
            "answer_code",
            "answer_value",
            "page_visible",
            "answer_clicked",
            "seconds_to_respond",
        ]
        parent_df = pd.DataFrame(columns=expected_columns)

    child_df = get_ilumivu_df(DEVICE_DATA_DIR / f"{dyad}_C")
    active_index = dyads_df.loc[dyads_df["Arm"] == "Active"].index
    is_active = dyad in {f"{d:03d}" for d in active_index}
    if not is_active:
        assert child_df is None, f"Sham dyad {dyad} should not have child Ilumivu data"
    return parent_df, child_df


def datetime_from_survey_answer(time: str) -> datetime:
    """Decode an Ilumivu time answer into a naive datetime.

    Answers look like '1:45PM/1649443501.804': a display label, a slash, then a
    Unix timestamp in seconds. Only the second '/'-separated field is used.

    NOTE(review): ``datetime.fromtimestamp`` without ``tz`` converts to the local
    time zone of the machine running this, so results depend on where it runs.
    """
    fields = time.split("/")
    seconds_since_epoch = float(fields[1])
    return datetime.fromtimestamp(seconds_since_epoch)


def tantrum_onsets_from_ilumivu_dfs(
    parent_df: pd.DataFrame,
    child_df: pd.DataFrame | None,
    verbose: bool = False,
) -> pd.Series:
    """Extract validated tantrum onset times from parent (and optional child) EMA logs.

    A tantrum report is a row whose ``question_code`` is the start key, paired
    with the row at the next index label when that row carries the end key. Both
    answers are decoded with ``datetime_from_survey_answer``. A report is
    discarded when:

    - no end row follows the start row ("no_end");
    - the end time is more than one minute after the end row's ``instance_date``,
      i.e. the survey was submitted before the tantrum ended ("submit_before_end");
    - the end precedes the start ("negative_duration").

    The surviving intervals from both logs are merged where they overlap
    ("overlap" counts the absorbed intervals). Merged intervals longer than two
    hours are then dropped ("excessive_duration").

    Args:
        parent_df: Parent export (TIME_OF_ONSET_OF_TANTRUM / END_TIME_OF_TANTRUM).
        child_df: Child export (TANTRUM_START / TANTRUM_END), or None.
        verbose: Print a breakdown of the discarded reports.

    Returns:
        Series of tantrum start datetimes in ascending order (empty if none).
    """
    invalid_count = {
        "no_end": 0,
        "submit_before_end": 0,
        "negative_duration": 0,
        "excessive_duration": 0,
        "overlap": 0,
    }
    # Criteria that Kyle and Arjun discussed a while ago
    submit_grace_period = pd.Timedelta(minutes=1)
    max_duration = pd.Timedelta(hours=2)

    def get_tantrums(
        df: pd.DataFrame, start_key: str, end_key: str
    ) -> list[tuple[datetime, datetime]]:
        # Collect into a list local to this call. Sharing the enclosing list would
        # make `tantrums += get_tantrums(...)` extend the list with itself and
        # duplicate every tantrum.
        found = []
        for start_index in df[df["question_code"] == start_key].index:
            tantrum_start = datetime_from_survey_answer(
                str(df.loc[start_index, "answer_code"])
            )
            end_index = start_index + 1
            # The end answer must be on the very next row. Check that the row exists
            # before reading it, so a start answer on the last row counts as no_end.
            if (
                end_index not in df.index
                or df.loc[end_index, "question_code"] != end_key
            ):
                invalid_count["no_end"] += 1
                continue
            tantrum_end = datetime_from_survey_answer(
                str(df.loc[end_index, "answer_code"])
            )
            tantrum_end_input_date = datetime.fromisoformat(
                df.loc[end_index, "instance_date"]
            )
            # We expect the survey to be submitted after the tantrum ends
            if (tantrum_end_input_date + submit_grace_period) < tantrum_end:
                invalid_count["submit_before_end"] += 1
                continue
            if tantrum_end - tantrum_start < pd.Timedelta(0):
                invalid_count["negative_duration"] += 1
                continue
            found.append((tantrum_start, tantrum_end))
        return found

    tantrums = get_tantrums(
        parent_df, "TIME_OF_ONSET_OF_TANTRUM", "END_TIME_OF_TANTRUM"
    )
    if child_df is not None:
        tantrums += get_tantrums(child_df, "TANTRUM_START", "TANTRUM_END")

    # Combine overlapping tantrums into a single interval
    merged: list[tuple[datetime, datetime]] = []
    for start, end in sorted(tantrums, key=lambda t: t[0]):
        if merged and start <= merged[-1][1]:
            # Overlap: extend the current interval
            invalid_count["overlap"] += 1
            merged[-1] = (merged[-1][0], max(merged[-1][1], end))
        else:
            merged.append((start, end))

    # Remove tantrums that are excessively long (checked after merging)
    kept = []
    for start, end in merged:
        if end - start > max_duration:
            invalid_count["excessive_duration"] += 1
        else:
            kept.append((start, end))

    if verbose:
        print(
            f"Removed {sum(invalid_count.values())} invalid tantrums: {invalid_count}"
        )
    return pd.Series([start for start, _ in kept])


def has_tantrum_within_period_minutes(
    tantrum_starts: np.ndarray, activity_time: pd.Timestamp, period_minutes: int
) -> bool:
    """Return True if any tantrum starts in [activity_time, activity_time + period).

    Args:
        tantrum_starts: Array of tantrum start times (any order).
        activity_time: A single epoch timestamp, not an index. It is called once
            per element of a DatetimeIndex.
        period_minutes: Length of the look-ahead window in minutes.
    """
    window_end = activity_time + pd.Timedelta(minutes=period_minutes)
    in_window = (tantrum_starts >= activity_time) & (tantrum_starts < window_end)
    # Plain bool rather than np.bool_, so the result behaves like a normal flag.
    return bool(np.any(in_window))


def most_recent_hrs(
    hr_df: pd.DataFrame, time: datetime, lookback: pd.Timedelta
) -> pd.Series:
    """Return the HeartRate readings in the half-open window [time - lookback, time).

    ``hr_df`` must be sorted ascending by ``ActivityTime``. Both window edges are
    found by binary search (``searchsorted``), so unsorted input silently yields
    the wrong rows. The result may be empty and keeps ``hr_df``'s index labels.
    """
    activity_times = hr_df["ActivityTime"]
    # side="left" for both edges: include readings at the window start, exclude `time`.
    first = activity_times.searchsorted(time - lookback)
    stop = activity_times.searchsorted(time)
    return hr_df["HeartRate"].iloc[first:stop]


def pad_or_truncate_series(s: pd.Series, length: int) -> np.ndarray:
    """Convert a Series to a fixed-length float numpy array.

    Keeps the last ``length`` values when ``s`` is long enough; otherwise
    left-pads with NaN so the original values end the array.

    Raises:
        ValueError: if ``length`` is negative.
    """
    if length < 0:
        raise ValueError(f"length must be non-negative, got {length}")
    # Convert to float so padding with np.nan works for integer input.
    values = s.astype(float).to_numpy()
    n_missing = length - len(values)
    if n_missing <= 0:
        # Slice from an explicit start: values[-0:] would return the whole array.
        return values[len(values) - length :]
    return np.pad(values, (n_missing, 0), constant_values=np.nan)

In [None]:
active_dyads = dyads_df.loc[dyads_df["Arm"] == "Active"].index.tolist()
sham_dyads = dyads_df.loc[dyads_df["Arm"] == "Sham"].index.tolist()
counts = {"active": 0, "sham": 0}
for dyad in dyads_df.index:
    ilumivu_dfs = get_ilumivu_dfs(f"{dyad:03d}")
    tantrums = tantrum_onsets_from_ilumivu_dfs(*ilumivu_dfs, verbose=True)
    # Any dyad not in the Active arm is tallied as sham.
    arm_key = "active" if dyad in active_dyads else "sham"
    counts[arm_key] += len(tantrums)

print()
print(counts)
avg_active = counts["active"] / len(active_dyads)
avg_sham = counts["sham"] / len(sham_dyads)
for label, group in (("Active dyads: ", active_dyads), ("Sham dyads: ", sham_dyads)):
    print(label, len(group))
print(f"Average tantrum count per active dyad: {avg_active:.2f}")
print(f"Average tantrum count per sham dyad: {avg_sham:.2f}")

In [None]:
# Build every dyad's table first, then concatenate once. Concatenating inside the
# loop copies the growing frame on every iteration (quadratic).
dyad_dfs = []
pbar = tqdm(dyads_df.index.tolist())
for dyad in pbar:
    pbar.set_description(f"Processing dyad {dyad}")
    dyad_df = make_dyad_df(dyad)
    dyad_df["dyad"] = dyad
    dyad_dfs.append(dyad_df)

# pd.concat([]) raises, so fall back to an empty frame when there are no dyads.
data_df = pd.concat(dyad_dfs, ignore_index=True) if dyad_dfs else pd.DataFrame()

In [None]:
# One row per (dyad, epoch); the positional index carries no information.
all_dyads_csv = DATA_DIR / "all_dyads.csv"
data_df.to_csv(all_dyads_csv, index=False)