This scripts depends on the generatation of envelope features that needs to be generated first.

In [8]:
#filename where you want to store the audio features
filename = 'script-2-N95_Audio_Features_FVC.pickle'

In [2]:
import librosa
import librosa.display
import IPython.display as ipd
import numpy as np
import sklearn
import pandas as pd
from sklearn.model_selection import LeaveOneOut
from sklearn.linear_model import LinearRegression
from sklearn.ensemble import RandomForestRegressor
from sklearn.ensemble import BaggingRegressor
from sklearn.svm import SVR
from sklearn.linear_model import Lasso
from sklearn.linear_model import Ridge
import xgboost as xgb
from sklearn.metrics import mean_squared_error
import matplotlib
import matplotlib.pyplot as plt
import latexify as lt
import os
#from thinkdsp import read_json
import scipy
import speechpy
#LOSO Validation
loo = LeaveOneOut()
import tsfel
#for filter
from scipy.signal import butter
from scipy.signal import lfilter
from scipy.signal import freqz;
import pickle
import json

def butter_highpass(cutoff, fs, order=5):
    nyq = 0.5 * fs
    normal_cutoff = cutoff / nyq
    b, a = butter(order, normal_cutoff, btype='highpass', analog=False)
    return b, a

def butter_highpass_filter(data, cutoff, fs, order=5):
    b, a = butter_highpass(cutoff, fs, order=order)
    y = lfilter(b, a, data)
    return y;

def butter_bandpass(lowcut, highcut, fs, order=5):
    nyq = 0.5 * fs
    low = lowcut / nyq
    high = highcut / nyq
    b, a = butter(order, [low, high], btype='band')
    return b, a


def butter_bandpass_filter(data, lowcut, highcut, fs, order=5):
    b, a = butter_bandpass(lowcut, highcut, fs, order=order)
    y = lfilter(b, a, data)
    return y
#the file where all data is arranged.
data = pd.read_csv("data_FVC_N95.csv")
print("Total Data Points = ",len(data))
#data.head(25)

Total Data Points =  60


## New Feature Engineering | Adddressed After ACM HEALTH Revision | 8th May'22

In [27]:
#load time series feature of the envelope
    
#with open('script-1-ts-fel-n95-Envelope.pickle', 'rb') as handle:
#    ts_features_envelope = pickle.load(handle)

Audio_Features = {}

try: 
    measured = pd.read_csv('data-forced-breathing-1/GT_FB_N95.csv', index_col='Unnamed: 0')
except:
    measured = pd.read_csv('data-forced-breathing-1/GT_FB_N95.csv')
    
def get_features(filepath, lungParam, num_filters, num_cepstral, fft_length):
    
    #use the below line if the data is in json
    #wave = read_json(filepath=filepath)
    
    #use the below line if the data is in wav
    if (filepath).endswith('.wav'):
        ys, fs = librosa.load(filepath, sr=16000)
    else:
        with open(filepath) as fp:
            input_data = json.load(fp)
        ys = np.array(input_data['payload']['values'])
        fs = 1000/input_data['payload']['interval_ms']
    
    if lungParam == 'FEV1':
        #print("FEV1 is the param. Audio signal will be clipped to peak only")
        peakIndex = np.where(ys == ys.max())[0][0]
        ys = ys[0:peakIndex+1];


    
    stft_spectrogram=np.abs(librosa.stft(ys, n_fft=480, hop_length=240))
    #we are only interested in the highest frequency component for each frame.
    stft_spectrogram_frame_mean = np.mean(stft_spectrogram, axis=1)
    
    #we have to ensure that the vetor size is same. Since each audio file is of different length
    #the number of frames are different. We pad 0s to make the vector size uniform across samples
    #stft_spectrogram_padded = np.pad(stft_spectrogram_frame_max,(650-len(stft_spectrogram_frame_max),0), 'constant', constant_values=(0))
    
    mfc_coefficients = np.mean(librosa.feature.mfcc(y=ys, n_mfcc=50, sr=16000),axis=1)

    melspectogram = np.mean(librosa.feature.melspectrogram(y=ys, sr=16000, n_mels=16, n_fft=480, hop_length=240),axis=1)
    #print("Shape of Melspectogram = {}\n".format(melspectogram.shape))

    
    mfe = speechpy.feature.mfe(ys, sampling_frequency=16000, frame_length=0.030, frame_stride=0.015,
            num_filters=num_filters, fft_length=fft_length, low_frequency=0)
    
    mfe = np.mean(mfe[0],axis=0)
    
    #print("Shape of MFE = {}\n".format(mfe.shape))
    
    mfcc = speechpy.feature.mfcc(ys, sampling_frequency=16000, frame_length=0.030, frame_stride=0.015,
             num_filters=num_filters, fft_length=fft_length, low_frequency=0, num_cepstral=num_cepstral)
    
    
    mfcc_cmvn = np.mean(speechpy.processing.cmvnw(mfcc,win_size=301,variance_normalization=True), axis=0)
    #print("Shape of VarNorm MFCC = {}\n".format(mfcc_cmvn.shape))
        
    mfcc = np.mean(mfcc,axis=0)
    #print("Shape of MFCC = {}\n".format(mfcc.shape))
    
    cfg = tsfel.get_features_by_domain("spectral")
    stat_features = np.array(tsfel.time_series_features_extractor(cfg, ys, fs=16000, window=320))[0]
    
    
    if lungParam == 'FVC':
    
        feature_matrix_r = np.hstack((mfcc_cmvn)) #melspectogram,#mfe,#mfcc,,mfcc_cmvn  removed
    else:
        feature_matrix_r = np.hstack((mfcc,mfcc_cmvn)) #melspectogram,mfe,mfcc,mfcc_cmvn  removed
    
    #feature_matrix_r = np.random.rand(66)
    
    return feature_matrix_r.T


def load_data(lungParam, sex, LH, file, use_height=False):
    '''
    feature: it is either 'FEV1', 'FVC' or 'PEF'
    '''
    data = pd.read_csv(file)
    
    if sex == 'M' or sex == 'F':
        data = data[data['Sex'] == sex]
    if LH == 'Y' or LH == 'N':
        data = data[data['LH'] == LH]
    
    
    #get the ground truth
    y=  np.array([i for i in data['g'+lungParam]])
    
    num_filters = 40#40
    num_cepstral = 5 #10
    fft_length = 512#512
    
    #prepare to store a lot of features in X
    X=[]
    count = 0
    for file in data['Filename']:
        #print(file.split('.')[0])
        features = get_features("data-forced-breathing-1/N95/"+file, lungParam, num_filters, num_cepstral, fft_length)
        #print("Size of features = ",features.shape)
        if use_height:
            uid = data.loc[data['Filename'] == file, 'Name'].values[0]
            height = measured.loc[measured['UID'] == uid, 'height'].values[0]
            features = np.append(features, height)
        Audio_Features[file] = features
        X.append(features)
        count += 1
        #print("\n")
    X =  np.array(X)
    
    
    #add the estiamted lung param as a feature
    #estiamtedValues = np.atleast_2d(data['r'+lungParam].to_numpy()).T
    #X = np.hstack((X, estiamtedValues))
    # Return arrays to plug into sklearn's caross-validation algorithms
    return X, np.array(y)

In [None]:
#options for first param: 'FVC', 'FEV1', 'PEF'

X, Y = load_data('FVC', 'A', 'A', "data_FVC_N95.csv", use_height=False)
print("Shape of input data is {} and the output data is {}".format(X.shape, Y.shape))

#remove faulty samples based on BEV and Wrong FV Curve
all_set = np.delete(X,[23,55,4,9,52,44,45,33,43,20,1,50], axis=0) 
all_labels = np.delete(Y,[23,55,4,9,52,44,45,33,43,20,1,50], axis=0)

In [None]:
#save if required
np.save('features-data/FVC_FEATURES_60.npy',X)
np.save('features-data/FVC_LABELS_60.npy',Y)

5.66 FVC