In [None]:
import os
# Expose only CUDA device index 0 to this process. The variable is set here,
# ahead of the TensorFlow import further down.
os.environ['CUDA_VISIBLE_DEVICES'] = '0'

import json
import pandas as pd
import operator
import joblib
import librosa
import soundfile as sf
import shutil
from collections import Counter
import numpy as np
import random
from tqdm.notebook import tqdm

import tensorflow as tf
from tensorflow import keras as k
from tensorflow.keras import layers

from sklearn.preprocessing import LabelBinarizer, MinMaxScaler
from sklearn.model_selection import train_test_split

import seaborn as sns
import matplotlib.pyplot as plt

import IPython.display as ipd

import string
# Uppercase ASCII alphabet; used further down to build random file names
# for generated audio.
letters = string.ascii_uppercase

# Show which GPUs TensorFlow detected.
print(tf.config.list_physical_devices('GPU'))

In [None]:
# Mount Google Drive under /content/drive; every path used below lives there.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
# Switch the working directory to the project folder on Drive
# (presumably so the project-local modules below can be imported).
%cd /content/drive/MyDrive/CODE/CondVAEmelspec/

# DataGenerator is only referenced by the generator-based training snippet;
# musicnn_tags_all supplies the tag names paired with each tag-score vector.
from generator import DataGenerator
from musicnn_tags import musicnn_tags as musicnn_tags_all

In [None]:
# PATHS
# Everything hangs off the project folder on the mounted Drive.
_PROJECT_DIR = "/content/drive/MyDrive/CODE/CondVAEmelspec/"
_DATA_DIR = _PROJECT_DIR + "data/"

MODEL_PATH = _PROJECT_DIR + "models/simple_vae/"    # trained models, encoder/scaler, params
SPECTROGRAMS_PATH = _DATA_DIR + "spectrograms/"     # input mel spectrograms (.npy)
TAGS_PATH = _DATA_DIR + "tags/"                     # tag-score vectors (.npy, same names)
GEN_PATH = _DATA_DIR + "generations/"               # generated audio output

In [None]:
# set number of data to train with
N = 5000

# Directory listing of the spectrogram folder; its length is reported as the
# dataset size (note: N_data_tot holds the file names, not a count).
N_data_tot = os.listdir(SPECTROGRAMS_PATH)

print('Dataset size:', len(N_data_tot))
print('training on:', N)

In [None]:
def load_dataset(N, expected_shape=(96, 188)):
    """Load up to N mel spectrograms with their top tag, scaled and encoded.

    The first N '.npy' entries of SPECTROGRAMS_PATH (in os.listdir order) are
    considered. For each one the spectrogram and the tag-score vector stored
    under the same file name in TAGS_PATH are loaded, and the single
    highest-scoring tag (names taken from musicnn_tags_all) becomes the label.
    Files that cannot be loaded, or whose spectrogram does not have
    `expected_shape`, are skipped and reported instead of being dropped
    silently.

    Parameters
    ----------
    N : int or None
        Maximum number of spectrogram files to consider (None = all).
    expected_shape : tuple, optional
        Shape a spectrogram must have to be kept. Defaults to (96, 188).

    Returns
    -------
    x_train : np.ndarray, float32, shape (n_kept, prod(expected_shape))
        Min-max scaled spectrograms, flattened per sample.
    y_train : np.ndarray
        Output of LabelBinarizer.fit_transform on the top tags.
    le : LabelBinarizer
        Fitted label encoder.
    min_max_scaler : MinMaxScaler
        Scaler fitted on the spectrograms reshaped to (-1, last_axis), i.e.
        one min/max pair per index of the last axis.
    all_labels : list of str
        Distinct top tags, sorted so the order is stable between runs.

    Raises
    ------
    ValueError
        If no usable spectrogram was found.
    """
    expected_shape = tuple(expected_shape)

    melspecs = [name for name in os.listdir(SPECTROGRAMS_PATH) if '.npy' in name]

    x_train = []
    y_train = []
    unreadable = []   # (file name, error repr) for files that failed to load
    wrong_shape = 0   # spectrograms loaded fine but with an unexpected shape

    for name in tqdm(melspecs[:N]):

        try:
            arr = np.load(os.path.join(SPECTROGRAMS_PATH, name))

            # the tag scores live under the same file name in TAGS_PATH
            scores = list(np.load(os.path.join(TAGS_PATH, name)))

            # keep only the single best-scoring tag as the class label;
            # max() returns the first maximum, like the stable descending sort
            # it replaces, and raises on an empty score vector so a sample is
            # never stored without its label
            top_tag = max(zip(musicnn_tags_all, scores),
                          key=operator.itemgetter(1))[0]

        except Exception as e:
            # best effort: one broken file must not abort the whole load,
            # but it is recorded so the problem is visible afterwards
            unreadable.append((name, repr(e)))
            continue

        if arr.shape != expected_shape:
            wrong_shape += 1
            continue

        x_train.append(arr)
        y_train.append(top_tag)

    if unreadable or wrong_shape:
        print('skipped files:', len(unreadable), 'unreadable,',
              wrong_shape, 'with unexpected shape')
        for name, err in unreadable[:5]:
            print('   ', name, '->', err)

    if not x_train:
        raise ValueError('no usable spectrograms of shape %s found in %s'
                         % (expected_shape, SPECTROGRAMS_PATH))

    x_train = np.array(x_train)

    # NORMALIZE SPECTROGRAMS
    # the scaler sees a 2-D view whose columns are the last spectrogram axis
    min_max_scaler = MinMaxScaler()
    flat_view = x_train.reshape(-1, x_train.shape[-1])
    x_train = min_max_scaler.fit_transform(flat_view).reshape(x_train.shape)

    # flatten every spectrogram into one feature vector
    x_train = x_train.reshape(x_train.shape[0], -1).astype('float32')

    # sorted -> same label order on every run (set order is not stable)
    all_labels = sorted(set(y_train))

    le = LabelBinarizer()
    y_train = np.array(le.fit_transform(y_train))

    return x_train, y_train, le, min_max_scaler, all_labels

In [None]:
# fit min_max_scaler and labels encoder
# load_dataset returns the scaled, flattened spectrograms (X), the binarized
# top-tag labels (y), the fitted encoder and scaler, and the distinct labels.
X, y, le, min_max_scaler, all_labels = load_dataset(N=N)
print(X.shape, y.shape)

In [None]:
# save labels encoder and min_max_scaler
# joblib.dump does not create missing parent folders, so make sure the model
# directory exists before writing into it (a fresh Drive has none).
os.makedirs(MODEL_PATH, exist_ok=True)

le_path = os.path.join(MODEL_PATH, "label_encoder.joblib")
joblib.dump(le, le_path)

min_max_scaler_path = os.path.join(MODEL_PATH, "min_max_scaler.joblib")
joblib.dump(min_max_scaler, min_max_scaler_path)

In [None]:
# Hold out a third of the data (fixed seed), then inspect the class
# distribution of the resulting train split.
x_train, x_test, y_train, y_test = train_test_split(
    X, y,
    test_size=0.33,
    random_state=42,
)

# decode the binarized rows back into tag names
class_distr = le.inverse_transform(y_train)

# (tag, count) pairs, most frequent first
D = Counter(class_distr).most_common()
D

In [None]:
# Drop the un-split arrays; the visible cells below only use the train/test splits.
del X, y

In [None]:
def create_dataset_df(N):
    """Build a table of spectrogram / tag file paths for up to N samples.

    The '.npy' entries of SPECTROGRAMS_PATH are selected first and only then
    cut to N. This is the same order of operations load_dataset uses, so both
    pick the same files; slicing before filtering could return fewer than N
    rows whenever other files sit in the folder.

    Parameters
    ----------
    N : int or None
        Maximum number of rows (None = every '.npy' file).

    Returns
    -------
    pd.DataFrame
        Columns 'spectrograms' and 'tags'. Row i holds the path of a
        spectrogram in SPECTROGRAMS_PATH and the path of the file with the
        same name in TAGS_PATH.
    """
    names = [name for name in os.listdir(SPECTROGRAMS_PATH) if '.npy' in name]
    names = names[:N]

    return pd.DataFrame({
        'spectrograms': [os.path.join(SPECTROGRAMS_PATH, name) for name in names],
        'tags': [os.path.join(TAGS_PATH, name) for name in names],
    })

In [None]:
# Table of spectrogram/tag file paths, built with the same N as the in-memory
# dataset; it is only referenced by the generator-based training snippet.
df = create_dataset_df(N=N)
print(df.shape)
df.head()

In [None]:
# Training hyperparameters
BATCH_SIZE = 256
LATENT_DIM = 64
LEARNING_RATE = 0.001
EPOCHS = 1000
# Both sizes are read off the prepared arrays: the length of one flattened
# spectrogram and the width of one binarized label row.
INPUT_SHAPE = x_train.shape[1]
CONDITION_SIZE = y_train.shape[1]

print('LATENT_DIM:', LATENT_DIM)
print('CONDITION_SIZE:', CONDITION_SIZE)

In [None]:
# save all training params
# Collected in one literal (same keys, same order) and written next to the
# model as JSON.
params = {
    'DATASET': N,
    'BATCH_SIZE': BATCH_SIZE,
    'LATENT_DIM': LATENT_DIM,
    'LEARNING_RATE': LEARNING_RATE,
    'EPOCHS': EPOCHS,
    'INPUT_SHAPE': INPUT_SHAPE,
    'CONDITION_SIZE': CONDITION_SIZE,
    'LABELS': all_labels,
}

params_file = os.path.join(MODEL_PATH, 'training_params.json')
with open(params_file, 'w') as outfile:
    json.dump(params, outfile, indent=4)

In [None]:
# sampling
def sampling(args, latent_dim=LATENT_DIM):
    """Draw a latent sample z = mean + exp(log_var / 2) * eps.

    args is the pair (mean, log_var). eps is standard-normal noise with one
    row per row of `mean` and `latent_dim` columns.
    """
    z_mean, z_log_var = args
    n_rows = tf.shape(z_mean)[0]
    noise = tf.random.normal(shape=(n_rows, latent_dim), mean=0., stddev=1.0)
    return z_mean + tf.exp(z_log_var/2.) * noise

## encoder
def make_encoder(input_shape=INPUT_SHAPE,
                 latent_dim=LATENT_DIM,
                 condition_size=CONDITION_SIZE,
                 batch_size=BATCH_SIZE):
    """Build the conditional encoder.

    The input vector and the condition vector are concatenated and sent
    through ReLU dense layers of width 512, 512, 512, 512 and 256. Two linear
    heads of width `latent_dim` then produce the mean and the log-variance.

    `batch_size` is accepted but not used by the body.

    Returns a Keras model named 'encoder' with inputs [x, c] and outputs
    [mean, log_var].
    """
    spec_in = layers.Input(shape=(input_shape,))
    cond_in = layers.Input(shape=(condition_size,))

    hidden = layers.concatenate([spec_in, cond_in], axis=1)
    for width in (512, 512, 512, 512, 256):
        hidden = layers.Dense(units=width, activation='relu')(hidden)

    z_mean = layers.Dense(units=latent_dim)(hidden)
    z_log_var = layers.Dense(units=latent_dim)(hidden)

    return tf.keras.Model(inputs=[spec_in, cond_in],
                          outputs=[z_mean, z_log_var],
                          name='encoder')


## decoder
def make_decoder(output_shape=INPUT_SHAPE,
                 batch_size=BATCH_SIZE,
                 latent_dim=LATENT_DIM,
                 condition_size=CONDITION_SIZE):
    """Build the conditional decoder.

    The latent vector and the condition vector are concatenated, passed
    through ReLU dense layers of width 256 and 512, and mapped by a sigmoid
    dense layer to a vector of length `output_shape`.

    `batch_size` is accepted but not used by the body.

    Returns a Keras model named 'decoder' with inputs [z, c].
    """
    latent_in = layers.Input(shape=(latent_dim,))
    cond_in = layers.Input(shape=(condition_size,))

    hidden = layers.concatenate([latent_in, cond_in], axis=1)
    for width in (256, 512):
        hidden = layers.Dense(units=width, activation='relu')(hidden)

    recon = layers.Dense(units=output_shape, activation='sigmoid')(hidden)

    return tf.keras.Model(inputs=[latent_in, cond_in], outputs=recon, name='decoder')

## loss
def loss(x, y, mean, log_var, alpha=1.0, beta=1.0):
    """Weighted sum of reconstruction error and KL term.

    reconstruction: mean over everything of the MSE between x and y
    KL term:        -0.5 * mean(log_var - mean^2 - exp(log_var) + 1)
    result:         alpha * reconstruction + beta * KL
    """
    per_sample_mse = tf.keras.losses.mean_squared_error(y_true=x, y_pred=y)
    reconstruction_loss = tf.reduce_mean(per_sample_mse, name='recon_loss')

    kl_terms = log_var - tf.square(mean) - tf.exp(log_var) + 1
    kl_loss = tf.identity(- 0.5 * tf.reduce_mean(kl_terms), name="kl_loss")

    return alpha*reconstruction_loss + beta*kl_loss

In [None]:
def build_model():
    """Assemble the conditional VAE from a fresh encoder and decoder.

    The encoder maps (x, c) to (mean, log_var), a Lambda layer samples z from
    them, and the decoder maps (z, c) back to a reconstruction. The loss is
    attached with add_loss (reconstruction weight alpha=10) and the model is
    compiled with Adam at LEARNING_RATE.

    Returns the tuple (encoder, decoder, cvae).
    """
    # sub-models
    encoder = make_encoder()
    decoder = make_decoder()

    # CVAE wiring: encode -> sample -> decode
    spec_in = layers.Input(shape=(INPUT_SHAPE,))
    cond_in = layers.Input(shape=(CONDITION_SIZE,))
    z_mean, z_log_var = encoder([spec_in, cond_in])
    latent = layers.Lambda(sampling)([z_mean, z_log_var])
    recon = decoder([latent, cond_in])

    cvae = tf.keras.Model(inputs=[spec_in, cond_in], outputs=recon, name='cvae')
    cvae.add_loss(loss(spec_in, recon, z_mean, z_log_var, alpha=10))

    optimizer = tf.keras.optimizers.Adam(LEARNING_RATE)
    cvae.compile(optimizer=optimizer)

    return encoder, decoder, cvae

## Train

In [None]:
## train
# Build a fresh model and fit it on the in-memory train split. No targets are
# passed: the loss was attached inside build_model via add_loss.
encoder, decoder, cvae = build_model()
cvae.fit((x_train, y_train), 
         epochs=EPOCHS, 
         batch_size=BATCH_SIZE)

In [None]:
#@title #### [commented] fit with generator code
# The generator-based training variant is parked inside a string literal so it
# does not run; the variable `xxx` is not referenced by any visible cell.
xxx = """
continue_training = False

encoder, decoder, cvae = build_model()

if continue_training:
    print('Continuing training')
    cvae.load_weights(os.path.join(MODEL_PATH, 'vae.h5'))
    
else:
    print('Train from scratch')

model_checkpoint_callback = tf.keras.callbacks.ModelCheckpoint(
                        MODEL_PATH,
                        monitor="val_loss",
                        verbose=0,
                        save_best_only=True,
                        save_weights_only=False,
                        mode="auto",
                        save_freq="epoch",
                        options=None
                    )

# generator
training_generator = DataGenerator(df, le, min_max_scaler, batch_size=BATCH_SIZE)
validation_generator = DataGenerator(df, le, min_max_scaler, batch_size=BATCH_SIZE)

# train
cvae.fit(training_generator, 
        epochs=EPOCHS,
        validation_data=validation_generator,
        #callbacks=[model_checkpoint_callback],
        use_multiprocessing=True,
        workers=24)
"""

In [None]:
# save model
# The full CVAE is saved to the MODEL_PATH directory itself ...
cvae.save(MODEL_PATH)

# ... and encoder, decoder and CVAE are each saved again as .h5 files inside it.
encoder.save( os.path.join(MODEL_PATH, 'encoder.h5') )
decoder.save( os.path.join(MODEL_PATH, 'decoder.h5') )
cvae.save( os.path.join(MODEL_PATH, 'vae.h5') )

## Generate

In [None]:
# params
# Settings handed to librosa / soundfile when turning mel spectrograms into audio.
SR = 16000 #22050   # passed as the sample rate (sr=) and as the playback/write rate
FFT_HOP = 256       # passed as hop_length
FFT_SIZE = 512      # passed as n_fft and win_length
N_MELS = 96         # not referenced by the visible cells; matches the 96-row spectrograms
N_ITER = 32         # Griffin-Lim iteration count

In [None]:
def mel2audio(mel, save=False, denormalize=False):
    """Invert a mel spectrogram to a waveform with librosa's Griffin-Lim.

    This replaces two same-named definitions, of which only the second was
    ever in effect. The default call `mel2audio(mel)` behaves exactly like
    that active definition. The options of the shadowed one are kept as
    opt-in keywords, with its file-name bug fixed: it always wrote 'j.wav'
    and ignored the random name it had just built.

    Parameters
    ----------
    mel : np.ndarray
        2-D mel spectrogram; the callers in this notebook pass (96, 188).
    save : bool, optional
        When True, also write the waveform to GEN_PATH as a 24-bit PCM WAV
        named with 10 random uppercase letters.
    denormalize : bool, optional
        When True, first undo the min-max scaling with
        `min_max_scaler.inverse_transform`.
        NOTE(review): training data and decoder output are both in the scaled
        space, so callers probably want True here. Confirm before changing
        the default.

    Returns
    -------
    np.ndarray
        The reconstructed audio signal at sample rate SR.
    """
    if denormalize:
        mel = min_max_scaler.inverse_transform(mel)

    y = librosa.feature.inverse.mel_to_audio(mel,
                                             sr=SR,
                                             n_fft=FFT_SIZE,
                                             hop_length=FFT_HOP,
                                             win_length=FFT_SIZE,
                                             window='hann',
                                             center=True,
                                             pad_mode='reflect',
                                             power=2.0,
                                             n_iter=N_ITER,
                                             length=None)

    if save:
        filename = ''.join(random.choice(letters) for _ in range(10))
        path = os.path.join(GEN_PATH, filename + '.wav')
        sf.write(path, y, SR, 'PCM_24')

    return y

In [None]:
#all_labels

In [None]:
# select label
lab = 'electronic'
#lab = all_labels[11]
print(lab)

# A label the encoder never saw is silently encoded as an all-zero vector,
# which would condition the decoder on "no class at all". Fail loudly instead.
if lab not in le.classes_:
    raise ValueError('label %r is not among the fitted classes: %s'
                     % (lab, list(le.classes_)))

cond_vec = le.transform([lab])

# if cond vector over all tags
#cond_vec = np.random.uniform(low=0, high=1, size=(CONDITION_SIZE,)).reshape(1,-1)

# number of samples drawn in one go (kept equal to CONDITION_SIZE, as before)
n_samples = CONDITION_SIZE

z_sample = tf.random.normal(shape=(n_samples, LATENT_DIM))
generated = decoder.predict([z_sample, np.repeat(cond_vec, n_samples, axis=0)],steps=1)\
            .reshape(n_samples, 96, 188)

print(generated.shape)

# sample to listen to; clamped so it stays valid when fewer than 4 were drawn
idx = min(3, n_samples - 1)

signal = generated[idx,:,:]

y = mel2audio(signal)
ipd.Audio(y, rate=SR)

In [None]:
# PLAY A TRAIN SAMPLE
# Resynthesize one training spectrogram to hear what the inversion alone
# sounds like. The earlier assignment from index 1 was dead code, overwritten
# on the very next line, so only the index actually used is kept.
sample_idx = 19
j = x_train[sample_idx].reshape(96, 188)

y = mel2audio(j)
ipd.Audio(y, rate=SR)

In [None]:
## plot latent space
# Encode at most the first n training samples and scatter the first two
# latent-mean coordinates, coloured by tag.
n = 60000
mean, logvar = encoder.predict([x_train[:n,:], y_train[:n,:]])

# Decode labels for the SAME first-n rows that were encoded; using all of
# y_train makes hue longer than the point list once the train set exceeds n.
labels = le.inverse_transform(y_train[:n])

cmap = sns.color_palette("hls", len(all_labels))
plt.figure(figsize=(10,10))
sns.scatterplot(x=mean[:,0], y=mean[:,1], hue=labels) #, palette=cmap)
plt.xlabel('latent mean, dim 0')
plt.ylabel('latent mean, dim 1')
plt.show()

In [None]:
# Start from an empty generations folder: remove a previous one if present,
# then create it afresh.
if os.path.isdir(GEN_PATH):
    shutil.rmtree(GEN_PATH)
os.mkdir(GEN_PATH)

In [None]:
## plot generated images
# For every tag: decode a batch of random latent vectors conditioned on that
# tag and show the first few spectrograms side by side.

save_audio = False               # set True to also write each shown sample to GEN_PATH
n_samples = CONDITION_SIZE       # batch drawn per tag (unchanged from before)
n_show = min(3, n_samples)       # never index past the batch when it is tiny

for cond_tag in all_labels:

    cond_vec = le.transform([cond_tag])

    #cond_vec = np.random.uniform(low=0, high=1, size=(CONDITION_SIZE,)).reshape(1,-1)

    z_sample = tf.random.normal(shape=(n_samples, LATENT_DIM))
    # reshape straight to (mel bins, frames) = (96, 188), the layout used
    # everywhere else; numerically identical to the former (188, 96) reshape
    # followed by a per-sample reshape back to (96, 188)
    generated = decoder.predict([z_sample, np.repeat(cond_vec, n_samples, axis=0)],steps=1)\
                .reshape(n_samples, 96, 188)

    plt.figure(figsize=(20, 10))

    for i in range(n_show):

        signal = generated[i]

        plt.subplot(1, 5, i+1)
        plt.axis('off')
        plt.title(str(cond_tag))
        plt.imshow(signal) #, cmap='gray')

        if save_audio:

            y = mel2audio(signal)
            #ipd.Audio(y, rate=SR)
            filename = ''.join(random.choice(letters) for _ in range(3))
            path = os.path.join(GEN_PATH, cond_tag + '_' + filename + '.wav')
            sf.write(path, y, SR, 'PCM_24')

    # show each tag's figure as soon as it is complete instead of keeping
    # one open figure per tag until the very end
    plt.show()