# Outlier Detection on Benchmarks (Statistical vs ML)

This notebook runs **outlier detection** on several `pymoo` benchmark problems using two families of methods:

- **StatisticalOutlierDetection**: Z-Score, IQR, Leverage, Cook's Distance, Mahalanobis Distance
- **MLOutlierDetection**: Elliptic Envelope, Isolation Forest, Local Outlier Factor, One-Class SVM, SGD One-Class SVM

For each benchmark we:
1. Generate a **Latin Hypercube Sampling (LHS)** Design of Experiments (DoE)
2. Evaluate the problem to get the target `y`
3. **Inject output outliers** into `y` (5%)
4. Run statistical methods (Z-Score and IQR on `y`, Mahalanobis on joint `[X, y]`, Leverage on `X`, Cook's Distance on the regression `y ~ X`) and ML methods (on **augmented features** `[X, y]` to capture output anomalies)
5. Report **Precision, Recall, F1-score, ROC-AUC** and draw the **ROC curve** with `RocCurveDisplay`

> Note: Since we inject outliers in **Y**, ML methods are run on the augmented feature set `[X, y_noisy]` so they can learn the joint pattern and flag inconsistencies in the output.


In [None]:
import numpy as np
import matplotlib.pyplot as plt
from scipy import stats
from sklearn.covariance import EmpiricalCovariance, EllipticEnvelope
from sklearn.ensemble import IsolationForest
from sklearn.neighbors import LocalOutlierFactor
from sklearn.svm import OneClassSVM
from sklearn.linear_model import SGDOneClassSVM, LinearRegression
from sklearn.metrics import precision_score, recall_score, f1_score, roc_auc_score, RocCurveDisplay
from scipy.stats import qmc
from pymoo.problems import get_problem
from sklearn.preprocessing import StandardScaler

np.random.seed(42)

In [None]:
class Benchmark:
    """Wrap a ``pymoo`` test problem with LHS sampling and output-outlier injection.

    Attributes:
        name: Lower-cased problem name handed to ``get_problem``.
        dim: Number of decision variables used for sampling.
        problem: Object returned by ``get_problem``.
        bounds: ``[lower, upper]`` pairs stacked from ``problem.xl`` / ``problem.xu``,
            one row per variable.
    """

    def __init__(self, name: str, dim: int = None):
        """Build the underlying problem.

        Args:
            name: Problem name (case-insensitive).
            dim: Number of variables. Required for Rosenbrock, defaults to 2 for
                Sphere, ignored for Kursawe (the problem's own ``n_var`` is used),
                optional for any other problem.

        Raises:
            ValueError: If ``name`` is Rosenbrock and ``dim`` is not given.
        """
        self.name = name.lower()
        self.dim = dim

        if self.name == "rosenbrock":
            if self.dim is None:
                raise ValueError("Set dim for Rosenbrock")
            self.problem = get_problem("rosenbrock", n_var=self.dim)
        elif self.name == "kursawe":
            self.problem = get_problem("kursawe")
            self.dim = self.problem.n_var
        elif self.name == "sphere":
            if self.dim is None:
                self.dim = 2
            self.problem = get_problem("sphere", n_var=self.dim)
        elif self.dim is None:
            # Do not forward n_var=None: let the problem fall back to its own
            # default dimensionality and read it back afterwards.
            self.problem = get_problem(self.name)
            self.dim = self.problem.n_var
        else:
            self.problem = get_problem(self.name, n_var=self.dim)

        self.bounds = np.vstack([self.problem.xl, self.problem.xu]).T

    def sample_lhs(self, n_points: int, seed: int = 42):
        """Draw ``n_points`` Latin Hypercube samples scaled to ``self.bounds``."""
        sampler = qmc.LatinHypercube(d=self.dim, seed=seed)
        unit_sample = sampler.random(n_points)
        return qmc.scale(unit_sample, self.bounds[:, 0], self.bounds[:, 1])

    def evaluate(self, X: np.ndarray):
        """Evaluate the wrapped problem at the rows of ``X``."""
        return self.problem.evaluate(X)

    def inject_outliers(self, X, Y, frac: float = 0.05):
        """Inject Gaussian outliers into the output.

        A random subset of samples gets zero-mean noise whose standard deviation
        is ten times the standard deviation of the perturbed output. For 2-D
        ``Y`` only the first column is perturbed. Randomness comes from the
        global ``np.random`` state.

        Args:
            X: Inputs; only ``X.shape[0]`` (the sample count) is used.
            Y: Outputs, 1-D or 2-D. Not modified.
            frac: Fraction of samples to perturb (at least one sample is perturbed).

        Returns:
            ``(Y_out, idx)``: a perturbed float copy of ``Y`` and the indices of
            the perturbed samples.
        """
        n_samples = X.shape[0]
        # At least one outlier, never more than there are samples.
        n_outliers = min(n_samples, max(1, int(frac * n_samples)))
        idx = np.random.choice(n_samples, n_outliers, replace=False)

        # Float copy so the noise is not truncated for integer-valued outputs.
        Y_out = np.array(Y, dtype=float)
        # View on the values that get perturbed (whole vector or first column).
        target = Y_out if Y_out.ndim == 1 else Y_out[:, 0]
        # Scalar scale of the perturbed column only; a per-column vector would
        # not broadcast against ``size=n_outliers``.
        scale = 10.0 * np.std(target)
        target[idx] += np.random.normal(0, scale, size=n_outliers)
        return Y_out, idx

In [None]:
class StatisticalOutlierDetection:
    """Classical statistical outlier detectors.

    Every public method returns ``(mask, scores)``: a boolean array flagging the
    detected outliers and a per-sample score where larger means more anomalous.
    1-D inputs are treated as a single feature column.
    """

    @staticmethod
    def _as_2d(data):
        """Return ``data`` as a float array with one row per sample."""
        arr = np.asarray(data, dtype=float)
        return arr.reshape(-1, 1) if arr.ndim == 1 else arr

    @staticmethod
    def _design_matrix(X):
        """Prepend an intercept column of ones to ``X``."""
        return np.column_stack([np.ones(X.shape[0]), X])

    @staticmethod
    def _leverages(X_design):
        """Return the hat-matrix diagonal without building the n x n matrix."""
        gram_pinv = np.linalg.pinv(X_design.T @ X_design)
        # h_ii = x_i^T (X^T X)^+ x_i for every row x_i of the design matrix.
        return np.einsum("ij,jk,ik->i", X_design, gram_pinv, X_design)

    def z_score(self, data, threshold: float = 3.0):
        """Flag samples whose largest absolute column-wise z-score exceeds ``threshold``."""
        data = self._as_2d(data)
        z_scores = np.abs(stats.zscore(data, axis=0))
        # A constant column has zero spread, so its z-scores come out as NaN;
        # such a column carries no outlier evidence, so score it as 0.
        constant = np.all(data == data[:1], axis=0)
        z_scores[:, constant] = 0.0
        scores = np.max(z_scores, axis=1)
        return scores > threshold, scores

    def iqr(self, data, factor: float = 1.5):
        """Flag samples lying outside ``[Q1 - factor*IQR, Q3 + factor*IQR]`` in any column.

        The score is the largest distance by which a sample exceeds the fences
        (0 for samples inside them).
        """
        data = self._as_2d(data)
        q1, q3 = np.percentile(data, [25, 75], axis=0)
        spread = q3 - q1
        lower = q1 - factor * spread
        upper = q3 + factor * spread
        mask = np.any((data < lower) | (data > upper), axis=1)
        overshoot = np.maximum(0, data - upper) + np.maximum(0, lower - data)
        return mask, np.max(overshoot, axis=1)

    def leverage(self, X, threshold: float = None):
        """Flag high-leverage samples of a linear model with intercept on ``X``.

        The default threshold is ``2 * p / n`` with ``p`` design columns
        (intercept included) and ``n`` samples.
        """
        X_design = self._design_matrix(self._as_2d(X))
        leverages = self._leverages(X_design)
        if threshold is None:
            n, p = X_design.shape
            threshold = 2 * p / n
        return leverages > threshold, leverages

    def cooks_distance(self, X, y, threshold: float = None):
        """Flag influential samples of the least-squares fit ``y ~ X``.

        The default threshold is ``4 / n``.
        """
        X = self._as_2d(X)
        y = np.asarray(y, dtype=float).ravel()
        residuals = y - LinearRegression().fit(X, y).predict(X)

        X_design = self._design_matrix(X)
        leverages = self._leverages(X_design)
        n, p = X_design.shape
        # Residual variance estimate; the max() guards against n <= p.
        mse = np.sum(residuals**2) / max(1, (n - p))
        cooks_d = (residuals**2 / (p * mse)) * (leverages / (1 - leverages) ** 2)
        if threshold is None:
            threshold = 4 / n
        return cooks_d > threshold, cooks_d

    def mahalanobis(self, X, threshold: float = None):
        """Flag samples far from the empirical mean in the Mahalanobis sense.

        NOTE(review): per the scikit-learn documentation
        ``EmpiricalCovariance.mahalanobis`` returns *squared* distances, so an
        explicit ``threshold`` must be given on that scale. The default cut-off
        is the 97.5th percentile of the scores.
        """
        X = self._as_2d(X)
        m_dist = EmpiricalCovariance().fit(X).mahalanobis(X)
        if threshold is None:
            # Generic cut-off only; ROC analysis uses the full score anyway.
            threshold = np.percentile(m_dist, 97.5)
        return m_dist > threshold, m_dist

In [None]:
class MLOutlierDetection:
    """scikit-learn based outlier detectors.

    Every detector returns ``(mask, scores)``: a boolean array that is True
    where the model predicted ``-1`` (outlier), and a score obtained by
    negating the model's decision function (or negative outlier factor) so
    that larger means more anomalous.
    """

    def __init__(self):
        # Offered to callers for standardizing features before detection;
        # no method of this class applies it.
        self.scaler = StandardScaler()

    def elliptic_envelope(self, X, contamination: float = 0.1):
        """Fit an ``EllipticEnvelope`` (fixed ``random_state=42``) on ``X``."""
        model = EllipticEnvelope(contamination=contamination, random_state=42)
        pred = model.fit_predict(X)
        return pred == -1, -model.decision_function(X)

    def isolation_forest(self, X, contamination: float = 0.1):
        """Fit an ``IsolationForest`` (fixed ``random_state=42``) on ``X``."""
        model = IsolationForest(contamination=contamination, random_state=42)
        pred = model.fit_predict(X)
        return pred == -1, -model.decision_function(X)

    def lof(self, X, contamination: float = 0.1, n_neighbors: int = 20):
        """Fit a ``LocalOutlierFactor`` on ``X``; the score is the negated ``negative_outlier_factor_``."""
        model = LocalOutlierFactor(contamination=contamination, n_neighbors=n_neighbors)
        pred = model.fit_predict(X)
        return pred == -1, -model.negative_outlier_factor_

    def one_class_svm(self, X, nu: float = 0.1):
        """Fit a ``OneClassSVM`` with ``gamma="scale"`` on ``X``."""
        model = OneClassSVM(gamma="scale", nu=nu)
        pred = model.fit_predict(X)
        return pred == -1, -model.decision_function(X)

    def sgd_one_class_svm(self, X, nu: float = 0.1):
        """Fit an ``SGDOneClassSVM`` (fixed ``random_state=42``) on ``X``.

        Best-effort: if the estimator is unavailable or fails, a warning is
        emitted and all-inlier placeholders (False mask, zero scores) are
        returned so that a benchmark run can continue.
        """
        import warnings

        try:
            model = SGDOneClassSVM(nu=nu, random_state=42)
            pred = model.fit_predict(X)
            return pred == -1, -model.decision_function(X)
        except Exception as exc:
            # Keep the deliberate fallback, but never hide the failure silently:
            # placeholder scores would otherwise be read as real results.
            warnings.warn(
                f"SGDOneClassSVM could not be run ({exc!r}); returning placeholder results.",
                RuntimeWarning,
                stacklevel=2,
            )
            n_samples = X.shape[0]
            return np.zeros(n_samples, dtype=bool), np.zeros(n_samples)

In [None]:
# Detector names, in the order they are evaluated and plotted.
model_names = [
    "Z-Score",
    "IQR",
    "Leverage",
    "Cook's Distance",
    "Mahalanobis",
    "Elliptic Envelope",
    "Isolation Forest",
    "LOF",
    "One-Class SVM",
    "SGD One-Class SVM",
]

# Ground-truth labels, keyed by benchmark name.
y_true = {}

# Anomaly scores, keyed by detector name and then by benchmark name.
y_score = {detector: {} for detector in model_names}

In [None]:
def evaluate_detection(true_idx, pred_mask, scores, method_name):
    """Print precision, recall, F1 and ROC-AUC for one detector.

    Args:
        true_idx: Indices of the samples that really are outliers.
        pred_mask: Boolean array, True where the detector flagged an outlier.
        scores: Per-sample anomaly scores (higher = more anomalous).
        method_name: Label printed at the start of the report line.

    The AUC is reported as ``nan`` when it is undefined, i.e. when the ground
    truth holds a single class or the scores contain non-finite values.
    """
    pred_mask = np.asarray(pred_mask)
    labels = np.zeros(len(pred_mask), dtype=int)
    labels[np.asarray(true_idx, dtype=int)] = 1
    y_pred = pred_mask.astype(int)

    precision = precision_score(labels, y_pred, zero_division=0)
    recall = recall_score(labels, y_pred, zero_division=0)
    f1 = f1_score(labels, y_pred, zero_division=0)

    # Normalize scores to [0,1] for ROC AUC (higher = more anomalous)
    scores = np.asarray(scores)
    lo, hi = np.min(scores), np.max(scores)
    scores_norm = (scores - lo) / (hi - lo) if hi > lo else scores

    # roc_auc_score raises for single-class labels and for NaN/inf scores.
    if np.unique(labels).size < 2 or not np.all(np.isfinite(scores_norm)):
        auc = float("nan")
    else:
        auc = roc_auc_score(labels, scores_norm)

    print(f"{method_name:25s} | Precision={precision:.2f}  Recall={recall:.2f}  F1={f1:.2f}  AUC={auc:.2f}")

In [None]:
def evaluate_detection2(true_idx, pred_mask, scores, method_name):
    """Print detection metrics for one detector and return the ROC inputs.

    Args:
        true_idx: Indices of the samples that really are outliers.
        pred_mask: Boolean array, True where the detector flagged an outlier.
        scores: Per-sample anomaly scores (higher = more anomalous).
        method_name: Label printed at the start of the report line.

    Returns:
        ``(y_true, scores_norm)``: the 0/1 ground-truth vector and the scores
        min-max scaled to [0, 1] (left unscaled when all scores are equal).

    The printed AUC is ``nan`` when it is undefined, i.e. when the ground truth
    holds a single class or the scores contain non-finite values.
    """
    pred_mask = np.asarray(pred_mask)
    labels = np.zeros(len(pred_mask), dtype=int)
    labels[np.asarray(true_idx, dtype=int)] = 1
    y_pred = pred_mask.astype(int)

    precision = precision_score(labels, y_pred, zero_division=0)
    recall = recall_score(labels, y_pred, zero_division=0)
    f1 = f1_score(labels, y_pred, zero_division=0)

    # Normalize scores to [0,1] for ROC AUC (higher = more anomalous)
    scores = np.asarray(scores)
    lo, hi = np.min(scores), np.max(scores)
    scores_norm = (scores - lo) / (hi - lo) if hi > lo else scores

    # roc_auc_score raises for single-class labels and for NaN/inf scores.
    if np.unique(labels).size < 2 or not np.all(np.isfinite(scores_norm)):
        auc = float("nan")
    else:
        auc = roc_auc_score(labels, scores_norm)

    print(f"{method_name:25s} | Precision={precision:.2f}  Recall={recall:.2f}  F1={f1:.2f}  AUC={auc:.2f}")

    return labels, scores_norm

## Rosenbrock (8D)

We inject 5% output outliers. Statistical methods run on `y` and on the joint `[X, y]`; ML methods run on the standardized `[X, y]`.

In [None]:
# Rosenbrock (8D): run every detector once, keep its scores for the ROC
# comparison plot, and print its metrics.
bench = Benchmark("rosenbrock", dim=8)
N = 300
contamination_rate = 0.05

X = bench.sample_lhs(N)
y = np.asarray(bench.evaluate(X)).reshape(-1)  # ensure 1D
y_noisy, idx = bench.inject_outliers(X, y, frac=contamination_rate)
print(f"Injected outliers: {len(idx)} / {N}")

stat = StatisticalOutlierDetection()
ml = MLOutlierDetection()

# The ground truth is identical for every detector, so build it once.
y_true[bench.name] = np.zeros(N, dtype=int)
y_true[bench.name][idx] = 1

y_col = y_noisy.reshape(-1, 1)
X_aug = np.hstack([X, y_col])  # joint [X, y]
X_aug_scaled = ml.scaler.fit_transform(X_aug)  # standardized copy for the ML detectors

# (key in y_score / printed label, zero-argument callable returning (mask, scores))
detectors = [
    # Statistical on y
    ("Z-Score", lambda: stat.z_score(y_col)),
    ("IQR", lambda: stat.iqr(y_col, factor=1.5)),
    # Statistical on joint [X, y], on X, and on the regression y ~ X
    ("Mahalanobis", lambda: stat.mahalanobis(X_aug)),
    ("Leverage", lambda: stat.leverage(X)),
    ("Cook's Distance", lambda: stat.cooks_distance(X, y_noisy)),
    # ML on standardized joint [X, y]
    ("Elliptic Envelope", lambda: ml.elliptic_envelope(X_aug_scaled, contamination=contamination_rate)),
    ("Isolation Forest", lambda: ml.isolation_forest(X_aug_scaled, contamination=contamination_rate)),
    ("LOF", lambda: ml.lof(X_aug_scaled, contamination=contamination_rate)),
    ("One-Class SVM", lambda: ml.one_class_svm(X_aug_scaled, nu=contamination_rate)),
    ("SGD One-Class SVM", lambda: ml.sgd_one_class_svm(X_aug_scaled, nu=contamination_rate)),
]

for detector_name, run_detector in detectors:
    mask, scores = run_detector()
    y_score[detector_name][bench.name] = scores
    evaluate_detection(idx, mask, scores, detector_name)

In [None]:
import math

# One subplot per benchmark stored in y_true, with one ROC curve per detector.
cols = 2
# Injected outliers are labelled 1 and every detector reports "higher score =
# more anomalous", so the positive class for the ROC curve is 1.
pos_label = 1
datasets_names = list(y_true)
rows = max(1, math.ceil(len(datasets_names) / cols))

fig, axs = plt.subplots(nrows=rows, ncols=cols, squeeze=False, figsize=(10, rows * 4))
axes = axs.ravel()

for ax, dataset_name in zip(axes, datasets_names):
    for model_idx, model_name in enumerate(model_names):
        RocCurveDisplay.from_predictions(
            y_true[dataset_name],
            y_score[model_name][dataset_name],
            pos_label=pos_label,
            name=model_name,
            ax=ax,
            # Draw the chance diagonal only once per subplot (with the last curve).
            plot_chance_level=(model_idx == len(model_names) - 1),
            chance_level_kw={"linestyle": ":"},
        )
    ax.set_title(dataset_name)

# Hide grid cells that did not receive a dataset.
for unused_ax in axes[len(datasets_names):]:
    unused_ax.set_visible(False)

_ = plt.tight_layout(pad=2.0)

## Rosenbrock (25D)

In [None]:
# Rosenbrock (25D): print metrics for every detector.
bench = Benchmark("rosenbrock", dim=25)
N = 500
contamination_rate = 0.05

X = bench.sample_lhs(N)
y = np.asarray(bench.evaluate(X)).reshape(-1)
y_noisy, idx = bench.inject_outliers(X, y, frac=contamination_rate)
print(f"Injected outliers: {len(idx)} / {N}")

stat = StatisticalOutlierDetection()
ml = MLOutlierDetection()

y_col = y_noisy.reshape(-1, 1)
X_aug = np.hstack([X, y_col])  # joint [X, y]
# Standardize the joint features for the ML detectors, as the Rosenbrock (8D)
# cell does, so the comparison across benchmarks uses the same preprocessing.
X_aug_scaled = ml.scaler.fit_transform(X_aug)

# (printed label, zero-argument callable returning (mask, scores))
detectors = [
    # Statistical on y
    ("Z-Score (on y)", lambda: stat.z_score(y_col)),
    ("IQR (on y)", lambda: stat.iqr(y_col, factor=1.5)),
    # Statistical on joint [X, y], on X, and on the regression y ~ X
    ("Mahalanobis (on [X,y])", lambda: stat.mahalanobis(X_aug)),
    ("Leverage (on X)", lambda: stat.leverage(X)),
    ("Cook's Distance (y~X)", lambda: stat.cooks_distance(X, y_noisy)),
    # ML on standardized joint [X, y]
    ("Elliptic Envelope ([X,y])", lambda: ml.elliptic_envelope(X_aug_scaled, contamination=contamination_rate)),
    ("Isolation Forest ([X,y])", lambda: ml.isolation_forest(X_aug_scaled, contamination=contamination_rate)),
    ("LOF ([X,y])", lambda: ml.lof(X_aug_scaled, contamination=contamination_rate)),
    ("One-Class SVM ([X,y])", lambda: ml.one_class_svm(X_aug_scaled, nu=contamination_rate)),
    ("SGD One-Class SVM ([X,y])", lambda: ml.sgd_one_class_svm(X_aug_scaled, nu=contamination_rate)),
]

for label, run_detector in detectors:
    mask, scores = run_detector()
    evaluate_detection(idx, mask, scores, label)

## Kursawe (3D, 2 objectives)
We use the **first objective** as `y`.

In [None]:
# Kursawe: the first objective is used as the scalar target y.
bench = Benchmark("kursawe")
N = 300
contamination_rate = 0.05

X = bench.sample_lhs(N)
F = bench.evaluate(X)
y = np.asarray(F)[:, 0]  # first objective
y_noisy, idx = bench.inject_outliers(X, y, frac=contamination_rate)
print(f"Injected outliers: {len(idx)} / {N}")

stat = StatisticalOutlierDetection()
ml = MLOutlierDetection()

y_col = y_noisy.reshape(-1, 1)
X_aug = np.hstack([X, y_col])  # joint [X, y]
# Standardize the joint features for the ML detectors, as the Rosenbrock (8D)
# cell does, so the comparison across benchmarks uses the same preprocessing.
X_aug_scaled = ml.scaler.fit_transform(X_aug)

# (printed label, zero-argument callable returning (mask, scores))
detectors = [
    # Statistical on y
    ("Z-Score (on y)", lambda: stat.z_score(y_col)),
    ("IQR (on y)", lambda: stat.iqr(y_col, factor=1.5)),
    # Statistical on joint [X, y], on X, and on the regression y ~ X
    ("Mahalanobis (on [X,y])", lambda: stat.mahalanobis(X_aug)),
    ("Leverage (on X)", lambda: stat.leverage(X)),
    ("Cook's Distance (y~X)", lambda: stat.cooks_distance(X, y_noisy)),
    # ML on standardized joint [X, y]
    ("Elliptic Envelope ([X,y])", lambda: ml.elliptic_envelope(X_aug_scaled, contamination=contamination_rate)),
    ("Isolation Forest ([X,y])", lambda: ml.isolation_forest(X_aug_scaled, contamination=contamination_rate)),
    ("LOF ([X,y])", lambda: ml.lof(X_aug_scaled, contamination=contamination_rate)),
    ("One-Class SVM ([X,y])", lambda: ml.one_class_svm(X_aug_scaled, nu=contamination_rate)),
    ("SGD One-Class SVM ([X,y])", lambda: ml.sgd_one_class_svm(X_aug_scaled, nu=contamination_rate)),
]

for label, run_detector in detectors:
    mask, scores = run_detector()
    evaluate_detection(idx, mask, scores, label)

## Sphere (8D)

In [None]:
# Sphere (8D): print metrics for every detector.
bench = Benchmark("sphere", dim=8)
N = 300
contamination_rate = 0.05

X = bench.sample_lhs(N)
y = np.asarray(bench.evaluate(X)).reshape(-1)
y_noisy, idx = bench.inject_outliers(X, y, frac=contamination_rate)
print(f"Injected outliers: {len(idx)} / {N}")

stat = StatisticalOutlierDetection()
ml = MLOutlierDetection()

y_col = y_noisy.reshape(-1, 1)
X_aug = np.hstack([X, y_col])  # joint [X, y]
# Standardize the joint features for the ML detectors, as the Rosenbrock (8D)
# cell does, so the comparison across benchmarks uses the same preprocessing.
X_aug_scaled = ml.scaler.fit_transform(X_aug)

# (printed label, zero-argument callable returning (mask, scores))
detectors = [
    # Statistical on y
    ("Z-Score (on y)", lambda: stat.z_score(y_col)),
    ("IQR (on y)", lambda: stat.iqr(y_col, factor=1.5)),
    # Statistical on joint [X, y], on X, and on the regression y ~ X
    ("Mahalanobis (on [X,y])", lambda: stat.mahalanobis(X_aug)),
    ("Leverage (on X)", lambda: stat.leverage(X)),
    ("Cook's Distance (y~X)", lambda: stat.cooks_distance(X, y_noisy)),
    # ML on standardized joint [X, y]
    ("Elliptic Envelope ([X,y])", lambda: ml.elliptic_envelope(X_aug_scaled, contamination=contamination_rate)),
    ("Isolation Forest ([X,y])", lambda: ml.isolation_forest(X_aug_scaled, contamination=contamination_rate)),
    ("LOF ([X,y])", lambda: ml.lof(X_aug_scaled, contamination=contamination_rate)),
    ("One-Class SVM ([X,y])", lambda: ml.one_class_svm(X_aug_scaled, nu=contamination_rate)),
    ("SGD One-Class SVM ([X,y])", lambda: ml.sgd_one_class_svm(X_aug_scaled, nu=contamination_rate)),
]

for label, run_detector in detectors:
    mask, scores = run_detector()
    evaluate_detection(idx, mask, scores, label)