In [None]:
# Internal
import os
import sys 
import time
import random
import glob
import csv
from ast import literal_eval

# External
import pandas as pd
import numpy as np
import librosa
from pydub import AudioSegment, effects
from sklearn.model_selection import KFold
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MultiLabelBinarizer
from sklearn.metrics import accuracy_score, hamming_loss, multilabel_confusion_matrix, classification_report
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras.callbacks import ModelCheckpoint

## Database generation with pydub (2 samples per file)

In [None]:
# Build a multi-label dataset by overlaying `multi_num` UrbanSound8K clips from
# the same fold, each attenuated by a random gain, and record the labels and
# relative powers of every mix in a CSV.
source_dir = './UrbanSounds8K/audio/'
dest_dir = './multi/' # new database is stored in multi folder next to original audio folder
folds = np.array(['fold1','fold2','fold3','fold4',
                  'fold5','fold6','fold7','fold8',
                  'fold9','fold10'])

multi_num = 2 # number of overlayed files
SEED = 42 # fixed seed so the generated database is reproducible
random.seed(SEED)
mult_dict = {} # exported file path -> [(label, power ratio), ...]

for fold in folds:
    os.makedirs(f'{dest_dir}{fold}', exist_ok=True) # export fails if the fold folder is missing

    # sorted first: os.listdir order is arbitrary, which would defeat the seed
    fold_files = sorted(f for f in os.listdir(source_dir+fold) if f.endswith('.wav'))
    random.shuffle(fold_files)
    print(f'On fold: {fold}')

    mix_index = 0 # running counter that keeps exported file names unique within a fold
    while len(fold_files) >= multi_num:
        rand_gain = [random.randint(-6,0) for _ in range(multi_num)]
        samples = fold_files[:multi_num]
        sounds = []
        failed = None

        for j, sample in enumerate(samples):
            try:
                sounds.append(AudioSegment.from_file(f'{source_dir}{fold}/{sample}'))
            except Exception as exc:
                print('Unable to read file...')
                print(f'  {sample}: {exc}')
                failed = j
                break

        if failed is not None:
            # drop only the unreadable file so the ones that did load are retried with a new partner
            del fold_files[failed]
            continue
        del fold_files[0:multi_num] # pop loaded files off top

        labels, p_ratio, names = [], [], []
        for j in range(multi_num): # go through again to mix
            print(f'Processing: {samples[j]}')
            labels.append(samples[j].split('-')[1]) # get label
            sounds[j] = effects.normalize(sounds[j]) + rand_gain[j] # normalize, then add random gain reduction
            p_ratio.append(pow(10, rand_gain[j]/10)) # convert to power
            names.append(f'-{labels[j]}({round(p_ratio[j],2)})')

        # overlay every further sound on the first one (with repeat if base sound is longer)
        combined = sounds[0]
        for extra in sounds[1:]:
            combined = combined.overlay(extra, times=20)

        # label/gain text alone repeats often (10 labels x 7 gains per slot), so the
        # running index is what makes the name unique and prevents silent overwrites
        name_info = ''.join(names)
        fn = f'{dest_dir}{fold}/comb{mix_index:04d}{name_info}.wav'
        mix_index += 1
        combined.export(fn, format="wav")

        mult_dict[fn] = list(zip(labels, p_ratio))

# Save new database labels as csv
df = pd.DataFrame.from_dict(mult_dict, orient="index")
df.to_csv(f"{dest_dir}multi-labels.csv")

## Define extraction functions

In [None]:
dest_dur = './multi'

# Load label csv as dict: first column (exported file path) -> remaining cells of the row
multi_dict = {}
with open(f'{dest_dur}/multi-labels.csv', newline='') as label_file:
    for row in csv.reader(label_file):
        # skip blank lines and the header row written by DataFrame.to_csv (empty index cell)
        if not row or not row[0]:
            continue
        multi_dict[row[0]] = row[1:]

### Extract features for multi-category ###
def extract_features_multi(parent_dir,sub_dir,file_ext="*.wav",
                     bands=60,frames=41):
    """Extract windowed log-mel features (plus deltas) for every matching audio file.

    Args:
        parent_dir: root directory of the dataset.
        sub_dir: sub-directory (e.g. a fold) inside ``parent_dir``.
        file_ext: glob pattern selecting the files to process.
        bands: number of mel bands per window.
        frames: number of spectrogram frames per window; the window length is
            ``512 * (frames - 1)`` samples.

    Returns:
        ``(features, labels)`` — two parallel lists with one entry per file that
        produced at least one full window. Each features entry is an array of
        shape ``(n_windows, bands, frames, 2)`` (channel 0: log-mel values,
        channel 1: their deltas). Each labels entry is a list with one label per
        window: the list of category ints read from ``multi_dict``, or ``None``
        when the file is not in ``multi_dict``.
    """
    def _windows(data, window_size):
        # half-overlapping (start, end) index pairs; trailing pairs may run past the data
        start = 0
        while start < len(data):
            yield int(start), int(start + window_size)
            start += (window_size // 2)

    window_size = 512 * (frames - 1) # size of sliding window for taking multiple samples of each sound, could be extended
    features, labels = [], []
    for fn in glob.glob(os.path.join(parent_dir, sub_dir, file_ext)):
        segment_log_specgrams, segment_labels = [], []
        sound_clip, xsr = librosa.load(fn)
        print('Processing: %s' % fn)

        # label keys are stored with forward slashes; fall back to that form when
        # glob returns the platform separator
        key = fn if fn in multi_dict else fn.replace(os.sep, '/')
        try:
            label = [int(literal_eval(point)[0]) for point in multi_dict[key]] # multi categories, ex. [1,2]
        except KeyError: # if the sample is not in the database, ie. unseen
            label = None

        # Extract features per window/segment
        for (start,end) in _windows(sound_clip,window_size):
            signal = sound_clip[start:end]
            if len(signal) == window_size: # ignore the truncated windows at the end of the clip
                # audio must be passed by keyword: recent librosa releases reject it positionally
                melspec = librosa.feature.melspectrogram(y=signal, sr=xsr, n_mels=bands)
                # NOTE(review): melspectrogram presumably returns a power spectrogram by
                # default, so power_to_db may be the intended call; kept unchanged so
                # features stay compatible with already-trained models.
                logspec = librosa.amplitude_to_db(melspec)  # converting to log scale
                # NOTE(review): the transpose+flatten below is later reshaped to
                # (bands, frames), which looks like it does not restore the original
                # band/frame layout; kept unchanged for the same reason - confirm before changing.
                logspec = logspec.T.flatten()[:, np.newaxis].T
                segment_log_specgrams.append(logspec)
                segment_labels.append(label)

        segment_log_specgrams = np.asarray(segment_log_specgrams).reshape(
            len(segment_log_specgrams),bands,frames,1) # reshape to (n, bands, frames, 1)
        segment_features = np.concatenate((segment_log_specgrams, np.zeros(
            np.shape(segment_log_specgrams))), axis=3) # add a second channel -> (n, bands, frames, 2)
        for i in range(len(segment_features)):
            segment_features[i, :, :, 1] = librosa.feature.delta(
                segment_features[i, :, :, 0]) # extract deltas into new layer

        if len(segment_features) > 0: # skip files that yielded no full window
            features.append(segment_features)
            labels.append(segment_labels)

    return features, labels

## Extract features into file (per fold and full)

In [None]:
# Pre-process and extract feature from the data (per fold)
parent_dir = './multi'
feat_dir = "./multi-processed"
folds = sub_dirs = np.array(['fold1','fold2','fold3','fold4',
                  'fold5','fold6','fold7','fold8',
                  'fold9','fold10'])

os.makedirs(feat_dir, exist_ok=True) # np.savez does not create the target folder

for fold in folds:
    print(f'Now in: {fold}')
    features, labels = extract_features_multi(parent_dir,fold)

    # Clips yield different numbers of windows, so the per-clip entries are ragged.
    # Fill 1-D object arrays explicitly: recent NumPy refuses to build an array
    # from a ragged list implicitly.
    features_arr = np.empty(len(features), dtype=object)
    labels_arr = np.empty(len(labels), dtype=object)
    for i, (clip_features, clip_labels) in enumerate(zip(features, labels)):
        features_arr[i] = clip_features
        labels_arr[i] = clip_labels

    np.savez(f'{feat_dir}/{fold}', features=features_arr, labels=labels_arr)

In [None]:
# Create single file for full dataset by appending the per-fold files (quick)
feat_dir = "./multi-processed"
folds = sub_dirs = np.array(['fold1','fold2','fold3','fold4',
                  'fold5','fold6','fold7','fold8',
                  'fold9','fold10'])
full_features = []
full_labels = []

for fold in folds:
    # allow_pickle is needed for the object arrays; only load files produced by this notebook
    with np.load(f'{feat_dir}/{fold}.npz', allow_pickle=True) as data:
        full_features.append(data["features"])
        full_labels.append(data["labels"])

# Folds hold different numbers of clips, so keep one entry per fold in a 1-D
# object array; recent NumPy refuses to build an array from a ragged list implicitly.
full_features_arr = np.empty(len(full_features), dtype=object)
full_labels_arr = np.empty(len(full_labels), dtype=object)
for i, (fold_features, fold_labels) in enumerate(zip(full_features, full_labels)):
    full_features_arr[i] = fold_features
    full_labels_arr[i] = fold_labels

np.savez(f'{feat_dir}/full_features', features=full_features_arr, labels=full_labels_arr) # combined and saved

## Define architecture of CNN with Keras


In [None]:
def get_network_multi(input_shape=(60, 41, 2), num_classes=10,
                      num_filters=(24, 32, 64, 128), learning_rate=1e-4):
    """Build and compile the multi-label CNN.

    Args:
        input_shape: shape of one input window; the default has two channels,
            one for the melspec and one for its deltas.
        num_classes: number of sound categories (size of the sigmoid output).
        num_filters: filter count of each Conv2D block, in order.
        learning_rate: learning rate passed to the Adam optimizer.

    Returns:
        The compiled ``keras`` Sequential model. Called without arguments it
        builds the same architecture as before: four conv blocks with
        24/32/64/128 filters.
    """
    pool_size = (2, 2)
    kernel_size = (3, 3)
    keras.backend.clear_session()

    model = keras.models.Sequential()
    last_block = len(num_filters) - 1
    for block, filters in enumerate(num_filters):
        # only the first layer declares the input shape
        first_layer_kwargs = {"input_shape": input_shape} if block == 0 else {}
        model.add(keras.layers.Conv2D(filters, kernel_size,
                                      padding="same", **first_layer_kwargs))
        model.add(keras.layers.BatchNormalization())
        model.add(keras.layers.Activation("relu"))
        if block < last_block: # the last block feeds the global pooling directly
            model.add(keras.layers.MaxPooling2D(pool_size=pool_size))

    model.add(keras.layers.GlobalMaxPooling2D())
    model.add(keras.layers.Dense(128, activation="relu"))
    # sigmoid scores every class independently, so several classes can be active at once (unlike softmax)
    model.add(keras.layers.Dense(num_classes, activation='sigmoid'))

    # NOTE(review): categorical_accuracy presumably compares only the arg-max of
    # target and prediction, which under-describes two-label targets; kept so
    # existing history/evaluate outputs are unchanged - consider binary_accuracy.
    model.compile(optimizer=keras.optimizers.Adam(learning_rate),
        loss='binary_crossentropy', # each class is an independent yes/no decision
        metrics=["categorical_accuracy"])
    return model

## Split data into training and test sets

In [None]:
cp_path = "./models/checkpoint.hdf5"
os.makedirs(os.path.dirname(cp_path), exist_ok=True) # folder used for checkpoints and saved models
# Training callback: keeps the weights of the epoch with the lowest training loss
checkpoint = ModelCheckpoint(cp_path, monitor='loss', verbose=1, save_best_only=True,
                            mode='auto', save_freq='epoch')
load_dir = "./multi-processed"

# allow_pickle is needed for the object arrays; only load files produced by this notebook
data = np.load("{0}/{1}.npz".format(load_dir,'full_features'),
                allow_pickle=True)

# merge the per-fold entries into one entry per sound clip
features = np.concatenate(data["features"], axis = 0)
labels = np.concatenate(data["labels"], axis = 0)

# Split by clip. `checkpoint` is a Keras training callback and must not be passed
# here: train_test_split rejects unknown keyword arguments. random_state makes
# the split repeatable across runs.
x_train, x_test, y_train, y_test = train_test_split(features, labels, test_size=.2, random_state=42)

# flatten the training clips into individual windows; the test set stays grouped per clip
x_train = np.concatenate(x_train, axis = 0).astype(np.float32)
y_train = np.concatenate(y_train, axis = 0).astype(np.float32)

## Encode labels as one-hot binary arrays

In [None]:
mlb = MultiLabelBinarizer()
mlb.fit([[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]]) # fit mlb to the category range

# training labels are already one row per window, so they can be encoded in one call
y_tr_array = np.array(y_train)
y_tr_hot = mlb.transform(y_tr_array)

# Test labels stay grouped per sound file (one encoded row per window of that file).
# Files have different numbers of windows, so store the per-file arrays in a 1-D
# object array; recent NumPy refuses to build an array from ragged input implicitly.
y_te_hot = np.empty(len(y_test), dtype=object)
for i, clip_labels in enumerate(y_test):
    y_te_hot[i] = mlb.transform(np.array(clip_labels)) # encoding with transform

## Train the model for 30 epochs

In [None]:
# number of epochs and batch size can be adjusted for an individual workstation

model = get_network_multi()
# model.load_weights(cp_path) # to resume after an interruption: load the last checkpoint before fitting

# pass the checkpoint callback so the best weights so far are written to cp_path during training
history = model.fit(x_train, y_tr_hot, epochs = 30, batch_size = 48, verbose = 1,
                    callbacks=[checkpoint])

# time stamp in the name keeps earlier saved models from being overwritten
model.save('./models/models-multi-%s' % time.time())

## Load pre-trained model for testing

In [None]:
# Saved model to evaluate; presumably the numeric suffix is the time stamp
# appended when the model was saved - point this at your own saved model.
pretrained_model_path = './models/models-multi-1600424459.55954'
model = tf.keras.models.load_model(pretrained_model_path)

## Test model performance with performance metrics

In [None]:
print('Evaluating on test set...')

y_true, y_pred = [], []
for clip_windows, clip_targets in zip(x_test, y_te_hot):
    # one prediction row per window of the sound clip
    window_scores = model.predict(clip_windows)
    # clip-level prediction: the set of classes that were top-ranked in at least one window
    predicted_classes = np.unique(window_scores.argmax(axis=1))
    # transform expects an iterable of label sets, hence the wrapping list; take its single row
    y_pred.append(mlb.transform([predicted_classes])[0])
    # every window of a clip carries the same target, so the first one is enough
    y_true.append(clip_targets[0])

accuracy = accuracy_score(y_true, y_pred)
hamming = hamming_loss(y_true, y_pred)

print(f'Average Accuracy: {accuracy}')
print(f'Average Hamming: {hamming}\n')
print(f'{multilabel_confusion_matrix(y_true, y_pred)}\n')
print(classification_report(y_true, y_pred))

## Predicting on new data

In [None]:
parent_dir = './'
sub_dir = 'testing' # directory for unseen samples
test_files = sorted(fn for fn in os.listdir(os.path.join(parent_dir, sub_dir))
                    if fn.endswith('.wav'))

# category index -> readable name
look_up = [
    "air_conditioner",  # 0
    "car_horn",         # 1
    "children_playing", # 2
    "dog_bark",         # 3
    "drilling",         # 4
    "engine_idling",    # 5
    "gun_shot",         # 6
    "jackhammer",       # 7
    "siren",            # 8
    "street_music"      # 9
    ]

print('\nPredicting!')
# Extract and predict one file at a time. The extractor silently drops clips
# that yield no full window and lists files in its own order, so zipping a
# separate directory listing with its output can pair names with the wrong
# predictions.
predicted_files = []
predictions = []
for fn in test_files:
    # glob.escape: match this exact file even if its name contains glob characters
    features, labels = extract_features_multi(parent_dir, sub_dir, file_ext=glob.escape(fn))
    if not features:
        print(f'Skipping {fn}: no full analysis window could be extracted')
        continue
    pred = model.predict(features[0])
    avg_p = np.unique(np.argmax(pred, axis=1)) # classes that were top-ranked in at least one window
    predicted_files.append(fn)
    predictions.append(avg_p)

print('\nPredictions')
for fn, pred in zip(predicted_files, predictions):
    print(f'File: {fn} -> Prediction category: {pred}')
    for i in pred:
        print(look_up[i])