# Variational Autoencoder for Audio
In this notebook, I attempt to implement a variational autoencoder (VAE) that generates sounds similar to those in a spoken-digit audio database.

In [None]:
%matplotlib inline
import gzip
import random
import cPickle as pickle

import IPython.display
import librosa
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn

from librosa.display import waveplot, specshow
# Global plot styling for the exploratory figures; a later cell switches the style to 'white'.
seaborn.set(style='ticks')

In [None]:
# Path of the gzip-compressed pickle holding the spoken-digit table.
dbfile = '../SpokenDigitDB.pkl.gz'

# NOTE(security): unpickling can execute arbitrary code -- only load files you trust.
with gzip.open(dbfile, 'rb') as ifile:
    df = pickle.load(ifile)

# Reached only when the load succeeded, exactly as before.
print('File loaded as '+ dbfile)

In [None]:
# Column names, dtypes and non-null counts of the loaded table.
df.info()

In [None]:
# Distribution of spectrogram widths (size along axis 1), used to choose the
# fixed length that every example is padded / truncated to in the next cell.
mag = df.Magnitude
mgs = [np.shape(x)[1] for x in mag]
maxlen = np.max(mgs)
print('Maximum length is: {} '.format(maxlen))

plt.hist(mgs, 50)
plt.xlabel('Spectrogram length (columns)')
plt.ylabel('Number of examples')
plt.title('Distribution of spectrogram lengths');  # trailing ';' hides the repr of the last call

In [None]:
# Padding & Truncating
maxlen = 84


def pad(a, n):
    """Return `a` with exactly `n` columns.

    Arrays wider than `n` are truncated to their first `n` columns; all others
    are extended on the right with zeros (the result of ``np.hstack``, so it is
    a new array even when no columns are missing).

    NOTE(review): `audio_recon` later treats Magnitude as dB values, so zero
    padding may not correspond to silence -- confirm against how the database
    was built.
    """
    width = a.shape[1]
    if width > n:
        return a[:, 0:n]
    filler = np.zeros([a.shape[0], n - width])
    return np.hstack((a, filler))

# Bring every spectrogram to exactly `maxlen` columns (long ones are truncated,
# short ones zero-padded), first for the magnitudes and then for the phases.
spectrogram_columns = ('Magnitude', 'Phase')
for column in spectrogram_columns:
    df[column] = df[column].apply(pad, args=(maxlen,))

# Sanity check: print the distinct widths that remain in each column.
for column in spectrogram_columns:
    print(np.unique([np.shape(x)[1] for x in df[column]]))

## Plot Samples from Database

In [None]:
seaborn.set(style='white')

# Plot K Random Examples
k  = 5
sr = 8000


def _hide_tick_labels():
    """Blank the tick labels of the current axes on both axes."""
    axes = plt.gca()
    axes.set_xticklabels([])
    axes.set_yticklabels([])
    return axes


# k distinct row positions; each one fills a whole column of the 3 x k grid.
sidx = random.sample(range(len(df)), k)

for col, j in enumerate(sidx, start=1):
    # Row 1: waveform, titled with the spoken digit.
    plt.subplot(3, k, col)
    waveplot(df.Wave[j], sr=sr)
    plt.title('Digit:{}'.format(df.Class[j]))
    _hide_tick_labels().get_xaxis().set_visible(False)

    # Row 2: magnitude spectrogram of the same example.
    plt.subplot(3, k, k + col)
    specshow(df.Magnitude[j], sr=sr)
    _hide_tick_labels()

    # Row 3: phase spectrogram of the same example.
    plt.subplot(3, k, 2 * k + col)
    specshow(df.Phase[j], sr=sr)
    _hide_tick_labels()

In [None]:
# np.max(df.Magnitude[j])
# np.max(df.Phase[j])

## Playback & Reconstruction

In [None]:
# Listen to one of the sampled utterances (the second index drawn for the plot above).
j = sidx[1]
IPython.display.Audio(df.Wave[j], rate=sr)

In [None]:
# Reconstruct Audio from Spectrogram
def audio_recon(mag, phi, nfft=128, hop=None):
    """Rebuild a waveform from a dB-magnitude / phase-angle spectrogram pair.

    Parameters
    ----------
    mag : array
        Magnitude spectrogram in dB; converted with ``librosa.db_to_amplitude``.
    phi : array
        Phase angles in radians, same shape as `mag`.
    nfft : int, optional
        FFT size, used only to derive the default hop length (it is not
        passed to ``librosa.istft``). Defaults to 128, the previous constant.
    hop : int, optional
        Hop length in samples. Defaults to ``nfft // 2``.

    Returns
    -------
    The time-domain signal returned by ``librosa.istft``.
    """
    if hop is None:
        # Floor division keeps the hop an int even under true division
        # (Python 3 or ``from __future__ import division``).
        hop = nfft // 2
    amplitude = librosa.db_to_amplitude(mag)    # 1: dB -> linear amplitude
    phasor = np.cos(phi) + 1j * np.sin(phi)     # 2: angle -> unit complex phasor
    return librosa.istft(amplitude * phasor, hop_length=hop)  # 3: invert magnitude * phase

# Original waveform of example j and its reconstruction from the stored
# magnitude / phase spectrograms; the reconstruction is rendered as a player.
yo = df.Wave[j]
yr = audio_recon(mag=df.Magnitude[j], phi=df.Phase[j])
IPython.display.Audio(yr, rate=sr)

In [None]:
# Compare reconstructed with original
plt.figure(figsize=(4, 4))

# (subplot row, signal, waveplot styling, title) for each panel; the
# reconstruction is cut to the original's length so both panels cover the same samples.
panels = [
    (1, yo, dict(alpha=0.7), 'Original'),
    (2, yr[:len(yo)], dict(color='r', alpha=0.3), 'Reconstructed'),
]
for row, signal, style, label in panels:
    plt.subplot(2, 1, row)
    waveplot(signal, sr=sr, **style)
    plt.title(label)

plt.tight_layout()

## VAE for Audio

In [None]:
# Imports
from keras.layers import Input, Dense, Lambda, Flatten, Reshape, Layer
from keras.layers import Conv2D, Conv2DTranspose
from keras.models import Model
from keras import backend as K
from keras.callbacks import TensorBoard

from keras import metrics
from sklearn.utils import shuffle
from scipy.stats import norm
import keras

In [None]:
from keras.layers.merge import Concatenate
import keras

In [None]:
# Input geometry: each spectrogram is fed as a (rows, cols, chns) tensor.
chns = 1
rows = 64
cols = 84
img_size = (rows, cols, chns)

# Network sizes.
filters    = 8      # feature maps per convolution
middle_dim = 200    # width of the shared dense layers
latent_dim = 10     # dimensionality of z

# Training / sampling settings.
# NOTE(review): the training cell passes its own literal epochs / batch_size to
# fit(), so the two values below are not what training actually uses.
batch_size  = 10
epochs      = 25
epsilon_std = 1.0   # stddev of the noise drawn in `sampling`

In [None]:
# Two named inputs of identical shape: one magnitude and one phase spectrogram.
x_mag = Input(shape=img_size,name='magnitude')
x_phi = Input(shape=img_size,name='phase')

# Mag Encoder
# Two strided 'same' convolutions; with img_size = (64, 84, 1) the feature maps
# shrink to (32, 28, filters) and then (16, 14, filters) before flattening.
mconv1 = Conv2D(filters,(3,5),padding='same',activation='relu',strides=(2,3))(x_mag)
mconv2 = Conv2D(filters,(3,3),padding='same',activation='relu',strides=(2,2))(mconv1)
mconv  = Flatten()(mconv2)

# Phi Encoder
# Same architecture as the magnitude branch, but with its own (unshared) conv layers.
pconv1 = Conv2D(filters,(3,5),padding='same',activation='relu',strides=(2,3))(x_phi)
pconv2 = Conv2D(filters,(3,3),padding='same',activation='relu',strides=(2,2))(pconv1)
pconv  = Flatten()(pconv2)


# Intermediate Shared Layer - Encoder
# One Dense instance is applied to both branches, so its weights are shared;
# the two middle_dim-wide codes are then joined into a single vector.
shared_layer = Dense(middle_dim, activation='relu')
encoded_m = shared_layer(mconv)
encoded_p = shared_layer(pconv)
# Earlier attempt with the backend op, kept for reference:
# encoded   = K.concatenate([encoded_m, encoded_p], axis=-1)
encoded   = keras.layers.concatenate([encoded_m, encoded_p])

# Latent Distribution
# Two independent linear heads on the joint code: the mean and (judging by the
# K.exp(z_lvar / 2) in `sampling`) the log-variance of the latent Gaussian.
z_mean = Dense(latent_dim)(encoded)
z_lvar = Dense(latent_dim)(encoded)

In [None]:
# Gaussian Sampler
def sampling(args):
    """Draw a latent sample as mean + exp(log_var / 2) * noise.

    `args` is the pair (mean, log-variance). The noise is standard normal
    scaled by the notebook-level `epsilon_std`, with one row per batch element
    and `latent_dim` columns.
    """
    mu, log_var = args
    n_samples = K.shape(mu)[0]
    noise = K.random_normal(shape=(n_samples, latent_dim),
                            mean=0.0, stddev=epsilon_std)
    sigma = K.exp(log_var / 2)
    return mu + sigma * noise


# Wrap the sampler in a layer so it becomes part of the Keras graph.
sampler = Lambda(sampling, output_shape=(latent_dim,))
z = sampler([z_mean, z_lvar])

In [None]:
# Intermediate Shared Layer - Decoder
# A single Dense instance; both decoder branches call it on the same z, so they
# start from the same shared hidden representation.
decode_h = Dense(middle_dim, activation='relu')

# Mag Decoder
# Mirrors the encoder: dense -> (16, 14, filters) -> transposed convs with strides
# 2 and (2, 3) -> (64, 84, filters) -> final conv down to `chns` channels.
# NOTE(review): the output activation is relu, which cannot produce negative values and
# is unbounded above, while the loss is a binary cross-entropy -- confirm this pairing.
decode_m   = decode_h(z)
upsample_m = Dense(filters*16*14, activation='relu')(decode_m)
ureshape_m = Reshape((16,14,filters))(upsample_m)
trconv1_m  = Conv2DTranspose(filters,3,padding='same',
                          activation='relu',strides=2)(ureshape_m)
trconv2_m  = Conv2DTranspose(filters,(3,5),padding='same',
                          activation='relu',strides=(2,3))(trconv1_m)
decoded_m  = Conv2D(chns,2,padding='same',activation='relu',name='decoded_mag')(trconv2_m)


# Phi Decoder
# Same layer stack as the magnitude decoder, built from separate layer instances
# (only `decode_h` is shared); the relu-output caveat above applies here as well.
decode_p   = decode_h(z)
upsample_p = Dense(filters*16*14, activation='relu')(decode_p)
ureshape_p = Reshape((16,14,filters))(upsample_p)
trconv1_p  = Conv2DTranspose(filters,3,padding='same',
                          activation='relu',strides=2)(ureshape_p)
trconv2_p  = Conv2DTranspose(filters,(3,5),padding='same',
                          activation='relu',strides=(2,3))(trconv1_p)
decoded_p  = Conv2D(chns,2,padding='same',activation='relu',name='decoded_phi')(trconv2_p)

In [None]:
def vae_loss(x, decoded):
    """Reconstruction term plus KL term, averaged into one scalar.

    Both tensors are flattened over every axis (batch included) before the
    binary cross-entropy is taken and scaled by rows * cols. The KL term is
    built from the notebook-level `z_mean` / `z_lvar` tensors and averaged
    over the last axis.

    NOTE(review): binary cross-entropy is normally used with targets and
    predictions in [0, 1]; the decoders end in relu and the inputs are raw
    spectrogram values -- confirm the data range.
    """
    target = K.flatten(x)
    reconstruction = K.flatten(decoded)
    n_pixels = rows * cols
    gen_loss = n_pixels * metrics.binary_crossentropy(target, reconstruction)
    kl_terms = 1 + z_lvar - K.square(z_mean) - K.exp(z_lvar)
    kl_loss = - 0.5 * K.mean(kl_terms, axis=-1)
    return K.mean(gen_loss + kl_loss)

# Model
# End-to-end autoencoder: (magnitude, phase) in, reconstructed (magnitude, phase) out.
vae = Model(inputs=[x_mag, x_phi], outputs=[decoded_m, decoded_p])
# NOTE(review): a single loss callable is given for a two-output model; Keras is expected
# to apply it to each output and sum the results, which would count the KL term inside
# vae_loss once per output -- confirm this is intended.
vae.compile(optimizer='rmsprop', loss=vae_loss)
# Print the layer-by-layer summary (output shapes and parameter counts).
vae.summary()

In [None]:
# Model(inputs = [x_mag,x_phi],outputs=z_mean)

In [None]:
import tensorflow as tf

# Write the session's graph to the 'logs' directory via a summary FileWriter.
# Nothing is run in the session; it is opened only to obtain `sess.graph`.
with tf.Session() as sess:
    writer = tf.summary.FileWriter('logs', sess.graph)
    writer.close()

In [None]:
# # Custom loss layer
# class VAELossLayer(Layer):
#     def __init__(self, **kwargs):
#         self.is_placeholder = True
#         super(VAELossLayer, self).__init__(**kwargs)

#     def vae_loss(self, x, decoded):
#         x = K.flatten(x)
#         decoded = K.flatten(decoded)
#         gen_loss = rows * cols * metrics.binary_crossentropy(x, decoded)
#         kl_loss = - 0.5 * K.mean(1 + z_lvar - K.square(z_mean) - K.exp(z_lvar), axis=-1)
#         return K.mean(gen_loss + kl_loss)

#     def call(self, inputs):
#         x = inputs[0]
#         decoded = inputs[1]
#         loss = self.vae_loss(x, decoded)
#         self.add_loss(loss, inputs=inputs)
#         # We don't use this output.
#         return x

# y = VAELossLayer()([x,decoded])
# vae = Model(x,y)
# vae.compile(optimizer='rmsprop', loss=None)
# vae.summary()

In [None]:
# Encoder: maps a (magnitude, phase) pair to the mean of the latent distribution.
# The graph has two Input tensors (x_mag, x_phi); the previously referenced
# single input `x` does not exist in this notebook.
encoder = Model(inputs=[x_mag, x_phi], outputs=z_mean)

# Generator
# For now this is the full encode -> sample -> decode path, i.e. it reconstructs
# its inputs and returns one array per output (magnitude, phase).
# TODO: a true generator would feed a fresh Input(shape=(latent_dim,)) through the
# decoder layers; that requires keeping the decoder layer objects, of which only
# `decode_h` is currently stored.
generator = Model(inputs=[x_mag, x_phi], outputs=[decoded_m, decoded_p])

In [None]:
# Get Training Data
# Stack the per-example 2-D magnitude arrays along a new leading (example) axis,
# append a trailing singleton channel axis, then shuffle along the first axis.
x_data = np.stack(list(df.Magnitude.values), axis=0)
x_data = x_data[..., np.newaxis]
x_data = shuffle(x_data)

In [None]:
# `vae` is defined with two inputs (magnitude, phase) and two outputs, so fit()
# needs one array per input and one target per output. The magnitude-only
# `x_data` cannot be paired with its phases after shuffling, so aligned training
# arrays are built here and shuffled together.
def _stack_spectrograms(column):
    """Stack a column of equally sized 2-D arrays into an (N, rows, cols, 1) array."""
    return np.dstack(column.values).transpose(2, 0, 1)[..., None]


mag_train, phi_train = shuffle(_stack_spectrograms(df.Magnitude),
                               _stack_spectrograms(df.Phase))

# Autoencoder training: the targets are the inputs themselves, in model order.
vae.fit([mag_train, phi_train], [mag_train, phi_train],
        shuffle=True,
        epochs=15,
        batch_size=50,
        callbacks=[TensorBoard(log_dir='/tmp/vautoeconder')])

In [None]:
# Run the first ten training examples through `generator`.
# NOTE(review): the VAE in this notebook is defined with two inputs (magnitude, phase)
# and two outputs; if `generator` shares that signature, predict() needs a list of two
# arrays here and returns one array per output -- confirm.
xx = generator.predict(x_data[:10])

In [None]:
# Compare input example k (top) with the model output for it (bottom).
# Note: this reuses the name `k`, which earlier held the number of sampled examples.
k = 0

plt.subplot(211)
ss = x_data[k].squeeze()
specshow(ss, sr=sr)
plt.title('Input')

# NOTE(review): if `generator` returns one array per output, xx[k] selects a whole
# output batch rather than example k -- confirm the indexing.
plt.subplot(212)
ss = xx[k].squeeze()
specshow(ss, sr=sr)
plt.title('Model output')
plt.tight_layout()

In [None]:
# Largest value in input example k (quick look at the data range).
ss = x_data[k].squeeze()
ss.max()