In [4]:
from os import listdir

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns

plt.style.use('classic')
plt.rcParams['figure.figsize'] = (15,5)
plt.rcParams['axes.grid'] = True

from scipy import signal
from scipy.optimize import curve_fit
import scipy.stats as sst

from itertools import product
from tqdm.notebook import tqdm
from tqdm.contrib.itertools import product as TQDMproduct

# import pingouin as pg

## Signal processing methods


In [9]:
class TTS:
    def __init__(self, subj, axes = 'ML'):        
        if axes == 'ML':
            axes = 'y'
        elif axes == 'AP':
            axes = 'x'
        self.subj = subj+"-"
        self.files = [c for c in listdir('data/') if self.subj in c]
        
        data = [self.ReadFile(f'data/{f}', axes) for f in self.files]
        self.x = [d[0] for d in data]
        self.y = [d[1] for d in data]
        
        self.group = self.subj.split('-')[0]
        
        self.y_TOP = [self.TOP(x, y) for x, y in zip(self.x, self.y)]
        self.y_SA = [self.SA(y) for y in self.y]
        self.y_RMS = [self.RMS(y) for y in self.y]
        self.y_CWT = [self.CWT(y) for y in self.y]
        self.y_CWT = [y / y.max() for y in self.y_CWT]
        
        # calculate 1sd of all files from the subject
        m = np.max([len(c) for c in self.y])
        f = np.vstack([np.pad(c, (0, m - len(c))) for c in self.y])
        f = np.mean(f, axis = 0)
        self.sd = self.CalculateThreshold(f)
        

    def ReadFile(self, file, axes = 'y'): 
    #     This line reads the exact Kistler txt outputs. 
    #     The Z-axis is used to find the landing instance
        data = pd.read_csv(file, sep = '\t', skiprows = 11).drop(0)[['abs time (s)', f'F{axes}', 'Fz']].astype(float)
        data.columns = ['time',axes,'z']
        data.set_index('time', inplace = True)
        data['z'] -= data['z'].min()

        # landing occurs when the Z-axis goes over a threshold (just above zero)
        landing = np.where(data['z'].abs() > 25)[0].min()
        landing = data.index[landing]

        # remove anything earlier than landing, and reset indeces.
        data = data.loc[landing: landing + 15]  
        data.index -= landing

        # filter using a 2nd order Butterworth low-pass with a cut of freq. of 12Hz. 
        self.fs = data.index.argmax() / data.index.max() # the Kistler sampling frequency
        sos = signal.butter(2, 12, 'low', output = 'sos', fs = self.fs)
        data[axes] = signal.sosfilt(sos, data[axes].values)

        # normalize data - set all values between 0 and 1
        data[axes] = np.abs(data[axes]) / np.abs(data[axes]).max()

        return data.index.values, data[axes].values
    
    def CalculateThreshold(self, y, w = 5): 
        # calculate the threshold as the minimal SD in a w consecutive seconds
        w = 200 * w
        return np.min([y[i:i+w].std() for i in range(len(y)-w)])
        
    def RAW(self, y):
        """
        This method doesn't do anything, and returns the RAW values.
        """
        return y

    def RMS(self, y, window = int(200 * 0.25), center = True):
        """
        Calculates the moving RMS with window size (number of samples, integer).
        center: boolean, if True pads the beginning and the end of the result vector with zeros. 
                Else, pads only the end.
        """
        if center:
            w = window // 2
            return np.array([np.nan] * w + 
                            [np.sqrt(np.mean(y[i - w : i + w] ** 2)) for i in range(w, len(y) - w)] + 
                            [np.nan] * w)
        else:
            w = window
            return np.array(
                            [np.sqrt(np.mean(y[i : i + w] ** 2)) for i in range(len(y) - w)] + 
                            [np.nan] * w)

    def SA(self, y):
        """
        Calculate the sequential average signal
        """
        return np.array([np.mean(y[:i]) for i in range(1,len(y)+1)])


    def TOP(self, x, y):
        """
        Fit a Third-Order Polynomial
        """
        s = np.argmax(y)
        objective = lambda x, a, b, c, d: (a * x**3) + (b * x**2) + (c * x) + d
        a, b, c, d = np.polyfit(x = x[s:] - x[s:].min(), 
                                y = y[s:],
                                deg = 3)
        top = objective(x, a, b, c, d)
        return top

    def CWT(self, y, window = 100):
        """
        Calculates the continuous wavelet transform and integrate it to find the energy.
        """
        wv = signal.cwt(y, signal.morlet, np.arange(1, window), dtype = np.complex128)
        energy = np.trapz(wv, axis = 0)
        return np.abs(energy)
    
    def CalculateTTS(self, y, SD, duration = 0.5):
        threshold = self.sd * SD
        duration = int(duration * self.fs)
        
        # find when the signal goes below the threshold for at least 0.5 seconds (100 data samples)        
        for i in range(len(y) - int(duration)):
            if np.all(y[i : i+duration] < threshold):
                return i / self.fs if i / self.fs != 0 else np.nan
        return np.nan

    

In [10]:
example = TTS('ex')

FileNotFoundError: [WinError 3] The system cannot find the path specified: 'data/'