In [None]:
import librosa
from librosa.effects import pitch_shift
import os
import pathlib
import random
import matplotlib.pyplot as plt
import numpy as np
import seaborn as sns
from librosa.feature import mfcc
from sklearn import svm
import scipy.io.wavfile as wav
import scipy.signal as signal
from sklearn.model_selection import train_test_split
import tensorflow as tf
# Accumulators; both names are rebound later to the outputs of create_dataset().
features = []
labels = []

# Root folder of the training audio.
dataset_path: str = "C:\\Users\\rclendening\\researchData\\RedVox_TrainingBinary_wYTVids"
data_dir = pathlib.Path(dataset_path)

# Class name -> one-hot label vector.
droneDict = {"Drone": [1, 0], "Noise": [0, 1]}

# Class name -> position of that class inside dataCount (a class index, not a one-hot vector).
droneCountDict = {"Drone": 0, "Noise": 1}

# Running total of MFCC frames per class; incremented by split_audio().
dataCount = [0, 0]

# Full path of every file found anywhere below dataset_path.
train_files = [
    os.path.join(root, fileName)
    for root, _subdirs, fileNames in os.walk(dataset_path)
    for fileName in fileNames
]


def split_audio(waveData, labelName, sampleFreq):
    '''
    Computes MFCC features for the middle third of a clip and labels every frame.
    :param waveData: array of time-domain audio samples
    :param labelName: class name; must be a key of droneDict and droneCountDict
    :param sampleFreq: sample frequency, forwarded to MFCCCalc
    @return (features, label): the MFCCCalc output for the middle third of the clip, and a
            list holding one one-hot label per feature column
    '''
    numSamples = waveData.shape[0]

    # Keep only the middle third of the clip.
    firstIdx = int(np.round(numSamples / 3))
    lastIdx = int(np.round(numSamples * 2 / 3))
    middleThird = waveData[firstIdx:lastIdx]

    mfccFrames = MFCCCalc(middleThird.squeeze(), sampleFreq)
    numFrames = mfccFrames.shape[1]

    # Side effect: update the module-level per-class frame counter.
    dataCount[droneCountDict[labelName]] += numFrames

    oneHot = droneDict[labelName]
    return mfccFrames, [oneHot] * numFrames

def create_dataset(train_files):
    '''
    Creates feature dataset and label dataset.
    @param train_files: iterable of audio file paths. The class label of a file is the first
                        directory component of its path that is a key of droneDict.
    @return list of features (one entry per MFCC frame), list of labels corresponding to
            feature dataset
    @raise KeyError: if no directory component of a path names a known class
    '''
    features = []
    labels = []
    for filePath in train_files:
        audio, sampleRate = librosa.load(filePath, sr=8000)
        audio = np.asarray(audio)

        # Skip empty files, and (as before) files whose minimum sample is exactly 0.
        if audio.size == 0 or np.min(audio) == 0:
            continue

        # Accept both Windows and POSIX separators instead of relying on a fixed
        # position in a backslash-separated path.
        parts = str(filePath).replace('\\', '/').split('/')
        label = next((part for part in parts[:-1] if part in droneDict), None)
        if label is None:
            raise KeyError("No class directory %s found in path: %s" % (sorted(droneDict), filePath))

        # Trim to a whole number of seconds.
        usable = audio.shape[0] - audio.shape[0] % sampleRate
        if usable == 0:
            continue  # shorter than one second: nothing left to extract features from

        newFeats, newLabs = split_audio(audio[:usable], label, int(sampleRate))
        features.extend(newFeats.transpose())  # one row per frame
        labels.extend(newLabs)

    return features, labels

def MFCCCalc(audioData, Fs, n_mfcc=40, hop_length=2048):
    '''
    Converts decoded wav file to MFCC feature space
    @param audioData: Numpy array (or array-like) of decoded audio samples
    @param Fs: sample rate of audioData, passed to librosa as `sr`
    @param n_mfcc: number of MFCC coefficients to compute per frame (default 40)
    @param hop_length: number of samples between successive frames (default 2048)
    @return MFCC coefficients as returned by librosa.feature.mfcc
    '''
    # librosa is given floating-point samples regardless of the input dtype.
    data = np.asarray(audioData).astype(float)
    coefs = mfcc(y=data, sr=Fs, n_mfcc=n_mfcc, hop_length=hop_length)

    return coefs

def grabTrainingSamples(n, trainingData):
    '''
    Ensures even training set by grabbing an even amount of training samples from each class.
    @param n: limiting class count (maximum number of samples kept per class)
    @param trainingData: sequence of (feature, label) pairs, where label is the one-hot
                         list [1, 0] (drone) or [0, 1] (noise)
    @return (evenTrainingData, evenLabelData): the kept features and their labels, in the
            order they appear in trainingData
    '''
    droneCount = 0
    noiseCount = 0
    evenTrainingData = []
    evenLabelData = []
    # Iterate over the argument itself; the module-level `labels` list must not decide
    # how many samples are inspected.
    for sample in trainingData:
        if droneCount >= n and noiseCount >= n:
            break  # both classes are full, nothing more can be added
        feat, lab = sample[0], sample[1]
        if lab == [1, 0] and droneCount < n:
            droneCount += 1
            evenTrainingData.append(feat)
            evenLabelData.append(lab)
        elif lab == [0, 1] and noiseCount < n:
            noiseCount += 1
            evenTrainingData.append(feat)
            evenLabelData.append(lab)
    return evenTrainingData, evenLabelData

Fs = 8000
numFeat = 40  # flagged by the author as a possible source of error

# Extract per-frame features and labels from every discovered file.
features, labels = create_dataset(train_files)

# Pair each feature vector with its label, then shuffle reproducibly so the classes are mixed.
newSet = [(feat, lab) for feat, lab in zip(features, labels)]
random.seed(42)
random.shuffle(newSet)

# Cap every class at the size of the smallest one so both classes are equally represented.
n = np.min(dataCount)
features, labels = grabTrainingSamples(n, newSet)

# test_size=0.8: 80% of the balanced samples are held out for testing, 20% are used for training.
trainFeatures, testFeatures, trainTruth, testTruth = train_test_split(
    features, labels, test_size=0.8, random_state=42
)
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import Pipeline
from sklearn.model_selection import cross_val_score, KFold
from sklearn.model_selection import GridSearchCV

# Collapse the one-hot labels to class indices: first element 1 -> 0, anything else -> 1.
newLabels = [0 if oneHot[0] == 1 else 1 for oneHot in trainTruth]
newTestLabels = [0 if oneHot[0] == 1 else 1 for oneHot in testTruth]


## Complex Model Evaluation (Linear SVM)

This cell fits a linear SVM and evaluates it with 5-fold cross-validation.

In [39]:
# Standardise the features, then classify with a linear-kernel SVM.
scalar = StandardScaler()
linear_model = svm.SVC(kernel='linear', C=0.001)
pipeline = Pipeline(steps=[('transformer', scalar), ('estimator', linear_model)])

# 5-fold cross-validation on the training split; print the mean fold score.
cv = KFold(n_splits=5)
scores = cross_val_score(pipeline, trainFeatures, newLabels, cv=cv)
print(np.average(scores))

# Fit on the whole training split and print the score on the held-out test split.
pipeline.fit(trainFeatures, newLabels)
print(pipeline.score(testFeatures, newTestLabels))


0.8402173913043478
0.8356459676835012


## Complex Model Evaluation (Poly SVM)

This cell tunes a polynomial-kernel SVM with a grid search and evaluates it with cross-validation.

In [None]:
# Standardise the features, then classify with a polynomial-kernel SVM.
scalar = StandardScaler()
poly_model = svm.SVC(kernel='poly')
param_grid = {
    'estimator__C': [1, 10, 100],
    'estimator__gamma': [1, 0.1, 0.001, 0.0001],
    'estimator__degree': [2, 3, 4],
}
pipeline = Pipeline([('transformer', scalar), ('estimator', poly_model)])
cv = KFold(n_splits=4)

# Grid search works on clones of `pipeline`; with refit=True the best configuration is
# refitted on the full training split and lives inside `search`, not in `pipeline`.
search = GridSearchCV(pipeline, param_grid, refit=True, verbose=3, n_jobs=-1)
print(search.fit(trainFeatures, newLabels))
print(search.best_params_)
print(search.best_score_)

# Baseline: the same pipeline with its default (untuned) hyperparameters.
scores = cross_val_score(pipeline, trainFeatures, newLabels, cv=cv)
print(np.average(scores))
pipeline.fit(trainFeatures, newLabels)
print(pipeline.score(testFeatures, newTestLabels))

# Test-set score of the tuned model; without this the search result is never evaluated
# on the held-out data.
print("Tuned poly SVM test score:", search.score(testFeatures, newTestLabels))

In [44]:
# Dump the parameters of the current `pipeline`: the step objects themselves plus their
# nested `transformer__*` / `estimator__*` settings.
print(pipeline.get_params())

{'memory': None, 'steps': [('transformer', StandardScaler()), ('estimator', SVC(kernel='poly'))], 'verbose': False, 'transformer': StandardScaler(), 'estimator': SVC(kernel='poly'), 'transformer__copy': True, 'transformer__with_mean': True, 'transformer__with_std': True, 'estimator__C': 1.0, 'estimator__break_ties': False, 'estimator__cache_size': 200, 'estimator__class_weight': None, 'estimator__coef0': 0.0, 'estimator__decision_function_shape': 'ovr', 'estimator__degree': 3, 'estimator__gamma': 'scale', 'estimator__kernel': 'poly', 'estimator__max_iter': -1, 'estimator__probability': False, 'estimator__random_state': None, 'estimator__shrinking': True, 'estimator__tol': 0.001, 'estimator__verbose': False}


## Complex Model Evaluation (RBF)

This cell performs hyperparameter tuning of the RBF-kernel SVM. On the test set (TS), which contains 80% of the data, the tuned model achieves an accuracy of 0.90073.

In [23]:
# Standardise the features, then classify with an RBF-kernel SVM.
scalar = StandardScaler()
model = svm.SVC(kernel='rbf')
pipeline = Pipeline(steps=[('transformer', scalar), ('estimator', model)])

# NOTE(review): `cv` is created here but is not passed to GridSearchCV below, so the
# search does not use this 10-fold splitter.
cv = KFold(n_splits=10)

param_grid = {
    'estimator__C': [0.1, 1, 10, 100],
    'estimator__gamma': [1, 0.1, 0.01, 0.001, 0.0001],
}

search = GridSearchCV(pipeline, param_grid, refit=True, verbose=3, n_jobs=-1)
print(search.fit(trainFeatures, newLabels))
print(search.best_params_)
print(search.best_score_)

# Rebuild the integer test labels from the one-hot truth: first element 1 -> 0, else 1.
newTestLabels = [0 if oneHot[0] == 1 else 1 for oneHot in testTruth]

# Score of the refitted best model on the held-out test split (shown as the cell output).
search.score(testFeatures, newTestLabels)

{'estimator__C': 100, 'estimator__gamma': 0.01}
0.9008695652173913


0.9007318310267372

## Simple Model Evaluation: LDA/QDA

The fact that LDA outperforms QDA suggests that there is more of a linear decision boundary within the dataset.

In [29]:
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis, QuadraticDiscriminantAnalysis
lda = LinearDiscriminantAnalysis()
qda = QuadraticDiscriminantAnalysis()

# For each model in turn: fit on the training split, then print its 5-fold CV scores.
for classifier in (lda, qda):
    classifier.fit(trainFeatures, newLabels)
    scores = cross_val_score(classifier, trainFeatures, newLabels, cv=5)
    print(scores)

# Scores of the fitted models on the held-out test split.
print("LDA Test Set Score", lda.score(testFeatures, newTestLabels))
print("QDA Test Set Score", qda.score(testFeatures, newTestLabels))

[0.82753623 0.84963768 0.83188406 0.83623188 0.85072464]
[0.77681159 0.80217391 0.77826087 0.78985507 0.78985507]
LDA Test Set Score 0.8367690747047315
QDA Test Set Score 0.7816281428881965
