In [1]:
# import packages
import numpy as np 
import tensorflow as tf 
from tensorflow import keras 
from tensorflow.keras import layers 
from keras.datasets import mnist 
from keras import backend as K
from scipy import stats
import matplotlib.pyplot as plt

In [2]:
# Initial Setup
original_dim = 28 * 28
intermediate_dim1 = 256
intermediate_dim2 = 64
latent_dim = 2
beta = 0.5           # beta is the weight for penalizer, in paper, it is lambda
#tf.random.set_seed(124)

In [3]:
inputs = keras.Input(shape=(original_dim,))
h = layers.Dense(intermediate_dim1, activation='relu')(inputs)
# h = layers.Dense(intermediate_dim2, activation='relu')(h)
z_mean = layers.Dense(latent_dim)(h)
z_log_sigma = layers.Dense(latent_dim)(h)

In [4]:
# sample new data points from latent space (re-parameterization trick):
def sampling(args):
    z_mean, z_log_sigma = args
    # dimension of epsilon: batch_size by 2 in this example
    epsilon = K.random_normal(shape=(K.shape(z_mean)[0], latent_dim),
                              mean=0., stddev=1.0)                       
    return z_mean + K.exp(0.5*z_log_sigma) * epsilon

z = layers.Lambda(sampling)([z_mean, z_log_sigma])

In [5]:
# Create encoder:
encoder = keras.Model(inputs, [z_mean, z_log_sigma, z], name='encoder')

In [6]:
# Create decoder:
latent_inputs = keras.Input(shape=(latent_dim,), name='z_sampling')
x = layers.Dense(intermediate_dim1, activation='relu')(latent_inputs)
x = layers.Dense(intermediate_dim2, activation='relu')(x)
outputs = layers.Dense(original_dim, activation='sigmoid')(x)
decoder = keras.Model(latent_inputs, outputs, name='decoder')

In [7]:
# instantiate VAE model:
outputs = decoder(encoder(inputs)[2]) # get latent vector z
vae = keras.Model(inputs, outputs, name='vae_mlp')

In [8]:
# Another way to write Bernoulli distribution for p_theta(x|z)
reconstruction_loss = inputs*tf.math.log(1e-10+outputs) + (1-inputs)*tf.math.log(1e-10+1-outputs)
reconstruction_loss = tf.reduce_sum(reconstruction_loss, axis=1)

# # reconstruction loss: assuming Normal distribution for p_theta(x|z)
#mse = -0.5*K.sum(K.square((outputs-mu)/K.exp(logsigma)),axis=1)
#sigma_trace = -K.sum(logsigma, axis=1)
#log2pi = -0.5*n_dims*np.log(2*np.pi)

# A. divergence loss: KL_loss
#kl_loss = 0.5*K.sum(K.square(z_mean) + K.exp(z_log_sigma) - 1 - z_log_sigma, axis = -1)
# # total loss (combine with first term)
#vae_loss = K.mean(-reconstruction_loss + beta*kl_loss)

# A_prime another way (Monte Carlo method) to calculate KL divergence (also works)
log_diff_p_qphi = 0.5*(-K.square(z) + z_log_sigma + K.square(z-z_mean)/K.exp(z_log_sigma))
kl_loss = -K.sum(log_diff_p_qphi, axis = 1)
# # total loss (combine with first term)
vae_loss = K.mean(-reconstruction_loss + beta*kl_loss)

# B divergence loss: HD_loss: doesn't use closed form
#log_diff_p_qphi = 0.5*(-K.square(z) + z_log_sigma + K.square(z-z_mean)/K.exp(z_log_sigma))
#Aff_loss = K.exp(0.5*K.sum(log_diff_p_qphi, axis = 1))
# # B_prime divergence loss: HD loss: closed form
#Aff_loss = K.pow(K.prod(K.exp(z_log_sigma), axis = 1), 0.25)/K.sqrt(K.prod((K.exp(z_log_sigma)+1)/2, axis = 1))*K.exp(-0.25*K.sum(K.square(z_mean)/(K.exp(z_log_sigma)+1) ,axis = 1)) 
#Aff_loss = K.sqrt(K.prod(K.exp(0.5*z_log_sigma)/(K.exp(z_log_sigma)+1), axis = 1))*K.exp(-0.25*K.sum(K.square(z_mean)/(K.exp(z_log_sigma)+1) ,axis = 1)) 
# # HD total loss (combine with first term)
#vae_loss = K.mean(-reconstruction_loss + beta*(1-Aff_loss))

# C divergence loss: VNED loss
#log_diff_p_qphi = 0.5*(-K.square(z) + z_log_sigma + K.square(z-z_mean)/K.exp(z_log_sigma))
#VNED_loss = K.exp(-K.exp(K.sum(log_diff_p_qphi, axis=1))+1) - 1 
#VNED_loss = beta*VNED_loss
# VNED total loss:
#vae_loss = K.mean(-reconstruction_loss + VNED_loss)

In [9]:
# Fit model
(x_train, y_train), (x_test, y_test) = tf.keras.datasets.fashion_mnist.load_data()
#(x_train, y_train), (x_test, y_test) = tf.keras.datasets.cifar10.load_data()
#(x_train, y_train), (x_test, y_test) = mnist.load_data()

x_train = x_train.astype('float32') / 255.
x_test = x_test.astype('float32') / 255.
# for the following: len(x_train) = 6000, np.prod(x_train.shape[1:]) = 784
x_train = x_train.reshape((len(x_train), np.prod(x_train.shape[1:]))) 
x_test = x_test.reshape((len(x_test), np.prod(x_test.shape[1:])))

In [10]:
#for i  in range(60000):
    #pix =  random.sample(range(784), 78)
    # outliers
    #z = np.abs(stats.zscore(x_train[i]))
    #outliers = np.where(z>2)
    # randomly pick outlier location
    #random_outlier = random.choice(outliers[0])

In [11]:
#import random
#print(z.shape)
#print(x_train[1].shape)
#x_train[1][155] 
#print(random.choice(outlier[0]))
#print(z[random.choice(outlier[0])])
#print(x_train[1][random.choice(outlier[0])])

In [12]:
import random
from statistics import median
# at the locations stored in replace_pixel_location, replace pixel values with outliers (at those locations)
for ar in range(60000):
    # list of 78 random number between 0 to 783 without duplicates
    replace_pixel_location = random.sample(range(784), 78)
    # list of outliers 
    z = np.abs(stats.zscore(x_train[ar]))
    outlier = np.where(z >= 1)
    
    ##############################################################################################
    z_list = []
    # list of all the z_values with outlier location
    for i in range(len(outlier[0])):
        z_list.append(z[outlier[0][i]])
    
    x_new = x_train[ar][outlier[0][z_list.index(max(z_list))]]
    ##############################################################################################
    #print(x_new)
    # replacing 78 locations with an outlier value
    for loc in replace_pixel_location:
        # replace pixels with outliers
        #x_train[ar][loc] = x_new
        #x_train[ar][loc] = x_train[ar][random.choice(outlier[0])]
        #x_train[ar][loc] = np.max(x_train[ar]) # to replace x_train[ar][loc] with maximum value
        x_train[ar][loc] = random.random() # to replace x_train[ar][loc] with random value between 0 and 1

'import random\nfrom statistics import median\n# at the locations stored in replace_pixel_location, replace pixel values with outliers (at those locations)\nfor ar in range(60000):\n    # list of 78 random number between 0 to 783 without duplicates\n    replace_pixel_location = random.sample(range(784), 78)\n    # list of outliers \n    z = np.abs(stats.zscore(x_train[ar]))\n    outlier = np.where(z >= 1)\n    \n    ##############################################################################################\n    z_list = []\n    # list of all the z_values with outlier location\n    for i in range(len(outlier[0])):\n        z_list.append(z[outlier[0][i]])\n    \n    x_new = x_train[ar][outlier[0][z_list.index(max(z_list))]]\n    ##############################################################################################\n    #print(x_new)\n    # replacing 78 locations with an outlier value\n    for loc in replace_pixel_location:\n        # replace pixels with outliers\n        #x_tr

In [13]:
def compile_model(my_optimizer):
    vae.add_loss(vae_loss)
    #my_optimizer = "Adam"
    vae.compile(optimizer=my_optimizer)  # choices: RMSprop, SGD, Adam
    # Another way for optimization (can specify learning rate)
    #opt = keras.optimizers.SGD(learning_rate=1e-2)
    #vae.compile(optimizer=opt)

In [14]:
def plot_loss(f_name):
    plt.plot(vae_fit.history['val_loss'])
    plt.savefig(f_name, format='png', dpi = 120)
    #plt.savefig('Validation Loss Adam MNIST', format='png', dpi = 120)
    #plt.savefig('Validation Loss RMSprop MNIST', format='png', dpi = 120)
    plt.show()

In [15]:
# Because the VAE is a generative model, we can also use it to generate new digits! 
# Here we will scan the latent plane, sampling latent points at regular intervals, 
# and generating the corresponding digit for each of these points. This gives us 
# a visualization of the latent manifold that "generates" the MNIST digits.  

def generate_new_img(f_name):
    # Display a 2D manifold of the digits
    n = 15  # figure with 15x15 digits
    digit_size = 28
    figure = np.zeros((digit_size * n, digit_size * n))
    # We will sample n points within [-15, 15] standard deviations
    grid_x = np.linspace(-15, 15, n)
    grid_y = np.linspace(-15, 15, n)

    for i, yi in enumerate(grid_x):
        for j, xi in enumerate(grid_y):
            # z_sample = np.array([[xi, yi]])
            z_sample = np.random.normal(0., 3, size = [1, latent_dim]) # for random order
            x_decoded = decoder.predict(z_sample)
            digit = x_decoded[0].reshape(digit_size, digit_size)
            figure[i * digit_size: (i + 1) * digit_size,
                   j * digit_size: (j + 1) * digit_size] = digit

    plt.figure(figsize=(10, 10))
    plt.imshow(figure)
    plt.savefig(f_name, format='png', dpi = 120)
    #plt.savefig('Generating New Images Adam MNIST', format='png', dpi = 120)
    #plt.savefig('Generating New Images RMSprop MNIST', format='png', dpi = 120)
    plt.show()

In [16]:
# Another way to create a mnist picture:
def plot_latent_space(decoder, n=15, figsize=15):
    # np.random.seed(1)  
    # display a n*n 2D manifold of digits
    digit_size = 28
    scale = 1.0
    figure = np.zeros((digit_size * n, digit_size * n))
    # linearly spaced coordinates corresponding to the 2D plot
    # of digit classes in the latent space
    grid_x = np.linspace(-scale, scale, n)
    grid_y = np.linspace(-scale, scale, n)[::-1]

    for i, yi in enumerate(grid_y):
        for j, xi in enumerate(grid_x):
            # z_sample = np.array([[xi, yi]])                  # for fixed order
            z_sample = np.random.normal(0., 5, size = [1, latent_dim]) # for random order
            x_decoded = decoder.predict(z_sample)
            digit = x_decoded[0].reshape(digit_size, digit_size)
            figure[
                i * digit_size : (i + 1) * digit_size,
                j * digit_size : (j + 1) * digit_size,
            ] = digit

    plt.figure(figsize=(figsize, figsize))
    start_range = digit_size // 2
    end_range = n * digit_size + start_range
    pixel_range = np.arange(start_range, end_range, digit_size)
    sample_range_x = np.round(grid_x, 1)
    sample_range_y = np.round(grid_y, 1)
    plt.xticks(pixel_range, sample_range_x)
    plt.yticks(pixel_range, sample_range_y)
    plt.xlabel("z[0]")
    plt.ylabel("z[1]")
    plt.imshow(figure, cmap="Greys_r")
    plt.savefig(fig_name, format='png', dpi = 120)
    #plt.savefig('Generating New Images (2) Adam MNIST', format='png', dpi = 120)
    #plt.savefig('Generating New Images (2) RMSprop MNIST', format='png', dpi = 120)
    plt.show()



In [17]:
# Display how the latent space clusters different digit classes
def plot_label_clusters(encoder, data, labels, fname):
    # display a 2D plot of the digit classes in the latent space
    z_mean, _, _ = encoder.predict(data)
    plt.figure(figsize=(12, 10))
    plt.scatter(z_mean[:, 0], z_mean[:, 1], c=labels)
    plt.colorbar()
    plt.xlabel("z[0]")
    plt.ylabel("z[1]")
    plt.savefig(fname, format='png', dpi = 120)
    #plt.savefig('Label Clusters Adam MNIST', format='png', dpi = 120)
    #plt.savefig('Label Clusters RMSprop MNIST', format='png', dpi = 120)
    plt.show()



In [18]:
from sklearn.metrics import roc_curve, auc, confusion_matrix
def plot_roc_curve(x_test, tname):
    ground_truth_labels = x_test.ravel()
    x_score = vae.predict(x_test)
    x_score = np.where(x_score < 0.01, 0, x_score)
    score_value = x_score.round().ravel()
    fpr, tpr, _ = roc_curve(score_value, ground_truth_labels)
    roc_auc = auc(fpr, tpr)
    fig, ax = plt.subplots(1, 1)
    ax.plot(fpr, tpr, label='ROC curve (area = %0.2f)' % roc_auc)
    ax.plot([0, 1], [0, 1], 'k--')
    ax.set_xlim([0.0, 1.0])
    ax.set_ylim([0.0, 1.05])
    ax.set_xlabel('False Positive Rate')
    ax.set_ylabel('True Positive Rate')
    ax.set_title(tname)
    ax.legend(loc = 'lower right')
    plt.savefig(tname, format='png', dpi = 120)
    plt.show()

In [19]:
#address = '/home/kiara/Desktop/VAE/dr_li_code/Report/VAE/VAE MNIST/lambda=' + str(beta) + '/' + my_optimizer + '/' + l + '/' + number_of_epochs + ' epochs/'                                                                
my_optimizer = "SGD" #Adam, SGD, RMSprop
n_epochs = 200                                                        #20, 100, 200
compile_model(my_optimizer)
vae_fit = vae.fit(x_train, y_train,
                  epochs=n_epochs,
                  batch_size=100,
                  validation_data=(x_test, y_test))

Epoch 1/200
Epoch 2/200
Epoch 3/200
Epoch 4/200
Epoch 5/200
Epoch 6/200
Epoch 7/200
Epoch 8/200
Epoch 9/200
Epoch 10/200
Epoch 11/200
Epoch 12/200
Epoch 13/200
Epoch 14/200
Epoch 15/200
Epoch 16/200
Epoch 17/200
Epoch 18/200
Epoch 19/200
Epoch 20/200
Epoch 21/200
Epoch 22/200
Epoch 23/200
Epoch 24/200
Epoch 25/200
Epoch 26/200
Epoch 27/200
Epoch 28/200
Epoch 29/200
Epoch 30/200
Epoch 31/200
Epoch 32/200
Epoch 33/200
Epoch 34/200
Epoch 35/200
Epoch 36/200
Epoch 37/200
Epoch 38/200
Epoch 39/200
Epoch 40/200
Epoch 41/200
Epoch 42/200
Epoch 43/200
Epoch 44/200
Epoch 45/200
Epoch 46/200
Epoch 47/200
Epoch 48/200
Epoch 49/200
Epoch 50/200
Epoch 51/200

KeyboardInterrupt: 

In [None]:
#model.evaluate(x_test, y_test)

In [None]:
str1 = 'Validation Loss '+ my_optimizer+' MNIST' 
plot_loss(str1)
str2 = 'Generating New Images ' + my_optimizer + ' MNIST' 
generate_new_img(str2)
fig_name = 'Generating New Images (2) ' + my_optimizer + ' MNIST' 
plot_latent_space(decoder)
str3 = 'Label Clusters ' + my_optimizer + ' MNIST'
plot_label_clusters(encoder, x_train, y_train, fname = str3)
str4 = 'Receiver Operating Characteristic (ROC) Curve ' + my_optimizer + ' MNIST'
plot_roc_curve(x_test, str4)