# Anomaly Detection Analysis

This notebook implements and evaluates various anomaly detection methods for binary classification (normal vs fault) on the Tennessee Eastman Process dataset.

In [None]:
# Import libraries
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import pickle
import os
from typing import Union, Tuple
from numpy.typing import NDArray

# Data preparation imports
import pyreadr
from sklearn.preprocessing import StandardScaler, OneHotEncoder

# Machine Learning
from sklearn.ensemble import RandomForestClassifier
from sklearn.decomposition import PCA
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import confusion_matrix
import xgboost as xgb
import lightgbm as lgb
from keras.models import Model
from keras.callbacks import EarlyStopping
from tensorflow import keras
from scipy.stats import chi2
from tabulate import tabulate

In [None]:
# Configuration
VERSION = "1.00"
OUTPUT_PATH = "output"

In [None]:
def save_plot(plot_name: str, suffix: str = "", plot_path: str = "AnomalyDetection") -> None:
    """Save current matplotlib figure."""
    timestamp: str = ""
    base_dir: str = os.path.join(OUTPUT_PATH, "data", plot_path)
    os.makedirs(base_dir, exist_ok=True)

    filename: str = f"{plot_name}_{suffix}_v{VERSION}_{timestamp}.png" if suffix else f"{plot_name}_v{VERSION}_{timestamp}.png"
    filepath: str = os.path.join(base_dir, filename)

    plt.savefig(filepath, bbox_inches="tight", dpi=300)
    print(f"Plot saved: {filepath}")

def save_dataframe(df: pd.DataFrame, name: str, suffix: str = "") -> None:
    """Save a DataFrame to CSV."""
    timestamp: str = ""
    base_dir: str = os.path.join(OUTPUT_PATH, "data")
    os.makedirs(base_dir, exist_ok=True)

    filename: str = f"{name}_{suffix}_v{VERSION}_{timestamp}.csv" if suffix else f"{name}_v{VERSION}_{timestamp}.csv"
    filepath: str = os.path.join(base_dir, filename)

    df.to_csv(filepath, index=True)
    print(f"Data saved: {filepath}")

def save_pickle(obj, name: str, suffix: str = "") -> None:
    """Save object as pickle file."""
    timestamp: str = ""
    base_dir: str = os.path.join(OUTPUT_PATH, "data")
    os.makedirs(base_dir, exist_ok=True)

    filename: str = f"{name}_{suffix}_v{VERSION}_{timestamp}.pkl" if suffix else f"{name}_v{VERSION}_{timestamp}.pkl"
    filepath: str = os.path.join(base_dir, filename)

    with open(filepath, 'wb') as f:
        pickle.dump(obj, f)
    print(f"Results saved: {filepath}")

## Load Prepared Data

In [None]:
# ====================================================================
# STANDALONE ANOMALY DETECTION DATA PREPARATION
# ====================================================================

print("=== STANDALONE ANOMALY DETECTION DATA PREPARATION ===")
print("Preparing data specifically for binary anomaly detection (normal vs fault)...")

# Configuration for anomaly detection
TARGET_VARIABLE_COLUMN_NAME = "faultNumber"
SIMULATION_RUN_COLUMN_NAME = "simulationRun"
COLUMNS_TO_REMOVE = ["simulationRun", "sample"]
SKIPED_FAULTS = []

# Load raw data
print("Loading raw data files...")
fault_free_training_dict = pyreadr.read_r("data/TEP_FaultFree_Training.RData")
fault_free_testing_dict = pyreadr.read_r("data/TEP_FaultFree_Testing.RData")
faulty_training_dict = pyreadr.read_r("data/TEP_Faulty_Training.RData")
faulty_testing_dict = pyreadr.read_r("data/TEP_Faulty_Testing.RData")

# Extract DataFrames
DF_FF_TRAINING_RAW = fault_free_training_dict["fault_free_training"]
DF_FF_TEST_RAW = fault_free_testing_dict["fault_free_testing"]
DF_F_TRAINING_RAW = faulty_training_dict["faulty_training"]
DF_F_TEST_RAW = faulty_testing_dict["faulty_testing"]

print(f"✓ Raw data loaded: Train Fault-free {DF_FF_TRAINING_RAW.shape}, Train Faulty {DF_F_TRAINING_RAW.shape}")

# Skip specified faults (if any)
DF_F_TRAIN_SKIPPED = DF_F_TRAINING_RAW[~DF_F_TRAINING_RAW[TARGET_VARIABLE_COLUMN_NAME].isin(SKIPED_FAULTS)].reset_index(drop=True)
DF_F_TEST_SKIPPED = DF_F_TEST_RAW[~DF_F_TEST_RAW[TARGET_VARIABLE_COLUMN_NAME].isin(SKIPED_FAULTS)].reset_index(drop=True)

# Reduce data for development
DF_FF_TRAINING_REDUCED = DF_FF_TRAINING_RAW[(DF_FF_TRAINING_RAW[SIMULATION_RUN_COLUMN_NAME] > 0) & 
                                           (DF_FF_TRAINING_RAW[SIMULATION_RUN_COLUMN_NAME] < 3)].drop(columns=COLUMNS_TO_REMOVE, axis=1)
DF_F_TRAINING_REDUCED = DF_F_TRAIN_SKIPPED[(DF_F_TRAIN_SKIPPED[SIMULATION_RUN_COLUMN_NAME] > 0) & 
                                          (DF_F_TRAIN_SKIPPED[SIMULATION_RUN_COLUMN_NAME] < 3)].drop(columns=COLUMNS_TO_REMOVE, axis=1)

DF_FF_TEST_REDUCED = DF_FF_TEST_RAW[(DF_FF_TEST_RAW[SIMULATION_RUN_COLUMN_NAME] > 0) & 
                                   (DF_FF_TEST_RAW[SIMULATION_RUN_COLUMN_NAME] < 3)].drop(columns=COLUMNS_TO_REMOVE, axis=1)
DF_F_TEST_REDUCED = DF_F_TEST_SKIPPED[(DF_F_TEST_SKIPPED[SIMULATION_RUN_COLUMN_NAME] > 0) & 
                                     (DF_F_TEST_SKIPPED[SIMULATION_RUN_COLUMN_NAME] < 3)].drop(columns=COLUMNS_TO_REMOVE, axis=1)

print(f"✓ Data reduced for anomaly detection")

# Prepare anomaly detection datasets
# In-control (normal) data - only fault-free
X_INCONTROL_TRAIN_REDUCED = DF_FF_TRAINING_REDUCED.drop(columns=[TARGET_VARIABLE_COLUMN_NAME], axis=1).to_numpy()
X_INCONTROL_TEST_REDUCED = DF_FF_TEST_REDUCED.drop(columns=[TARGET_VARIABLE_COLUMN_NAME], axis=1).to_numpy()

# Out-of-control (faulty) data
X_OUT_OF_CONTROL_TEST_REDUCED = DF_F_TEST_REDUCED.drop(columns=[TARGET_VARIABLE_COLUMN_NAME], axis=1).to_numpy()

# Create binary labels for anomaly detection (0 = normal, 1 = anomaly)
# Training labels - all normal (in-control) for unsupervised methods
Y_TRAIN_ANOMALY_REDUCED_DF = pd.Series([0] * len(X_INCONTROL_TRAIN_REDUCED))

# Test labels - normal + faulty
normal_test_labels = [0] * len(X_INCONTROL_TEST_REDUCED)
fault_test_labels = [1] * len(X_OUT_OF_CONTROL_TEST_REDUCED)
Y_TEST_ANOMALY_REDUCED_DF = pd.Series(normal_test_labels + fault_test_labels)

# Combine test features (normal + faulty)
X_TEST_REDUCED = np.concatenate([X_INCONTROL_TEST_REDUCED, X_OUT_OF_CONTROL_TEST_REDUCED], axis=0)

# For supervised methods - prepare combined training data (normal + faulty training)
DF_TRAINING_REDUCED_CONCATED = pd.concat([DF_FF_TRAINING_REDUCED, DF_F_TRAINING_REDUCED], ignore_index=True)
X_TRAIN_SUPERVISED = DF_TRAINING_REDUCED_CONCATED.drop(columns=[TARGET_VARIABLE_COLUMN_NAME], axis=1).to_numpy()

# Create labels for supervised training: 0 for normal, 1 for any fault
Y_TRAIN_SUPERVISED = pd.Series([0] * len(DF_FF_TRAINING_REDUCED) + [1] * len(DF_F_TRAINING_REDUCED))

# Standardize features for anomaly detection
sc_anomaly = StandardScaler()
X_INCONTROL_TRAIN_REDUCED = sc_anomaly.fit_transform(X_INCONTROL_TRAIN_REDUCED)
X_INCONTROL_TEST_REDUCED = sc_anomaly.transform(X_INCONTROL_TEST_REDUCED)
X_OUT_OF_CONTROL_TEST_REDUCED = sc_anomaly.transform(X_OUT_OF_CONTROL_TEST_REDUCED)
X_TEST_REDUCED = sc_anomaly.transform(X_TEST_REDUCED)

# Scale supervised training data using the same scaler
X_TRAIN_SUPERVISED = sc_anomaly.transform(X_TRAIN_SUPERVISED)

# For neural networks - one-hot encode binary labels
# Need to specify categories explicitly since training only has class 0
encoder_anomaly = OneHotEncoder(sparse_output=False, categories=[[0, 1]])
Y_reshaped_anomaly = Y_TRAIN_SUPERVISED.to_numpy().reshape(-1, 1)
Y_ENC_ANOMALY_TRAIN_REDUCED = encoder_anomaly.fit_transform(Y_reshaped_anomaly)

Y_test_reshaped_anomaly = Y_TEST_ANOMALY_REDUCED_DF.to_numpy().reshape(-1, 1)
Y_ENC_ANOMALY_TEST_REDUCED = encoder_anomaly.transform(Y_test_reshaped_anomaly)

print(f"✓ Anomaly detection data prepared:")
print(f"✓ In-control training data: {X_INCONTROL_TRAIN_REDUCED.shape}")
print(f"✓ In-control test data: {X_INCONTROL_TEST_REDUCED.shape}")
print(f"✓ Out-of-control test data: {X_OUT_OF_CONTROL_TEST_REDUCED.shape}")
print(f"✓ Combined test data: {X_TEST_REDUCED.shape}")
print(f"✓ Supervised training data: {X_TRAIN_SUPERVISED.shape}")
print(f"✓ Supervised training labels: {Y_TRAIN_SUPERVISED.value_counts().to_dict()}")
print(f"✓ Test labels distribution: {Y_TEST_ANOMALY_REDUCED_DF.value_counts().to_dict()}")
print("=== ANOMALY DETECTION DATA PREPARATION COMPLETE ===\n")

## Evaluation Functions

In [None]:
def compute_detection_metrics(predicted, true_labels: NDArray[np.int64]) -> pd.DataFrame:
    """Compute comprehensive detection metrics for binary classification."""
    y_pred = predicted.astype(int)
    y_true = true_labels.astype(int)

    tn, fp, fn, tp = confusion_matrix(y_true, y_pred).ravel()

    acc: float = (tp + tn) / (tp + tn + fp + fn)
    prec: float = tp / (tp + fp) if (tp + fp) > 0 else 0.0
    rec: float = tp / (tp + fn) if (tp + fn) > 0 else 0.0
    tnr: float = tn / (tn + fp) if (tn + fp) > 0 else 0.0
    fpr: float = fp / (fp + tn) if (fp + tn) > 0 else 0.0
    npv: float = tn / (tn + fn) if (tn + fn) > 0 else 0.0
    bal_acc: float = 0.5 * (rec + tnr)
    f1: float = 2 * prec * rec / (prec + rec) if (prec + rec) > 0 else 0.0
    
    return pd.DataFrame([{
        "Accuracy": acc,
        "Precision": prec,
        "Recall / TPR": rec,
        "F1-Score": f1,
        "FPR": fpr,
        "NPV (Negative Predictive Value)": npv,
        "Balanced Accuracy": bal_acc,
    }])

## MCUSUM Implementation

In [None]:
def estimate_incontrol_parameters(
    X_incontrol: NDArray[np.float64],
) -> tuple[NDArray[np.float64], NDArray[np.float64]]:
    """Estimate in-control mean and covariance."""
    mu_0 = np.mean(X_incontrol, axis=0)
    sigma = np.cov(X_incontrol, rowvar=False, bias=False)
    return mu_0, sigma

def compute_mcusum_scores(
    X_test: NDArray[np.float64],
    mu_0: NDArray[np.float64],
    sigma: NDArray[np.float64],
    k: float,
) -> NDArray[np.float64]:
    """Compute MCUSUM statistics."""
    X_test = np.asarray(X_test)
    mu_0 = np.asarray(mu_0)
    sigma = np.asarray(sigma)

    n_samples, n_features = X_test.shape

    eigvals, eigvecs = np.linalg.eigh(sigma)
    eigvals_inv_sqrt = np.diag(1.0 / np.sqrt(eigvals))
    sigma_inv_sqrt = eigvecs @ eigvals_inv_sqrt @ eigvecs.T

    Z = (X_test - mu_0) @ sigma_inv_sqrt.T

    S_t = np.zeros(n_features)
    T = np.zeros(n_samples)

    for t in range(n_samples):
        V_t = S_t + Z[t]
        norm_V_t = np.linalg.norm(V_t)

        if norm_V_t <= k:
            S_t = np.zeros(n_features)
        else:
            shrinkage = 1.0 - k / norm_V_t
            S_t = V_t * shrinkage

        T[t] = np.linalg.norm(S_t)

    return T

def estimate_h(
    x_incontrol_np: NDArray[np.float64],
    k: float = 0.5,
    percentile_threshold: int = 98,
) -> float:
    """Estimate control limit h."""
    mu_0, sigma = estimate_incontrol_parameters(x_incontrol_np)

    n_simulations = 500
    max_T_values = []

    for i in range(n_simulations):
        indices = np.random.choice(x_incontrol_np.shape[0], size=300, replace=True)
        sample = x_incontrol_np[indices]
        T = compute_mcusum_scores(sample, mu_0, sigma, k=k)
        max_T_values.append(np.max(T))

    h = np.percentile(max_T_values, percentile_threshold)
    print(f"Estimated control limit h: {h}")
    return h

## PCA-based Anomaly Detection

In [None]:
def train_pca_model(
        X_train: NDArray[np.float64],
        n_components=0.9,
        contamination: float = 0.05) -> Tuple[StandardScaler, PCA, float]:
    """Train PCA model and compute anomaly threshold."""
    scaler = StandardScaler()
    X_scaled = scaler.fit_transform(X_train)

    pca = PCA(n_components=n_components)
    X_pca = pca.fit_transform(X_scaled)
    X_reconstructed = pca.inverse_transform(X_pca)

    errors = np.sum((X_scaled - X_reconstructed)**2, axis=1)
    threshold_index = int((1 - contamination) * len(errors))
    threshold = np.sort(errors)[threshold_index]

    print(f"PCA components retained: {pca.n_components_}")
    print(f"Explained variance ratio: {pca.explained_variance_ratio_.sum():.3f}")
    print(f"Reconstruction error threshold: {threshold:.3f}")

    return scaler, pca, threshold

def detect_anomalies_pca(
    X_test: NDArray[np.float64],
    scaler: StandardScaler,
    pca: PCA,
    threshold: float,
) -> Tuple[NDArray[np.float64], NDArray[np.int64]]:
    """Apply PCA-based anomaly detection."""
    X_scaled = scaler.transform(X_test)
    X_pca = pca.transform(X_scaled)
    X_reconstructed = pca.inverse_transform(X_pca)

    reconstruction_errors = np.sum((X_scaled - X_reconstructed)**2, axis=1)
    anomaly_flags = (reconstruction_errors > threshold).astype(int)

    return reconstruction_errors, anomaly_flags

## Autoencoder Implementation

In [None]:
from tensorflow import keras
from tensorflow.keras import layers

def build_autoencoder(input_dim: int, latent_dim: int = None) -> keras.Model:
    """Build autoencoder model."""
    if latent_dim is None:
        latent_dim = max(4, min(64, input_dim // 20))

    # Encoder
    input_layer = keras.Input(shape=(input_dim,))
    x = layers.Dense(max(32, input_dim // 2), activation='relu')(input_layer)
    x = layers.Dense(max(16, input_dim // 4), activation='relu')(x)
    encoded = layers.Dense(latent_dim, activation='relu')(x)

    # Decoder
    x = layers.Dense(max(16, input_dim // 4), activation='relu')(encoded)
    x = layers.Dense(max(32, input_dim // 2), activation='relu')(x)
    output_layer = layers.Dense(input_dim, activation='linear')(x)

    model = keras.Model(inputs=input_layer, outputs=output_layer)
    model.compile(optimizer=keras.optimizers.Adam(1e-3), loss='mse')

    return model

def train_autoencoder(X_train: np.ndarray, model: keras.Model, n_epochs: int = 100) -> keras.Model:
    """Train autoencoder."""
    if X_train.dtype != np.float32:
        X_train = X_train.astype(np.float32)

    early_stop = keras.callbacks.EarlyStopping(
        monitor="loss",
        patience=10,
        min_delta=1e-6,
        restore_best_weights=True,
        verbose=1
    )

    model.fit(
        X_train, X_train,
        epochs=n_epochs,
        batch_size=64,
        shuffle=True,
        callbacks=[early_stop],
        verbose=1
    )

    return model

def compute_reconstruction_error(model: keras.Model, X: np.ndarray) -> np.ndarray:
    """Compute reconstruction error."""
    if X.dtype != np.float32:
        X = X.astype(np.float32)

    X_reconstructed = model.predict(X, verbose=0)
    squared_error = np.sum((X - X_reconstructed)**2, axis=1)

    return squared_error

def detect_anomalies_from_error(errors: np.ndarray, contamination: float = 0.05) -> np.ndarray:
    """Flag anomalies based on reconstruction error."""
    threshold_idx = int((1 - contamination) * len(errors))
    threshold = np.sort(errors)[threshold_idx]
    return (errors > threshold).astype(int)

## Model Training and Evaluation

In [None]:
# Initialize results storage
anomaly_results_per_model: dict[str, pd.DataFrame] = {}
trained_models_anomaly: dict[str, object] = {}

### 1. MCUSUM

In [None]:
print("Training MCUSUM...")

# Estimate parameters
mu_0, sigma = estimate_incontrol_parameters(X_INCONTROL_TRAIN_REDUCED)
delta = np.ones(X_INCONTROL_TRAIN_REDUCED.shape[1]) * 0.1
k = 0.5  # Reference value
h = estimate_h(X_INCONTROL_TRAIN_REDUCED, k, 99)

# Compute statistics
mcusum_statistics = compute_mcusum_scores(X_TEST_REDUCED, mu_0, sigma, k)
mcusum_flags = mcusum_statistics > h

# Evaluate
mcusum_metrics = compute_detection_metrics(mcusum_flags.astype(int), Y_TEST_ANOMALY_REDUCED_DF)
anomaly_results_per_model["MCUSUM"] = mcusum_metrics
trained_models_anomaly["MCUSUM"] = {'mu_0': mu_0, 'sigma': sigma, 'k': k, 'h': h}

print(f"MCUSUM - Control limit: {h:.3f}")
print(f"MCUSUM - Out-of-control points: {np.sum(mcusum_flags)} / {len(mcusum_flags)}")
print(f"MCUSUM - Accuracy: {mcusum_metrics['Accuracy'].values[0]:.3f}")

### 2. PCA-based Detection

In [None]:
print("Training PCA...")

# Train PCA model using supervised training data
scaler_pca, pca_model, threshold_pca = train_pca_model(
    X_TRAIN_SUPERVISED, n_components=0.9, contamination=0.05
)

# Detect anomalies
errors_pca, y_pred_pca = detect_anomalies_pca(
    X_TEST_REDUCED, scaler_pca, pca_model, threshold_pca
)

# Evaluate
pca_metrics = compute_detection_metrics(y_pred_pca, Y_TEST_ANOMALY_REDUCED_DF)
anomaly_results_per_model["PCA"] = pca_metrics
trained_models_anomaly["PCA"] = {
    'scaler': scaler_pca, 'pca': pca_model, 'threshold': threshold_pca
}

print(f"PCA - Accuracy: {pca_metrics['Accuracy'].values[0]:.3f}")

### 3. Autoencoder

In [None]:
print("Training Autoencoder...")

# Build and train autoencoder using supervised training data
input_dim = X_TRAIN_SUPERVISED.shape[1]
autoencoder_model = build_autoencoder(input_dim)
autoencoder_model = train_autoencoder(X_TRAIN_SUPERVISED, autoencoder_model)

# Detect anomalies
errors_ae = compute_reconstruction_error(autoencoder_model, X_TEST_REDUCED)
y_pred_ae = detect_anomalies_from_error(errors_ae, contamination=0.05)

# Evaluate
ae_metrics = compute_detection_metrics(y_pred_ae, Y_TEST_ANOMALY_REDUCED_DF)
anomaly_results_per_model["Autoencoder"] = ae_metrics
trained_models_anomaly["Autoencoder"] = autoencoder_model

print(f"Autoencoder - Accuracy: {ae_metrics['Accuracy'].values[0]:.3f}")

### 4. Random Forest (Binary)

In [None]:
print("Training Random Forest for anomaly detection...")

# Train model using supervised training data
rf_anomaly = RandomForestClassifier(n_estimators=100, random_state=42)
rf_anomaly.fit(X_TRAIN_SUPERVISED, Y_TRAIN_SUPERVISED)
y_pred_rf_anomaly = rf_anomaly.predict(X_TEST_REDUCED)

# Evaluate
rf_anomaly_metrics = compute_detection_metrics(y_pred_rf_anomaly, Y_TEST_ANOMALY_REDUCED_DF)
anomaly_results_per_model["Random Forest"] = rf_anomaly_metrics
trained_models_anomaly["Random Forest"] = rf_anomaly

print(f"Random Forest - Accuracy: {rf_anomaly_metrics['Accuracy'].values[0]:.3f}")

### 5. XGBoost (Binary)

In [None]:
print("Training XGBoost for anomaly detection...")

# Train model using supervised training data
xg_anomaly = xgb.XGBClassifier(random_state=42)
xg_anomaly.fit(X_TRAIN_SUPERVISED, Y_TRAIN_SUPERVISED)
y_pred_xg_anomaly = xg_anomaly.predict(X_TEST_REDUCED)

# Evaluate
xg_anomaly_metrics = compute_detection_metrics(y_pred_xg_anomaly, Y_TEST_ANOMALY_REDUCED_DF)
anomaly_results_per_model["XGBoost"] = xg_anomaly_metrics
trained_models_anomaly["XGBoost"] = xg_anomaly

print(f"XGBoost - Accuracy: {xg_anomaly_metrics['Accuracy'].values[0]:.3f}")

### 6. LightGBM

In [None]:
print("Training LightGBM...")

# Train model using supervised training data
train_set = lgb.Dataset(X_TRAIN_SUPERVISED, label=Y_TRAIN_SUPERVISED)
params = {
    "objective": "binary",
    "metric": "binary_logloss",
    "verbosity": -1,
    "boosting_type": "gbdt",
    "learning_rate": 0.1,
    "num_leaves": 31,
    "seed": 42,
}
model_lgb = lgb.train(params, train_set, num_boost_round=100)

# Predict
y_proba_lgb = model_lgb.predict(X_TEST_REDUCED)
y_pred_lgb = (y_proba_lgb >= 0.5).astype(int)

# Evaluate
lgb_metrics = compute_detection_metrics(y_pred_lgb, Y_TEST_ANOMALY_REDUCED_DF)
anomaly_results_per_model["LightGBM"] = lgb_metrics
trained_models_anomaly["LightGBM"] = model_lgb

print(f"LightGBM - Accuracy: {lgb_metrics['Accuracy'].values[0]:.3f}")

### 7. Neural Network (Binary)

In [None]:
print("Training Neural Network for anomaly detection...")

# Import additional required modules
from tensorflow.keras.layers import Input, Dense

# Define model
inputs = Input(shape=(X_TRAIN_SUPERVISED.shape[1],))
x = Dense(100, activation="selu")(inputs)
x = Dense(100, activation="selu")(x)
x = Dense(100, activation="selu")(x)
x = Dense(100, activation="selu")(x)
outputs = Dense(Y_ENC_ANOMALY_TEST_REDUCED.shape[1], activation="softmax")(x)

model_anomaly_nn = Model(inputs=inputs, outputs=outputs)
model_anomaly_nn.compile(loss="categorical_crossentropy", optimizer="adam", metrics=["accuracy"])

# Train
early_stop = EarlyStopping(monitor="val_loss", patience=5, restore_best_weights=True)
history_anomaly = model_anomaly_nn.fit(
    X_TRAIN_SUPERVISED,
    Y_ENC_ANOMALY_TRAIN_REDUCED,
    epochs=100,
    batch_size=256,
    validation_data=(X_TEST_REDUCED, Y_ENC_ANOMALY_TEST_REDUCED),
    callbacks=[early_stop],
    verbose=1
)

# Predict
y_pred_proba_nn = model_anomaly_nn.predict(X_TEST_REDUCED, verbose=0)
y_pred_anomaly_nn = encoder_anomaly.inverse_transform(y_pred_proba_nn).flatten()

# Evaluate
nn_anomaly_metrics = compute_detection_metrics(y_pred_anomaly_nn, Y_TEST_ANOMALY_REDUCED_DF)
anomaly_results_per_model["Neural Network"] = nn_anomaly_metrics
trained_models_anomaly["Neural Network"] = model_anomaly_nn

print(f"Neural Network - Accuracy: {nn_anomaly_metrics['Accuracy'].values[0]:.3f}")

## Results Analysis and Visualization

In [None]:
# Combine results
def convert_result_dict_to_df(results_dict):
    """Convert results dictionary to DataFrame."""
    results_df = pd.concat(
        [df.assign(Model=model) for model, df in results_dict.items()],
        ignore_index=True
    )
    cols = ['Model'] + [col for col in results_df.columns if col != 'Model']
    results_df = results_df[cols]
    return results_df

concatenated_anomaly_results_df = convert_result_dict_to_df(anomaly_results_per_model)
save_dataframe(concatenated_anomaly_results_df, "anomaly_detection_metrics", "anomaly")

print("\n=== Anomaly Detection Results ===")
print(tabulate(concatenated_anomaly_results_df, headers="keys", tablefmt="grid", floatfmt=".3f"))

In [None]:
def plot_anomaly_detection_comparison(metrics_dict) -> None:
    """Plot anomaly detection model comparison."""
    df_combined = pd.concat(
        [df.assign(Model=model) for model, df in metrics_dict.items()],
        ignore_index=True,
    )

    df_melted = df_combined.melt(id_vars="Model", var_name="Metric", value_name="Value")

    plt.figure(figsize=(14, 8))
    
    model_list = df_melted["Model"].unique().tolist()
    metric_list = df_melted["Metric"].unique().tolist()
    n_models = len(model_list)
    bar_width = 0.12
    group_spacing = 0.8

    base_positions = [
        i * (n_models * bar_width + group_spacing)
        for i in range(len(metric_list))
    ]

    for model_idx, model in enumerate(model_list):
        subset = df_melted[df_melted["Model"] == model]
        bar_positions = [pos + bar_width * model_idx for pos in base_positions]
        plt.bar(bar_positions, subset["Value"], width=bar_width, label=model)

    tick_positions = [
        pos + (bar_width * n_models / 2) - (bar_width / 2)
        for pos in base_positions
    ]
    plt.xticks(tick_positions, metric_list, rotation=45, ha="right")

    plt.ylabel("Score")
    plt.title("Anomaly Detection Model Comparison")
    plt.legend(bbox_to_anchor=(1.05, 1), loc="upper left")
    plt.grid(axis="y", linestyle="--", alpha=0.7)
    plt.ylim(0, 1.05)
    plt.tight_layout()
    save_plot("anomaly_detection_comparison")
    plt.show()

plot_anomaly_detection_comparison(anomaly_results_per_model)

In [None]:
# Plot confusion matrices for top performing models
def plot_confusion_matrix_binary(y_true, y_pred, title: str):
    """Plot confusion matrix for binary classification."""
    cm = confusion_matrix(y_true, y_pred, normalize="true")
    
    plt.figure(figsize=(6, 5))
    sns.heatmap(cm, annot=True, cmap="Blues", fmt=".2f", 
                xticklabels=["Normal", "Anomaly"], 
                yticklabels=["Normal", "Anomaly"])
    plt.title(title)
    plt.xlabel("Predicted")
    plt.ylabel("Actual")
    safe_title = title.replace(" ", "_").lower()
    save_plot(f"confusion_matrix_{safe_title}")
    plt.show()

# Plot confusion matrices for best performing models
plot_confusion_matrix_binary(Y_TEST_ANOMALY_REDUCED_DF, y_pred_rf_anomaly, "Random Forest")
plot_confusion_matrix_binary(Y_TEST_ANOMALY_REDUCED_DF, y_pred_xg_anomaly, "XGBoost")
plot_confusion_matrix_binary(Y_TEST_ANOMALY_REDUCED_DF, y_pred_ae, "Autoencoder")

## Export Results Summary

In [None]:
# Save models
for model_name, model_obj in trained_models_anomaly.items():
    safe_name = model_name.lower().replace(" ", "_")
    save_pickle(model_obj, f"anomaly_model_{safe_name}")

# Create comprehensive results summary
best_model_idx = concatenated_anomaly_results_df['Accuracy'].idxmax()
best_model = concatenated_anomaly_results_df.loc[best_model_idx, 'Model']
best_accuracy = concatenated_anomaly_results_df['Accuracy'].max()
best_f1 = concatenated_anomaly_results_df['F1-Score'].max()

results_summary = {
    'best_model': best_model,
    'best_accuracy': best_accuracy,
    'best_f1': best_f1,
    'models_trained': list(anomaly_results_per_model.keys()),
    'total_test_samples': len(Y_TEST_ANOMALY_REDUCED_DF),
    'normal_samples': int((Y_TEST_ANOMALY_REDUCED_DF == 0).sum()),
    'anomaly_samples': int((Y_TEST_ANOMALY_REDUCED_DF == 1).sum())
}

# Save results summary
import json
summary_path = os.path.join(OUTPUT_PATH, VERSION, f"anomaly_detection_summary_v{VERSION}_.json")
with open(summary_path, 'w') as f:
    json.dump(results_summary, f, indent=2)

print("\n=== Anomaly Detection Analysis Complete ===")
print(f"Best performing model: {best_model}")
print(f"Best accuracy: {best_accuracy:.3f}")
print(f"Best F1-score: {best_f1:.3f}")
print(f"Normal samples: {results_summary['normal_samples']}")
print(f"Anomaly samples: {results_summary['anomaly_samples']}")
print(f"\nResults saved to: {OUTPUT_PATH}/{VERSION}/")
print("Files generated:")
print("- anomaly_detection_metrics_anomaly_*.csv")
print("- anomaly_detection_summary_*.json")
print("- anomaly_model_*.pkl (trained models)")
print("- Various plots in anomaly/ subfolder")