In [None]:
import json
import tensorflow as tf

import IPython
import sys
import matplotlib.pyplot as plt
import numpy as np
import tensorflow as tf

from tensorflow.keras.layers import Dense, Activation, Dropout, Input, LSTM, Reshape, Lambda, RepeatVector, Conv1D
from tensorflow.keras.layers import GRU, Bidirectional, BatchNormalization, Reshape
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.utils import to_categorical

from pydub import AudioSegment
import random
import io
import os
import glob
import librosa
from scipy.io.wavfile import write

In [None]:
# Song table generated by osu_audio_bpm_extractor: read it into Data, then
# report how many songs it holds.
with open('Osu_song_data_ABO.json') as f:
    Data = json.load(f)
print(len(Data))

In [None]:
# Draw one song key at random; its entry unpacks into the audio file name,
# the BPM and the offset. The audio path is the song key joined to the file name.
Random_song = list(Data)[np.random.randint(len(Data))]
audio, Bpm, Offset = Data[Random_song]
Audio = '/'.join((Random_song, audio))

print(Random_song, audio, Bpm, Offset)


In [None]:
# Show an inline audio player for the randomly chosen song file.
IPython.display.Audio(Audio)

In [None]:
# Decode the chosen song with librosa; the result is unpacked into the
# sample array y and the sample rate sr used by the following cells.
y, sr = librosa.load(Audio)

In [None]:
# Sanity check: sample count, sample rate, duration reported by librosa and
# duration computed by hand. The signal and rate are passed by keyword because
# newer librosa releases reject positional arguments here, and passing sr keeps
# the reported duration correct whatever rate the audio was loaded at.
print(len(y), sr, librosa.get_duration(y=y, sr=sr), len(y)/sr)

In [None]:
def graph_spectrogram(data, rate):
    """Draw a spectrogram of an audio signal and return its power matrix.

    The spectrogram is produced with ``plt.specgram``, so calling this function
    also draws onto the current matplotlib axes.

    Parameters
    ----------
    data : array with an ``ndim`` attribute
        A 1-D signal, or a 2-D array of which only column 0 is analysed.
    rate : number
        Accepted for interface compatibility but not used: the spectrogram is
        always computed with the fixed sampling frequency below.

    Returns
    -------
    The first value returned by ``plt.specgram`` (the spectrogram matrix).

    Raises
    ------
    ValueError
        If ``data`` is neither 1-D nor 2-D.
    """
    nfft = 200 # Length of each window segment
    fs = 8000 # Sampling frequency handed to specgram (fixed; ``rate`` is ignored)
    noverlap = 120 # Overlap between windows
    nchannels = data.ndim
    if nchannels == 1:
        signal = data
    elif nchannels == 2:
        signal = data[:, 0]
    else:
        # Previously this fell through and failed on the unbound ``pxx``.
        raise ValueError(
            "graph_spectrogram expects 1-D or 2-D data, got {} dimensions".format(nchannels)
        )
    pxx, freqs, bins, im = plt.specgram(signal, NFFT=nfft, Fs=fs, noverlap=noverlap)
    return pxx

In [None]:
#librosa.reassigned_spectrogram(y, sr)

In [None]:
#plt.specgram()

In [None]:
# Spectrogram of the whole song; the returned matrix becomes the cell output.
graph_spectrogram(y, sr)

In [None]:
def audio_section(start_ms, duration_s, data, sr):
    """Return the part of ``data`` that starts at ``start_ms`` and lasts ``duration_s``.

    Parameters
    ----------
    start_ms : number
        Start position in milliseconds. Negative values are treated as 0.
    duration_s : number
        Requested length in seconds.
    data : sliceable sequence of samples
    sr : number
        Samples per second, used to convert times to sample indices.

    Returns
    -------
    ``data[start:end]``. If the requested window runs past the end of ``data``
    the result is simply shorter.
    """
    # A negative start would otherwise become a negative slice index, which
    # counts from the END of the data and typically yields an empty clip.
    start_s = max(start_ms / 1000.0, 0.0)
    start = int(start_s * sr)
    end = int((start_s + duration_s) * sr)
    # Slicing already stops at the end of the data, so no explicit bound check is needed.
    return data[start:end]

In [None]:
# Cut a 10-second excerpt of the loaded song, starting at its Offset
# (passed as the start position in milliseconds).
ys = audio_section(Offset, 10.0, y, sr)

In [None]:
# Spectrogram of the excerpt. Its two dimensions are unpacked as (n_freq, Tx);
# the model-building cell later uses them, swapped, as the input shape.
x = graph_spectrogram(ys, sr)
n_freq, Tx = x.shape
print(n_freq, Tx)
# NOTE(review): this prints the spectrogram array itself, not just a summary.
print(x)

In [None]:
# Save the excerpt as a WAV file (scipy.io.wavfile.write) so it can be played back.
write('temp_audio_section.wav', sr, ys)

In [None]:
# Inline player for the excerpt written by the previous cell.
IPython.display.Audio('temp_audio_section.wav')

In [None]:
def model1(input_shape):
    """Build the Conv1D + GRU regression network.

    Layer stack, in order: Conv1D(196, kernel 10, stride 2) -> BatchNormalization
    -> ReLU -> Dropout(0.8); two GRU(128) stages that return full sequences, each
    followed by Dropout(0.8) and BatchNormalization; a last GRU(128) that returns
    only its final output; Dense(1) with ReLU activation.

    Parameters
    ----------
    input_shape : tuple
        Shape of one input sample, forwarded to ``Input``.

    Returns
    -------
    An uncompiled ``Model`` mapping the input to the single Dense output.
    """
    inputs = Input(shape=input_shape)

    # Convolutional front end.
    hidden = Conv1D(filters=196, kernel_size=10, strides=2)(inputs)
    hidden = BatchNormalization()(hidden)
    hidden = Activation("relu")(hidden)
    hidden = Dropout(rate=0.8)(hidden)

    # Two identical recurrent stages that keep the time dimension.
    for _ in range(2):
        hidden = GRU(units=128, return_sequences=True)(hidden)
        hidden = Dropout(rate=0.8)(hidden)
        hidden = BatchNormalization()(hidden)

    # Final recurrent stage collapses the sequence; one output unit follows.
    hidden = GRU(units=128, return_sequences=False)(hidden)
    outputs = Dense(1, activation="relu")(hidden)

    return Model(inputs=inputs, outputs=outputs)

In [None]:
def model2(input_shape):
    """Build the Conv1D + LSTM regression network.

    Layer stack, in order: Conv1D(196, kernel 6, stride 2) -> BatchNormalization
    -> linear (identity) Activation -> Dropout(0.6); LSTM(128) returning the full
    sequence -> Dropout(0.8) -> BatchNormalization; LSTM(128) returning only its
    final output -> Dropout(0.8) -> BatchNormalization -> Dropout(0.8); Dense(1)
    with no activation.

    Parameters
    ----------
    input_shape : tuple
        Shape of one input sample, forwarded to ``Input``.

    Returns
    -------
    An uncompiled ``Model`` mapping the input to the single Dense output.
    """
    net_in = Input(shape=input_shape)

    # Convolutional front end; the linear activation is kept so the layer
    # sequence (and therefore layer indices) stays as it is.
    feat = Conv1D(filters=196, kernel_size=6, strides=2)(net_in)
    feat = BatchNormalization()(feat)
    feat = Activation('linear')(feat)
    feat = Dropout(rate=0.6)(feat)

    # Two LSTM stages: the first keeps the sequence, the second collapses it.
    for keep_sequence in (True, False):
        feat = LSTM(128, return_sequences=keep_sequence)(feat)
        feat = Dropout(rate=0.8)(feat)
        feat = BatchNormalization()(feat)
    feat = Dropout(rate=0.8)(feat)

    # Single linear output unit.
    net_out = Dense(1)(feat)

    return Model(inputs=net_in, outputs=net_out)

In [None]:
def modelf(input_shape):
    """Return the network used by the rest of the notebook (currently ``model2``)."""
    selected = model2(input_shape)
    return selected

In [None]:
# Build the network for inputs of shape (Tx, n_freq), the dimensions of the
# excerpt spectrogram swapped, and print its layer summary.
model = modelf((Tx, n_freq))
model.summary()

In [None]:
def create_training_and_validation_samples(data, clip_duration_s, checkpoint = False, save_dir = "D:/temp/"):
    """Turn every song in ``data`` into a spectrogram sample labelled with its BPM.

    For each song the audio file ``song + '/' + audio`` is loaded, a clip of
    ``clip_duration_s`` seconds starting at the song's offset is cut out, and
    its spectrogram (axes swapped) is stored. A song whose spectrogram shape
    differs from the first accepted sample is reported and skipped.

    Parameters
    ----------
    data : dict
        Maps a song key to a triple unpacked as ``(audio, Bpm, Offset)``.
    clip_duration_s : number
        Clip length in seconds, forwarded to ``audio_section``.
    checkpoint : int or False
        If truthy, the arrays collected so far are saved after every
        ``checkpoint`` processed songs.
    save_dir : str
        Directory that receives the checkpoint files. Defaults to the location
        that used to be hard-coded.

    Returns
    -------
    Tuple of three numpy arrays: samples, BPM labels and song keys, aligned by index.
    """
    X = []
    Y = []
    Songs = []
    total = len(data)

    for i, song in enumerate(data.keys()):
        audio, Bpm, Offset = data[song]
        Audio = song + '/' + audio
        y, sr = librosa.load(Audio)
        ys = audio_section(Offset, clip_duration_s, y, sr)
        x = graph_spectrogram(ys, sr)
        # graph_spectrogram draws on the current figure; close it so the
        # drawings do not pile up (and eat memory) over the whole data set.
        plt.close()
        features = x.swapaxes(0, 1)

        # The first sample defines the reference shape; later ones must match it.
        if not X or features.shape == X[0].shape:
            X.append(features)
            Y.append(Bpm)
            Songs.append(song)
        else:
            print(i,Audio, x.shape)

        # Progress and checkpoints count processed songs starting at 1, and are
        # no longer skipped for the first song.
        processed = i + 1
        print("finished processing {}/{} songs".format(processed, total))

        if checkpoint and processed % checkpoint == 0:
            np.save(os.path.join(save_dir, "X_dataset_OsuBPM.npy"), np.asarray(X))
            np.save(os.path.join(save_dir, "Y_dataset_OsuBPM.npy"), np.asarray(Y))
            np.save(os.path.join(save_dir, "Songs_dataset_OsuBPM.npy"), np.asarray(Songs))
    return(np.array(X), np.array(Y), np.array(Songs))


In [None]:
# Reload the song table and build the data set from 10-second clips, with a
# checkpoint interval of 50.
with open('Osu_song_data_ABO.json') as f:
    Data = json.load(f)
X_dataset, Y_dataset, Songs_dataset = create_training_and_validation_samples(Data, 10.0, checkpoint=50)

# Save the complete arrays under "_Final" file names in D:/temp/.
np.save("D:/temp"+"/" + "X_dataset_OsuBPM_Final.npy", X_dataset)
np.save("D:/temp"+"/" + "Y_dataset_OsuBPM_Final.npy", Y_dataset)
np.save("D:/temp"+"/" + "S_dataset_OsuBPM_Final.npy", Songs_dataset)

In [None]:
# Load the complete "_Final" arrays written once dataset creation finished.
# The files without the suffix are only periodic checkpoints and can miss the
# songs processed after the last checkpoint.
X = np.load("D:/temp"+"/" + "X_dataset_OsuBPM_Final.npy")
Y = np.load("D:/temp"+"/" + "Y_dataset_OsuBPM_Final.npy")
S = np.load("D:/temp"+"/" + "S_dataset_OsuBPM_Final.npy")
print(X.shape, Y.shape, S.shape)

In [None]:
# Look at the first sample together with its label and song key.
print(X[0], Y[0], S[0])

In [None]:
# Shuffle samples, labels and song keys with one shared permutation so the
# three arrays stay aligned.
# NOTE(review): no seed is set (the seeding lines below are disabled), so the
# ordering, and therefore the train/test split, differs on every run.
#np.random.seed(0)
#np.random.shuffle(X)

shuffler = np.random.permutation(len(Y))
X_shuff = X[shuffler,:,:]
Y_shuff = Y[shuffler]
S_shuff = S[shuffler]

In [None]:
# First sample, label and song key after shuffling.
print(X_shuff[0], Y_shuff[0], S_shuff[0])

In [None]:
# Split the shuffled data: the first 70% of the samples are used for training,
# the remainder for testing.
nb_data = len(Y)
nb_train = int(0.7 * nb_data)
print(nb_data, nb_train)

X_train, X_test = X_shuff[:nb_train], X_shuff[nb_train:]
Y_train, Y_test = Y_shuff[:nb_train], Y_shuff[nb_train:]

In [None]:
# Restore weights stored under the prefix './0'.
# NOTE(review): the save cell further down writes to './weight_m4_0', so this
# prefix presumably comes from an earlier run; confirm it is the intended one.
model.load_weights('./0')

In [None]:
# Compile for regression: Adam optimizer with learning rate 1e-2, mean squared
# error as the loss and also as the only reported metric.
opt = Adam(learning_rate=1e-2, beta_1=0.9, beta_2=0.999)
model.compile(optimizer=opt, loss='mean_squared_error', metrics=['mean_squared_error'])

In [None]:
# Train for 3 epochs in batches of 16. validation_split is 0.0, so no
# validation data is held out during fitting.
history = model.fit(X_train, Y_train, epochs=3, validation_split=0.0, shuffle=True, batch_size=16)

In [None]:
# Save the current weights under the prefix './weight_m4_0'.
model.save_weights('./weight_m4_0')

In [None]:
# Evaluate on the held-out samples. The model was compiled with
# 'mean_squared_error' as its only metric, so the second value (named acc) is
# that metric rather than an accuracy.
loss, acc, = model.evaluate(X_test, Y_test)
print("loss = ", loss)

In [None]:
# Plot the per-epoch training loss recorded by model.fit. Only a training curve
# exists (fit ran with validation_split=0.0), so the title names just that, and
# the plot uses the whole figure instead of the lower half of a two-row grid.
loss = history.history['loss']

plt.figure()
plt.plot(loss, label='Training Loss')
plt.legend(loc='upper right')
plt.ylabel('MSE')
plt.title('Training Loss')
plt.xlabel('epoch')
plt.show()

In [None]:
# Manual test: load one audio file, cut a 10-second clip starting at 6592 ms
# and turn it into a single-sample batch for the model.

file = r'path to a test file'
y, sr = librosa.load(file)
ys = audio_section(6592, 10.0, y, sr)
x = graph_spectrogram(ys, sr)
# Swap the spectrogram axes and add a leading batch axis of size 1.
x = np.expand_dims(x.swapaxes(0,1), axis=0)

#print(x.shape)
#model.predict(x)

In [None]:
# Alternative: look up a song key in Data and print its stored BPM and offset
# (the first element of the entry is unpacked into a but not used here).
path = r'path to a test file'
a, b, o = Data[path]
print(b, o)

In [None]:
# Write the test clip to a WAV file and show an inline player for it.
write('temp_test.wav', sr, ys)
IPython.display.Audio('temp_test.wav')

In [None]:
# Run the model on the prepared single-sample batch x and print its output.
predictions = model.predict(x)
print(predictions)

In [None]:
# Spot-check the model on one training sample (index n).
# NOTE: x and y are reassigned here to a feature matrix and its BPM label;
# earlier cells used the same names for other data.
n=0
x = X_train[n]
y = Y_train[n]

print(x)
print(np.amax(x), np.amin(x))

# predict is given a batch, so the single sample is wrapped in a length-1 array.
pred = model.predict(np.array([x]))
print(int(y), pred[0][0])
# Both values are truncated to integers before the difference is taken.
print("error = {} bpm".format(int(y) - int(pred[0][0])))

In [None]:
# Print one layer and its first weight array. Layers without weights
# (activation and dropout layers, for example; index 3 is the Activation
# layer of the current model) return an empty list from get_weights(), so
# that case is reported instead of failing with an IndexError.
l = 3

print(model.layers[l])
layer_weights = model.layers[l].get_weights()
if layer_weights:
    print(layer_weights[0])
else:
    print("layer {} has no weights".format(l))