## INITIALIZATION

In [120]:
# import numpy for math calculations
import numpy as np
def dummy_npwarn_decorator_factory():
  """Stand-in for ``np._no_nep50_warning``: build a decorator that does nothing."""
  # The returned callable hands its argument back untouched.
  return lambda x: x

# Install the stand-in only when this numpy build has no ``_no_nep50_warning``;
# an attribute that already exists is left as it is.
if not hasattr(np, '_no_nep50_warning'):
  np._no_nep50_warning = dummy_npwarn_decorator_factory

# import pandas for data (csv) manipulation
import pandas as pd

# import gc to collect garbage
import gc

# import matplotlib for plotting
import matplotlib.pyplot as plt
import matplotlib
matplotlib.style.use('fivethirtyeight') 
%matplotlib inline

# import seaborn for more plotting options(built on top of matplotlib)
import seaborn as sns

# import librosa for analysing audio signals: visualize audio, display the spectrogram
import librosa
import soundfile as sf

# import librosa.display, the plotting submodule of librosa
import librosa.display

# import wave for reading and writing wav files
import wave

# import IPython.display for playing audio in the Jupyter notebook
import IPython.display as ipd

# import os for system operations
import os

# import random to get random values/choices
import random

from scipy.stats import skew
from scipy.stats import kurtosis

# importing Machine Learning Models
from sklearn.neighbors import KNeighborsClassifier
from sklearn.ensemble import RandomForestClassifier
from sklearn.svm import SVC, LinearSVC
from sklearn.neighbors import KNeighborsClassifier
from sklearn.svm import SVC

# importing from sklearn the evaluation metrics for classification
from sklearn.metrics import accuracy_score
from sklearn.metrics import f1_score
from sklearn.metrics import precision_score
from sklearn.metrics import recall_score
from sklearn.metrics import log_loss
from sklearn.metrics import confusion_matrix
from sklearn.metrics import classification_report

# importing from sklearn model selection 
from sklearn.model_selection import PredefinedSplit, GridSearchCV, train_test_split, cross_val_score, StratifiedKFold, learning_curve

from sklearn.preprocessing import MinMaxScaler

from sklearn.impute import SimpleImputer, MissingIndicator

from sklearn.pipeline import FeatureUnion, Pipeline, make_pipeline

# import tqdm to show a smart progress meter
from tqdm.notebook import trange,tqdm

# import warnings to hide unnecessary warnings
import warnings
warnings.filterwarnings('ignore')

In [121]:
def seed_everything(seed=42):
    """Seed every random source this notebook relies on.

    Writes ``seed`` (as a string) to the ``PYTHONHASHSEED`` environment
    variable and seeds both the stdlib ``random`` module and numpy's global
    generator with it.
    """
    # NOTE(review): presumably the env var only matters for processes started
    # after this point - confirm whether that is sufficient here.
    os.environ['PYTHONHASHSEED'] = str(seed)
    for seeder in (random.seed, np.random.seed):
        seeder(seed)


# Single seed shared by the random sources above and by the estimators below
SEED = 42
seed_everything(SEED)

## CONFIGURATION

In [122]:
# Where the original clips live and where their extracted features are cached
INPUT_DIR = "./ESC-50/input/audio/"
INPUT_FEATURE_FILE = "./ESC-50/input/Esc50_features_extracted.csv"
# Same pair of locations for the augmented clips
AUG_DIR = "./ESC-50/augmented_input/audio/"
AUG_FEATURE_FILE = "./ESC-50/augmented_input/Esc50_features_extracted.csv"

# Switch for the augmentation branch of the notebook
AUGMENT_DATA = False

# Target id -> category name
DECODER = {
    0: 'dog', 14: 'chirping_birds', 36: 'vacuum_cleaner', 19: 'thunderstorm', 30: 'door_wood_knock',
    34: 'can_opening', 9: 'crow', 22: 'clapping', 48: 'fireworks', 41: 'chainsaw',
    47: 'airplane', 31: 'mouse_click', 17: 'pouring_water', 45: 'train', 8: 'sheep',
    15: 'water_drops', 46: 'church_bells', 37: 'clock_alarm', 32: 'keyboard_typing', 16: 'wind',
    25: 'footsteps', 4: 'frog', 3: 'cow', 27: 'brushing_teeth', 43: 'car_horn',
    12: 'crackling_fire', 40: 'helicopter', 29: 'drinking_sipping', 10: 'rain', 7: 'insects',
    26: 'laughing', 6: 'hen', 44: 'engine', 23: 'breathing', 20: 'crying_baby',
    49: 'hand_saw', 24: 'coughing', 39: 'glass_breaking', 28: 'snoring', 18: 'toilet_flush',
    2: 'pig', 35: 'washing_machine', 38: 'clock_tick', 21: 'sneezing', 1: 'rooster',
    11: 'sea_waves', 42: 'siren', 5: 'cat', 33: 'door_wood_creaks', 13: 'crickets',
}

# Category name -> target id: the inverse of DECODER, entries in the same order
ENCODER = {category: target for target, category in DECODER.items()}

## DATA AUGMENTATION

In [123]:
def add_noise(data):
    """Return ``data`` plus Gaussian noise (mean 0, std 0.1), one draw per sample."""
    return data + np.random.normal(loc=0, scale=0.1, size=len(data))
    
def pitch_shifting(data):
    """Pitch-shift ``data`` by a randomly drawn number of steps.

    The signal is cast to float64 and passed to
    ``librosa.effects.pitch_shift`` with a fixed sample rate of 16000 and
    12 bins per octave.
    """
    pitch_pm = 2
    # The draw is scaled by 2 * pitch_pm and is not centred around zero.
    n_steps = pitch_pm * 2 * np.random.uniform()
    return librosa.effects.pitch_shift(
        y=data.astype('float64'),
        sr=16000,
        n_steps=n_steps,
        bins_per_octave=12,
    )

def random_shift(data):
    """Shift ``data`` in time by a random offset of up to 20% of its length.

    A positive offset moves the signal to the right (zeros are inserted at
    the start), a negative offset moves it to the left (zeros are appended
    at the end). The result always has the same length as the input.

    Parameters
    ----------
    data : 1-D numpy array holding the samples.

    Returns
    -------
    numpy.ndarray
        The shifted, zero-padded copy of ``data``.
    """
    n_samples = data.shape[0]
    timeshift_fac = 0.2 *2*(np.random.uniform()-0.5)  # up to 20% of length
    start = int(n_samples * timeshift_fac)
    if start > 0:
        # Right shift: pad in front, then cut the overflow off the tail.
        return np.pad(data, (start, 0), mode='constant')[:n_samples]
    # Left shift: pad behind, then cut the overflow off the head. Slicing
    # from 0 here would simply return the input unshifted.
    return np.pad(data, (0, -start), mode='constant')[-start:]

def volume_scaling(data):
    """Multiply ``data`` by a single gain drawn uniformly between 1.5 and 2.5."""
    gain = np.random.uniform(low=1.5, high=2.5)
    return data * gain
    
def time_stretching(data, rate=1.5):
    """Time-stretch ``data`` and bring the result back to the input length.

    Parameters
    ----------
    data : 1-D numpy array holding the samples.
    rate : stretch factor handed to ``librosa.effects.time_stretch``.

    Returns
    -------
    numpy.ndarray
        The stretched signal, truncated when it is longer than the input
        and zero-padded at the end when it is shorter.
    """
    input_length = len(data)
    stretched = librosa.effects.time_stretch(y=data.copy(), rate=rate)

    if len(stretched) > input_length:
        return stretched[:input_length]
    # Pad the *stretched* signal: padding ``data`` here would return the
    # unstretched clip with extra samples appended.
    return np.pad(stretched, (0, input_length - len(stretched)), "constant")
def save_augmentation(filepath, aug):
    """Write one augmented signal to ``filepath`` as a 16 kHz, 24-bit PCM file.

    ``aug`` is converted to a float32 column vector (one channel) before
    being handed to ``soundfile.write``.
    """
    samples = np.array(aug, dtype='float32')
    column = samples.reshape(-1, 1)
    sf.write(filepath, column, 16000, 'PCM_24')

In [124]:
def data_aug(input_dir, aug_dir):
    """Create the augmented clip set in ``aug_dir`` unless that directory exists.

    2000 distinct file names are drawn from ``input_dir``; each clip is
    loaded at 16 kHz and saved in four variants (noise, time stretch,
    random shift, volume scaling). Nothing happens when ``aug_dir`` is
    already present.
    """
    # An existing output directory short-circuits the whole function.
    if os.path.exists(aug_dir):
        return

    os.makedirs(aug_dir)
    path_ = np.random.choice(os.listdir(input_dir), size = (2000,), replace= False)
    for k in trange(len(path_)):
        files = path_[k]
        data_, fs = librosa.load(os.path.join(input_dir, files), sr = 16000)

        # The calls below stay in this order so the global random stream is
        # consumed exactly as before.
        noise_data = add_noise(data_)
        # pitch_data = pitch_shifting(data_)
        random_shift_data = random_shift(data_)
        volume_scale_data = volume_scaling(data_)
        time_stretching_data = time_stretching(data_, rate=1.5)

        variants = (noise_data, time_stretching_data, random_shift_data, volume_scale_data)
        for j, variant in enumerate(variants):
            # <first two chars>generated-<variant index>-<clip index>-<rest of the name>
            out_name = f"{files[0:2]}generated-{j}-{k}-{files[2:]}"
            save_augmentation(os.path.join(aug_dir, out_name), variant)

In [125]:
# Build the augmented clips only when augmentation is switched on
if AUGMENT_DATA:
  data_aug(INPUT_DIR, AUG_DIR)

## FEATURE EXTRACTION

In [126]:
def get_dataset_from_files(dir):
    """Index the ``.wav`` files found under ``dir``.

    The fold is read from the first character of the file name and the
    target from the last ``-``-separated field (without ``.wav``).

    Parameters
    ----------
    dir : directory to walk; a trailing path separator is optional.

    Returns
    -------
    pandas.DataFrame
        One row per ``.wav`` file with the columns ``filename``, ``fold``,
        ``target`` and ``files_path``.

    Raises
    ------
    ValueError
        If a ``.wav`` file name does not carry an integer fold / target.
    """
    cols = ['filename','fold','target','files_path']
    data = []
    for path, subdirs, files in os.walk(dir):
        for name in files:
            # Anything that is not a clip (hidden files, CSVs, ...) is ignored
            # instead of breaking the integer parsing below.
            if not name.endswith('.wav'):
                continue
            fold = int(name[0])
            target = int(name.split('-')[-1].replace('.wav', ''))
            # Join with the directory os.walk is currently in, so files in
            # sub-directories and a ``dir`` without trailing separator both
            # yield paths that exist.
            file_path = os.path.join(path, name)
            data.append((name, fold, target, file_path))
    return pd.DataFrame(data, index=range(len(data)), columns=cols)

In [127]:
def feature_stats(values):
    """Summarise each row of ``values`` with seven statistics.

    The statistics are taken along axis 1 and returned as one flat list,
    grouped by statistic: all means, then all standard deviations, skews,
    kurtoses, medians, minima and maxima.
    """
    reducers = (np.mean, np.std, skew, kurtosis, np.median, np.min, np.max)
    return [stat for reduce_rows in reducers for stat in reduce_rows(values, axis=1)]

def extract_features_from_audio_file(audio_path):
    """Compute the flat summary-statistics vector of one audio file.

    Twelve librosa features are computed from the signal; each one is
    reduced with ``feature_stats`` and the results are concatenated in the
    order of the ``features`` list at the end of the function.

    Parameters
    ----------
    audio_path : path of the audio file, handed to ``librosa.load``.

    Returns
    -------
    list
        The concatenated statistics of all features.
    """
    # No ``sr`` is passed here, so librosa's own default sample rate applies.
    y , sr = librosa.load(audio_path, mono=True)

    # Time-domain feature, plus the constant-Q magnitude used for the chroma features
    zcr = librosa.feature.zero_crossing_rate(y)
    cqt = np.abs(librosa.cqt(y, sr=sr, tuning=None))

    # Both chroma variants are derived from the CQT magnitude; tonnetz is
    # derived from the CENS chroma.
    chroma_cqt = librosa.feature.chroma_cqt(C=cqt, n_chroma=12)
    chroma_cens = librosa.feature.chroma_cens(C=cqt, n_chroma=12)
    tonnetz = librosa.feature.tonnetz(chroma=chroma_cens)

    # Intermediates are deleted as soon as nothing below needs them.
    del cqt
    # STFT magnitude (S) and its square; the phase is not used afterwards.
    S, phase = librosa.magphase(librosa.stft(y))
    power_S = S**2
    del y

    chroma_stft = librosa.feature.chroma_stft(S=power_S, n_chroma=12)

    rmse = librosa.feature.rms(S=S)

    # Spectral descriptors, all computed from the magnitude spectrogram
    spectral_centroid = librosa.feature.spectral_centroid(S=S)
    spectral_bandwidth = librosa.feature.spectral_bandwidth(S=S)
    spectral_contrast = librosa.feature.spectral_contrast(S=S, n_bands=6)
    spectral_rolloff = librosa.feature.spectral_rolloff(S=S)
    spectral_flatness = librosa.feature.spectral_flatness(S=S)

    # Mel spectrogram from the squared magnitude, then 20 MFCCs from its dB version
    mel = librosa.feature.melspectrogram(sr=sr, S=power_S)
    del S, power_S
    mfcc = librosa.feature.mfcc(S=librosa.power_to_db(mel), n_mfcc=20)

    # The order of this list fixes the layout of the returned vector; the column
    # names built in extract_features_from_dir follow the same order.
    features = [chroma_stft,chroma_cqt,chroma_cens,tonnetz,mfcc,rmse,zcr,spectral_centroid,spectral_bandwidth,spectral_contrast,spectral_rolloff,spectral_flatness]
    stats = []
    for val in features:
        # feature_stats reduces along axis 1 - assumes each feature is 2-D
        # with frames on axis 1 (TODO confirm against librosa's output shapes)
        stats.extend(feature_stats(val))
    return(stats)


def extract_features_from_dir(audio_dir):
    """Extract the feature vector of every file indexed under ``audio_dir``.

    Returns the file index from ``get_dataset_from_files`` with one extra
    column per statistic, named ``<stat>_<feature>`` for single-row
    features and ``<stat>_<feature>_<i>`` for multi-row ones.
    """
    dataset = get_dataset_from_files(audio_dir)

    stat_names = ('mean', 'std', 'skew', 'kurtosis', 'median', 'min', 'max')
    # Number of rows each feature contributes, listed in extraction order
    feature_sizes = {
        'chroma_stft': 12, 'chroma_cqt': 12, 'chroma_cens': 12,
        'tonnetz': 6, 'mfcc': 20, 'rmse': 1, 'zcr': 1,
        'spectral_centroid': 1, 'spectral_bandwidth': 1,
        'spectral_contrast': 7, 'spectral_rolloff': 1,
        'spectral_flatness': 1,
    }

    # Feature-major, then statistic, then row index: the order in which the
    # values of one extracted vector are laid out.
    cols = [
        f'{stat}_{name}_{i}' if size > 1 else f'{stat}_{name}'
        for name, size in feature_sizes.items()
        for stat in stat_names
        for i in range(size)
    ]

    paths = dataset['files_path']
    data = [extract_features_from_audio_file(paths[i]) for i in trange(paths.shape[0])]

    feature_set = pd.DataFrame(data, index=range(len(data)), columns=cols)
    return pd.concat([dataset, feature_set], axis=1)

def get_features_dataset(audio_dir,feature_file):
    """Return the feature table for ``audio_dir``, using ``feature_file`` as a cache.

    When the CSV exists it is read and returned; otherwise the features are
    extracted from the audio files, written to the CSV (without the index)
    and returned.
    """
    if not os.path.exists(feature_file):
        # Cache miss: run the extraction and persist the result
        dataset = extract_features_from_dir(audio_dir)
        dataset.to_csv(feature_file, index=False)
        return dataset
    return pd.read_csv(feature_file)
       

In [128]:
# Features of the original clips (read from the CSV cache when it exists)
feature_dataset = get_features_dataset(INPUT_DIR,INPUT_FEATURE_FILE)
# The augmented feature table is only built when augmentation is switched on
if AUGMENT_DATA:
  augmented_feature_dataset = get_features_dataset(AUG_DIR,AUG_FEATURE_FILE)

## DATA PREPARATION

In [129]:
def split_data(dataset, validation_dataset=None):
  """Split a feature table into a learning part and a held-out validation part.

  Folds below 5 form the learning set and are also used as the predefined
  cross-validation folds; fold 5 is held out for validation.

  Parameters
  ----------
  dataset : feature table with the columns ``fold``, ``filename``,
      ``target`` and ``files_path`` next to the feature columns.
  validation_dataset : optional table to take the fold-5 rows from.
      Defaults to ``dataset``, so the function depends only on its
      arguments and not on a notebook-level variable. Pass another table
      (e.g. the un-augmented features) to validate on different clips than
      the ones learned from.

  Returns
  -------
  tuple
      ``(X, y, ps, X_validate, y_validate)`` where ``ps`` is the
      ``PredefinedSplit`` built from the learning folds.
  """
  meta_columns = ['fold','filename','target','files_path']
  if validation_dataset is None:
    validation_dataset = dataset

  learn = dataset[dataset['fold'] < 5]
  validate = validation_dataset[validation_dataset['fold'] == 5]

  X_validate = validate.drop(columns=meta_columns)
  y_validate = validate.target

  # Each learning row's fold number is the CV split in which it is held out
  ps = PredefinedSplit(learn.fold)
  X = learn.drop(columns=meta_columns)
  y = learn.target

  return X, y, ps, X_validate, y_validate

In [130]:
# Split the augmented feature table when augmentation is on, the original one otherwise
if AUGMENT_DATA:
  X, y, ps, X_validate, y_validate = split_data(augmented_feature_dataset)
else:
  X, y, ps, X_validate, y_validate = split_data(feature_dataset)

## GRID SEARCH CROSS-VALIDATION

In [131]:
class MyGridSearchCV:
  """Grid-search one classifier behind a scaling + imputation front end.

  The search pipeline is ``MinMaxScaler`` -> ``FeatureUnion`` (mean-imputed
  features plus missing-value indicators) -> ``GridSearchCV``. After
  ``tune_hyperparameters`` a second pipeline with the same, already fitted
  transformers and the best estimator is available for prediction.

  NOTE(review): the scaler and the filler sit outside the ``GridSearchCV``
  step, so they are fitted once on everything passed to ``fit`` rather than
  once per CV fold - confirm that this is acceptable for the reported
  cross-validation score.
  """

  def __init__(self,type,param_grid,cv=None):
    """Build the search pipeline.

    Parameters
    ----------
    type : one of 'LinearSVC', 'SVC', 'KNeighborsClassifier',
        'RandomForestClassifier'.
    param_grid : parameter grid handed to ``GridSearchCV``.
    cv : cross-validation splitter; when omitted the notebook-level ``ps``
        is used.

    Raises
    ------
    ValueError
        If ``type`` is not one of the supported names.
    """
    self.type=type

    self.scaler = MinMaxScaler()

    self.filler_list=[
        ('features', SimpleImputer(strategy='mean')),
        ('indicators', MissingIndicator(features="all"))]

    self.filler = FeatureUnion(transformer_list=self.filler_list)

    self.estimator = self.get_estimator()

    self.grid_search = GridSearchCV(estimator=self.estimator,
                              param_grid=param_grid,
                              #  n_jobs=-1,
                              cv=ps if cv is None else cv,
                              scoring='accuracy',
                              verbose=3)

    self.gridsearch_pipeline_steps=[  ('scaling'  , self.scaler),
                                      ('filler'   , self.filler),
                                      ('gridsearch', self.grid_search)]

    self.gridsearch_pipeline = Pipeline(steps=self.gridsearch_pipeline_steps, verbose=True)

  def get_estimator(self):
    """Return a fresh, unfitted estimator for ``self.type``.

    Raises
    ------
    ValueError
        For an unknown ``self.type``, instead of returning ``None`` and
        leaving the failure to the grid search.
    """
    factories = {
      'LinearSVC': lambda: LinearSVC(random_state=SEED),
      'SVC': lambda: SVC(random_state=SEED),
      'KNeighborsClassifier': lambda: KNeighborsClassifier(),
      'RandomForestClassifier': lambda: RandomForestClassifier(random_state=SEED),
    }
    if self.type not in factories:
      raise ValueError(
        "Unknown estimator type %r; expected one of %s"
        % (self.type, sorted(factories))
      )
    return factories[self.type]()

  def tune_hyperparameters(self,X,y):
    """Run the grid search on ``(X, y)`` and record the training metrics.

    Prints the best parameters and score, builds ``self.best_pipeline``
    from the fitted transformers and the best estimator, and stores
    accuracy, macro F1, macro precision and macro recall on ``(X, y)``.
    """
    self.gridsearch_pipeline.fit(X, y)
    fitted_search = self.gridsearch_pipeline.named_steps['gridsearch']
    print(
      "The best parameters are %s with a score of %0.2f"
      % (fitted_search.best_params_, fitted_search.best_score_)
    )

    self.best_estimator_=fitted_search.best_estimator_
    # The transformers were fitted by the call above and are reused as they are.
    self.best_pipeline_steps=[  ('scaling'  , self.scaler),
                                ('filler'   , self.filler),
                                ('estimator', self.best_estimator_)]

    self.best_pipeline = Pipeline(steps=self.best_pipeline_steps, verbose=True)

    y_predicted = self.predict(X)
    self.accuracy_train = accuracy_score(y,y_predicted)
    self.f1_score_train = f1_score(y,y_predicted,average='macro')
    self.precision_score_train = precision_score(y,y_predicted,average='macro')
    self.recall_score_train = recall_score(y,y_predicted,average='macro')

  def predict(self,X):
    """Predict targets for ``X`` with the best pipeline (requires a prior tuning run)."""
    return self.best_pipeline.predict(X)

  def validate(self,X,y):
    """Score the best pipeline on ``(X, y)`` and return a styled report.

    The report has one row for the metrics stored by
    ``tune_hyperparameters`` and one row for the metrics computed here.
    """
    y_predicted = self.predict(X)
    self.accuracy_validate = accuracy_score(y,y_predicted)
    self.f1_score_validate = f1_score(y,y_predicted,average='macro')
    self.precision_score_validate = precision_score(y,y_predicted,average='macro')
    self.recall_score_validate = recall_score(y,y_predicted,average='macro')
    cols = ['dataset', 'accuracy', 'f1_score', 'precision', 'recall']
    data = [
      ('Training+Testing', self.accuracy_train, self.f1_score_train, self.precision_score_train, self.recall_score_train),
      ('Validation', self.accuracy_validate, self.f1_score_validate, self.precision_score_validate, self.recall_score_validate),
    ]
    report = pd.DataFrame(data, index = None, columns = cols)
    return(report.style.background_gradient(cmap= plt.cm.Blues))

  def plot_confusion_matrix(self, X, y, title='Confusion matrix', cmap=plt.cm.Blues):
    """Plot the confusion matrix of the best pipeline on ``(X, y)``.

    The matrix is also stored as ``self.cm``. ``cmap`` selects the colour
    map of the image.
    """
    self.cm = confusion_matrix(y, self.predict(X))
    # Use the caller's colour map rather than a fixed one.
    plt.imshow(self.cm, interpolation='nearest', cmap=cmap)
    plt.title(title)
    plt.colorbar()
    plt.tight_layout()
    plt.ylabel('True label')
    plt.xlabel('Predicted label')

  def label(self, X):
    """Predict ``X`` and return the category names instead of the target ids."""
    return(self.decode(self.predict(X)))

  def decode(self, labels):
    """Map target ids to category names through ``DECODER``."""
    return [DECODER[label] for label in labels]

  def encode(self, labels):
    """Map category names to target ids through ``ENCODER``."""
    return [ENCODER[label] for label in labels]


#### LinearSVC

In [None]:
# Wide search space for C; it is only used by the commented-out call below,
# the active search runs on best_param_grid.
param_grid = {
    'C': [0.001, 0.01, 0.1, 1, 10, 100, 1000]
    # 'C': [0.1*2**(-3), 0.1*2**(-2), 0.1*2**(-1), 0.1*2**0, 0.1*2**1, 0.1*2**2, 0.1*2**3]
    # 'C': [0.001, 0.01, 0.1, 1, 10],
    # 'C': [0.01, 0.1, 1]
    # 'penalty': ['l1', 'l2'],
    # 'loss': ['hinge', 'squared_hinge'],
    # 'multi_class': ['ovr', 'crammer_singer']
    # 'tol': [1e-5, 1e-4, 1e-3, 1e-2, 1e-1, 1]
}
# Single-point grid - presumably the winner of an earlier wide search (TODO confirm)
best_param_grid = {
    'C': [0.1]
}
# grid_linearsvc = MyGridSearchCV('LinearSVC',param_grid)
grid_linearsvc = MyGridSearchCV('LinearSVC',best_param_grid)
grid_linearsvc.tune_hyperparameters(X, y)

In [None]:
# Display the fitted search pipeline as the cell output
grid_linearsvc.gridsearch_pipeline

In [None]:
# Metrics on the learning folds next to the metrics on the held-out fold
grid_linearsvc.validate(X_validate,y_validate)

In [None]:
# Confusion matrix of the tuned LinearSVC on the held-out fold
grid_linearsvc.plot_confusion_matrix(X_validate, y_validate)

In [None]:
# Predicted category names for the held-out fold
grid_linearsvc.label(X_validate)

#### SVC

In [None]:
# Search space for SVC: 3 values of C x 4 values of gamma x 2 kernels
# NOTE(review): gamma presumably has no effect for kernel='linear', in which
# case the linear candidates repeat the same fit - confirm against the SVC docs.
param_grid = {
    # 'C': [0.001, 0.01, 0.1, 1, 10, 100, 1000],
    'C': [0.001, 0.1, 10],
    'gamma': [1, 10, 100, 1000],
    # 'kernel': ['rbf','linear','poly']
    'kernel': ['linear','poly']
}
grid_svc = MyGridSearchCV('SVC',param_grid)
grid_svc.tune_hyperparameters(X, y)

In [None]:
# Metrics on the learning folds next to the metrics on the held-out fold
grid_svc.validate(X_validate,y_validate)

#### KNeighbors

In [None]:
# Search space for KNeighborsClassifier
# NOTE(review): the lists below are literal candidate values, not
# (start, stop, step) ranges - n_neighbors is searched over 1, 10 and 1 again,
# leaf_size over 20, 40 and 1. Confirm whether ranges were intended.
param_grid ={
  'n_neighbors'  :  [1,10, 1],
  'leaf_size'    :  [20,40,1],
  'p'            :  [1,2],
  'weights'      :  ['uniform', 'distance'],
  'metric'       :  ['minkowski', 'chebyshev']
}
grid_kneighbors = MyGridSearchCV('KNeighborsClassifier',param_grid)
grid_kneighbors.tune_hyperparameters(X, y)

In [None]:
# Metrics on the learning folds next to the metrics on the held-out fold
grid_kneighbors.validate(X_validate,y_validate)

#### RandomForestClassifier

In [None]:
# Search space for RandomForestClassifier: only max_depth, min_samples_split and
# max_leaf_nodes vary; the other parameters are pinned to a single value.
param_grid = { 
    # 'n_estimators': [25, 50, 100, 150], 
    'n_estimators': [50], 
    'max_depth': [3, 6, 9], 
    'min_samples_split': [2, 5, 10], 
    # 'min_samples_leaf': [1, 5], 
    'min_samples_leaf': [1], 
    # 'max_features': ['sqrt', 'log2', None], 
    'max_features': ['log2'], 
    'max_leaf_nodes': [3, 6, 9], 
    # 'max_leaf_nodes': [None], 
} 

grid_rfc = MyGridSearchCV('RandomForestClassifier',param_grid)
grid_rfc.tune_hyperparameters(X, y)

In [None]:
# Metrics on the learning folds next to the metrics on the held-out fold
grid_rfc.validate(X_validate,y_validate)