In [None]:
import tensorflow as tf
import numpy as np
import time
from scipy.stats import multivariate_normal


class VAE(tf.keras.Model):
    """Class of basic Variational Autoencoder

    This class contains basic components and main functions of VAE model.
    
    Args:
      latent_dim: size of latent variables. 2,4,6 ... 2n
      input_shape: shape of input data. [28,28]...    
    
    """
    def __init__(self, latent_dim,input_shape):
        super(VAE, self).__init__()
        self.latent_dim = latent_dim
        
        # Encoder NN
        self.encoder = tf.keras.Sequential(
          [
            tf.keras.layers.InputLayer(input_shape=input_shape),
            tf.keras.layers.Flatten(),
            tf.keras.layers.Dense(128, activation='relu'),
            tf.keras.layers.Dense(64, activation='relu'),
            tf.keras.layers.Dense(latent_dim + latent_dim),
          ]
        )

        # Decoder NN
        self.decoder = tf.keras.Sequential(
          [
            tf.keras.layers.InputLayer(input_shape=(latent_dim)),
            tf.keras.layers.Dense(64, activation='relu'),
            tf.keras.layers.Dense(128, activation='relu'),
            tf.keras.layers.Dense(784),
            tf.keras.layers.Reshape(target_shape=input_shape),
          ]
        )

    @tf.function
    def sample(self, eps=None):
        """ sample data from latent variables 

        Args:
          eps: it will be used for decode and
            eps will be set by a random data when None
        Return:
          log x'(the reconstruction data of x)
        """
        if eps is None:
            eps = tf.random.normal(shape=(100, self.latent_dim))
        return self.decode(eps, apply_sigmoid=True)

    def encode(self, x):
        """do encode jobs
        for each input data x, will construct multiple Gausian Distribution

        Args:
          x: data that will be encoded.
        Return:
          mean: array like. list of mu of that distribution.
          logvar: array like. list of logvar of that distribution.        
        """
        mean, logvar = tf.split(self.encoder(x), num_or_size_splits=2, axis=1)
        return mean, logvar

    def reparameterize(self, mean, logvar):
        """ reparameterize trick """
        eps = tf.random.normal(shape=mean.shape)
        return eps * tf.exp(logvar * .5) + mean
    
    # decode
    def decode(self, z, apply_sigmoid=False):
        """ do decode job
        Args:
          z: latent variables.
          apply_sigmoid: use sigmoid or not.
        Return:
          log x'(the reconstruction data of x)
        """
        logits = self.decoder(z)
        if apply_sigmoid:
            probs = tf.sigmoid(logits)
            return probs
        return logits



def log_normal_pdf(sample, mean, logvar, raxis=1):
    """Log of the normal probability density function.

    Args:
      sample: sample that need compute pdf
      mean,logvar: parametes of distribution
    Return:
      log pdf 
    """
    log2pi = tf.math.log(2. * np.pi)
    return tf.reduce_sum(
      -.5 * ((sample - mean) ** 2. * tf.exp(-logvar) + logvar + log2pi),
      axis=raxis)

def compute_loss(model, x):
    """calc ELBO
    
    Args: 
      x: input data
    Return:
      loss
    """
    # 1. encode process 
    mean, logvar = model.encode(x)

    # 2. reparameterize 
    z = model.reparameterize(mean, logvar)
    
    # 3. get log x'
    x_logit = model.decode(z)
    
    # 4. Compute sigmoid cross entropy given `logits`. 
    cross_ent = tf.nn.sigmoid_cross_entropy_with_logits(logits=x_logit, labels=x)
    
    # 5. Compute log p(x|z),log p(z),log q(z|x)
    logpx_z = -tf.reduce_sum(cross_ent, axis=[1, 2, 3])
    logpz = log_normal_pdf(z, 0., 0.)
    logqz_x = log_normal_pdf(z, mean, logvar)

    # 5. ELBO 
    return -tf.reduce_mean(logpx_z + logpz - logqz_x)

# optimizers
optimizer = tf.keras.optimizers.Adam(1e-4)

@tf.function
def train_step(model, x, optimizer):
    """Executes one training step and returns the loss.

    This function computes the loss and gradients, and uses the latter to
    update the model's parameters.
    """
    with tf.GradientTape() as tape:
        loss = compute_loss(model, x)
        gradients = tape.gradient(loss, model.trainable_variables)
        optimizer.apply_gradients(zip(gradients, model.trainable_variables))

def reconstructed_probability(X):
    """Get the reconstruction probability for X

    Args:
      X: test data. X.shape = (32,28,28,1)
    Return:
      log of probability   
    """
    reconstructed_prob = np.zeros(X.shape[0], dtype='float32')

    # 1. encode, 
    mu_hat, log_sigma_hat = model.encode(X)
    sigma_hat = tf.exp(log_sigma_hat)+0.0001
    
    # for each piece of data
    for j in range(X.shape[0]):
        p_l = multivariate_normal.logpdf(X[j], mu_hat[j,:], np.diag(sigma_hat[j,:]))
        reconstructed_prob[j] += tf.reduce_sum(p_l)
    
    return reconstructed_prob

def judege_anomaly(scores, threshold):
    """ judge anomaly
    if scores[i] > threshold:
        label for scores[i]: 1
    Args:
      scores: Array like.
      threshold: float
    Return:
      labels: Array like.
    """
    labels = np.zeros(len(scores),dtype=int)
    for i in range(len(scores)):
        if (scores[i]>threshold):
            labels[i] = 1

    return labels

In [None]:
model = VAE(latent_dim=4,input_shape=(28,28,1))

(train_images, train_labels), (test_images, test_labels) = tf.keras.datasets.mnist.load_data()

# preprocess image data
def preprocess_images(images):
    images = images.reshape((images.shape[0], 28, 28, 1)) / 255.
    return np.where(images > .5, 1.0, 0.0).astype('float32')

# exclude error num
def exclude_num(x, y, num):
    keep = (y != num) 
    x = x[keep]
    return x 

# label anomaly
def label_anomaly(anomaly_num,labels):
    true_labels = np.zeros(len(labels),dtype=int)
    for i in range(len(labels)):
        if (labels[i] == anomaly_num):
            true_labels[i] = 1
    return true_labels        

train_images = preprocess_images(train_images)
test_images = preprocess_images(test_images)
print("Before exclusion, Train_images.shape:",train_images.shape)

#choose error num
anomaly_num = 8
train_images = exclude_num(train_images,train_labels,anomaly_num)

# get true labels
true_labels = label_anomaly(anomaly_num, test_labels)

print("Take num {} as error data.".format(anomaly_num))
print("After exclusion, Train_images.shape:",train_images.shape)

Before exclusion, Train_images.shape: (60000, 28, 28, 1)
Take num 8 as error data.
After exclusion, Train_images.shape: (54149, 28, 28, 1)


In [None]:
# get validataion data from train_images
validation_size = int(train_images.shape[0]*0.3)
train_data = train_images[:-validation_size]
validation_data = train_images[-validation_size:]

# test_data 
# test_dataset = test_images

In [None]:
train_size = 60000
batch_size = 32
test_size = 10000

train_dataset = (tf.data.Dataset.from_tensor_slices(train_data)
                 .shuffle(train_size).batch(batch_size))

validation_dataset = (tf.data.Dataset.from_tensor_slices(validation_data)
                 .shuffle(train_size).batch(batch_size))

# for test data, do not shuffle
test_dataset = (tf.data.Dataset.from_tensor_slices(test_images)
                .batch(batch_size))

In [None]:
epochs = 20

# training    
for epoch in range(1, epochs + 1):
    # training
    start_time = time.time()
    for train_x in train_dataset:
        train_step(model, train_x, optimizer)
    end_time = time.time()

    # compute loss
    loss = tf.keras.metrics.Mean()
    for validation_x in validation_dataset:
        loss(compute_loss(model, validation_x))
    elbo = -loss.result()

    print('Epoch: {}, Test set ELBO: {}, time elapse for current epoch: {}'
        .format(epoch, elbo, end_time - start_time))


Epoch: 1, Test set ELBO: -200.57557678222656, time elapse for current epoch: 2.9963009357452393
Epoch: 2, Test set ELBO: -177.3914794921875, time elapse for current epoch: 1.893111228942871
Epoch: 3, Test set ELBO: -162.7436981201172, time elapse for current epoch: 1.8526606559753418
Epoch: 4, Test set ELBO: -155.03526306152344, time elapse for current epoch: 1.885533332824707
Epoch: 5, Test set ELBO: -149.99581909179688, time elapse for current epoch: 1.8758111000061035
Epoch: 6, Test set ELBO: -146.64480590820312, time elapse for current epoch: 1.8865952491760254
Epoch: 7, Test set ELBO: -144.0478057861328, time elapse for current epoch: 1.8395683765411377
Epoch: 8, Test set ELBO: -141.7947540283203, time elapse for current epoch: 1.8848865032196045
Epoch: 9, Test set ELBO: -140.01817321777344, time elapse for current epoch: 1.8557243347167969
Epoch: 10, Test set ELBO: -138.4994659423828, time elapse for current epoch: 1.8587820529937744
Epoch: 11, Test set ELBO: -137.1979522705078, 

In [None]:
# def judege_anomaly(scores,threshold):
#     labels = np.zeros(len(scores),dtype=int)
#     for i in range(len(scores)):
#         if (scores[i]>threshold):
#             labels[i] = 1

#     return labels

In [None]:
# true_labels = np.zeros(len(test_labels),dtype=int)
# for i in range(len(test_labels)):
#     if (test_labels[i] == error_num):
#         true_labels[i] = 1
# true_labels        


In [None]:
from tqdm import tqdm
from time import sleep

predict = []
with tqdm(total=len(test_dataset)) as pbar:
    for test_data in test_dataset:
        results = reconstructed_probability(X=test_data)
        pre_labels = judege_anomaly(results,-31*1000)
        predict.extend(pre_labels)
        pbar.update(1)


100%|██████████| 313/313 [00:18<00:00, 16.78it/s]


In [None]:
from sklearn.metrics import accuracy_score

accuracy_score(predict,true_labels)

0.8986

8986

In [None]:
count/len(true_labels)

0.8986