In [1]:
import os
import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import librosa
import matplotlib.pyplot as plt
import soundfile as sf
from sklearn.model_selection import train_test_split
import tensorflow_addons as tfa



TensorFlow Addons (TFA) has ended development and introduction of new features.
TFA has entered a minimal maintenance and release mode until a planned end of life in May 2024.
Please modify downstream libraries to take dependencies from other repositories in our TensorFlow community (e.g. Keras, Keras-CV, and Keras-NLP). 

For more information see: https://github.com/tensorflow/addons/issues/2807 

 The versions of TensorFlow you are currently using is 2.18.0 and is not supported. 
Some things might work, some things might not.
If you were to encounter a bug, do not file an issue.
If you want to make sure you're using a tested and supported configuration, either change the TensorFlow version or the TensorFlow Addons's version. 
You can find the compatibility matrix in TensorFlow Addon's readme:
https://github.com/tensorflow/addons


ModuleNotFoundError: No module named 'keras.src.engine'

In [5]:
import librosa
import numpy as np
import os
import cv2  # OpenCV for resizing

def load_audio_files(directory):
    data, labels = [], []
    for label, folder in enumerate(os.listdir(directory)):
        folder_path = os.path.join(directory, folder)
        if not os.path.isdir(folder_path):
            continue
        for file in os.listdir(folder_path):
            file_path = os.path.join(folder_path, file)
            if not file_path.endswith(('.wav','.mp3')):
                continue
            signal, sr = librosa.load(file_path, sr=22050)
            
            # Generate Mel spectrogram with 128 Mel bands
            mel_spec = librosa.feature.melspectrogram(y=signal, sr=sr, n_mels=128)
            mel_spec = librosa.power_to_db(mel_spec, ref=np.max)
            
            # Resize to (128, 128)
            mel_spec_resized = cv2.resize(mel_spec, (128, 128))

            # Expand dimensions to add channel (for CNN input compatibility)
            data.append(mel_spec_resized)
            labels.append(label)

    # Convert to numpy arrays and add channel dimension
    data = np.array(data)
    data = np.expand_dims(data, -1)  # Shape will be (num_samples, 128, 128, 1)
    labels = np.array(labels)
    
    return data, labels

# Load the audio data
data, labels = load_audio_files('C:/Users/HP/Downloads/archive/Raw Audio/')

# Verify the shape
print(data.shape)  # Should be (num_samples, 128, 128, 1)


(1415, 128, 128, 1)


In [6]:
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers

latent_dim = 32  # Adjust as needed based on audio data complexity
input_shape = (128, 128, 1)  # Example shape for Mel spectrogram

# Custom Sampling layer for the reparameterization trick
class Sampling(layers.Layer):
    def call(self, inputs):
        z_mean, z_log_var = inputs
        batch = tf.shape(z_mean)[0]
        dim = tf.shape(z_mean)[1]
        epsilon = tf.keras.backend.random_normal(shape=(batch, dim))
        return z_mean + tf.exp(0.5 * z_log_var) * epsilon

# Encoder for VAE
def build_encoder(input_shape, latent_dim):
    inputs = keras.Input(shape=input_shape)
    x = layers.Conv2D(32, 3, activation="relu", strides=2, padding="same")(inputs)
    x = layers.Conv2D(64, 3, activation="relu", strides=2, padding="same")(x)
    x = layers.Flatten()(x)
    x = layers.Dense(160, activation="relu")(x)
    z_mean = layers.Dense(latent_dim, name="z_mean")(x)
    z_log_var = layers.Dense(latent_dim, name="z_log_var")(x)
    z = Sampling()([z_mean, z_log_var])  # Use the Sampling layer here
    return keras.Model(inputs, [z_mean, z_log_var, z], name="encoder")

# Decoder for VAE
def build_decoder(latent_dim, input_shape):
    latent_inputs = keras.Input(shape=(latent_dim,))
    x = layers.Dense(8 * 8 * 64, activation="relu")(latent_inputs)
    x = layers.Reshape((8, 8, 64))(x)
    x = layers.Conv2DTranspose(64, 3, activation="relu", strides=2, padding="same")(x)
    x = layers.Conv2DTranspose(32, 3, activation="relu", strides=2, padding="same")(x)
    outputs = layers.Conv2DTranspose(1, 3, activation="sigmoid", padding="same")(x)
    return keras.Model(latent_inputs, outputs, name="decoder")

# Discriminator for GAN
def build_discriminator(input_shape):
    inputs = keras.Input(shape=input_shape)
    x = layers.Conv2D(64, 3, activation="relu", strides=2, padding="same")(inputs)
    x = layers.Conv2D(32, 3, activation="relu", strides=2, padding="same")(x)
    x = layers.Flatten()(x)
    x = layers.Dense(64, activation="relu")(x)
    outputs = layers.Dense(1, activation="sigmoid")(x)
    return keras.Model(inputs, outputs, name="discriminator")

# Instantiate the models
encoder = build_encoder(input_shape, latent_dim)
decoder = build_decoder(latent_dim, input_shape)
discriminator = build_discriminator(input_shape)

# Display model summaries to verify
encoder.summary()
decoder.summary()
discriminator.summary()





In [7]:
import tensorflow as tf
from tensorflow.keras import layers, models
from tensorflow import keras

# Define encoder with (32, 32, 1) input shape
def build_encoder(latent_dim):
    encoder_inputs = keras.Input(shape=(32, 32, 1))
    x = layers.Conv2D(32, (3, 3), activation="relu", strides=2, padding="same")(encoder_inputs)
    x = layers.Conv2D(64, (3, 3), activation="relu", strides=2, padding="same")(x)
    x = layers.Flatten()(x)
    x = layers.Dense(16, activation="relu")(x)
    z_mean = layers.Dense(latent_dim, name="z_mean")(x)
    z_log_var = layers.Dense(latent_dim, name="z_log_var")(x)
    
    def sampling(args):
        z_mean, z_log_var = args
        batch = tf.shape(z_mean)[0]
        dim = tf.shape(z_mean)[1]
        epsilon = tf.keras.backend.random_normal(shape=(batch, dim))
        return z_mean + tf.exp(0.5 * z_log_var) * epsilon
    
    z = layers.Lambda(sampling, output_shape=(latent_dim,), name="z")([z_mean, z_log_var])
    return keras.Model(encoder_inputs, [z_mean, z_log_var, z], name="encoder")

# Define decoder that takes in latent vector and outputs (32, 32, 1)
def build_decoder(latent_dim):
    latent_inputs = keras.Input(shape=(latent_dim,))
    x = layers.Dense(8 * 8 * 64, activation="relu")(latent_inputs)
    x = layers.Reshape((8, 8, 64))(x)
    x = layers.Conv2DTranspose(64, (3, 3), activation="relu", strides=2, padding="same")(x)
    x = layers.Conv2DTranspose(32, (3, 3), activation="relu", strides=2, padding="same")(x)
    x = layers.Conv2DTranspose(1, (3, 3), activation="sigmoid", padding="same")(x)
    return keras.Model(latent_inputs, x, name="decoder")

# Define discriminator that takes (32, 32, 1) as input
def build_discriminator():
    discriminator_inputs = keras.Input(shape=(32, 32, 1))
    x = layers.Conv2D(64, (3, 3), activation="relu", strides=2, padding="same")(discriminator_inputs)
    x = layers.Conv2D(128, (3, 3), activation="relu", strides=2, padding="same")(x)
    x = layers.Flatten()(x)
    x = layers.Dense(64, activation="relu")(x)
    discriminator_outputs = layers.Dense(1)(x)
    return keras.Model(discriminator_inputs, discriminator_outputs, name="discriminator")

# Instantiate encoder, decoder, and discriminator with desired latent dimension
latent_dim = 16
encoder = build_encoder(latent_dim)
decoder = build_decoder(latent_dim)
discriminator = build_discriminator()


In [12]:
# Ensure resizing function works correctly
def resize_data(data, target_shape=(32, 32)):
    resized_data = []
    for img in data:
        img_resized = cv2.resize(img, target_shape)  # Resize to target shape (32, 32)
        resized_data.append(img_resized)
    resized_data = np.array(resized_data)
    resized_data = np.expand_dims(resized_data, -1)  # Add channel dimension for (num_samples, 32, 32, 1)
    return resized_data

train_data_resized = resize_data(data, target_shape=(32, 32))

In [14]:
class VAE_GAN(tf.keras.Model):
    def __init__(self, encoder, decoder, discriminator, **kwargs):
        super(VAE_GAN, self).__init__(**kwargs)
        self.encoder = encoder
        self.decoder = decoder
        self.discriminator = discriminator

    def compile(self, vae_optimizer, disc_optimizer, gen_optimizer, **kwargs):
        super(VAE_GAN, self).compile(**kwargs)
        self.vae_optimizer = vae_optimizer
        self.disc_optimizer = disc_optimizer
        self.gen_optimizer = gen_optimizer
        self.reconstruction_loss_fn = keras.losses.MeanSquaredError()
        self.kl_loss_fn = keras.losses.KLDivergence()
        self.gan_loss_fn = keras.losses.BinaryCrossentropy(from_logits=True)

    def train_step(self, data):
        batch_size = tf.shape(data)[0]  # Use tf.shape to get the dynamic batch size

        with tf.GradientTape(persistent=True) as tape:
            # Forward pass
            z_mean, z_log_var, z = self.encoder(data)
            reconstruction = self.decoder(z)

            # Compute reconstruction and KL loss
            reconstruction_loss = tf.reduce_mean(self.reconstruction_loss_fn(data, reconstruction))
            kl_loss = -0.5 * tf.reduce_sum(1 + z_log_var - tf.square(z_mean) - tf.exp(z_log_var))
            vae_loss = reconstruction_loss + kl_loss

            # Discriminator
            real_labels = tf.ones((batch_size, 1))
            fake_labels = tf.zeros((batch_size, 1))
            disc_loss_real = self.gan_loss_fn(real_labels, self.discriminator(data))
            disc_loss_fake = self.gan_loss_fn(fake_labels, self.discriminator(reconstruction))
            disc_loss = (disc_loss_real + disc_loss_fake) / 2

            # Generator loss
            gen_loss = self.gan_loss_fn(real_labels, self.discriminator(reconstruction))

        # Backpropagation
        vae_gradients = tape.gradient(vae_loss, self.encoder.trainable_weights + self.decoder.trainable_weights)
        disc_gradients = tape.gradient(disc_loss, self.discriminator.trainable_weights)
        gen_gradients = tape.gradient(gen_loss, self.decoder.trainable_weights)

        self.vae_optimizer.apply_gradients(zip(vae_gradients, self.encoder.trainable_weights + self.decoder.trainable_weights))
        self.disc_optimizer.apply_gradients(zip(disc_gradients, self.discriminator.trainable_weights))
        self.gen_optimizer.apply_gradients(zip(gen_gradients, self.decoder.trainable_weights))

        return {
            "vae_loss": vae_loss,
            "disc_loss": disc_loss,
            "gen_loss": gen_loss,
            "reconstruction_loss": reconstruction_loss,
            "kl_loss": kl_loss,
        }

# Instantiate the model and compile it
vae_gan = VAE_GAN(encoder, decoder, discriminator)
vae_gan.compile(
    vae_optimizer=keras.optimizers.Adam(),
    disc_optimizer=keras.optimizers.Adam(),
    gen_optimizer=keras.optimizers.Adam()
)

# Train the model on resized data
history = vae_gan.fit(train_data_resized, epochs=50, batch_size=32)


Epoch 1/50
[1m45/45[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m7s[0m 30ms/step - disc_loss: 0.0423 - gen_loss: 8.3575 - kl_loss: 253.9988 - reconstruction_loss: 2676.8513 - vae_loss: 2930.8499
Epoch 2/50
[1m45/45[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 30ms/step - disc_loss: 6.2072e-09 - gen_loss: 18.4106 - kl_loss: 0.0215 - reconstruction_loss: 2678.7620 - vae_loss: 2678.7834
Epoch 3/50
[1m45/45[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 30ms/step - disc_loss: 3.6094e-09 - gen_loss: 18.8101 - kl_loss: 0.0158 - reconstruction_loss: 2649.3318 - vae_loss: 2649.3474
Epoch 4/50
[1m45/45[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 29ms/step - disc_loss: 3.5851e-09 - gen_loss: 18.8149 - kl_loss: 0.0118 - reconstruction_loss: 2668.7014 - vae_loss: 2668.7134
Epoch 5/50
[1m45/45[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 30ms/step - disc_loss: 3.5767e-09 - gen_loss: 18.8152 - kl_loss: 0.0089 - reconstruction_loss: 2683.7405 - vae_loss: 268

In [15]:
# Define a function to calculate reconstruction errors on test data
def calculate_reconstruction_error(data, model):
    # Encode and decode the data using the trained VAE-GAN model
    z_mean, z_log_var, z = model.encoder(data)
    reconstructed_data = model.decoder(z)

    # Compute reconstruction error (Mean Squared Error)
    reconstruction_error = tf.reduce_mean(tf.square(data - reconstructed_data), axis=[1, 2, 3])
    return reconstruction_error.numpy()


In [30]:
import cv2
import numpy as np
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, confusion_matrix

# Load and resize temp data
temp_data, temp_labels = load_audio_files('C:/Users/HP/Desktop/Test Data/')  # Load test data
temp_data_resized = resize_data(temp_data, target_shape=(32, 32))  # Resize to (32, 32, 1)

#splitting into validation and test sets
val_data,test_data,val_labels,test_labels=train_test_split(temp_data_resized,temp_labels,test_size=0.5,random_state=42)

# Verify the shape of test_data and val_data
print("Validation data shape:", val_data.shape)
print("Test data shape:", test_data.shape)  # Should print (num_samples, 32, 32, 1)

for dataset_name,dataset_data,dataset_labels in [("validation",val_data,val_labels),("Testing",test_data,test_labels)]:
    reconstruction_errors = calculate_reconstruction_error(dataset_data, vae_gan)
    threshold = np.percentile(reconstruction_errors, 40)  
    anomaly_predictions = reconstruction_errors > threshold
    # Evaluation metrics
    binary_labels = (dataset_labels == 1)  # 1 for anomaly, 0 for normal
    accuracy = accuracy_score(binary_labels, anomaly_predictions)
    precision = precision_score(binary_labels, anomaly_predictions,zero_division=1)
    recall = recall_score(binary_labels, anomaly_predictions,zero_division=1)
    f1 = f1_score(binary_labels, anomaly_predictions,zero_division=1)
    conf_matrix = confusion_matrix(binary_labels, anomaly_predictions)
    # Print evaluation results
    print(f"{dataset_name} Set evalution")
    print("Accuracy:", accuracy)
    print("Precision:", precision)
    print("Recall:", recall)
    print("F1 Score:", f1)
    print("Confusion Matrix:\n", conf_matrix)


Validation data shape: (131, 32, 32, 1)
Test data shape: (132, 32, 32, 1)
validation Set evalution
Accuracy: 0.9389312977099237
Precision: 0.9871794871794872
Recall: 0.9166666666666666
F1 Score: 0.9506172839506173
Confusion Matrix:
 [[46  1]
 [ 7 77]]
Testing Set evalution
Accuracy: 0.9393939393939394
Precision: 1.0
Recall: 0.9080459770114943
F1 Score: 0.9518072289156626
Confusion Matrix:
 [[45  0]
 [ 8 79]]
