In [5]:
import os
import random
import numpy as np
import pandas as pd
from scipy.signal import butter, lfilter, iirnotch, welch
from scipy.stats import skew, kurtosis
import csv

fs = 200 
window_size = 200  
step_size = 100 
data_dir = r"C:\Users\marcm\Desktop\spring 2025\machineLearning\project\EMG_data_for_gestures-master"
output_train_csv = r"C:\Users\marcm\Desktop\spring 2025\machineLearning\project\leave_one_out\train.csv"
output_test_csv = r"C:\Users\marcm\Desktop\spring 2025\machineLearning\project\leave_one_out\test.csv"

# === Filtering Functions ===
def bandpass_filter(signal, lowcut=20, highcut=90, fs=200, order=4):
    nyq = 0.5 * fs
    b, a = butter(order, [lowcut/nyq, highcut/nyq], btype='band')
    return lfilter(b, a, signal)

def notch_filter(signal, freq=50.0, fs=200, Q=30.0):
    b, a = iirnotch(freq, Q, fs)
    return lfilter(b, a, signal)

def rectify(signal):
    return np.abs(signal)

def normalize(signal):
    return (signal - np.mean(signal)) / np.std(signal)

# === Preprocessing ===
def preprocess_signal(signal):
    signal = bandpass_filter(signal, fs=fs)
    signal = notch_filter(signal, fs=fs)
    signal = rectify(signal)
    signal = normalize(signal)
    return signal

def process_file(filepath):
    df = pd.read_csv(filepath, delim_whitespace=True, skiprows=1, header=None)
    df.dropna(inplace=True)
    emg_data = df.iloc[:, 1:9].astype(float).values
    labels = df.iloc[:, 9].astype(int).values
    processed_channels = [preprocess_signal(emg_data[:, i]) for i in range(8)]
    processed_emg = np.stack(processed_channels, axis=1)
    return processed_emg, labels

def window_emg(data, labels, window_size=100, step_size=50):
    X, y = [], []
    for i in range(0, len(data) - window_size, step_size):
        window = data[i:i + window_size]
        label = labels[i + window_size // 2]
        if label == 0:
            continue
        feats = extract_features(window)
        X.append(feats)
        y.append(label)
    return X, y

def extract_features(seg):
    feats = []
    for c in range(seg.shape[1]):
        s = seg[:, c]
        mav = np.mean(np.abs(s))
        rms = np.sqrt(np.mean(s**2))
        wl = np.sum(np.abs(np.diff(s)))
        zc = np.sum(np.diff(np.sign(s)) != 0)
        ssc = np.sum(np.diff(np.sign(np.diff(s))) != 0)
        wa = np.sum(np.abs(np.diff(s)) > 0.01)
        var_s = np.var(s)
        sk = skew(s)
        kt = kurtosis(s)
        f, Pxx = welch(s, fs=fs, nperseg=window_size)
        mfreq = f[np.where(np.cumsum(Pxx) >= Pxx.sum()/2)[0][0]]
        meanfreq = np.sum(f * Pxx) / Pxx.sum()
        total_p = Pxx.sum()
        bp1 = Pxx[(f >= 20) & (f < 50)].sum()
        bp2 = Pxx[(f >= 50) & (f < 90)].sum()
        feats.extend([mav, rms, wl, zc, ssc, wa, var_s, sk, kt, mfreq, meanfreq, total_p, bp1, bp2])
    return feats

def process_subject(subject_path):
    all_X, all_y = [], []
    for file in sorted(os.listdir(subject_path)):
        if file.endswith(".txt"):
            file_path = os.path.join(subject_path, file)
            emg, labels = process_file(file_path)
            X, y = window_emg(emg, labels, window_size, step_size)
            all_X.extend(X)
            all_y.extend(y)
    return all_X, all_y

def save_to_csv(X, y, output_path):
    os.makedirs(os.path.dirname(output_path), exist_ok=True)
    with open(output_path, "w", newline="") as f:
        writer = csv.writer(f)
        for features, label in zip(X, y):
            writer.writerow(list(features) + [label])

# === Leave-One-Subject-Out Split ===
if __name__ == "__main__":
    subjects = sorted([d for d in os.listdir(data_dir) if os.path.isdir(os.path.join(data_dir, d))])

    test_subject = random.choice(subjects)  # Randomly pick 1 folder
    print(f"ðŸ§ª Selected {test_subject} as test subject")

    X_train, y_train = [], []
    X_test, y_test = [], []

    for subject_folder in subjects:
        subject_path = os.path.join(data_dir, subject_folder)

        X, y = process_subject(subject_path)

        if subject_folder == test_subject:
            X_test.extend(X)
            y_test.extend(y)
        else:
            X_train.extend(X)
            y_train.extend(y)

    save_to_csv(X_train, y_train, output_train_csv)
    save_to_csv(X_test, y_test, output_test_csv)

    print(f"âœ… Training data: {len(X_train)} samples -> {output_train_csv}")
    print(f"âœ… Testing data: {len(X_test)} samples -> {output_test_csv}")


ðŸ§ª Selected 08 as test subject


  df = pd.read_csv(filepath, delim_whitespace=True, skiprows=1, header=None)
  df = pd.read_csv(filepath, delim_whitespace=True, skiprows=1, header=None)
  df = pd.read_csv(filepath, delim_whitespace=True, skiprows=1, header=None)
  df = pd.read_csv(filepath, delim_whitespace=True, skiprows=1, header=None)
  df = pd.read_csv(filepath, delim_whitespace=True, skiprows=1, header=None)
  df = pd.read_csv(filepath, delim_whitespace=True, skiprows=1, header=None)
  df = pd.read_csv(filepath, delim_whitespace=True, skiprows=1, header=None)
  df = pd.read_csv(filepath, delim_whitespace=True, skiprows=1, header=None)
  df = pd.read_csv(filepath, delim_whitespace=True, skiprows=1, header=None)
  df = pd.read_csv(filepath, delim_whitespace=True, skiprows=1, header=None)
  df = pd.read_csv(filepath, delim_whitespace=True, skiprows=1, header=None)
  df = pd.read_csv(filepath, delim_whitespace=True, skiprows=1, header=None)
  df = pd.read_csv(filepath, delim_whitespace=True, skiprows=1, header=None)

âœ… Training data: 14766 samples -> C:\Users\marcm\Desktop\spring 2025\machineLearning\project\leave_one_out\train.csv
âœ… Testing data: 379 samples -> C:\Users\marcm\Desktop\spring 2025\machineLearning\project\leave_one_out\test.csv
