> This NB applies windowing to the preprocessed segmented raw dataset

In [None]:
import numpy as np
import pandas as pd
import time
import json

from scipy import signal
from scipy.signal import welch

## Loading in the data

In [2]:
all_pids = ['P102', 'P103', 'P104', 'P105', 'P106', 'P107', 'P108', 'P109', 'P110', 'P111', 'P112', 'P114', 'P115', 'P116', 'P118', 'P119', 'P121', 'P122', 'P123', 'P124', 'P125', 'P126', 'P127', 'P128', 'P131', 'P132', 'P004', 'P005', 'P006', 'P008', 'P010', 'P011']
all_gesture_types = ['pan', 'delete', 'close', 'select-single', 'rotate', 'zoom-in', 'zoom-out', 'open', 'move', 'duplicate']
all_gesture_nums = ['1', '2', '3', '4', '5', '6', '7', '8', '9', '10']

In [None]:
#brc_data_save_path = 'D:\\Kai_MetaGestureClustering_24\\saved_datasets\\filtered_datasets\\'
laptop_data_save_path = 'C:\\Users\\kdmen\\Box\\Yamagami Lab\\Data\Meta_Gesture_Project'
full_filesave_path = laptop_data_save_path+'\\ppdsegraw_allEMG.json'

print("Loading")
start_time = time.time()
with open(full_filesave_path, 'r') as f:
    loaded_dict = json.load(f)
end_time = time.time()
print(f"Completed in {end_time - start_time}s")

Loading
Completed in 437.7451078891754s


In [4]:
# Momona's code...

def resample(data, N):
    N_init = len(data)
    x = np.linspace(0, N_init-1, N)  # x coordinates at which to evaluate interpolated values
    xp = np.linspace(0, N_init-1, N_init)  # x coordinates of data points
    return np.interp(x, xp, data)

def rectify_EMG(data):
    
    # 1. high-pass filter @ 40 hz, 4th order butterworth
    sos_high = signal.butter(N=4, Wn=40, btype='high', fs=2000, output='sos')
    hp_filtered = signal.sosfilt(sos_high, data)
    
    # 2. demean and rectify
    emg_mean = hp_filtered - np.mean(hp_filtered)
    rectified = abs(emg_mean)
    
    # 3. low-pass filter @ 40 Hz, 4th order butterworth
    sos_low = signal.butter(N=4,Wn=40, btype='low', fs=2000, output='sos')
    lp_filtered = signal.sosfilt(sos_low, rectified)
    return lp_filtered

def compute_SNR(s,fs=2000):
    nperseg = len(s)//4.5
    f,Pxx = welch(s,fs=2000,window='hamming',
        nperseg=nperseg,noverlap=nperseg//2,
        scaling='density',detrend=False)
    idx400 = list(abs(f-800)).index(min(abs(f-800)))
    N = len(f)-idx400
    noise_power = sum(Pxx[idx400:])/N*len(f)
    total_power = sum(Pxx)
    return 10*np.log10(total_power / noise_power)

def bandpass_EMG(data):
    sos_high = signal.butter(N=4,Wn=[10,500],btype='bandpass',fs=2000,output='sos')
    EMG_filt = signal.sosfilt(sos_high,data)
    EMG_filt = EMG_filt - np.mean(EMG_filt)
    return EMG_filt

def movavg_EMG(data,window=100,overlap=0.5,fs=2000):
    # 200 ms window, overlap = 0 is no overlap, overlap=1 is complete overlap 
    N = int(1*window/1000*fs) # number of datapoints in one window
    N_overlap = int(N*overlap)
    
    movavg_data = []
    
    ix = N
    while ix < len(data):
        movavg_data.append(np.mean(data[ix-N:ix]))
        ix = ix + (N - N_overlap)
    
    return np.asarray(movavg_data)

## For the non-FE
1. rectify the data,
2. do MAV or RMS (moving average for now?),
3. pick a window size (50-300ms) and overlap (50%), 
4. use np.interp to resample after

> Need to do windowing on the non-FE data since if we have to apply MAV/RMS to the data (to make it less noisy) that would reduce it to one sample per channel. Windowing is how we get multiple samples per channel

> Note: preprocessed data file has already been BPF'd, MS'd, and standardized (std=1 for each gesture)


# Preprocessing pipeline for raw (non-handcrafted) EMG data:

1. Rectify the signal:
   - Take the absolute value of the raw EMG to ensure all values are positive
   - This is a standard EMG preprocessing step to reflect signal energy

2. Apply temporal feature extraction (e.g., MAV or RMS):
   - The signal is segmented into overlapping windows (e.g., 200ms windows with 50% overlap = 100ms step)
   - Each window is reduced to a **single scalar per channel** using an aggregation function like MAV (mean absolute value) or RMS
   - This turns a long sequence of raw EMG into a **low-resolution sequence of extracted features**, one per window

3. Resample the feature sequence:
   - After windowing, each gesture trial has a different number of windows (due to gesture duration)
   - We use resampling (e.g., `scipy.signal.resample`) to interpolate this variable-length sequence to a fixed length (e.g., 64 time steps)
   - This ensures all samples have the same shape `[64, num_channels]` and can be fed into CNN/LSTM models as batched input

# Why this is necessary:
- Without windowing + feature aggregation, rectifying alone would reduce each gesture to a single averaged vector (i.e., 1 sample per channel) â€” losing temporal structure
- Windowing preserves **time-local structure** while smoothing out noise via MAV/RMS
- Resampling standardizes the **temporal resolution** across gestures with different durations

# Note:
- Input EMG data is already:
   - Bandpass filtered (BPF)
   - Mean subtracted (MS)
   - Standardized per gesture (std = 1). This was done gesture-wide to account for some muscles being bigger/stronger than others?


In [5]:
def windowing_emg(emg_data, window_size=200, step_size=100, feature="MAV"):
    """
    Applies windowing and extracts MAV or RMS features from EMG data.

    Parameters:
        emg_data (numpy array): Shape (num_samples, 16), where 16 is the number of channels.
        window_size (int): Number of samples per window (default 200 for 100 ms at 2000 Hz).
        step_size (int): Step size for sliding window (default 100 for 50% overlap).
        feature (str): Feature type, either "MAV" or "RMS".

    Returns:
        feature_matrix (numpy array): Shape (num_windows, 16), where each row is a feature vector.
    """
    # NOTE: num_samples here refers to how many scalar values are present, this is more along the lines of sequence length actually
    num_samples, num_channels = emg_data.shape
    num_windows = int((num_samples - window_size) // step_size + 1)  # Total windows
    # NOTE: variablity in samples (num_samples-->num_windows) leads to different number of windows, but this gets standardized to 64 in resampling
    feature_matrix = np.zeros((num_windows, num_channels))  # Initialize output matrix

    for i in range(num_windows):
        start = i * step_size
        end = start + window_size
        window = emg_data[start:end, :]  # Extract window for all channels

        if feature == "MAV":
            feature_matrix[i, :] = np.mean(np.abs(window), axis=0)
        elif feature == "RMS":
            feature_matrix[i, :] = np.sqrt(np.mean(window**2, axis=0))
        else:
            raise ValueError("Feature must be 'MAV' or 'RMS'")

    return feature_matrix


In [6]:
def apply_rect_windowing_resampling(nested_dict, num_resampled_rows=64, fs=2000, window_size_ms=200, window_step_size_ms=100, aggregation_func="MAV", step_size_points=None):

    # Initialize an empty list to store the results
    result_data = []

    # Track shape stats for final print
    debug_shapes = {}

    # Iterate over all participants, gesture IDs, and gesture numbers
    for pid, gestures in nested_dict.items():
        for gesture_name, gesture_data in gestures.items():
            for gesture_num, modality_data in gesture_data.items():
                # Initialize dict to store the result for the current gesture
                result_dict = {
                    'Participant': pid,
                    'Gesture_ID': gesture_name,
                    'Gesture_Num': gesture_num
                }
                
                # Apply the feature engineering functions to each EMG channel
                for modality_str, single_gesture_data in modality_data.items():
                    # single_gesture_data is a list with 16 lists as elements
                    gesture_npy = np.array(single_gesture_data).T
                    debug_shapes['raw_input_shape'] = gesture_npy.shape
                    # Rectify the data
                    rect_channel_data = np.abs(gesture_npy)
                    debug_shapes['rectified_shape'] = rect_channel_data.shape
                    # Divide by 1000 to get out of ms
                    window_size_num_points = int(fs * window_size_ms // 1000)
                    debug_shapes['window_size_num_points'] = window_size_num_points
                    # Doing this so I can easily set the step size to 1 point, without having to do the math/rounding
                    if step_size_points is None:
                        window_step_size_num_points = int(fs * window_step_size_ms // 1000)
                    else:
                        window_step_size_num_points = step_size_points
                    debug_shapes['window_step_size_num_points'] = window_step_size_num_points
                    windows_matrix = windowing_emg(rect_channel_data, window_size=window_size_num_points, step_size=window_step_size_num_points, feature=aggregation_func)
                    debug_shapes['windows_matrix'] = windows_matrix.shape

                    # Resample the gesture after to get 64 rows
                    resampled_windows_matrix = np.array([resample(windows_matrix[:, col], num_resampled_rows) for col in range(windows_matrix.shape[1])])
                    debug_shapes['resampled_matrix'] = resampled_windows_matrix.shape

                    result_dict.update({
                        f'windowed_ts_data': resampled_windows_matrix
                    })
                # Append the result for the current gesture to the result list
                result_data.append(result_dict)

    # Print out debug info from the *last* sample
    print("\n--- DEBUG SHAPES (from last gesture processed) ---")
    for k, v in debug_shapes.items():
        print(f"{k}: {v}")
    print("--------------------------------------------------\n")

    # Convert the result list to a DataFrame (everything should have the same lengths now)
    result_df = pd.DataFrame(result_data)
    return result_df

In [7]:
def apply_sliding_window_segmentation(nested_dict, fs=2000, window_size_ms=200, step_size_ms=100, step_size_num_points=None):
    """
    Implements the 'sliding window cropping' approach described in the augmentation paper.
    Each gesture is split into multiple fixed-length overlapping windows (segments),
    and each segment becomes its own training sample.

    This differs from the original function, which compresses each gesture into a single
    fixed-size time-series feature matrix using feature extraction + resampling.

    This function does not perform MAV/RMS feature extraction or resampling.
    Each segment retains its raw (or rectified) EMG values.
    """

    result_data = []
    debug_shapes = {}

    # Convert ms to number of sample points
    window_size_pts = int(fs * window_size_ms / 1000)
    debug_shapes['window_size_pts'] = window_size_pts
    if step_size_num_points is None:
        step_size_pts = int(fs * step_size_ms / 1000)
    else:
        step_size_pts = step_size_num_points
    debug_shapes['step_size_pts'] = step_size_pts

    for pid, gestures in nested_dict.items():
        for gesture_name, gesture_data in gestures.items():
            for gesture_num, modality_data in gesture_data.items():
                for modality_str, single_gesture_data in modality_data.items():
                    # Convert to NumPy and transpose to shape [T, C] where T = time, C = channels
                    gesture_npy = np.array(single_gesture_data).T
                    debug_shapes['raw_input_shape'] = gesture_npy.shape

                    # Rectify the signal (absolute value)
                    rect_channel_data = np.abs(gesture_npy)
                    debug_shapes['rectified_shape'] = rect_channel_data.shape

                    total_samples = rect_channel_data.shape[0]
                    num_channels = rect_channel_data.shape[1]

                    # Slide a window over the gesture trial
                    for start_idx in range(0, total_samples - window_size_pts + 1, step_size_pts):
                        end_idx = start_idx + window_size_pts
                        segment = rect_channel_data[start_idx:end_idx, :]  # Shape: [window_size_pts, num_channels]

                        debug_shapes['segment_shape'] = segment.shape

                        result_data.append({
                            'Participant': pid,
                            'Gesture_ID': gesture_name,
                            'Gesture_Num': gesture_num,
                            'windowed_ts_data': segment
                        })

    # Print debug shapes from last segment processed
    print("\n--- DEBUG SHAPES (from last segment processed) ---")
    for k, v in debug_shapes.items():
        print(f"{k}: {v}")
    print("--------------------------------------------------\n")

    return pd.DataFrame(result_data)


In [8]:
start_time = time.time()

windowed_df = apply_rect_windowing_resampling(loaded_dict, num_resampled_rows=64, fs=2000, window_size_ms=100, window_step_size_ms=50, aggregation_func="MAV")

print(f"Completed in {(time.time() - start_time):.2f}s\n")

print(windowed_df.shape)
windowed_df.head()

KeyboardInterrupt: 

In [None]:
print(windowed_df.iloc[0, 3].shape)
windowed_df.iloc[0, 3]

In [None]:
assert(False)

In [None]:
start_time = time.time()

windowed_df2 = apply_sliding_window_segmentation(loaded_dict, window_size_ms=100, step_size_num_points=10)

print(f"Completed in {(time.time() - start_time):.2f}s\n")

print(windowed_df2.shape)
windowed_df2.head()

In [None]:
print(windowed_df2.iloc[0, 3].shape)
windowed_df2.iloc[0, 3]

In [None]:
assert(False)

In [None]:
noFE_filesave_path = laptop_data_save_path+'\\noFE_windowed_segmentation_10steps_segraw_allEMG.pkl'
windowed_df2.to_pickle(noFE_filesave_path)


In [None]:
# This is going to be like 500GB or something.......

start_time = time.time()

windowed_df3 = apply_sliding_window_segmentation(loaded_dict, window_size_ms=100, step_size_num_points=1)

print(f"Completed in {(time.time() - start_time):.2f}s\n")

print(windowed_df3.shape)  # (21217483, 4)
windowed_df3.head()

noFE_filesave_path = laptop_data_save_path+'\\noFE_windowed_segmentation_1step_segraw_allEMG.pkl'
windowed_df3.to_pickle(noFE_filesave_path)


In [None]:
assert(False)

In [None]:
start_time = time.time()

windowed_df4 = apply_sliding_window_segmentation(loaded_dict, window_size_ms=100, step_size_ms=100)

print(f"Completed in {(time.time() - start_time):.2f}s\n")

print(windowed_df4.shape)  # (107444, 4)
windowed_df4.head()

noFE_filesave_path = laptop_data_save_path+'\\noFE_windowed_segmentation_allsteps_segraw_allEMG.pkl'
windowed_df4.to_pickle(noFE_filesave_path)


In [None]:
assert(False)

In [None]:
noFE_filesave_path = laptop_data_save_path+'\\noFE_windowed_segraw_allEMG.pkl'

windowed_df.to_pickle(noFE_filesave_path)#, index=False, header=True)  # , sep='\t', encoding='utf-8'


In [None]:
loaded_windowed_df = pd.read_pickle(noFE_filesave_path)

In [None]:
type(loaded_windowed_df)

In [None]:
print(loaded_windowed_df.shape)
loaded_windowed_df.head()

In [None]:
loaded_windowed_df.iloc[0, 3]

In [None]:
print(type(loaded_windowed_df.iloc[0, 3]))