# Term Project - Isolation Forest for Anomaly Detection
> Leigh Goetsch </br>
> CSC 5601 - Theory of Machine Learning </br>
> Milwaukee School of Engineering </br>
> Fall 2025


## imports

In [None]:
# imports
from isolation_forest import IsolationForest
import pandas as pd
from scipy import io as sio
import numpy as np
from sklearn.metrics import (
    classification_report,
    confusion_matrix,
    f1_score,
    precision_score,
    recall_score,
)
import seaborn as sns
import matplotlib.pyplot as plt

## Vertebral Dataset

In [None]:
data_path = "../Data/vertebral.mat"
# data_path = "../Data/satellite.mat"
# data_path = "../Data/satimage-2.mat"
mat = sio.loadmat(data_path)

X = mat["X"]
y = mat["y"].flatten()
feature_names = [f"feature_{i}" for i in range(X.shape[1])]
df_vertebral = pd.DataFrame(X, columns=feature_names)
df_vertebral["target"] = y

df_vertebral["target"].value_counts(normalize=True)


In [None]:

iso_forest = IsolationForest(random_state=42, contamination=0.205)
predictions = iso_forest.fit_predict(X)
classification_report(y, predictions, target_names=["Inlier", "Outlier"])


In [None]:

# scatter grid of all feature pairs
pd.plotting.scatter_matrix(
    df_vertebral[feature_names],
    c=y,
    figsize=(10, 10),
    marker="o",
    hist_kwds={"bins": 20},
    s=60,
    alpha=0.8,
)
plt.show()

pd.plotting.scatter_matrix(
    df_vertebral[feature_names],
    c=predictions,
    figsize=(10, 10),
    marker="o",
    hist_kwds={"bins": 20},
    s=60,
    alpha=0.8,
)
plt.show()

## Malware Dataset

In [None]:
# data_path = "../Data/IRIS.csv"
data_path = "../Data/TUANDROMD.csv"
df_data = pd.read_csv(data_path)

# Drop columns with identical values (constant columns)
nunique = df_data.nunique()
df_data = df_data.drop(columns=nunique[nunique == 1].index)

# Drop rows with missing values
df_data = df_data.dropna()

# Encode target: anomalies = 1, inliers = 0
df_data["target"] = np.where(df_data["target"] == 1, 0, 1)

X = df_data.drop(columns=["target"]).values
y = df_data["target"].values

contamination = df_data["target"].value_counts(normalize=True)[1]

X.shape, y.shape, df_data.shape, df_data["target"].value_counts(), contamination

In [None]:
# Train baseline model with default parameters
baseline_iso_forest = IsolationForest(random_state=42, contamination=contamination)
baseline_predictions = baseline_iso_forest.fit_predict(X)

# Store baseline metrics
baseline_metrics = {
    "precision": precision_score(y, baseline_predictions),
    "recall": recall_score(y, baseline_predictions),
    "f1": f1_score(y, baseline_predictions),
}

print(
    classification_report(y, baseline_predictions, target_names=["Inlier", "Outlier"])
)
print(f'\nBaseline F1 Score: {baseline_metrics["f1"]:.4f}')

In [None]:
feature_names = df_data.columns.drop("target")
importances = baseline_iso_forest.feature_importances_()
importance_lookup = dict(zip(feature_names, importances))
importance_df = (
    pd.DataFrame({"Feature": feature_names, "Importance": importances})
    .sort_values(by="Importance", ascending=False)
    .reset_index(drop=True)
)


rows = []

for feature in feature_names:
    # group once per feature
    grp = df_data.groupby([feature, "target"]).size().unstack("target", fill_value=0)
    grp = grp.rename(columns={0: "inliers", 1: "outliers"})

    # total counts for that feature
    total = df_data[feature].value_counts().rename("total_count")

    # merge and tidy
    merged = grp.merge(total, left_index=True, right_index=True)
    merged["feature"] = feature
    merged["importance"] = importance_lookup[feature]
    merged.reset_index(inplace=True)
    merged.rename(columns={feature: "value"}, inplace=True)

    rows.append(merged)

info_df = pd.concat(rows, ignore_index=True)

# weight importance by outlier ratio
info_df["fraction_outliers"] = info_df["outliers"] / info_df["total_count"]
info_df.sort_values(by=["importance"], ascending=False)[:10]

In [None]:
sns.scatterplot(
    data=info_df,
    x="importance",
    y="fraction_outliers",
    size="total_count",
    hue="value",
    alpha=0.5,
)
plt.title("Feature Importance vs Outlier Ratio")
plt.xlabel("Feature Importance")
plt.ylabel("Outlier Ratio")

In [None]:
# Iteratively remove least important features and evaluate performance
selection_results = []
current_features = list(feature_names.values)
least_important = ""


def get_least_important(importances, features, i) -> str:
    # Get feature importances
    importance_dict = dict(zip(current_features, importances))

    # Remove i least important feature
    # # print importance dict sorted by value
    # print("Current feature importances:")
    # for feat, imp in sorted(importance_dict.items(), key=lambda item: item[1]):
    #     print(f"{feat}: {imp:.6f}")
    return sorted(importance_dict, key=importance_dict.get)[i]


def test_model(current_X) -> dict:
    iso_forest = IsolationForest(
        n_trees=500, random_state=42, contamination=contamination
    )
    predictions = iso_forest.fit_predict(current_X)
    # weight the model metrics by contamination

    # Calculate metrics
    return {
        "f1": f1_score(y, predictions, average="weighted"),
        "precision": precision_score(y, predictions, average="weighted"),
        "recall": recall_score(y, predictions, average="weighted"),
        "iforest": iso_forest,
    }

f1 = 0
least_important = ""
importances = np.zeros(len(current_features))
while len(current_features) > 15:
    i = 0
    last_f1 = f1
    f1 = -1
    precision, recall = 0, 0
    feature = least_important
    best_feature = feature
    while last_f1 > f1 and i < len(current_features) // 10:
        tmp_features = current_features.copy()
        if importances.sum() > 0:
            feature = get_least_important(importances, current_features, i)
            tmp_features.remove(feature)
        # Get current feature subset
        current_X = df_data[tmp_features].values
        results = test_model(current_X)
        if results["f1"] > f1:
            best_feature = feature
            precision = results["precision"]
            recall = results["recall"]
            f1 = results["f1"]
        importances = results["iforest"].feature_importances_()
        i += 1
    least_important = best_feature
    if feature != "":
        current_features.remove(least_important)

    # Train model with current features

    selection_results.append(
        {
            "num_features": len(current_features),
            "features": current_features.copy(),
            "f1": f1,
            "precision": precision,
            "recall": recall,
        }
    )

    print(
        f"Features: {len(current_features):3d} | F1: {f1:.4f} | Removed: {least_important}"
    )

In [None]:
# Plot feature selection results
results_df = pd.DataFrame(selection_results)

fig, axes = plt.subplots(1, 3, figsize=(15, 4))

axes[0].plot(results_df["num_features"], results_df["f1"], marker="o")
axes[0].set_xlabel("Number of Features")
axes[0].set_ylabel("F1 Score")
axes[0].set_title("F1 Score vs Number of Features")
axes[0].grid(True)

axes[1].plot(
    results_df["num_features"], results_df["precision"], marker="o", label="Precision"
)
axes[1].plot(
    results_df["num_features"], results_df["recall"], marker="s", label="Recall"
)
axes[1].set_xlabel("Number of Features")
axes[1].set_ylabel("Score")
axes[1].set_title("Precision & Recall vs Number of Features")
axes[1].legend()
axes[1].grid(True)

axes[2].bar(range(len(results_df)), results_df["f1"])
axes[2].set_xlabel("Feature Count")
axes[2].set_ylabel("F1 Score")
axes[2].set_title("F1 Score for Each Feature Count")
axes[2].set_xticks(range(len(results_df)))
axes[2].set_xticklabels(results_df["num_features"])

plt.tight_layout()
plt.show()

# Find best feature subset
best_idx = results_df["f1"].idxmax()
best_features = selection_results[best_idx]["features"]
best_f1 = selection_results[best_idx]["f1"]

print(f"\n=== BEST FEATURE SUBSET ===")
print(f"Number of features: {len(best_features)}")
print(f"Selected features: {best_features}")
print(f"F1 Score: {best_f1:.4f}")

In [None]:
# Prepare data with best features
X_best = df_data[best_features].values

# Define parameter grid for tuning (using custom IsolationForest parameters)
param_grid = {
    "n_trees": [100, 150, 200, 500, 1000],
    "max_samples": [128, 256, 512, 1024],
    "contamination": [0.15, 0.2, 0.25],
}

print("Parameter Grid:")
for param, values in param_grid.items():
    print(f"  {param}: {values}")

# Calculate total combinations
total_combinations = np.prod([len(v) for v in param_grid.values()])
print(f"\nTotal parameter combinations: {total_combinations}")

In [None]:
# Perform grid search with cross-validation
best_params = None
best_score = -1
grid_results = []

n_trees_list = param_grid["n_trees"]
max_samples_list = param_grid["max_samples"]
contamination_list = param_grid["contamination"]

total_evals = 0
total_combinations = len(n_trees_list) * len(max_samples_list) * len(contamination_list)

for n_tr in n_trees_list:
    for max_samp in max_samples_list:
        for contam in contamination_list:
            total_evals += 1

            # Use cross-validation
            cv_scores = []
            skf = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)

            for train_idx, val_idx in skf.split(X_best, y):
                X_train, X_val = X_best[train_idx], X_best[val_idx]
                y_train, y_val = y[train_idx], y[val_idx]

                # Train model with custom IsolationForest parameters
                iso_forest = IsolationForest(
                    n_trees=n_tr,
                    max_samples=max_samp,
                    contamination=contam,
                    random_state=42,
                )
                iso_forest.fit(X_train)
                pred = iso_forest.predict(X_val)

                # Score
                score = f1_score(y_val, pred)
                cv_scores.append(score)

            mean_cv_score = np.mean(cv_scores)

            grid_results.append(
                {
                    "n_trees": n_tr,
                    "max_samples": max_samp,
                    "contamination": contam,
                    "mean_cv_f1": mean_cv_score,
                }
            )

            if mean_cv_score > best_score:
                best_score = mean_cv_score
                best_params = {
                    "n_trees": n_tr,
                    "max_samples": max_samp,
                    "contamination": contam,
                }

            if total_evals % 5 == 0:
                print(
                    f"[{total_evals}/{total_combinations}] Best CV F1: {best_score:.4f}"
                )

print(f"\nCompleted {total_evals} parameter evaluations")
print(f"\n=== BEST PARAMETERS ===")
for param, value in best_params.items():
    print(f"{param}: {value}")
print(f"Best CV F1 Score: {best_score:.4f}")

In [None]:
# Analyze grid search results
grid_results_df = pd.DataFrame(grid_results).sort_values("mean_cv_f1", ascending=False)

print("\n=== TOP 10 PARAMETER COMBINATIONS ===")
print(grid_results_df.head(10))

# Analyze parameter importance
fig, axes = plt.subplots(1, 3, figsize=(15, 4))

# n_trees impact
n_tr_impact = grid_results_df.groupby("n_trees")["mean_cv_f1"].agg(["mean", "std"])
axes[0].errorbar(
    n_tr_impact.index, n_tr_impact["mean"], yerr=n_tr_impact["std"], marker="o"
)
axes[0].set_xlabel("n_trees")
axes[0].set_ylabel("Mean CV F1")
axes[0].set_title("Impact of n_trees")
axes[0].grid(True)

# contamination impact
contam_impact = grid_results_df.groupby("contamination")["mean_cv_f1"].agg(
    ["mean", "std"]
)
axes[1].errorbar(
    contam_impact.index, contam_impact["mean"], yerr=contam_impact["std"], marker="o"
)
axes[1].set_xlabel("contamination")
axes[1].set_ylabel("Mean CV F1")
axes[1].set_title("Impact of contamination")
axes[1].grid(True)

# max_samples impact
max_samp_impact = grid_results_df.groupby("max_samples")["mean_cv_f1"].agg(
    ["mean", "std"]
)
axes[2].errorbar(
    max_samp_impact.index,
    max_samp_impact["mean"],
    yerr=max_samp_impact["std"],
    marker="o",
)
axes[2].set_xlabel("max_samples")
axes[2].set_ylabel("Mean CV F1")
axes[2].set_title("Impact of max_samples")
axes[2].grid(True)

plt.tight_layout()
plt.show()

In [None]:
# Train final model with best features and best parameters
final_iso_forest = IsolationForest(
    n_trees=best_params["n_trees"],
    max_samples=best_params["max_samples"],
    contamination=best_params["contamination"],
    random_state=42,
)

final_predictions = final_iso_forest.fit_predict(X_best)

# Calculate final metrics
final_metrics = {
    "precision": precision_score(y, final_predictions),
    "recall": recall_score(y, final_predictions),
    "f1": f1_score(y, final_predictions),
}

print("\n=== FINAL MODEL METRICS ===")
print(classification_report(y, final_predictions, target_names=["Inlier", "Outlier"]))
print(f'\nFinal F1 Score: {final_metrics["f1"]:.4f}')

In [None]:
# Compare baseline vs final model
comparison_df = pd.DataFrame(
    {"Baseline": baseline_metrics, "Final Model": final_metrics}
)

print("\n=== BASELINE VS FINAL MODEL COMPARISON ===")
print(comparison_df)

# Calculate improvements
improvements = {}
for metric in baseline_metrics.keys():
    improvement = final_metrics[metric] - baseline_metrics[metric]
    pct_change = (
        (improvement / baseline_metrics[metric]) * 100
        if baseline_metrics[metric] != 0
        else 0
    )
    improvements[metric] = {"absolute": improvement, "percent": pct_change}

print("\n=== IMPROVEMENTS ===")
for metric, change in improvements.items():
    print(f'{metric.upper()}: {change["absolute"]:+.4f} ({change["percent"]:+.2f}%)')

In [None]:
# Visualizations
fig, axes = plt.subplots(1, 2, figsize=(14, 5))

# Confusion matrices
cm_baseline = confusion_matrix(y, baseline_predictions)
cm_final = confusion_matrix(y, final_predictions)

sns.heatmap(cm_baseline, annot=True, fmt="d", cmap="Blues", ax=axes[0], cbar=False)
axes[0].set_title("Baseline Model - Confusion Matrix")
axes[0].set_ylabel("True Label")
axes[0].set_xlabel("Predicted Label")

sns.heatmap(cm_final, annot=True, fmt="d", cmap="Greens", ax=axes[1], cbar=False)
axes[1].set_title("Final Model - Confusion Matrix")
axes[1].set_ylabel("True Label")
axes[1].set_xlabel("Predicted Label")

plt.tight_layout()
plt.show()

# Metrics comparison bar chart
fig, ax = plt.subplots(figsize=(10, 6))
metrics = list(baseline_metrics.keys())
baseline_vals = [baseline_metrics[m] for m in metrics]
final_vals = [final_metrics[m] for m in metrics]

x = np.arange(len(metrics))
width = 0.35

ax.bar(x - width / 2, baseline_vals, width, label="Baseline", alpha=0.8)
ax.bar(x + width / 2, final_vals, width, label="Final Model", alpha=0.8)

ax.set_ylabel("Score")
ax.set_title("Baseline vs Final Model Performance")
ax.set_xticks(x)
ax.set_xticklabels([m.capitalize() for m in metrics])
ax.legend()
ax.set_ylim([0, 1])
ax.grid(axis="y", alpha=0.3)

plt.tight_layout()
plt.show()