## Computational Analysis of Sound and Music

# ESA 2 - Sound Event Detection - 2

Dr.-Ing. Jakob Abeßer, jakob.abesser@idmt.fraunhofer.de

**Last update:** 25.05.2024

**Outline**

In this notebook, we revisit the M1 notebook and use a small dataset of **animal sounds** extracted from the **ESC-50 dataset**.
We will study how to 
- apply the data augmentation **before** the training to enhance our training set
- apply the data augmentation **during** the training in a custom **generator**

In [None]:
!pip install wget
!pip install audiomentations

In [None]:
import numpy as np
import sklearn as skl
import os
import matplotlib
import librosa
import matplotlib.pyplot as pl
import platform
import IPython.display as ipd
import wget
import zipfile
import glob

from audiomentations import Compose, AddGaussianNoise, TimeStretch, PitchShift, Shift

from sklearn.preprocessing import StandardScaler
from sklearn.metrics import accuracy_score
import tensorflow as tf

## Dataset download & pre-processing

In [None]:
# Step 1: fetch the zipped dataset unless a local copy is already present
if os.path.isfile('animal_sounds.zip'):
    print('Files already exist!')
else:
    print('Please wait a couple of seconds ...')
    wget.download('https://github.com/machinelistening/machinelistening.github.io/blob/master/animal_sounds.zip?raw=true', 
                      out='animal_sounds.zip', bar=None)
    print('animal_sounds.zip downloaded successfully ...')

# Step 2: unpack the archive unless the dataset folder already exists
if not os.path.isdir('animal_sounds'):
    print("Let's unzip the file ... ")
    assert os.path.isfile('animal_sounds.zip')
    with zipfile.ZipFile('animal_sounds.zip', 'r') as archive:
        # extract all archive members into the current working directory
        archive.extractall('.')
    assert os.path.isdir('animal_sounds')
    print("All done :)")


In [None]:
# sample rate in Hz; the audio-loading cells of this notebook also request 44100 Hz
fs = 44100

In [None]:
# Every subdirectory of the dataset folder holds the recordings of one animal
# class; the folder name doubles as the class name.
dir_dataset = 'animal_sounds'
sub_directories = glob.glob(os.path.join(dir_dataset, '*'))
n_sub = len(sub_directories)

# Parallel lists with one entry per WAV file:
#   fn_wav_list       - file name
#   class_label       - class name (= name of the containing folder)
#   file_num_in_class - running index of the file within its class (used later for the split)
fn_wav_list = []
class_label = []
file_num_in_class = []

for sub_dir in sub_directories:
    wav_files_in_dir = sorted(glob.glob(os.path.join(sub_dir, '*.wav')))
    n_wav_in_dir = len(wav_files_in_dir)
    fn_wav_list.extend(wav_files_in_dir)
    class_label.extend([os.path.basename(sub_dir)] * n_wav_in_dir)
    file_num_in_class.extend(range(n_wav_in_dir))

n_files = len(class_label)

# convert the per-class file counter to an array so it can be used in vectorized comparisons
file_num_in_class = np.array(file_num_in_class)

In [None]:
# Alphabetically sorted class names; the position in this list is the integer class id
unique_classes = sorted(set(class_label))
class_name_to_id = {name: idx for idx, name in enumerate(unique_classes)}
class_id = np.array([class_name_to_id[label] for label in class_label])

## Feature Extraction

In [None]:
def compute_melspec(fn_wav, n_bins=128, fs=44100):
    """ Compute Mel spectrogram with logarithmic magnitude scaling 
    Args:
        fn_wav (str): WAV file name
        n_bins (int): Number of Mel frequency bins
        fs (int): Sample rate in Hz that the audio is loaded (resampled) with;
            the upper edge of the Mel filterbank is set to fs/2
    Returns:
        mel_spec (2d np.ndarray): Mel spectrogram in dB (n_bins x n_frames),
            scaled relative to the maximum of the clip (ref=np.max)
    """
    # load as mono signal at the requested sample rate
    x, fs = librosa.load(fn_wav, mono=True, sr=fs)
    # Mel-scaled power spectrogram covering the full band up to Nyquist
    S = librosa.feature.melspectrogram(y=x, sr=fs, n_mels=n_bins, fmax=fs/2)
    # logarithmic (dB) magnitude scaling
    S_dB = librosa.power_to_db(S, ref=np.max)
    return S_dB

In [None]:
# One Mel spectrogram per file, stacked along a new leading (file) axis ...
feat = np.array([compute_melspec(fn_wav) for fn_wav in fn_wav_list])
# ... followed by a trailing singleton axis that serves as the channel dimension
feat = feat[..., np.newaxis]

In [None]:
# feat stacks one Mel spectrogram per file and has a trailing channel axis
print(f"Feature matrix shape: {feat.shape}")

## Train-Test-Split

In [None]:
# Files numbered 0-2 within their class go to the training set,
# all remaining files of that class (number 3 and up) go to the test set.
is_train = np.flatnonzero(file_num_in_class <= 2)
is_test = np.flatnonzero(file_num_in_class >= 3)

In [None]:
# Select the partitions; indexing with an index array returns copies, so the
# in-place standardization below leaves `feat` untouched.
X_train = feat[is_train, :, :, :]
X_test = feat[is_test, :, :, :]

y_train = class_id[is_train]
y_test = class_id[is_test]

# one-hot-encoding
y_train = tf.keras.utils.to_categorical(y_train, num_classes=5)
y_test = tf.keras.utils.to_categorical(y_test, num_classes=5)

# Data standardization
# The statistics are estimated on the training set only and re-used for the
# test set: the test data must be transformed exactly like the training data
# and must not contribute to any parameter that is fitted.
train_mean = np.mean(X_train)
train_std = np.std(X_train)

X_train -= train_mean
X_train /= train_std

X_test -= train_mean
X_test /= train_std

## Neural Network Architecture

We use the same CNN model as in the previous seminar.

In [None]:
def creage_vgg_like_model(input_shape, num_output_dim):
    """ Build and compile a small VGG-like CNN classifier
    Args:
        input_shape (tuple): Shape of one input example (without batch axis)
        num_output_dim (int): Number of units in the softmax output layer
    Returns:
        model (tf.keras.models.Model): Model compiled with categorical
            cross-entropy loss, Adam optimizer and accuracy metric
    """

    def conv_bn_relu(tensor, n_filters):
        # 3x3 "same" convolution -> batch normalization -> ReLU
        tensor = tf.keras.layers.Conv2D(n_filters, kernel_size=(3, 3), padding='same')(tensor)
        tensor = tf.keras.layers.BatchNormalization()(tensor)
        return tf.keras.layers.Activation(activation="relu")(tensor)

    inp = tf.keras.layers.Input(shape=input_shape)

    # stage 1: three conv blocks with 32 filters, then 2x2 max pooling
    x = inp
    for _ in range(3):
        x = conv_bn_relu(x, 32)
    x = tf.keras.layers.MaxPooling2D((2, 2))(x)

    # stage 2: two conv blocks with 64 filters, then 2x2 max pooling
    for _ in range(2):
        x = conv_bn_relu(x, 64)
    x = tf.keras.layers.MaxPooling2D((2, 2))(x)

    # stage 3: a single conv block with 128 filters
    x = conv_bn_relu(x, 128)

    # collapse the two spatial axes with both average and max pooling
    # and concatenate the two resulting vectors
    pooled_avg = tf.keras.layers.GlobalAveragePooling2D()(x)
    pooled_max = tf.keras.layers.GlobalMaxPooling2D()(x)
    x = tf.keras.layers.concatenate([pooled_avg, pooled_max])

    # classification head
    x = tf.keras.layers.Dense(256, activation='relu')(x)
    x = tf.keras.layers.Dropout(0.5)(x)
    out = tf.keras.layers.Dense(num_output_dim, activation="softmax")(x)

    model = tf.keras.models.Model(inputs=inp, outputs=out)
    model.compile(loss='categorical_crossentropy',
                  optimizer='adam',
                  metrics=['accuracy'])

    return model


In [None]:
# Load the waveform of every file of the dataset (training AND test files) and
# stack them row-wise into a 2D array (files x samples); np.vstack requires
# that all clips have the same number of samples.
all_samples = np.vstack([librosa.load(fn_wav, mono=True, sr=44100)[0]
                         for fn_wav in fn_wav_list])

# keep only the rows that belong to the training set
all_samples_train = all_samples[is_train, :]

## Data Augmentation Strategy 1: Enhance training dataset (before the training)

### Data Augmentation of Training Samples

In [None]:
from audiomentations import Compose, AddGaussianNoise, TimeStretch, PitchShift, Shift

# Augmentation chain for strategy 1; each transform carries its own
# probability p=0.5 of being applied to a given clip.
augmentation_steps = [
    AddGaussianNoise(min_amplitude=0.001, max_amplitude=0.03, p=0.5),
    TimeStretch(min_rate=0.8, max_rate=1.2, p=0.5),
    PitchShift(min_semitones=-2, max_semitones=2, p=0.5),
]
augment = Compose(augmentation_steps)

In [None]:
n_augmenations_per_file = 10
n_files = all_samples_train.shape[0]

# Create the augmented versions file by file, i.e. the rows are ordered as
# (file 0, version 0), (file 0, version 1), ..., (file 1, version 0), ...
samples_train_aug = np.vstack([
    augment(all_samples_train[i], fs)
    for i in range(n_files)
    for _ in range(n_augmenations_per_file)
])

# Clone each one-hot target row once per augmented version (same row order as above)
y_train_s1 = np.repeat(y_train[:n_files], n_augmenations_per_file, axis=0)

print(samples_train_aug.shape)
print(y_train_s1.shape)

**Observation**: Since we have generated 10 augmented versions of each of our 15 training files (5 classes x 3 files), we now have a total of **150 audio clips**.

In [None]:
# Compute one log-magnitude Mel spectrogram per augmented clip (one clip per row)
n_clips = samples_train_aug.shape[0]
X_train_s1 = np.array([
    librosa.power_to_db(
        librosa.feature.melspectrogram(y=clip, sr=fs, n_mels=128, fmax=fs/2),
        ref=np.max)
    for clip in samples_train_aug
])
# append the channel axis expected by the CNN
X_train_s1 = X_train_s1[..., np.newaxis]
print(X_train_s1.shape)

# Data standardization (zero mean, unit variance over the whole augmented training set)
X_train_s1 -= np.mean(X_train_s1)
X_train_s1 /= np.std(X_train_s1)

### Model training & evaluation

In [None]:
# shape of one training example, i.e. the feature tensor shape without the leading clip axis
input_shape = X_train_s1.shape[1:] 
# 5 output units, matching the 5-dimensional one-hot targets
model_s1 = creage_vgg_like_model(input_shape, 5)
# train on the pre-computed augmented training set (strategy 1)
model_s1.fit(X_train_s1, y_train_s1, batch_size=2, epochs=30, verbose=2)

## Data Augmentation Strategy 2: Implement custom generator (data augmentation during the training)

### Data Generator

Instead of just calling the ```model.fit()``` function as before, we implement a **data generator**, which allows us to modify the training data **in each epoch**.

The **generator class** includes the following **methods**:
- ```__init__()```: Class constructor, we provide the training data and training targets as arguments, which will be stored in class member variables to be accessible from all functions
- ```__len__()```: This method allows Keras to "ask" the generator how many training data instances exist. This is important so that Keras knows how many steps it needs to run per training epoch
- ```__getitem__(k)```: Method called by Keras to get the data and target tuple used in the k-th step of the current epoch. This is the main method we need to implement where the original training data can be modified (data augmentation)
- ```on_epoch_end()```: This method is always called after all steps of one epoch were executed. We'll use it to randomize the order of all data instances in every epoch.

**Note**: For simplicity, we use a **batch size of 1** here (so every step uses a single spectrogram as features, not multiple ones)

In [None]:
class DataGenerator(tf.keras.utils.Sequence):
    """ Keras data generator that applies data augmentation on the fly.

    In every training step, one audio clip is randomly augmented, converted to
    a standardized log-magnitude Mel spectrogram and returned together with its
    target as a batch of size 1.

    Args:
        audio_samples (2d np.ndarray): Audio clips (n_files x n_samples)
        targets (2d np.ndarray): One-hot encoded targets (n_files x n_classes)
        fs (int): Sample rate of the audio clips in Hz
        **kwargs: Forwarded to the constructor of the Keras base class
    """

    def __init__(self, audio_samples, targets, fs=44100, **kwargs):
        # initialize the Keras base class
        super().__init__(**kwargs)

        # store audio_samples and target as member variables
        self.audio_samples = audio_samples
        self.targets = targets

        # derive the number of files and the number of samples per file
        self.n_files = self.audio_samples.shape[0]
        self.n_samples = self.audio_samples.shape[1]

        # array of file indexes that we can shuffle after each training epoch to use files in random order
        self.indexes = np.arange(self.n_files)

        # sample rate
        self.fs = fs

        # prepare data augmentation using audiomentations
        self.augment = Compose([AddGaussianNoise(min_amplitude=0.001, max_amplitude=0.005, p=0.5),
                                TimeStretch(min_rate=0.95, max_rate=1.05, p=0.5)])

    def __len__(self):
        """ Returns the number of training examples (= steps per epoch, since batch size is 1) """
        return self.n_files

    def __getitem__(self, index):
        """ Returns the (features, target) tuple for the given step of the current epoch
        Args:
            index (int): Step index within the epoch
        Returns:
            feat (4d np.ndarray): Spectrogram (1 x n_bins x n_frames x 1)
            target (2d np.ndarray): One-hot target (1 x n_classes)
        """
        # map the step index to a file index (file order is shuffled after each epoch)
        file_index = self.indexes[index]

        # get current audio samples
        curr_samples = self.audio_samples[file_index, :]

        # apply data augmentation
        curr_samples_aug = self.augment(samples=curr_samples, sample_rate=self.fs)

        # compute Mel spectrogram of the AUGMENTED signal (using the original
        # signal here would silently disable the data augmentation)
        # NOTE(review): assumes the augmentation keeps the clip length unchanged,
        # so that all spectrograms have the same number of frames - confirm for
        # the installed audiomentations version
        spec = librosa.feature.melspectrogram(y=curr_samples_aug, sr=self.fs, n_mels=128, fmax=self.fs/2)
        spec = librosa.power_to_db(spec, ref=np.max)

        # standardize (per spectrogram); guard against a zero standard deviation
        # NOTE(review): data used for evaluation should be standardized in the
        # same (per-spectrogram) way - verify against the evaluation code
        spec -= np.mean(spec)
        spec /= max(np.std(spec), np.finfo(spec.dtype).eps)

        # convert 2D spectrogram (frequency x time) into 
        # 4D data tensor (batch x frequency x time x channels)
        # remember: batchsize = 1 and number of channels = 1
        feat = np.zeros((1, spec.shape[0], spec.shape[1], 1))
        feat[0, :, :, 0] = spec

        # 2D target tensor (batch x number of classes), here: (1 x number of classes)
        target = self.targets[file_index, :]
        target = np.expand_dims(target, axis=0)

        return feat, target

    def on_epoch_end(self):
        """ Shuffles the file order so that every epoch uses the files in a new random order """
        # shuffle training file indices
        np.random.shuffle(self.indexes)

In [None]:
# Initialize generator with the raw training waveforms and their one-hot targets
generator = DataGenerator(all_samples_train, y_train)

### Model training & evaluation

In [None]:
# create model
# NOTE: `input_shape` is not defined in this cell; it is taken from the
# strategy 1 training cell, which therefore has to be executed first
model_s2 = creage_vgg_like_model(input_shape, 5)

# train model using generator
model_s2.fit(generator, epochs=30, verbose=2)

## Final Evaluation

In [None]:
# The ground-truth class ids are the same for both models, so decode the
# one-hot test targets only once
class_id_test = np.argmax(y_test, axis=1)

# acc[0]: strategy 1 (offline augmentation), acc[1]: strategy 2 (generator)
acc = np.zeros(2)
for i, model in enumerate((model_s1, model_s2)):
    # predicted class = index of the largest softmax output
    y_test_pred = model.predict(X_test)
    class_id_test_pred = np.argmax(y_test_pred, axis=1)
    acc[i] = accuracy_score(class_id_test, class_id_test_pred)
    

In [None]:
# Compare the test accuracy of both augmentation strategies
fig, ax = pl.subplots()
ax.plot(acc, 'o-')
ax.set_xticks((0, 1))
ax.set_xticklabels(('S1', 'S2'))
ax.set_ylabel('Accuracy')
ax.set_xlabel('Strategy')
pl.show()