# Variational Autoencoder - LSTM

## Imports

In [1]:
import tensorflow as tf
from tensorflow import keras

# Imports from keras
from tensorflow.keras import Input
from tensorflow.keras.models import Model
from tensorflow.keras.layers import LSTM, Lambda, Dense, RepeatVector
import tensorflow.keras.backend as K
from tensorflow.keras.utils import plot_model
from tensorflow.keras.losses import mse

import numpy as np
import librosa

from os import walk
import os

import datetime

Import requested from: 'numba.decorators', please update to use 'numba.core.decorators' or pin to Numba version 0.48.0. This alias will not be present in Numba version 0.50.0.[0m
  from numba.decorators import jit as optional_jit
Import of 'jit' requested from: 'numba.decorators', please update to use 'numba.core.decorators' or pin to Numba version 0.48.0. This alias will not be present in Numba version 0.50.0.[0m
  from numba.decorators import jit as optional_jit


## Preprocessing

In [2]:
def has_sound(data, threshold = 0.0005, per = 0.2):
    """Report whether a large enough share of a signal is louder than a threshold.

    Parameters
    ----------
    data : numpy.ndarray
        Samples to inspect. The share is computed against ``data.shape[0]``.
    threshold : float
        Absolute amplitude a sample must exceed to count as sound.
    per : float
        Minimum fraction of samples that must exceed ``threshold``.

    Returns
    -------
    bool
        True when the fraction of loud samples is at least ``per``.
        Empty input returns False.
    """
    num_samples = data.shape[0]
    if num_samples == 0:
        # Nothing to measure; also avoids a 0/0 division (NaN plus RuntimeWarning).
        return False
    # Not named `abs`, so the builtin stays usable inside this function.
    magnitude = np.abs(data)
    num_has_sound = np.count_nonzero(magnitude > threshold)
    return bool(num_has_sound / num_samples >= per)

    
def rescale(x):
    """Map linear magnitudes onto a clipped log scale.

    Computes ``1 + 2 * max(0.1 * log10(x + offset), -1)`` element-wise, where
    ``offset`` is the square root of float machine epsilon.
    """
    # Small positive offset so that log10 never sees an exact zero.
    offset = np.sqrt(np.finfo(float).eps)
    scaled_log = 0.1 * np.log10(x + offset)
    clipped = np.maximum(scaled_log, -1)
    return 1 + 2 * clipped

    
def spectrum_format(data):
    """Turn one frame of samples into a rescaled magnitude spectrum.

    Takes the FFT of ``data``, keeps the first ``len(data)/2 + 1`` bins,
    divides their magnitudes by the frame length and passes the result
    through ``rescale``.
    """
    frame_len = data.shape[0]
    kept_bins = int(frame_len / 2) + 1
    magnitude = np.abs(np.fft.fft(data)[0:kept_bins]) / frame_len
    return rescale(magnitude)

class GetSounds(keras.utils.Sequence):
    """Keras ``Sequence`` that serves batches of rescaled magnitude spectra.

    Every file found directly inside ``path`` is streamed with librosa in
    overlapping frames. Each full frame goes through ``spectrum_format`` and
    becomes one row of that file's ``(TIMESTEPS, NUM_BINS)`` array.

    Parameters
    ----------
    path : str
        Directory holding the audio files (sub-directories are ignored).
    batch_size : int
        Maximum number of files per batch.
    """

    FRAME_LENGTH = 1024               # samples per analysis frame
    HOP_LENGTH = 512                  # samples between successive frames
    TIMESTEPS = 124                   # frames kept per file
    NUM_BINS = FRAME_LENGTH // 2 + 1  # 513 bins returned by spectrum_format

    def __init__(self, path, batch_size = 20000):
        # Harmless on older Keras; newer Keras warns when a Sequence subclass skips it.
        super().__init__()
        # Only the first entry yielded by walk() is used, i.e. the top-level files.
        _, _, filenames = next(walk(path), (path, [], []))
        self.file_names = list(filenames)
        self.path = path
        self.batch_size = batch_size
        self.on_epoch_end()

    def on_epoch_end(self):
        """Reshuffle the file order in place."""
        np.random.shuffle(self.file_names)

    def __len__(self):
        """Number of batches needed to cover every file."""
        return int(np.ceil(len(self.file_names) / float(self.batch_size)))

    def __getitem__(self, idx):
        """Return batch ``idx`` as a float32 array of shape (files, TIMESTEPS, NUM_BINS).

        The first axis matches the number of files actually in the batch, so a
        final partial batch no longer carries uninitialised rows. Files with
        fewer than ``TIMESTEPS`` full frames are zero-padded.
        """
        batch_names = self.file_names[self.batch_size*idx:self.batch_size*(idx+1)]
        # Zero-filled float32 buffer: no garbage from np.empty and no float64 copy.
        out = np.zeros([len(batch_names), self.TIMESTEPS, self.NUM_BINS], dtype='float32')
        for i, file_name in enumerate(batch_names):
            stream = librosa.stream(os.path.join(self.path, file_name),
                                    block_length=1,
                                    frame_length=self.FRAME_LENGTH,
                                    hop_length=self.HOP_LENGTH)
            out_i = 0
            for chunk in stream:
                if out_i >= self.TIMESTEPS:
                    # Longer files are truncated instead of raising IndexError.
                    break
                if chunk.shape[0] != self.FRAME_LENGTH:
                    # A short frame would yield a spectrum of the wrong width.
                    continue
                out[i, out_i] = spectrum_format(chunk)
                out_i = out_i + 1
        return out

### Setup Input Data Pipeline

In [3]:
# Training array = batch 0 of a GetSounds sequence with batch_size=10000.
# The file list is shuffled when the sequence is constructed, so which files
# land in this batch changes from run to run.
x_train = GetSounds('../nsynth-train/audio/',batch_size=10000)[0]

In [4]:
# Validation array = batch 0 of a GetSounds sequence with batch_size=4096.
x_test = GetSounds('../nsynth-test/audio/',batch_size=4096)[0]

## Model Setup

### Utility functions to build each section

In [5]:
# Reparameterization trick: the randomness comes from a separate N(0,1) draw,
# so the result stays a differentiable function of the mean and log-variance.
def sample(args):
    """Draw a latent sample from ``(z_mean, z_log_var)``.

    ``args`` is a pair of tensors. The returned tensor is
    ``z_mean + exp(0.5 * z_log_var) * epsilon`` with ``epsilon`` drawn from
    ``K.random_normal`` at shape ``(batch, dim)``.
    """
    mu, log_var = args
    # Dynamic batch size, static latent width.
    noise_shape = (K.shape(mu)[0], K.int_shape(mu)[1])
    noise = K.random_normal(shape=noise_shape)
    std_dev = K.exp(0.5 * log_var)
    return mu + std_dev * noise

### Define model parameters and construct the model

In [44]:
# Model Parameters
input_dim = 513          # width of one spectrum frame (last axis produced by GetSounds)
timesteps = 124          # frames per example (middle axis produced by GetSounds)
intermediate_dim = 64    # units in the encoder LSTM and the first decoder LSTM
latent_dim = 20          # size of the latent vector

# Construct the model: each example is a (timesteps, input_dim) sequence
x = Input(shape=(timesteps, input_dim,))

# LSTM Encoder (no return_sequences, so one vector per example)
h = LSTM(intermediate_dim)(x)

# Generate Distribution
# Despite its name, z_log_sigma is consumed as a log-variance: `sample` scales
# the noise by exp(0.5 * value) and the KL term in `loss` uses exp(value).
z_mu = Dense(latent_dim)(h)
z_log_sigma = Dense(latent_dim)(h)

# Sample from Distribution
z = Lambda(sample, output_shape=(latent_dim,))([z_mu, z_log_sigma])

# Decoder layers are created once as objects so the training graph and the
# standalone generator below share the same weights.
decoder_h = LSTM(intermediate_dim, return_sequences=True)
decoder_mean = LSTM(input_dim, return_sequences=True)

# Repeat the latent vector once per timestep, then decode the sequence
h_decoded = RepeatVector(timesteps)(z)
h_decoded = decoder_h(h_decoded)

# decoded layer: reconstructed sequence with input_dim features per timestep
x_bar = decoder_mean(h_decoded)

# Full Autoencoder
vae = Model(x, x_bar)

# Encoder, Input -> Latent Space (outputs the mean z_mu, not a sampled z)
encoder = Model(x, z_mu)

# generator, from latent space to reconstructed inputs
decoder_input = Input(shape=(latent_dim,))

# Same decoder_h / decoder_mean layer objects as above, applied to a new input
_h_decoded = RepeatVector(timesteps)(decoder_input)
_h_decoded = decoder_h(_h_decoded)

_x_bar = decoder_mean(_h_decoded)
generator = Model(decoder_input, _x_bar)

In [7]:
# Cost function: reconstruction error plus a beta-weighted KL term
def loss(x, x_bar, beta):
    """Build the VAE loss tensor for input ``x`` and reconstruction ``x_bar``.

    The reconstruction term is ``mse(x, x_bar)``. The KL term is computed from
    the module-level ``z_mu`` and ``z_log_sigma`` tensors, averaged with
    ``K.mean`` and scaled by ``beta`` before being added.
    """
    reconstruction = mse(x, x_bar)
    kl_inner = 1 + z_log_sigma - K.square(z_mu) - K.exp(z_log_sigma)
    kl_divergence = -0.5 * K.mean(kl_inner)
    return reconstruction + beta * kl_divergence

# Attach the loss to the model itself; beta is fixed at 1.1e-6.
vae.add_loss(loss(x, x_bar, 1.1e-6))

In [8]:
# Compile with Adam. No `loss=` argument is passed here because the loss
# tensor was already attached to the model with vae.add_loss(...).
vae.compile(optimizer='adam')

## Training Setup

In [9]:
# Number of training epochs (the batch size is passed directly to vae.fit)
epochs = 50

checkpoint_path = "training_ltsm/cp.ckpt"
checkpoint_dir = os.path.dirname(checkpoint_path)
# Make sure the checkpoint directory exists before anything is written to it.
os.makedirs(checkpoint_dir, exist_ok=True)

# Model Checkpoint: save weights only, at the end of every epoch.
# `save_freq='epoch'` replaces the deprecated `period=1` argument.
cp_callback = tf.keras.callbacks.ModelCheckpoint(
    filepath=checkpoint_path,
    verbose=1,
    save_weights_only=True,
    save_freq='epoch')

# Write the current (pre-training) weights. The path has no `{epoch}`
# placeholder, so .format() leaves it unchanged and every save overwrites
# the same checkpoint.
vae.save_weights(checkpoint_path.format(epoch=0))
# To resume from an earlier run, load the saved weights instead:
#vae.load_weights(checkpoint_path)



In [14]:
# TensorBoard logging: each run writes to its own timestamped directory
log_dir = "logs/fit/" + datetime.datetime.now().strftime("%Y%m%d-%H%M%S")

tensorboard_callback = tf.keras.callbacks.TensorBoard(
    log_dir=log_dir,
    histogram_freq=1,
)

# Train the VAE. No targets are passed: the loss was attached with add_loss.
vae.fit(
    x_train,
    epochs=epochs,
    validation_data=[x_test],
    verbose=1,
    batch_size=512,
    callbacks=[tensorboard_callback, cp_callback],
)

Epoch 1/50
Epoch 00001: saving model to training_ltsm/cp.ckpt
Epoch 2/50
Epoch 00002: saving model to training_ltsm/cp.ckpt
Epoch 3/50
Epoch 00003: saving model to training_ltsm/cp.ckpt
Epoch 4/50
Epoch 00004: saving model to training_ltsm/cp.ckpt
Epoch 5/50
Epoch 00005: saving model to training_ltsm/cp.ckpt
Epoch 6/50
Epoch 00006: saving model to training_ltsm/cp.ckpt
Epoch 7/50
Epoch 00007: saving model to training_ltsm/cp.ckpt
Epoch 8/50
Epoch 00008: saving model to training_ltsm/cp.ckpt
Epoch 9/50
Epoch 00009: saving model to training_ltsm/cp.ckpt
Epoch 10/50
Epoch 00010: saving model to training_ltsm/cp.ckpt
Epoch 11/50
Epoch 00011: saving model to training_ltsm/cp.ckpt
Epoch 12/50
Epoch 00012: saving model to training_ltsm/cp.ckpt
Epoch 13/50
Epoch 00013: saving model to training_ltsm/cp.ckpt
Epoch 14/50
Epoch 00014: saving model to training_ltsm/cp.ckpt
Epoch 15/50
Epoch 00015: saving model to training_ltsm/cp.ckpt
Epoch 16/50
Epoch 00016: saving model to training_ltsm/cp.ckpt
E

Epoch 00039: saving model to training_ltsm/cp.ckpt
Epoch 40/50
Epoch 00040: saving model to training_ltsm/cp.ckpt
Epoch 41/50
Epoch 00041: saving model to training_ltsm/cp.ckpt
Epoch 42/50
Epoch 00042: saving model to training_ltsm/cp.ckpt
Epoch 43/50
Epoch 00043: saving model to training_ltsm/cp.ckpt
Epoch 44/50
Epoch 00044: saving model to training_ltsm/cp.ckpt
Epoch 45/50
Epoch 00045: saving model to training_ltsm/cp.ckpt
Epoch 46/50
Epoch 00046: saving model to training_ltsm/cp.ckpt
Epoch 47/50
Epoch 00047: saving model to training_ltsm/cp.ckpt
Epoch 48/50
Epoch 00048: saving model to training_ltsm/cp.ckpt
Epoch 49/50
Epoch 00049: saving model to training_ltsm/cp.ckpt
Epoch 50/50
Epoch 00050: saving model to training_ltsm/cp.ckpt


<tensorflow.python.keras.callbacks.History at 0x7f5f3df30310>

## Postprocessing

In [60]:
def vae_predict(input):
    """Encode ``input`` with ``encoder`` and decode the result with ``generator``.

    The encoder model outputs the latent mean, so no random sampling is
    involved in this round trip.
    """
    return generator.predict(encoder.predict(input))

In [16]:
def inverseRescale(x):
    """Undo the log scaling of ``rescale``: returns ``10 ** (5 * (x - 1))``.

    The offset and the lower clip applied by ``rescale`` are not reversed.
    """
    exponent = 5 * (x - 1)
    return np.power(10, exponent)

def apply_to_chunk(data):
    """Run one frame through the autoencoder, keeping the original phase.

    The frame's half spectrum is split into magnitude and phase. The magnitude
    is rescaled, passed through ``vae_predict`` and un-scaled again, then
    recombined with the phase, zero-padded to the frame length and inverted
    with an IFFT. Only the real part is returned.
    """
    n_samples = data.shape[0]
    n_bins = int(n_samples / 2) + 1
    half_spectrum = np.fft.fft(data)[0:n_bins]
    magnitude = np.abs(half_spectrum) / n_samples
    phase = np.angle(half_spectrum)
    # NOTE(review): a [1, 513] input does not match the (timesteps, input_dim)
    # sequence shape the model in this notebook is built with - confirm this
    # helper is still meant to be used with the LSTM model.
    model_input = np.reshape(rescale(magnitude), [1, 513])
    predicted_magnitude = inverseRescale(vae_predict(model_input))
    rebuilt = predicted_magnitude * np.exp(1j * phase)
    # The upper bins are filled with zeros rather than the conjugate mirror.
    padded = np.append(rebuilt, np.zeros(n_samples - n_bins))
    return np.real(np.fft.ifft(padded))
    
def apply_to_file(input_filename, output_filename):
    """Process a file frame by frame with ``apply_to_chunk`` and write the result.

    Frames of 1024 samples are read with a hop of 512. The first half of each
    processed frame is cross-faded with the second half of the previous one
    using linear ramps. Frames shorter than 1024 samples are skipped.

    Parameters
    ----------
    input_filename : str
        Audio file to read.
    output_filename : str
        Path of the WAV file to write, at the input file's sample rate.
    """
    frame_length = 1024
    half = frame_length // 2
    stream = librosa.stream(input_filename,
                            block_length=1,
                            frame_length=frame_length,
                            hop_length=half)
    prev_chunk = np.zeros(frame_length)
    up_ramp = 2*np.arange(half)/frame_length
    down_ramp = 1.0 - up_ramp
    # Collected in a list and joined once. The previous np.empty([]) seed was a
    # 0-d uninitialised array, which put one garbage sample at the start of the
    # output, and np.append re-copied the whole signal on every frame.
    pieces = []
    for chunk in stream:
        if chunk.shape[0] == frame_length:
            current_chunk = apply_to_chunk(chunk)
            output_chunk = current_chunk[:half]*up_ramp + prev_chunk[half:]*down_ramp
            # Undo the 1/frame_length normalisation applied to the spectrum.
            pieces.append(output_chunk*frame_length)
            prev_chunk = current_chunk
    output = np.concatenate(pieces) if pieces else np.zeros(0)
    sr = librosa.get_samplerate(input_filename)
    # NOTE(review): librosa.output was removed in librosa 0.8 - confirm the
    # pinned librosa version still provides write_wav.
    librosa.output.write_wav(output_filename, output, sr, norm = False)

In [71]:
def apply(file):
    """Reconstruct the first 124 STFT frames of ``file`` and write 'out.wav'.

    The STFT magnitudes are divided by the FFT size and passed through
    ``rescale`` before prediction, matching what ``spectrum_format`` does to
    the training data. The prediction is mapped back with ``inverseRescale``
    and multiplied by the FFT size. Previously the model received raw linear
    magnitudes and its rescaled-domain output was used directly as a magnitude.
    The original STFT phase is reused for the inverse transform.
    """
    n_fft = 1024
    # NOTE(review): librosa.load resamples to its default rate, while the
    # training pipeline streams files without resampling - confirm they match.
    y, sr = librosa.load(file)
    # (bins, frames) -> (1, frames, bins), truncated to the model's 124 timesteps.
    S = np.expand_dims(np.transpose(librosa.stft(y, n_fft=n_fft))[0:124, :], axis=0)
    model_input = rescale(np.abs(S) / n_fft)
    predicted_magnitude = inverseRescale(vae_predict(model_input)) * n_fft
    S_pred = predicted_magnitude * np.exp(1j * np.angle(S))
    out = librosa.istft(np.transpose(S_pred[0, :, :]))
    librosa.output.write_wav('out.wav', out, sr)

In [73]:
# Reconstruct a sample clip; `apply` writes its result to 'out.wav'.
apply("test_organ.wav")

In [17]:
def apply_to_file_interp_no_phase(input1_filename, input2_filename, output_filename):
    """Blend two files in latent space and resynthesise without their phase.

    Both files are read in 1024-sample frames with a hop of 256. Each pair of
    frames is encoded with ``state_from_chunk``, the two states are mixed with
    a weight ``a`` that grows from 0 towards 1 over the length of the first
    file, and the mix is decoded with ``spec_from_state``. At ``a = 0`` the
    mix is entirely the second file's state. The resulting magnitude
    spectrogram is inverted with Griffin-Lim and written as a normalised WAV.

    Parameters
    ----------
    input1_filename, input2_filename : str
        Audio files to blend. Sample rate and duration come from the first.
    output_filename : str
        Path of the WAV file to write.
    """
    frame_length = 1024
    hop_length = frame_length // 4
    sr = librosa.get_samplerate(input1_filename)
    dur = librosa.get_duration(filename = input1_filename)
    samples = sr*dur
    sample_count = 0
    stream1 = librosa.stream(input1_filename,
                            block_length=1,
                            frame_length=frame_length,
                            hop_length=hop_length)
    stream2 = librosa.stream(input2_filename,
                            block_length=1,
                            frame_length=frame_length,
                            hop_length=hop_length)

    S = np.zeros([513, int(np.floor(4*samples/frame_length) - 1)])
    n = 0
    for chunk1, chunk2 in zip(stream1, stream2):
        if n >= S.shape[1]:
            # Never write past the pre-allocated spectrogram.
            break
        # Both frames must be full length; a short frame from either file
        # cannot be reshaped by state_from_chunk.
        if chunk1.shape[0] == frame_length and chunk2.shape[0] == frame_length:
            state1, _ = state_from_chunk(chunk1)
            state2, _ = state_from_chunk(chunk2)
            a = sample_count/samples
            sample_count += hop_length
            spec = spec_from_state(a*state1 + (1-a)*state2)
            S[:, n] = spec
            n = n+1
    output = librosa.griffinlim(S, hop_length = hop_length)*frame_length
    # NOTE(review): librosa.output was removed in librosa 0.8 - confirm the
    # pinned librosa version still provides write_wav.
    librosa.output.write_wav(output_filename, output, sr, norm = True)

In [18]:
def state_from_chunk(data):
    """Encode one frame and return ``(latent_state, phase)``.

    The phase is the angle of the frame's first ``len(data)/2 + 1`` FFT bins.
    The latent state is the first row of ``encoder.predict`` on the rescaled,
    length-normalised magnitudes of those bins.
    """
    n_samples = data.shape[0]
    n_bins = int(n_samples / 2) + 1
    half_spectrum = np.fft.fft(data)[0:n_bins]
    magnitude = np.abs(half_spectrum) / n_samples
    phase = np.angle(half_spectrum)
    # NOTE(review): a [1, 513] input does not match the (timesteps, input_dim)
    # sequence shape the encoder in this notebook is built with - confirm.
    encoder_input = np.reshape(rescale(magnitude), [1, 513])
    return encoder.predict(encoder_input)[0], phase

def chunk_from_state(data, fft_arg, num_of_samples = 513, data_shape = 1024):
    """Decode a latent state into a time-domain frame using the given phase.

    The decoded magnitudes are un-scaled with ``inverseRescale``, combined with
    ``fft_arg``, padded with ``data_shape - num_of_samples`` zeros and inverted
    with an IFFT. Only the real part is returned.
    """
    # NOTE(review): `decoder` is not defined anywhere in the visible notebook;
    # the latent-to-output model built there is called `generator`. Confirm
    # which model this helper is meant to call.
    magnitude = inverseRescale(decoder.predict(data))
    spectrum = magnitude * np.exp(1j * fft_arg)
    padding = np.zeros(data_shape - num_of_samples)
    padded = np.append(spectrum, padding)
    return np.real(np.fft.ifft(padded))

def spec_from_state(data):
    """Decode a latent state and return the result after ``inverseRescale``."""
    # NOTE(review): `decoder` is not defined anywhere in the visible notebook;
    # the latent-to-output model built there is called `generator`. Confirm
    # which model this helper is meant to call.
    decoded = decoder.predict(data)
    return inverseRescale(decoded)