Sascha Spors,
Professorship Signal Theory and Digital Signal Processing,
Institute of Communications Engineering (INT),
Faculty of Computer Science and Electrical Engineering (IEF),
University of Rostock,
Germany

# Data Driven Audio Signal Processing - A Tutorial with Computational Examples

Winter Semester 2023/24 (Master Course #24512)

- lecture: https://github.com/spatialaudio/data-driven-audio-signal-processing-lecture
- tutorial: https://github.com/spatialaudio/data-driven-audio-signal-processing-exercise

Feel free to contact lecturer frank.schultz@uni-rostock.de

# Music Genre Classification with Fully Connected Layers

In this toy example for a music genre classification application, we take a closer look at
- feature design (loudness, crest, peak, rms, spectral weight)
- feature inspection / avoiding NaNs
- feature normalization
- balancing the data set with respect to class occurrence
- one-hot encoding
- hyperparameter tuning using the train / val data sets
- training of the best model with reset weights using the train / val data sets
- final prediction on the unseen test data set compared to predictions on the train / val data sets
- confusion matrix and visualization of predictions

We put some mp3 files into `audiofolder = './audio_ex12/'`. The end of each mp3 filename encodes the label as `_i.mp3`, where
- `i=0` Metal
- `i=1` EDM
- `i=2` Classical,

or we use other nice genres. We make sure that we have about the same playing length of all genres/labels such that we can fairly train the model.
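
The file naming convention above can be parsed with a few lines of Python. This is only a minimal sketch: the helper `label_from_filename` and the example file names are hypothetical and not part of the notebook, which slices the file name directly in the feature extraction cell.

```python
import re

GENRES = {0: "Metal", 1: "EDM", 2: "Classical"}


def label_from_filename(filename):
    """Return the integer label encoded as '_i.mp3' at the end of filename."""
    match = re.search(r"_(\d)\.mp3$", filename)
    if match is None:
        raise ValueError(f"no label found in {filename!r}")
    return int(match.group(1))


# hypothetical file names, used only for illustration
for name in ["some_metal_song_0.mp3", "some_symphony_2.mp3"]:
    label = label_from_filename(name)
    print(name, "->", label, GENRES[label])
```

Raising an error for file names without a label makes mislabeled files visible early instead of silently skewing the class balance.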

Instead of extracting features from raw audio data, we might also use the `_raw_data_large.npz` or `_raw_data_small.npz` data; in that case we do not run the `feature extraction` cell.
`_raw_data_large.npz` originates from reading 66 mp3 files with 7 h 8 min playtime, with balanced occurrence of the 3 genres.
`_raw_data_small.npz` originates from reading 4 mp3 files with about 22 min playtime, with balanced occurrence of the 3 genres.
Both the amount of data and the sampling of music should therefore be considered toy data, but we can reasonably play around with the machine learning procedure, achieving about 80% (small) / 84% (large) accuracy.
To achieve very high accuracies >95%, much more data, increased computational power and more sophisticated feature design are needed.
We should try this at home and read the corresponding scientific literature on how people solved this for professional applications.

We make sure that in section `Data Handling / Inspection / Selection / Visualization` the intended `*.npz` data file is loaded by `with np.load(audiofolder+'/_raw_data_small.npz') as data:`. By default, the small data set is used to keep the computing time reasonable.

## Imports

In [None]:
import keras_tuner as kt
import librosa
import matplotlib.pyplot as plt
import numpy as np
import os
import pyloudnorm
from scipy.signal import resample
from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import OneHotEncoder, LabelBinarizer
import tensorflow as tf
import tensorflow.keras as keras
import tensorflow.keras.backend as K
import time


print(
    "TF version",
    tf.__version__,  # we used 2.10.0
    "\nKeras version",
    keras.__version__,  # we used 2.10.0
    "\nKeras Tuner version",
    kt.__version__,
)  # we used 1.1.0
verbose = 1  # plot training status

CI_flag = True  # use toy parameters to check if this notebook runs in CI

## Folder Structure for Log Data

In [None]:
audiofolder = "./audio_ex12/"
ex_str = "mgc_"
time_str = "%Y_%m_%d_%H_%M_"


def get_kt_logdir():
    run_id = time.strftime(time_str + ex_str + "kt")
    return os.path.join(root_logdir, run_id)


def get_tf_kt_logdir():
    run_id = time.strftime(time_str + ex_str + "tf_kt")
    return os.path.join(root_logdir, run_id)


def get_tf_logdir():
    run_id = time.strftime(time_str + ex_str + "tf")
    return os.path.join(root_logdir, run_id)


root_logdir = os.path.join(os.curdir, "tf_keras_logs")
kt_logdir = get_kt_logdir()
tf_kt_logdir = get_tf_kt_logdir()
tf_logdir = get_tf_logdir()
print(root_logdir)
print(kt_logdir)  # folder for keras tuner results
print(tf_kt_logdir)  # folder for TF checkpoints while keras tuning
print(tf_logdir)  # folder for TF checkpoint for best model training

## Feature Extraction

In [None]:
# Labels:
# 0 Metal
# 1 EDM
# 2 Classical

N_F = 12  # number of features, must match dim of np array 'features'
t = 0.5  # s, length of one signal block

N_OS = 4  # oversampling factor for true peak
# final dimension is (data entry, features):
Xdata = np.zeros((1, N_F))
# final dimension is (data entry, 1), we encode labels as integers >=0:
Ydata = np.zeros((1, 1), dtype=int)
files = sorted(os.listdir(audiofolder))  # get file names in this folder
for i in files:
    if i[-4:] == ".mp3":  # consider only mp3 files
        # make sure to check defaults, they might be not convenient for us:
        x, fs = librosa.load(
            audiofolder + i, mono=False, sr=None, dtype="double"
        )
        x = x.T  # we use more convenient dim: (samples, channels)
        label = i[-6:]  # get '_i.mp3' with i={0,1,2,...} as genre label
        label = int(label[1])
        print("song", i, "label", label)
        meter = pyloudnorm.Meter(fs)  # init ITU-R BS.1770 loudness meter
        N = int(t * fs)  # in samples
        df = fs / N  # fft frequency resolution
        fsplit = 1000  # Hz, split frequency between low / high frequency band
        Nsplit = int(fsplit / df) + 1
        Nb = x.shape[0] // N  # number of blocks in the file
        print(Nb)
        for ch in range(x.shape[1]):  # use all channels
            k = 0  # clr accu, move through samples
            # print(k, k+N)
            for n in range(Nb):  # go through blocks
                tmp = x[k : k + N, ch]  # get signal block
                k += N  # hop
                # print(tmp.shape[0])
                if (
                    np.mean(tmp**2) > (10 ** (-80 / 20)) ** 2
                ):  # use simple silence detection
                    tmp_os = resample(tmp, N_OS * tmp.shape[0])

                    # calc potential (here rather simple) features
                    true_peak_lin = np.max(np.abs(tmp_os))
                    true_peak_lin2 = true_peak_lin**2
                    true_peak_db = 10 * np.log10(true_peak_lin2)

                    rms_lin2 = np.mean(tmp**2)
                    rms_lin = np.sqrt(rms_lin2)
                    rms_db = 10 * np.log10(rms_lin2)

                    lufs_db = meter.integrated_loudness(tmp)
                    lufs_lin2 = 10 ** (lufs_db / 10)
                    lufs_lin = np.sqrt(lufs_lin2)

                    crest_lin = true_peak_lin / rms_lin
                    crest_db = 20 * np.log10(crest_lin)

                    ffttmp = np.fft.fft(tmp)
                    # sum squared entries in low frequency band
                    Xe_low = np.sum(np.abs(ffttmp[1:Nsplit]) ** 2)  # without DC
                    # sum squared entries in high frequency band
                    Xe_high = np.sum(
                        np.abs(ffttmp[Nsplit : N // 2]) ** 2
                    )  # from split frequency up to fs/2 (bin N//2 excluded)
                    low_high_ratio = Xe_low / Xe_high

                    if lufs_db < -70:  # avoid NaN
                        lufs_db = -70
                    # put all features into np:
                    features = np.array(
                        [
                            true_peak_lin,
                            true_peak_lin2,
                            true_peak_db,
                            rms_lin2,
                            rms_lin,
                            rms_db,
                            lufs_lin,
                            lufs_lin2,
                            lufs_db,
                            crest_lin,
                            crest_db,
                            low_high_ratio,
                        ]
                    )
                    # store features and according label
                    Xdata = np.vstack((Xdata, features))
                    Ydata = np.vstack((Ydata, label))

# delete the very first entries since these are not valid data
# but rather stem from the allocation by np.zeros(())
Xdata = Xdata[1:, :]
Ydata = Ydata[1:, :]
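
To get a feeling for the level features, it may help to evaluate them for a signal with a known result. The following sketch is a simplified stand-in for the cell above, not the notebook's exact computation: the helper `block_features` is hypothetical, it uses the sample peak instead of the oversampled true peak, it omits the loudness meter, and the sampling frequency of 8 kHz is an assumption. For a full-scale sine we expect an rms level of about -3.01 dB and hence a crest factor of about 3.01 dB.

```python
import numpy as np


def block_features(block, fs, fsplit=1000.0):
    """Return rms_db, peak_db, crest_db and low/high energy ratio of a block."""
    N = block.shape[0]
    rms_lin2 = np.mean(block**2)
    peak_lin = np.max(np.abs(block))  # sample peak, no oversampling here
    rms_db = 10 * np.log10(rms_lin2)
    peak_db = 20 * np.log10(peak_lin)
    crest_db = peak_db - rms_db
    # energy below / above the split frequency, DC bin excluded
    spectrum = np.abs(np.fft.rfft(block)) ** 2
    Nsplit = int(fsplit / (fs / N)) + 1
    Xe_low = np.sum(spectrum[1:Nsplit])
    Xe_high = np.sum(spectrum[Nsplit : N // 2])
    return rms_db, peak_db, crest_db, Xe_low / Xe_high


fs = 8000  # Hz, assumed sampling frequency for this toy signal
t = np.arange(int(0.5 * fs)) / fs  # 0.5 s block
x = np.sin(2 * np.pi * 100 * t)  # full-scale 100 Hz sine
rms_db, peak_db, crest_db, ratio = block_features(x, fs)
print(f"rms {rms_db:.2f} dB, peak {peak_db:.2f} dB, crest {crest_db:.2f} dB")
print("low/high energy ratio is large for a 100 Hz sine:", ratio > 1e6)
```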

## Data Handling / Inspection / Selection / Visualization

In [None]:
if not os.path.exists(tf_logdir):
    os.makedirs(tf_logdir)
np.savez(audiofolder + "/_raw_data.npz", Xdata=Xdata, Ydata=Ydata)

In [None]:
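# the three loads below overwrite each other, the last one wins,
# so we comment out the loads that we do not want
# we use this option when features were extracted in the
# 'Feature Extraction' cell above
# model calc time depends on how much data is preprocessed to features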
with np.load(audiofolder + "/_raw_data.npz") as data:
    Xdata = data["Xdata"]
    Ydata = data["Ydata"]

# we use this when we want to use 'large' toy example data set
# this might take time, especially for a large hyper param search
with np.load(audiofolder + "/_raw_data_large.npz") as data:
    Xdata = data["Xdata"]
    Ydata = data["Ydata"]

# we use this when we want to use the 'small' toy example data set
# this takes <10 min on a MacBook Pro M1
with np.load(audiofolder + "/_raw_data_small.npz") as data:
    Xdata = data["Xdata"]
    Ydata = data["Ydata"]

In [None]:
# permute to make data entries random
p = np.random.permutation(Ydata.shape[0])
Xdata = Xdata[p, :]
Ydata = Ydata[p]
# minimal example
# Ydata = np.array([[0,1,2,3,4,5,6,7,8,9]]).T
# Xdata = np.random.rand(10, 4)
# print(Ydata, Ydata.shape)
# print(Xdata)
# p = np.random.permutation(Ydata.shape[0])
# print(p)
# print(Ydata[p])
# print(Xdata[p,:])

In [None]:
print("Xdata.shape, Ydata.shape")
print(Xdata.shape, Ydata.shape)

In [None]:
nlabels = np.max(Ydata) + 1
# we encode as integers
labels = np.arange(nlabels)
print(labels)

In [None]:
# we should inspect our data very carefully!
# at least check for NaN, Inf
print(np.sum(np.isfinite(Xdata)) == Xdata.shape[0] * Xdata.shape[1])
print(np.sum(np.isnan(Xdata)))
print(np.sum(np.isinf(Xdata)))
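
If the check above reports non-finite entries, one simple option is to drop the affected data entries. The sketch below uses a small hypothetical array `X_toy`; in the notebook the same row mask would also have to be applied to `Ydata` so that features and labels stay aligned.

```python
import numpy as np

# hypothetical feature matrix with one NaN and one -Inf entry
X_toy = np.array(
    [
        [0.1, -3.0],
        [np.nan, -6.0],
        [0.3, -np.inf],
        [0.4, -9.0],
    ]
)
finite_rows = np.all(np.isfinite(X_toy), axis=1)  # True where a row is clean
X_clean = X_toy[finite_rows, :]
print("rows kept:", np.sum(finite_rows), "of", X_toy.shape[0])
print(X_clean)
```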
# we should visualize as much as possible...

In [None]:
# check if label occurrence is balanced
# if not then balance it for fair learning
m = Ydata.shape[0]
for n in labels:
    print("label", n, "occurrence", np.sum(Ydata == n) / m * 100, "%")
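
If the occurrences turn out to be unbalanced, random undersampling to the size of the rarest class is one simple way to balance the data. This is a sketch under assumptions, not the notebook's method: the helper `undersample_balanced` and the toy data are hypothetical, and labels are expected as a 1D integer array (for the notebook data that would be `Ydata[:, 0]`).

```python
import numpy as np


def undersample_balanced(X, y, seed=0):
    """Randomly drop entries until every label occurs equally often."""
    rng = np.random.default_rng(seed)
    classes, counts = np.unique(y, return_counts=True)
    n_keep = counts.min()  # size of the rarest class
    keep = np.concatenate(
        [
            rng.choice(np.flatnonzero(y == c), size=n_keep, replace=False)
            for c in classes
        ]
    )
    keep = rng.permutation(keep)  # avoid a label-sorted order
    return X[keep], y[keep]


# hypothetical unbalanced toy data: 50 / 30 / 20 entries per label
y_toy = np.array([0] * 50 + [1] * 30 + [2] * 20)
X_toy = np.random.default_rng(1).normal(size=(y_toy.shape[0], 3))
X_bal, y_bal = undersample_balanced(X_toy, y_toy)
print(np.unique(y_bal, return_counts=True))
```

Undersampling discards data, so for small data sets it may be preferable to collect more material for the rare genres instead.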

In [None]:
# normalize all features columns to mean=0, std=1
Xdata_norm = Xdata - np.mean(Xdata, axis=0)
Xdata_norm = Xdata_norm / np.std(Xdata_norm, ddof=1, axis=0)
print("\nmean\n", np.mean(Xdata_norm, axis=0))
print("\nstd\n", np.std(Xdata_norm, axis=0, ddof=1))
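
The cell above normalizes with the statistics of the whole data set. A common variant, shown here only as a hedged sketch with hypothetical toy data, estimates mean and standard deviation on the training part alone and reuses them for the other parts, so that no information from val / test data enters the preprocessing.

```python
import numpy as np

rng = np.random.default_rng(0)
X_toy = rng.normal(loc=5.0, scale=3.0, size=(1000, 2))  # hypothetical features
X_tr, X_te = X_toy[:800], X_toy[800:]  # hypothetical train / test split

mu = np.mean(X_tr, axis=0)
sigma = np.std(X_tr, axis=0, ddof=1)
X_tr_norm = (X_tr - mu) / sigma
X_te_norm = (X_te - mu) / sigma  # reuse the train statistics

print("train mean", np.mean(X_tr_norm, axis=0))
print("train std ", np.std(X_tr_norm, axis=0, ddof=1))
print("test mean ", np.mean(X_te_norm, axis=0))  # close to, but not exactly, 0
```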

In [None]:
#  0 true_peak_lin
#  1 true_peak_lin2
#  2 true_peak_db
#  3 rms_lin2
#  4 rms_lin
#  5 rms_db
#  6 lufs_lin
#  7 lufs_lin2
#  8 lufs_db
#  9 crest_lin
# 10 crest_db
# 11 low_high_ratio
# the seven features [2, 3, 5, 7, 8, 10, 11] might be useful:
which_features = [2, 3, 5, 7, 8, 10, 11]
X = np.copy(Xdata_norm[:, which_features])
Y = np.copy(Ydata)
m = X.shape[0]  # number data examples
nx = X.shape[1]  # number of features
print("X.shape, Y.shape")
print(X.shape, Y.shape)

In [None]:
plt.figure(figsize=(12, 4))
for f in range(nx):
    plt.plot(X[:, f], label="feature " + str(f))
plt.xlabel("data entry index")
plt.ylabel("feature extent")
plt.legend(loc="upper left")
plt.grid(True)

In [None]:
plt.figure(figsize=(12, 9))
for sp in range(nlabels):
    plt.subplot(nlabels, 1, sp + 1)
    for f in range(nx):
        plt.plot(X[Y[:, 0] == sp, f])
        plt.ylabel("feature extent for label " + str(sp))
plt.xlabel("data entry index")

## Data Preparation / Splitting 

In [None]:
# note: scikit-learn >= 1.2 renames the argument `sparse` to `sparse_output`
encoder = OneHotEncoder(sparse=False)
# we encode as one-hot for TF model
Y = encoder.fit_transform(Y.reshape(-1, 1))
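
What one-hot encoding does can be checked with plain NumPy. This is a minimal sketch with hypothetical toy labels, not a replacement for the `OneHotEncoder` used above: each integer label selects one row of an identity matrix, and `argmax` maps back to integers.

```python
import numpy as np

y_int = np.array([2, 0, 1, 1])  # integer labels, like Ydata[:, 0]
n_classes = 3
Y_onehot = np.eye(n_classes)[y_int]  # row i is the one-hot vector of y_int[i]
y_back = np.argmax(Y_onehot, axis=1)  # inverse mapping to integer labels
print(Y_onehot)
print(y_back)
```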

In [None]:
train_size = 5 / 10
validate_size = 5 / 10 * 1 / 2
test_size = 1 - train_size - validate_size  # remaining data, must be > 0

# split into train, val, test data:
X_train, X_tmp, Y_train, Y_tmp = train_test_split(
    X, Y, train_size=train_size, random_state=None
)
val_size = (validate_size * m) / ((1 - train_size) * m)
X_val, X_test, Y_val, Y_test = train_test_split(
    X_tmp, Y_tmp, train_size=val_size, random_state=None
)

print(train_size, validate_size, test_size)
print(train_size * m, validate_size * m, test_size * m)
print(X_train.shape, X_val.shape, X_test.shape)
print(Y_train.shape, Y_val.shape, Y_test.shape)

m_train = X_train.shape[0]
m_val = X_val.shape[0]
m_test = X_test.shape[0]
print(m_train, m_val, m_test, m_train + m_val + m_test == m)

# we should get balanced occurrence
print("occurrence of labels in train")
for n in range(nlabels):
    print(n, np.sum(Y_train[:, n]))
print("occurrence of labels in val")
for n in range(nlabels):
    print(n, np.sum(Y_val[:, n]))
print("occurrence of labels in test")
for n in range(nlabels):
    print(n, np.sum(Y_test[:, n]))
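
The arithmetic behind the two-stage split can be reproduced without scikit-learn. In this sketch the helper `split_indices` is hypothetical, and it performs the split in one step on a random permutation instead of calling `train_test_split` twice. With `train_size = 0.5` and `validate_size = 0.25`, the second split in the cell above has to assign `0.25 / (1 - 0.5) = 0.5` of the remaining data to validation.

```python
import numpy as np


def split_indices(m, train_size, validate_size, seed=0):
    """Return disjoint index arrays for train, val and test data."""
    rng = np.random.default_rng(seed)
    p = rng.permutation(m)
    m_train = int(round(train_size * m))
    m_val = int(round(validate_size * m))
    return p[:m_train], p[m_train : m_train + m_val], p[m_train + m_val :]


train_size = 5 / 10
validate_size = 5 / 10 * 1 / 2
# fraction of the remainder that goes to validation in the second split
val_fraction_of_rest = validate_size / (1 - train_size)
idx_train, idx_val, idx_test = split_indices(100, train_size, validate_size)
print(val_fraction_of_rest, len(idx_train), len(idx_val), len(idx_test))
```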

## Model Preparation / Hyper Parameter Range

In [None]:
earlystopping_cb = keras.callbacks.EarlyStopping(
    monitor="val_loss", patience=2, restore_best_weights=True  # on val data!
)
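
The effect of `patience=2` can be illustrated with a simplified re-implementation of the stopping rule. The function `early_stopping_epochs` and the loss values are hypothetical; the sketch ignores further options of the Keras callback and only tracks when training would stop and which epoch's weights would be restored.

```python
def early_stopping_epochs(val_losses, patience=2):
    """Return (stop_epoch, best_epoch) for a list of per-epoch val losses."""
    best_loss = float("inf")
    best_epoch = 0
    wait = 0  # number of epochs without improvement
    for epoch, loss in enumerate(val_losses):
        if loss < best_loss:
            best_loss, best_epoch, wait = loss, epoch, 0
        else:
            wait += 1
            if wait >= patience:
                return epoch, best_epoch
    return len(val_losses) - 1, best_epoch


val_losses = [1.0, 0.8, 0.7, 0.75, 0.72, 0.6]  # hypothetical val_loss curve
stop_epoch, best_epoch = early_stopping_epochs(val_losses, patience=2)
print("stopped after epoch", stop_epoch, "best epoch", best_epoch)
```

In this toy curve the loss would have improved again in the last epoch, which shows that a small patience can stop training too early.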

In [None]:
# as homework we might also consider dropout / regularization in the model
def build_model(hp):  # with hyper parameter ranges
    model = keras.Sequential()
    # input layer
    model.add(keras.Input(shape=(nx,)))
    # hidden layers
    for layer in range(hp.Int("no_layers", 1, 5)):
        model.add(
            keras.layers.Dense(
                units=hp.Int(
                    f"no_perceptrons_{layer}", min_value=2, max_value=16, step=2
                ),
                activation=hp.Choice("activation", ["tanh"]),
            )
        )
    # softmax output layer
    model.add(keras.layers.Dense(nlabels, activation="softmax"))
    # learning_rate = hp.Float('learning_rate', min_value=1e-5, max_value=1e-1,
    #                         sampling='log')
    model.compile(
        optimizer=keras.optimizers.Adam(),  # learning_rate=learning_rate
        loss=keras.losses.CategoricalCrossentropy(
            from_logits=False, label_smoothing=0
        ),
        metrics=["CategoricalCrossentropy", "CategoricalAccuracy"],
    )
    return model
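
What such a model computes in its forward pass can be sketched in plain NumPy. The weights below are random and the hidden layer width of 4 is an assumption, so the numbers carry no meaning; the sketch only shows the structure of `tanh` hidden layers followed by a softmax output whose rows sum to 1.

```python
import numpy as np


def softmax(z):
    z = z - np.max(z, axis=1, keepdims=True)  # for numerical stability
    e = np.exp(z)
    return e / np.sum(e, axis=1, keepdims=True)


def forward(X, weights, biases):
    """tanh hidden layers followed by a softmax output layer."""
    a = X
    for W, b in zip(weights[:-1], biases[:-1]):
        a = np.tanh(a @ W + b)
    return softmax(a @ weights[-1] + biases[-1])


rng = np.random.default_rng(0)
nx, n_hidden, nlabels = 7, 4, 3  # 7 features, 3 genres, assumed hidden width
weights = [
    rng.normal(size=(nx, n_hidden)),
    rng.normal(size=(n_hidden, nlabels)),
]
biases = [np.zeros(n_hidden), np.zeros(nlabels)]
Y_prob = forward(rng.normal(size=(5, nx)), weights, biases)
print(Y_prob.sum(axis=1))  # each row sums to 1
```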

## Hyper Parameter Tuner

In [None]:
if CI_flag:
    max_trials = 5  # very low for CI version
else:
    max_trials = 50
executions_per_trial = 2
model = build_model(kt.HyperParameters())
hptuner = kt.RandomSearch(
    hypermodel=build_model,
    objective="val_categorical_accuracy",  # check performance on val data!
    max_trials=max_trials,
    executions_per_trial=executions_per_trial,
    overwrite=True,
    directory=kt_logdir,
    project_name=None,
)
hptuner.search_space_summary()  # prints by itself, returns None
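
To judge how much of the search space a given `max_trials` covers, we can count the distinct architectures defined by `build_model`. The sketch below is plain Python and does not use Keras Tuner; `sample_config` is a hypothetical helper that only mimics drawing one random configuration.

```python
import random

unit_choices = list(range(2, 16 + 1, 2))  # 2, 4, ..., 16 perceptrons
layer_choices = list(range(1, 5 + 1))  # 1 ... 5 hidden layers

# each of the L hidden layers picks its width independently
n_configs = sum(len(unit_choices) ** L for L in layer_choices)
print("distinct architectures:", n_configs)


def sample_config(rng):
    """Draw the number of hidden layers, then one width per layer."""
    no_layers = rng.choice(layer_choices)
    return [rng.choice(unit_choices) for _ in range(no_layers)]


rng = random.Random(0)
for trial in range(3):
    print("trial", trial, "hidden layer widths", sample_config(rng))
```

With 8 + 64 + 512 + 4096 + 32768 = 37448 possible architectures, even 50 trials sample only a tiny fraction of the space.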

## Training of Models

In [None]:
if CI_flag:
    epochs = 20  # very low for CI version
else:
    epochs = 250
tensorboard_cb = keras.callbacks.TensorBoard(tf_kt_logdir)
hptuner.search(
    X_train,
    Y_train,
    validation_data=(X_val, Y_val),
    epochs=epochs,
    callbacks=[earlystopping_cb, tensorboard_cb],
    verbose=verbose,
)
hptuner.results_summary()  # prints by itself, returns None

## Best Model Selection / Preparation

In [None]:
# we might check (train) the best XX models in detail
# for didactical purpose we choose only the very best one, located in [0]:
model = hptuner.get_best_models(num_models=1)[0]
model.save(tf_logdir + "/best_model")

In [None]:
# taken from https://github.com/keras-team/keras/issues/341
# 183amir commented on 7 Oct 2019:
# "If you are using tensorflow 2, you can use this:"
def reset_weights(model):
    for layer in model.layers:
        if isinstance(layer, tf.keras.Model):
            reset_weights(layer)
            continue
        for k, initializer in layer.__dict__.items():
            if "initializer" not in k:
                continue
            # find the corresponding variable
            var = getattr(layer, k.replace("_initializer", ""))
            var.assign(initializer(var.shape, var.dtype))


# 183amir: "I am not sure if it works in all cases, I have only tested the Dense and Conv2D layers."

In [None]:
# load best model and reset weights
model = keras.models.load_model(tf_logdir + "/best_model")
reset_weights(model)  # start training from scratch
model.summary()  # prints by itself, returns None

## Training of Best Model

In [None]:
batch_size = 16
if CI_flag:
    epochs = 20  # very low for CI version
else:
    epochs = 250
tensorboard_cb = keras.callbacks.TensorBoard(tf_logdir)
history = model.fit(
    X_train,
    Y_train,
    epochs=epochs,
    batch_size=batch_size,
    validation_data=(X_val, Y_val),
    callbacks=[earlystopping_cb, tensorboard_cb],
    verbose=1,
)
model.save(tf_logdir + "/trained_best_model")
model.summary()  # prints by itself, returns None

## Evaluation of Best Model on Unseen Test Data

In [None]:
def print_results(X, Y):
    # https://stackoverflow.com/questions/48908641/how-to-get-a-single-value-from-softmax-instead-of-probability-get-confusion-ma:
    lb = LabelBinarizer()
    lb.fit(labels)

    m = X.shape[0]
    results = model.evaluate(X, Y, batch_size=m, verbose=verbose)
    Y_pred = model.predict(X)
    cm = tf.math.confusion_matrix(
        labels=lb.inverse_transform(Y),
        predictions=lb.inverse_transform(Y_pred),
        num_classes=nlabels,
    )
    print("data entries", m)
    print(
        "Cost",
        results[0],
        "\nCategoricalCrossentropy",
        results[1],
        "\nCategoricalAccuracy",
        results[2],
    )
    print(
        "CategoricalAccuracy from Confusion Matrix = ",
        np.sum(np.diag(cm.numpy())) / m,
    )
    print("Confusion Matrix in %\n", cm / m * 100)


print("\n\nmetrics on train data:")
print_results(X_train, Y_train)

print("\n\nmetrics on val data:")
print_results(X_val, Y_val)

print("\n\nmetrics on never seen test data:")
print_results(X_test, Y_test)
# we never used X_test, Y_test in any of the training steps above!
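
How the confusion matrix and the accuracy are related can be checked on a tiny example. This sketch uses NumPy instead of `tf.math.confusion_matrix`, with a hypothetical helper `confusion_matrix` and made-up softmax outputs; rows correspond to true labels and columns to predicted labels.

```python
import numpy as np


def confusion_matrix(Y_true_onehot, Y_prob):
    """Rows: true label, columns: predicted label (argmax of softmax)."""
    n = Y_true_onehot.shape[1]
    y_true = np.argmax(Y_true_onehot, axis=1)
    y_pred = np.argmax(Y_prob, axis=1)
    cm = np.zeros((n, n), dtype=int)
    np.add.at(cm, (y_true, y_pred), 1)  # count each (true, predicted) pair
    return cm


Y_true = np.eye(3)[[0, 0, 1, 1, 2, 2]]  # toy one-hot labels
Y_prob = np.array(
    [
        [0.8, 0.1, 0.1],
        [0.4, 0.5, 0.1],  # true label 0, predicted label 1
        [0.1, 0.7, 0.2],
        [0.2, 0.6, 0.2],
        [0.1, 0.2, 0.7],
        [0.3, 0.3, 0.4],
    ]
)
cm = confusion_matrix(Y_true, Y_prob)
accuracy = np.trace(cm) / np.sum(cm)  # correct predictions are on the diagonal
print(cm)
print("accuracy", accuracy)
```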

In [None]:
Y_pred = model.predict(X_test)

k0 = 0
kN = k0 + 100

plt.figure(figsize=(12, 4))
plt.plot(Y_test[k0:kN, 0], "C0o", label="True 0...Metal")
plt.plot(Y_test[k0:kN, 1], "C1o", label="True 1...EDM")
plt.plot(Y_test[k0:kN, 2], "C2o", label="True 2...Classical")

plt.plot(Y_pred[k0:kN, 0], "C0", lw=1)
plt.plot(Y_pred[k0:kN, 1], "C1", lw=1)
plt.plot(Y_pred[k0:kN, 2], "C2", lw=1)

tmp = np.argmax(Y_pred, axis=1)
tmp = tmp[k0:kN]
plt.plot((tmp == 0) + 0.03, "C0v", label="Predicted 0...Metal")
plt.plot((tmp == 1) + 0.03, "C1v", label="Predicted 1...EDM")
plt.plot((tmp == 2) + 0.03, "C2v", label="Predicted 2...Classical")

plt.ylim(0.06, 1.075)
plt.xlabel("index for chosen data entries")
plt.ylabel("predicted via softmax")
plt.legend(loc="center")
plt.grid(True)

## Copyright

- the notebooks are provided as [Open Educational Resources](https://en.wikipedia.org/wiki/Open_educational_resources)
- the text is licensed under [Creative Commons Attribution 4.0](https://creativecommons.org/licenses/by/4.0/)
- the code of the IPython examples is licensed under the [MIT license](https://opensource.org/licenses/MIT)
- feel free to use the notebooks for your own purposes
- please attribute the work as follows: *Frank Schultz, Data Driven Audio Signal Processing - A Tutorial Featuring Computational Examples, University of Rostock* ideally with relevant file(s), github URL https://github.com/spatialaudio/data-driven-audio-signal-processing-exercise, commit number and/or version tag, year.