# 1. Setup Environment

In [None]:
#%%capture --no-display
import os
import sys
# Select the GPU with index 2 and raise TensorFlow's native log threshold.
# Both variables are set ahead of the TensorFlow import further down.
os.environ.update({
    "CUDA_VISIBLE_DEVICES": "2",
    "TF_CPP_MIN_LOG_LEVEL": "3",
})

# Absolute paths of the working directory and of its parent; the parent is
# put first on the import path.
currentdir = os.path.abspath(os.curdir)
parentdir = os.path.abspath(os.path.join(os.curdir, os.pardir))
sys.path.insert(0, parentdir)

# Global random seed and run-mode switches
RAND_STATE_GLOB = 1291
REDO_FEATURE_EXTRACTION = False
EXTEND = False
LOAD_PKL_DATA = True

import time
import joblib
import scipy
import numpy as np
import pandas as pd
from numpy import savetxt
import librosa as librosa
import librosa.display as display
import IPython.display as ipd
from IPython.display import clear_output

import matplotlib.pyplot as plt
%matplotlib inline 

# Deep Learning (Keras Setups)
import tensorflow as tf
from tensorflow.keras.utils  import Sequence
from tensorflow.keras.layers import Input, Dense, Lambda, Flatten, Reshape, BatchNormalization
from tensorflow.keras.layers import Conv2D, AveragePooling2D, MaxPooling2D, Dropout, SpatialDropout2D
from tensorflow.keras.layers import LeakyReLU
from tensorflow.keras.models import Model, load_model, model_from_json
from tensorflow.keras.losses import mse
from tensorflow.keras import backend as K
from tensorflow.keras import optimizers
from tensorflow.keras.callbacks import ModelCheckpoint
from tensorflow_addons.layers import WeightNormalization

### ii. Load features from database files (*.pkl)

In [None]:
# Directory holding the pre-computed feature database
db_dir = f"{currentdir}/tmpdata/"

# NOTE: joblib.load unpickles the files, so only load databases you trust.
audio_df, video_df = (
    joblib.load(db_dir + pkl_name) for pkl_name in ("audio_df.pkl", "video_df.pkl")
)

# Model inputs and targets stored as NumPy arrays
inputs, targets = (
    np.load(db_dir + npy_name) for npy_name in ('inputs.npy', 'targets.npy')
)

#### iii. Setup global variables for spectrogram generation

In [None]:
# Audio sampling rate
sr = 22050

# Lowest and highest frequency handed to the mel routines
fmin = 20
fmax = sr / 2

# Number of samples in each FFT window. 2048 is the usual recommendation
# for music, but 4096 gave better results here.
n_fft = 4096

# Hop between frames (spectrogram columns), chosen so that we get 128 frames
hop_length = 690

# Number of mel bands (spectrogram rows); together with the hop length
# this gives 128 x 128 spectrograms
n_mels = 128

# 2. Model Preparation

#### 2.1 Set up a class that delivers an ML model (CVAE)

In [None]:
# Pi as a tensor in the Keras default float type
TF_PI = tf.constant(np.pi, dtype=K.floatx())


def gaussian_log_prob(z, mu, logvar):
    """Element-wise log-density of ``z`` under a Gaussian.

    The Gaussian is parameterised by its mean ``mu`` and the logarithm of
    its variance ``logvar``. No reduction is applied; the result has the
    broadcast shape of the three arguments.
    """
    log_norm = tf.math.log(2.0*TF_PI)
    squared_error = tf.math.pow((z-mu), 2.0)
    scaled_error = squared_error/tf.math.exp(logvar)
    return -0.5*(log_norm + logvar + scaled_error)

def log_mean_exp(x, axis):
    """Return ``log(mean(exp(x)))`` along ``axis``.

    The maximum along ``axis`` is subtracted before exponentiating and added
    back afterwards, which keeps the exponentials from overflowing.
    """
    peak_keepdims = tf.math.reduce_max(x, axis=axis, keepdims=True)
    peak = tf.math.reduce_max(x, axis=axis)
    shifted_exp = K.exp(x-peak_keepdims)
    return peak + K.log(K.mean(shifted_exp, axis))

class ConvolutionalVariationalAutoEncoder(Model):
    """Variational auto-encoder with a convolutional encoder and a dense decoder.

    The encoder maps a flattened ``input_dims`` image to the mean and
    log-variance of a diagonal Gaussian over ``latent_dim`` latent variables.
    The decoder maps a latent vector to the mean and log-variance of a
    diagonal Gaussian over a flattened target of the same size as the input.

    Args:
        input_dims: ``(rows, columns)`` of the unflattened input image.
        latent_dim: Number of latent variables.
        lr: Learning rate handed to the Adam optimizer.
        b: Weight applied to the KL (regularisation) term in the training loss.
        *args, **kwargs: Forwarded to the Keras ``Model`` constructor.
    """

    def __init__(self, input_dims, latent_dim, lr, b=1.0, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.input_dims = input_dims
        self.input_dim  = self.input_dims[0] * self.input_dims[1]
        self.latent_dim = latent_dim
        self.lr         = lr
        self.kernel     = (3, 3)
        # `learning_rate` is the supported keyword; `lr` is a deprecated alias
        self.custom_optimizer = optimizers.Adam(learning_rate=self.lr)
        self.r = tf.constant(b, dtype=K.floatx())

        self._make_encoder()
        self._make_decoder()

    def get_encoder(self):
        """Return the encoder sub-model."""
        return self.encoder

    def get_decoder(self):
        """Return the decoder sub-model."""
        return self.decoder

    def _make_encoder(self):
        """Create ``self.encoder``: flattened image -> ``2 * latent_dim`` values."""
        # Setup the input layer (kept as an attribute so callers can build
        # models that tap into intermediate encoder layers)
        self.input_encoder = Input(shape=(self.input_dim,), name='input_encoder')

        # Setup hidden layers (Convolutional Layers)
        encoder = Reshape((self.input_dims[0], self.input_dims[1], 1))(self.input_encoder)

        # Three conv / LeakyReLU / max-pool stages
        for filters, pool in ((4, (2, 2)), (8, (2, 2)), (16, (4, 4))):
            encoder = Conv2D(filters, self.kernel, padding='same')(encoder)
            encoder = LeakyReLU()(encoder)
            encoder = MaxPooling2D(pool, padding='same')(encoder)

        encoder = Flatten()(encoder)

        # Mean and log-variance are produced by one zero-initialised dense layer.
        # The width follows this instance's latent size, not a module-level
        # variable, so that the later split into two halves always matches.
        encoder = Dense(2*self.latent_dim, name="encoder",
                        kernel_initializer="zeros",
                        bias_initializer='zeros')(encoder)

        self.encoder = Model(inputs=self.input_encoder, outputs=encoder)

    def _make_decoder(self):
        """Create ``self.decoder``: latent vector -> ``2 * input_dim`` values."""
        # Setup the "input" layer for the decoder
        input_latent = Input(shape=(self.latent_dim,), name='z_sampling')

        # Setup hidden layers (fully connected dense layers)
        decoder = Dense(self.input_dim//4)(input_latent)
        decoder = LeakyReLU()(decoder)

        # Output holds the mean and the log-variance, concatenated
        decoder = Dense(2*self.input_dim, activation="linear")(decoder)
        decoder = LeakyReLU()(decoder)

        # Full decoder model
        self.decoder = Model(input_latent, decoder, name='decoder_vae')

    def custom_compile(self):
        """Create the running-mean metrics and compile the model."""
        # Define some metric data displays
        self.loss_ph  = tf.keras.metrics.Mean(name="ELBO")
        self.reg_loss = tf.keras.metrics.Mean(name="reg")
        self.rec_loss = tf.keras.metrics.Mean(name="rec")
        super().compile()

    @property
    def metrics(self):
        """Metrics reported by ``train_step`` and ``test_step``.

        The metric objects are created in ``custom_compile``.
        """
        return [
            self.loss_ph,
            self.reg_loss,
            self.rec_loss,
        ]

    @tf.autograph.experimental.do_not_convert
    def encode(self, x, training=True):
        """Return ``(z_mu, z_logvar)`` predicted by the encoder for ``x``."""
        z_params = self.encoder(x, training=training)
        z_mu, z_logvar = tf.split(z_params, 2, 1)
        return z_mu, z_logvar

    @tf.autograph.experimental.do_not_convert
    def decode(self, z, training=True):
        """Return ``(x_mu, x_logvar)`` predicted by the decoder for ``z``."""
        x_params = self.decoder(z, training=training)
        x_mu, x_logvar = tf.split(x_params, 2, 1)
        return x_mu, x_logvar

    def reparametrize(self, mu, logvar):
        """Sample ``mu + eps * std`` with ``eps`` standard normal."""
        std = K.exp(0.5 * logvar)
        return mu + tf.random.normal(tf.shape(std), dtype=K.floatx()) * std

    def call(self, x, training=True):
        """Encode ``x``, sample a latent vector and decode it.

        Returns:
            ``(x_mu, x_logvar, z_mu, z_logvar)``.
        """
        z_mu, z_logvar = self.encode(x, training=training)
        z = self.reparametrize(z_mu, z_logvar)
        x_mu, x_logvar = self.decode(z, training=training)
        return x_mu, x_logvar, z_mu, z_logvar

    def compute_kld(self, z_mu, z_logvar):
        """Element-wise KL divergence between the posterior and a standard normal."""
        return 0.5*(K.pow(z_mu, 2.0) + K.exp(z_logvar) - 1.0 - z_logvar)

    def compute_recon_loss(self, x, x_mu, x_logvar, log_prob_func):
        """Negative log-probability of ``x``, summed over axis 1."""
        return -K.sum(log_prob_func(x, x_mu, x_logvar), 1)

    def compute_negative_elbo(self, x, y, freebits=0.0, training=True):
        """Compute the per-sample negative ELBO of target ``y`` given input ``x``.

        Args:
            x: Batch of model inputs.
            y: Batch of reconstruction targets.
            freebits: Lower bound (in bits) applied to the KL term of every
                latent dimension.
            training: Forwarded to the encoder and decoder.

        Returns:
            ``(l_rec + l_reg, l_rec, l_reg)``; the KL weight ``b`` is not
            applied here.
        """
        x_mu_0, x_logvar_0, z_mu, z_logvar = self(x, training=training)
        l_rec = self.compute_recon_loss(y, x_mu_0, x_logvar_0, gaussian_log_prob)
        log2 = tf.cast(K.log(2.0), dtype=K.floatx())
        freebits = tf.cast(freebits, dtype=K.floatx())
        # relu(kld - floor) + floor == max(kld, floor) for every latent dimension
        kld_floor = freebits*log2
        l_reg = K.sum(K.relu(self.compute_kld(z_mu, z_logvar) - kld_floor) + kld_floor, 1)
        return l_rec + l_reg, l_rec, l_reg

    def importance_sampling(self, x, y, importance_samples=1, training=True):
        """Importance-weighted estimate of the negative log-likelihood of ``y``.

        For every input, ``importance_samples`` latent vectors are drawn from
        the posterior, and ``log p(y|z) + log p(z) - log q(z|x)`` is combined
        over the samples with ``log_mean_exp``.

        Returns:
            The negated estimate, one value per batch element.
        """
        k = importance_samples
        z_mu, z_logvar = self.encode(x, training=training)

        # Repeat posterior parameters and targets k times: (batch, k, dim)
        z_mu = tf.tile(tf.expand_dims(z_mu, 1), [1, k, 1])
        z_logvar = tf.tile(tf.expand_dims(z_logvar, 1), [1, k, 1])
        x_0 = tf.tile(tf.expand_dims(y, 1), [1, k, 1])
        x_0 = tf.reshape(x_0, (-1, k, self.input_dim))

        # Sample and decode on a flat (batch * k, dim) batch
        z_flat = self.reparametrize(
            tf.reshape(z_mu, (-1, self.latent_dim)),
            tf.reshape(z_logvar, (-1, self.latent_dim)))
        x_mu_0, x_logvar_0 = self.decode(z_flat, training=training)

        # Back to (batch, k, dim) so the samples of one input share axis 1
        x_mu_0 = tf.reshape(x_mu_0, (-1, k, self.input_dim))
        x_logvar_0 = tf.reshape(x_logvar_0, (-1, k, self.input_dim))
        z = tf.reshape(z_flat, (-1, k, self.latent_dim))

        logpxz_0 = K.sum(gaussian_log_prob(x_0, x_mu_0, x_logvar_0), -1)

        # Prior: zero mean and zero log-variance
        zeros_ = tf.zeros_like(z)
        logpz  = K.sum(gaussian_log_prob(z, zeros_, zeros_), -1)
        logqzx = K.sum(gaussian_log_prob(z, z_mu, z_logvar), -1)

        logprob = logpxz_0 + logpz - logqzx
        logprob = log_mean_exp(logprob, 1)

        return -logprob

    def train_step(self, inputs):
        """Run one optimisation step on an ``(x, y)`` batch.

        The loss is ``mean(b * l_reg + l_rec) / log(2)`` with free bits 0.05.
        """
        x, y = inputs
        with tf.GradientTape() as tape:
            log2 = tf.cast(K.log(2.0), dtype=K.floatx())
            _, l_rec, l_reg = self.compute_negative_elbo(
                x, y, freebits=0.05, training=True)
            loss = K.mean(self.r*l_reg + l_rec) / log2
            self.loss_ph.update_state(loss)

        gradients = tape.gradient(loss, self.trainable_variables)
        self.custom_optimizer.apply_gradients(zip(gradients, self.trainable_variables))
        self.reg_loss.update_state(K.mean(self.r*l_reg)/log2)
        self.rec_loss.update_state(K.mean(l_rec)/log2)
        return {m.name: m.result() for m in self.metrics}

    def test_step(self, data):
        """Evaluate an ``(x, y)`` batch.

        "ELBO" is reported from a 10-sample ``importance_sampling`` estimate,
        while "reg" and "rec" come from ``compute_negative_elbo``.
        """
        # Unpack the data
        x, y = data

        # Compute predictions and update metrics
        log2 = tf.cast(K.log(2.0), dtype=K.floatx())
        nll = self.importance_sampling(x, y, 10, training=False)
        loss = K.mean(nll) / log2
        self.loss_ph.update_state(loss)

        _, l_rec, l_reg = self.compute_negative_elbo(
            x, y, freebits=0.05, training=False)
        self.reg_loss.update_state(K.mean(self.r*l_reg)/log2)
        self.rec_loss.update_state(K.mean(l_rec)/log2)
        return {m.name: m.result() for m in self.metrics}

# 3. Training

#### 3.1 Setup global variables for NN (hyperparameters)

In [None]:
# Resolution (rows, columns) of the unflattened model input
input_dims = (128, 128)

# Size of the latent vector and of every mini-batch
latent_dim = 256
batch_size = 256

# Training length and optimizer step size
epochs = 300
learning_rate = 4e-4

# Weight of the KL term, handed to the model as `b`
beta = 150

In [None]:
# Build a model with the default KL weight (b=1.0) and prepare its metrics
mg_model = ConvolutionalVariationalAutoEncoder(
    input_dims=input_dims,
    latent_dim=latent_dim,
    lr=learning_rate,
)
mg_model.custom_compile()

#### 3.2 Setup KFolds 

In [None]:
from sklearn.model_selection import StratifiedKFold

# StratifiedKFold performs the (shuffled, seeded) splits for us
skf = StratifiedKFold(n_splits=4, random_state=RAND_STATE_GLOB, shuffle=True)

# Labels on which the splitter stratifies (legacy, but kept for now)
labels = list(video_df.loc["MusicEncoding", :])

# Placeholder feature matrix with one row per column of video_df:
# only the indices produced by the split are of interest
dummy_features = np.zeros((video_df.shape[1], 1))

### 3.3 Begin Training Using KFolds

##### 3.3.3 Setup some variables  

In [None]:
# Directory where the model weights are stored
savedir = f"{currentdir}/saved_models_03/"


def get_model_name(k):
    """Return the weights file name used for fold ``k``."""
    return 'new_model_' + str(k) + '.h5'


# Whether weights should be loaded into the model before training
LOAD_BEFORE_TRAINING = False

# When True, a fold is trained even if its weights file already exists
IGNORE_TRAINED = True

In [None]:
# An indexer to label the current fold
fold_var = 1

# Make sure the weights directory exists before listing it / writing to it
os.makedirs(savedir, exist_ok=True)

# A list of already "trained models"
models_list = os.listdir(savedir)

# Perform the K splits
for train, test in skf.split(dummy_features, labels):

    model_name = get_model_name(fold_var)
    if (model_name in models_list) and not IGNORE_TRAINED:
        fold_var += 1
        print("skipping training for " + model_name)
        continue

    # Hard switch that currently disables loading of existing weights
    NO_LOAD = True

    # Print the samples in the test-train split
    print(train.shape, test.shape)

    # Create our partitions
    trainX, trainY = inputs[train, :], targets[train, :]
    testX , testY  = inputs[test,  :], targets[test,  :]

    # Get an instance of the CVAE model
    model  = ConvolutionalVariationalAutoEncoder(
        input_dims, latent_dim, learning_rate, b=beta)
    model.custom_compile()

    if LOAD_BEFORE_TRAINING and not NO_LOAD:
        # Run one sample through the model first so that its variables
        # exist when the weights are restored
        model(trainX[:1])
        model.load_weights(savedir + model_name)

    # Setup saving weights incrementally. With an integer save_freq the
    # checkpoint counts batches, so "every 15 epochs" is expressed through
    # the number of batches per epoch of *this fold's* training partition.
    steps_per_epoch = int(np.ceil(trainX.shape[0] / batch_size))
    sf = steps_per_epoch * 15
    checkpoint = ModelCheckpoint(savedir + model_name, monitor='ELBO', verbose=1,
          save_best_only=True, mode='auto', save_freq=sf, save_weights_only=True)
    callbacks  = [checkpoint]

    # Train the model: five epochs without checkpointing first ...
    start_time = time.time()
    history = model.fit(
        trainX, trainY,
        epochs          = 5,
        batch_size      = batch_size,
        validation_data = (testX, testY),
        shuffle         = True,
        verbose         = 1
    )

    # ... then the remaining epochs with checkpointing. Only the history of
    # this second run is kept in `history`.
    history = model.fit(
        trainX, trainY,
        epochs          = epochs - 5,
        batch_size      = batch_size,
        validation_data = (testX, testY),
        shuffle         = True,
        verbose         = 1,
        callbacks       = callbacks
    )

    print('Done in {0:.2f} seconds'.format((time.time() - start_time)))

    # Save history with joblib
    joblib.dump(
        history.history,
        savedir + 'new_model_{0}_history.pkl'.format(fold_var),
        compress = 3
    )

    K.clear_session()
    fold_var += 1

#### 3.4 Train the original model with the full data set 

In [None]:
model_save_name = "model_mg.h5"

# Get an instance of the CVAE model
model  = ConvolutionalVariationalAutoEncoder(
    input_dims, latent_dim, learning_rate, b=beta)
model.custom_compile()

# Setup model saving. With an integer save_freq the checkpoint counts
# batches, so "every 15 epochs" is expressed through the exact number of
# batches per epoch (a ceiling division; `n // batch_size + 1` is one too
# many whenever n is a multiple of the batch size).
steps_per_epoch = int(np.ceil(inputs.shape[0] / batch_size))
sf = steps_per_epoch * 15
checkpoint = ModelCheckpoint(savedir + model_save_name, monitor='ELBO', verbose=1,
      save_best_only=True, mode='auto', save_freq=sf, save_weights_only=True)
callbacks  = [checkpoint]

# Train on the full data set for the number of epochs set in the
# hyperparameter cell
history_mg = model.fit(
    inputs, targets,
    epochs          = epochs,
    batch_size      = batch_size,
    callbacks       = callbacks,
    shuffle         = True,
    verbose         = 1
)

# (4). Pre-Evaluation By Metrics Inspections

#### (4).1 Evaluate the K-Folds  

In [None]:
# Per-fold results. The "*_R2" lists hold the "rec" metric and the
# "*_LOSS" lists hold the "ELBO" metric.
TRAIN_R2, TRAIN_LOSS = [], []
VALIDATION_R2, VALIDATION_LOSS = [], []

fold_var = 1
for train, test in skf.split(dummy_features, labels):
    model_name = get_model_name(fold_var)

    # Re-create the partitions of this fold
    trainX, testX = inputs[train, :], inputs[test, :]
    trainY, testY = targets[train, :], targets[test, :]

    # Fresh model (default KL weight); one forward pass precedes the
    # loading of the stored weights
    model = ConvolutionalVariationalAutoEncoder(
        input_dims, latent_dim, learning_rate)
    model.custom_compile()
    model(trainX[:1])
    model.load_weights(savedir + model_name)

    # Evaluate on both partitions and label the values by metric name
    train_scores  = model.evaluate(trainX, trainY)
    results_train = dict(zip(model.metrics_names, train_scores))
    test_scores   = model.evaluate(testX, testY)
    results_test  = dict(zip(model.metrics_names, test_scores))

    # Store the metrics of this fold
    TRAIN_R2.append(results_train["rec"])
    TRAIN_LOSS.append(results_train["ELBO"])
    VALIDATION_R2.append(results_test["rec"])
    VALIDATION_LOSS.append(results_test["ELBO"])

    K.clear_session()
    fold_var += 1

print(TRAIN_R2)
print(VALIDATION_R2)

In [None]:
# Mean and standard deviation of the per-fold metric, training partitions
mu_train, sig_train = np.mean(TRAIN_R2), np.std(TRAIN_R2)
print(mu_train, sig_train)

# Same statistics for the validation partitions
mu_test, sig_test = np.mean(VALIDATION_R2), np.std(VALIDATION_R2)
print(mu_test, sig_test)

In [None]:
import seaborn as sns
sns.set_theme()

# Number of folds whose training history is plotted
n_folds = 4

history_keys = ("ELBO", "val_ELBO", "rec", "val_rec")
data = {key: [] for key in history_keys}

for fold_var in range(1, n_folds + 1):
    # The training loop stores each history as 'new_model_<fold>_history.pkl'
    history_path = savedir + 'new_model_{0}_history.pkl'.format(fold_var)
    history = joblib.load(history_path)
    for key in history_keys:
        data[key].append(history[key])

# One row per fold, one column per epoch, plus the average over the folds
elbo_np = np.array(data["ELBO"])
elbo_np_avg = np.mean(elbo_np, axis=0)
val_elbo_np = np.array(data["val_ELBO"])
val_elbo_avg = np.mean(val_elbo_np, axis=0)

rec_np = np.array(data["rec"])
rec_avg = np.mean(rec_np, axis=0)
val_rec_np = np.array(data["val_rec"])
val_rec_avg = np.mean(val_rec_np, axis=0)

# Plot loss history across epochs
fig, axs = plt.subplots(2, 1, figsize=(5, 7), dpi=120)

# Empty lines that only provide the legend entries
for ax in axs:
    ax.plot([], [], label="Train Loss", color="green")
    ax.plot([], [], label="Validation Loss", color="orange")

# Faint curves for the individual folds
for i in range(n_folds):
    axs[0].plot(elbo_np[i, :], color="green", alpha=0.2, linewidth=0.5)
    axs[0].plot(val_elbo_np[i, :], color="orange", alpha=0.2, linewidth=0.5)
    axs[1].plot(rec_np[i, :], color="green", alpha=0.2, linewidth=0.5)
    axs[1].plot(val_rec_np[i, :], color="orange", alpha=0.2, linewidth=0.5)

# Stronger curves for the fold averages
axs[0].plot(elbo_np_avg, color="darkgreen", linewidth=0.9)
axs[0].plot(val_elbo_avg, color="orange", linewidth=0.9)
axs[1].plot(rec_avg, color="darkgreen", linewidth=0.9)
axs[1].plot(val_rec_avg, color="orange", linewidth=0.9)

axs[0].set_xticklabels([])
axs[0].set_ylabel('Loss (Negative ELBO)')
axs[1].set_xlabel('Epochs')
axs[1].set_ylabel('Reconstruction Loss')
axs[0].legend()
axs[1].legend()
plt.tight_layout()
plt.show()

# 4. Evaluation 

#### 4.1 Visualise Input-Output  

In [None]:
# Weights file of the first fold
model_name = "new_model_1.h5"

# Build a fresh model instance (default KL weight)
mg_model = ConvolutionalVariationalAutoEncoder(
    input_dims=input_dims, latent_dim=latent_dim, lr=learning_rate)
mg_model.custom_compile()

# One forward pass precedes the loading of the stored weights
# (presumably so that the model variables exist at that point)
mg_model(inputs[:1])
mg_model.load_weights("./saved_models_03/" + model_name)

##### (4.1) Fetch new unseen data

In [None]:
import tensorflow_probability as tfp

def sample_mvn(x_mu, x_logvar):
    """Draw one sample per row from a diagonal Gaussian.

    Args:
        x_mu: Array of means.
        x_logvar: Array of log-variances, broadcastable against ``x_mu``.

    Returns:
        ``numpy.ndarray`` holding ``x_mu + std * eps`` with ``eps`` standard
        normal, in the broadcast shape of the two arguments.
    """
    x_mu = np.asarray(x_mu)
    # The inputs are log-variances, so the standard deviation is
    # exp(0.5 * logvar); exp(logvar) would be the variance.
    x_std = np.exp(0.5 * np.asarray(x_logvar))

    # Sampling a diagonal Gaussian needs no distribution object: scale
    # independent standard-normal noise and shift it by the mean.
    noise = np.random.standard_normal(size=np.broadcast(x_mu, x_std).shape)
    sample = x_mu + x_std * noise

    # Keep the floating-point precision of the inputs (NumPy noise is float64)
    return sample.astype(np.result_type(x_mu.dtype, x_std.dtype), copy=False)

In [None]:
# OpenCV is needed for the colour conversion / resizing below and is not
# imported in the setup cell
import cv2

mgs_test = joblib.load(currentdir + "/tmpdata/test_mgs.pkl")
print(len(mgs_test))

# Pick one motiongram and bring it into the flattened, [0, 1]-scaled
# 128 x 128 float32 layout with a leading batch axis
test_idx = 2
mg_to_test = mgs_test[test_idx][1]
mg_to_test = cv2.cvtColor(mg_to_test, cv2.COLOR_RGB2GRAY)
mg_to_test = cv2.resize(mg_to_test, (128, 128))
mg_to_test = mg_to_test.astype(np.float32)
mg_to_test = np.clip(mg_to_test / 255.0, 0, 1)
mg_to_test = mg_to_test.flatten()
mg_to_test = np.expand_dims(mg_to_test, axis=0)


# Make a prediction. The model returns the decoder mean and log-variance
# together with the latent parameters, so the spectrogram is drawn from the
# decoder distribution.
x_mu, x_logvar, _, _ = mg_model.predict(mg_to_test)
eval_pred_target = sample_mvn(x_mu, x_logvar)

# Transform back to an "image" representation
mel_pred = np.reshape(eval_pred_target, input_dims)
mel_pred = (1 - mel_pred) * -80.
mg       = np.reshape(mg_to_test, input_dims)


# Plot the motiongram next to the predicted mel-spectrogram
fig, axs = plt.subplots(1, 2, figsize=(10, 5))

axs[0].set_xticks([])
axs[0].set_yticks([])
axs[0].set_xlabel("(a) Input Motiongram")
axs[0].imshow(mg, aspect="auto", cmap="binary", interpolation="bicubic")

librosa.display.specshow(mel_pred, x_axis='time', y_axis='mel',
                         cmap='binary', fmin=fmin, fmax=fmax, ax=axs[1]);
axs[1].set_xticks([])
axs[1].set_yticks([])
axs[1].set_ylabel('')
axs[1].set_xlabel("(b) Predicted Spectrogram")

plt.show()

# Convert the prediction to audio. The window function is taken from
# scipy.signal.windows; the alias in the scipy.signal namespace no longer
# exists in recent SciPy releases.
mel_pred_pwr = librosa.db_to_power(mel_pred)
y_pred = librosa.feature.inverse.mel_to_audio(
    mel_pred_pwr, sr=sr, n_fft=n_fft, hop_length=hop_length,
    window=scipy.signal.windows.hamming, fmin=fmin, fmax=fmax
)

ipd.display(ipd.Audio(y_pred, rate=sr))

##### 4.1.1 SANITY CHECK: We try to pass through known and unknown inputs to the network

In [None]:
# Sample number
example_number = 788

# Extract two examples:
# (1) a sample to feed through the network
# (2) the ground truth to compare against
eval_input       = np.expand_dims( inputs[example_number, :], axis=0)
eval_real_target = np.expand_dims(targets[example_number, :], axis=0)
print(eval_input.shape, eval_real_target.shape)

# Make a prediction and draw a spectrogram from the decoder distribution
x_mu, x_logvar, _, _ = mg_model.predict(eval_input)

eval_pred_target = sample_mvn(x_mu, x_logvar)
print(eval_pred_target.shape)

# Transform back to an "image" representation
mel_real = np.reshape(eval_real_target, input_dims)
mel_real = (1 - mel_real) * -80.
mel_pred = np.reshape(eval_pred_target, input_dims)
mel_pred = (1 - mel_pred) * -80.
mg       = np.reshape(eval_input, input_dims)

# Plot the motiongram and the two mel-spectrograms
fig, axs = plt.subplots(1, 3, figsize=(15, 5), dpi=120)

axs[0].set_xticks([])
axs[0].set_yticks([])
axs[0].set_xlabel("(a) Input Motiongram")
axs[0].imshow(mg, aspect="auto", cmap="binary", interpolation="bicubic")

librosa.display.specshow(mel_real, x_axis='time', y_axis='mel',
                         cmap='binary', fmin=fmin, fmax=fmax, ax=axs[1]);
axs[1].set_xticks([])
axs[1].set_yticks([])
axs[1].set_ylabel('')
axs[1].set_xlabel("(b) Associated Spectrogram")

librosa.display.specshow(mel_pred, x_axis='time', y_axis='mel',
                         cmap='binary', fmin=fmin, fmax=fmax, ax=axs[2]);
axs[2].set_xticks([])
axs[2].set_yticks([])
axs[2].set_ylabel('')
axs[2].set_xlabel("(c) Predicted Spectrogram")

plt.show()

# Convert both spectrograms to audio. The window function is taken from
# scipy.signal.windows; the alias in the scipy.signal namespace no longer
# exists in recent SciPy releases.
mel_real_pwr = librosa.db_to_power(mel_real)
y_real = librosa.feature.inverse.mel_to_audio(
    mel_real_pwr, sr=sr, n_fft=n_fft, hop_length=hop_length,
    window=scipy.signal.windows.hamming, fmin=fmin, fmax=fmax
)

mel_pred_pwr = librosa.db_to_power(mel_pred)
y_pred = librosa.feature.inverse.mel_to_audio(
    mel_pred_pwr, sr=sr, n_fft=n_fft, hop_length=hop_length,
    window=scipy.signal.windows.hamming, fmin=fmin, fmax=fmax
)

ipd.display(ipd.Audio(y_real, rate=sr))
ipd.display(ipd.Audio(y_pred, rate=sr))

##### 4.2.1 Visualize the latent space conditioned on the audio-sample labels

#### 4.2 Experiment with the latent space 

In [None]:
# Keep handles on the two sub-models of the trained network
mg_encoder, mg_decoder = mg_model.get_encoder(), mg_model.get_decoder()


def encode(features):
    """Project ``features`` to latent space and return one sampled latent per row."""
    encoder_out = mg_encoder.predict(features)
    z_mu, z_logvar = np.split(encoder_out, 2, axis=-1)
    z = mg_model.reparametrize(z_mu, z_logvar)
    return z.numpy()


def decode(z):
    """Decode latent vectors ``z`` and return a sample from the decoder distribution."""
    decoder_out = mg_decoder.predict(z)
    x_mu, x_logvar = np.split(decoder_out, 2, axis=-1)
    return sample_mvn(x_mu, x_logvar)

In [None]:
import seaborn as sns

# Work on a copy of the inputs (called "features" from here on)
features = inputs.copy()

# Project the features to latent space and sample one latent vector each
encoder_out = mg_encoder.predict(features)
z_mu, z_logvar = np.split(encoder_out, 2, axis=-1)
z = mg_model.reparametrize(z_mu, z_logvar).numpy()
print(z.shape)

# Correlation between the first 16 latent dimensions (columns are variables)
corr = np.corrcoef(z[:, :16], rowvar=False)
print(corr.shape)

sns.heatmap(corr, cmap='Spectral_r', annot=False,
            xticklabels=True, yticklabels=True)
plt.show()

In [None]:
# Set the "features" (which was previously called inputs)
features = inputs.copy()

# Grab the label (genre encoding) for each feature. The genres are sorted
# before they are numbered so that the genre <-> integer mapping is the same
# in every session (iteration order of a set of strings is not).
labels_str  = list(video_df.loc["DanceGenre", :])
dancegenres = sorted(set(labels_str))
genre_dict  = {k: idx for idx, k in enumerate(dancegenres)}
label_genre = {v: k for k, v in genre_dict.items()}
# NOTE: this rebinds `labels`, which the K-fold cells also use for the
# "MusicEncoding" labels; re-run the K-fold setup before splitting again.
labels      = [genre_dict[k] for k in labels_str]

feature_labels = np.array(labels)
print(features.shape, feature_labels.shape)

# Perform LDA
from sklearn.metrics import accuracy_score
from sklearn.model_selection import train_test_split
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis

components = 3
lda = LinearDiscriminantAnalysis(n_components=components)

# Project features to latent space
z_mu, z_logvar = np.split(mg_encoder.predict(features), 2, axis=-1)
latent_features = mg_model.reparametrize(z_mu, z_logvar).numpy()
print(latent_features.shape)

# Split to train-test
X_train, X_test, y_train, y_test = train_test_split(
        latent_features, feature_labels, test_size=0.20, random_state=19)

# Fit the latent features
lda.fit(X_train, y_train)

# Perform a transformation and plot
X = lda.transform(X_train)
print(X.shape)

# Visualize on a scatter plot
fig = plt.figure(figsize=(5, 8), dpi=120)
ax = fig.add_subplot(projection='3d')

# One colour per genre, evenly spaced on the Spectral_r colour map
from matplotlib.colors import ListedColormap
colors = [plt.cm.Spectral_r(i/float(len(genre_dict)-1)) for i in range(len(genre_dict))]
colors = ListedColormap(colors)

scatter = ax.scatter(X[:, 0], X[:, 1], X[:, 2], c=y_train, cmap=colors)
ax.legend(
    handles=scatter.legend_elements()[0],
    labels=list(genre_dict),
    loc='upper center',
    bbox_to_anchor=(0.5, 1.2),
    ncol=3, fancybox=True, shadow=True
)
ax.set_xlabel("LDA 1")
ax.set_ylabel("LDA 2")
ax.set_zlabel("LDA 3")
plt.show()

# Predict train accuracy
y_pred_train = lda.predict(X_train)
acc_train = accuracy_score(y_train, y_pred_train)

# Predict test accuracy
y_pred_test = lda.predict(X_test)
acc_test = accuracy_score(y_test, y_pred_test)

# Print results
print(acc_train, acc_test)

In [None]:
# Draw latent vectors from a standard normal distribution
many_zs = np.random.normal(loc=0, scale=1, size=(3120, latent_dim))

# Project them with the fitted LDA and predict a class for each
X, y = lda.transform(many_zs), lda.predict(many_zs)

# Scatter plot of the first two LDA components, coloured by predicted class
plt.figure(figsize=(6, 6))
plt.scatter(X[:, 0], X[:, 1], cmap='Spectral_r', c=y)
plt.colorbar()
plt.show()

##### 4.2.2 Combine motiongrams in the latent space 

In [None]:
# Fetch some random examples
num_examples = 3
rand_idx = np.random.randint(inputs.shape[0], size=num_examples)
examples = inputs[rand_idx, :]
lat_sum  = np.zeros((1, latent_dim))
lat_fac  = 1. / num_examples

# Sum the latent vectors of all examples first ...
for ii in range(num_examples):
    ex = np.expand_dims(examples[ii, :], axis=0)
    # The encoder outputs mean and log-variance in one array, so the sampled
    # latent vector is obtained through the encode() helper
    lat_sum = lat_sum + encode(ex)

# ... and scale once afterwards, which gives their mean. Scaling inside the
# loop would shrink the earlier examples again on every iteration.
lat_mean = lat_sum * lat_fac

print(lat_mean.shape)
# decode() splits the decoder output into mean / log-variance and samples a
# spectrogram from it
decoded_mel = decode(lat_mean)
decoded_mel = np.reshape(decoded_mel, input_dims)
decoded_mel = (1 - decoded_mel) * -80.

fig, axs = plt.subplots(1, 1, figsize=(5, 5))
librosa.display.specshow(decoded_mel, x_axis='time', y_axis='mel',
                         cmap='binary', fmin=fmin, fmax=fmax, ax=axs);
plt.show()

# Convert to audio. The window function is taken from scipy.signal.windows;
# the alias in the scipy.signal namespace no longer exists in recent SciPy.
decoded_mel = librosa.db_to_power(decoded_mel)
y_decoded = librosa.feature.inverse.mel_to_audio(
    decoded_mel, sr=sr, n_fft=n_fft, hop_length=hop_length,
    window=scipy.signal.windows.hamming, fmin=fmin, fmax=fmax
)

ipd.display(ipd.Audio(y_decoded, rate=sr))

##### 4.2.3 Sample randomly and generate an audio-hook 

In [None]:
# Sample a "latent" space vector using numpy
zs = np.random.normal(size=(1, latent_dim), loc=0.25, scale=3.95)

# Use our lda to predict which (dance) genre this is
# (predict returns an array with one entry per row of zs)
y = lda.predict(zs)
print("Genre: " + label_genre[int(y[0])])

# Pass sample through decoder. decode() splits the decoder output into
# mean / log-variance and samples a spectrogram from it.
decoded_mel = decode(zs)
decoded_mel = np.reshape(decoded_mel, input_dims)
decoded_mel = (1 - decoded_mel) * -80.

fig, axs = plt.subplots(1, 1, figsize=(5, 5))
librosa.display.specshow(decoded_mel, x_axis='time', y_axis='mel',
                         cmap='binary', fmin=fmin, fmax=fmax, ax=axs);
plt.show()

# Convert to audio. The window function is taken from scipy.signal.windows;
# the alias in the scipy.signal namespace no longer exists in recent SciPy.
decoded_mel = librosa.db_to_power(decoded_mel)
y_decoded = librosa.feature.inverse.mel_to_audio(
    decoded_mel, sr=sr, n_fft=n_fft, hop_length=hop_length,
    window=scipy.signal.windows.hamming, fmin=fmin, fmax=fmax
)

ipd.display(ipd.Audio(y_decoded, rate=sr))

##### 4.2.4 Interpolate between two motiongrams (in latent space) 

In [None]:
# Pick two random examples
rand_idxs = np.random.randint(inputs.shape[0], size=2)
print(rand_idxs)

# Fetch two examples to interpolate between
inputs_subset = inputs[rand_idxs, :]
example_lo    = np.expand_dims(inputs_subset[0, :], axis=0)
example_hi    = np.expand_dims(inputs_subset[1, :], axis=0)

# Project them both to latent space. The encoder outputs mean and
# log-variance in one array, so the sampled latent vectors are obtained
# through the encode() helper.
zs_lo = encode(example_lo)
zs_hi = encode(example_hi)

steps   = 5
interps = []

fig, axs = plt.subplots(1, steps, figsize=(15, 5))

for ii in range(steps):
    # Linearly interpolate the latent vector
    tau = ii / (steps - 1)
    zs  = zs_lo*(1 - tau) + zs_hi*tau

    # Try to predict which genre comes in between
    # (predict returns an array with one entry per row of zs)
    y = lda.predict(zs)
    print("Genre: " + label_genre[int(y[0])])

    # Decode it back to a "spectrogram". decode() splits the decoder output
    # into mean / log-variance and samples a spectrogram from it.
    decoded_mel = decode(zs)
    decoded_mel = np.reshape(decoded_mel, input_dims)
    decoded_mel = (1 - decoded_mel) * -80.

    # Display it
    axs[ii].set_xticks([])
    axs[ii].set_yticks([])
    axs[ii].set_title("Step {}".format(ii+1), size=12)
    axs[ii].imshow(decoded_mel, interpolation = "bicubic", cmap = "binary")

    # Convert to audio. The window function is taken from
    # scipy.signal.windows; the alias in the scipy.signal namespace no
    # longer exists in recent SciPy.
    decoded_mel_pwr = librosa.db_to_power(decoded_mel)
    y_decoded = librosa.feature.inverse.mel_to_audio(
        decoded_mel_pwr, sr=sr, n_fft=n_fft, hop_length=hop_length,
        window=scipy.signal.windows.hamming, fmin=fmin, fmax=fmax
    )

    # Store in list
    interps.append((decoded_mel, y_decoded, zs))

In [None]:
# Play the audio of every interpolation step; each entry of `interps` is
# (spectrogram, audio, latent vector)
for interp in interps:
    ipd.display(ipd.Audio(interp[1], rate=sr))

#### 4.3 Visualize activations in first conv layer

In [None]:
# Outputs of the first three encoder layers
layer_outputs = [
    layer.output for layer in mg_encoder.layers[:3]
]

# Model from the encoder input to those outputs. The trained network in this
# notebook is `mg_model`; it keeps the encoder input as `input_encoder`.
activation_model = Model(
    inputs=mg_model.input_encoder,
    outputs=layer_outputs)

In [None]:
example_no = 310
activations = activation_model.predict(
    np.reshape(inputs[example_no], (1, -1))
)

# Feature maps of the third tapped layer for the single example
feature_maps = activations[2]
n_maps = feature_maps.shape[-1]

# Keep the 6 x 6 grid and only grow it when there are more than 36 maps
grid = max(6, int(np.ceil(np.sqrt(n_maps))))

fig = plt.figure(figsize=[14,14])
# Plot exactly as many maps as the layer has channels
for f in range(n_maps):
    ax = plt.subplot(grid, grid, f+1)
    ax.imshow(feature_maps[0,:,:,f], cmap='binary', origin='lower')
    ax.get_xaxis().set_visible(False)
    ax.get_yaxis().set_visible(False)
plt.show()