# Anomaly / Outlier Detection Template – Unsupervised & Semi-Supervised

This notebook is a reusable template for **anomaly detection** problems, where the goal is to find *unusual* points:

- Fraudulent transactions  
- Abnormal system logs  
- Strange player performances  
- Sensor failures / spikes  

It focuses on **tabular data** and gives you a decision process for:

- Choosing among **unsupervised methods** (no labels): IsolationForest, LocalOutlierFactor, One-Class SVM  
- Using **semi-supervised setups** (a small number of labeled anomalies)  
- Turning anomaly scores into practical flags

---

## 🔁 High-Level Workflow (Anomaly Detection)

1. Imports & config  
2. Load data & basic EDA (without assuming labels)  
3. Decide **problem framing**: unsupervised vs semi-supervised  
4. Feature selection & scaling  
5. Train one or more anomaly detectors  
6. Compare anomaly scores / visualize  
7. Set thresholds & generate flags  
8. Export anomaly labels for downstream analysis or supervised models

> Many production anomaly setups end up being **two-stage**:  
> 1) unsupervised scoring → 2) supervised model using those scores as features.


In [None]:
# ========== 1. Imports & Config (Anomaly Detection) ==========

from pathlib import Path
from typing import Optional, List

import numpy as np
import pandas as pd

import matplotlib.pyplot as plt
import seaborn as sns

from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import IsolationForest
from sklearn.neighbors import LocalOutlierFactor
from sklearn.svm import OneClassSVM

pd.set_option("display.max_rows", 100)
pd.set_option("display.max_columns", 100)
sns.set(style="whitegrid")
plt.rcParams["figure.figsize"] = (10, 6)
plt.rcParams["figure.dpi"] = 100

# ---- Config ----
DATA_DIR = Path("../input")
DATA_FILE = "data.csv"    # change to your file

ID_COL = "id"             # optional
LABEL_COL = None          # set to e.g. "is_anomaly" if you have labels (0/1)

RANDOM_STATE = 42

# If None, we'll auto-select all numeric features (except ID / label)
ANOMALY_FEATURES: Optional[List[str]] = None


In [None]:
# ========== 2. Load Data & Helpers ==========

def load_data(data_dir: Path = DATA_DIR, data_file: str = DATA_FILE) -> pd.DataFrame:
    path = data_dir / data_file
    if not path.exists():
        raise FileNotFoundError(f"Data file not found: {path}")
    df = pd.read_csv(path)
    print("Data shape:", df.shape)
    display(df.head())
    return df


def get_numeric_features(df: pd.DataFrame, exclude: Optional[List[str]] = None) -> List[str]:
    num_cols = df.select_dtypes(include=[np.number]).columns.tolist()
    if exclude:
        num_cols = [c for c in num_cols if c not in exclude]
    return num_cols


def summarize_dataframe(df: pd.DataFrame, name: str = "df"):
    print(f"===== {name} summary =====")
    print("Shape:", df.shape)
    display(df.head())
    print("\nDtypes:")
    display(df.dtypes)
    print("\nMissing (%):")
    display((df.isna().mean() * 100).sort_values(ascending=False))


df = load_data()
summarize_dataframe(df, "df")


### 3️⃣ Decide: Unsupervised vs Semi-Supervised

**Question – Do you have labels for anomalies?**

- **No labels** → pure **unsupervised** anomaly detection  
  You will rely on scores & domain inspection.

- **Some labels** (e.g., known fraud cases) → **semi-supervised**  
  You can:
  - Use anomaly detectors to generate scores  
  - Train a supervised classifier on those scores + original features

This template assumes **unsupervised first**, then optionally plugs in labels if present.
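
The semi-supervised path described above (detector scores feeding a supervised classifier) can be sketched as follows. This is a minimal illustration on synthetic data, not the template's own pipeline: the arrays `X_demo` / `y_demo`, the choice of `LogisticRegression`, and all hyperparameters are assumptions made for the example.

```python
import numpy as np
from sklearn.ensemble import IsolationForest
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split

rng = np.random.default_rng(0)

# Synthetic stand-in data (assumption): 1,000 normal points and 30 shifted anomalies.
X_normal = rng.normal(0.0, 1.0, size=(1000, 4))
X_anom = rng.normal(4.0, 1.0, size=(30, 4))
X_demo = np.vstack([X_normal, X_anom])
y_demo = np.r_[np.zeros(1000, dtype=int), np.ones(30, dtype=int)]

X_tr, X_te, y_tr, y_te = train_test_split(
    X_demo, y_demo, test_size=0.3, stratify=y_demo, random_state=0
)

# Stage 1: unsupervised scoring, fitted on the training split only.
stage1 = IsolationForest(n_estimators=100, random_state=0).fit(X_tr)
s_tr = -stage1.decision_function(X_tr)   # higher = more abnormal
s_te = -stage1.decision_function(X_te)

# Stage 2: supervised classifier on original features plus the anomaly score.
X_tr_aug = np.column_stack([X_tr, s_tr])
X_te_aug = np.column_stack([X_te, s_te])
clf = LogisticRegression(class_weight="balanced", max_iter=1000).fit(X_tr_aug, y_tr)
proba_te = clf.predict_proba(X_te_aug)[:, 1]

print("Augmented train shape:", X_tr_aug.shape)
```

Fitting the detector on the training split only keeps test rows from influencing the score feature, which would otherwise make the evaluation look better than it is.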


In [None]:
# ========== 4. Feature Selection & Scaling ==========

exclude_cols = []
if ID_COL is not None and ID_COL in df.columns:
    exclude_cols.append(ID_COL)
if LABEL_COL is not None and LABEL_COL in df.columns:
    exclude_cols.append(LABEL_COL)

if ANOMALY_FEATURES is None:
    feature_cols = get_numeric_features(df, exclude=exclude_cols)
    print("Auto-selected numeric features:", feature_cols)
else:
    feature_cols = [c for c in ANOMALY_FEATURES if c in df.columns]
    print("Using configured features:", feature_cols)

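X_raw = df[feature_cols].copy()

# LOF and One-Class SVM cannot handle NaNs, so fill gaps with column medians
# (a simple default; swap in a domain-appropriate imputation if you have one).
X_raw = X_raw.fillna(X_raw.median())

# Basic scaling: LOF and One-Class SVM are distance-based, so features need comparable scales
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X_raw)
print("Scaled feature matrix shape:", X_scaled.shape)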


### 5️⃣ Choose Anomaly Detection Method

Common options:

| Method             | Type            | Pros                                  | Cons                                                     |
|--------------------|-----------------|---------------------------------------|----------------------------------------------------------|
| IsolationForest    | Unsupervised    | Fast, handles high-dim tabular well   | Results vary with the random seed; some tuning needed    |
| LocalOutlierFactor | Unsupervised    | Local density, good for manifolds     | Slower; no `.predict` on new data unless `novelty=True`  |
| One-Class SVM      | Unsupervised    | Kernel-based, flexible                | Very slow on large datasets                              |

**Decision Guide**

- Start with **IsolationForest** for large tabular problems.  
- Try **LOF** if you suspect local clusters or manifold structure.  
- Use **One-Class SVM** only on small/medium datasets (and when you can afford tuning).
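
One practical consequence of the LOF row in the table: in its default mode LOF only scores the data it was fitted on. If you need to score rows that arrive later, scikit-learn's `novelty=True` mode enables `predict` and `score_samples` on unseen data. The sketch below uses made-up reference data (`X_ref`) and two hand-picked query points (`X_new`); both are assumptions for illustration.

```python
import numpy as np
from sklearn.neighbors import LocalOutlierFactor

rng = np.random.default_rng(0)

# Reference data assumed to be mostly normal (synthetic for this example).
X_ref = rng.normal(0.0, 1.0, size=(500, 2))

# Two new points: one typical, one far from the reference cloud.
X_new = np.array([[0.1, -0.2], [8.0, 8.0]])

lof_novelty = LocalOutlierFactor(n_neighbors=20, novelty=True)
lof_novelty.fit(X_ref)

new_labels = lof_novelty.predict(X_new)          # +1 normal, -1 outlier
new_scores = -lof_novelty.score_samples(X_new)   # higher = more abnormal

print(new_labels, new_scores)
```

In novelty mode, `predict` and `score_samples` are meant for new rows only; to score the fitting data itself, use the default `novelty=False` with `fit_predict`.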


In [None]:
# ========== 6. Train Multiple Anomaly Detectors ==========

# IsolationForest
iso = IsolationForest(
    n_estimators=300,
    contamination="auto",   # or specify expected outlier fraction, e.g., 0.01
    random_state=RANDOM_STATE,
)
iso_labels = iso.fit_predict(X_scaled)     # +1 normal, -1 outlier
iso_decision = iso.decision_function(X_scaled)  # lower = more abnormal
iso_scores = -iso_decision                 # continuous score, higher = more abnormal

# LocalOutlierFactor (fit_predict only)
lof = LocalOutlierFactor(
    n_neighbors=20,
    contamination="auto",
    novelty=False,
)
lof_labels = lof.fit_predict(X_scaled)     # +1 normal, -1 outlier
lof_scores = -lof.negative_outlier_factor_  # higher = more abnormal

# One-Class SVM (optional, can be expensive)
RUN_OCSVM = False
ocsvm_scores = None
if RUN_OCSVM:
    oc = OneClassSVM(kernel="rbf", gamma="scale", nu=0.05)
    oc.fit(X_scaled)
    oc_labels = oc.predict(X_scaled)       # +1 normal, -1 outlier
    oc_decision = -oc.decision_function(X_scaled)  # higher = more abnormal
    ocsvm_scores = oc_decision

print("Finished training anomaly detectors.")


### 7️⃣ Turn Scores into Flags – Thresholding

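The `fit_predict` labels above come from each detector's default cutoff (`contamination="auto"`), which rarely matches how many cases you can actually review. The continuous **scores** give you more control. You need to: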

1. Decide on a **budget** for anomalies (e.g., 1% of points).  
2. Choose a method (IsolationForest, LOF, etc.).  
3. Pick a threshold such that top X% (largest scores) are flagged.

If you have labels (`LABEL_COL`), you can grid-search thresholds and maximize F1 or recall at fixed precision.
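
When labels are available, the threshold search mentioned above can be sketched with plain NumPy. The helper name `best_f1_threshold` and the toy arrays are hypothetical; the function simply tries every distinct score as a cutoff and keeps the one with the highest F1.

```python
import numpy as np


def best_f1_threshold(scores, y_true):
    """Try each distinct score as a threshold; return (threshold, f1) with the highest F1."""
    scores = np.asarray(scores, dtype=float)
    y_true = np.asarray(y_true).astype(int)
    best_thresh, best_f1 = None, -1.0
    for t in np.unique(scores):
        pred = scores >= t
        tp = np.sum(pred & (y_true == 1))
        fp = np.sum(pred & (y_true == 0))
        fn = np.sum(~pred & (y_true == 1))
        f1 = 2 * tp / (2 * tp + fp + fn) if tp > 0 else 0.0
        if f1 > best_f1:
            best_thresh, best_f1 = float(t), float(f1)
    return best_thresh, best_f1


# Toy example: higher score = more abnormal, label 1 = anomaly.
demo_scores = np.array([0.1, 0.2, 0.3, 0.4, 0.8, 0.9])
demo_labels = np.array([0, 0, 0, 1, 0, 1])
thresh, f1 = best_f1_threshold(demo_scores, demo_labels)
print(thresh, f1)
```

The loop is quadratic in the number of distinct scores, so on large datasets it is cheaper to scan a fixed grid of quantiles instead. A threshold tuned and evaluated on the same rows will look optimistic; a held-out split gives a fairer estimate.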


In [None]:
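def flag_top_fraction(scores, fraction=0.01):
    """Flag the top `fraction` of points (highest scores) as anomalies (1), rest as 0.

    Ties at the threshold are all flagged, so slightly more than `fraction` may be returned.
    """
    scores = np.asarray(scores)
    n = len(scores)
    k = max(1, int(n * fraction))
    thresh = np.partition(scores, -k)[-k]  # k-th largest score
    return (scores >= thresh).astype(int), thresh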


# Example: use LOF scores and flag top 1% as anomalies
frac = 0.01
lof_flags, lof_thresh = flag_top_fraction(lof_scores, fraction=frac)
print(f"LOF threshold={lof_thresh:.4f}, fraction={frac}")

df_anom = df.copy()
df_anom["anomaly_lof"] = lof_flags
df_anom["lof_score"] = lof_scores

print("Anomaly counts (LOF):")
print(df_anom["anomaly_lof"].value_counts())

display(df_anom.head())


### 8️⃣ If You Have Labels: Evaluate & Calibrate Thresholds

If you set `LABEL_COL` to actual anomaly labels (0 = normal, 1 = anomaly):

- You can compute:
  - Precision, recall, F1  
  - Confusion matrix  
- You can tune the fraction / threshold to match recall/precision targets.

With heavy imbalance, prioritize **recall@fixed-precision** or **PR curves**, not just accuracy.
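
As a sketch of recall at fixed precision, the helper below reads the best achievable recall off scikit-learn's `precision_recall_curve`, subject to a minimum precision. The function name `recall_at_precision`, the 0.9 target, and the toy arrays are assumptions for illustration.

```python
import numpy as np
from sklearn.metrics import average_precision_score, precision_recall_curve


def recall_at_precision(y_true, scores, min_precision=0.9):
    """Return (recall, threshold) with the highest recall whose precision >= min_precision."""
    precision, recall, thresholds = precision_recall_curve(y_true, scores)
    # The final (precision=1, recall=0) point has no matching threshold, so drop it.
    precision, recall = precision[:-1], recall[:-1]
    ok = precision >= min_precision
    if not ok.any():
        return 0.0, None
    idx = np.argmax(np.where(ok, recall, -1.0))
    return float(recall[idx]), float(thresholds[idx])


# Toy example: higher score = more abnormal, label 1 = anomaly.
demo_labels = np.array([0, 0, 0, 1, 0, 1])
demo_scores = np.array([0.1, 0.2, 0.3, 0.4, 0.8, 0.9])

rec, thr = recall_at_precision(demo_labels, demo_scores, min_precision=0.9)
ap = average_precision_score(demo_labels, demo_scores)
print(f"recall={rec:.2f} at threshold={thr}, average precision={ap:.3f}")
```

In the notebook you would pass `df_anom[LABEL_COL]` and a score column such as `df_anom["lof_score"]` in place of the toy arrays.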


In [None]:
from sklearn.metrics import classification_report, confusion_matrix

if LABEL_COL is not None and LABEL_COL in df_anom.columns:
    y_true = df_anom[LABEL_COL].values
    y_pred = df_anom["anomaly_lof"].values

    print("Classification report (LOF-based anomalies):")
    print(classification_report(y_true, y_pred, digits=4))

    print("Confusion matrix:")
    print(confusion_matrix(y_true, y_pred))


### 9️⃣ What to Do Next?

- **Manual inspection**: sort by anomaly score, inspect top N rows.  
- **Cluster anomalies**: run clustering *only on flagged anomalies* to see subtypes.  
- **Supervised follow-up**:
  - Use anomaly flags / scores as features in a supervised model.  
  - Train a classifier (e.g., XGBoost) to predict `LABEL_COL` using original features + `lof_score`, `iso_decision`, etc.  
- **Iterate**: adjust fraction, experiment with IsolationForest vs LOF, add domain features.
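
When iterating between detectors, it helps to check how much they agree before trusting either one. The sketch below is one possible approach, not part of the original template: the hypothetical helpers `to_rank_pct` and `top_k_overlap` put scores on a common rank scale and measure how many top-k rows two detectors share.

```python
import numpy as np


def to_rank_pct(scores):
    """Map scores to percentile ranks in (0, 1]; higher score -> higher rank."""
    scores = np.asarray(scores, dtype=float)
    order = scores.argsort()
    ranks = np.empty(len(scores), dtype=float)
    ranks[order] = np.arange(1, len(scores) + 1)
    return ranks / len(scores)


def top_k_overlap(scores_a, scores_b, k):
    """Fraction of the k highest-scoring rows that two detectors have in common."""
    top_a = set(np.argsort(scores_a)[-k:])
    top_b = set(np.argsort(scores_b)[-k:])
    return len(top_a & top_b) / k


# Hypothetical scores from two detectors on different scales (higher = more abnormal).
a = np.array([0.1, 0.9, 0.3, 0.8, 0.2])
b = np.array([5.0, 9.0, 1.0, 2.0, 8.0])

overlap = top_k_overlap(a, b, k=2)
combined = (to_rank_pct(a) + to_rank_pct(b)) / 2   # simple rank-average ensemble
print(overlap, combined)
```

With the notebook's variables, a call such as `top_k_overlap(-iso_decision, lof_scores, k=50)` compares IsolationForest and LOF; the sign flip is needed because `iso_decision` is lower for more abnormal rows. Low overlap suggests the detectors capture different kinds of unusual rows, which is worth inspecting manually.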

You can save the dataset with anomaly scores for downstream use:


In [None]:
OUTPUT_DIR = Path("./anomaly_outputs")
OUTPUT_DIR.mkdir(exist_ok=True, parents=True)

df_anom.to_csv(OUTPUT_DIR / "data_with_anomaly_scores.csv", index=False)
print("Saved data_with_anomaly_scores.csv")