In [17]:
import tensorflow as tf
from tensorflow import keras
import tensorflow_addons as tfa
import numpy as np

# DNN model 1 (Traditional CNN)

A reasonably standard CNN structure


In [24]:
from tensorflow.keras import Model
from tensorflow.keras.layers import Input, Dense, Conv2D, BatchNormalization, GlobalAvgPool2D, AvgPool2D, MaxPool2D, LeakyReLU, Concatenate, Dropout, SpatialDropout2D

def simple_cnn_model(input_shape, num_classes):
    """Build a small three-stage CNN with a per-class sigmoid output.

    Each stage applies batch normalisation, LeakyReLU and then a 'same'-padded
    convolution. The first two stages are followed by a 2x2 average pool and a
    2x2 max pool whose outputs are concatenated, then spatial dropout. The third
    stage is followed by global average pooling and ordinary dropout.

    Args:
        input_shape: shape of a single input sample, passed to `Input`.
        num_classes: number of units in the final sigmoid layer.

    Returns:
        An uncompiled `Model` named 'cnn_model'.
    """
    filters = [32, 64, 128]
    bn_momentum=0.99
    leaky_alpha = 0.05
    dropout = 0.1

    def norm_act_conv(tensor, n_filters, kernel_size):
        # pre-activation ordering: normalise, activate, then convolve
        tensor = BatchNormalization(momentum=bn_momentum)(tensor)
        tensor = LeakyReLU(leaky_alpha)(tensor)
        return Conv2D(n_filters, kernel_size, padding='same')(tensor)

    def dual_pool(tensor):
        # halve the spatial size two ways and keep both results side by side
        averaged = AvgPool2D(pool_size=(2,2))(tensor)
        maxed = MaxPool2D(pool_size=(2,2))(tensor)
        return Concatenate()([averaged, maxed])

    inputs = Input(shape=input_shape)
    features = inputs

    # first and second convolutional blocks: 5x5 kernels, pooled and spatially dropped out
    for n_filters in filters[:2]:
        features = norm_act_conv(features, n_filters, (5,5))
        features = dual_pool(features)
        features = SpatialDropout2D(dropout)(features)

    # third convolutional block: 3x3 kernel, collapsed to one vector per sample
    features = norm_act_conv(features, filters[2], (3,3))
    features = GlobalAvgPool2D()(features)
    features = Dropout(dropout)(features)

    # Dense classification
    hidden = Dense(32)(features)
    hidden = LeakyReLU(leaky_alpha)(hidden)
    classification = Dense(num_classes, activation='sigmoid')(hidden)

    return Model(inputs, classification, name='cnn_model')

# DNN model 2 (Densenet)

Below is our own implementation of DenseNet, following the tutorial at
https://amaarora.github.io/2020/08/02/densenets.html

In [None]:
def dense_net(initial_feature, num_label, input_shape,
              dense_block_config, drop_out = 0.2, bottle_necksz=4, growth_rate=32,
              output_activation='softmax'):
    """Build a DenseNet-style Keras classifier.

    Args:
        initial_feature: number of filters of the initial 7x7 convolution.
        num_label: number of units in the output layer.
        input_shape: shape of a single input sample, passed to `keras.Input`.
        dense_block_config: iterable giving the number of bottleneck/convolution
            sets in each dense block, in order.
        drop_out: rate of the two Dropout layers in the classification head.
        bottle_necksz: the 1x1 bottleneck convolution of every set has
            `bottle_necksz * growth_rate` filters.
        growth_rate: number of feature maps each set appends to its block.
        output_activation: activation of the final Dense layer. The default
            'softmax' yields outputs that sum to one per sample; pass 'sigmoid'
            when training on multi-hot targets with binary cross-entropy.

    Returns:
        An uncompiled `keras.Model`.

    Note:
        A transition layer (1x1 convolution halving the channels, then average
        pooling) follows every dense block, including the last one.
    """

    def dense_block(input_layer, num_sets, bottle_necksz, growth_rate):
        # `features` always holds the concatenation of the block input and
        # everything the sets have produced so far.
        features = input_layer
        for _ in range(num_sets):
            bottleneck_1 = keras.layers.BatchNormalization()(features)
            activation_1 = keras.layers.ReLU()(bottleneck_1)
            convolution_1 = keras.layers.Conv2D(bottle_necksz*growth_rate,
                                                kernel_size=(1,1), strides=1, use_bias=False)(activation_1)
            bottleneck_2 = keras.layers.BatchNormalization()(convolution_1)
            activation_2 =  keras.layers.ReLU()(bottleneck_2)
            convolution_2 = keras.layers.Conv2D(growth_rate, kernel_size=(3,3),
                                                strides=1, padding='same', use_bias=False)(activation_2)
            features = keras.layers.Concatenate()([features, convolution_2])
        return features

    def transition_layer(input_layer):
        batch_norm = keras.layers.BatchNormalization()(input_layer)
        activation = keras.layers.ReLU()(batch_norm)
        # halve the channel count before pooling
        feature_size = keras.backend.int_shape(activation)[3]
        conv = keras.layers.Conv2D(feature_size//2, kernel_size=(1,1),strides=1,use_bias=False)(activation)
        pool = keras.layers.AveragePooling2D()(conv)
        return pool

    def fully_connected_layer(input_layer, num_labels):
        pool = keras.layers.GlobalAveragePooling2D()(input_layer)
        norm_1 = keras.layers.BatchNormalization()(pool)
        # use the caller-supplied rate; it was previously accepted but ignored
        dropout = keras.layers.Dropout(drop_out)(norm_1)
        dense_1 = keras.layers.Dense(1024, activation='relu')(dropout)
        dense_2 = keras.layers.Dense(512, activation='relu')(dense_1)
        norm_2 = keras.layers.BatchNormalization()(dense_2)
        dropout_2 = keras.layers.Dropout(drop_out)(norm_2)
        return keras.layers.Dense(num_labels, activation=output_activation)(dropout_2)

    inputs = keras.Input(shape = input_shape)
    # initial stem: padded 7x7 stride-2 convolution, then padded 3x3 stride-2 max pooling
    initial_padding_1 = keras.layers.ZeroPadding2D(padding=(3,3))(inputs)
    initial_conv = keras.layers.Conv2D(initial_feature, kernel_size=(7,7),
                                       strides=2, use_bias=False)(initial_padding_1)
    initial_norm = keras.layers.BatchNormalization()(initial_conv)
    initial_relu = keras.layers.ReLU()(initial_norm)
    initial_padding_2 = keras.layers.ZeroPadding2D(padding=(1,1))(initial_relu)
    initial = keras.layers.MaxPooling2D(pool_size=(3,3), strides=2)(initial_padding_2)

    for num in dense_block_config:
        conv = dense_block(initial, num, bottle_necksz, growth_rate)
        initial = transition_layer(conv)

    outputs = fully_connected_layer(initial, num_label)
    return keras.Model(inputs=inputs, outputs=outputs)

# DNN model 3 (Convolutional LSTM)
Below is the Convolutional LSTM used in https://github.com/WWH98932/Audio-Classification-Models

ResNet50 is discussed in the project report. The original paper for resnet is at https://arxiv.org/pdf/1512.03385.pdf

In [None]:
from tensorflow import keras
from keras.models import Model, Sequential
from keras.layers import *
from keras.layers.wrappers import TimeDistributed
from keras import regularizers

def resnet_ldnn(num_label, input_shape=(128, 126, 1)):
    """Build a ResNet50 feature extractor followed by two LSTM layers.

    Args:
        num_label: number of units in the final sigmoid layer.
        input_shape: shape of a single input sample; defaults to the
            (128, 126, 1) shape this model was originally hard-wired to.

    Returns:
        An uncompiled `Sequential` model.
    """
    model = Sequential()
    # pooling must stay None: the backbone has to emit a 4-D feature map for
    # the Permute/TimeDistributed layers below. (The previous value 'average'
    # is not one of the recognised options 'avg'/'max' and was ignored, so
    # this keeps the same architecture while making the intent explicit.)
    model.add(keras.applications.resnet50.ResNet50(include_top=False, input_shape=input_shape,
                                                   weights=None, pooling=None))
    # swap the two spatial axes so the second input axis becomes the sequence axis
    model.add(Permute((2, 1, 3)))
    # flatten the remaining two axes into one feature vector per step
    model.add(TimeDistributed(Flatten()))
    model.add(LSTM(64, dropout=0.25, return_sequences=True))
    model.add(LSTM(64, dropout=0.25))
    model.add(Dense(64))
    model.add(LeakyReLU(alpha=0.01))
    model.add(Dropout(0.5))
    model.add(Dense(num_label, kernel_regularizer=regularizers.l2(0.01), activation='sigmoid'))
    return model

# Data Preparation

In [4]:
import pandas as pd
import pathlib
import ast

# Read data from path
# dataframe is ideally the pickled output from 01_dataset_curation.select_training_data()
# NOTE(review): data_root is a placeholder and has to be pointed at the real dataset directory.
data_root = pathlib.Path('/path/to/training/dataframe/')
# NOTE(review): unpickling can execute arbitrary code - only load manifests from a trusted source.
training_df = pd.read_pickle(data_root/'01_manifest.pkl')
# Only the last expression of a cell is rendered, so this head() is not displayed.
training_df.head()

len(training_df)

In [5]:
from sklearn.preprocessing import MultiLabelBinarizer

# Full label set
considered_categories_large = [
    "animal_dogs",
    "animal_insects",
    "animal_birds",
    "animal_cockatoo",
    "animal_poultry",
    "background",
    "human_voice",
    "indeterminate",
    "mechanical",
    "mechanical_construction",
    "mechanical_impulsive",
    "mechanical_plant",
    "nature_wind",
    "signals_horn",
    "signals_siren",
    "transport_car",
    "music",
]
# Reduced label set
considered_categories_small = [
    "animal_dogs",
    "animal_birds",
    "human_voice",
    "transport_car",
    "mechanical",
    "music",
]
# Single label: selects the one-vs-others encoder
binary_dogs = ["animal_dogs"]
# change this line to the categorisation you need
considered_categories = considered_categories_small
class OneVsOtherBinarizer:
    """Minimal one-vs-others label binariser.

    Exposes `fit`, `transform` and `classes_` like the other binarisers used
    here, so the surrounding code does not need a special case.
    """

    def __init__(self, *args, **kwargs):
        """Accept and ignore any arguments; there is nothing to configure."""

    def fit(self, the_one):
        """Store the positive label and derive the 'not_<label>' negative label.

        Returns:
            self, so the call can be chained.
        """
        self.the_one = the_one
        self.the_others = 'not_'+the_one
        # index 0 is the negative class, index 1 the positive class
        self.classes_ = np.array([self.the_others, the_one])
        return self

    def transform(self, data):
        """Return a 1-D int32 array: 1 where the positive label is in the item, else 0."""
        membership = [self.the_one in labels for labels in data]
        return np.array(membership).astype(np.int32)


def get_category_encoder(categories):
    """Return a fitted label encoder suited to the number of categories.

    A single category gets the one-vs-others binariser; any other number of
    categories gets a `MultiLabelBinarizer` fitted on the given list.
    """
    if len(categories) != 1:
        return MultiLabelBinarizer().fit([categories])
    return OneVsOtherBinarizer().fit(categories[0])

array(['animal_birds', 'animal_dogs', 'background', 'human_voice',
       'mechanical', 'transport_car'], dtype=object)

In [6]:
# Mel spectrogram parameters. These look pretty good from a domain perspective;
# the "resolution" is a little higher than what was tried previously.
mel_settings = dict(
    fmax=8000,
    power=2,
    n_mels=128,
    n_fft=2048,
    hop_length=512,
)
# Nominal sampling rate. Most files should be this rate, but if not, they will be resampled
fs_nom = 16000
# nominal spectrogram shape
shape_nom = (128, 126)

In [7]:
import os
import numpy as np
import soundfile as sf
import librosa
import librosa.display
import sklearn

def force_array_shape(x, force_shape):
    """Return a copy of `x` with exactly the shape `force_shape`.

    Axes that are too long are truncated from the end; axes that are too
    short are filled with trailing zeros.
    """
    # truncate: keep at most the requested number of leading entries per axis
    clipped = x[tuple(slice(0, target) for target in force_shape)]
    # pad: append whatever is still missing on each axis
    trailing = [(0, target - clipped.shape[axis]) for axis, target in enumerate(force_shape)]
    return np.pad(clipped, trailing)

def get_mels(filepath='', data=None, fs=None, force_shape=None):
    """Compute a mel spectrogram from an audio file or from raw samples.

    Args:
        filepath: path of an audio file to load. When given, `data` is ignored
            and `fs` is passed to `librosa.load` as the requested sample rate.
        data: audio samples, used only when no `filepath` is given.
        fs: sample rate of `data`, or the rate to load `filepath` at.
        force_shape: optional shape the spectrogram is truncated / zero-padded to.

    Returns:
        Tuple `(S, fs)` of the spectrogram and the sample rate that was used.

    Raises:
        ValueError: if neither a file path nor non-empty samples with a
            positive sample rate are supplied.
    """
    if filepath:
        data, fs = librosa.load(filepath, sr=fs)
        if fs != fs_nom:
            # flag files that were not loaded at the nominal rate
            print(filepath)
    elif data is None or len(data) == 0 or not fs or fs <= 0:
        # The previous check evaluated len(data > 0), which raises TypeError
        # for lists and never tested that any samples were present.
        raise ValueError('Must provide either a filename, or array of data and sample rate')

    S = librosa.feature.melspectrogram(y=data, sr = fs, **mel_settings)

    # compare as tuples so a list-valued force_shape that already matches is a no-op
    if force_shape and S.shape != tuple(force_shape):
        S = force_array_shape(S, force_shape)

    return S, fs
 
def load_mels(filepath, force_create=False, save=True):
    """Return the mel spectrogram for an audio file, using a `.npy` cache.

    The cache lives next to the audio file, with the suffix replaced by
    `.npy`. It is read when present unless `force_create` is set; otherwise
    the spectrogram is computed at the nominal rate and shape and, if `save`
    is true, written to the cache.
    """
    cache_file = filepath.with_suffix('.npy')

    if cache_file.is_file() and not force_create:
        return np.load(cache_file)

    spectrogram, _ = get_mels(filepath, fs=fs_nom, force_shape = shape_nom)
    if save:
        np.save(cache_file, spectrogram)
    return spectrogram

def feature_preprocessing(mel):
    """Turn a power mel spectrogram into a standardised dB feature map.

    The spectrogram is converted to decibels relative to its maximum, shifted
    to zero mean, divided by its standard deviation and given a trailing
    axis of length one.
    """
    decibels = librosa.core.power_to_db(mel, ref=np.max)
    centred = decibels - np.mean(decibels)
    standardised = centred / (np.std(centred))
    return standardised[:, :, np.newaxis]


In [8]:
# Build the feature column.
# Every feature array is held in memory and also cached on disk by load_mels,
# so this can't be guaranteed to work for large datasets.
audio_paths = training_df.apply(lambda row: data_root/row['package_hash']/row['filename'], axis=1)
training_df['features'] = audio_paths.apply(
    lambda path: feature_preprocessing(load_mels(path, force_create=False, save=True))
)

# Bad source data sometimes produces NaNs; drop the affected rows.
has_nan = training_df['features'].apply(lambda feats: np.any(np.isnan(feats.flatten())))
training_df = training_df[~has_nan]

In [9]:
from sklearn.model_selection import train_test_split

# Stack the per-clip feature arrays into one array and encode the label sets.
X = np.stack(training_df['features'].values)
category_encoder = get_category_encoder(considered_categories)
y = category_encoder.transform(training_df['category_set'].values)

# Support is reported before the unlabelled rows are dropped below.
print('Category Support')
for c,n in zip(category_encoder.classes_, y.sum(axis=0)):
    print('{:30s}{} : {}'.format(c, category_encoder.transform([[c]]), n) )

# Keep only samples that carry at least one considered category.
# A boolean mask replaces the previous list.remove loop, which was quadratic
# in the number of samples.
idx_list = np.flatnonzero(np.any(y.reshape(y.shape[0], -1) != 0, axis=1))
X = X[idx_list]
y = y[idx_list]

# Fixed seed so the train/test partition is the same on every run.
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.1, random_state=42)

print(X_train.shape)



Category Support
animal_birds                  [[1 0 0 0 0 0]] : 2382
animal_dogs                   [[0 1 0 0 0 0]] : 2229
background                    [[0 0 1 0 0 0]] : 915
human_voice                   [[0 0 0 1 0 0]] : 964
mechanical                    [[0 0 0 0 1 0]] : 2099
transport_car                 [[0 0 0 0 0 1]] : 1251
(4379, 128, 126, 1)


# Model Selection
Run one of these cells to set the model architecture to use

In [None]:
# channel of the first convolutional layer
initial_feature = 64
# number of labels to be categorized: taken from the encoder, like the other
# model-selection cells, so it follows the chosen category list
num_labels = len(category_encoder.classes_)
input_shape = (128, 126, 1)
# number of bottleneck/convolution sets in each of the four dense blocks
dense_block_config=(6, 12, 24, 16)
# NOTE(review): dense_net ends in a softmax layer, while the compile cell uses
# binary_crossentropy on multi-hot labels - confirm this pairing is intended.
model = dense_net(initial_feature, num_labels, input_shape, dense_block_config)

In [None]:
# Build the ResNet50 + LSTM model with one output unit per encoded category.
model = resnet_ldnn(len(category_encoder.classes_))
model.summary()

In [25]:
# Build the simple CNN for (128, 126, 1) inputs with one output unit per encoded category.
model = simple_cnn_model((128, 126, 1),len(category_encoder.classes_))

## Custom Metrics

In [19]:
def exact_count(y_true, y_pred):
    """Per-sample exact-match indicator for multi-label predictions.

    Predictions are thresholded at 0.5 and compared element-wise with the
    rounded targets; a sample scores 1.0 only if every label matches.

    Args:
        y_true: targets of shape (batch, n_labels); integer or float.
        y_pred: predicted scores of shape (batch, n_labels).

    Returns:
        float32 tensor of shape (batch,) holding 0.0 or 1.0 per sample.
        Use `.numpy().mean()` on the result to get the exact match ratio.
    """
    # tf.equal needs both operands in the same dtype, so integer label arrays
    # are cast before the comparison
    targets = tf.round(tf.cast(y_true, tf.float32))
    predictions = tf.cast(tf.greater_equal(y_pred, 0.5), tf.float32)
    pred_match = tf.equal(predictions, targets)
    # the minimum over the label axis is 1 only when all labels matched
    return tf.math.reduce_min(tf.cast(pred_match, tf.float32), axis=1)

def macro_double_soft_f1(y, y_hat):
    """
    Macro-averaged "double" soft-F1 cost.

    Taken Directly from https://towardsdatascience.com/the-unknown-benefits-of-using-a-soft-f1-loss-in-classification-systems-753902c0105d
    All credit to Ashref Maiza

    Probabilities are used in place of hard predictions. For each label a
    soft F1 is computed for the positive class and for the negative class;
    the cost is the mean over labels of the average of (1 - soft F1) for the
    two classes.

    Args:
        y (int32 Tensor): targets array of shape (BATCH_SIZE, N_LABELS)
        y_hat (float32 Tensor): probability matrix from forward propagation of shape (BATCH_SIZE, N_LABELS)

    Returns:
        cost (scalar Tensor): value of the cost function for the batch
    """
    targets = tf.cast(y, tf.float32)
    probs = tf.cast(y_hat, tf.float32)

    # soft confusion-matrix entries, summed over the batch for each label
    tp = tf.reduce_sum(probs * targets, axis=0)
    fp = tf.reduce_sum(probs * (1 - targets), axis=0)
    fn = tf.reduce_sum((1 - probs) * targets, axis=0)
    tn = tf.reduce_sum((1 - probs) * (1 - targets), axis=0)

    # soft F1 of the positive class and of the negative class
    f1_positive = 2*tp / (2*tp + fn + fp + 1e-16)
    f1_negative = 2*tn / (2*tn + fn + fp + 1e-16)

    # minimising 1 - F1 maximises F1; both classes are weighted equally
    cost_positive = 1 - f1_positive
    cost_negative = 1 - f1_negative
    per_label_cost = 0.5 * (cost_positive + cost_negative)

    # average on all labels
    return tf.reduce_mean(per_label_cost)

## Compile and Fit model

In [26]:
from keras.callbacks import ModelCheckpoint,EarlyStopping
# model weights will be saved with this name
model_name = 'lstm_small_categories'
# in this directory
model_path = pathlib.Path('/mnt/tag_data/models/')
# Path.mkdir takes `parents` (not `parent`); exist_ok lets the cell be re-run
model_path.mkdir(parents=True, exist_ok=True)
# model_name is a plain str and has no with_suffix, so append the extension
# before joining it onto the directory
model_savefile = model_path/(model_name + '.hdf5')
# callbacks to automatically monitor/stop training if needed. Set them in the fit call
es=EarlyStopping(monitor='val_loss',patience=10)
mc=ModelCheckpoint(str(model_savefile),monitor='val_loss',mode='auto',save_best_only=True)

model.compile(loss='binary_crossentropy', metrics=[exact_count], optimizer=keras.optimizers.Adam(learning_rate=1e-5))


In [27]:
# fit the model
# es / mc are passed here so early stopping and best-weights checkpointing
# actually run; without them the checkpoint file is never written
history = model.fit(X_train, y_train, epochs=50, validation_data=(X_test, y_test), callbacks=[es, mc])

Epoch 1/50
Epoch 2/50
Epoch 3/50
Epoch 4/50
Epoch 5/50
Epoch 6/50
Epoch 7/50
Epoch 8/50
Epoch 9/50
Epoch 10/50
Epoch 11/50
Epoch 12/50
Epoch 13/50
Epoch 14/50
Epoch 15/50
Epoch 16/50
Epoch 17/50
Epoch 18/50
Epoch 19/50
Epoch 20/50
Epoch 21/50
Epoch 22/50
Epoch 23/50
Epoch 24/50
Epoch 25/50
Epoch 26/50
Epoch 27/50
Epoch 28/50
Epoch 29/50
Epoch 30/50
Epoch 31/50
Epoch 32/50
Epoch 33/50
Epoch 34/50
Epoch 35/50
Epoch 36/50
Epoch 37/50
Epoch 38/50
Epoch 39/50
Epoch 40/50
Epoch 41/50
Epoch 42/50
Epoch 43/50
Epoch 44/50
Epoch 45/50
Epoch 46/50
Epoch 47/50
Epoch 48/50
Epoch 49/50
Epoch 50/50


## Testing
Simple validation metrics are available in the training history,
but it can also be useful to experiment with the fitted model on the test set or on other data.


In [None]:
# Model outputs for X_test (the same split that was passed as validation_data during fitting).
y_pred=model.predict(X_test)

In [None]:
# Threshold the predictions at 0.5 to get a 0/1 indicator per class.
# This is a multi-label indicator rather than a one-hot encoding: any number
# of classes may be 1 for a single sample.
# NOTE: y_pred is overwritten in place, so the raw scores are gone afterwards.
y_pred[y_pred>=0.5] = 1
y_pred[y_pred<0.5] = 0

In [None]:
# look at the exact match ratio
# (exact_count returns one 0/1 value per sample; the mean of those is the ratio)
exact_count(y_test, y_pred).numpy().mean()

In [None]:
from sklearn.metrics import classification_report

# `final_categories` is not defined in this notebook; the fitted encoder's
# classes_ holds the label names used to build y
print(classification_report(y_test, y_pred, target_names=list(category_encoder.classes_)))