<a href="https://colab.research.google.com/github/singr7/MIRAutoencoder/blob/master/VAE.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Variational AutoEncoder


## Setup

In [1]:
import tensorflow.keras.layers
import tensorflow.keras.models
import tensorflow.keras.optimizers
import tensorflow.keras.datasets
import numpy
import matplotlib.pyplot

## Create the encoding layer

In [2]:
num_channels=1
feature_size_1= 96
feature_size_2 = 200
latent_space_dim =2 
# Encoder
x = tensorflow.keras.layers.Input(shape=(feature_size_1, feature_size_2, num_channels), name="encoder_input")

encoder_conv_layer1 = tensorflow.keras.layers.Conv2D(filters=1, kernel_size=(3, 3), padding="same", strides=1, name="encoder_conv_1")(x)
encoder_norm_layer1 = tensorflow.keras.layers.BatchNormalization(name="encoder_norm_1")(encoder_conv_layer1)
encoder_activ_layer1 = tensorflow.keras.layers.LeakyReLU(name="encoder_leakyrelu_1")(encoder_norm_layer1)

encoder_conv_layer2 = tensorflow.keras.layers.Conv2D(filters=32, kernel_size=(3,3), padding="same", strides=1, name="encoder_conv_2")(encoder_activ_layer1)
encoder_norm_layer2 = tensorflow.keras.layers.BatchNormalization(name="encoder_norm_2")(encoder_conv_layer2)
encoder_activ_layer2 = tensorflow.keras.layers.LeakyReLU(name="encoder_activ_layer_2")(encoder_norm_layer2)

encoder_conv_layer3 = tensorflow.keras.layers.Conv2D(filters=64, kernel_size=(3,3), padding="same", strides=2, name="encoder_conv_3")(encoder_activ_layer2)
encoder_norm_layer3 = tensorflow.keras.layers.BatchNormalization(name="encoder_norm_3")(encoder_conv_layer3)
encoder_activ_layer3 = tensorflow.keras.layers.LeakyReLU(name="encoder_activ_layer_3")(encoder_norm_layer3)

encoder_conv_layer4 = tensorflow.keras.layers.Conv2D(filters=64, kernel_size=(3,3), padding="same", strides=2, name="encoder_conv_4")(encoder_activ_layer3)
encoder_norm_layer4 = tensorflow.keras.layers.BatchNormalization(name="encoder_norm_4")(encoder_conv_layer4)
encoder_activ_layer4 = tensorflow.keras.layers.LeakyReLU(name="encoder_activ_layer_4")(encoder_norm_layer4)

encoder_conv_layer5 = tensorflow.keras.layers.Conv2D(filters=64, kernel_size=(3,3), padding="same", strides=1, name="encoder_conv_5")(encoder_activ_layer4)
encoder_norm_layer5 = tensorflow.keras.layers.BatchNormalization(name="encoder_norm_5")(encoder_conv_layer5)
encoder_activ_layer5 = tensorflow.keras.layers.LeakyReLU(name="encoder_activ_layer_5")(encoder_norm_layer5)

shape_before_flatten = tensorflow.keras.backend.int_shape(encoder_activ_layer5)[1:]
encoder_flatten = tensorflow.keras.layers.Flatten()(encoder_activ_layer5)

encoder_mu = tensorflow.keras.layers.Dense(units=latent_space_dim, name="encoder_mu")(encoder_flatten)
encoder_log_variance = tensorflow.keras.layers.Dense(units=latent_space_dim, name="encoder_log_variance")(encoder_flatten)

encoder_mu_log_variance_model = tensorflow.keras.models.Model(x, (encoder_mu, encoder_log_variance), name="encoder_mu_log_variance_model")

def sampling(mu_log_variance):
    mu, log_variance = mu_log_variance
    epsilon = tensorflow.keras.backend.random_normal(shape=tensorflow.keras.backend.shape(mu), mean=0.0, stddev=1.0)
    random_sample = mu + tensorflow.keras.backend.exp(log_variance/2) * epsilon
    return random_sample

encoder_output = tensorflow.keras.layers.Lambda(sampling, name="encoder_output")([encoder_mu, encoder_log_variance])

encoder = tensorflow.keras.models.Model(x, encoder_output, name="encoder_model")

## Build the encoder

In [3]:

encoder.summary()

Model: "encoder_model"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 encoder_input (InputLayer)     [(None, 96, 200, 1)  0           []                               
                                ]                                                                 
                                                                                                  
 encoder_conv_1 (Conv2D)        (None, 96, 200, 1)   10          ['encoder_input[0][0]']          
                                                                                                  
 encoder_norm_1 (BatchNormaliza  (None, 96, 200, 1)  4           ['encoder_conv_1[0][0]']         
 tion)                                                                                            
                                                                                      

## Build the decoder

In [4]:
decoder_input = tensorflow.keras.layers.Input(shape=(latent_space_dim), name="decoder_input")
decoder_dense_layer1 = tensorflow.keras.layers.Dense(units=numpy.prod(shape_before_flatten), name="decoder_dense_1")(decoder_input)

decoder_reshape = tensorflow.keras.layers.Reshape(target_shape=shape_before_flatten)(decoder_dense_layer1)

decoder_conv_tran_layer1 = tensorflow.keras.layers.Conv2DTranspose(filters=64, kernel_size=(3, 3), padding="same", strides=1, name="decoder_conv_tran_1")(decoder_reshape)
decoder_norm_layer1 = tensorflow.keras.layers.BatchNormalization(name="decoder_norm_1")(decoder_conv_tran_layer1)
decoder_activ_layer1 = tensorflow.keras.layers.LeakyReLU(name="decoder_leakyrelu_1")(decoder_norm_layer1)

decoder_conv_tran_layer2 = tensorflow.keras.layers.Conv2DTranspose(filters=64, kernel_size=(3, 3), padding="same", strides=2, name="decoder_conv_tran_2")(decoder_activ_layer1)
decoder_norm_layer2 = tensorflow.keras.layers.BatchNormalization(name="decoder_norm_2")(decoder_conv_tran_layer2)
decoder_activ_layer2 = tensorflow.keras.layers.LeakyReLU(name="decoder_leakyrelu_2")(decoder_norm_layer2)

decoder_conv_tran_layer3 = tensorflow.keras.layers.Conv2DTranspose(filters=64, kernel_size=(3, 3), padding="same", strides=2, name="decoder_conv_tran_3")(decoder_activ_layer2)
decoder_norm_layer3 = tensorflow.keras.layers.BatchNormalization(name="decoder_norm_3")(decoder_conv_tran_layer3)
decoder_activ_layer3 = tensorflow.keras.layers.LeakyReLU(name="decoder_leakyrelu_3")(decoder_norm_layer3)

decoder_conv_tran_layer4 = tensorflow.keras.layers.Conv2DTranspose(filters=1, kernel_size=(3, 3), padding="same", strides=1, name="decoder_conv_tran_4")(decoder_activ_layer3)
decoder_output = tensorflow.keras.layers.LeakyReLU(name="decoder_output")(decoder_conv_tran_layer4 )

decoder = tensorflow.keras.models.Model(decoder_input, decoder_output, name="decoder_model")

decoder.summary()

Model: "decoder_model"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 decoder_input (InputLayer)  [(None, 2)]               0         
                                                                 
 decoder_dense_1 (Dense)     (None, 76800)             230400    
                                                                 
 reshape (Reshape)           (None, 24, 50, 64)        0         
                                                                 
 decoder_conv_tran_1 (Conv2D  (None, 24, 50, 64)       36928     
 Transpose)                                                      
                                                                 
 decoder_norm_1 (BatchNormal  (None, 24, 50, 64)       256       
 ization)                                                        
                                                                 
 decoder_leakyrelu_1 (LeakyR  (None, 24, 50, 64)     

In [5]:
def loss_func(encoder_mu, encoder_log_variance):
    def vae_reconstruction_loss(y_true, y_predict):
        reconstruction_loss_factor = 1000
        reconstruction_loss = tensorflow.keras.backend.mean(tensorflow.keras.backend.square(y_true-y_predict), axis=[1, 2, 3])
        return reconstruction_loss_factor * reconstruction_loss

    def vae_kl_loss(encoder_mu, encoder_log_variance):
        kl_loss = -0.5 * tensorflow.keras.backend.sum(1.0 + encoder_log_variance - tensorflow.keras.backend.square(encoder_mu) - tensorflow.keras.backend.exp(encoder_log_variance), axis=1)
        return kl_loss

    def vae_kl_loss_metric(y_true, y_predict):
        kl_loss = -0.5 * tensorflow.keras.backend.sum(1.0 + encoder_log_variance - tensorflow.keras.backend.square(encoder_mu) - tensorflow.keras.backend.exp(encoder_log_variance), axis=1)
        return kl_loss

    def vae_loss(y_true, y_predict):
        reconstruction_loss = vae_reconstruction_loss(y_true, y_predict)
        kl_loss = vae_kl_loss(y_true, y_predict)

        loss = reconstruction_loss + kl_loss
        return loss

    return vae_loss

## Define the VAE as a `Model` with a custom `train_step`

In [6]:
vae_input = tensorflow.keras.layers.Input(shape=(96, 200, num_channels), name="VAE_input")
vae_encoder_output = encoder(vae_input)
vae_decoder_output = decoder(vae_encoder_output)
vae = tensorflow.keras.models.Model(vae_input, vae_decoder_output, name="VAE")

vae.compile(optimizer=tensorflow.keras.optimizers.Adam(learning_rate=0.0005), loss=loss_func(encoder_mu, encoder_log_variance), metrics=[tensorflow.keras.metrics.Accuracy()])

vae.summary()

Model: "VAE"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 VAE_input (InputLayer)      [(None, 96, 200, 1)]      0         
                                                                 
 encoder_model (Functional)  (None, 2)                 400786    
                                                                 
 decoder_model (Functional)  (None, 96, 200, 1)        342529    
                                                                 
Total params: 743,315
Trainable params: 742,481
Non-trainable params: 834
_________________________________________________________________


In [7]:
# Get the input data
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline 

import os
from google.colab import drive
drive.mount('/content/drive', force_remount=True)


Mounted at /content/drive


In [None]:
import glob
feature_set =[]
files = glob.glob('/content/drive/My Drive/MusicResearch/vajra/dataset/96_features/96_MelFeatures/96_GTZANMel/**/*', recursive=True)
count=0
for file in files:
  f= np.load(file)
  feature_set.append([f,0])

len(feature_set)

3117

In [None]:
import random

random.shuffle(feature_set)

import pickle 
os.chdir('/content/drive/My Drive/MusicResearch/vajra/dataset/96_features/96_MelFeatures/')
data_pickle = open('Mel_OnlyGTZANData.pickle','wb')
pickle.dump(feature_set,data_pickle)
data_pickle.close()

In [8]:
os.chdir('/content/drive/My Drive/MusicResearch/vajra/dataset/96_features/96_MelFeatures/')
import pickle
data_pickle = open('Mel_OnlyGTZANData.pickle','rb')
data = pickle.load(data_pickle)
data_pickle.close()

features = []
labels = []

for feature, label in data:
  features.append(feature)
  labels.append(label)


In [9]:
len(features)

3117

In [13]:
from sklearn.model_selection import train_test_split
x_train,x_test, y_train, y_test = train_test_split(features, labels, test_size = 0.1)
x_train = np.asarray(x_train, dtype=np.float32)
x_test = np.asarray(x_test, dtype=np.float32)
print(x_train.shape)
x_train = numpy.reshape(x_train, newshape=(x_train.shape[0], x_train.shape[1], x_train.shape[2], 1)) 
x_test = numpy.reshape(x_test, newshape=(x_test.shape[0], x_train.shape[1], x_train.shape[2], 1))
print(x_train.shape)

(2805, 96, 200)
(2805, 96, 200, 1)


In [14]:
def standardize(x):
    mean = np.mean(x, axis=0)
    std = np.std(x, axis=0)+0.000001
    X_train = (x - mean) / std
    return X_train
  

In [17]:
X_train = standardize(x_train)
X_test = standardize(x_test)
X_train.shape

(2805, 96, 200, 1)

In [16]:
print(np.any(np.isnan(X_train)))
print(np.any(np.isnan(X_test)))

False
False


## Train the VAE

In [None]:
history= vae.fit(x_train, x_train, epochs=20, batch_size=32, shuffle=True, validation_data=(x_test, x_test))

Epoch 1/20
Epoch 2/20
Epoch 3/20
Epoch 4/20
Epoch 5/20
Epoch 6/20
Epoch 7/20
Epoch 8/20
Epoch 9/20
Epoch 10/20
 2/88 [..............................] - ETA: 6:26 - loss: nan - accuracy: 0.0000e+00

In [None]:
encoder.save("VAE_GTZANencoder.h5") 
decoder.save("VAE_GTZANdecoder.h5") 
vae.save("VAE_GTZAN.h5")



In [10]:
encoder = tensorflow.keras.models.load_model("VAE_GTZANencoder.h5") 
decoder = tensorflow.keras.models.load_model("VAE_GTZANdecoder.h5")
vae_model =tensorflow.keras.models.load_model("VAE_GTZAN.h5",custom_objects={'vae_loss': loss_func(encoder_mu=0.0,encoder_log_variance=1.0)})



In [None]:
encoded_data = encoder.predict(x_test)
decoded_data = decoder.predict(encoded_data)

In [12]:
!pip install keract

Collecting keract
  Downloading keract-4.5.0-py2.py3-none-any.whl (12 kB)
Installing collected packages: keract
Successfully installed keract-4.5.0


In [22]:
import keract

In [20]:
# prepare the fmri dataset
import pickle
data_pickle = open('/content/drive/My Drive/MusicResearch/vajra/dataset/fmri_songs_numpy/fmri_96_mel_numpy/Mel_fMRIStream7minData.pickle','rb')
fmri_data = pickle.load(data_pickle)
data_pickle.close()

x_fmri_stream = []
labels = []

for feature, label in fmri_data:
  x_fmri_stream.append(feature)
  labels.append(label)

x_fmri_stream= np.asarray(x_fmri_stream)
x_fmri_stream = np.reshape(x_fmri_stream, newshape=(x_fmri_stream.shape[0], x_fmri_stream.shape[1], x_fmri_stream.shape[2], 1))
X_fmri_stream= standardize(x_fmri_stream)
X_fmri_stream.shape


(46, 96, 200, 1)

In [26]:
activations = keract.get_activations(encoder, x_fmri_stream, layer_names=['encoder_conv_1','encoder_activ_layer_2','encoder_activ_layer_3', 'encoder_activ_layer_4','encoder_activ_layer_5'] ,auto_compile=True)

# print the activations shapes.
[print(k, '->', v.shape, '- Numpy array') for (k, v) in activations.items()]



encoder_conv_1 -> (46, 96, 200, 1) - Numpy array
encoder_activ_layer_2 -> (46, 96, 200, 32) - Numpy array
encoder_activ_layer_3 -> (46, 48, 100, 64) - Numpy array
encoder_activ_layer_4 -> (46, 24, 50, 64) - Numpy array
encoder_activ_layer_5 -> (46, 24, 50, 64) - Numpy array


[None, None, None, None, None]

In [28]:
filename='/content/drive/My Drive/MusicResearch/vajra/dataset/fmri_songs_numpy/fmri_96_mel_numpy/fmri_activations_stream7_GTZANAE.json'
keract.persist_to_json_file(activations, filename)

In [29]:
loaded_activations= keract.load_activations_from_json_file(filename)

In [27]:
keract.display_activations(activations, cmap=None, save=False, directory='.', data_format='channels_last', fig_size=(24, 24), reshape_1d_layers=False)

encoder_conv_1 (46, 96, 200, 1) -> Skipped. First dimension is not 1.
encoder_activ_layer_2 (46, 96, 200, 32) -> Skipped. First dimension is not 1.
encoder_activ_layer_3 (46, 48, 100, 64) -> Skipped. First dimension is not 1.
encoder_activ_layer_4 (46, 24, 50, 64) -> Skipped. First dimension is not 1.
encoder_activ_layer_5 (46, 24, 50, 64) -> Skipped. First dimension is not 1.


In [None]:
keract.display_heatmaps(activations, input_image, save=False)

In [11]:
acc = history.history['acc']
val_acc = history.history['val_acc']
loss = history.history['loss']
val_loss = history.history['val_loss']
epochs = range(1, len(acc) + 1)
plt.plot(epochs, acc, 'bo', label='Training acc')
plt.plot(epochs, val_acc, 'b', label='Validation acc')
plt.title('Training and validation accuracy')
plt.legend()
plt.figure()
plt.plot(epochs, loss, 'bo', label='Training loss')
plt.plot(epochs, val_loss, 'b', label='Validation loss')
plt.title('Training and validation loss')
plt.legend()
plt.show()

NameError: ignored

In [None]:
vae_model.history.keys()

AttributeError: ignored

## Display a grid of sampled digits

In [None]:
import matplotlib.pyplot as plt


def plot_latent_space(vae, n=30, figsize=15):
    # display a n*n 2D manifold of digits
    digit_size = 28
    scale = 1.0
    figure = np.zeros((digit_size * n, digit_size * n))
    # linearly spaced coordinates corresponding to the 2D plot
    # of digit classes in the latent space
    grid_x = np.linspace(-scale, scale, n)
    grid_y = np.linspace(-scale, scale, n)[::-1]

    for i, yi in enumerate(grid_y):
        for j, xi in enumerate(grid_x):
            z_sample = np.array([[xi, yi]])
            x_decoded = vae.decoder.predict(z_sample)
            digit = x_decoded[0].reshape(digit_size, digit_size)
            figure[
                i * digit_size : (i + 1) * digit_size,
                j * digit_size : (j + 1) * digit_size,
            ] = digit

    plt.figure(figsize=(figsize, figsize))
    start_range = digit_size // 2
    end_range = n * digit_size + start_range
    pixel_range = np.arange(start_range, end_range, digit_size)
    sample_range_x = np.round(grid_x, 1)
    sample_range_y = np.round(grid_y, 1)
    plt.xticks(pixel_range, sample_range_x)
    plt.yticks(pixel_range, sample_range_y)
    plt.xlabel("z[0]")
    plt.ylabel("z[1]")
    plt.imshow(figure, cmap="Greys_r")
    plt.show()


plot_latent_space(vae)

## Display how the latent space clusters different digit classes

In [None]:

def plot_label_clusters(vae, data, labels):
    # display a 2D plot of the digit classes in the latent space
    z_mean, _, _ = vae.encoder.predict(data)
    plt.figure(figsize=(12, 10))
    plt.scatter(z_mean[:, 0], z_mean[:, 1], c=labels)
    plt.colorbar()
    plt.xlabel("z[0]")
    plt.ylabel("z[1]")
    plt.show()


(x_train, y_train), _ = keras.datasets.mnist.load_data()
x_train = np.expand_dims(x_train, -1).astype("float32") / 255

plot_label_clusters(vae, x_train, y_train)