# Functions

In [1]:
import os
import glob
import numpy as np
import pandas as pd
import statsmodels.api as sm
import statsmodels.formula.api as smf
import statistics as stc
from scipy.fftpack import fft

In [2]:
def Antn_Data(file_name, root):

    tokens = file_name.split('_')
    
    recording_info = pd.DataFrame(data = [tokens], columns = ['Patient number', 'Recording index', 'Chest location','Acquisition mode','Recording equipment'])

    recording_annotations = pd.read_csv(os.path.join(root, file_name), names = ['Start', 'End', 'Crackles', 'Wheezes'], delimiter= '\t')

    return (recording_info, recording_annotations)


In [3]:
# fast fourier transforfmation
# background information from https://github.com/balzer82/FFT-Python
def fft_m(signal,SR,MiDF,MaDF):

    N=len(signal)
    T=1/SR
    Max=int((MaDF*(N//2))/(0.5*SR))
    Min=int((MiDF*(N//2))/(0.5*SR))
    yf = fft(signal)
    xf = np.linspace(0, 1.0/(2.0*T),N//2)
    Real_fft=2.0/N * np.abs(yf[0:N//2])   
    xf=xf[Min:Max]
    Real_fft=Real_fft[Min:Max]
    psd=(Real_fft**2)/xf
    return xf, Real_fft


In [4]:
# root mean square envelope analysis
# best option
def Envelope_rms(inputSignal, window_size):    

    a2 = np.power(inputSignal,2)

    window = np.ones(window_size)/float(window_size)

    Up=np.sqrt(np.convolve(a2, window, 'valid'))

    Down=-np.sqrt(np.convolve(a2, window, 'valid'))

    return Up,Down



In [6]:
# combinational max min envelope analysis
def Envelope_max_min(Signal,intervalLength,maxm):

    Up=np.zeros(len(Signal))

    Down=np.zeros(len(Signal))

    for i in range(len(Signal)-intervalLength):

        Up[i]=max(Signal[i:i+intervalLength])

        Down[i]=min(Signal[i:i+intervalLength])

    return Up,Down

In [7]:
def maddest(Signal, axis=None):

    """

    Mean Absolute Deviation

    """

    return np.mean(np.absolute(Signal - np.mean(Signal, axis)), axis)



In [8]:
def RMS(Signal, axis=None):

    return np.sqrt(np.mean(Signal**2))



In [9]:
def FM4(Signal, axis=None):

    return np.sqrt(np.mean(Signal**2))



In [10]:
def Shape_factor(Signal, axis=None):

    """

    Shape_factor

    """

    return len(Signal)*RMS(Signal)/sum(abs(Signal))



In [11]:
def Nrm_Kurtosis(Signal, axis=None):

    """

    normalized kurtosis

    """

    return scipy.stats.kurtosis(Signal)/(np.std(Signal)**4)



In [12]:
def crest_factor(x):

    return np.max(np.abs(x))/np.sqrt(np.mean(np.square(x)))



In [13]:
def impulse_factor(x):

    return np.max(np.abs(x))/sum(abs(x))



In [14]:
def Energy(x):

    return sum(x**2)




In [15]:
def high_pass_filter(X, low_cutoff, SR):

    

    sos = scipy.signal.butter(10, low_cutoff, 'highpass', SF, output='sos')

    filtered_sig = scipy.signal.sosfilt(sos, X)



    return filtered_sig

In [16]:
def low_pass_filter(X, high_cutoff, SR):

    

    sos = signal.butter(10, high_cutoff, 'lp', fs=SF, output='sos')

    filtered_sig = scipy.signal.sosfilt(sos, X)



    return filtered_sig



In [17]:
def denoise_wvlt( x, wavelet, level):

    """

    1. Adapted from waveletSmooth function found here:

    http://connor-johnson.com/2016/01/24/using-pywavelets-to-remove-high-frequency-noise/

    2. Threshold equation and using hard mode in threshold as mentioned

    by Tomas Vantuch:

    http://dspace.vsb.cz/bitstream/handle/10084/133114/VAN431_FEI_P1807_1801V001_2018.pdf

    """

    # Decompose to get the wavelet coefficients
    coeff = pywt.wavedec(x, wavelet, mode="per")


    # Calculate sigma for threshold as defined in http://dspace.vsb.cz/bitstream/handle/10084/133114/VAN431_FEI_P1807_1801V001_2018.pdf
    # As noted by @harshit92 MAD referred to in the paper is Mean Absolute Deviation not Median Absolute Deviation

    sigma = (1/0.6745) * maddest( coeff[-level])



    # Calculte the univeral threshold
    uthresh = sigma * np.sqrt( 2*np.log( len( x )))
    coeff[1:] = (pywt.threshold( i, value=uthresh, mode='hard') for i in coeff[1:])

    

    # Reconstruct the signal using the thresholded coefficients

    return pywt.waverec(coeff, wavelet, mode='per')





In [18]:
# overnight code
def Sound_features(Signal,SR):

    """

    Audio Spectral features

    n_fft=2048, hop_length=512,frame_length=2048, spectral_power=2.0

    """

    chroma_mean= np.mean(librosa.feature.chroma_stft(Signal, sr=SR).T, axis=0)

    chroma_std= np.std(librosa.feature.chroma_stft(Signal, sr=SR).T, axis=0)

    cens_mean= np.mean(librosa.feature.chroma_cens(Signal, sr=SR).T, axis=0)

    cens_std= np.std(librosa.feature.chroma_cens(Signal, sr=SR).T, axis=0)

    mel_spctrm_mean= np.mean(librosa.feature.melspectrogram(Signal, sr=SR).T, axis=0)

    mel_spctrm_std= np.std(librosa.feature.melspectrogram(Signal, sr=SR).T, axis=0)

    mfcc_mean= np.mean(librosa.feature.mfcc(Signal, sr=SR).T, axis=0)

    mfcc_std= np.std(librosa.feature.mfcc(Signal, sr=SR).T, axis=0)

    spctl_rms= np.mean(librosa.feature.rms(Signal))

    cntrd_mean= np.mean(librosa.feature.spectral_centroid(Signal, sr=SR).T, axis=0)

    cntrd_std= np.std(librosa.feature.spectral_centroid(Signal, sr=SR).T, axis=0)

    BW_mean= np.mean(librosa.feature.spectral_bandwidth(Signal, sr=SR).T, axis=0)

    BW_std= np.std(librosa.feature.spectral_bandwidth(Signal, sr=SR).T, axis=0)

    cntrst_mean= np.mean(librosa.feature.spectral_contrast(Signal, sr=SR).T, axis=0)

    cntrst_std= np.std(librosa.feature.spectral_contrast(Signal, sr=SR).T, axis=0)

    fltnss_mean= np.mean(librosa.feature.spectral_flatness(Signal).T, axis=0)

    fltnss_std= np.std(librosa.feature.spectral_flatness(Signal).T, axis=0)

    rolloff_mean= np.mean(librosa.feature.spectral_rolloff(Signal, sr=SR).T, axis=0)

    rolloff_std= np.std(librosa.feature.spectral_rolloff(Signal, sr=SR).T, axis=0)

    plyftr_mean= np.mean(librosa.feature.poly_features(Signal, sr=SR,order=10).T, axis=0)

    plyftr_std= np.std(librosa.feature.poly_features(Signal, sr=SR,order=10).T, axis=0)

    tonnetz_mean= np.mean(librosa.feature.tonnetz(Signal, sr=SR).T, axis=0)

    tonnetz_std= np.std(librosa.feature.tonnetz(Signal, sr=SR).T, axis=0)

    ZCrate_mean= np.mean(librosa.feature.zero_crossing_rate(Signal).T, axis=0)

    ZCrate_std= np.std(librosa.feature.zero_crossing_rate(Signal).T, axis=0)

    

    

    """

    Audio Rhythm  features

    n_fft=2048, hop_length=512,frame_length=2048, spectral_power=2.0

    """

    tempogram_mean= np.mean(librosa.feature.tempogram(Signal, sr=SR).T, axis=0)

    tempogram_std= np.std(librosa.feature.tempogram(Signal, sr=SR).T, axis=0)

    FrTmpgrm_mean= np.mean(librosa.feature.fourier_tempogram(Signal, sr=SR).T, axis=0)

    FrTmpgrm_std= np.std(librosa.feature.fourier_tempogram(Signal, sr=SR).T, axis=0)

    

    """

    Manipulation feature

    """

    derivativ_mean= np.mean(librosa.feature.delta(Signal, order=1).T, axis=0)

    derivativ_std= np.std(librosa.feature.delta(Signal, order=1).T, axis=0)

    StckMmr_mean= np.mean(librosa.feature.stack_memory(Signal, n_steps=2,).T, axis=0)

    StckMmr_std= np.std(librosa.feature.stack_memory(Signal, n_steps=2,).T, axis=0)    

    Extracted_features=np.abs(np.hstack((chroma_mean,chroma_std, cens_mean, cens_std, mel_spctrm_mean, mel_spctrm_std, mfcc_mean, 

                              mfcc_std, spctl_rms, cntrd_mean, cntrd_std, 

                              BW_mean, BW_std, cntrst_mean, cntrst_std,

                              fltnss_mean, fltnss_std, rolloff_mean, rolloff_std,

                              plyftr_mean, plyftr_std, tonnetz_mean, tonnetz_std,

                              ZCrate_mean, ZCrate_std, tempogram_mean,tempogram_std,

                              FrTmpgrm_mean, FrTmpgrm_std, derivativ_mean, derivativ_std,

                              StckMmr_mean, StckMmr_std)))

    Extracted_features=Extracted_features.T
# sound features 
    Features_name=pd.DataFrame(['chroma_mean','chroma_std', 'cens_mean', 'cens_std','mel_spctrm_mean', 'mel_spctrm_std', 'mfcc_mean', 'mfcc_std', 'spctl_rms', 'cntrd_mean', 'cntrd_std', 'BW_mean', 'BW_std', 'cntrst_mean', 'cntrst_std',\

                              'fltnss_mean', 'fltnss_std', 'rolloff_mean', 'rolloff_std',\

                              'plyftr_mean', 'plyftr_std', 'tonnetz_mean', 'tonnetz_std',\

                              'ZCrate_mean', 'ZCrate_std', 'tempogram_mean','tempogram_std',\

                              'FrTmpgrm_mean', 'FrTmpgrm_std', 'derivativ_mean', 'derivativ_std',\

                              'StckMmr_mean', 'StckMmr_std'])

    

    return Extracted_features,Features_name




In [19]:
def Time_features(Signal,SR):

    """

    Statistical features from time domain signals

    """

    maxm=np.max(Signal) 

    mean=np.mean(Signal)             #Arithmetic mean of data.

    h_mean=stc.harmonic_mean(abs(Signal))  #Harmonic mean of data.

    median=stc.median(Signal)             #Median (middle value) of data.

    m_low=stc.median_low(Signal)            #Low median of data.

    m_h=stc.median_high(Signal)            # High median of data.

    m_g=stc.median_grouped(Signal)           # Median, or 50th percentile of grouped data.

    var=stc.pvariance(Signal)

    std=np.std(Signal)

    skewnss=scipy.stats.skew(Signal)

    krts=scipy.stats.kurtosis(Signal)

    Nkrts=Nrm_Kurtosis(Signal)

    cumfreqs=scipy.stats.cumfreq(Signal)[0]

    entrp=scipy.stats.entropy(abs(Signal))

    shp_f=Shape_factor(Signal)

    RMS_S=RMS(Signal)

    mdst=maddest(Signal)

    enrg=Energy(Signal)

    IF=impulse_factor(Signal)

    Extracted_features=np.hstack((maxm, mean, h_mean, median, m_low, m_h, m_g, var,

                              std, skewnss, krts, Nkrts, cumfreqs, entrp, shp_f, 

                              RMS_S, mdst, enrg, IF))

    Features_name=pd.DataFrame(['maxm', 'mean','h_mean', 'median', 'm_low', 'm_h', \

                                'm_g', 'var','std', 'skewnss', 'krts', 'Nkrts',\

                                'cumfreqs', 'entrp', 'shp_f', 'RMS_S', 'mdst',\

                                'enrg', 'IF'],[1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,

                                16,17,18,19])

    return Extracted_features,Features_name




In [20]:
def Freq_features(Signal,SR): 

    """

    Statistical features from frequency domain signals (FFT signals)

    """

    _,FFT_amp=fft_m(Signal,SR,0,1000)

    maxm_frq=np.max(FFT_amp) 

    mean_frq=np.mean(FFT_amp)             #Arithmetic mean of data.

    h_mean_frq=stc.harmonic_mean(abs(FFT_amp))  #Harmonic mean of data.

    median_frq=stc.median(FFT_amp)             #Median (middle value) of data.

    m_low_frq=stc.median_low(FFT_amp)            #Low median of data.

    m_h_frq=stc.median_high(FFT_amp)            # High median of data.

    m_g_frq=stc.median_grouped(FFT_amp)           # Median, or 50th percentile of grouped data.

    var_frq=stc.pvariance(FFT_amp)

    std_frq=np.std(FFT_amp)

    skewnss_frq=scipy.stats.skew(FFT_amp)

    krts_frq=scipy.stats.kurtosis(FFT_amp)

    Nkrts_frq=Nrm_Kurtosis(FFT_amp)

    cumfreqs_frq=scipy.stats.cumfreq(FFT_amp)[0]

    entrp_frq=scipy.stats.entropy(abs(FFT_amp))

    shp_f_frq=Shape_factor(FFT_amp)

    RMS_frq=RMS(FFT_amp)

    mdst_frq=maddest(FFT_amp)

    enrg_frq=Energy(FFT_amp)

    IF_frq=impulse_factor(FFT_amp)

    peak_frq=scipy.signal.find_peaks(FFT_amp,height=max(FFT_amp))[0]*SR/len(Signal)

    

    Extracted_features=np.hstack((maxm_frq, mean_frq, h_mean_frq, median_frq, m_low_frq,

                              m_h_frq, m_g_frq, var_frq, std_frq, skewnss_frq,

                              krts_frq, Nkrts_frq, cumfreqs_frq, entrp_frq,

                              shp_f_frq, RMS_frq, mdst_frq, enrg_frq, IF_frq))

    Features_name=pd.DataFrame(['maxm_frq', 'mean_frq','h_mean_frq', 'median_frq',

                            'm_low_frq', 'm_h_frq', 'm_g_frq', 'var_frq','std_frq',

                            'skewnss_frq', 'krts_frq', 'Nkrts_frq', 'cumfreqs_frq',

                            'entrp_frq', 'shp_f_frq', 'RMS__frq', 'mdst_frq',

                            'enrg_frq', 'IF_frq', 'peak_frq'],[1,2,3,4,5,6,7,8,

                             9,10,11,12,13,14,15,16,17,18,19,20])

    

    return Extracted_features,Features_name



In [21]:
def All_features(Signal,SR):

    S_feat,S_feat_name=Sound_features(Signal,SR)
    T_feat,T_feat_name=Time_features(Signal,SR)
    Frq_feat,Frq_feat_name=Freq_features(Signal,SR)
    All_features=np.hstack((S_feat,T_feat,Frq_feat))  

    return All_features


In [None]:
# xgbregressor feature importance for feature selection
def XGBRgrssr(Features,label,Train_R):

    def get_feature_importance_data(Features,label,Train_R):

        data = Features.copy()
        y = label
        X = Features

        train_samples = int(X.shape[0] *Train_R)

        X_train = X[:train_samples]
        X_test = X[train_samples:]

        y_train = y[:train_samples]
        y_test = y[train_samples:]

           return (X_train, y_train), (X_test, y_test)



    (X_train_FI, y_train_FI), (X_test_FI, y_test_FI) = get_feature_importance_data(All_feat,label,.6)

    regressor = xgb.XGBRegressor(gamma=0.0,n_estimators=200,base_score=0.7,colsample_bytree=1,learning_rate=0.001)

    xgbModel = regressor.fit(X_train_FI,y_train_FI, \

                         eval_set = [(X_train_FI, y_train_FI), (X_test_FI, y_test_FI)], \

                         verbose=False)

    

    eval_result = regressor.evals_result()

    training_rounds = range(len(eval_result['validation_0']['rmse']))

    plt.scatter(x=training_rounds,y=eval_result['validation_0']['rmse'],label='Training Error')

    plt.scatter(x=training_rounds,y=eval_result['validation_1']['rmse'],label='Validation Error')

    plt.xlabel('Iterations')

    plt.ylabel('RMSE')

    plt.title('Training Vs Validation Error')

    plt.legend()

    plt.show()

    features_score=xgbModel.feature_importances_.tolist()

    fig = plt.figure()

    plt.xticks(rotation='vertical')

    X_test_FI=pd.DataFrame(X_test_FI)

    plt.bar([i for i in range(len(xgbModel.feature_importances_))], 

             xgbModel.feature_importances_.tolist(), tick_label=X_test_FI.columns)

    plt.title('Feature importance')

    plt.show()

    

    return features_score

In [23]:
# Univariate feature selection

def Unvr_FS(Features,label,N_F):

    X=Features
    y=label

    # Split dataset to select features 

    X_train, X_test, y_train, y_test = train_test_split(X, y,train_size=.6, stratify=y, random_state=0)

    plt.figure()
    plt.clf()
    X_indices = np.arange(X.shape[-1])

    selector = SelectKBest(f_classif, N_F)
    selector.fit(X_train, y_train)
    scores = -np.log10(selector.pvalues_)
    features_score = scores/max(scores)
    plt.bar(X_indices - .45, scores, width=.2,

            label=r'Univariate score ($-Log(p_{value})$)', color='darkorange',

            edgecolor='black')

    plt.title("Univariate feature selection")
    plt.xlabel('Feature index')
    plt.yticks(())
    plt.axis('tight')
    plt.legend(loc='upper right')
    plt.show()

    

    # Compare to the weights of an SVM

    clf = make_pipeline(MinMaxScaler(), LinearSVC())

    clf.fit(X_train, y_train)

    print('Classification accuracy without selecting features: {:.3f}'

          .format(clf.score(X_test, y_test)))

    svm_weights = np.abs(clf[-1].coef_).sum(axis=0)

    svm_weights /= svm_weights.sum()

    plt.bar(X_indices - .25, svm_weights, width=.2, label='SVM weight',

        color='navy', edgecolor='black')

    clf_selected = make_pipeline(SelectKBest(f_classif, k=10), MinMaxScaler(), LinearSVC())

    clf_selected.fit(X_train, y_train)

    print('Classification accuracy after univariate feature selection: {:.3f}'

          .format(clf_selected.score(X_test, y_test)))

    svm_weights_selected = np.abs(clf_selected[-1].coef_).sum(axis=0)

    svm_weights_selected /= svm_weights_selected.sum()

    plt.bar(X_indices[selector.get_support()] - .05, svm_weights_selected,

            width=.2, label='SVM weights after selection', color='c', edgecolor='black')

    plt.title("Comparing feature selection")

    plt.xlabel('Feature number')

    plt.yticks(())

    plt.axis('tight')

    plt.legend(loc='upper right')

    plt.show()

    

    return features_score



"""

making classifications and predictions

"""

def XGBclass(Features,label,test_r):

    

    X_train, X_test, y_train, y_test = train_test_split(X, label,test_size=test_r)

    # fit model on training data

    model = XGBClassifier()
    model.fit(X_train, y_train)

    # make predictions for test data
    predictions = model.predict(X_test)

    # evaluate predictions
    accuracy = accuracy_score(y_test, predictions)

    print("Accuracy: %.2f%%" % (accuracy * 100.0))
    xx=np.linspace(1,predictions.shape[0],predictions.shape[0])
    plt.scatter(xx, y_test, s=1000, color='Blue',label='True condition')
    plt.scatter(xx,predictions, s=500, color='red',label='Predicted condition')
    plt.legend()

    

    return predictions,accuracy
