In [13]:
import numpy as np
import pandas as pd

# -------- settings --------
FS = 500          # Hz; used below to convert milliseconds to samples
WINDOW_MS = 100   # window length in ms
# Step between consecutive window starts, in ms.
# 75 ms against a 100 ms window leaves 25 ms of overlap, i.e. 25% (not 50%).
# At 500 Hz this is 37.5 samples, which int() below truncates to 37.
# NOTE(review): if 50% overlap was intended, STEP_MS should be 50 — confirm.
STEP_MS = 75

WINDOW_SAMPLES = int(FS * WINDOW_MS / 1000)
STEP_SAMPLES   = int(FS * STEP_MS   / 1000)

# A window or step that truncates to 0 samples would either crash range()
# with an unrelated message or silently produce no windows at all.
if WINDOW_SAMPLES < 1 or STEP_SAMPLES < 1:
    raise ValueError(
        f"WINDOW_MS={WINDOW_MS} and STEP_MS={STEP_MS} must each cover "
        f"at least one sample at FS={FS} Hz"
    )

INPUT_PATH  = r"C:\Users\lilin\OneDrive\Desktop\Project\EMG\Filtered\P3P4_EMG_filtered.csv"
OUTPUT_PATH = r"C:\Users\lilin\OneDrive\Desktop\Project\EMG\Features\P3P4_EMG_features_MVC.csv"

# Columns the features are computed from (channels 1-8).
# Alternative column set, kept for reference:
# EMG_COLS = [f"EMG_CH{i}" for i in range(1, 9)]
EMG_COLS = [f"EMG_MVC_CH{i}" for i in range(1, 9)]
# Columns copied unchanged from the centre sample of every window.
META_COLS = [
    "Participant", "Sex", "Age", "Timeline", "Activity",
    "Lifting Height", "Box Weight", "Load Type", "Lifting Depth"
]

# -------- helpers --------
def _lifting_runs(mask):
    """Return inclusive (start, end) positions of every run of True in `mask`.

    Runs that touch either end of the array are included, so a run that is
    still open at the last sample (even a one-sample run) is not lost.
    """
    mask = np.asarray(mask, dtype=bool)
    if mask.size == 0:
        return []
    # Pad with False so that every run has both a rising and a falling edge.
    padded = np.concatenate(([False], mask, [False]))
    edges = np.flatnonzero(padded[1:] != padded[:-1])
    # Edges alternate: first index of a run, then one past its last index.
    return [(int(first), int(stop) - 1) for first, stop in zip(edges[0::2], edges[1::2])]


# -------- load --------
df = pd.read_csv(INPUT_PATH)

rows = []

# process per trial
# NOTE(review): trials are keyed on "Timeline" alone. If the input holds more
# than one participant and their Timeline labels coincide, their rows end up in
# the same group and a lifting run could span the participant boundary —
# confirm that Timeline values are unique per participant in this file.
for trial_name, trial_df in df.groupby("Timeline"):
    trial_df = trial_df.reset_index(drop=True)

    # mask for lifting intervals
    is_lift = (trial_df["Activity"] == "Lifting-Lowering").values

    for start, end in _lifting_runs(is_lift):
        seg = trial_df.iloc[start:end + 1].reset_index(drop=True)
        seg_len = len(seg)
        if seg_len < WINDOW_SAMPLES:
            continue  # too short to hold even one window

        # Convert each channel to a float array once per segment instead of
        # once per window; windows are then plain slices of these arrays.
        seg_emg = {ch: seg[ch].values.astype(float) for ch in EMG_COLS}

        # Slide windows within this lifting segment. Samples after the last
        # full window are not used.
        for w_start in range(0, seg_len - WINDOW_SAMPLES + 1, STEP_SAMPLES):
            w_end = w_start + WINDOW_SAMPLES

            # Metadata and timestamp are taken from the window's centre sample.
            meta = seg.iloc[w_start + WINDOW_SAMPLES // 2]

            feat = {col: meta[col] for col in META_COLS}

            # Segment indices are positions within the trial (after
            # reset_index); window offsets are relative to the segment start.
            feat["Segment_Start_Index"] = start
            feat["Segment_End_Index"]   = end
            feat["Window_Start_Offset"] = w_start
            feat["Window_End_Offset"]   = w_end - 1
            feat["Window_Center_Timestamp"] = meta["Timestamp"]

            # EMG features per channel
            for ch, signal in seg_emg.items():
                x = signal[w_start:w_end]
                feat[f"{ch}_mean"] = np.mean(x)
                feat[f"{ch}_std"]  = np.std(x, ddof=0)  # population std
                feat[f"{ch}_rms"]  = np.sqrt(np.mean(x ** 2))
                feat[f"{ch}_max"]  = np.max(x)
                feat[f"{ch}_min"]  = np.min(x)

            rows.append(feat)

# -------- save --------
features_df = pd.DataFrame(rows)
if features_df.empty:
    # Still write the file, but do not let an empty result pass unnoticed.
    print("Warning: no feature windows were produced; the output file will be empty.")
features_df.to_csv(OUTPUT_PATH, index=False)
print(f"Saved features to: {OUTPUT_PATH}")


Saved features to: C:\Users\lilin\OneDrive\Desktop\Project\EMG\Features\P3P4_EMG_features_MVC.csv


In [3]:
import numpy as np
import pandas as pd

# ---------- settings ----------
FS = 500          # Hz
WINDOW_MS = 100   # window length in ms (adjust as needed)
STEP_MS   = 50    # hop between window starts in ms; overlap = WINDOW_MS - STEP_MS

# Window length and hop converted from milliseconds to whole samples
# (int() truncates any fractional sample).
WINDOW_SAMPLES, STEP_SAMPLES = (
    int(FS * duration_ms / 1000) for duration_ms in (WINDOW_MS, STEP_MS)
)

INPUT_PATH  = r"C:\Users\lilin\OneDrive\Desktop\Project\EMG\Filtered\P4_EMG_butter_low_high.csv"
OUTPUT_PATH = r"C:\Users\lilin\OneDrive\Desktop\Project\EMG\Features\P4_EMG_butter_low_high.csv"

# Channels 1..8 that features are computed from.
EMG_COLS = list(map("EMG_MVC_CH{}".format, range(1, 9)))

# Columns copied through from the centre sample of each window.
META_COLS = [
    "Participant",
    "Sex",
    "Age",
    "Timeline",
    "Activity",
    "Lifting Height",
    "Box Weight",
    "Load Type",
    "Lifting Depth",
]


# ---------- feature functions: 4 time-domain + 4 frequency-domain ----------
def td_fd_features(x, fs):
    """Compute four time-domain and four spectral descriptors of one window.

    Parameters
    ----------
    x : array-like, 1D
        Samples of a single window; converted to a float array.
    fs : number
        Sampling rate, used to place the FFT bins on a frequency axis.

    Returns
    -------
    dict
        Keys, in this order:
        MAV  mean of |x|
        VAR  population variance of x
        MAD  mean absolute deviation of x from its mean
        WL   sum of |x[i+1] - x[i]|
        TP   sum of |rfft(x)|**2 over the one-sided bins (DC included)
        SM1, SM2, SM3  sum(f**k * P) / TP for k = 1, 2, 3
        A window with fewer than two samples yields 0.0 for every key, and
        SM1..SM3 are 0.0 whenever TP <= 0.
    """
    feature_names = ("MAV", "VAR", "MAD", "WL", "TP", "SM1", "SM2", "SM3")

    signal = np.asarray(x, dtype=float)
    n_samples = len(signal)
    if n_samples < 2:
        return dict.fromkeys(feature_names, 0.0)

    # --- time domain ---
    mean_abs = np.abs(signal).mean()
    variance = np.var(signal)
    mean_abs_dev = np.abs(signal - np.mean(signal)).mean()
    waveform_len = np.abs(np.diff(signal)).sum()

    # --- frequency domain (one-sided spectrum of the real signal) ---
    spectrum = np.fft.rfft(signal)
    bin_hz = np.fft.rfftfreq(n_samples, d=1.0 / fs)
    power = np.abs(spectrum) ** 2
    total_power = power.sum()

    # The test is kept as "<= 0" so that a NaN total still goes through the
    # division branch and propagates NaN instead of being reported as 0.0.
    if total_power <= 0:
        moments = (0.0, 0.0, 0.0)
    else:
        moments = tuple(
            np.sum((bin_hz ** order) * power) / total_power for order in (1, 2, 3)
        )

    values = (mean_abs, variance, mean_abs_dev, waveform_len, total_power) + moments
    return dict(zip(feature_names, values))


# ---------- helpers ----------
def _lifting_runs(mask):
    """Return inclusive (start, end) positions of every run of True in `mask`.

    Runs that touch either end of the array are included, so a run that is
    still open at the last sample (even a one-sample run) is not lost.
    """
    mask = np.asarray(mask, dtype=bool)
    if mask.size == 0:
        return []
    # Pad with False so that every run has both a rising and a falling edge.
    padded = np.concatenate(([False], mask, [False]))
    edges = np.flatnonzero(padded[1:] != padded[:-1])
    # Edges alternate: first index of a run, then one past its last index.
    return [(int(first), int(stop) - 1) for first, stop in zip(edges[0::2], edges[1::2])]


# ---------- load ----------
df = pd.read_csv(INPUT_PATH)

# Columns absent from the input are skipped rather than treated as an error.
# Missing EMG channels are reported once so the gap in the output is visible.
meta_cols_present = [col for col in META_COLS if col in df.columns]
emg_cols_present = [ch for ch in EMG_COLS if ch in df.columns]
emg_cols_missing = [ch for ch in EMG_COLS if ch not in df.columns]
if emg_cols_missing:
    print(f"Warning: EMG columns not found in input, skipped: {emg_cols_missing}")

rows = []

# process trial by trial
# NOTE(review): trials are keyed on "Timeline" alone. If the input holds more
# than one participant and their Timeline labels coincide, their rows end up in
# the same group and a lifting run could span the participant boundary —
# confirm that Timeline values are unique per participant in this file.
for trial_name, trial_df in df.groupby("Timeline"):
    trial_df = trial_df.reset_index(drop=True)

    # mask for lifting intervals
    is_lift = (trial_df["Activity"] == "Lifting-Lowering").values

    for start, end in _lifting_runs(is_lift):
        seg = trial_df.iloc[start:end + 1].reset_index(drop=True)
        seg_len = len(seg)
        if seg_len < WINDOW_SAMPLES:
            continue  # too short to hold even one window

        # Convert each channel to a float array once per segment instead of
        # once per window; windows are then plain slices of these arrays.
        seg_emg = {ch: seg[ch].values.astype(float) for ch in emg_cols_present}

        # Slide windows within this lifting segment. Samples after the last
        # full window are not used.
        for w_start in range(0, seg_len - WINDOW_SAMPLES + 1, STEP_SAMPLES):
            w_end = w_start + WINDOW_SAMPLES

            # Metadata and timestamp are taken from the window's centre sample.
            meta = seg.iloc[w_start + WINDOW_SAMPLES // 2]

            feat = {col: meta[col] for col in meta_cols_present}

            # Segment indices are positions within the trial (after
            # reset_index); window offsets are relative to the segment start.
            feat["Segment_Start_Index"] = start
            feat["Segment_End_Index"]   = end
            feat["Window_Start_Offset"] = w_start
            feat["Window_End_Offset"]   = w_end - 1
            feat["Window_Center_Timestamp"] = meta.get("Timestamp", np.nan)

            # EMG features per channel, named "<channel>_<feature>"
            for ch, signal in seg_emg.items():
                channel_features = td_fd_features(signal[w_start:w_end], FS)
                for name, val in channel_features.items():
                    feat[f"{ch}_{name}"] = val

            rows.append(feat)


# ---------- save ----------
features_df = pd.DataFrame(rows)
if features_df.empty:
    # Still write the file, but do not let an empty result pass unnoticed.
    print("Warning: no feature windows were produced; the output file will be empty.")
features_df.to_csv(OUTPUT_PATH, index=False)
print(f"Saved features to: {OUTPUT_PATH}")


Saved features to: C:\Users\lilin\OneDrive\Desktop\Project\EMG\Features\P4_EMG_butter_low_high.csv
