In [129]:
import torch
import torchaudio
import torchaudio.functional as F
import torchaudio.transforms as T
import numpy as np
import pandas as pd
import numpy as np
import pandas as pd
from scipy.interpolate import interp1d
from scipy.signal import butter, filtfilt
import os

In [130]:
FS = 160  # Sampling frequency
CUTOFF = 5  # Cutoff frequency
NUM_SAMPLES = 100
CLIP_DURATION = 10 # Duration in seconds
NUM_CLIPS = 10 
DATA_PATH = "data/ActionNet-EMG"
MODE = "test"
FILENAME = f"ActionNet_{MODE}_emg.pkl"
NEW_FILENAME = f"ActionNet_{MODE}_emg_processed.pkl"
NUM_CHANNELS = 8
SIDES=["left", "right"]

In [131]:
def rectify_signal(data):
    return np.abs(data)

def filter_signal(data, fs, cutoff, num_channels):
    for i in range(num_channels):
        data[:,i] = low_pass_filter(data[:,i], fs, cutoff)
    return data

def low_pass_filter(data, fs, cutoff, order=5):
    nyquist = 0.5 * fs
    normal_cutoff = cutoff / nyquist
    b, a = butter(order, normal_cutoff, btype='low', analog=False)
    return filtfilt(b, a, data, padlen=12)

def normalization(data):
    return 2 * (data - np.min(data)) / (np.max(data) - np.min(data)) - 1

def preprocessing(data, steps):
    for step in steps:
        data.apply(step)
        
def load_data(path):
    with open(path, "rb") as f:
        data = pd.read_pickle(f)
    return data

def save_data(data, path):
    with open(path, "wb") as f:
        pd.to_pickle(data, f)


In [132]:
data = load_data(os.path.join(DATA_PATH, FILENAME))
print(data["myo_left_readings"].head())

4     [[-1, -46, 29, 37, -8, 25, -10, 18], [4, 33, -...
34    [[3, 34, 3, 0, -1, -3, -5, -4], [5, 28, 22, 7,...
48    [[-1, -1, 1, 9, 32, 3, 0, -2], [1, 0, 0, -12, ...
4     [[-11, 4, -1, -2, -31, 2, 21, -75], [14, -11, ...
62    [[-1, -3, -1, 0, -3, -5, -15, 0], [1, 3, 4, 6,...
Name: myo_left_readings, dtype: object


In [133]:
rectified_data = data.copy()
rectified_data["myo_left_readings"] = rectified_data["myo_left_readings"].apply(rectify_signal)
print(rectified_data["myo_left_readings"].head())

4     [[1, 46, 29, 37, 8, 25, 10, 18], [4, 33, 23, 1...
34    [[3, 34, 3, 0, 1, 3, 5, 4], [5, 28, 22, 7, 4, ...
48    [[1, 1, 1, 9, 32, 3, 0, 2], [1, 0, 0, 12, 11, ...
4     [[11, 4, 1, 2, 31, 2, 21, 75], [14, 11, 17, 12...
62    [[1, 3, 1, 0, 3, 5, 15, 0], [1, 3, 4, 6, 3, 1,...
Name: myo_left_readings, dtype: object


In [134]:
filtered_data = rectified_data.copy()
filtered_data["myo_left_readings"]= filtered_data["myo_left_readings"].apply(filter_signal, args=(FS, CUTOFF, NUM_CHANNELS))
print(filtered_data["myo_left_readings"].head())

4     [[1, 45, 26, 36, 13, 22, 8, 18], [3, 41, 26, 3...
34    [[3, 35, 3, 0, 0, 2, 5, 3], [3, 34, 4, 0, 0, 2...
48    [[0, 1, 0, 8, 32, 2, 0, 1], [0, 1, 1, 8, 30, 2...
4     [[10, 3, 1, 1, 28, 1, 22, 73], [10, 4, 2, 2, 2...
62    [[0, 2, 0, 0, 3, 5, 14, 0], [1, 2, 1, 0, 2, 4,...
Name: myo_left_readings, dtype: object


In [135]:
normalized_data = filtered_data.copy()
normalized_data["myo_left_readings"] = normalized_data["myo_left_readings"].apply(normalization)
print(normalized_data["myo_left_readings"].head())

4     [[-0.9733333333333334, 0.19999999999999996, -0...
34    [[-0.8888888888888888, 0.2962962962962963, -0....
48    [[-1.0, -0.972972972972973, -1.0, -0.783783783...
4     [[-0.7468354430379747, -0.9240506329113924, -0...
62    [[-1.0, -0.9166666666666666, -1.0, -1.0, -0.87...
Name: myo_left_readings, dtype: object


In [136]:
def preprocessing():
    data = load_data(os.path.join(DATA_PATH, FILENAME))
    for side in SIDES:
        column = f"myo_{side}_readings"
        data[column] = data[column].apply(rectify_signal)
        data[column]= data[column].apply(filter_signal, args=(FS, CUTOFF, NUM_CHANNELS))
        data[column] = data[column].apply(normalization)
    save_data(data, os.path.join(DATA_PATH, NEW_FILENAME))

In [137]:
preprocessing()