In [1]:
import os
import pandas as pd
import numpy as np
import datetime
import matplotlib.pyplot as plt

# Root folder of the dataset, relative to the notebook's working directory.
# Built with os.path.join so the separators are right on every OS (the previous
# raw string with backslashes only resolved on Windows).
DATA_DIR = os.path.join("mmash", "mmash", "MMASH", "DataPaper")

# Activity code -> human-readable label, used when reporting per-activity totals.
# NOTE: the recorded activity data also contains code 0, which has no entry here
# and is therefore reported as "Unknown (0)" by the distribution summary.
activity_labels = {
    1: "Sleeping",
    2: "Lying down",
    3: "Sitting",
    4: "Light movement",
    5: "Medium movement",
    6: "Heavy movement",
    7: "Eating",
    8: "Small screen usage",
    9: "Large screen usage",
    10: "Caffeinated drinks",
    11: "Smoking",
    12: "Alcohol consumption"
}

def load_data(user_id, data_dir=None):
    """
    Read Activity.csv and Actigraph.csv for a single user.

    Parameters
    ----------
    user_id : int or str
        Identifier used to build the per-user folder name ``user_<user_id>``.
    data_dir : str or path-like, optional
        Dataset root that contains the ``user_<id>`` folders. Defaults to the
        notebook-level ``DATA_DIR`` when omitted.

    Returns
    -------
    (df_acti, df_actig) : tuple of pandas.DataFrame
        The contents of Activity.csv and Actigraph.csv, exactly as parsed by
        ``pd.read_csv`` with default options.

    Raises
    ------
    FileNotFoundError
        If either CSV file does not exist for this user.
    """
    # Resolve the root lazily so the global is only needed when no override is given.
    root = DATA_DIR if data_dir is None else data_dir
    user_folder = os.path.join(root, f"user_{user_id}")

    df_acti = pd.read_csv(os.path.join(user_folder, "Activity.csv"))
    df_actig = pd.read_csv(os.path.join(user_folder, "Actigraph.csv"))
    return df_acti, df_actig

# Example: load both frames for user 1.
# df_actig_1 is reused later by the out-of-range heart-rate check.
df_acti_1, df_actig_1 = load_data(1)


In [4]:
# Convert Times and Label Each Second
def _day_clock_to_datetime(day_numbers, clock_strings, time_format, base_date,
                           midnight_token=None):
    """
    Combine 1-based day numbers and clock strings into a datetime Series.

    day_numbers    : Series of day indices; day 1 maps onto ``base_date``.
    clock_strings  : Series of time-of-day strings matching ``time_format``.
    time_format    : strptime-style format of ``clock_strings``.
    base_date      : calendar date that day 1 is anchored to.
    midnight_token : optional literal (e.g. "24:00") meaning "midnight at the
                     end of the day"; it is parsed as 00:00 of the next day.
    """
    if midnight_token is None:
        rolls_over = pd.Series(False, index=clock_strings.index)
    else:
        rolls_over = clock_strings.eq(midnight_token)
        # strptime-style parsers reject hour 24, so substitute midnight and
        # add the extra day to the offset below.
        midnight = datetime.time(0, 0).strftime(time_format)
        clock_strings = clock_strings.mask(rolls_over, midnight)

    # Parsing a time-only format yields a placeholder date; subtracting the
    # normalized (midnight) value leaves only the time of day as a timedelta.
    parsed = pd.to_datetime(clock_strings, format=time_format)
    time_of_day = parsed - parsed.dt.normalize()

    # np.trunc mirrors int(day_number) for any non-integral day values.
    whole_days = np.trunc(pd.to_numeric(day_numbers)) - 1 + rolls_over.astype(int)
    return pd.Timestamp(base_date) + pd.to_timedelta(whole_days, unit="D") + time_of_day


def parse_times(df_acti, df_actig, base_date=datetime.date(2025, 1, 1)):
    """
    Create new datetime columns in both dataframes.

    Parameters
    ----------
    df_acti : pandas.DataFrame
        Activity log with columns "Day", "Start" and "End" ("HH:MM" strings).
        Rows missing "Start" or "End" are dropped.
    df_actig : pandas.DataFrame
        Actigraph samples with columns "day" and "time" ("HH:MM:SS" strings).
    base_date : datetime.date, optional
        Calendar date assigned to day 1. The dates are synthetic; only the
        offsets between timestamps are meaningful.

    Returns
    -------
    (df_acti, df_actig) : tuple of pandas.DataFrame
        Copies of the inputs (the originals are not modified). The activity
        frame gains "Start_dt" and "End_dt"; the actigraph frame gains
        "Datetime".

    Notes
    -----
    * Parsing is vectorized, so it also works when no activity row survives
      the dropna step (row-wise ``DataFrame.apply`` returns a DataFrame for an
      empty frame, which cannot be assigned to a single column).
    * "24:00" in the activity log is treated as 00:00 of the following day.
    * "Start" and "End" both use the row's "Day", so an interval that crosses
      midnight ends up with End_dt < Start_dt; callers have to handle that.
    """
    df_acti = df_acti.dropna(subset=["Start", "End"]).copy()
    for source_col, target_col in (("Start", "Start_dt"), ("End", "End_dt")):
        df_acti[target_col] = _day_clock_to_datetime(
            df_acti["Day"], df_acti[source_col], "%H:%M", base_date,
            midnight_token="24:00",
        )

    df_actig = df_actig.copy()
    df_actig["Datetime"] = _day_clock_to_datetime(
        df_actig["day"], df_actig["time"], "%H:%M:%S", base_date
    )

    return df_acti, df_actig


In [9]:
# Total logged minutes per activity code, summed over every available user.
USER_IDS = range(1, 23)  # user folders user_1 .. user_22

# Maps activity code -> accumulated minutes (the name is kept for compatibility
# even though the values are durations, not counts).
all_activity_counts = {}
missing_users = []

for user_id in USER_IDS:
    # Only the load can raise FileNotFoundError, so only the load is guarded;
    # any other failure should surface instead of being silently skipped.
    try:
        df_acti, df_actig = load_data(user_id)
    except FileNotFoundError:
        missing_users.append(user_id)
        continue

    df_acti, df_actig = parse_times(df_acti, df_actig)

    # dropna() without a subset removes rows with a missing value in ANY column,
    # which already covers "Start", "End" and "Activity".
    df_acti = df_acti.dropna()
    # Keep only intervals that do not run backwards in time.
    df_acti = df_acti[df_acti["End_dt"] >= df_acti["Start_dt"]]

    # assign() returns a new frame, so no column is written onto a filtered slice.
    df_acti = df_acti.assign(
        Duration_minutes=(df_acti["End_dt"] - df_acti["Start_dt"]).dt.total_seconds() / 60.0
    )

    # Sum the duration for each activity and fold it into the running totals.
    activity_duration = df_acti.groupby("Activity")["Duration_minutes"].sum()
    for act_code, duration in activity_duration.items():
        all_activity_counts[act_code] = all_activity_counts.get(act_code, 0) + duration

if missing_users:
    # Make skipped users visible rather than hiding them.
    print("Skipped users with missing data files:", missing_users)

# Convert to a DataFrame; explicit columns keep the schema stable even if no
# activity was found at all.
df_distribution = pd.DataFrame(
    [
        {"ActivityCode": k,
         "ActivityLabel": activity_labels.get(k, f"Unknown ({k})"),
         "TotalMinutes": v}
        for k, v in all_activity_counts.items()
    ],
    columns=["ActivityCode", "ActivityLabel", "TotalMinutes"],
)
print(df_distribution)


    ActivityCode        ActivityLabel  TotalMinutes
0              1             Sleeping         318.0
1              2           Lying down        4321.0
2              3              Sitting        4812.0
3              4       Light movement          87.0
4              5      Medium movement         460.0
5              6       Heavy movement        2059.0
6              7               Eating        3209.0
7              8   Small screen usage        1262.0
8              9   Large screen usage         385.0
9             10   Caffeinated drinks          65.0
10             0          Unknown (0)        6038.0
11            12  Alcohol consumption         227.0
12            11              Smoking         160.0


In [10]:
# Sanity check on user 1: count heart-rate samples below 20 or above 220.
hr_user_1 = df_actig_1["HR"]
hr_out_of_range = hr_user_1.lt(20) | hr_user_1.gt(220)
invalid_hr = df_actig_1[hr_out_of_range]
print("Out-of-range HR samples:", len(invalid_hr))


Out-of-range HR samples: 1


In [None]:
def preprocess_user_raw(user_id):
    """
    Load one user's raw data and prepare it for downstream use.

    Steps:
    1. Read the activity and actigraph frames via ``load_data``.
    2. Add datetime columns via ``parse_times``.
    3. Linearly interpolate gaps in the "Axis1", "Axis2" and "Axis3" columns
       of the actigraph frame, filling in both directions.

    Returns the (activity, actigraph) dataframe pair.
    """
    activity_df, actigraph_df = parse_times(*load_data(user_id))

    axis_columns = ["Axis1", "Axis2", "Axis3"]
    filled_axes = actigraph_df[axis_columns].interpolate(
        method='linear', limit_direction='both'
    )
    actigraph_df[axis_columns] = filled_axes

    return activity_df, actigraph_df


In [None]:
# Per-user dataset: user_id -> (stacked segments, one label per segment).
user_data_dict = {}

for user_id, segments in all_user_segments.items():
    # Every segment is labelled with its owner's user_id
    # (swap in activity labels here for activity recognition).
    segment_labels = [user_id] * len(segments)
    X_user, y_user = np.array(segments), np.array(segment_labels)
    user_data_dict[user_id] = (X_user, y_user)


In [None]:
from sklearn.preprocessing import LabelEncoder
from tensorflow.keras.utils import to_categorical

# One encoder fitted on every user's labels, so all folds share a single
# label -> index mapping.
all_labels = np.concatenate([labels for _, labels in user_data_dict.values()])
global_le = LabelEncoder().fit(all_labels)

num_classes = global_le.classes_.shape[0]


In [None]:
def loso_evaluate_model(create_model_fn, user_data_dict, label_type="user", epochs=30,
                        validation_split=0.1, random_state=42):
    """
    Performs Leave-One-Subject-Out (LOSO) cross-validation.

    For every user in turn, a fresh model is trained on all other users'
    windows and evaluated on the held-out user's windows.

    Parameters
    ----------
    create_model_fn : callable
        Called as ``create_model_fn(input_shape=..., num_classes=...)``; must
        return a model exposing ``fit``, ``evaluate`` and ``predict``.
        Assumes the model is compiled with exactly one metric (accuracy), since
        ``evaluate`` is unpacked into ``(loss, accuracy)`` - TODO confirm.
    user_data_dict : dict
        ``{user_id: (X, y)}`` where X holds the raw windows and y the labels.
    label_type : str
        "user" or "activity". Accepted for backward compatibility; it is not
        used by the computation.
    epochs : int
        Maximum number of training epochs per fold. (This argument used to be
        ignored in favour of a hard-coded 10.)
    validation_split : float
        Fraction of the shuffled TRAINING windows held out for early stopping.
        Set to 0 to train for the full number of epochs without early stopping.
    random_state : int
        Seed for the shuffle that precedes the validation split.

    Returns
    -------
    dict
        Per-fold metric lists ("accuracy", "f1", "precision", "recall"), the
        evaluated "users", and a "mean" sub-dict with the averages.

    Notes
    -----
    * Relies on the notebook-level ``global_le``, ``num_classes`` and
      ``to_categorical``.
    * Early stopping monitors a split of the training data, never the held-out
      user: selecting the best epoch on the test fold would leak test
      information into the reported scores.
    """
    # Imported here so the function does not depend on another cell having
    # imported these names.
    from sklearn.metrics import classification_report
    from tensorflow.keras.callbacks import EarlyStopping

    rng = np.random.default_rng(random_state)
    fold_metrics = {"accuracy": [], "f1": [], "precision": [], "recall": []}
    evaluated_users = []

    for test_user in sorted(user_data_dict.keys()):
        print(f"\n🔁 Testing on User {test_user} (LOSO)")

        # Prepare train/test sets; users without any windows are left out so
        # their empty arrays cannot break concatenation.
        X_test, y_test = (np.asarray(part) for part in user_data_dict[test_user])
        train_parts = [
            (np.asarray(X_u), np.asarray(y_u))
            for user_id, (X_u, y_u) in user_data_dict.items()
            if user_id != test_user and len(X_u) > 0
        ]
        if len(X_test) == 0 or not train_parts:
            print("Skipping fold: no windows available for training or testing.")
            continue

        X_train = np.concatenate([X_u for X_u, _ in train_parts])
        y_train = np.concatenate([y_u for _, y_u in train_parts])

        # Keras takes the validation split from the END of the arrays before
        # shuffling, so shuffle first to avoid validating on one user only.
        order = rng.permutation(len(X_train))
        X_train, y_train = X_train[order], y_train[order]

        # Encode using global encoder, then one-hot encode
        y_train_cat = to_categorical(global_le.transform(y_train), num_classes=num_classes)
        y_test_cat = to_categorical(global_le.transform(y_test), num_classes=num_classes)

        model = create_model_fn(input_shape=X_train.shape[1:], num_classes=y_train_cat.shape[1])

        fit_kwargs = {}
        if validation_split and validation_split > 0:
            fit_kwargs["validation_split"] = validation_split
            fit_kwargs["callbacks"] = [
                EarlyStopping(monitor='val_accuracy', patience=5, restore_best_weights=True)
            ]

        model.fit(
            X_train, y_train_cat,
            epochs=epochs,
            batch_size=64,
            verbose=0,
            **fit_kwargs
        )

        # Evaluation on the untouched held-out user
        loss, accuracy = model.evaluate(X_test, y_test_cat, verbose=0)

        y_pred = model.predict(X_test)
        y_pred_labels = np.argmax(y_pred, axis=1)
        y_true_labels = np.argmax(y_test_cat, axis=1)

        report = classification_report(y_true_labels, y_pred_labels, output_dict=True, zero_division=0)
        f1 = report['weighted avg']['f1-score']
        precision = report['weighted avg']['precision']
        recall = report['weighted avg']['recall']

        print(f"✅ Accuracy: {accuracy:.4f} | F1: {f1:.4f} | Precision: {precision:.4f} | Recall: {recall:.4f}")

        evaluated_users.append(test_user)
        fold_metrics["accuracy"].append(accuracy)
        fold_metrics["f1"].append(f1)
        fold_metrics["precision"].append(precision)
        fold_metrics["recall"].append(recall)

    # Averages are NaN (without a numpy warning) when no fold could be evaluated.
    means = {
        name: float(np.mean(values)) if values else float("nan")
        for name, values in fold_metrics.items()
    }

    print("\n📊 LOSO Summary:")
    print(f"Avg Accuracy:  {means['accuracy']:.4f}")
    print(f"Avg F1 Score:  {means['f1']:.4f}")
    print(f"Avg Precision: {means['precision']:.4f}")
    print(f"Avg Recall:    {means['recall']:.4f}")

    return {**fold_metrics, "users": evaluated_users, "mean": means}
