In [1]:
# file related
import os
from os.path import join
import datetime

# machine learning
from keras.models import Sequential
from keras.layers import Dense, Input
import tensorflow as tf
from sklearn.model_selection import train_test_split

# utils
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import random
from IPython.display import Audio as play_audio
import scipy as sp
import taunet_utils

# Record the TensorFlow version in the cell output so a saved run shows which release produced it.
print(f"TF version {tf.__version__}")

TF version 2.12.0


In [2]:
def create_dataset(audio_dir, human_input_csv, lib_path="../src/af/AFInC.dll"):
    """Build per-measurement feature and target tuples for training.

    Every audio file returned by ``taunet_utils.read_audio_files`` is pushed
    sample by sample through the audio-feature library, and the resulting
    features are combined with the matching row of the human-input CSV.

    Args:
        audio_dir: Directory passed to ``taunet_utils.read_audio_files``.
        human_input_csv: CSV path containing at least the columns
            ``MEASUREMENT_ID``, ``SONG_ID``, ``ATTACK_T1`` and ``SUSTAIN_T1``.
        lib_path: Path handed to ``taunet_utils.af_dsp_init``. Defaults to
            the location that was previously hard-coded.

    Returns:
        ``(input_data, output_data)``: two dicts keyed by the first
        whitespace-separated token of each audio label.
        ``input_data[label]`` is a tuple of the ten library features followed
        by the remaining CSV columns of the matching row;
        ``output_data[label]`` is ``(ATTACK_T1, SUSTAIN_T1)``.

    Raises:
        IndexError: If no CSV row has ``MEASUREMENT_ID == int(label)``.
        ValueError: If the label token is not an integer.
    """
    lib = taunet_utils.af_dsp_init(lib_path)
    audio_data = taunet_utils.read_audio_files(audio_dir)
    human_data = pd.read_csv(human_input_csv)

    input_data = dict()
    output_data = dict()

    for audio, label, fs in audio_data:
        label = label.split()[0]

        # Look the row up before the per-sample processing so a missing
        # measurement fails immediately and with a readable message.
        matches = human_data[human_data['MEASUREMENT_ID'] == int(label)]
        if matches.empty:
            raise IndexError(
                f"No row with MEASUREMENT_ID == {label} in {human_input_csv}"
            )
        human_input = matches.iloc[0].to_dict()

        # Start from a clean library state for every file.
        lib.initAf()
        lib.resetBuffer()
        for sample in audio:
            lib.AFInCAppend(sample)
        lib.AFInCProcess()

        # Identifier columns are not features.
        human_input.pop("MEASUREMENT_ID")
        human_input.pop("SONG_ID")
        # The two T1 columns are the regression targets; whatever is left in
        # human_input afterwards is appended to the feature vector.
        output_data[label] = (
            human_input.pop("ATTACK_T1"),
            human_input.pop("SUSTAIN_T1"),
        )

        # NOTE(review): the recorded run shows crest-factor values such as
        # -2097225378, which suggests the return type of afGetCrestFactor may
        # be misdeclared in the ctypes binding -- confirm in taunet_utils.
        dsp_features = (
            lib.afGetTempo(),
            lib.afGetT1A() / fs,
            lib.afGetT2A() / fs,
            int(lib.afGetSpectralCentroid()),
            lib.afGetSpectralFlatness(),
            int(lib.afGetPBandL()),
            int(lib.afGetPBandML()),
            int(lib.afGetPBandMH()),
            int(lib.afGetPBandH()),
            lib.afGetCrestFactor(),
        )
        input_data[label] = (
            tuple(round(val, 4) for val in dsp_features)
            + tuple(human_input.values())
        )
    return input_data, output_data

# Build the dataset from the bundled audio folder and the human-input CSV,
# then print every feature tuple next to its target tuple.
input_data, output_data = create_dataset(
    join("dataset", "audio"),
    join("dataset", "human_input", "AITD_Dataset_Kristof_beta_1.csv"),
)
for label, features in input_data.items():
    print(f"{label}: {features} -> {output_data[label]}")


1: (123.9669, 0.012, 0.2251, 15648, 0.7536, 10, 1, 1, 1, -2097225378, 0.6738, 0.8758, 550.0, 350.0) -> (360.5123, 1144.7427)
10: (159.0106, 0.0131, 0.2123, 2440, 0.8535, 10, 1, 1, 1, 0, 0.3157, 0.8187, 550.0, 350.0) -> (330.558, 1814.8958)
11: (117.4935, 0.0408, 0.31, 7351, 0.7748, 10, 1, 1, 1, 546576167, 0.7365, 0.7567, 550.0, 350.0) -> (302.5441, 430.5534)
12: (172.4138, 0.3676, 0.4617, 15840, 1.569, 10, 1, 1, 1, 102581079, 0.7287, 0.8574, 550.0, 350.0) -> (70.1614, 1192.9986)
13: (156.7944, 0.0098, 0.1759, 3139, 0.859, 10, 1, 1, 1, 78035367, 0.109, 0.9997, 550.0, 350.0) -> (283.9686, 1571.1926)
14: (152.5424, 0.0422, 0.245, 2145, 1.1746, 10, 1, 1, 1, 0, 0.6313, 0.2773, 550.0, 350.0) -> (281.9786, 461.7476)
15: (143.77, 1.4763, 0.1339, 2145, 2.2429, 10, 1, 1, 1, -1248480519, 0.5662, 0.8151, 550.0, 350.0) -> (206.762, 1754.8867)
16: (127.8409, 0.0328, 0.2358, 2134, 0.8128, 10, 1, 1, 1, 1320852491, 0.6062, 0.2058, 550.0, 350.0) -> (499.6292, 320.9579)
17: (140.1869, 0.0419, 0.1801, 927

In [5]:
def create_model(input_shape):
    """Return an uncompiled fully connected regression network.

    The network is an ``Input`` layer of ``input_shape`` followed by six
    ReLU ``Dense`` layers of widths 64, 32, 32, 16, 8 and 4, and a final
    two-unit ``Dense`` layer.

    Args:
        input_shape: Shape tuple of a single feature vector.

    Returns:
        The assembled ``Sequential`` model.
    """
    hidden_units = (64, 32, 32, 16, 8, 4)

    model = Sequential()
    model.add(Input(shape=input_shape))
    for units in hidden_units:
        model.add(Dense(units, activation='relu'))

    # No activation on the output layer (linear): the two outputs are
    # continuous values, not class scores. Its kernel carries an L2 penalty.
    output_layer = Dense(2, kernel_regularizer=tf.keras.regularizers.l2(0.001))
    model.add(output_layer)
    return model

# Each feature tuple in the recorded run has 14 entries: the ten library
# features plus the CSV columns left after the targets are removed.
input_shape = (14,)
model = create_model(input_shape)
# This is a regression on continuous targets, so 'accuracy' carries no
# information; track mean absolute error and RMSE instead.
model.compile(
    optimizer='adam',
    loss='mse',
    metrics=['mae', tf.keras.metrics.RootMeanSquaredError(name='rmse')],
)
model.summary()

Model: "sequential_1"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 dense_7 (Dense)             (None, 64)                960       
                                                                 
 dense_8 (Dense)             (None, 32)                2080      
                                                                 
 dense_9 (Dense)             (None, 32)                1056      
                                                                 
 dense_10 (Dense)            (None, 16)                528       
                                                                 
 dense_11 (Dense)            (None, 8)                 136       
                                                                 
 dense_12 (Dense)            (None, 4)                 36        
                                                                 
 dense_13 (Dense)            (None, 2)                

In [6]:
# Pair every feature tuple with the target tuple stored under the same label.
combined_data = [(features, output_data[label]) for label, features in input_data.items()]

# Hold out 10% of the samples for testing, then 5% of the remainder for
# validation; the fixed random_state makes both splits repeatable.
train_data, test_data = train_test_split(combined_data, test_size=0.1, random_state=42)
train_data, val_data = train_test_split(train_data, test_size=0.05, random_state=42)

# Each execution of this cell logs to its own timestamped directory under logs/.
log_dir = f"logs/{datetime.datetime.now():%Y%m%d-%H%M%S}"
tensorboard_callback = tf.keras.callbacks.TensorBoard(
    log_dir=log_dir,
    histogram_freq=1,
)
early_stopping_callback = tf.keras.callbacks.EarlyStopping(
    monitor='val_loss',
    patience=3,
    restore_best_weights=True,
)

class LogRMSECallback(tf.keras.callbacks.Callback):
    """Write the square root of ``val_loss`` to TensorBoard after each epoch.

    The value is logged as the scalar ``val_rmse`` into ``log_dir``. It is
    only a true RMSE when the model's loss is the mean squared error.

    Args:
        log_dir: Directory the summary writer logs into.
    """

    def __init__(self, log_dir):
        super(LogRMSECallback, self).__init__()
        self.log_dir = log_dir
        # Created on first use and then reused, so training does not open a
        # new summary writer on every epoch.
        self._rmse_writer = None

    def on_epoch_end(self, epoch, logs=None):
        """Log ``sqrt(val_loss)`` for ``epoch``; do nothing if it is absent."""
        # logs may be None, and val_loss is missing when fit() runs without
        # validation data; skip instead of crashing inside np.sqrt.
        mse = (logs or {}).get('val_loss')
        if mse is None:
            return
        rmse = np.sqrt(mse)
        if self._rmse_writer is None:
            self._rmse_writer = tf.summary.create_file_writer(self.log_dir)
        with self._rmse_writer.as_default():
            tf.summary.scalar('val_rmse', rmse, step=epoch)
        # Push the point to disk so it shows up in TensorBoard during training.
        self._rmse_writer.flush()

%load_ext tensorboard
%tensorboard --logdir logs/ --port 6006

history = model.fit(
    x=np.array([item[0] for item in train_data]),
    y=np.array([item[1] for item in train_data]),
    validation_data=(
        np.array([item[0] for item in val_data]),
        np.array([item[1] for item in val_data])
    ),
    epochs=50,
    # callbacks=[tensorboard_callback, early_stopping_callback],
    callbacks=[tensorboard_callback, LogRMSECallback(join(log_dir, "mse"))],
    batch_size=16
)

The tensorboard extension is already loaded. To reload it, use:
  %reload_ext tensorboard


Reusing TensorBoard on port 6006 (pid 11700), started 4:12:38 ago. (Use '!kill 11700' to kill it.)

Epoch 1/50
Epoch 2/50
Epoch 3/50
Epoch 4/50
Epoch 5/50
Epoch 6/50
Epoch 7/50
Epoch 8/50
Epoch 9/50
Epoch 10/50
Epoch 11/50
Epoch 12/50
Epoch 13/50
Epoch 14/50
Epoch 15/50
Epoch 16/50
Epoch 17/50
Epoch 18/50
Epoch 19/50
Epoch 20/50
Epoch 21/50
Epoch 22/50
Epoch 23/50
Epoch 24/50
Epoch 25/50
Epoch 26/50
Epoch 27/50
Epoch 28/50
Epoch 29/50
Epoch 30/50
Epoch 31/50
Epoch 32/50
Epoch 33/50
Epoch 34/50
Epoch 35/50
Epoch 36/50
Epoch 37/50
Epoch 38/50
Epoch 39/50
Epoch 40/50
Epoch 41/50
Epoch 42/50
Epoch 43/50
Epoch 44/50
Epoch 45/50
Epoch 46/50
Epoch 47/50
Epoch 48/50
Epoch 49/50
Epoch 50/50


In [7]:
def _split_to_arrays(split):
    """Stack a list of (features, targets) pairs into two NumPy arrays."""
    features = np.array([item[0] for item in split])
    targets = np.array([item[1] for item in split])
    return features, targets


# Evaluate the trained model on each split with the metrics it was compiled with.
train_metrics = model.evaluate(*_split_to_arrays(train_data), verbose=0)
val_metrics = model.evaluate(*_split_to_arrays(val_data), verbose=0)
test_metrics = model.evaluate(*_split_to_arrays(test_data), verbose=0)

# Show the prediction for every held-out sample next to its expected target.
for item in test_data:
    # Use a dedicated name here: assigning to `input_data` would overwrite the
    # dataset dict built earlier and break a re-run of the split cell.
    sample_batch = np.array([item[0]])  # Reshape input data into a batch
    prediction = model.predict(sample_batch)
    print("Input:", item[0])
    print("Expected Output:", item[1])
    print("Predicted Output:", prediction)


# One summary dict per split, keyed "<Split> <metric name>".
for split_name, split_metrics in (
    ("Train", train_metrics),
    ("Validation", val_metrics),
    ("Test", test_metrics),
):
    print({f"{split_name} {metric_name}": metric_value for metric_name, metric_value in zip(model.metrics_names, split_metrics)})

model.save("taunet.h5")

Input: (123.9669, 0.012, 0.2251, 15648, 0.7536, 10, 1, 1, 1, -2097225378, 0.6738, 0.8758, 550.0, 350.0)
Expected Output: (360.5123, 1144.7427)
Predicted Output: [[-0.01997519  0.02225694]]
Input: (152.5424, 0.0422, 0.245, 2145, 1.1746, 10, 1, 1, 1, 0, 0.6313, 0.2773, 550.0, 350.0)
Expected Output: (281.9786, 461.7476)
Predicted Output: [[-0.10234627 -3.1024158 ]]
Input: (114.7959, 0.0193, 0.1472, 10727, 0.5565, 10, 1, 1, 1, 288785892, 0.1961, 0.9731, 550.0, 350.0)
Expected Output: (52.7942, 812.402)
Predicted Output: [[-0.01751418  0.01637454]]
Input: (150.0, 0.015, 0.1608, 12284, 0.8636, 10, 1, 1, 1, 367950548, 0.5384, 0.8216, 550.0, 350.0)
Expected Output: (429.106, 423.9346)
Predicted Output: [[-0.01751418  0.01637454]]
Input: (167.2862, 0.0202, 0.1536, 10259, 0.9246, 10, 1, 1, 1, -1777035699, 0.6273, 0.8602, 550.0, 350.0)
Expected Output: (115.6247, 1020.5743)
Predicted Output: [[-0.01997519  0.02225694]]
Input: (109.2233, 0.037, 0.4977, 3150, 1.2654, 10, 1, 1, 1, -319688086, 0.218