In [1]:
import numpy as np

# --- Model / training hyperparameters ---------------------------------------
latent_dim = 16
intermediate_dim = 128
epochs = 10
batch_size = 32

normal_class_label = 0   # label value that marks a "normal" sample in y_test
original_data_dim = 784  # number of features per sample

# --- Dummy dataset sizes ----------------------------------------------------
# Named once so the feature matrix, the label vector and the anomaly sampling
# cannot drift out of sync.
num_train_samples = 4800
num_val_samples = 1200
num_test_samples = 10000
anomaly_fraction = 0.1

# A seeded generator makes the dummy data identical on every run, so results
# further down can be compared between executions.
rng = np.random.default_rng(42)

x_train_normal_train = rng.random((num_train_samples, original_data_dim), dtype=np.float32)
print(f"Created dummy x_train_normal_train with shape: {x_train_normal_train.shape}")

x_train_normal_val = rng.random((num_val_samples, original_data_dim), dtype=np.float32)
print(f"Created dummy x_train_normal_val with shape: {x_train_normal_val.shape}")

x_test_processed = rng.random((num_test_samples, original_data_dim), dtype=np.float32)
print(f"Created dummy x_test_processed with shape: {x_test_processed.shape}")

# Every test sample starts as "normal"; a random subset is then relabelled 1.
# NOTE: the labels are drawn independently of the features (all rows of
# x_test_processed come from the same uniform distribution), so any detection
# metric computed from this dummy data can only reflect chance.
y_test = np.full(num_test_samples, normal_class_label, dtype=int)
num_anomalies = int(anomaly_fraction * num_test_samples)
anomaly_indices = rng.choice(num_test_samples, size=num_anomalies, replace=False)
y_test[anomaly_indices] = 1
print(f"Created dummy y_test with shape: {y_test.shape} and unique values: {np.unique(y_test)}")

print(f"latent_dim: {latent_dim}")
print(f"intermediate_dim: {intermediate_dim}")
print(f"epochs: {epochs}")
print(f"batch_size: {batch_size}")
print(f"normal_class_label: {normal_class_label}")

print(f"Shape of x_train_normal_train: {x_train_normal_train.shape}, dtype: {x_train_normal_train.dtype}")
print(f"Shape of x_train_normal_val: {x_train_normal_val.shape}, dtype: {x_train_normal_val.dtype}")

import numpy as np

# Sanity-check the value range of the training features.
for stat_name, stat_fn in (("Min", np.min), ("Max", np.max), ("Mean", np.mean)):
    print(f"{stat_name} value of x_train_normal_train: {stat_fn(x_train_normal_train)}")

# Report shape and dtype of the evaluation arrays.
for array_name, array in (("x_test_processed", x_test_processed), ("y_test", y_test)):
    print(f"Shape of {array_name}: {array.shape}, dtype: {array.dtype}")

import tensorflow as tf
from tensorflow.keras.layers import Layer, Dense, Input
from tensorflow.keras.models import Model

print("TensorFlow, Keras layers, and Model imported successfully.")

# The input width is read off the training matrix so the network always
# matches the data it is fed.
original_dim = x_train_normal_train.shape[-1]


# Encoder: input -> one ReLU hidden layer -> two Dense heads without an
# activation, one producing z_mean and one producing z_log_var.
encoder_inputs = Input(name='encoder_input', shape=(original_dim,))
x = Dense(units=intermediate_dim, activation='relu')(encoder_inputs)
z_mean = Dense(units=latent_dim, name='z_mean')(x)
z_log_var = Dense(units=latent_dim, name='z_log_var')(x)
encoder = Model(
    inputs=encoder_inputs,
    outputs=[z_mean, z_log_var],
    name='encoder',
)
encoder.summary()

class Reparameterization(Layer):
    """Draw a latent sample from ``[z_mean, z_log_var]``.

    The layer returns ``z_mean + exp(0.5 * z_log_var) * epsilon``, where
    ``epsilon`` is random normal noise whose shape is taken from the first
    two dimensions of ``z_mean``.
    """

    def call(self, inputs):
        mean, log_var = inputs
        mean_shape = tf.shape(mean)
        # Noise has one value per (sample, latent dimension) pair.
        noise = tf.keras.backend.random_normal(shape=(mean_shape[0], mean_shape[1]))
        # exp(0.5 * log_var) turns the log-variance into a standard deviation.
        std = tf.exp(0.5 * log_var)
        return mean + std * noise

print("Reparameterization trick layer defined.")

# Decoder: latent vector -> one ReLU hidden layer -> sigmoid output with one
# unit per input feature.
decoder_inputs = Input(name='decoder_input', shape=(latent_dim,))
x = Dense(units=intermediate_dim, activation='relu')(decoder_inputs)
reconstruction = Dense(units=original_dim, activation='sigmoid')(x)
decoder = Model(inputs=decoder_inputs, outputs=reconstruction, name='decoder')

decoder.summary()

# First end-to-end assembly: encode, sample a latent vector, decode. This
# variant exposes z_mean and z_log_var as additional model outputs.
vae_inputs = Input(name='vae_input', shape=(original_dim,))
z_mean, z_log_var = encoder(vae_inputs)
z = Reparameterization()([z_mean, z_log_var])
reconstruction = decoder(z)

vae = Model(
    inputs=vae_inputs,
    outputs=[reconstruction, z_mean, z_log_var],
    name='vae',
)

vae.summary()

import tensorflow as tf
from tensorflow.keras.layers import Layer, Dense, Input
from tensorflow.keras.models import Model

class KLDivergenceLossLayer(Layer):
    """Pass-through layer that registers a KL-divergence penalty.

    Called on ``[z_mean, z_log_var]``, it computes for every sample
    ``0.5 * sum(exp(z_log_var) + z_mean**2 - 1 - z_log_var)`` over axis 1,
    registers the batch mean of that quantity with ``add_loss`` and returns
    ``z_mean`` unchanged.
    """

    def __init__(self, **kwargs):
        super().__init__(**kwargs)

    def call(self, inputs):
        mean, log_var = inputs
        # Element-wise KL contribution of each latent dimension.
        per_element = tf.exp(log_var) + tf.square(mean) - 1.0 - log_var
        per_sample_kl = 0.5 * tf.reduce_sum(per_element, axis=1)
        self.add_loss(tf.reduce_mean(per_sample_kl))
        return mean

# Rebuild the VAE with a single (reconstruction) output and the KL penalty
# attached through KLDivergenceLossLayer.
vae_inputs = Input(shape=(original_dim,), name='vae_input')
z_mean, z_log_var = encoder(vae_inputs)

# KLDivergenceLossLayer returns z_mean unchanged while registering the KL term
# via add_loss. Its output has to feed the sampling layer: a functional Model
# only contains layers that lie on a path from its inputs to its outputs, so
# when the layer's result is discarded the KL term never reaches training.
z_mean = KLDivergenceLossLayer(name='kl_divergence_calculator')([z_mean, z_log_var])

z = Reparameterization()([z_mean, z_log_var])
reconstruction = decoder(z)

vae = Model(vae_inputs, reconstruction, name='vae')

# NOTE(review): BinaryCrossentropy presumably averages over the feature axis
# before the 'sum' reduction over the batch, while the KL term is a per-sample
# sum averaged over the batch -- confirm the relative weighting is intended.
reconstruction_loss_fn = tf.keras.losses.BinaryCrossentropy(
    from_logits=False, reduction='sum'
)

vae.compile(optimizer='adam', loss=reconstruction_loss_fn)

print("VAE model reassembled and compiled.")

# Autoencoder-style training: the inputs double as the targets, for both the
# training split and the validation split.
validation_pair = (x_train_normal_val, x_train_normal_val)
history = vae.fit(
    x_train_normal_train,
    x_train_normal_train,
    batch_size=batch_size,
    epochs=epochs,
    validation_data=validation_pair,
)

print("VAE model training complete.")

import tensorflow as tf

def calculate_reconstruction_error(original_input, reconstruction):
    """Return the per-sample sum of squared reconstruction errors.

    Args:
        original_input: Tensor of model inputs; axis 1 is summed over.
        reconstruction: Tensor of reconstructions, subtracted element-wise
            from ``original_input``.

    Returns:
        Tensor of squared errors with axis 1 reduced away.
    """
    # The Keras MeanSquaredError loss already averages over the last axis,
    # even with reduction='none', which leaves no axis 1 to sum over for 2-D
    # inputs. Working on the element-wise squared difference avoids that.
    squared_difference = tf.square(original_input - reconstruction)
    return tf.reduce_sum(squared_difference, axis=1)

print("Reconstruction error function defined (using MSE).")

import tensorflow as tf

def calculate_kl_divergence(input_data, encoder):
    """Return the per-sample KL term computed from the encoder outputs.

    Args:
        input_data: Batch passed to ``encoder``.
        encoder: Callable returning the pair ``(z_mean, z_log_var)``.

    Returns:
        ``0.5 * sum(exp(z_log_var) + z_mean**2 - 1 - z_log_var)`` reduced
        over axis 1.
    """
    mean, log_var = encoder(input_data)
    per_element = tf.exp(log_var) + tf.square(mean) - 1.0 - log_var
    return 0.5 * tf.reduce_sum(per_element, axis=1)

print("KL divergence calculation function defined.")

import tensorflow as tf

def calculate_anomaly_score(input_data, vae_model, encoder):
    """Return reconstruction error plus KL divergence for each sample.

    Args:
        input_data: Batch to score.
        vae_model: Callable VAE. It may return either the reconstruction
            alone or a list/tuple whose first element is the reconstruction
            (for example ``[reconstruction, z_mean, z_log_var]``).
        encoder: Encoder passed on to ``calculate_kl_divergence``.

    Returns:
        Sum of ``calculate_reconstruction_error`` and
        ``calculate_kl_divergence`` for ``input_data``.
    """
    outputs = vae_model(input_data)
    # Unconditionally unpacking three values breaks for a VAE that outputs
    # only the reconstruction, so pick the reconstruction by output form.
    if isinstance(outputs, (list, tuple)):
        reconstruction = outputs[0]
    else:
        reconstruction = outputs

    reconstruction_error = calculate_reconstruction_error(input_data, reconstruction)

    # The KL term is recomputed from the encoder, so any z_mean / z_log_var
    # returned by the VAE are not needed here.
    kl_divergence = calculate_kl_divergence(input_data, encoder)

    total_anomaly_score = reconstruction_error + kl_divergence
    return total_anomaly_score

print("Anomaly score calculation function defined.")

import tensorflow as tf

def calculate_anomaly_score(input_data, vae_model, encoder):
    """Score each sample as reconstruction error plus KL divergence.

    Args:
        input_data: Batch to score.
        vae_model: Callable whose return value is used directly as the
            reconstruction of ``input_data``.
        encoder: Encoder passed on to ``calculate_kl_divergence``.

    Returns:
        Sum of the reconstruction-error term and the KL term.
    """
    reconstructed = vae_model(input_data)
    reconstruction_term = calculate_reconstruction_error(input_data, reconstructed)
    kl_term = calculate_kl_divergence(input_data, encoder)
    return reconstruction_term + kl_term

print("Anomaly score calculation function defined (fixed for VAE output).")

import tensorflow as tf

def calculate_reconstruction_error(original_input, reconstruction):
    """Return the squared difference of the two tensors summed over axis 1.

    Args:
        original_input: Tensor of model inputs.
        reconstruction: Tensor subtracted element-wise from
            ``original_input``.

    Returns:
        Tensor of summed squared residuals with axis 1 reduced away.
    """
    residual = original_input - reconstruction
    return tf.math.reduce_sum(tf.math.square(residual), axis=1)

print("Reconstruction error function defined (using explicit squared difference sum).")

import numpy as np

# Score the whole test set in a single call.
x_test_processed_tf = tf.constant(x_test_processed, dtype=tf.float32)
anomaly_scores = calculate_anomaly_score(
    x_test_processed_tf,
    vae,
    encoder,
)

# Split the scores by ground-truth label; the anomalous mask is the exact
# complement of the normal mask.
normal_mask = y_test == normal_class_label
anomalous_mask = ~normal_mask

anomaly_scores_normal = anomaly_scores[normal_mask]
anomaly_scores_anomalous = anomaly_scores[anomalous_mask]

total_scored = len(anomaly_scores)
normal_scored = len(anomaly_scores_normal)
anomalous_scored = len(anomaly_scores_anomalous)
print(f"Calculated {total_scored} anomaly scores for the test set.")
print(f"Number of normal samples: {normal_scored}")
print(f"Number of anomalous samples: {anomalous_scored}")

import matplotlib.pyplot as plt
import seaborn as sns

# Overlay the score densities of both groups on one set of axes.
fig, ax = plt.subplots(figsize=(10, 6))
shared_hist_options = dict(kde=True, stat='density', alpha=0.5, bins=50, ax=ax)
for group_scores, group_color, group_label in (
    (anomaly_scores_normal, 'blue', 'Normal Samples'),
    (anomaly_scores_anomalous, 'red', 'Anomalous Samples'),
):
    sns.histplot(
        group_scores.numpy(),
        color=group_color,
        label=group_label,
        **shared_hist_options,
    )

ax.set_title('Distribution of Anomaly Scores for Normal vs. Anomalous Samples')
ax.set_xlabel('Anomaly Score')
ax.set_ylabel('Density')
ax.legend()
ax.grid(True)
plt.show()

print("Anomaly score distributions visualized.")

import numpy as np

# Decision threshold: the 95th percentile of the scores of the samples
# labelled normal.
anomaly_threshold = np.percentile(anomaly_scores_normal, q=95)

print(
    "Calculated anomaly detection threshold (95th percentile of normal scores): "
    f"{anomaly_threshold:.2f}"
)

import numpy as np

# A sample is predicted anomalous (1) when its score is strictly above the
# threshold, otherwise normal (0).
exceeds_threshold = anomaly_scores > anomaly_threshold
y_pred = exceeds_threshold.numpy().astype(int)
anomalous_count = np.sum(y_pred)

print(f"Anomaly detection threshold: {anomaly_threshold:.2f}")
print(f"Number of samples classified as anomalous by this threshold: {anomalous_count}")

import numpy as np

# Collapse the labels to binary ground truth: 1 for any label other than
# normal_class_label, 0 for the normal label.
is_anomalous = y_test != normal_class_label
y_true_binary = is_anomalous.astype(int)

print(f"Original y_test unique values: {np.unique(y_test)}")
print(f"Binary y_true_binary unique values: {np.unique(y_true_binary)}")
for group_name, group_value in (("normal", 0), ("anomalous", 1)):
    print(f"Number of true {group_name} samples: {np.sum(y_true_binary == group_value)}")

from sklearn.metrics import precision_score, recall_score, f1_score, average_precision_score, precision_recall_curve

print("Scikit-learn metrics imported successfully.")

# Metrics of the thresholded predictions, evaluated in the order
# precision, recall, F1.
precision, recall, f1 = (
    metric(y_true_binary, y_pred)
    for metric in (precision_score, recall_score, f1_score)
)

print(f"Precision: {precision:.4f}")
print(f"Recall: {recall:.4f}")
print(f"F1-score: {f1:.4f}")

# Threshold-free summary computed from the raw scores rather than y_pred.
auc_pr = average_precision_score(y_true=y_true_binary, y_score=anomaly_scores)

print(f"Area Under the Precision-Recall Curve (AUC-PR): {auc_pr:.4f}")

# Precision/recall pairs across the score thresholds returned by scikit-learn.
# NOTE: this rebinds `precision` and `recall`, replacing the scalar values
# computed earlier for the fixed threshold with the curve arrays.
pr_curve_points = precision_recall_curve(y_true_binary, anomaly_scores)
precision, recall, thresholds = pr_curve_points

print("Precision-Recall curve points computed.")

import matplotlib.pyplot as plt

# Precision as a function of recall, with the AUC-PR value in the legend.
fig, ax = plt.subplots(figsize=(8, 6))
ax.plot(recall, precision, label=f'AUC-PR: {auc_pr:.2f}')
ax.set_xlabel('Recall')
ax.set_ylabel('Precision')
ax.set_title('Precision-Recall Curve')
ax.legend()
ax.grid(True)
plt.show()

print("Precision-Recall curve plotted.")

Created dummy x_train_normal_train with shape: (4800, 784)
Created dummy x_train_normal_val with shape: (1200, 784)
Created dummy x_test_processed with shape: (10000, 784)
Created dummy y_test with shape: (10000,) and unique values: [0 1]
latent_dim: 16
intermediate_dim: 128
epochs: 10
batch_size: 32
normal_class_label: 0
Shape of x_train_normal_train: (4800, 784), dtype: float32
Shape of x_train_normal_val: (1200, 784), dtype: float32
Min value of x_train_normal_train: 2.6456815405140333e-08
Max value of x_train_normal_train: 0.9999999403953552
Mean value of x_train_normal_train: 0.49982786178588867
Shape of x_test_processed: (10000, 784), dtype: float32
Shape of y_test: (10000,), dtype: int64


ModuleNotFoundError: No module named 'tensorflow'