In [1]:
# Remove these two lines when converting to .py files later.
from IPython.display import display, HTML
display(HTML("<style>.container { width:100% !important; }</style>"))

import time
import pandas as pd
import psycopg2

# !pip install pyspark scipy sqlalchemy python_speech_features
from sqlalchemy import create_engine

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
import pyspark.sql.types as T
from pyspark.sql.window import Window

import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns

from scipy import fftpack
from scipy.integrate import cumtrapz
from scipy import signal
from scipy.signal import stft

from python_speech_features import mfcc
from statsmodels.tsa.ar_model import AutoReg


In [2]:
# Create a SparkSession
spark = SparkSession \
    .builder \
    .appName("Data Stager") \
    .config("spark.driver.memory", '8g') \
    .getOrCreate()

spark.sparkContext.setLogLevel('ERROR')

print("Reading the CSV files..")
semg_df = spark.read.parquet("denoised_grouped_semg_stage/", header=True, inferSchema=True)

semg_df = semg_df.withColumn('activity', semg_df['activity'].cast('integer')).withColumn('trial_index', semg_df['trial_index'].cast('integer'))
semg_df.printSchema()
print("Completed reading the CSV files.")


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/08/20 22:56:48 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/08/20 22:56:49 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
Reading the CSV files..


                                                                                

root
 |-- subject_id: integer (nullable = true)
 |-- activity: integer (nullable = true)
 |-- trial_index: integer (nullable = true)
 |-- semg_1_list: array (nullable = true)
 |    |-- element: double (containsNull = true)
 |-- semg_2_list: array (nullable = true)
 |    |-- element: double (containsNull = true)
 |-- semg_3_list: array (nullable = true)
 |    |-- element: double (containsNull = true)
 |-- semg_4_list: array (nullable = true)
 |    |-- element: double (containsNull = true)
 |-- semg_5_list: array (nullable = true)
 |    |-- element: double (containsNull = true)
 |-- filtered_semg_1_list: array (nullable = true)
 |    |-- element: float (containsNull = true)
 |-- filtered_semg_2_list: array (nullable = true)
 |    |-- element: float (containsNull = true)
 |-- filtered_semg_3_list: array (nullable = true)
 |    |-- element: float (containsNull = true)
 |-- filtered_semg_4_list: array (nullable = true)
 |    |-- element: float (containsNull = true)
 |-- filtered_semg_5_list

**Extracting Time Domain Features for each signal**


In [4]:
NO_WINDOWS = 5

activity_length_map = {1: 16000, 2: 16000, 3: 16000, 4: 16000, 5: 16000, 6: 16000, 7: 16000, 8: 16000, 9: 16000, 10: 16000,
              11: 22000, 12: 22000, 13: 22000, 14: 30000, 15: 40000, 16: 26000, 17: 30000, 18: 26000, 19: 26000, 20: 26000, 21: 26000}

compute_mean = F.udf(lambda emg_signal, activity, x: calculate_mean(emg_signal, activity_length_map[activity]/NO_WINDOWS, activity, x), T.DoubleType())
compute_root_mean_square = F.udf(lambda emg_signal, activity, x: calculate_root_mean_square(emg_signal, activity_length_map[activity]/NO_WINDOWS, activity, x), T.DoubleType())
compute_standard_deviation = F.udf(lambda emg_signal, activity, x: calculate_standard_deviation(emg_signal, activity_length_map[activity]/NO_WINDOWS, activity, x), T.DoubleType())
compute_variance = F.udf(lambda emg_signal, activity, x: calculate_variance(emg_signal, activity_length_map[activity]/NO_WINDOWS, activity, x), T.DoubleType())
compute_iemg = F.udf(lambda emg_signal, activity, x: calculate_iemg(emg_signal, activity_length_map[activity]/NO_WINDOWS, activity, x), T.DoubleType())
compute_zero_crossing_rate = F.udf(lambda emg_signal, activity, x: calculate_zero_crossing_rate(emg_signal, activity_length_map[activity]/NO_WINDOWS, activity, x), T.IntegerType())
compute_slope_sign_changes = F.udf(lambda emg_signal, activity, x: calculate_slope_sign_changes(emg_signal, activity_length_map[activity]/NO_WINDOWS, activity, x), T.IntegerType())
compute_waveform_length = F.udf(lambda emg_signal, activity, x: calculate_waveform_length(emg_signal, activity_length_map[activity]/NO_WINDOWS, activity, x), T.DoubleType())


def get_start_end_for_window(window_length, x):
    if x < 0:
        start = 0
        end = window_length * NO_WINDOWS
    elif x == 1:
        start = 0
        end = window_length
    else:
        start = window_length * (x - 1)
        end = window_length * x

    return start, end

def calculate_mean(emg_signal, window_length, activity, x):

    start_end = get_start_end_for_window(window_length, x)
    
    return float(np.mean(emg_signal[int(start_end[0]):int(start_end[1])]))

def calculate_root_mean_square(emg_signal, window_length, activity, x):

    start_end = get_start_end_for_window(window_length, x)
    
    return float(np.sqrt(np.mean(np.square(emg_signal[int(start_end[0]):int(start_end[1])]))))

def calculate_standard_deviation(emg_signal, window_length, activity, x):

    start_end = get_start_end_for_window(window_length, x)
    
    return float(np.std(emg_signal[int(start_end[0]):int(start_end[1])]))

def calculate_variance(emg_signal, window_length, activity, x):

    start_end = get_start_end_for_window(window_length, x)
    
    return float(np.var(emg_signal[int(start_end[0]):int(start_end[1])]))

def calculate_iemg(emg_signal, window_length, activity, x):

    start_end = get_start_end_for_window(window_length, x)
    
    return float(np.sum(np.abs(emg_signal[int(start_end[0]):int(start_end[1])])))

def calculate_zero_crossing_rate(emg_signal, window_length, activity, x):

    start_end = get_start_end_for_window(window_length, x)
    zero_crossings = np.where(np.diff(np.sign(emg_signal[int(start_end[0]):int(start_end[1])])))[0]
    
    return len(zero_crossings)

def calculate_slope_sign_changes(emg_signal, window_length, activity, x):

    start_end = get_start_end_for_window(window_length, x)

    # First compute the derivative
    derivative = np.diff(emg_signal[int(start_end[0]):int(start_end[1])])
    
    # Then compute zero crossingts of the derivative
    slope_sign_changes = np.where(np.diff(np.sign(derivative)))[0]
    
    return len(slope_sign_changes)

def calculate_waveform_length(emg_signal, window_length, activity, x):

    start_end = get_start_end_for_window(window_length, x)
    
    return float(np.sum(np.abs(np.diff(emg_signal[int(start_end[0]):int(start_end[1])]))))

# Extracting Time-domain features:
semg_df = semg_df.withColumn('semg_1_td_mean', compute_mean(semg_df['filtered_semg_1_list'], semg_df['activity'], F.lit(-1)))
semg_df = semg_df.withColumn('semg_1_td_rms', compute_root_mean_square(semg_df['filtered_semg_1_list'], semg_df['activity'], F.lit(-1)))
semg_df = semg_df.withColumn('semg_1_td_sd', compute_standard_deviation(semg_df['filtered_semg_1_list'], semg_df['activity'], F.lit(-1)))
semg_df = semg_df.withColumn('semg_1_td_var', compute_variance(semg_df['filtered_semg_1_list'], semg_df['activity'], F.lit(-1)))
semg_df = semg_df.withColumn('semg_1_td_iemg', compute_iemg(semg_df['filtered_semg_1_list'], semg_df['activity'], F.lit(-1)))
semg_df = semg_df.withColumn('semg_1_td_zcr', compute_zero_crossing_rate(semg_df['filtered_semg_1_list'], semg_df['activity'], F.lit(-1)))
semg_df = semg_df.withColumn('semg_1_td_ssc', compute_slope_sign_changes(semg_df['filtered_semg_1_list'], semg_df['activity'], F.lit(-1)))
semg_df = semg_df.withColumn('semg_1_td_wfl', compute_waveform_length(semg_df['filtered_semg_1_list'], semg_df['activity'], F.lit(-1)))

semg_df = semg_df.withColumn('semg_2_td_mean', compute_mean(semg_df['filtered_semg_2_list'], semg_df['activity'], F.lit(-1)))
semg_df = semg_df.withColumn('semg_2_td_rms', compute_root_mean_square(semg_df['filtered_semg_2_list'], semg_df['activity'], F.lit(-1)))
semg_df = semg_df.withColumn('semg_2_td_sd', compute_standard_deviation(semg_df['filtered_semg_2_list'], semg_df['activity'], F.lit(-1)))
semg_df = semg_df.withColumn('semg_2_td_var', compute_variance(semg_df['filtered_semg_2_list'], semg_df['activity'], F.lit(-1)))
semg_df = semg_df.withColumn('semg_2_td_iemg', compute_iemg(semg_df['filtered_semg_2_list'], semg_df['activity'], F.lit(-1)))
semg_df = semg_df.withColumn('semg_2_td_zcr', compute_zero_crossing_rate(semg_df['filtered_semg_2_list'], semg_df['activity'], F.lit(-1)))
semg_df = semg_df.withColumn('semg_2_td_ssc', compute_slope_sign_changes(semg_df['filtered_semg_2_list'], semg_df['activity'], F.lit(-1)))
semg_df = semg_df.withColumn('semg_2_td_wfl', compute_waveform_length(semg_df['filtered_semg_2_list'], semg_df['activity'], F.lit(-1)))

semg_df = semg_df.withColumn('semg_3_td_mean', compute_mean(semg_df['filtered_semg_3_list'], semg_df['activity'], F.lit(-1)))
semg_df = semg_df.withColumn('semg_3_td_rms', compute_root_mean_square(semg_df['filtered_semg_3_list'], semg_df['activity'], F.lit(-1)))
semg_df = semg_df.withColumn('semg_3_td_sd', compute_standard_deviation(semg_df['filtered_semg_3_list'], semg_df['activity'], F.lit(-1)))
semg_df = semg_df.withColumn('semg_3_td_var', compute_variance(semg_df['filtered_semg_3_list'], semg_df['activity'], F.lit(-1)))
semg_df = semg_df.withColumn('semg_3_td_iemg', compute_iemg(semg_df['filtered_semg_3_list'], semg_df['activity'], F.lit(-1)))
semg_df = semg_df.withColumn('semg_3_td_zcr', compute_zero_crossing_rate(semg_df['filtered_semg_3_list'], semg_df['activity'], F.lit(-1)))
semg_df = semg_df.withColumn('semg_3_td_ssc', compute_slope_sign_changes(semg_df['filtered_semg_3_list'], semg_df['activity'], F.lit(-1)))
semg_df = semg_df.withColumn('semg_3_td_wfl', compute_waveform_length(semg_df['filtered_semg_3_list'], semg_df['activity'], F.lit(-1)))

semg_df = semg_df.withColumn('semg_4_td_mean', compute_mean(semg_df['filtered_semg_4_list'], semg_df['activity'], F.lit(-1)))
semg_df = semg_df.withColumn('semg_4_td_rms', compute_root_mean_square(semg_df['filtered_semg_4_list'], semg_df['activity'], F.lit(-1)))
semg_df = semg_df.withColumn('semg_4_td_sd', compute_standard_deviation(semg_df['filtered_semg_4_list'], semg_df['activity'], F.lit(-1)))
semg_df = semg_df.withColumn('semg_4_td_var', compute_variance(semg_df['filtered_semg_4_list'], semg_df['activity'], F.lit(-1)))
semg_df = semg_df.withColumn('semg_4_td_iemg', compute_iemg(semg_df['filtered_semg_4_list'], semg_df['activity'], F.lit(-1)))
semg_df = semg_df.withColumn('semg_4_td_zcr', compute_zero_crossing_rate(semg_df['filtered_semg_4_list'], semg_df['activity'], F.lit(-1)))
semg_df = semg_df.withColumn('semg_4_td_ssc', compute_slope_sign_changes(semg_df['filtered_semg_4_list'], semg_df['activity'], F.lit(-1)))
semg_df = semg_df.withColumn('semg_4_td_wfl', compute_waveform_length(semg_df['filtered_semg_4_list'], semg_df['activity'], F.lit(-1)))

semg_df = semg_df.withColumn('semg_5_td_mean', compute_mean(semg_df['filtered_semg_5_list'], semg_df['activity'], F.lit(-1)))
semg_df = semg_df.withColumn('semg_5_td_rms', compute_root_mean_square(semg_df['filtered_semg_5_list'], semg_df['activity'], F.lit(-1)))
semg_df = semg_df.withColumn('semg_5_td_sd', compute_standard_deviation(semg_df['filtered_semg_5_list'], semg_df['activity'], F.lit(-1)))
semg_df = semg_df.withColumn('semg_5_td_var', compute_variance(semg_df['filtered_semg_5_list'], semg_df['activity'], F.lit(-1)))
semg_df = semg_df.withColumn('semg_5_td_iemg', compute_iemg(semg_df['filtered_semg_5_list'], semg_df['activity'], F.lit(-1)))
semg_df = semg_df.withColumn('semg_5_td_zcr', compute_zero_crossing_rate(semg_df['filtered_semg_5_list'], semg_df['activity'], F.lit(-1)))
semg_df = semg_df.withColumn('semg_5_td_ssc', compute_slope_sign_changes(semg_df['filtered_semg_5_list'], semg_df['activity'], F.lit(-1)))
semg_df = semg_df.withColumn('semg_5_td_wfl', compute_waveform_length(semg_df['filtered_semg_5_list'], semg_df['activity'], F.lit(-1)))


In [5]:
# Generate 5 time-domain features per trial.
for x in range(1, NO_WINDOWS + 1):
    semg_df = semg_df.withColumn('semg_1_td_mean_' + str(x), compute_mean(semg_df['filtered_semg_1_list'], semg_df['activity'], F.lit(x)))
    semg_df = semg_df.withColumn('semg_1_td_rms_' + str(x), compute_root_mean_square(semg_df['filtered_semg_1_list'], semg_df['activity'], F.lit(x)))
    semg_df = semg_df.withColumn('semg_1_td_sd_' + str(x), compute_standard_deviation(semg_df['filtered_semg_1_list'], semg_df['activity'], F.lit(x)))
    semg_df = semg_df.withColumn('semg_1_td_var_' + str(x), compute_variance(semg_df['filtered_semg_1_list'], semg_df['activity'], F.lit(x)))
    semg_df = semg_df.withColumn('semg_1_td_iemg_' + str(x), compute_iemg(semg_df['filtered_semg_1_list'], semg_df['activity'], F.lit(x)))
    semg_df = semg_df.withColumn('semg_1_td_zcr_' + str(x), compute_zero_crossing_rate(semg_df['filtered_semg_1_list'], semg_df['activity'], F.lit(x)))
    semg_df = semg_df.withColumn('semg_1_td_ssc_' + str(x), compute_slope_sign_changes(semg_df['filtered_semg_1_list'], semg_df['activity'], F.lit(x)))
    semg_df = semg_df.withColumn('semg_1_td_wfl_' + str(x), compute_waveform_length(semg_df['filtered_semg_1_list'], semg_df['activity'], F.lit(x)))

    semg_df = semg_df.withColumn('semg_2_td_mean_' + str(x), compute_mean(semg_df['filtered_semg_2_list'], semg_df['activity'], F.lit(x)))
    semg_df = semg_df.withColumn('semg_2_td_rms_' + str(x), compute_root_mean_square(semg_df['filtered_semg_2_list'], semg_df['activity'], F.lit(x)))
    semg_df = semg_df.withColumn('semg_2_td_sd_' + str(x), compute_standard_deviation(semg_df['filtered_semg_2_list'], semg_df['activity'], F.lit(x)))
    semg_df = semg_df.withColumn('semg_2_td_var_' + str(x), compute_variance(semg_df['filtered_semg_2_list'], semg_df['activity'], F.lit(x)))
    semg_df = semg_df.withColumn('semg_2_td_iemg_' + str(x), compute_iemg(semg_df['filtered_semg_2_list'], semg_df['activity'], F.lit(x)))
    semg_df = semg_df.withColumn('semg_2_td_zcr_' + str(x), compute_zero_crossing_rate(semg_df['filtered_semg_2_list'], semg_df['activity'], F.lit(x)))
    semg_df = semg_df.withColumn('semg_2_td_ssc_' + str(x), compute_slope_sign_changes(semg_df['filtered_semg_2_list'], semg_df['activity'], F.lit(x)))
    semg_df = semg_df.withColumn('semg_2_td_wfl_' + str(x), compute_waveform_length(semg_df['filtered_semg_2_list'], semg_df['activity'], F.lit(x)))

    semg_df = semg_df.withColumn('semg_3_td_mean_' + str(x), compute_mean(semg_df['filtered_semg_3_list'], semg_df['activity'], F.lit(x)))
    semg_df = semg_df.withColumn('semg_3_td_rms_' + str(x), compute_root_mean_square(semg_df['filtered_semg_3_list'], semg_df['activity'], F.lit(x)))
    semg_df = semg_df.withColumn('semg_3_td_sd_' + str(x), compute_standard_deviation(semg_df['filtered_semg_3_list'], semg_df['activity'], F.lit(x)))
    semg_df = semg_df.withColumn('semg_3_td_var_' + str(x), compute_variance(semg_df['filtered_semg_3_list'], semg_df['activity'], F.lit(x)))
    semg_df = semg_df.withColumn('semg_3_td_iemg_' + str(x), compute_iemg(semg_df['filtered_semg_3_list'], semg_df['activity'], F.lit(x)))
    semg_df = semg_df.withColumn('semg_3_td_zcr_' + str(x), compute_zero_crossing_rate(semg_df['filtered_semg_3_list'], semg_df['activity'], F.lit(x)))
    semg_df = semg_df.withColumn('semg_3_td_ssc_' + str(x), compute_slope_sign_changes(semg_df['filtered_semg_3_list'], semg_df['activity'], F.lit(x)))
    semg_df = semg_df.withColumn('semg_3_td_wfl_' + str(x), compute_waveform_length(semg_df['filtered_semg_3_list'], semg_df['activity'], F.lit(x)))

    semg_df = semg_df.withColumn('semg_4_td_mean_' + str(x), compute_mean(semg_df['filtered_semg_4_list'], semg_df['activity'], F.lit(x)))
    semg_df = semg_df.withColumn('semg_4_td_rms_' + str(x), compute_root_mean_square(semg_df['filtered_semg_4_list'], semg_df['activity'], F.lit(x)))
    semg_df = semg_df.withColumn('semg_4_td_sd_' + str(x), compute_standard_deviation(semg_df['filtered_semg_4_list'], semg_df['activity'], F.lit(x)))
    semg_df = semg_df.withColumn('semg_4_td_var_' + str(x), compute_variance(semg_df['filtered_semg_4_list'], semg_df['activity'], F.lit(x)))
    semg_df = semg_df.withColumn('semg_4_td_iemg_' + str(x), compute_iemg(semg_df['filtered_semg_4_list'], semg_df['activity'], F.lit(-x)))
    semg_df = semg_df.withColumn('semg_4_td_zcr_' + str(x), compute_zero_crossing_rate(semg_df['filtered_semg_4_list'], semg_df['activity'], F.lit(x)))
    semg_df = semg_df.withColumn('semg_4_td_ssc_' + str(x), compute_slope_sign_changes(semg_df['filtered_semg_4_list'], semg_df['activity'], F.lit(x)))
    semg_df = semg_df.withColumn('semg_4_td_wfl_' + str(x), compute_waveform_length(semg_df['filtered_semg_4_list'], semg_df['activity'], F.lit(x)))

    semg_df = semg_df.withColumn('semg_5_td_mean_' + str(x), compute_mean(semg_df['filtered_semg_5_list'], semg_df['activity'], F.lit(x)))
    semg_df = semg_df.withColumn('semg_5_td_rms_' + str(x), compute_root_mean_square(semg_df['filtered_semg_5_list'], semg_df['activity'], F.lit(x)))
    semg_df = semg_df.withColumn('semg_5_td_sd_' + str(x), compute_standard_deviation(semg_df['filtered_semg_5_list'], semg_df['activity'], F.lit(x)))
    semg_df = semg_df.withColumn('semg_5_td_var_' + str(x), compute_variance(semg_df['filtered_semg_5_list'], semg_df['activity'], F.lit(x)))
    semg_df = semg_df.withColumn('semg_5_td_iemg_' + str(x), compute_iemg(semg_df['filtered_semg_5_list'], semg_df['activity'], F.lit(x)))
    semg_df = semg_df.withColumn('semg_5_td_zcr_' + str(x), compute_zero_crossing_rate(semg_df['filtered_semg_5_list'], semg_df['activity'], F.lit(x)))
    semg_df = semg_df.withColumn('semg_5_td_ssc_' + str(x), compute_slope_sign_changes(semg_df['filtered_semg_5_list'], semg_df['activity'], F.lit(x)))
    semg_df = semg_df.withColumn('semg_5_td_wfl_' + str(x), compute_waveform_length(semg_df['filtered_semg_5_list'], semg_df['activity'], F.lit(x)))


**Extracting Frequency Domain Features for each signal**

In [6]:
compute_fft = F.udf(lambda emg_signal, activity, x: calculate_fft(emg_signal, activity_length_map[activity]/NO_WINDOWS, activity, x), T.ArrayType(T.DoubleType()))
compute_power_spectral_density = F.udf(lambda emg_signal, activity, x: calculate_power_spectral_density(emg_signal, activity_length_map[activity]/NO_WINDOWS, activity, x), T.ArrayType(T.DoubleType()))
compute_median_frequency = F.udf(lambda emg_signal, activity, x: calculate_median_frequency(emg_signal, activity_length_map[activity]/NO_WINDOWS, activity, x), T.DoubleType())
compute_mean_frequency = F.udf(lambda emg_signal, activity, x: calculate_mean_frequency(emg_signal, activity_length_map[activity]/NO_WINDOWS, activity, x), T.DoubleType())

def calculate_fft(emg_signal, window_length, activity, x):

    start_end = get_start_end_for_window(window_length, x)

    # Compute FFT and take absolute value to get magnitude
    fft_values = np.abs(fftpack.fft(emg_signal))
    
    # Only keep first half of FFT values for real signals
    fft_values = fft_values[:len(emg_signal)//2]
    
    return fft_values.tolist()

def calculate_power_spectral_density(emg_signal, window_length, activity, x):

    start_end = get_start_end_for_window(window_length, x)

    # First compute FFT
    fft_values = calculate_fft(emg_signal, window_length, activity, x)

    # Then, square the FFT values to get power spectral density
    psd_values = np.square(fft_values)
    
    return psd_values.tolist()

def calculate_median_frequency(emg_signal, window_length, activity, x, sample_rate=2000):

    start_end = get_start_end_for_window(window_length, x)

    psd_values = calculate_power_spectral_density(emg_signal, window_length, activity, x)

    # Calculate frequencies for each FFT sample
    freqs = fftpack.fftfreq(len(emg_signal), d=1/sample_rate)
    
    # Keep only the positive frequencies (first half)
    freqs = freqs[:len(emg_signal)//2]
    
    # Calculate cumulative sum of power spectral density
    cumulative_psd = np.cumsum(psd_values)
    
    # Find frequency where cumulative sum is 50% of total sum
    median_freq = freqs[np.where(cumulative_psd >= 0.5 * cumulative_psd[-1])[0][0]]
    
    return float(median_freq)

def calculate_mean_frequency(emg_signal, window_length, activity, x, sample_rate=2000):

    start_end = get_start_end_for_window(window_length, x)

    psd_values = calculate_power_spectral_density(emg_signal, window_length, activity, x)

    # Calculate frequencies for each FFT sample
    freqs = fftpack.fftfreq(len(emg_signal), d=1/sample_rate)

    # Keep only the positive frequencies (first half)
    freqs = freqs[:len(emg_signal)//2]
    
    # Calculate mean frequency
    mean_freq = np.sum(psd_values * freqs) / np.sum(psd_values)
    
    return float(mean_freq)

semg_df = semg_df.withColumn('semg_1_fd_fft', compute_fft(semg_df['filtered_semg_1_list'], semg_df['activity'], F.lit(-1)))
semg_df = semg_df.withColumn('semg_1_fd_psd', compute_power_spectral_density(semg_df['filtered_semg_1_list'], semg_df['activity'], F.lit(-1)))
semg_df = semg_df.withColumn('semg_1_fd_medfreq', compute_median_frequency(semg_df['filtered_semg_1_list'], semg_df['activity'], F.lit(-1)))
semg_df = semg_df.withColumn('semg_1_fd_meanfreq', compute_mean_frequency(semg_df['filtered_semg_1_list'], semg_df['activity'], F.lit(-1)))

semg_df = semg_df.withColumn('semg_2_fd_fft', compute_fft(semg_df['filtered_semg_2_list'], semg_df['activity'], F.lit(-1)))
semg_df = semg_df.withColumn('semg_2_fd_psd', compute_power_spectral_density(semg_df['filtered_semg_2_list'], semg_df['activity'], F.lit(-1)))
semg_df = semg_df.withColumn('semg_2_fd_medfreq', compute_median_frequency(semg_df['filtered_semg_2_list'], semg_df['activity'], F.lit(-1)))
semg_df = semg_df.withColumn('semg_2_fd_meanfreq', compute_mean_frequency(semg_df['filtered_semg_2_list'], semg_df['activity'], F.lit(-1)))

semg_df = semg_df.withColumn('semg_3_fd_fft', compute_fft(semg_df['filtered_semg_3_list'], semg_df['activity'], F.lit(-1)))
semg_df = semg_df.withColumn('semg_3_fd_psd', compute_power_spectral_density(semg_df['filtered_semg_3_list'], semg_df['activity'], F.lit(-1)))
semg_df = semg_df.withColumn('semg_3_fd_medfreq', compute_median_frequency(semg_df['filtered_semg_3_list'], semg_df['activity'], F.lit(-1)))
semg_df = semg_df.withColumn('semg_3_fd_meanfreq', compute_mean_frequency(semg_df['filtered_semg_3_list'], semg_df['activity'], F.lit(-1)))

semg_df = semg_df.withColumn('semg_4_fd_fft', compute_fft(semg_df['filtered_semg_4_list'], semg_df['activity'], F.lit(-1)))
semg_df = semg_df.withColumn('semg_4_fd_psd', compute_power_spectral_density(semg_df['filtered_semg_4_list'], semg_df['activity'], F.lit(-1)))
semg_df = semg_df.withColumn('semg_4_fd_medfreq', compute_median_frequency(semg_df['filtered_semg_4_list'], semg_df['activity'], F.lit(-1)))
semg_df = semg_df.withColumn('semg_4_fd_meanfreq', compute_mean_frequency(semg_df['filtered_semg_4_list'], semg_df['activity'], F.lit(-1)))

semg_df = semg_df.withColumn('semg_5_fd_fft', compute_fft(semg_df['filtered_semg_5_list'], semg_df['activity'], F.lit(-1)))
semg_df = semg_df.withColumn('semg_5_fd_psd', compute_power_spectral_density(semg_df['filtered_semg_5_list'], semg_df['activity'], F.lit(-1)))
semg_df = semg_df.withColumn('semg_5_fd_medfreq', compute_median_frequency(semg_df['filtered_semg_5_list'], semg_df['activity'], F.lit(-1)))
semg_df = semg_df.withColumn('semg_5_fd_meanfreq', compute_mean_frequency(semg_df['filtered_semg_5_list'], semg_df['activity'], F.lit(-1)))


In [7]:
# Generate 4 frequency-domain features per trial.
for x in range(1, NO_WINDOWS + 1):
    semg_df = semg_df.withColumn('semg_1_fd_fft_' + str(x), compute_fft(semg_df['filtered_semg_1_list'], semg_df['activity'], F.lit(x)))
    semg_df = semg_df.withColumn('semg_1_fd_psd_' + str(x), compute_power_spectral_density(semg_df['filtered_semg_1_list'], semg_df['activity'], F.lit(x)))
    semg_df = semg_df.withColumn('semg_1_fd_medfreq_' + str(x), compute_median_frequency(semg_df['filtered_semg_1_list'], semg_df['activity'], F.lit(x)))
    semg_df = semg_df.withColumn('semg_1_fd_meanfreq_' + str(x), compute_mean_frequency(semg_df['filtered_semg_1_list'], semg_df['activity'], F.lit(x)))

    semg_df = semg_df.withColumn('semg_2_fd_fft_' + str(x), compute_fft(semg_df['filtered_semg_2_list'], semg_df['activity'], F.lit(x)))
    semg_df = semg_df.withColumn('semg_2_fd_psd_' + str(x), compute_power_spectral_density(semg_df['filtered_semg_2_list'], semg_df['activity'], F.lit(x)))
    semg_df = semg_df.withColumn('semg_2_fd_medfreq_' + str(x), compute_median_frequency(semg_df['filtered_semg_2_list'], semg_df['activity'], F.lit(x)))
    semg_df = semg_df.withColumn('semg_2_fd_meanfreq_' + str(x), compute_mean_frequency(semg_df['filtered_semg_2_list'], semg_df['activity'], F.lit(x)))

    semg_df = semg_df.withColumn('semg_3_fd_fft_' + str(x), compute_fft(semg_df['filtered_semg_3_list'], semg_df['activity'], F.lit(x)))
    semg_df = semg_df.withColumn('semg_3_fd_psd_' + str(x), compute_power_spectral_density(semg_df['filtered_semg_3_list'], semg_df['activity'], F.lit(x)))
    semg_df = semg_df.withColumn('semg_3_fd_medfreq_' + str(x), compute_median_frequency(semg_df['filtered_semg_3_list'], semg_df['activity'], F.lit(x)))
    semg_df = semg_df.withColumn('semg_3_fd_meanfreq_' + str(x), compute_mean_frequency(semg_df['filtered_semg_3_list'], semg_df['activity'], F.lit(x)))

    semg_df = semg_df.withColumn('semg_4_fd_fft_' + str(x), compute_fft(semg_df['filtered_semg_4_list'], semg_df['activity'], F.lit(x)))
    semg_df = semg_df.withColumn('semg_4_fd_psd_' + str(x), compute_power_spectral_density(semg_df['filtered_semg_4_list'], semg_df['activity'], F.lit(x)))
    semg_df = semg_df.withColumn('semg_4_fd_medfreq_' + str(x), compute_median_frequency(semg_df['filtered_semg_4_list'], semg_df['activity'], F.lit(x)))
    semg_df = semg_df.withColumn('semg_4_fd_meanfreq_' + str(x), compute_mean_frequency(semg_df['filtered_semg_4_list'], semg_df['activity'], F.lit(x)))

    semg_df = semg_df.withColumn('semg_5_fd_fft_' + str(x), compute_fft(semg_df['filtered_semg_5_list'], semg_df['activity'], F.lit(x)))
    semg_df = semg_df.withColumn('semg_5_fd_psd_' + str(x), compute_power_spectral_density(semg_df['filtered_semg_5_list'], semg_df['activity'], F.lit(x)))
    semg_df = semg_df.withColumn('semg_5_fd_medfreq_' + str(x), compute_median_frequency(semg_df['filtered_semg_5_list'], semg_df['activity'], F.lit(x)))
    semg_df = semg_df.withColumn('semg_5_fd_meanfreq_' + str(x), compute_mean_frequency(semg_df['filtered_semg_5_list'], semg_df['activity'], F.lit(x)))



**Time-Frequency Domain and other additional Features**

In [8]:
WINDOW_SIZE = 100
OVERLAP = 50
NUM_COEFFS = 12
ORDER = 4

compute_stft = F.udf(lambda emg_signal, activity, x: calculate_stft(emg_signal, activity_length_map[activity]/NO_WINDOWS, activity, x), T.ArrayType(T.DoubleType()))
compute_mfcc = F.udf(lambda emg_signal, activity, x: calculate_mfcc(emg_signal, activity_length_map[activity]/NO_WINDOWS, activity, x), T.ArrayType(T.ArrayType(T.DoubleType())))
compute_hjorth = F.udf(lambda emg_signal, activity, x: calculate_hjorth(emg_signal, activity_length_map[activity]/NO_WINDOWS, activity, x), T.ArrayType(T.DoubleType()))
compute_autoregressive_coeffs = F.udf(lambda emg_signal, activity, x: calculate_autoregressive_coeffs(emg_signal, activity_length_map[activity]/NO_WINDOWS, activity, x), T.ArrayType(T.DoubleType()))

def calculate_stft(emg_signal, window_length, activity, x, sample_rate=2000, window_size=WINDOW_SIZE, overlap=OVERLAP):

    start_end = get_start_end_for_window(window_length, x)

    # window_size and overlap are in seconds
    window_sample_count = int(window_size * sample_rate)
    overlap_sample_count = int(overlap * sample_rate)

    # Compute stft
    f, t, Zxx = stft(emg_signal, fs=sample_rate, nperseg=window_sample_count, noverlap=overlap_sample_count)
    
    return list(f, t, Zxx)

def calculate_mfcc(emg_signal, window_length, activity, x, sample_rate=2000, num_coeffs=NUM_COEFFS):

    start_end = get_start_end_for_window(window_length, x)

    # num_coeffs is the number of MFCCs to compute
    return mfcc(emg_signal, samplerate=sample_rate, numcep=num_coeffs)

def calculate_hjorth(emg_signal, window_length, activity, x):

    start_end = get_start_end_for_window(window_length, x)

    # Compute the 3 hjorth parameters
    activeness = np.var(emg_signal)
    mobility = np.sqrt(np.var(np.diff(emg_signal)) / activeness)
    complexity = np.sqrt(np.var(np.diff(np.diff(emg_signal))) / np.var(np.diff(emg_signal))) / mobility

    return list(activity, mobility, complexity)

def calculate_autoregressive_coeffs(emg_signal, window_length, activity, x, order=ORDER):

    start_end = get_start_end_for_window(window_length, x)

    model = AutoReg(emg_signal, lags=order)
    model_fit = model.fit()
    return model_fit.params.tolist()

semg_df = semg_df.withColumn('semg_1_tfd_stft', compute_fft(semg_df['filtered_semg_1_list'], semg_df['activity'], F.lit(-1)))
semg_df = semg_df.withColumn('semg_1_tfd_mfcc', compute_fft(semg_df['filtered_semg_1_list'], semg_df['activity'], F.lit(-1)))
semg_df = semg_df.withColumn('semg_1_tfd_hjorth', compute_fft(semg_df['filtered_semg_1_list'], semg_df['activity'], F.lit(-1)))
semg_df = semg_df.withColumn('semg_1_tfd_arcoeffs', compute_fft(semg_df['filtered_semg_1_list'], semg_df['activity'], F.lit(-1)))

semg_df = semg_df.withColumn('semg_2_tfd_stft', compute_fft(semg_df['filtered_semg_2_list'], semg_df['activity'], F.lit(-1)))
semg_df = semg_df.withColumn('semg_2_tfd_mfcc', compute_fft(semg_df['filtered_semg_2_list'], semg_df['activity'], F.lit(-1)))
semg_df = semg_df.withColumn('semg_2_tfd_hjorth', compute_fft(semg_df['filtered_semg_2_list'], semg_df['activity'], F.lit(-1)))
semg_df = semg_df.withColumn('semg_2_tfd_arcoeffs', compute_fft(semg_df['filtered_semg_2_list'], semg_df['activity'], F.lit(-1)))

semg_df = semg_df.withColumn('semg_3_tfd_stft', compute_fft(semg_df['filtered_semg_3_list'], semg_df['activity'], F.lit(-1)))
semg_df = semg_df.withColumn('semg_3_tfd_mfcc', compute_fft(semg_df['filtered_semg_3_list'], semg_df['activity'], F.lit(-1)))
semg_df = semg_df.withColumn('semg_3_tfd_hjorth', compute_fft(semg_df['filtered_semg_3_list'], semg_df['activity'], F.lit(-1)))
semg_df = semg_df.withColumn('semg_3_tfd_arcoeffs', compute_fft(semg_df['filtered_semg_3_list'], semg_df['activity'], F.lit(-1)))

semg_df = semg_df.withColumn('semg_4_tfd_stft', compute_fft(semg_df['filtered_semg_4_list'], semg_df['activity'], F.lit(-1)))
semg_df = semg_df.withColumn('semg_4_tfd_mfcc', compute_fft(semg_df['filtered_semg_4_list'], semg_df['activity'], F.lit(-1)))
semg_df = semg_df.withColumn('semg_4_tfd_hjorth', compute_fft(semg_df['filtered_semg_4_list'], semg_df['activity'], F.lit(-1)))
semg_df = semg_df.withColumn('semg_4_tfd_arcoeffs', compute_fft(semg_df['filtered_semg_4_list'], semg_df['activity'], F.lit(-1)))

semg_df = semg_df.withColumn('semg_5_tfd_stft', compute_fft(semg_df['filtered_semg_5_list'], semg_df['activity'], F.lit(-1)))
semg_df = semg_df.withColumn('semg_5_tfd_mfcc', compute_fft(semg_df['filtered_semg_5_list'], semg_df['activity'], F.lit(-1)))
semg_df = semg_df.withColumn('semg_5_tfd_hjorth', compute_fft(semg_df['filtered_semg_5_list'], semg_df['activity'], F.lit(-1)))
semg_df = semg_df.withColumn('semg_5_tfd_arcoeffs', compute_fft(semg_df['filtered_semg_5_list'], semg_df['activity'], F.lit(-1)))


**Write the data as output to parquet files.**

In [10]:
# Get the start time
st = time.time()

#Writing to files:
print("Repartitioning and writing as parquet..")
semg_df = semg_df.repartition(21, 'activity')
# semg_df.write.mode('overwrite').parquet("semg_with_extracted_features/")

semg_df = semg_df.drop('filtered_semg_1_list', 'filtered_semg_2_list', 'filtered_semg_3_list', 'filtered_semg_4_list', 'filtered_semg_5_list')
semg_df = semg_df.drop('semg_1_list', 'semg_2_list', 'semg_3_list', 'semg_4_list', 'semg_5_list')

# remove_list = ['semg_1_fd_fft', 'semg_2_fd_fft', 'semg_3_fd_fft', 'semg_4_fd_fft', 'semg_5_fd_fft',
#                       'semg_1_fd_psd', 'semg_2_fd_psd', 'semg_3_fd_psd', 'semg_4_fd_psd', 'semg_5_fd_psd',
#                       'semg_1_tfd_stft', 'semg_2_tfd_stft', 'semg_3_tfd_stft', 'semg_4_tfd_stft', 'semg_5_tfd_stft',
#                       'semg_1_tfd_mfcc', 'semg_2_tfd_mfcc', 'semg_3_tfd_mfcc', 'semg_4_tfd_mfcc', 'semg_5_tfd_mfcc',
#                       'semg_1_tfd_hjorth', 'semg_2_tfd_hjorth', 'semg_3_tfd_hjorth', 'semg_4_tfd_hjorth', 'semg_5_tfd_hjorth',
#                       'semg_1_tfd_arcoeffs', 'semg_2_tfd_arcoeffs', 'semg_3_tfd_arcoeffs', 'semg_4_tfd_arcoeffs', 'semg_5_tfd_arcoeffs']

# remove_list = remove_list + [f"{item}_{suffix}" for item in remove_list for suffix in range(1, NO_WINDOWS + 1)]
# print(remove_list)

# for item in remove_list:
#     semg_df = semg_df.drop(item)

semg_df.write.mode('overwrite').parquet("semg_extracted_more_features_detrended/")

print("Completed repartitioning and writing.")

# Get the end time
et = time.time()

# Calculate the execution time
elapsed_time = et - st
print('Execution time:', elapsed_time, 'seconds')

Repartitioning and writing as parquet..


                                                                                

Completed repartitioning and writing.
Execution time: 256.0938618183136 seconds


In [11]:
print('Writing to database..')
semg_df.write.format('jdbc').option("url", "jdbc:postgresql://localhost:5432/postgres") \
    .option("driver", "org.postgresql.Driver").option("dbtable", "semg_all_scalar_features") \
    .option("user", "postgres").option("password", "postgres").save(mode='append')
print("Completed writing to database.")
print('Processing complete.')
