In [6]:
import os
import random
import numpy as np
import tensorflow as tf
from tensorflow.keras import layers, models
from sklearn.metrics import roc_auc_score, precision_score, recall_score, f1_score, accuracy_score
from sklearn.svm import OneClassSVM
from sklearn.ensemble import IsolationForest
import matplotlib.pyplot as plt

# --- Set Random Seeds for Reproducibility ---
# Setting seeds for Python's hash randomization, standard random module, NumPy, and TensorFlow.
os.environ['PYTHONHASHSEED'] = str(0)
random.seed(42)
np.random.seed(42)
tf.random.set_seed(42)
print("--- Random seeds set for reproducibility ---")

# --- Global Model Parameters ---
INPUT_SHAPE = (28, 28, 1)
EMBEDDING_DIM = 128 # The size of our feature vector (output of Deep SVDD's CNN)
NORMAL_CLASS = 1    # The digit '1' is considered the normal class
LAMBDA = 1e-6       # Regularization parameter for weight decay in Deep SVDD
LEARNING_RATE = 1e-2 # Learning rate for SGD optimizer
EPOCHS = 50         # Maximum number of training epochs for Deep SVDD
BATCH_SIZE = 128    # Batch size for training and inference

# --- NEW GLOBAL VARIABLE for Early Stopping ---
EARLY_STOPPING_LOSS_THRESHOLD = 0.002 # Stop training if average epoch loss falls below this value
                                     # This value might need tuning based on observed loss convergence.
                                     # For the given training output, a value like 0.002 or 0.0019 would have stopped it near the end.


# --- Phase 1: Data Loading and Preprocessing ---
print("\n--- Phase 1: Data Loading and Preprocessing ---")

# Load the MNIST dataset
(x_train_raw, y_train_raw), (x_test_raw, y_test_raw) = tf.keras.datasets.mnist.load_data()

# Normalize pixel values to [0, 1] (convert to float32 first)
x_train_normalized = x_train_raw.astype('float32') / 255.0
x_test_normalized = x_test_raw.astype('float32') / 255.0

# Add a channel dimension for grayscale images (for CNN input)
x_train_processed = np.expand_dims(x_train_normalized, axis=-1)
x_test_processed = np.expand_dims(x_test_normalized, axis=-1)

# Define 'NORMAL_CLASS' (digit '1') and restructure datasets
x_train_normal = x_train_processed[y_train_raw == NORMAL_CLASS]
print(f"Shape of x_train_normal (only digit '{NORMAL_CLASS}'): {x_train_normal.shape}")

x_test_normal_samples = x_test_processed[y_test_raw == NORMAL_CLASS]
x_test_anomaly_samples = x_test_processed[y_test_raw != NORMAL_CLASS]

x_test_combined = np.concatenate((x_test_normal_samples, x_test_anomaly_samples), axis=0)
y_test_combined_labels = np.concatenate((
    np.zeros(len(x_test_normal_samples)),
    np.ones(len(x_test_anomaly_samples))
), axis=0)

permutation = np.random.permutation(len(x_test_combined))
x_test_final = x_test_combined[permutation]
y_test_final = y_test_combined_labels[permutation]

print(f"Shape of x_test_final (combined normal and anomaly): {x_test_final.shape}")
print(f"Number of normal samples in test set: {np.sum(y_test_final == 0)}")
print(f"Number of anomaly samples in test set: {np.sum(y_test_final == 1)}")


# --- Phase 2: Deep SVDD Model Development and Training ---
print("\n--- Phase 2: Deep SVDD Model Development and Training ---")

# 1. Define the CNN Architecture (Feature Extractor phi(x;W))
def build_cnn_feature_extractor(input_shape, embedding_dim):
    model = models.Sequential([
        layers.Conv2D(32, (3, 3), activation='relu', input_shape=input_shape),
        layers.MaxPooling2D((2, 2)),
        layers.Conv2D(64, (3, 3), activation='relu'),
        layers.MaxPooling2D((2, 2)),
        layers.Flatten(),
        layers.Dense(embedding_dim)
    ])
    return model

feature_extractor = build_cnn_feature_extractor(INPUT_SHAPE, EMBEDDING_DIM)
print("\n--- Feature Extractor (CNN) Architecture Summary ---")
feature_extractor.summary()

# 2. Define the Deep SVDD Loss Function
@tf.function
def deep_svdd_loss(phi_output, center, lambda_val, model_weights):
    distance_loss = tf.reduce_mean(tf.reduce_sum(tf.square(phi_output - center), axis=1))
    regularization_loss = 0.0
    for weight in model_weights:
        regularization_loss += tf.reduce_sum(tf.square(weight))
    total_loss = distance_loss + 2 * lambda_val * regularization_loss
    return total_loss

# 3. Data-Dependent Fixed Center 'c' Initialization
def initialize_center(model, normal_data, num_batches_for_init=10, batch_size=BATCH_SIZE):
    print(f"\n--- Initializing Center 'c' using {num_batches_for_init} batches of normal data ---")
    features_sum = tf.zeros(model.output_shape[-1], dtype=tf.float32)
    num_samples = 0
    normal_dataset = tf.data.Dataset.from_tensor_slices(normal_data).batch(batch_size)
    for i, batch in enumerate(normal_dataset):
        if i >= num_batches_for_init:
            break
        embeddings = model(batch, training=False)
        features_sum += tf.reduce_sum(embeddings, axis=0)
        num_samples += batch.shape[0]
    center_c = features_sum / num_samples
    print(f"Center 'c' initialized. Shape: {center_c.shape}")
    return center_c

center = initialize_center(feature_extractor, x_train_normal)

# 4. Training Loop Implementation
optimizer = tf.keras.optimizers.SGD(learning_rate=LEARNING_RATE)
train_dataset = tf.data.Dataset.from_tensor_slices(x_train_normal).shuffle(
    buffer_size=len(x_train_normal)).batch(BATCH_SIZE).prefetch(tf.data.AUTOTUNE)

print("\n--- Starting Deep SVDD Training ---")

@tf.function
def train_step(images, current_center):
    with tf.GradientTape() as tape:
        phi_output = feature_extractor(images, training=True)
        loss = deep_svdd_loss(phi_output, current_center, LAMBDA, feature_extractor.trainable_weights)
    gradients = tape.gradient(loss, feature_extractor.trainable_weights)
    optimizer.apply_gradients(zip(gradients, feature_extractor.trainable_weights))
    return loss

for epoch in range(EPOCHS):
    total_epoch_loss = 0
    num_batches = 0
    for batch_images in train_dataset:
        loss = train_step(batch_images, center)
        total_epoch_loss += loss.numpy()
        num_batches += 1

    avg_epoch_loss = total_epoch_loss / num_batches
    print(f"Epoch {epoch+1}/{EPOCHS}, Loss: {avg_epoch_loss:.4f}")

    # --- Early Stopping Check ---
    if avg_epoch_loss < EARLY_STOPPING_LOSS_THRESHOLD:
        print(f"Early stopping triggered: Average epoch loss {avg_epoch_loss:.4f} is below threshold {EARLY_STOPPING_LOSS_THRESHOLD:.4f}")
        break # Exit the training loop

print("--- Deep SVDD Training Complete ---")


# --- Phase 3: Deep SVDD Anomaly Scoring and Evaluation ---
print("\n--- Phase 3: Deep SVDD Anomaly Scoring and Evaluation ---")

# 1. Calculate Anomaly Scores for the Test Dataset
test_dataset_inference = tf.data.Dataset.from_tensor_slices(x_test_final).batch(BATCH_SIZE)
all_test_embeddings = []
for batch_images in test_dataset_inference:
    embeddings = feature_extractor(batch_images, training=False)
    all_test_embeddings.append(embeddings)

all_test_embeddings = tf.concat(all_test_embeddings, axis=0)
deep_svdd_anomaly_scores = tf.reduce_sum(tf.square(all_test_embeddings - center), axis=1).numpy()

print(f"Calculated anomaly scores for {len(deep_svdd_anomaly_scores)} test samples.")
print(f"Min Deep SVDD anomaly score (test): {np.min(deep_svdd_anomaly_scores):.4f}")
print(f"Max Deep SVDD anomaly score (test): {np.max(deep_svdd_anomaly_scores):.4f}")
print(f"Mean Deep SVDD anomaly score (test): {np.mean(deep_svdd_anomaly_scores):.4f}")
print(f"Median Deep SVDD anomaly score (test): {np.median(deep_svdd_anomaly_scores):.4f}")

# 2. Evaluate using ROC AUC
roc_auc_deep_svdd = roc_auc_score(y_test_final, deep_svdd_anomaly_scores)
print(f"\nDeep SVDD ROC AUC Score: {roc_auc_deep_svdd:.4f}")


# 3. Calculate Threshold from NORMAL TRAINING DATA ONLY (e.g., 99th percentile)
print("\n--- Calculating Deep SVDD Threshold from Normal Training Data Only ---")

normal_train_dataset_inference = tf.data.Dataset.from_tensor_slices(x_train_normal).batch(BATCH_SIZE)
normal_train_embeddings = []
for batch_images in normal_train_dataset_inference:
    embeddings = feature_extractor(batch_images, training=False)
    normal_train_embeddings.append(embeddings)
normal_train_embeddings = tf.concat(normal_train_embeddings, axis=0)

deep_svdd_normal_train_scores = tf.reduce_sum(tf.square(normal_train_embeddings - center), axis=1).numpy()

threshold_deep_svdd = np.percentile(deep_svdd_normal_train_scores, 99)

print(f"Min Deep SVDD anomaly score (normal train): {np.min(deep_svdd_normal_train_scores):.4f}")
print(f"Max Deep SVDD anomaly score (normal train): {np.max(deep_svdd_normal_train_scores):.4f}")
print(f"Deep SVDD Threshold (99th percentile of NORMAL TRAINING scores): {threshold_deep_svdd:.4f}")


# 4. Classify test samples based on the new threshold
y_pred_deep_svdd_binary = (deep_svdd_anomaly_scores > threshold_deep_svdd).astype(int)

# 5. Report classification metrics using this threshold
acc_deep_svdd = accuracy_score(y_test_final, y_pred_deep_svdd_binary)
prec_deep_svdd = precision_score(y_test_final, y_pred_deep_svdd_binary)
rec_deep_svdd = recall_score(y_test_final, y_pred_deep_svdd_binary)
f1_deep_svdd = f1_score(y_test_final, y_pred_deep_svdd_binary)

print(f"\nDeep SVDD Classification Metrics (using threshold {threshold_deep_svdd:.4f} from normal training data):")
print(f"  Accuracy: {acc_deep_svdd:.4f}")
print(f"  Precision: {prec_deep_svdd:.4f}")
print(f"  Recall: {rec_deep_svdd:.4f}")
print(f"  F1-Score: {f1_deep_svdd:.4f}")


# --- Phase 4: Comparison with Traditional Methods ---
print("\n--- Phase 4: Comparison with Traditional Methods ---")

# Data preparation for traditional methods: Flatten images
x_train_flat = x_train_normal.reshape(x_train_normal.shape[0], -1)
x_test_flat = x_test_final.reshape(x_test_final.shape[0], -1)

print(f"\nShape of flattened training data for traditional methods: {x_train_flat.shape}")
print(f"Shape of flattened test data for traditional methods: {x_test_flat.shape}")


# 1. One-Class SVM (OC-SVM)
print("\n--- Training and Evaluating One-Class SVM ---")
oc_svm = OneClassSVM(nu=0.1, kernel="rbf", gamma='scale')
oc_svm.fit(x_train_flat)
oc_svm_anomaly_scores = -oc_svm.decision_function(x_test_flat)
roc_auc_oc_svm = roc_auc_score(y_test_final, oc_svm_anomaly_scores)
print(f"One-Class SVM ROC AUC Score: {roc_auc_oc_svm:.4f}")


# 2. Isolation Forest (IF)
print("\n--- Training and Evaluating Isolation Forest ---")
isolation_forest = IsolationForest(random_state=42, contamination='auto')
isolation_forest.fit(x_train_flat)
isolation_forest_anomaly_scores = -isolation_forest.decision_function(x_test_flat)
roc_auc_isolation_forest = roc_auc_score(y_test_final, isolation_forest_anomaly_scores)
print(f"Isolation Forest ROC AUC Score: {roc_auc_isolation_forest:.4f}")


# --- Final Performance Comparison Summary (ROC AUC) ---
print("\n--- Anomaly Detection Performance Comparison (ROC AUC) ---")
print(f"Deep SVDD (Custom CNN):         {roc_auc_deep_svdd:.4f}")
print(f"One-Class SVM:                  {roc_auc_oc_svm:.4f}")
print(f"Isolation Forest:               {roc_auc_isolation_forest:.4f}")

--- Random seeds set for reproducibility ---

--- Phase 1: Data Loading and Preprocessing ---
Shape of x_train_normal (only digit '1'): (6742, 28, 28, 1)
Shape of x_test_final (combined normal and anomaly): (10000, 28, 28, 1)
Number of normal samples in test set: 1135
Number of anomaly samples in test set: 8865

--- Phase 2: Deep SVDD Model Development and Training ---

--- Feature Extractor (CNN) Architecture Summary ---


  super().__init__(activity_regularizer=activity_regularizer, **kwargs)



--- Initializing Center 'c' using 10 batches of normal data ---
Center 'c' initialized. Shape: (128,)

--- Starting Deep SVDD Training ---
Epoch 1/50, Loss: 0.0659
Epoch 2/50, Loss: 0.0328
Epoch 3/50, Loss: 0.0226
Epoch 4/50, Loss: 0.0174
Epoch 5/50, Loss: 0.0141
Epoch 6/50, Loss: 0.0118
Epoch 7/50, Loss: 0.0101
Epoch 8/50, Loss: 0.0087
Epoch 9/50, Loss: 0.0076
Epoch 10/50, Loss: 0.0067
Epoch 11/50, Loss: 0.0060
Epoch 12/50, Loss: 0.0054
Epoch 13/50, Loss: 0.0049
Epoch 14/50, Loss: 0.0044
Epoch 15/50, Loss: 0.0041
Epoch 16/50, Loss: 0.0037
Epoch 17/50, Loss: 0.0035
Epoch 18/50, Loss: 0.0032
Epoch 19/50, Loss: 0.0030
Epoch 20/50, Loss: 0.0028
Epoch 21/50, Loss: 0.0027
Epoch 22/50, Loss: 0.0025
Epoch 23/50, Loss: 0.0024
Epoch 24/50, Loss: 0.0023
Epoch 25/50, Loss: 0.0022
Epoch 26/50, Loss: 0.0021
Epoch 27/50, Loss: 0.0020
Epoch 28/50, Loss: 0.0019
Early stopping triggered: Average epoch loss 0.0019 is below threshold 0.0020
--- Deep SVDD Training Complete ---

--- Phase 3: Deep SVDD Ano