In [1]:
!pip install kneed
!pip install fastparquet
!pip install conda
!pip install pyarrow



In [None]:
import pandas as pd
import numpy as np
import scipy as sp
from scipy import signal
import matplotlib.pyplot as plt
from scipy.signal import find_peaks
#from kneed import KneeLocator

In [None]:
import pyarrow.parquet as pq
import pyarrow as pa

In [None]:
## Import Data
data = pd.read_csv(r"C:\Users\champ\Desktop\pp\pythonprocessing\rawEMGData.csv") 

## Sampling Rate and High/Low Pass CutOff Frequencies
fs = 2000 # Sampling Rate
high_band = 20 #High Pass Cut-Off Frequency for BandPass Filter (Hz)
low_band = 450 # Low Pass Cut-Off Frequency for BandPass Filter (Hz)
lowPassCutOff = 5 # Low Pass Cut-Off Frequency for LowPass Filter (Hz)

## PreProcess, Normalize, and Shift Mean of Raw Data
rawEMG1, normalizedEMG1, meancorrectedEMG1 = preprocessEMG(data)

## Filter Raw Data
emg_BandPass1, emg_rectified1, emg_lowPass1 = filterEMG(fs, high_band, low_band, lowPassCutOff, meancorrectedEMG1)

## Plot All Data
plotRawAndProcessedEMG(rawEMG1, normalizedEMG1, meancorrectedEMG1, emg_BandPass1, emg_rectified1, emg_lowPass1)

In [None]:
def processEKG(df):
    

In [None]:
def preprocessEMG(df):
  df.columns.values[0] = "Time"
  rawemg = df.EMG.dropna()
  rawemgToNumeric = pd.to_numeric(rawemg, errors = 'coerce') # Convert the string type raw emg data to float type
  normalizedEMG = rawemgToNumeric * (1/0.011) # this line normalizes the raw data to the range -1 to 1
  meancorrectedEMG = normalizedEMG - np.mean(normalizedEMG) # Shift EMG signal by mean
  meancorrectedEMG = meancorrectedEMG.dropna()

  return rawemgToNumeric, normalizedEMG, meancorrectedEMG

def plotRawAndProcessedEMG(rawEMG, normalizedEMG, meancorrectedEMG, emg_BandPass, emg_rectified, emg_lowPass):
  fig = plt.figure()
  #plt.subplot(3,1,1)
  plt.plot(range(len(rawEMG)), rawEMG)
  plt.xlabel('Sample Number ("Time")')
  plt.ylabel('EMG Signal (V)')

  fig = plt.figure()
  #plt.subplot(3,1,2)
  plt.plot(range(len(normalizedEMG)), normalizedEMG)
  plt.xlabel('Sample Number ("Time")')
  plt.ylabel('Normalized EMG Signal (a.u)')

  fig = plt.figure()
  #plt.subplot(3,1,3)
  plt.plot(range(len(meancorrectedEMG)), meancorrectedEMG)
  plt.xlabel('Sample Number ("Time")')
  plt.ylabel('Mean Corrected EMG Signal (a.u)')

  fig = plt.figure()
  #plt.subplot(3,1,3)
  plt.plot(range(len(emg_BandPass)), emg_BandPass)
  plt.xlabel('Sample Number ("Time")')
  plt.ylabel('BandPass Filtered EMG Signal (a.u)')

  fig = plt.figure()
  #plt.subplot(3,1,3)
  plt.plot(range(len(emg_rectified)), emg_rectified)
  plt.xlabel('Sample Number ("Time")')
  plt.ylabel('Rectified EMG Signal (a.u)')

  fig = plt.figure()
  #plt.subplot(3,1,3)
  plt.plot(range(len(emg_lowPass)), emg_lowPass)
  plt.xlabel('Sample Number ("Time")')
  plt.ylabel('LowPass Filtered EMG Signal (a.u)')

  fig = plt.figure()
  #plt.subplot(3,1,3)
  plt.plot(range(20000,26000), emg_lowPass[20000:26000])
  plt.xlabel('Sample Number ("Time")')
  plt.ylabel('LowPass Filtered EMG Signal (a.u)')


def filterEMG(fs, high_band, low_band, lowPassCutOff, meancorrectedEMG):
  # create bandpass filter for EMG with high and low cut off frequencies
  high = high_band/(fs/2)
  low = low_band/(fs/2)

  b, a = sp.signal.butter(4, [high,low], btype='bandpass') # Create Butterworth Filter

  emg_BandPass = sp.signal.filtfilt(b, a, meancorrectedEMG) # Apply BandPass Filter

  emg_rectified = abs(emg_BandPass) # Rectify BandPass Filtered EMG Signal

  low_pass = lowPassCutOff/(fs/2)
  b2, a2 = sp.signal.butter(4, low_pass, btype='lowpass') # Create Low Pass Filter for Rectified Emg Data

  emg_lowPass = sp.signal.filtfilt(b2, a2, emg_rectified) # Apply Low Pass Filter to Rectified EMG Data

  return emg_BandPass, emg_rectified, emg_lowPass

def freqAnalysis(fs, meancorrectedEMG):
  f, Pxx = signal.welch(meancorrectedEMG, fs) 

  # fig = plt.figure()
  # plt.plot(range(len(meancorrectedEMG)), meancorrectedEMG)
  # plt.xlabel('Sample Number ("Time")')
  # plt.ylabel('Mean Corrected EMG Signal (a.u)')

  # fig = plt.figure()
  # plt.plot(f, Pxx)
  # plt.xlim([0, 1000])
  # plt.xlabel('frequency [Hz]')
  # plt.ylabel('PSD')

  # plt.show()

# def statAnalysis():
# # If ratio of variance is >> 4 then say equal_var = False below
# print("Variance of Data1=", np.var(emg_envelope), "Variance of Data 2=", np.var(emg_envelope1), "ratio=",np.var(emg_envelope1)/np.var(emg_envelope))


# #perform two sample t-test with equal or unequal variances
# stats.ttest_ind(a=emg_envelope1, b=emg_envelope, equal_var=False)

def signal_peaks_old(emgsignal, bin_num):
  # Find Threshold
  # fig = plt.figure()
  # plt.plot(emgsignal)
  #print("Bin Size =", bin_num)
  counts, val = np.histogram(emgsignal, bins = bin_num)
  vals = val[0:-1]
  max_x = vals[np.argmax(counts)]
  x = vals[ (vals >= max_x) ]
  y = counts[len(counts)-len(x):] 


  kn = KneeLocator(x,y, curve='convex', direction='decreasing') # knee point is threshold point -- could be sensitive to bin size
  #print("Knee Point = ", kn.knee)
  # fig = plt.figure()
  # plt.plot(x,y)
  # plt.vlines(kn.knee, plt.ylim()[0], plt.ylim()[1], linestyles='dashed')  

  # Find Peaks
  peaks, _ = find_peaks(emgsignal, height=kn.knee)

  # fig = plt.figure()
  # plt.plot(emgsignal)
  # plt.plot(peaks, emgsignal[peaks], "x")
  # plt.plot(np.zeros_like(emgsignal), "--", color="gray")
  # plt.show()

  return kn.knee

def find_threshold(emgsignal, bin_num):
  counts, val = np.histogram(emgsignal, bins = bin_num)
  vals = val[0:-1]
  max_x = vals[np.argmax(counts)]
  x = vals[ (vals >= max_x) ]
  y = counts[len(counts)-len(x):] 

  kn = KneeLocator(x,y, curve='convex', direction='decreasing') # knee point is threshold point -- could be sensitive to bin size

  return kn.knee

def signal_peaks(emgsignal, threshold):
  peaks, _ = find_peaks(emgsignal, height=threshold)

  fig = plt.figure()
  plt.plot(emgsignal)
  plt.plot(peaks, emgsignal[peaks], "x")
  plt.plot(np.zeros_like(emgsignal), "--", color="gray")
  plt.xlabel('Sample Number ("Time")')
  plt.ylabel('EMG Amplitude')
  plt.show()