<a href="https://colab.research.google.com/github/weso500/IOT-Anomaly-Detection/blob/main/Single_Image_VAE_MOD_V_GenLatent.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Obtained intially from https://keras.io/examples/generative/vae/ and https://colab.research.google.com/github/keras-team/keras-io/blob/master/examples/generative/ipynb/vae.ipynb

In [1]:
pip install --upgrade keras

Collecting keras
  Downloading keras-3.5.0-py3-none-any.whl.metadata (5.8 kB)
Downloading keras-3.5.0-py3-none-any.whl (1.1 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.1/1.1 MB[0m [31m23.2 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: keras
  Attempting uninstall: keras
    Found existing installation: keras 3.4.1
    Uninstalling keras-3.4.1:
      Successfully uninstalled keras-3.4.1
Successfully installed keras-3.5.0


In [2]:
import os

os.environ["KERAS_BACKEND"] = "tensorflow"

import numpy as np
import tensorflow as tf
import keras
from keras import ops
from keras import layers

In [3]:
class Sampling(layers.Layer):
    """Uses (z_mean, z_log_var) to sample z, the vector encoding a digit."""

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.seed_generator = keras.random.SeedGenerator(1337)

    def call(self, inputs):
        z_mean, z_log_var = inputs
        batch = ops.shape(z_mean)[0]
        dim = ops.shape(z_mean)[1]
        epsilon = keras.random.normal(shape=(batch, dim), seed=self.seed_generator)
        return z_mean + ops.exp(0.5 * z_log_var) * epsilon

Use this notebook as example for obtaining images: https://colab.research.google.com/drive/1EOFDisibXb9a4A3lZ_2z4Jbhlnx0OVy6 The below takes care of obtaining images

In [4]:
latent_dim = 3

encoder_inputs = keras.Input(shape=(256, 256, 3))
x = layers.Conv2D(32, 3, activation="relu", strides=2, padding="same")(encoder_inputs)
x = layers.Conv2D(64, 3, activation="relu", strides=2, padding="same")(x)
x = layers.Flatten()(x)
x = layers.Dense(128, activation="relu")(x)
z_mean = layers.Dense(latent_dim, name="z_mean")(x)
z_log_var = layers.Dense(latent_dim, name="z_log_var")(x)
z = Sampling()([z_mean, z_log_var])
encoder = keras.Model(encoder_inputs, [z_mean, z_log_var, z], name="encoder")
encoder.summary()

In [5]:
latent_inputs = keras.Input(shape=(latent_dim,))
x = layers.Dense(64 * 64 * 64, activation="relu")(latent_inputs)
x = layers.Reshape((64, 64, 64))(x)
x = layers.Conv2DTranspose(64, 3, activation="relu", strides=2, padding="same")(x)
x = layers.Conv2DTranspose(32, 3, activation="relu", strides=2, padding="same")(x)
decoder_outputs = layers.Conv2DTranspose(3, 3, activation="sigmoid", padding="same")(x)
decoder = keras.Model(latent_inputs, decoder_outputs, name="decoder")
decoder.summary()

In [6]:
from tensorflow.keras.layers import Layer
class KLLossLayer(Layer):
    def call(self, inputs):
        z_mean, z_log_var = inputs
        kl_loss = -0.5 * (1 + z_log_var - tf.square(z_mean) - tf.exp(z_log_var))
        kl_loss = tf.reduce_mean(tf.reduce_sum(kl_loss, axis=1))

        return kl_loss

class VAE(keras.Model):
    def __init__(self, encoder, decoder, **kwargs):
        super().__init__(**kwargs)
        self.encoder = encoder
        self.decoder = decoder
        self.total_loss_tracker = keras.metrics.Mean(name="total_loss")
        self.reconstruction_loss_tracker = keras.metrics.Mean(
            name="reconstruction_loss"
        )
        self.kl_loss_tracker = keras.metrics.Mean(name="kl_loss")
        self.kl_loss_layer = KLLossLayer()

    def call(self, inputs):  # Add the call method
        z_mean, z_log_var, z = self.encoder(inputs)
        reconstructed = self.decoder(z)
        return reconstructed

    @property
    def metrics(self):
        return [
            self.total_loss_tracker,
            self.reconstruction_loss_tracker,
            self.kl_loss_tracker,
        ]

    def train_step(self, data):
        with tf.GradientTape() as tape:
            z_mean, z_log_var, z = self.encoder(data)
            reconstruction = self.decoder(z)
            reconstruction_loss = ops.mean(
                ops.sum(
                    keras.losses.binary_crossentropy(data, reconstruction),
                    axis=(1, 2),
                )
            )
            kl_loss = -0.5 * (1 + z_log_var - ops.square(z_mean) - ops.exp(z_log_var))
            kl_loss = ops.mean(ops.sum(kl_loss, axis=1))
            total_loss = reconstruction_loss + kl_loss
        grads = tape.gradient(total_loss, self.trainable_weights)
        self.optimizer.apply_gradients(zip(grads, self.trainable_weights))
        self.total_loss_tracker.update_state(total_loss)
        self.reconstruction_loss_tracker.update_state(reconstruction_loss)
        self.kl_loss_tracker.update_state(kl_loss)
        return {
            "loss": self.total_loss_tracker.result(),
            "reconstruction_loss": self.reconstruction_loss_tracker.result(),
            "kl_loss": self.kl_loss_tracker.result(),
        }

    def call(self, inputs):
        z_mean, z_log_var, z = self.encoder(inputs)
        reconstructed = self.decoder(z)
        kl_loss = self.kl_loss_layer([z_mean, z_log_var])
        return reconstructed, kl_loss, z_mean, z_log_var

    def predict(self, x):
        reconstructed, kl_loss, z_mean, z_log_var = super().predict(x)
        return reconstructed, kl_loss, z_mean, z_log_var


In [7]:
!unzip /content/Gold_Folder_Final.zip

[1;30;43mStreaming output truncated to the last 5000 lines.[0m
  inflating: Gold Folder Final/Training Data/Normal/Spectrograms_Normal_B+IGBT-I/108Normal.png  
  inflating: Gold Folder Final/Training Data/Normal/Spectrograms_Normal_B+IGBT-I/109Normal.png  
  inflating: Gold Folder Final/Training Data/Normal/Spectrograms_Normal_B+IGBT-I/10Normal.png  
  inflating: Gold Folder Final/Training Data/Normal/Spectrograms_Normal_B+IGBT-I/110Normal.png  
  inflating: Gold Folder Final/Training Data/Normal/Spectrograms_Normal_B+IGBT-I/111Normal.png  
  inflating: Gold Folder Final/Training Data/Normal/Spectrograms_Normal_B+IGBT-I/112Normal.png  
  inflating: Gold Folder Final/Training Data/Normal/Spectrograms_Normal_B+IGBT-I/113Normal.png  
  inflating: Gold Folder Final/Training Data/Normal/Spectrograms_Normal_B+IGBT-I/114Normal.png  
  inflating: Gold Folder Final/Training Data/Normal/Spectrograms_Normal_B+IGBT-I/115Normal.png  
  inflating: Gold Folder Final/Training Data/Normal/Spectrogram

In [8]:
train_ds = tf.keras.preprocessing.image_dataset_from_directory(
  '/content/Gold_Folder_Final/Training_Data/Normal/Spectrograms_Normal_MOD-V',
  image_size=(256, 256),
  batch_size=32,
  label_mode=None)


normalization_layer = layers.Rescaling(scale= 1./255)

normalized_ds = train_ds.map(lambda x: normalization_layer(x))

Found 500 files.


In [9]:
test_ds = tf.keras.preprocessing.image_dataset_from_directory(
  '/content/Gold_Folder_Final/Test_Data/Normal/Spectrograms_Normal_MOD-V',
  image_size=(256, 256),
  shuffle=False,
  batch_size=1,
  label_mode=None)

file_paths = test_ds.file_paths
print(file_paths)

normalization_layer = layers.Rescaling(scale= 1./255)

normalized_test_ds = test_ds.map(lambda x: normalization_layer(x))

Found 190 files.
['/content/Gold_Folder_Final/Test_Data/Normal/Spectrograms_Normal_MOD-V/500Normal.png', '/content/Gold_Folder_Final/Test_Data/Normal/Spectrograms_Normal_MOD-V/501Normal.png', '/content/Gold_Folder_Final/Test_Data/Normal/Spectrograms_Normal_MOD-V/502Normal.png', '/content/Gold_Folder_Final/Test_Data/Normal/Spectrograms_Normal_MOD-V/503Normal.png', '/content/Gold_Folder_Final/Test_Data/Normal/Spectrograms_Normal_MOD-V/504Normal.png', '/content/Gold_Folder_Final/Test_Data/Normal/Spectrograms_Normal_MOD-V/505Normal.png', '/content/Gold_Folder_Final/Test_Data/Normal/Spectrograms_Normal_MOD-V/506Normal.png', '/content/Gold_Folder_Final/Test_Data/Normal/Spectrograms_Normal_MOD-V/507Normal.png', '/content/Gold_Folder_Final/Test_Data/Normal/Spectrograms_Normal_MOD-V/508Normal.png', '/content/Gold_Folder_Final/Test_Data/Normal/Spectrograms_Normal_MOD-V/509Normal.png', '/content/Gold_Folder_Final/Test_Data/Normal/Spectrograms_Normal_MOD-V/510Normal.png', '/content/Gold_Folder_Fin

In [10]:
test_anomaly_ds = tf.keras.preprocessing.image_dataset_from_directory(
  '/content/Gold_Folder_Final/Test_Data/Anomaly/Spectrograms_Anomaly_MOD-V',
  image_size=(256, 256),
  shuffle=False,
  batch_size=1,
  label_mode=None)

file_paths = test_ds.file_paths
print(file_paths)

normalization_layer = layers.Rescaling(scale= 1./255)

normalized_anomaly_test_ds = test_anomaly_ds.map(lambda x: normalization_layer(x))

Found 182 files.
['/content/Gold_Folder_Final/Test_Data/Normal/Spectrograms_Normal_MOD-V/500Normal.png', '/content/Gold_Folder_Final/Test_Data/Normal/Spectrograms_Normal_MOD-V/501Normal.png', '/content/Gold_Folder_Final/Test_Data/Normal/Spectrograms_Normal_MOD-V/502Normal.png', '/content/Gold_Folder_Final/Test_Data/Normal/Spectrograms_Normal_MOD-V/503Normal.png', '/content/Gold_Folder_Final/Test_Data/Normal/Spectrograms_Normal_MOD-V/504Normal.png', '/content/Gold_Folder_Final/Test_Data/Normal/Spectrograms_Normal_MOD-V/505Normal.png', '/content/Gold_Folder_Final/Test_Data/Normal/Spectrograms_Normal_MOD-V/506Normal.png', '/content/Gold_Folder_Final/Test_Data/Normal/Spectrograms_Normal_MOD-V/507Normal.png', '/content/Gold_Folder_Final/Test_Data/Normal/Spectrograms_Normal_MOD-V/508Normal.png', '/content/Gold_Folder_Final/Test_Data/Normal/Spectrograms_Normal_MOD-V/509Normal.png', '/content/Gold_Folder_Final/Test_Data/Normal/Spectrograms_Normal_MOD-V/510Normal.png', '/content/Gold_Folder_Fin

In [11]:
#(x_train, _), (x_test, _) = keras.datasets.mnist.load_data()
#mnist_digits = np.concatenate([x_train, x_test], axis=0)
#mnist_digits = np.expand_dims(mnist_digits, -1).astype("float32") / 255

vae = VAE(encoder, decoder)
vae.compile(optimizer=keras.optimizers.Adam())
vae.fit(normalized_ds, epochs=15, batch_size=32)

Epoch 1/15
[1m16/16[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m27s[0m 739ms/step - kl_loss: 95.1000 - loss: 44683.2617 - reconstruction_loss: 44588.1602
Epoch 2/15
[1m16/16[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m19s[0m 88ms/step - kl_loss: 5203.7598 - loss: 18910.5625 - reconstruction_loss: 13706.8018
Epoch 3/15
[1m16/16[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 81ms/step - kl_loss: 280.9164 - loss: 10828.7959 - reconstruction_loss: 10547.8789
Epoch 4/15
[1m16/16[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m3s[0m 88ms/step - kl_loss: 199.9346 - loss: 8383.1309 - reconstruction_loss: 8183.1958
Epoch 5/15
[1m16/16[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 87ms/step - kl_loss: 221.7357 - loss: 6402.4668 - reconstruction_loss: 6180.7310
Epoch 6/15
[1m16/16[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 80ms/step - kl_loss: 129.0914 - loss: 5954.8403 - reconstruction_loss: 5825.7490
Epoch 7/15
[1m16/16[0m [32m━━━━━━━━━━━━━━━━━━━━[0m

<keras.src.callbacks.history.History at 0x7f6941d56110>

In [12]:
train_ds_test_ = tf.keras.preprocessing.image_dataset_from_directory(
  '/content/Gold_Folder_Final/Training_Data/Normal/Spectrograms_Normal_MOD-V',
  image_size=(256, 256),
  batch_size=1,
  label_mode=None)


normalization_layer = layers.Rescaling(scale= 1./255)

normalized_ds_test1 = train_ds_test_.map(lambda x: normalization_layer(x))

Found 500 files.


In [13]:
pip install sewar

Collecting sewar
  Downloading sewar-0.4.6.tar.gz (11 kB)
  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: sewar
  Building wheel for sewar (setup.py) ... [?25l[?25hdone
  Created wheel for sewar: filename=sewar-0.4.6-py3-none-any.whl size=11419 sha256=7ae721109a24b243de757daa6ea8780d840cb6304a79bc034018a481a11b7014
  Stored in directory: /root/.cache/pip/wheels/3f/af/02/9c6556ba287b62a945d737def09b8b8c35c9e1d82b9dfae84c
Successfully built sewar
Installing collected packages: sewar
Successfully installed sewar-0.4.6


In [None]:
import cv2
from urllib.request import AbstractBasicAuthHandler
import seaborn as sns
import numpy as np
import matplotlib.pyplot as plt
from sewar.full_ref import mse, rmse, psnr, uqi, ssim, ergas, scc, rase, sam, msssim, vifp

#mse_values_normal = [] # Create an empty list to store mse values
#mse_values_anomaly = []
mse_values = []
kldiv1 = []
zmean1 = []
zmean2 = []
zmean3 = []
zlogvar1 = []
zlogvar2 = []
zlogvar3 = []

for elem in normalized_ds_test1.take(500):
  for i in range(len(elem)):
    elem_np = elem.numpy()
    exp = vae.predict(elem)
    exp_np = tf.get_static_value(exp)
    #plt.imshow(np.squeeze(elem_np))
    #plt.show()
    #plt.imshow(np.squeeze(exp_np[0]))
    #plt.show()
    #x = cv2.PSNR(elem_np, exp_np[0])
    x = mse(elem_np, exp_np[0])
    print(x)
    print(exp_np[1])
    kldiv1.append(exp_np[1])
    print(exp_np[2])
    zmean1.append(exp_np[2][0][0])
    zmean2.append(exp_np[2][0][1])
    zmean3.append(exp_np[2][0][2])
    print(exp_np[3])
    zlogvar1.append(exp_np[3][0][0])
    zlogvar2.append(exp_np[3][0][1])
    zlogvar3.append(exp_np[3][0][2])
    mse_values.append(x) # Append each mse value to the list



[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m3s[0m 3s/step
0.0005071965806992144
41.301212
[[ 5.586003   4.7356315 -4.753886 ]]
[[-3.1496475 -2.9048767 -3.179609 ]]
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 53ms/step
0.0007586564030898175
39.69414
[[ 5.5006356  4.648484  -4.6042604]]
[[-3.1887712 -2.86346   -3.129401 ]]
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 43ms/step
0.00046412286840646493
41.189434
[[ 5.574857   4.7226043 -4.7606483]]
[[-3.1197495 -2.9009132 -3.1713169]]
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 27ms/step
0.000412029651231247
41.040024
[[ 5.5652657  4.7205544 -4.741953 ]]
[[-3.135039  -2.8929896 -3.1691375]]
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 71ms/step
0.0004225114983646982
41.406906
[[ 5.5883307  4.744343  -4.764081 ]]
[[-3.1521437 -2.9045646 -3.1834266]]
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 38ms/step
0.0004542668250088152
41.343895
[[ 5.5859

In [None]:
import pandas as pd

dict = {'MSE': mse_values, 'KL Loss': kldiv1, 'Zmean1': zmean1, 'Zmean2': zmean2, 'Zmean3': zmean3, 'Zlogvar1': zlogvar1,'Zlogvar2': zlogvar2, 'Zlogvar3': zlogvar3 }

df = pd.DataFrame(dict)

# saving the dataframe
df.to_csv('MOD-V_train.csv')

In [None]:
import cv2
from urllib.request import AbstractBasicAuthHandler
import seaborn as sns
import numpy as np
import matplotlib.pyplot as plt
from sewar.full_ref import mse, rmse, psnr, uqi, ssim, ergas, scc, rase, sam, msssim, vifp

#mse_values_normal = [] # Create an empty list to store mse values
#mse_values_anomaly = []
mse_values = []
kldiv1 = []
zmean1 = []
zmean2 = []
zmean3 = []
zlogvar1 = []
zlogvar2 = []
zlogvar3 = []

for elem in normalized_test_ds.take(190):
  for i in range(len(elem)):
    elem_np = elem.numpy()
    exp = vae.predict(elem)
    exp_np = tf.get_static_value(exp)
    #plt.imshow(np.squeeze(elem_np))
    #plt.show()
    #plt.imshow(np.squeeze(exp_np[0]))
    #plt.show()
    #x = cv2.PSNR(elem_np, exp_np[0])
    x = mse(elem_np, exp_np[0])
    print(x)
    print(exp_np[1])
    kldiv1.append(exp_np[1])
    print(exp_np[2])
    zmean1.append(exp_np[2][0][0])
    zmean2.append(exp_np[2][0][1])
    zmean3.append(exp_np[2][0][2])
    print(exp_np[3])
    zlogvar1.append(exp_np[3][0][0])
    zlogvar2.append(exp_np[3][0][1])
    zlogvar3.append(exp_np[3][0][2])
    mse_values.append(x) # Append each mse value to the list

In [None]:
import pandas as pd

dict = {'MSE': mse_values, 'KL Loss': kldiv1, 'Zmean1': zmean1, 'Zmean2': zmean2, 'Zmean3': zmean3, 'Zlogvar1': zlogvar1,'Zlogvar2': zlogvar2, 'Zlogvar3': zlogvar3 }

df2 = pd.DataFrame(dict)

# saving the dataframe
df2.to_csv('MOD-V_testnorm.csv')

In [None]:
import cv2
from urllib.request import AbstractBasicAuthHandler
import seaborn as sns
import numpy as np
import matplotlib.pyplot as plt
from sewar.full_ref import mse, rmse, psnr, uqi, ssim, ergas, scc, rase, sam, msssim, vifp

#mse_values_normal = [] # Create an empty list to store mse values
#mse_values_anomaly = []
mse_values = []
kldiv1 = []
zmean1 = []
zmean2 = []
zmean3 = []
zlogvar1 = []
zlogvar2 = []
zlogvar3 = []

for elem in normalized_anomaly_test_ds.take(190):
  for i in range(len(elem)):
    elem_np = elem.numpy()
    exp = vae.predict(elem)
    exp_np = tf.get_static_value(exp)
    #plt.imshow(np.squeeze(elem_np))
    #plt.show()
    #plt.imshow(np.squeeze(exp_np[0]))
    #plt.show()
    #x = cv2.PSNR(elem_np, exp_np[0])
    x = mse(elem_np, exp_np[0])
    print(x)
    print(exp_np[1])
    kldiv1.append(exp_np[1])
    print(exp_np[2])
    zmean1.append(exp_np[2][0][0])
    zmean2.append(exp_np[2][0][1])
    zmean3.append(exp_np[2][0][2])
    print(exp_np[3])
    zlogvar1.append(exp_np[3][0][0])
    zlogvar2.append(exp_np[3][0][1])
    zlogvar3.append(exp_np[3][0][2])
    mse_values.append(x) # Append each mse value to the list

In [None]:
import pandas as pd

dict = {'MSE': mse_values, 'KL Loss': kldiv1, 'Zmean1': zmean1, 'Zmean2': zmean2, 'Zmean3': zmean3, 'Zlogvar1': zlogvar1,'Zlogvar2': zlogvar2, 'Zlogvar3': zlogvar3 }

df3 = pd.DataFrame(dict)

# saving the dataframe
df3.to_csv('MOD-V_testanom.csv')

In [None]:
from google.colab import files
files.download('/content/B+FLUX_testanom.csv')

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

In [None]:
files.download('/content/B+FLUX_testnorm.csv')

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

In [None]:
files.download('/content/B+_FLUX_train.csv')

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>