In [1]:
import numpy as np
import pandas as pd
from scipy.signal import butter, filtfilt
from sklearn.decomposition import PCA

# -------------------------------
# Filtering Utilities
# -------------------------------
def butter_lowpass_filter(data, cutoff=5, fs=100, order=2):
    nyquist = 0.5 * fs
    normal_cutoff = cutoff / nyquist
    b, a = butter(order, normal_cutoff, btype='low')
    return filtfilt(b, a, data)

def high_pass_filter(data, cutoff=0.1, fs=100, order=2):
    nyquist = 0.5 * fs
    normal_cutoff = cutoff / nyquist
    b, a = butter(order, normal_cutoff, btype='high')
    return filtfilt(b, a, data)

def filter_signal(signal, fs=100):
    return high_pass_filter(butter_lowpass_filter(signal, fs=fs), fs=fs)

# -------------------------------
# PCA + Integration
# -------------------------------
def identify_joint_axis(gyro_data):
    pca = PCA(n_components=1)
    pca.fit(gyro_data)
    return pca.components_[0]

def project_onto_joint_axis(gyro_data, joint_axis):
    return np.dot(gyro_data, joint_axis)

def integrate_gyro(gyro_data, time, initial_angle=0):
    angle = np.cumsum(gyro_data * np.gradient(time))
    return angle - np.mean(angle[:50]) + initial_angle

# -------------------------------
# Gait Phase Labeling
# -------------------------------
def label_gait_phases(force, joint_angle, time, hs_thresh=1500, to_thresh=300):
    labels = []
    heel_strikes = []
    toe_offs = []
    stance = False

    for i in range(1, len(force)):
        if not stance:
            if force[i] > hs_thresh:
                stance = True
                heel_strikes.append(time[i])
                labels.append("Heel Strike")
            else:
                labels.append("Swing")
        else:
            if force[i] < to_thresh:
                stance = False
                toe_offs.append(time[i])
                labels.append("Toe-Off")
            else:
                labels.append("Stance")

    labels = [labels[0]] + labels
    return labels[:len(force)], heel_strikes, toe_offs

# -------------------------------
# Main Processing Function
# -------------------------------
def process_combined_dataset(file_path, fs=100):
    import pandas as pd
    df = pd.read_csv(file_path)
    time = df["Time"].values / 1000.0  # convert ms to seconds

    axes = ["X", "Y", "Z"]
    parts = ["LT", "LS", "LA", "RT", "RS", "RA"]

    # -------------------------------
    # 1. Filter All Gyro Signals
    # -------------------------------
    for part in parts:
        for axis in axes:
            col = f"{part}_Gyro{axis}"
            df[col + "_filtered"] = filter_signal(df[col].values, fs=fs)

    # -------------------------------
    # 2. Estimate Ankle Angles
    # -------------------------------
    def compute_ankle_angle(ankle_prefix, shank_prefix):
        ankle_gyro = df[[f"{ankle_prefix}_Gyro{a}_filtered" for a in axes]].to_numpy()
        shank_gyro = df[[f"{shank_prefix}_Gyro{a}_filtered" for a in axes]].to_numpy()
        min_len = min(len(ankle_gyro), len(shank_gyro), len(time))
        joint_axis = identify_joint_axis(ankle_gyro[:min_len] - shank_gyro[:min_len])
        ankle_proj = project_onto_joint_axis(ankle_gyro[:min_len], joint_axis)
        shank_proj = project_onto_joint_axis(shank_gyro[:min_len], joint_axis)
        angle = integrate_gyro(ankle_proj - shank_proj, time[:min_len])
        return angle, min_len

    left_ankle_angle, len_l = compute_ankle_angle("LA", "LS")
    right_ankle_angle, len_r = compute_ankle_angle("RA", "RS")
    min_len = min(len_l, len_r, len(df))


    # -------------------------------
    # 3. Gait Phase Labeling
    # -------------------------------
    labels_left, hs_L, to_L = label_gait_phases(df["Left_Force"].values[:min_len], left_ankle_angle[:min_len], time[:min_len])
    labels_right, hs_R, to_R = label_gait_phases(df["Right_Force"].values[:min_len], right_ankle_angle[:min_len], time[:min_len])

    # -------------------------------
    # 4. Store in DataFrame
    # -------------------------------
    df = df.iloc[:min_len].copy()
    df["Left_Ankle_Angle"] = left_ankle_angle[:min_len]
    df["Right_Ankle_Angle"] = right_ankle_angle[:min_len]
    df["Left_Gait_Phase"] = labels_left
    df["Right_Gait_Phase"] = labels_right

    return df, hs_L, to_L, hs_R, to_R


In [2]:
# -------------------------------
# Usage
# -------------------------------
if __name__ == "__main__":
    labeled_df, hs_L, to_L, hs_R, to_R = process_combined_dataset("combined_dataset.csv")
    labeled_df.to_csv("labeled_gait_output.csv", index=False)
    print("✅ Processed and saved to labeled_gait_output.csv")

    # Optionally, save gait events as separate CSVs
    pd.DataFrame({"Heel_Strike_Left": hs_L}).to_csv("heel_strikes_left.csv", index=False)
    pd.DataFrame({"Toe_Off_Left": to_L}).to_csv("toe_offs_left.csv", index=False)
    pd.DataFrame({"Heel_Strike_Right": hs_R}).to_csv("heel_strikes_right.csv", index=False)
    pd.DataFrame({"Toe_Off_Right": to_R}).to_csv("toe_offs_right.csv", index=False)

    print("📁 Gait event timestamps saved for both legs.")


✅ Processed and saved to labeled_gait_output.csv
📁 Gait event timestamps saved for both legs.
