# Variational Autoencoder for anomaly detection


### Import

In [1]:
import tensorflow as tf 
from tensorflow.keras import layers
import tensorflow_datasets as tfds 

import matplotlib.pyplot as plt 
import numpy as np 
 
import os 
import random 
from IPython import display 

### Setting parameters 


In [None]:
# parameters for building the model and training
BATCH_SIZE = 
LATENT_DIM = 
IMAGE_SIZE = 

### Load the dataset 

In [None]:
def get_dataset_slices_paths(image_dir):
    ''' returns a list of paths to the image files'''
    image_file_list = os.listdir(image_dir)
    image_paths = [os.path.join(image_dir, fname) for fname in image_file_list]

    return image_paths 

def map_image(image_filename): 
    ''' preprocess the images'''
    img_raw = tf.io.read_file(image_filename)
    image = tf.image.decode_jpeg(img_raw) # depends on the type of images

    image = tf.cast(image, dtype=tf.float32)
    image = tf.image.resize(image, (IMAGE_SIZE, IMAGE_SIZE))
    image = image/255.0
    image = tf.reshape(image, shape=(IMAGE_SIZE, IMAGE_SIZE,3))

    return image 

### Data preprocessing

In [None]:
# get the list containing the image paths
paths = get_dataset_slices_paths(image_dir = )

# shuffle the paths 
random.shuffle(paths)

# split the paths list into training and validation sets 
paths_len = len(paths)
train_paths_len = int(paths_len*0.8)

train_paths = paths[:train_paths_len]
val_paths = paths[train_paths_len:]

# load the training image paths into tensors, creates batches and shuffle
training_dataset = tf.data.Dataset.from_tensor_slices((train_paths))
training_dataset = training_dataset.map(map_image)
training_dataset = training_dataset.shuffle().batch(BATCH_SIZE)

# load the validation image paths into tensors, creates batches 
validation_dataset = tf.data.Dataset.from_tensor_slices((val_paths))
validation_dataset = validation_dataset.map(map_image)
validation_dataset = validation_dataset.batch(BATCH_SIZE)

print(f'number of batches in the training set: {len(training_dataset)}')
print(f'number of batches in the validation set: {len(validation_dataset)}')

### Display the images

In [None]:
def display_fabric(dataset, size):
    ''' Takes a sample from a dataset batch and plots it in a grid. '''
    dataset = dataset.unbatch().take(size)
    n_cols = 2
    n_rows = size//n_cols+1
    plt.figure(figsize=(5, 5))
    i=0
    for image in dataset:
        i+=1 
        disp_image = np.reshape(image, (64,64,3))
        plt.subplot(n_rows, n_cols, i)
        plt.xticks([])
        plt.yticks([])
        plt.imshow(disp_image)

def display_one_row(dis_images, offset, shape=(28,28)):
    ''' Display a row of images. '''
    for i, img in enumerate(dis_images):
        plt.subplot(2,2,offset+i+1)
        plt.xticks([])
        plt.yticks([])
        image = np.reshape(img, shape)
        plt.imshow(img)

def display_results(dis_input_images, dis_predicted):
    ''' Display input and predicted images. '''
    plt.figure(figsize=(15,5))
    display_one_row(dis_input_images, 0, shape=(IMAGE_SIZE, IMAGE_SIZE, 3))
    display_one_row(dis_predicted, 20, shape=(IMAGE_SIZE, IMAGE_SIZE, 3))

In [None]:
display_fabric(validation_dataset, size=4)

### Build the Model 

#### Sampling Class

This layer provide the Gaussian noise input along with the mean ($\mu $) and standard deviation ($\sigma$) of the encoder's output, according to the following equation:
**$$z = \mu + e^{0.5\sigma} * \epsilon $$**
($\epsilon$ = random sample)

In [None]:
class Sampling(tf.keras.layers.Layer):
    def call(self, inputs):
        ''' Generates a random sample and combines with the encoder output
        Args:
        inputs - output tensor from the encoder 
        Returns: 
        'inputs' tensors combined with a random sample
        '''
        mu, sigma = inputs
        batch = tf.shape(mu)[0]
        dim = tf.shape(mu)[1]
        epsilon = tf.keras.backend.random_normal(shape=(batch, dim))
        z = mu + np.exp^(0.5*sigma)*epsilon
        return z 
    

#### Encoder
 

In [None]:
class Encoder_block(tf.keras.Model):   
    def __init__(self, inputs, final_conv, repetition):
        super(Encoder_block, self).__init__()
        self.conv1 = tf.keras.layers.Conv2D(32, 3, 2, 'same')
        self.conv2 = tf.keras.layers.Conv2D(64, 3, 2, 'same')
        self.conv3 = tf.keras.layers.Conv2D(128, 3, 2, 'same')
        self.lrelu = tf.keras.layers.LeakyReLU(0.1)
        self.bn = tf.keras.layers.BatchNormalization()
        self.inputs = inputs
        self.repetition = repetition
        self.final_conv= final_conv
    def call(self,final_conv=False):
        x = self.conv1(self.inputs)
        x = self.lrelu(x)
        x = self.bn(x)
        x = self.conv2(x)
        x = self.lrelu(x)
        x = self.bn(x)
        feature_shape = self.conv3(x)
        

        if final_conv: 
            return x
        else:
            return x, x.shape()


In [None]:
class Encoder(tf.keras.Model):
    def __init__(self, latent_dim, input_shape):
        super(Encoder, self).__init__()
        self.input_shape = input_shape
        self.latent_dim = latent_dim

    def call(self, inputs):
        inputs = tf.keras.layers.Input(shape=self.input_shape)
        x = Encoder_block(inputs, filters=32, final_conv=False)
        x = Encoder_block(x, filters=64, final_conv=False)
        x = Encoder_block(x, filters=128, final_conv=True)
