In [None]:
import matplotlib.pyplot as plt
import numpy as np
import keras
from keras.utils import plot_model

import pydot as pyd
from keras.utils.vis_utils import plot_model, model_to_dot
keras.utils.vis_utils.pydot = pyd
import tensorflow as tf

import os, glob
import pandas as pd
import platform

# Detect the host operating system once. Later cells compare this value with
# 'Windows' to choose between the `.exe` and the extension-less converter binaries.
CURRENT_OS = platform.system()
print(CURRENT_OS)

In [None]:
from keras.losses import mse, cosine_similarity
def make_mse_cosine_loss(alpha):
    """Build a loss that blends `cosine_similarity` and `mse` from keras.losses.

    Args:
        alpha: Weight given to the cosine-similarity term; the MSE term is
            weighted by ``1 - alpha``.

    Returns:
        A function ``mse_cosine_loss(y_true, y_pred)`` returning
        ``alpha * cosine_similarity + (1 - alpha) * mse``.
    """
    def mse_cosine_loss(y_true, y_pred):
        # y_pred = tf.clip_by_value(y_pred, clip_value_min=0, clip_value_max=127)
        # NOTE(review): the sign convention of keras.losses.cosine_similarity
        # depends on the Keras version - confirm it is the negated similarity.
        cosine_term = cosine_similarity(y_true, y_pred)
        mse_term = mse(y_true, y_pred)
        return alpha * cosine_term + (1 - alpha) * mse_term

    return mse_cosine_loss

# Loss weighting and window length; both values also appear in the checkpoint
# file name below ("alpha0.15", "h4").
ALPHA = 0.15
SAMPLE_LENGTH = 4

# The checkpoint was saved with a custom loss, so the same callable has to be
# handed back to Keras under the name it was registered with.
mse_cosine_loss = make_mse_cosine_loss(ALPHA)

# model = keras.models.load_model('saved_models/mvi-v2-2023-07-19_00-33_57-h4-e5-mse_cosine_loss-alpha0.15-LSTM-luong_attention-MAESTRO.h5', custom_objects={'mse_cosine_loss': mse_cosine_loss})
model = keras.models.load_model(
    'saved_models/mvi-v2-2023-07-20_13-00_56-h4-e5-mse_cosine_loss-alpha0.15-m0.60-LSTM-luong_attention-MAESTRO.h5',
    custom_objects={'mse_cosine_loss': mse_cosine_loss},
)
model.summary()

In [None]:
# Preprocessing starts here

from pathlib import Path
from subprocess import Popen, PIPE

DEMO_DATASET_PATH = '../demo_dataset'
UTIL_PATH = '../util'

# Pick the converter binary once; only the Windows build carries an .exe suffix.
if CURRENT_OS == 'Windows':
    midi2csv_filename = f'{UTIL_PATH}/midi2csv.exe'
else:
    midi2csv_filename = f'{UTIL_PATH}/midi2csv'

# Generate csv from midi files
org_filenames = glob.glob(f'{DEMO_DATASET_PATH}/*.midi')
org_filenames.extend(glob.glob(f'{DEMO_DATASET_PATH}/*.mid'))
for org_filename in org_filenames:
    # Skip files produced by an earlier run of this notebook. Checking the stem
    # covers both the `.midi` and the `.mid` extension collected above.
    if Path(org_filename).stem.endswith('predicted'):
        print('Skipping predicted midi')
        continue

    process = Popen([midi2csv_filename, org_filename], stdout=PIPE, universal_newlines=True)
    # Only stdout is piped, so the stderr slot of communicate() is always None.
    output = process.communicate()[0]
    # communicate() has already waited for the process to terminate.
    exit_code = process.returncode

    print(output)
    print(f'midi2csv exit code: {exit_code}')
    if exit_code != 0:
        # Stop at the first failure instead of converting the remaining files.
        print(f'midi2csv is not executed correctly (exit code: {exit_code})')
        break

In [None]:
import json

model_dataset_metadata_filepath = f'dataset32-MAESTRO-len{SAMPLE_LENGTH}.json'
# model_dataset_metadata_filepath = f'dataset32-chien2021-len{SAMPLE_LENGTH}.json'

# read dataset metadata from json file
with open(model_dataset_metadata_filepath, 'r') as f:
    dataset_metadata = json.load(f)

# Min/max statistics stored with the dataset; make_dataset uses them to
# min-max scale the feature and label columns.
# time_min, time_max = dataset_metadata['time_min'], dataset_metadata['time_max']
train_time_diff_min, train_time_diff_max = dataset_metadata['train_time_diff_min'], dataset_metadata['train_time_diff_max']
note_num_min, note_num_max = dataset_metadata['note_num_min'], dataset_metadata['note_num_max']
note_num_diff_min, note_num_diff_max = dataset_metadata['note_num_diff_min'], dataset_metadata['note_num_diff_max']
length_min, length_max = dataset_metadata['length_min'], dataset_metadata['length_max']
velocity_min, velocity_max = dataset_metadata['velocity_min'], dataset_metadata['velocity_max']

In [None]:
columns_train = ['time_diff', 'note_num', 'length', 'note_num_diff', 'low_octave'] # , 'time']
columns_label = ['velocity']

def divide_list(l, n, overlapping_window=0):
    """Yield consecutive windows of ``l`` with length ``n``.

    Windows start every ``n - overlapping_window`` items. If items remain after
    the last full window, one shorter window is yielded; it begins where the
    next window would have started, so the overlap stays consistent. With the
    default ``overlapping_window=0`` this is plain chunking with a short tail.

    Args:
        l: Sliceable sequence supporting ``len()`` (list, NumPy array, ...).
        n: Window length.
        overlapping_window: Number of items shared by neighbouring windows.

    Yields:
        Slices of ``l``; every one has length ``n`` except possibly the last.

    Raises:
        ValueError: If ``overlapping_window`` is not smaller than ``n``.
    """
    step = n - overlapping_window
    if step <= 0:
        raise ValueError('overlapping_window must be smaller than n')

    total = len(l)
    covered_end = 0  # index one past the last item emitted in a full window
    next_start = 0   # where the next window would begin
    for i in range(0, total - n + 1, step):
        yield l[i:i + n]
        covered_end = i + n
        next_start = i + step

    # Emit a tail only when some items were not part of any full window.
    if covered_end < total and next_start < total:
        yield l[next_start:]


FEATURE_NUM = 5
FLOAT_DTYPE = np.float32
def pad_data(data, feature_num, sample_length=None):
    """Zero-pad the last window of ``data`` so it has ``sample_length`` rows.

    Args:
        data: List of 2-D arrays (one per window). It is modified in place.
        feature_num: Number of columns of each window; used for the zero rows.
        sample_length: Target number of rows. Defaults to the notebook-level
            ``SAMPLE_LENGTH`` when omitted.

    Returns:
        The same list object, with its last element padded when it was short.
        An empty list is returned unchanged.
    """
    if sample_length is None:
        sample_length = SAMPLE_LENGTH
    # Nothing to pad; indexing data[-1] on an empty list would raise IndexError.
    if len(data) == 0:
        return data

    missing_rows = sample_length - len(data[-1])
    if missing_rows != 0:
        # Use the notebook-wide float dtype so padded rows match the data rows.
        zero_array = np.zeros((missing_rows, feature_num), dtype=FLOAT_DTYPE)
        data[-1] = np.concatenate((data[-1], zero_array))
    return data

def make_dataset(csv_data, columns_train, columns_label):
    """Turn one CSV table into scaled, fixed-length input and label windows.

    The first four feature columns and the single label column are min-max
    scaled with the notebook-level dataset statistics; the fifth feature column
    is left untouched. Rows are then cut into windows of ``SAMPLE_LENGTH`` and
    the last window is zero-padded.

    Args:
        csv_data: Table indexable by a list of column names.
        columns_train: Names of the feature columns, in model input order.
        columns_label: Names of the label columns.

    Returns:
        Tuple ``(inputs, labels)`` of arrays with dtype ``FLOAT_DTYPE`` and
        shapes ``(windows, SAMPLE_LENGTH, FEATURE_NUM)`` and
        ``(windows, SAMPLE_LENGTH, 1)``.
    """
    def _rescale(values, lower, upper):
        # Plain min-max scaling.
        return (values - lower) / (upper - lower)

    def _to_windows(rows, width):
        # Chunk, pad the tail, then stack onto an empty array of the target shape.
        chunks = pad_data(list(divide_list(rows, SAMPLE_LENGTH)), width)
        stacked = np.array(chunks, dtype=FLOAT_DTYPE)
        seed = np.empty((0, SAMPLE_LENGTH, width), dtype=FLOAT_DTYPE)
        return np.vstack((seed, stacked))

    features = np.array(csv_data[columns_train], dtype=FLOAT_DTYPE)
    targets = np.array(csv_data[columns_label], dtype=FLOAT_DTYPE)

    # Column order: time difference, note number, length, note number difference.
    feature_bounds = (
        (train_time_diff_min, train_time_diff_max),
        (note_num_min, note_num_max),
        (length_min, length_max),
        (note_num_diff_min, note_num_diff_max),
    )
    for column, (lower, upper) in enumerate(feature_bounds):
        features[:, column] = _rescale(features[:, column], lower, upper)
    # normalize the time
    # features[:, 5] = _rescale(features[:, 5], time_min, time_max)

    # normalize only the velocity
    targets[:, 0] = _rescale(targets[:, 0], velocity_min, velocity_max)

    dataset_entire_input = _to_windows(features, FEATURE_NUM)
    dataset_entire_label = _to_windows(targets, 1)
    return dataset_entire_input, dataset_entire_label

In [None]:
# Preparing data for csv
def generate_csv(csv_file, filename, columns_input, result):
    """Write the input columns plus predicted velocities to a CSV file.

    Args:
        csv_file: Table indexable by a list of column names.
        filename: Destination path of the CSV file.
        columns_input: Six column names, in the order time, time_diff,
            note_num, length, note_num_diff, low_octave.
        result: Predicted velocities of any shape; the values are flattened,
            cut to the number of input rows, rounded and cast to int.

    Raises:
        ValueError: If ``result`` holds fewer values than there are input rows.
    """
    data_demo_input = np.array(csv_file[columns_input], dtype=int)
    length_data_demo = len(data_demo_input)

    # Flatten before truncating so the cut applies to individual predictions
    # rather than to the first axis of a multi-dimensional array.
    flat_result = np.asarray(result).reshape(-1)
    if len(flat_result) < length_data_demo:
        raise ValueError(
            f'result has {len(flat_result)} values but the input has {length_data_demo} rows'
        )
    flat_result = flat_result[:length_data_demo]

    data_demo_velocity = np.array(np.round(flat_result), dtype=int)

    dataframe = pd.DataFrame({'time': data_demo_input[:, 0],
                              'time_diff': data_demo_input[:, 1],
                              'note_num': data_demo_input[:, 2],
                              'length': data_demo_input[:, 3],
                              'note_num_diff': data_demo_input[:, 4],
                              'low_octave': data_demo_input[:, 5],
                              'velocity': data_demo_velocity})
    dataframe.to_csv(filename, index=False)
columns_full_input = ['time', 'time_diff', 'note_num', 'length', 'note_num_diff', 'low_octave']

In [None]:
## util/csv2midi ??_predicted.csv ??.midi
def csv2midi(csv_filename, midi_original_filename):
    """Run the external csv2midi tool and echo its output and exit code.

    Args:
        csv_filename: First argument passed to the tool.
        midi_original_filename: Second argument passed to the tool.
    """
    # Only the Windows build of the tool carries an .exe suffix.
    csv2midi_filename = f'{UTIL_PATH}/csv2midi.exe' if CURRENT_OS == 'Windows' else f'{UTIL_PATH}/csv2midi'
    command = [csv2midi_filename, csv_filename, midi_original_filename]

    process = Popen(command, stdout=PIPE, universal_newlines=True)
    # stderr is not piped, so only the stdout half of the result carries data.
    output = process.communicate()[0]
    exit_code = process.wait()

    print(output)
    print(f'csv2midi exit code: {exit_code}')
In [None]:
current_dir = os.getcwd()

extension = 'csv'
csv_files_demo = []
os.chdir(DEMO_DATASET_PATH)
try:
    # Ignore the `<name>_predicted.csv` files written by the prediction cell, so
    # re-running the notebook does not feed its own outputs back in. Sorting
    # makes the processing order reproducible.
    test_csv_filenames = sorted(
        name for name in glob.glob('*.{}'.format(extension))
        if not name.endswith(f'_predicted.{extension}')
    )
    for filename in test_csv_filenames:
        df = pd.read_csv(filename, index_col=None, header=0)
        # Remember the source file so later cells can derive output names.
        df.attrs['filename'] = filename
        csv_files_demo.append(df)
finally:
    # Always restore the working directory, even if reading a file fails.
    os.chdir(current_dir)

In [None]:
# a = 0.1391 # mae_predict
# b = 0.1446 # sd_predict
# c = 0.6993 # f1
# x = (b * (2 - 6 * a - c)) / (c * (1 - 3 * a))
# print(f'x: {x}') # 0.16540127369027316

# rv_sd = 0.1446
# true_sd = 0.1654
# mae_norm = 0.1391

# f1 = 2 * (((rv_sd / true_sd) * (1 - (3 * mae_norm))) / ((rv_sd / true_sd) + (1 - (3 * mae_norm))))
# print(f'f1: {f1}')

In [None]:
f1_list = []
mae_list = []
mse_list = []
sd_list = []

# Scale used to express the error metrics as fractions of the MIDI velocity range.
MIDI_VELOCITY_SCALE = 127
# make_dataset scales labels with (v - velocity_min) / (velocity_max - velocity_min),
# so the inverse needs both the span and the offset.
velocity_span = velocity_max - velocity_min

for demo_input_csv in csv_files_demo:
    demo_dataset_input, demo_dataset_label = make_dataset(demo_input_csv, columns_train, columns_label)
    demo_dataset_result = model.predict(demo_dataset_input)

    # Drop the zero-padded rows of the last window so they do not count
    # towards the metrics.
    note_count = len(demo_input_csv)
    predicted_scaled = demo_dataset_result.reshape(-1)[:note_count]
    label_scaled = demo_dataset_label.reshape(-1)[:note_count]

    # Undo the min-max scaling.
    # NOTE(review): assumes the model was trained on labels scaled the same way
    # as in make_dataset - confirm against the training notebook.
    result_velocity = predicted_scaled * velocity_span + velocity_min
    true = label_scaled * velocity_span + velocity_min
    result_velocity = result_velocity.round()
    result_velocity = result_velocity.clip(0, velocity_max)

    rv_norm = result_velocity / MIDI_VELOCITY_SCALE
    true_norm = true / MIDI_VELOCITY_SCALE
    mae_norm = np.mean(np.abs(rv_norm - true_norm))
    mse_norm = np.mean((rv_norm - true_norm) ** 2)
    mae_list.append(mae_norm)
    mse_list.append(mse_norm)

    rv_sd = np.std(rv_norm)
    true_sd = np.std(true_norm)
    # Harmonic mean of the spread ratio (predicted SD / true SD) and the
    # accuracy term (1 - 3 * MAE).
    sd_ratio = rv_sd / true_sd
    accuracy_term = 1 - (3 * mae_norm)
    f1 = 2 * ((sd_ratio * accuracy_term) / (sd_ratio + accuracy_term))
    f1_list.append(f1)
    sd_list.append(rv_sd)

    original_filename = demo_input_csv.attrs['filename']
    original_shortname = Path(original_filename).stem
    original_extension = Path(original_filename).suffix

    print(f'original_filename: {original_shortname}')
    print(f'MAE(normalized): {mae_norm}, SD(predict): {rv_sd}, SD(true): {true_sd}, f1: {f1}')

    demo_shortname = f'{original_shortname}_predicted'
    demo_output_csv_filename = f'{DEMO_DATASET_PATH}/{demo_shortname}.csv'
    generate_csv(demo_input_csv, demo_output_csv_filename, columns_full_input, result_velocity)

    # util/csv2midi ??_predicted.csv ??.midi
    # NOTE(review): the stored file name is the CSV file name, so this path ends
    # in the CSV's suffix rather than .midi/.mid - confirm csv2midi expects that.
    midi_original_filename = f'{DEMO_DATASET_PATH}/{original_shortname}{original_extension}'
    csv2midi(demo_output_csv_filename, midi_original_filename)

In [None]:
# Print the mean and the maximum of the per-file f1, mae, mse and sd scores.
if f1_list:
    print(f'mean - f1: {np.mean(f1_list)}, mae: {np.mean(mae_list)}, mse: {np.mean(mse_list)}, sd: {np.mean(sd_list)}')
    print(f'max  - f1: {np.max(f1_list)}, mae: {np.max(mae_list)}, mse: {np.max(mse_list)}, sd: {np.max(sd_list)}')
else:
    # np.max raises on an empty sequence, so report the empty run explicitly.
    print('No demo files were evaluated; nothing to summarise.')