In [None]:
# code modified from:
# https://github.com/sweat0198/audio_classification_CNN_ESC-50/blob/master/esc-50-keras.ipynb

# Audio classification with deep learning


## Preparation for deep learning

In [None]:
# Mount Google Drive (Colab only) so that paths under /content/drive become readable and writable.
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
##
import numpy as np
from tqdm import tqdm
# Per-sample spectrogram dimensions: arrays in this notebook are laid out as (samples, freq, time).
# Presumably `freq` is the number of mel bins and `time` the number of frames — confirm against the preprocessing step.
freq = 128
time = 1723

In [None]:
# Dataset locations on the mounted Drive.
# Uncomment entries of `train_files` one at a time to do partial learning; every active entry
# is loaded into memory by the loading cell, so each additional file increases RAM usage.

files_dir = './drive/MyDrive/HDA_project/'
train_files = [
               files_dir + "esc_melsp_train_raw.npz",
              #  files_dir + "esc_melsp_train_ss.npz",
              #  files_dir + "esc_melsp_train_st.npz",
              #  files_dir + "esc_melsp_train_wn.npz",
              #  files_dir + "esc_melsp_train_com.npz"
               ]
test_file = files_dir + "esc_melsp_test.npz"

In [None]:
# Number of samples taken from EACH training .npz file (used to size and slice x_train / y_train).
train_num = 1500 ##
# Number of samples in the test .npz file (used when reshaping x_test).
test_num = 500 ##

In [None]:
##

import psutil

def print_memory():
    """Print the current virtual-memory utilisation (in percent) reported by psutil."""
    usage_percent = psutil.virtual_memory().percent
    print("Memory Usage Percentage:", usage_percent)

In [None]:
print_memory()

# Pre-allocate the full training arrays once so the per-file loop only copies into slices.
num_train_samples = train_num * len(train_files)
x_train = np.zeros((num_train_samples, freq, time), dtype='float32')
y_train = np.zeros(num_train_samples, dtype='float32')

# load dataset
n = len(train_files) ##
for i in tqdm(range(n)):
    print_memory()
    # np.load on an .npz returns a lazily-read archive that keeps the file open;
    # the context manager closes it as soon as both arrays have been copied out.
    with np.load(train_files[i]) as data:
        # Each file must hold exactly `train_num` samples, otherwise the slice assignment fails.
        x_train[i*train_num:(i+1)*train_num] = data["x"]
        y_train[i*train_num:(i+1)*train_num] = data["y"]

Memory Usage Percentage: 11.0


  0%|          | 0/1 [00:00<?, ?it/s]

Memory Usage Percentage: 11.0


100%|██████████| 1/1 [00:27<00:00, 27.25s/it]


In [None]:
# load test dataset
# The .npz archive is read lazily and holds an open file handle; close it once the
# two arrays have been materialised.
with np.load(test_file) as test_data:
    x_test = test_data["x"]
    y_test = test_data["y"]

In [None]:
##
import os
import keras
from sklearn import model_selection
from keras.models import Model
from keras.layers import Input, Dense, Dropout, Activation
from keras.layers import Conv2D, GlobalAveragePooling2D
from keras.layers import BatchNormalization, Add
from keras.callbacks import EarlyStopping, ModelCheckpoint
from keras.models import load_model


In [None]:
# Redefine the target data as one-hot vectors of length `classes`.
# NOTE: y_train / y_test are overwritten with their encoded versions, so re-running this cell
# would pass already-encoded labels back into to_categorical. Re-run the loading cells first.
classes = 50
y_train = keras.utils.to_categorical(y_train, classes)
y_test = keras.utils.to_categorical(y_test, classes)

In [None]:
# Append a trailing axis of size 1 to the training and test spectrograms so every
# sample has the 3-D (freq, time, 1) shape the network input is built from.
train_shape = (train_num*len(train_files), freq, time, 1)
test_shape = (test_num, freq, time, 1)
x_train = x_train.reshape(train_shape)
x_test = x_test.reshape(test_shape)

In [None]:
# Carve the validation set out of the held-out data: 3/5 of it becomes (x_val, y_val),
# the remaining 2/5 stays as (x_test, y_test). The fixed random_state keeps the split repeatable.
(x_test, x_val,
 y_test, y_val) = model_selection.train_test_split(
    x_test,
    y_test,
    test_size=3/5,
    random_state=1,
)

In [None]:

# Report the shape of every split, one "<name>:<shape>" entry per line.
split_shapes = (
    ("x train", x_train.shape),
    ("y train", y_train.shape),
    ("x val", x_val.shape),
    ("y val", y_val.shape),
    ("x test", x_test.shape),
    ("y test", y_test.shape),
)
print("\n".join("{0}:{1}".format(label, shape) for label, shape in split_shapes))

x train:(1500, 128, 1723, 1)
y train:(1500, 50)
x val:(300, 128, 1723, 1)
y val:(300, 50)
x test:(200, 128, 1723, 1)
y test:(200, 50)


## Define convolutional neural network

In [None]:
def cba(inputs, filters, kernel_size, strides):
    """Apply a Conv2D -> BatchNormalization -> ReLU stack to `inputs`.

    Args:
        inputs: tensor fed to the convolution.
        filters: number of convolution filters.
        kernel_size: convolution kernel size, forwarded to Conv2D.
        strides: convolution strides, forwarded to Conv2D.

    Returns:
        The activated output tensor. The convolution uses 'same' padding.
    """
    conv_out = Conv2D(filters, kernel_size=kernel_size, strides=strides, padding='same')(inputs)
    normalized = BatchNormalization()(conv_out)
    return Activation("relu")(normalized)

In [None]:
# define CNN
def _conv_branch(branch_input, kernel_len):
    """Build one parallel branch of four conv blocks using 1-D kernels of length `kernel_len`.

    The blocks alternate between a (1, kernel_len) kernel with stride (1, 2) and a
    (kernel_len, 1) kernel with stride (2, 1); the first pair uses 32 filters, the
    second pair 64. Every branch therefore ends with the same output shape, which is
    what allows the branches to be summed afterwards.
    """
    x = cba(branch_input, filters=32, kernel_size=(1, kernel_len), strides=(1, 2))
    x = cba(x, filters=32, kernel_size=(kernel_len, 1), strides=(2, 1))
    x = cba(x, filters=64, kernel_size=(1, kernel_len), strides=(1, 2))
    x = cba(x, filters=64, kernel_size=(kernel_len, 1), strides=(2, 1))
    return x


inputs = Input(shape=(x_train.shape[1:]))

# Four branches that differ only in kernel length; they are built in the same order
# as before so the auto-generated layer names stay unchanged.
x_1 = _conv_branch(inputs, 8)
x_2 = _conv_branch(inputs, 16)
x_3 = _conv_branch(inputs, 32)
x_4 = _conv_branch(inputs, 64)

# Merge the branches by element-wise sum.
x = Add()([x_1, x_2, x_3, x_4])

x = cba(x, filters=128, kernel_size=(1,16), strides=(1,2))
x = cba(x, filters=128, kernel_size=(16,1), strides=(2,1))

# Classification head: global average pooling, then a `classes`-way softmax.
x = GlobalAveragePooling2D()(x)
x = Dense(classes)(x)
x = Activation("softmax")(x)

model = Model(inputs, x)
model.summary()

Model: "model"
__________________________________________________________________________________________________
 Layer (type)                Output Shape                 Param #   Connected to                  
 input_1 (InputLayer)        [(None, 128, 1723, 1)]       0         []                            
                                                                                                  
 conv2d (Conv2D)             (None, 128, 862, 32)         288       ['input_1[0][0]']             
                                                                                                  
 conv2d_4 (Conv2D)           (None, 128, 862, 32)         544       ['input_1[0][0]']             
                                                                                                  
 conv2d_8 (Conv2D)           (None, 128, 862, 32)         1056      ['input_1[0][0]']             
                                                                                              

## Optimization and callbacks

In [None]:
# initiate Adam optimizer
# `learning_rate` is the supported argument name; the old `lr` alias is deprecated
# and is rejected by newer Keras releases.
opt = keras.optimizers.Adam(learning_rate=0.00005, amsgrad=True)

# Let's train the model using Adam with amsgrad
model.compile(loss='categorical_crossentropy',
              optimizer=opt,
              metrics=['acc'])



In [None]:
# directory for model checkpoints
model_dir = "./drive/MyDrive/HDA_project/models4"
# makedirs also creates missing parent folders, and exist_ok avoids the
# check-then-create race of the exists()/mkdir() pair.
os.makedirs(model_dir, exist_ok=True)

## Train CNN model with between class dataset

In [None]:
def frequency_masking(spec, F=30, num_masks=1):
    '''Return a copy of `spec` with random bands along axis 1 set to zero.

    Axis 1 is treated as the mel-channel axis, i.e. `spec` is indexed as
    spec[frame, mel_channel, ...]. The input array is never modified.

    Args:
        spec: array with at least two dimensions.
        F: max width of frequency mask. Widths are drawn uniformly from [0, F),
            capped so that a mask never covers the whole axis.
        num_masks: number of masks to apply (0 returns an unmodified copy).

    Returns:
        The masked copy of `spec`.
    '''
    cloned_spec = spec.copy()
    num_mel_channels = cloned_spec.shape[1]
    if num_mel_channels == 0:
        # Nothing to mask; np.random.randint(0, 0) would raise.
        return cloned_spec

    # Cap the width range at the axis length: without this, F >= num_mel_channels
    # could draw f >= num_mel_channels and make randint fail with low >= high.
    max_width = min(F, num_mel_channels)

    for _ in range(num_masks):
        f = int(np.random.uniform(low=0.0, high=max_width))
        # Guard against uniform() returning its upper bound through float rounding.
        f = min(f, num_mel_channels - 1)
        f0 = np.random.randint(0, num_mel_channels - f)
        # Masking
        cloned_spec[:, f0:f0 + f] = 0.0
    return cloned_spec

def time_masking(spec, T=40, num_masks=1):
    '''Return a copy of `spec` with random bands along axis 0 set to zero.

    Axis 0 is treated as the frame (time) axis, i.e. `spec` is indexed as
    spec[frame, ...]. The input array is never modified.

    Args:
        spec: array with at least one dimension.
        T: max width of time mask. Widths are drawn uniformly from [0, T),
            capped so that a mask never covers the whole axis.
        num_masks: number of masks to apply (0 returns an unmodified copy).

    Returns:
        The masked copy of `spec`.
    '''
    cloned_spec = spec.copy()
    num_frames = cloned_spec.shape[0]
    if num_frames == 0:
        # Nothing to mask; np.random.randint(0, 0) would raise.
        return cloned_spec

    # Cap the width range at the axis length: without this, T >= num_frames
    # could draw t >= num_frames and make randint fail with low >= high.
    max_width = min(T, num_frames)

    for _ in range(num_masks):
        t = int(np.random.uniform(low=0.0, high=max_width))
        # Guard against uniform() returning its upper bound through float rounding.
        t = min(t, num_frames - 1)
        t0 = np.random.randint(0, num_frames - t)
        # Masking
        cloned_spec[t0:t0 + t, :] = 0.0

    return cloned_spec

In [None]:
import numpy as np
import tensorflow as tf
from tensorflow.keras.preprocessing.image import ImageDataGenerator

# Define your data generator class
class DataGenerator(tf.keras.utils.Sequence):
    """Keras Sequence serving shuffled mini-batches with optional masking augmentation.

    Args:
        x_data: input array indexed by sample along axis 0. For augmentation each
            sample is expected to be laid out as (freq, time, channels), which is
            how x_train is shaped in this notebook.
        y_data: targets aligned with `x_data` along axis 0.
        batch_size: number of samples per batch.
        augment: when True, every batch goes through `apply_augmentation`.
    """

    def __init__(self, x_data, y_data, batch_size, augment=True):
        # Let the Keras base class initialise its own state (newer Keras versions
        # warn when a Sequence subclass skips this call).
        super().__init__()
        self.x_data = x_data
        self.y_data = y_data
        self.batch_size = batch_size
        self.augment = augment
        self.indexes = np.arange(len(self.x_data))
        # Shuffle once up front so the first epoch is randomised too.
        self.on_epoch_end()

    def __len__(self):
        """Number of full batches per epoch; a trailing partial batch is dropped."""
        return len(self.x_data) // self.batch_size

    def __getitem__(self, index):
        """Return the `index`-th (x_batch, y_batch) pair of the current shuffled order."""
        indexes = self.indexes[index * self.batch_size:(index + 1) * self.batch_size]

        x_batch = self.x_data[indexes]
        y_batch = self.y_data[indexes]

        if self.augment:
            x_batch = self.apply_augmentation(x_batch)

        return x_batch, y_batch

    def on_epoch_end(self):
        """Reshuffle the sample order in place."""
        np.random.shuffle(self.indexes)

    def apply_augmentation(self, x_batch):
        """Return a new batch where each sample gets 0 or 1 frequency mask and 0 or 1 time mask."""
        augmented_x_batch = np.zeros_like(x_batch)
        for i in range(len(x_batch)):
            # Samples are stored as (freq, time, channels), but the masking helpers
            # index their input as [frame, mel_channel, ...]. Swap the first two axes
            # so the frequency mask really hits the frequency axis (and the time mask
            # the time axis), then swap back before storing the result.
            sample = np.swapaxes(x_batch[i], 0, 1)
            sample = frequency_masking(sample, F=30, num_masks=np.random.randint(0, 2))
            sample = time_masking(sample, T=40, num_masks=np.random.randint(0, 2))
            augmented_x_batch[i] = np.swapaxes(sample, 0, 1)
        return augmented_x_batch

# Training configuration and the generator that feeds (optionally augmented) batches.
batch_size = 16
epochs = 20
train_generator = DataGenerator(x_train, y_train, batch_size)

# Continue training from a previously saved model; the best epoch (lowest val_loss,
# the ModelCheckpoint default monitor) is written to `chkpt`.
model = tf.keras.models.load_model(model_dir + '/model_5.hdf5')
chkpt = os.path.join(model_dir, 'model_5_1_example.hdf5')
cp_cb = ModelCheckpoint(filepath=chkpt, verbose=1, save_best_only=True)

# Train the model using the data generator
history = model.fit(
    train_generator,
    validation_data=(x_val, y_val),
    epochs=epochs,
    callbacks=[cp_cb],
    shuffle=True,
)

# Optionally, also dump the per-epoch curves to a text file: each entry is the
# heading written first, followed by the history key whose values come after it.
history_sections = (
    ("Train Loss:\n", 'loss'),
    ("\n\nVal Loss:\n", 'val_loss'),
    ("\n\nAccuracy:\n", 'acc'),
    ("\n\nVal accuracy:\n", 'val_acc'),
)
with open(model_dir + '/loss_values_5_1.txt', 'w') as f:
    for heading, metric_key in history_sections:
        f.write(heading)
        f.write(str(history.history[metric_key]))

Epoch 1/20
Epoch 1: val_loss improved from inf to 1.28362, saving model to ./drive/MyDrive/HDA_project/models4/model_5_1_example.hdf5
Epoch 2/20
Epoch 2: val_loss improved from 1.28362 to 0.44929, saving model to ./drive/MyDrive/HDA_project/models4/model_5_1_example.hdf5
Epoch 3/20
Epoch 3: val_loss did not improve from 0.44929
Epoch 4/20
Epoch 4: val_loss improved from 0.44929 to 0.33868, saving model to ./drive/MyDrive/HDA_project/models4/model_5_1_example.hdf5
Epoch 5/20
Epoch 5: val_loss did not improve from 0.33868
Epoch 6/20
Epoch 6: val_loss improved from 0.33868 to 0.32701, saving model to ./drive/MyDrive/HDA_project/models4/model_5_1_example.hdf5
Epoch 7/20
Epoch 7: val_loss improved from 0.32701 to 0.31431, saving model to ./drive/MyDrive/HDA_project/models4/model_5_1_example.hdf5
Epoch 8/20
Epoch 8: val_loss did not improve from 0.31431
Epoch 9/20
Epoch 9: val_loss improved from 0.31431 to 0.25822, saving model to ./drive/MyDrive/HDA_project/models4/model_5_1_example.hdf5
Ep

## Evaluate model

If you use a prepared model and test file, uncomment the section below.

In [None]:
# Reload the best checkpoint written during training and a fresh copy of the test set.
model = load_model(model_dir + '/model_5_1_example.hdf5')
# The .npz archive keeps its file open until closed; release it once both arrays are read.
with np.load('drive/MyDrive/HDA_project/esc_melsp_test.npz') as test_data:
    x_test = test_data["x"]
    y_test = test_data["y"]

In [None]:
# x_test = x_test[:,:,:,None]
# One-hot encode the reloaded test labels into vectors of length `classes`.
y_test = keras.utils.to_categorical(y_test, classes)

In [None]:
# Evaluate the reloaded model; the printed list holds the loss followed by the compiled metric(s).
# NOTE(review): x_test here is the whole reloaded test file, which includes the samples that were
# split off as x_val and used to select the saved checkpoint — confirm this overlap is intended.
evaluation = model.evaluate(x_test, y_test)
print(evaluation)

[0.2706111669540405, 0.9160000085830688]
