# Music tagging Transformer
This notebook trains a transformer on mel-spectrogram features.


In [None]:
import json
import yaml
from pathlib import Path
import save_utils

import numpy as np

from sklearn.model_selection import train_test_split

from keras.callbacks import ModelCheckpoint, ReduceLROnPlateau, EarlyStopping
from keras.layers import (
    Input,
    GlobalAvgPool1D,
    Dense,
    Dropout,
)
from keras.models import Model
from keras.optimizers import Adam

from transformer import Encoder

from tensorflow.python.ops import math_ops
from tensorflow.python.framework import ops
from tensorflow.python.keras import backend as K
from tensorflow.python.ops import clip_ops

## Define crossentropy and accuracy metric for the training routine
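The metric is a thresholded binary accuracy. The loss is a binary cross-entropy in which the positive-label term is weighted by a factor of 4 and the per-class terms are summed over the last axis.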

In [None]:
def custom_binary_accuracy(y_true, y_pred, threshold=0.5):
    """Fraction of entries whose thresholded prediction matches the thresholded target.

    Both ``y_pred`` and ``y_true`` are binarised with ``> threshold`` (each
    result is cast back to the dtype of its own input), compared element-wise,
    and the matches are averaged over the last axis.

    Args:
        y_true: Target tensor.
        y_pred: Prediction tensor; ``threshold`` is cast to its dtype.
        threshold: Cut-off applied to both tensors. Defaults to 0.5.

    Returns:
        The mean of the element-wise equality over the last axis.
    """
    cutoff = math_ops.cast(threshold, y_pred.dtype)
    predicted_positive = math_ops.cast(y_pred > cutoff, y_pred.dtype)
    actual_positive = math_ops.cast(y_true > cutoff, y_true.dtype)

    matches = math_ops.equal(actual_positive, predicted_positive)
    return K.mean(matches, axis=-1)


def custom_binary_crossentropy(y_true, y_pred, pos_weight=4):
    """Positive-weighted binary cross-entropy, summed over the last axis.

    Args:
        y_true: Ground-truth targets; cast to the dtype of ``y_pred``.
        y_pred: Predicted probabilities (not logits). They are clipped to
            ``[epsilon, 1 - epsilon]`` before the logarithm is taken.
        pos_weight: Multiplier applied to the positive-label term of the
            loss. Defaults to 4, the value that used to be hard-coded, so
            existing callers (including ``model.compile(loss=...)``) behave
            exactly as before.

    Returns:
        ``-(pos_weight * y * log(p) + (1 - y) * log(1 - p))`` summed over
        the last axis.
    """
    y_pred = ops.convert_to_tensor(y_pred)
    y_true = math_ops.cast(y_true, y_pred.dtype)

    # Build epsilon in the prediction dtype through the public conversion
    # helper instead of the private ``K._constant_to_tensor``.
    epsilon_ = ops.convert_to_tensor(K.epsilon(), dtype=y_pred.dtype.base_dtype)
    output = clip_ops.clip_by_value(y_pred, epsilon_, 1.0 - epsilon_)

    # Compute cross entropy from probabilities. Epsilon is added again inside
    # the logs (on top of the clipping above) to keep the numerical results
    # identical to the previous implementation.
    bce = pos_weight * y_true * math_ops.log(output + K.epsilon())
    bce += (1 - y_true) * math_ops.log(1 - output + K.epsilon())
    return K.sum(-bce, axis=-1)

## Define model
Define the transformer model structure with the Encoder from the transformer util file.

In [None]:
def transformer_model(model_config, n_classes):
    """Build, compile and summarise the tagging model.

    The network is: ``Encoder`` -> ``Dropout`` -> ``GlobalAvgPool1D`` ->
    ``Dense(4 * n_classes)`` -> ``Dense(n_classes)``. It is compiled with Adam,
    ``custom_binary_crossentropy`` as loss and ``custom_binary_accuracy`` as
    metric, and ``model.summary()`` is printed before returning.

    Args:
        model_config: Mapping providing the keys ``n_layers``, ``d_model``,
            ``n_heads``, ``dff``, ``max_pos_encoding``, ``init_learning_rate``,
            ``dropout_rate``, ``activations`` (indexable; element 0 is used
            for the hidden dense layer, element 1 for the output layer) and
            ``encoder_rate``.
        n_classes: Number of units of the output layer.

    Returns:
        The compiled Keras ``Model``.
    """
    layer_count = model_config['n_layers']
    feature_dim = model_config['d_model']
    head_count = model_config['n_heads']
    feed_forward_dim = model_config['dff']
    max_positions = model_config['max_pos_encoding']
    learning_rate = model_config['init_learning_rate']
    head_dropout = model_config['dropout_rate']
    activation_names = model_config['activations']

    # Per-sample input shape is (None, d_model): the first axis is left unspecified.
    inputs = Input((None, feature_dim))

    encoded = Encoder(
        num_layers=layer_count,
        d_model=feature_dim,
        num_heads=head_count,
        dff=feed_forward_dim,
        maximum_position_encoding=max_positions,
        rate=model_config['encoder_rate'],
    )(inputs)

    # Classification head on top of the encoder output.
    dropped = Dropout(head_dropout)(encoded)
    pooled = GlobalAvgPool1D()(dropped)
    hidden = Dense(4 * n_classes, activation=activation_names[0])(pooled)
    predictions = Dense(n_classes, activation=activation_names[1])(hidden)

    model = Model(inputs=inputs, outputs=predictions)
    model.compile(
        optimizer=Adam(learning_rate),
        loss=custom_binary_crossentropy,
        metrics=[custom_binary_accuracy],
    )
    model.summary()
    return model


# Main routine
## Load the configuration for this model

In [None]:
# Read the transformer configuration. The encoding is given explicitly so the
# file is decoded the same way on every platform instead of depending on the
# locale's default encoding.
with open('music_tag_transformer/transformer_config.yaml', 'r', encoding='utf-8') as f:
    config = yaml.safe_load(f)

# Extract the top-level values used by the cells below
transformer_name = config['transformer_name']
transformer_pretrained_name = config['pretrained_transformer']
batch_size = config['batch_size']
epochs = config['epochs']
data_dir = Path(config['data_dir'])

## Load preprocessed data
Load the numpy arrays and the label-class mapping. Split the data into training, validation and test sets using the split sizes given in the config.

In [None]:
# Load the preprocessed features, the label matrix and the class -> index mapping.
melspec_data = save_utils.load_sliced_numpy_array('melspec_features', data_dir=data_dir)
labels = np.load(data_dir/'labels.npy')

with open(data_dir/'class_label_index_mapping.json', 'r', encoding='utf-8') as f:
    labels_to_id = json.load(f)

# First split: carve off the training set (shuffled, seeded via the config).
mel_train, mel_test_val, lab_train, lab_test_val = train_test_split(
    melspec_data,
    labels,
    train_size=config['train_set_size'],
    random_state=config['random_state'],
)

# Second split: divide the remainder into validation and test.
# train_test_split returns (train_part, test_part), and the results are
# unpacked as (val, test), so the validation share of the remainder must be
# passed as `train_size`. Passing it as `test_size` would hand the validation
# fraction to the test set and vice versa.
val_fraction = config['val_set_size'] / (1 - config['train_set_size'])
mel_val, mel_test, lab_val, lab_test = train_test_split(
    mel_test_val,
    lab_test_val,
    train_size=val_fraction,
    shuffle=False,
)

# Check the shapes of the split sets: features and labels must agree in the
# number of samples, and all three sets must agree in their second dimension.
assert mel_train.shape[0] == lab_train.shape[0], 'train features/labels differ in length'
assert mel_val.shape[0] == lab_val.shape[0], 'validation features/labels differ in length'
assert mel_test.shape[0] == lab_test.shape[0], 'test features/labels differ in length'
assert mel_train.shape[1] == mel_test.shape[1] == mel_val.shape[1], 'feature dimension differs between sets'
assert lab_train.shape[1] == lab_test.shape[1] == lab_val.shape[1], 'label dimension differs between sets'

## Set up the model callbacks
For the initializing parameters, refer to the config file.  
As callbacks we use regular backups of the model as checkpoints, an early-stopping mechanism to prevent overfitting on the training data, and a learning-rate reducer that shrinks the update steps when the validation metric no longer improves.

In [None]:
train_config = config['training']

# Checkpoint the full model (not just the weights) under `transformer_name`;
# what is monitored and whether only the best model is kept come from the config.
checkpoint = ModelCheckpoint(
    transformer_name,
    monitor=train_config['monitor'],
    verbose=1,
    save_best_only=train_config['save_best_weights'],
    mode=train_config['monitor_mode'],
    save_weights_only=False,
)

# Stop once val_loss has not improved for 10 epochs (checked from epoch 5 on)
# and roll the model back to its best weights.
early_stopping = EarlyStopping(
    monitor='val_loss',
    patience=10,
    verbose=1,
    mode='min',
    restore_best_weights=True,
    start_from_epoch=5,
)

# Reduce the learning rate when the metric configured under
# training.lr_reducing stops improving.
lr_reduce_config = train_config['lr_reducing']
lr_reducing_on_platteau = ReduceLROnPlateau(
    monitor=lr_reduce_config['monitor'],
    patience=lr_reduce_config['patience'],
    min_lr=lr_reduce_config['min_lr'],
    mode=lr_reduce_config['mode'],
)

## Train routine

In [None]:
# Read the preprocessing config to learn how many mel bands each frame has.
with open('preprocess_config.yaml', 'r', encoding='utf-8') as f:
    pp_config = yaml.safe_load(f)

n_mels = pp_config['melspectogram']['n_mels']

# The model input is declared as Input((None, d_model)), i.e. 3-D batches of
# shape (samples, steps, features). Reshape to (samples, -1, n_mels) rather than
# flattening to 2-D, which the model cannot consume.
# NOTE(review): assumes n_mels equals model_structure.d_model and that the
# stored features are laid out frame by frame - confirm against preprocessing.
mel_train = mel_train.reshape(mel_train.shape[0], -1, n_mels)
mel_val = mel_val.reshape(mel_val.shape[0], -1, n_mels)

model = transformer_model(config['model_structure'], n_classes=len(labels_to_id))

# `use_multiprocessing` is intentionally not passed: it only applies to
# generator / Sequence inputs, has no effect on in-memory arrays, and is
# rejected by newer Keras releases.
history = model.fit(
    x=mel_train,
    y=lab_train,
    validation_data=(mel_val, lab_val),
    batch_size=batch_size,
    epochs=epochs,
    callbacks=[checkpoint, lr_reducing_on_platteau, early_stopping],
    verbose=2,
)

# Persist the final model (after early stopping restored the best weights).
model.save('models/complete'+transformer_name)