In [None]:
import os
import numpy as np
import librosa
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from sklearn.model_selection import train_test_split
from sklearn.metrics import (recall_score, f1_score,confusion_matrix, roc_auc_score, average_precision_score)

In [None]:
def load_audio_files(directory, sr=22050, duration=5):
    #Loads audio files from a directory
    audio_data = []
    for folder in os.listdir(directory):
        folder_path = os.path.join(directory, folder)
        if not os.path.isdir(folder_path):
            continue        
        for file in os.listdir(folder_path):
            file_path = os.path.join(folder_path, file)
            if not file_path.endswith(('.wav', '.mp3')):
                continue            
            try:
                signal, _ = librosa.load(file_path, sr=sr, duration=duration)
                audio_data.append(signal)
            except Exception as e:
                print(f"Error loading {file_path}: {e}")
    return np.array(audio_data, dtype=object)

In [None]:
def augment_audio(audio, sr=22050):
    #Apply simple pitch shifting and noise injection to augment audio
    n_steps = np.random.uniform(-2, 2)
    audio_shifted = librosa.effects.pitch_shift(y=audio, sr=sr, n_steps=n_steps)
    noise = 0.005 * np.random.randn(len(audio_shifted))
    return audio_shifted + noise

In [None]:
def extract_spectrograms(audio_data, sr=22050, n_fft=2048, hop_length=512, n_mels=128, max_time_frames=128):
    #Computes mel-spectrograms (in decibels) and normalizes them to [0,1].
    specs = []
    for sample in audio_data:
        if len(sample) == 0:
            continue
        mel_spec = librosa.feature.melspectrogram(y=sample, sr=sr, n_fft=n_fft,
                                                  hop_length=hop_length, n_mels=n_mels)
        mel_spec_db = librosa.power_to_db(mel_spec, ref=np.max)
        # Add epsilon to denominator to avoid division by zero.
        mel_norm = (mel_spec_db - mel_spec_db.min()) / (mel_spec_db.max() - mel_spec_db.min() + 1e-9)
        if mel_norm.shape[1] < max_time_frames:
            pad_width = max_time_frames - mel_norm.shape[1]
            mel_norm = np.pad(mel_norm, ((0,0),(0,pad_width)), mode='constant')
        else:
            mel_norm = mel_norm[:, :max_time_frames]
        specs.append(mel_norm)
    return np.array(specs)


In [None]:
def prepare_spectrogram_images(specs):
    #Expands spectrograms to have a channel dimension.
    return np.expand_dims(specs, axis=-1)


In [6]:
class Sampling(layers.Layer):
    def call(self, inputs):
        z_mean, z_log_var = inputs
        batch = tf.shape(z_mean)[0]
        dim = tf.shape(z_mean)[1]
        epsilon = tf.keras.backend.random_normal(shape=(batch, dim))
        return z_mean + tf.exp(0.5 * z_log_var) * epsilon

def build_encoder(latent_dim, input_shape=(128,128,1)):
    inputs = keras.Input(shape=input_shape)
    x = layers.Conv2D(32, 3, strides=2, padding='same', activation='relu')(inputs)  # 64x64x32
    x = layers.BatchNormalization()(x)
    x = layers.Conv2D(64, 3, strides=2, padding='same', activation='relu')(x)       # 32x32x64
    x = layers.BatchNormalization()(x)
    x = layers.Conv2D(128, 3, strides=2, padding='same', activation='relu')(x)      # 16x16x128
    x = layers.BatchNormalization()(x)
    x = layers.Flatten()(x)
    x = layers.Dense(256, activation='relu')(x)
    z_mean = layers.Dense(latent_dim, name='z_mean')(x)
    z_log_var = layers.Dense(latent_dim, name='z_log_var')(x)
    z = Sampling()([z_mean, z_log_var])
    return keras.Model(inputs, [z_mean, z_log_var, z], name='encoder')

def build_decoder(latent_dim, output_shape=(128,128,1)):
    latent_inputs = keras.Input(shape=(latent_dim,))
    x = layers.Dense(16*16*128, activation='relu')(latent_inputs)
    x = layers.Reshape((16,16,128))(x)
    x = layers.Conv2DTranspose(128, 3, strides=2, padding='same', activation='relu')(x)  # 32x32x128
    x = layers.BatchNormalization()(x)
    x = layers.Conv2DTranspose(64, 3, strides=2, padding='same', activation='relu')(x)   # 64x64x64
    x = layers.BatchNormalization()(x)
    x = layers.Conv2DTranspose(32, 3, strides=2, padding='same', activation='relu')(x)   # 128x128x32
    x = layers.BatchNormalization()(x)
    outputs = layers.Conv2DTranspose(1, 3, padding='same', activation='sigmoid')(x)       # 128x128x1
    return keras.Model(latent_inputs, outputs, name='decoder')

def build_discriminator(input_shape=(128,128,1)):
    inputs = keras.Input(shape=input_shape)
    x = layers.Conv2D(32, 3, strides=2, padding='same', activation='relu')(inputs)
    x = layers.BatchNormalization()(x)
    x = layers.Conv2D(64, 3, strides=2, padding='same', activation='relu')(x)
    x = layers.BatchNormalization()(x)
    x = layers.Flatten()(x)
    x = layers.Dense(64, activation='relu')(x)
    outputs = layers.Dense(1, activation='sigmoid')(x)
    return keras.Model(inputs, outputs, name='discriminator')

class VAE_GAN(keras.Model):
    def __init__(self, encoder, decoder, discriminator, lambda_adv=0.5, **kwargs):
        super(VAE_GAN, self).__init__(**kwargs)
        self.encoder = encoder
        self.decoder = decoder
        self.discriminator = discriminator
        self.lambda_adv = lambda_adv

    def call(self, inputs, training=False):
        z_mean, z_log_var, z = self.encoder(inputs, training=training)
        reconstructed = self.decoder(z, training=training)
        return reconstructed

    def train_step(self, data):
        if isinstance(data, tuple):
            data = data[0]
        with tf.GradientTape() as tape_gen, tf.GradientTape() as tape_disc:
            z_mean, z_log_var, z = self.encoder(data, training=True)
            # Clip z_log_var to avoid numerical issues in exp
            z_log_var = tf.clip_by_value(z_log_var, -10.0, 10.0)
            reconstructed = self.decoder(z, training=True)
            reconstruction_loss = tf.reduce_mean(tf.keras.losses.mse(data, reconstructed))
            kl_loss = -0.5 * tf.reduce_mean(1 + z_log_var - tf.square(z_mean) - tf.exp(z_log_var))
           
            disc_pred_fake = self.discriminator(reconstructed, training=True)
            # Clip discriminator predictions to avoid log(0) issues in binary crossentropy
            disc_pred_fake = tf.clip_by_value(disc_pred_fake, 1e-7, 1.0 - 1e-7)
            valid_labels = tf.ones_like(disc_pred_fake)
            adv_loss = tf.keras.losses.binary_crossentropy(valid_labels, disc_pred_fake)
            adv_loss = tf.reduce_mean(adv_loss)
            gen_loss = reconstruction_loss + kl_loss + self.lambda_adv * adv_loss

            disc_pred_real = self.discriminator(data, training=True)
            disc_pred_real = tf.clip_by_value(disc_pred_real, 1e-7, 1.0 - 1e-7)
            real_loss = tf.keras.losses.binary_crossentropy(tf.ones_like(disc_pred_real), disc_pred_real)
            fake_loss = tf.keras.losses.binary_crossentropy(tf.zeros_like(disc_pred_fake), disc_pred_fake)
            disc_loss = tf.reduce_mean(real_loss) + tf.reduce_mean(fake_loss)
        # Compute gradients
        grads_gen = tape_gen.gradient(gen_loss, self.encoder.trainable_variables + self.decoder.trainable_variables)
        grads_disc = tape_disc.gradient(disc_loss, self.discriminator.trainable_variables)
        # Apply global gradient clipping to prevent explosion
        grads_gen, _ = tf.clip_by_global_norm(grads_gen, 1.0)
        grads_disc, _ = tf.clip_by_global_norm(grads_disc, 1.0)
        # Apply gradients
        self.gen_optimizer.apply_gradients(zip(grads_gen, self.encoder.trainable_variables + self.decoder.trainable_variables))
        self.disc_optimizer.apply_gradients(zip(grads_disc, self.discriminator.trainable_variables))
        return {"gen_loss": gen_loss,
                "reconstruction_loss": reconstruction_loss,
                "kl_loss": kl_loss,
                "adv_loss": adv_loss,
                "disc_loss": disc_loss}

    def test_step(self, data):
        if isinstance(data, tuple):
            data = data[0]
        z_mean, z_log_var, z = self.encoder(data, training=False)
        z_log_var = tf.clip_by_value(z_log_var, -10.0, 10.0)
        reconstructed = self.decoder(z, training=False)
        reconstruction_loss = tf.reduce_mean(tf.keras.losses.mse(data, reconstructed))
        kl_loss = -0.5 * tf.reduce_mean(1 + z_log_var - tf.square(z_mean) - tf.exp(z_log_var))
        disc_pred_fake = self.discriminator(reconstructed, training=False)
        disc_pred_fake = tf.clip_by_value(disc_pred_fake, 1e-7, 1.0 - 1e-7)
        valid_labels = tf.ones_like(disc_pred_fake)
        adv_loss = tf.keras.losses.binary_crossentropy(valid_labels, disc_pred_fake)
        adv_loss = tf.reduce_mean(adv_loss)
        gen_loss = reconstruction_loss + kl_loss + self.lambda_adv * adv_loss
        return {"gen_loss": gen_loss,
                "reconstruction_loss": reconstruction_loss,
                "kl_loss": kl_loss,
                "adv_loss": adv_loss}


In [None]:
def generate_synthetic_anomalies(images, noise_factor=2.0):
    #Create synthetic anomalies by adding heavy Gaussian noise.
    anomalies = images + noise_factor * np.random.normal(loc=0.0, scale=1.0, size=images.shape)
    return np.clip(anomalies, 0, 1)

In [8]:
def compute_threshold(losses, k=1.5):
    return np.mean(losses) + k * np.std(losses)

In [9]:
def compute_reconstruction_loss(original, reconstructed):
    return np.mean(np.square(original - reconstructed), axis=(1,2,3))

In [None]:
# Paths 
dataset_path = "/Users/aiswaryamariamjacob/Major Project/audio"
test_path = "/Users/aiswaryamariamjacob/Major Project/Raw Audio"

In [11]:
# 1. Load audio files
audio_samples = load_audio_files(dataset_path)
test_samples = load_audio_files(test_path)

# 2. Augment audio for training
augmented_audio = [augment_audio(a) for a in audio_samples]
audio_train_all = list(audio_samples) + augmented_audio

# 3. Extract mel-spectrograms (in decibels) from audio
specs_train = extract_spectrograms(audio_train_all, sr=22050, n_fft=2048, hop_length=512, n_mels=128, max_time_frames=128)
specs_test = extract_spectrograms(test_samples, sr=22050, n_fft=2048, hop_length=512, n_mels=128, max_time_frames=128)

# 4. Expand dims to have channel dimension
X_all = prepare_spectrogram_images(specs_train)
X_test = prepare_spectrogram_images(specs_test)

# 5. Train-Val split (unsupervised; all assumed normal)
X_train, X_val = train_test_split(X_all, test_size=0.2, random_state=42)

print("Final X_train shape:", X_train.shape)
print("Final X_val shape:", X_val.shape)
print("Final X_test shape:", X_test.shape)

# 6. Build VAE + GAN components (input shape: (128,128,1), latent_dim=32)
latent_dim = 32
encoder = build_encoder(latent_dim, input_shape=(128,128,1))
decoder = build_decoder(latent_dim, output_shape=(128,128,1))
discriminator = build_discriminator(input_shape=(128,128,1))

# 7. Initialize VAE + GAN model
vae_gan = VAE_GAN(encoder, decoder, discriminator, lambda_adv=0.5)

# 8. Set up optimizers with a lower learning rate
vae_gan.gen_optimizer = keras.optimizers.Adam(learning_rate=1e-4)
vae_gan.disc_optimizer = keras.optimizers.Adam(learning_rate=1e-4)

# 8.5 Compile the model (dummy compile; custom train_step is used)
vae_gan.compile(optimizer=vae_gan.gen_optimizer)

# 9. Train the VAE + GAN
vae_gan.fit(X_train, epochs=50, batch_size=32, validation_data=(X_val,))

Final X_train shape: (28008, 128, 128, 1)
Final X_val shape: (7002, 128, 128, 1)
Final X_test shape: (1677, 128, 128, 1)
Epoch 1/50
[1m876/876[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1898s[0m 2s/step - adv_loss: 13.4682 - disc_loss: 0.1806 - gen_loss: 7.1337 - kl_loss: 0.2923 - reconstruction_loss: 0.1073 - val_adv_loss: 14.3531 - val_gen_loss: 8.5190 - val_kl_loss: 1.2636 - val_reconstruction_loss: 0.0788
Epoch 2/50
[1m876/876[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1874s[0m 2s/step - adv_loss: 7.3353 - disc_loss: 1.0845 - gen_loss: 3.9497 - kl_loss: 0.2428 - reconstruction_loss: 0.0393 - val_adv_loss: 7.7084 - val_gen_loss: 3.8854 - val_kl_loss: 0.0010 - val_reconstruction_loss: 0.0301
Epoch 3/50
[1m876/876[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1858s[0m 2s/step - adv_loss: 7.8641 - disc_loss: 0.5654 - gen_loss: 3.9686 - kl_loss: 0.0017 - reconstruction_loss: 0.0349 - val_adv_loss: 13.6546 - val_gen_loss: 6.8586 - val_kl_loss: 0.0016 - val_reconstruction_l

<keras.src.callbacks.history.History at 0x14a47ed10>

In [15]:
# Validation evaluation:
val_anomalies = generate_synthetic_anomalies(X_val, noise_factor=2.0)
val_reconstructed = vae_gan.predict(X_val, batch_size=32)
val_loss_normal = compute_reconstruction_loss(X_val, val_reconstructed)
val_reconstructed_anom = vae_gan.predict(val_anomalies, batch_size=32)
val_loss_anom = compute_reconstruction_loss(val_anomalies, val_reconstructed_anom)
val_threshold = compute_threshold(val_loss_normal, k=1.5)
print(f"Validation adaptive threshold: {val_threshold:.4f}")
X_val_bal = np.concatenate([X_val, val_anomalies], axis=0)
y_val_true = np.concatenate([np.zeros(len(X_val)), np.ones(len(val_anomalies))])
val_reconstructed_bal = vae_gan.predict(X_val_bal, batch_size=32)
val_loss_bal = compute_reconstruction_loss(X_val_bal, val_reconstructed_bal)
y_val_pred = (val_loss_bal > val_threshold).astype(int)

val_recall = recall_score(y_val_true, y_val_pred, zero_division=1)
val_f1 = f1_score(y_val_true, y_val_pred, zero_division=1)
val_cm = confusion_matrix(y_val_true, y_val_pred)
val_roc = roc_auc_score(y_val_true, val_loss_bal)
val_pr  = average_precision_score(y_val_true, val_loss_bal)

print("Validation Metrics:")
print(f"Recall: {val_recall:.4f}, F1: {val_f1:.4f}")
print(f"ROC-AUC: {val_roc:.4f}, PR-AUC: {val_pr:.4f}")
print("Confusion Matrix:")
print(val_cm)

[1m219/219[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m58s[0m 265ms/step
[1m219/219[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m58s[0m 266ms/step
Validation adaptive threshold: 0.0596
[1m438/438[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m117s[0m 266ms/step
Validation Metrics:
Recall: 1.0000, F1: 0.9619
ROC-AUC: 1.0000, PR-AUC: 1.0000
Confusion Matrix:
[[6448  554]
 [   0 7002]]


In [16]:
# Test evaluation:
test_anomalies = generate_synthetic_anomalies(X_test, noise_factor=2.0)
test_reconstructed = vae_gan.predict(X_test, batch_size=32)
test_loss_normal = compute_reconstruction_loss(X_test, test_reconstructed)
test_reconstructed_anom = vae_gan.predict(test_anomalies, batch_size=32)
test_loss_anom = compute_reconstruction_loss(test_anomalies, test_reconstructed_anom)
test_threshold = val_threshold  # using the same threshold from validation
print(f"Test threshold (from validation): {test_threshold:.4f}")
X_test_bal = np.concatenate([X_test, test_anomalies], axis=0)
y_test_true = np.concatenate([np.zeros(len(X_test)), np.ones(len(test_anomalies))])
test_reconstructed_bal = vae_gan.predict(X_test_bal, batch_size=32)
test_loss_bal = compute_reconstruction_loss(X_test_bal, test_reconstructed_bal)
y_test_pred = (test_loss_bal > test_threshold).astype(int)

test_recall = recall_score(y_test_true, y_test_pred, zero_division=1)
test_f1 = f1_score(y_test_true, y_test_pred, zero_division=1)
test_cm = confusion_matrix(y_test_true, y_test_pred)
test_roc = roc_auc_score(y_test_true, test_loss_bal)
test_pr  = average_precision_score(y_test_true, test_loss_bal)

print("Test Metrics:")
print(f"Recall: {test_recall:.4f}, F1: {test_f1:.4f}")
print(f"ROC-AUC: {test_roc:.4f}, PR-AUC: {test_pr:.4f}")
print("Confusion Matrix:")
print(test_cm)

[1m53/53[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m14s[0m 267ms/step
[1m53/53[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m14s[0m 263ms/step
Test threshold (from validation): 0.0596
[1m105/105[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m28s[0m 266ms/step
Test Metrics:
Recall: 1.0000, F1: 0.6802
ROC-AUC: 0.9981, PR-AUC: 0.9939
Confusion Matrix:
[[ 100 1577]
 [   0 1677]]


In [17]:
tf.saved_model.save(vae_gan,"saved_vae_gan")

INFO:tensorflow:Assets written to: saved_vae_gan/assets


INFO:tensorflow:Assets written to: saved_vae_gan/assets
