In [None]:
# !pip install livelossplot

In [None]:
import tensorflow as tf
from tqdm import tqdm
import os
import math
import numpy as np
import matplotlib.pyplot as plt
import librosa
import librosa.display
from livelossplot import PlotLossesKeras
# import torchaudio
import random
import pandas as pd
from IPython.display import clear_output

In [None]:
def plot_spectrogram(spec, title=None, ylabel='freq_bin', aspect='auto', xmax=None):
    """Draw a power spectrogram on a decibel scale with a colorbar.

    Args:
        spec: Power spectrogram; it is converted with ``librosa.power_to_db``
            and rendered with ``imshow``.
        title: Figure title; 'Spectrogram (db)' is used when this is falsy.
        ylabel: Label of the vertical axis.
        aspect: Forwarded to ``imshow`` as its ``aspect`` argument.
        xmax: Optional upper bound of the x axis. When falsy the x limits are
            left untouched.
    """
    fig, axs = plt.subplots(1, 1)
    axs.set_title(title or 'Spectrogram (db)')
    axs.set_ylabel(ylabel)
    axs.set_xlabel('frame')
    im = axs.imshow(librosa.power_to_db(spec), origin='lower', aspect=aspect)
    if xmax:
        axs.set_xlim((0, xmax))
    # The colorbar and show() must run for every call, not only when xmax is
    # given; previously they were nested under the `if` and silently skipped.
    fig.colorbar(im, ax=axs)
    plt.show(block=False)

In [None]:
# Convolutional autoencoder over single-channel inputs of shape (256, 4096, 1).
# Layers are created in the same order as before, so positional lookups such
# as AutoEncoder.layers[k] resolve to the same layers.
encoder_input = tf.keras.Input(shape=(256, 4096, 1), name="mel")

# Encoder: five stride-2 Conv2D stages, each followed by batch normalisation.
x = encoder_input
for stage, n_filters in enumerate((512, 256, 128, 64, 32), start=1):
    x = tf.keras.layers.Conv2D(
        filters=n_filters,
        kernel_size=3,
        strides=2,
        padding="same",
        activation="relu",
        name=f"Encoder_Conv2d_{stage}",
    )(x)
    x = tf.keras.layers.BatchNormalization(name=f"Encoder_BatchNorm_{stage}")(x)

# Bottleneck: flatten, squeeze to a 64-dimensional code, then expand again to
# 32768 (= 8 * 128 * 32) values so they can be reshaped into a feature map.
x = tf.keras.layers.Flatten()(x)
encoder_output = tf.keras.layers.Dense(64, activation="relu")(x)
decoder_input = tf.keras.layers.Dense(32768, activation="relu")(encoder_output)
x = tf.keras.layers.Reshape((8, 128, 32))(decoder_input)

# Decoder: four stride-2 transposed convolutions with batch normalisation ...
for stage, n_filters in enumerate((64, 128, 256, 512), start=1):
    x = tf.keras.layers.Conv2DTranspose(
        filters=n_filters,
        kernel_size=3,
        strides=2,
        padding="same",
        activation="relu",
        name=f"Decoder_Conv2d_{stage}",
    )(x)
    x = tf.keras.layers.BatchNormalization(name=f"Decoder_BatchNorm_{stage}")(x)

# ... and a final single-filter, linear transposed convolution (no batch norm).
final_output = tf.keras.layers.Conv2DTranspose(
    filters=1,
    kernel_size=3,
    strides=2,
    padding="same",
    activation="linear",
    name=f"Decoder_Conv2d_{5}",
)(x)

AutoEncoder = tf.keras.Model(encoder_input, final_output, name="AutoEncoder")

In [None]:
# AutoEncoder.load_weights("final_checkpoints/cp-0009.ckpt")

In [None]:
class TrainGenerator(tf.keras.utils.Sequence):
    """Keras ``Sequence`` that serves autoencoder batches from ``Single_NPYs/``.

    Each item is an ``(inputs, targets)`` pair whose two members are the same
    array of shape ``(n, 256, 4096, 1)``, where ``n`` is ``batch_size`` except
    possibly for the last batch.

    Args:
        batch_size: Number of files per batch.
        validate_: Stored on the instance but currently without effect, because
            the 80/20 train/validation split that used it is disabled below.
        shuffle: When true, the file order is permuted at construction time
            and again at the end of every epoch.
    """

    def __init__(self, batch_size: int = 32, validate_: bool = False, shuffle: bool = True):
        # Let the Keras base class set up whatever state it needs.
        super().__init__()
        self.shuffle = shuffle
        self.batch_size = batch_size
        self.relative_path = "Single_NPYs/"
        # Keep only .npy files: a raw directory listing may also contain other
        # entries (e.g. a notebook checkpoint folder) that np.load cannot read.
        # Sorting makes the unshuffled order deterministic.
        self.song_npys = sorted(
            name for name in os.listdir(self.relative_path) if name.endswith(".npy")
        )
        self.validate_ = validate_

        self.song_indices = np.arange(len(self.song_npys))
        # Disabled 80/20 split, kept for reference:
        # if self.validate_== False:
        #     self.song_indices = self.song_indices[:int(0.8*self.len_dataset)]
        # else:
        #     self.song_indices = self.song_indices[int(0.8*self.len_dataset):]
        self.len_dataset = self.song_indices.shape[0]
        if self.shuffle:
            self.__shuffle()

    def __shuffle(self):
        """Permute ``song_indices`` in place of the previous ordering."""
        shuffler = np.random.permutation(self.len_dataset)
        self.song_indices = self.song_indices[shuffler]

    def __len__(self):
        """Return the number of batches per epoch (the last one may be short)."""
        return math.ceil(self.len_dataset / self.batch_size)

    def __getitem__(self, index):
        """Load batch number ``index`` and return it as ``(data, data)``."""
        temp_indices = self.song_indices[index*self.batch_size:(index+1)*self.batch_size]
        # float32 halves the buffer size compared with NumPy's float64 default.
        # NOTE(review): assumes the model runs in Keras' default float32 - confirm.
        data = np.zeros((temp_indices.shape[0], 256, 4096, 1), dtype=np.float32)
        for ind, value in enumerate(temp_indices):
            path = os.path.join(self.relative_path, self.song_npys[value])
            # NOTE(review): allow_pickle=True can execute arbitrary code when the
            # file is untrusted; drop it if the arrays are plain numeric data.
            data[ind] = np.asarray(np.load(path, allow_pickle=True))
        return data, data

    def on_epoch_end(self):
        """Reshuffle between epochs when shuffling is enabled."""
        if self.shuffle:
            self.__shuffle()

In [None]:
# Batches of 4 songs; `shuffle` is left at its default of True.
# No validation generator is created: the line below is disabled.
training_generator = TrainGenerator(batch_size=4)
# validation_generator = TrainGenerator(batch_size=4, validate_=True, shuffle=False)

In [None]:
# Save a weights-only checkpoint every `len(training_generator)` batches,
# i.e. once per pass over the generator; the epoch number is formatted into
# the file name.
checkpoint_path = "after_9_checkpoints/cp-{epoch:04d}.ckpt"
model_checkpoint_callback = tf.keras.callbacks.ModelCheckpoint(
    checkpoint_path,
    save_freq=len(training_generator),
    save_weights_only=True,
)

In [None]:
# Holds the History object of every fit() run; re-running this cell empties it.
phase1_hist = []

# Adam with learning rate 0.005, mean-squared-error loss, MAE as extra metric.
AutoEncoder.compile(
    loss="mse",
    metrics=["mae"],
    optimizer=tf.keras.optimizers.Adam(learning_rate=0.005),
)

In [None]:
# Train for 3 epochs with checkpointing and live loss plots, then record the
# returned History. Validation is disabled, so no val_* metrics are produced.
history = AutoEncoder.fit(
    training_generator,
    epochs=3,
    callbacks=[model_checkpoint_callback, PlotLossesKeras()],
    # validation_data=validation_generator,
)
phase1_hist.append(history)

In [None]:
from tqdm import tqdm

# Sanity check: try to load every entry of Single_NPYs/ and collect the names
# of those that cannot be read.
lis_fail = []
folder = "Single_NPYs/"
cor = 0
fa = 0
for i in tqdm(os.listdir(folder), ncols = 100):
    try:
        # NOTE(review): allow_pickle=True can execute arbitrary code when the
        # file is untrusted; drop it if the arrays are plain numeric data.
        ttt = np.load(folder + i, allow_pickle=True)
    except Exception:
        # `Exception` instead of a bare `except`, so that interrupting the
        # kernel (KeyboardInterrupt) stops the loop instead of being counted
        # as a failed file.
        fa += 1
        lis_fail.append(i)
    else:
        cor += 1
print("Correct =", cor)
print("False =", lis_fail)

In [None]:
def plot_spectrogram(spec, title=None, ylabel='freq_bin', aspect='auto', xmax=None):
    """Draw a power spectrogram on a decibel scale with a colorbar.

    Args:
        spec: Power spectrogram; it is converted with ``librosa.power_to_db``
            and rendered with ``imshow``.
        title: Figure title; 'Spectrogram (db)' is used when this is falsy.
        ylabel: Label of the vertical axis.
        aspect: Forwarded to ``imshow`` as its ``aspect`` argument.
        xmax: Optional upper bound of the x axis. When falsy the x limits are
            left untouched.
    """
    fig, axs = plt.subplots(1, 1)
    axs.set_title(title or 'Spectrogram (db)')
    axs.set_ylabel(ylabel)
    axs.set_xlabel('frame')
    im = axs.imshow(librosa.power_to_db(spec), origin='lower', aspect=aspect)
    if xmax:
        axs.set_xlim((0, xmax))
    # The colorbar and show() must run for every call, not only when xmax is
    # given; previously they were nested under the `if` and silently skipped.
    fig.colorbar(im, ax=axs)
    plt.show(block=False)

In [None]:
# Concatenate the per-epoch metrics of every recorded fit() run.
hist_mae=[]
hist_mae_val=[]
hist_loss=[]
hist_loss_val=[]
for i in phase1_hist:
    hist_mae.extend(i.history['mae'])
    hist_loss.extend(i.history['loss'])
    # The val_* keys exist only when fit() received validation data; training
    # currently runs without it, so indexing them directly raised KeyError.
    hist_mae_val.extend(i.history.get('val_mae', []))
    hist_loss_val.extend(i.history.get('val_loss', []))

# summarize history for mae
plt.plot(hist_mae, label='train')
if hist_mae_val:
    plt.plot(hist_mae_val, label='test')
plt.title('mae'); plt.ylabel('mae'); plt.xlabel('epoch')
plt.legend(loc='upper right')
plt.show()

# summarize history for loss
plt.plot(hist_loss, label='train')
if hist_loss_val:
    plt.plot(hist_loss_val, label='test')
plt.title('loss'); plt.ylabel('loss'); plt.xlabel('epoch')
plt.legend(loc='upper right')
plt.show()

In [None]:
# Names of the 64 feature columns: '0' .. '63'.
cols = [str(i) for i in range(0,64)]

# Resume from an existing feature table, or start an empty one on the first
# run (previously this required toggling a commented-out line by hand).
if os.path.exists('track_features.csv'):
    # Read song_id as text: ids derived from file names are strings, and a
    # numeric-looking id parsed as an integer would never match them.
    track_features = pd.read_csv('track_features.csv', dtype={'song_id': str})
else:
    track_features = pd.DataFrame(columns = ['song_id', *cols])
# Ids that already have a row in the table.
done_songs = set(track_features["song_id"])

def add_to_df():
    """Append not-yet-stored encodings to ``track_features`` and save the CSV.

    Reads the module-level ``encodings`` mapping (song id -> indexable vector)
    and, for every id missing from ``done_songs``, adds one row made of
    ``song_id`` plus one value per name in ``cols``. The ids are then added to
    ``done_songs`` and the whole table is written to 'track_features.csv',
    even when nothing new was added.
    """
    global track_features
    new_rows = []
    for song_id, vector in encodings.items():
        if song_id in done_songs:
            continue
        row = {'song_id': song_id}
        # Column names are the stringified positions within the vector.
        row.update((name, vector[int(name)]) for name in cols)
        new_rows.append(row)
        done_songs.add(song_id)

    if new_rows:
        # DataFrame.append was removed in pandas 2.0 and copied the whole table
        # for every row; build all new rows at once and concatenate a single time.
        new_frame = pd.DataFrame(new_rows)
        if track_features.empty:
            # Avoid concatenating with an empty frame (deprecated in recent
            # pandas); the new rows already carry song_id plus every column.
            track_features = new_frame
        else:
            track_features = pd.concat([track_features, new_frame], ignore_index=True)
    track_features.to_csv('track_features.csv', index = False)

In [None]:
# Number of song ids currently recorded in done_songs.
len(done_songs)

In [None]:
# (rows, columns) of the feature table.
track_features.shape

In [None]:
# Encode every song that is not yet in done_songs and periodically flush the
# results to the CSV through add_to_df().
# Only .npy files are considered; other directory entries cannot be loaded.
numpy_files = [name for name in os.listdir('Single_NPYs') if name.endswith('.npy')]
encodings = {}
# NOTE(review): the layer is selected by position. With the model built in
# this notebook, index 12 should be the Dense(64) bottleneck (input layer,
# 5 x Conv2D + BatchNorm, Flatten, Dense) - confirm if the model changes.
intermediate_layer_model = tf.keras.Model(inputs=AutoEncoder.input,
                                       outputs=AutoEncoder.layers[12].output)

batch_size = 1
count = 0        # number of slots filled in the current batch
count_total = 0  # number of songs handled so far in this run
inp_array = np.zeros((batch_size, 256,4096,1))
inp_names = ['name' for i in range(batch_size)]
for song in tqdm(sorted(numpy_files), ncols = 100):
    # The song id is the file name up to its first dot.
    if song.split(".")[0] in done_songs:
        continue
    print(song)
    # NOTE(review): allow_pickle=True can execute arbitrary code when the file
    # is untrusted; drop it if the arrays are plain numeric data.
    inp_array[count] = np.expand_dims(np.load("Single_NPYs/"+song, allow_pickle = True), axis=0)
    inp_names[count] = song
    count = (count + 1) % batch_size

    if count == 0:
        # The batch is full: encode it (explicitly in inference mode) and
        # store one vector per song.
        out_array = intermediate_layer_model(inp_array, training=False).numpy()

        for i in range(batch_size):
            encodings[inp_names[i].split(".")[0]] = out_array[i]
        inp_array = np.zeros((batch_size, 256,4096,1))
        inp_names = ['name' for i in range(batch_size)]

    if count_total % (batch_size * 50) == 0:
        add_to_df()
        print("added "+str(count_total)+" songs to csv")
        clear_output(wait=True)
    count_total += 1

# Encode the songs left in a partially filled batch; without this they were
# silently dropped whenever batch_size did not divide the number of songs.
if count:
    out_array = intermediate_layer_model(inp_array[:count], training=False).numpy()
    for i in range(count):
        encodings[inp_names[i].split(".")[0]] = out_array[i]
add_to_df()