In [None]:
import os

# Smoke-test switch: True runs a small, fast configuration; False runs the
# full tuning configuration.
test_run = True

train_set_path = "../../../datasets/train_set.csv"
validation_set_path = "../../../datasets/validation_set.csv"
test_set_path = "../../../datasets/test_set.csv"

tuning_metric = "f1"  # f1 or accuracy

# Fail fast on a typo here instead of after the first (expensive) model fit.
if tuning_metric not in ("f1", "accuracy"):
    raise ValueError("Invalid tuning metric")

# Defined unconditionally so that enabling `with_storage` in either mode
# cannot hit an undefined name.
# NOTE(review): the file name says "dbscan" although the study tunes a
# One-Class SVM; left unchanged so an existing database keeps being found.
storage_dir = "optuna_storage"
storage_path = f"sqlite:///{storage_dir}/dbscan_study.db"

if test_run:
    use_sample = True
    train_frac = 0.05
    with_storage = False
    trials = 10
else:
    use_sample = True
    train_frac = 0.5
    with_storage = False
    trials = 100

# Full runs have always created the storage directory; keep doing so, and
# also create it whenever persistent storage is switched on.
if with_storage or not test_run:
    os.makedirs(storage_dir, exist_ok=True)

Test set

In [None]:
import pandas as pd

# Load the held-out test split; in a test run only a sample of it is kept.
test_set = pd.read_csv(test_set_path)
if test_run:
    test_set = test_set.sample(frac=train_frac * 2, random_state=42)

print(f"test set count: {len(test_set):,}")

# Features are every column except the three label columns.
X_test = test_set.drop(
    columns=["attack_binary", "attack_categorical", "attack_class"]
).values
# Binary target used for scoring, plus the per-row class kept for the
# per-class breakdowns later in the notebook.
y_test, y_test_class = test_set["attack_binary"].values, test_set["attack_class"]

test_set.head(3)

Validation set

In [None]:
# Load the validation split; in a test run only a sample of it is kept.
validation_set = pd.read_csv(validation_set_path)

if test_run:
    validation_set = validation_set.sample(frac=train_frac * 2, random_state=42)

print(f"Validation set count: {validation_set.shape[0]:,}")
print(validation_set["attack_class"].value_counts())

# Splitting into X and y.
# These must come from `validation_set`: deriving them from `test_set` would
# tune hyperparameters on test data and leak it into model selection.
X_val = validation_set.drop(
    columns=["attack_binary", "attack_categorical", "attack_class"]
).values
y_val = validation_set["attack_binary"].values
y_val_class = validation_set["attack_class"]

validation_set.head(3)

In [None]:
from imblearn.over_sampling import SMOTE
import numpy as np
import pandas as pd

# First, display the original class distribution of the validation data
print("Before SMOTE:")
print(f"Val set count: {X_val.shape[0]:,}")
before_counts = pd.Series(y_val_class).value_counts()
print(before_counts)

# Oversample the attack classes of the validation data using class labels.
target_per_class = 800 if test_run else 4000
attack_classes = ("DoS", "R2L", "Probe", "U2R")

# SMOTE raises if a requested class is missing from y or if the target is
# below the class's current size, so only request classes that are present
# and never ask for fewer samples than already exist.
sampling_strategy = {
    cls: max(target_per_class, int(before_counts[cls]))
    for cls in attack_classes
    if cls in before_counts.index
}

# NOTE(review): k_neighbors=3 presumably needs more than 3 samples in every
# oversampled class; a very small sample of a rare class may still fail here.
smote = SMOTE(random_state=42, k_neighbors=3, sampling_strategy=sampling_strategy)
X_val_resampled, y_val_resampled_class = smote.fit_resample(X_val, y_val_class)

# Display the distribution after SMOTE
print("\nAfter SMOTE:")
print(f"Val set count: {X_val_resampled.shape[0]:,}")
after_counts = pd.Series(y_val_resampled_class).value_counts()
print(after_counts)

# Convert class labels back to the binary encoding used for scoring
# (1 = normal, -1 = anomaly). The comparison is case-insensitive because the
# notebook spells the label both "normal" and "Normal".
is_normal = pd.Series(y_val_resampled_class).astype(str).str.lower() == "normal"
y_val_resampled = np.where(is_normal, 1, -1)

Train set

In [None]:
# Load the training data and, when sampling is enabled, keep only a fraction.
train_dataset = pd.read_csv(train_set_path)
train_dataset = (
    train_dataset.sample(frac=train_frac, random_state=1)
    if use_sample
    else train_dataset
)

print(f"train set count: {len(train_dataset):,}")
train_dataset.head(3)

# Tuning

In [None]:
import time

# Wall-clock timestamp taken just before tuning; the duration cell further
# down subtracts it from a second timestamp.
start_time = time.time()

In [None]:
from typing import Any


from sklearn.svm import OneClassSVM
from sklearn.metrics import f1_score, accuracy_score, precision_score, recall_score
import optuna


def _binary_metrics(y_true, y_pred):
    """Return accuracy, F1, precision and recall with anomaly (-1) as the positive class."""
    return {
        "accuracy": accuracy_score(y_true, y_pred),
        "f1": f1_score(y_true, y_pred, pos_label=-1, zero_division=0),
        "precision": precision_score(y_true, y_pred, pos_label=-1, zero_division=0),
        "recall": recall_score(y_true, y_pred, pos_label=-1, zero_division=0),
    }


def _format_metrics(metrics):
    """Render metric values as percentage strings with two decimals."""
    return {name: f"{value * 100:.2f}" for name, value in metrics.items()}


def objective(trial):
    """Optuna objective: fit a One-Class SVM and score it on the validation data.

    Parameters
    ----------
    trial : optuna trial object
        Used to sample `nu` in [0.2, 0.3] and `gamma` in [0.1, 0.3].

    Returns
    -------
    float
        Validation F1 (anomaly as positive class) or validation accuracy,
        depending on the module-level `tuning_metric`.

    Raises
    ------
    ValueError
        If `tuning_metric` is neither "f1" nor "accuracy".
    """
    # Reject a bad metric name before paying for a model fit.
    if tuning_metric not in ("f1", "accuracy"):
        raise ValueError("Invalid tuning metric")

    nu = trial.suggest_float("nu", 0.2, 0.3)
    gamma = trial.suggest_float("gamma", 0.1, 0.3)

    ocsvm = OneClassSVM(kernel="rbf", nu=nu, gamma=gamma)
    ocsvm.fit(train_dataset.values)

    val_metrics = _binary_metrics(y_val_resampled, ocsvm.predict(X_val_resampled))
    print("Validation Results:")
    print(_format_metrics(val_metrics))

    # Test metrics are printed for monitoring only; they are never returned,
    # so they do not influence the search.
    test_metrics = _binary_metrics(y_test, ocsvm.predict(X_test))
    print("\nTest Results:")
    print(_format_metrics(test_metrics))

    return val_metrics[tuning_metric]


# Build the study arguments once; the persistent-storage options are added
# only when storage is enabled, so `storage_path` is not touched otherwise.
study_kwargs = {"direction": "maximize"}
if with_storage:
    study_kwargs.update(
        storage=storage_path,
        study_name="base_ocsvm_study",
        load_if_exists=True,
    )

study = optuna.create_study(**study_kwargs)
study.optimize(objective, n_trials=trials)


print(f"Best score: {study.best_value:.3f}")
print(f"Best parameters: {study.best_params}")

In [None]:
# Elapsed wall-clock time since `start_time` was recorded before tuning.
end_time = time.time()
duration = end_time - start_time
print(f"Tuning duration: {duration:.2f} seconds")

In [None]:
import optuna
from plotly.io import show

# Render Optuna's optimization-history figure for the finished study.
fig = optuna.visualization.plot_optimization_history(study)
show(fig)

In [None]:
# Render Optuna's EDF figure; the function takes a list of studies, here one.
fig = optuna.visualization.plot_edf([study])
show(fig)

# Training

In [None]:
from sklearn.svm import OneClassSVM

# Refit on the training data with the best hyperparameters found by the study.
best_params = study.best_params
gamma, nu = best_params["gamma"], best_params["nu"]

ocsvm = OneClassSVM(kernel="rbf", nu=nu, gamma=gamma, verbose=True)
ocsvm.fit(train_dataset.values)

# Testing

Perform prediction

In [None]:
# Predictions of the refitted model on the test features; downstream cells
# treat -1 as anomaly and 1 as normal.
y_pred = ocsvm.predict(X_test)

In [None]:
from sklearn.metrics import confusion_matrix
import seaborn as sns
import matplotlib.pyplot as plt

# Binary confusion matrix on the test data; labels=[-1, 1] fixes the
# row/column order to anomaly (-1) first, then normal (1).
cm = confusion_matrix(y_test, y_pred, labels=[-1, 1])


def plot_confusion_matrix(cm, labels, title):
    """Draw an annotated heatmap of a confusion matrix and display it.

    `cm` is the matrix of integer counts, `labels` supplies the tick labels
    for both axes, and `title` is the figure title.
    """
    _, ax = plt.subplots(figsize=(5, 4))
    sns.heatmap(
        cm,
        annot=True,
        fmt="d",
        cmap="Blues",
        xticklabels=labels,
        yticklabels=labels,
        ax=ax,
    )
    ax.set_xlabel("Predicted Label")
    ax.set_ylabel("Actual Label")
    ax.set_title(title)
    plt.show()


# Tick labels follow the [-1, 1] label order used to build `cm`.
plot_confusion_matrix(cm, ["Anomaly", "Normal"], "Confusion Matrix (Anomaly vs Normal)")

In [None]:
from sklearn.metrics import (
    classification_report,
    precision_score,
    recall_score,
    f1_score,
    accuracy_score,
)

print("Classification Report:")
# Pin the label order explicitly (as the confusion matrix does) so that
# "Anomaly" always maps to -1 and "Normal" to 1, even if one label is absent
# from the predictions.
print(
    classification_report(
        y_test, y_pred, labels=[-1, 1], target_names=["Anomaly", "Normal"]
    )
)

# Headline scores with anomaly (-1) as the positive class.
precision = precision_score(y_test, y_pred, pos_label=-1)
recall = recall_score(y_test, y_pred, pos_label=-1)
f1 = f1_score(y_test, y_pred, pos_label=-1)
acc = accuracy_score(y_test, y_pred)

print(f"Precision: {precision}")
print(f"Recall: {recall}")
print(f"F1 Score: {f1}")
print(f"Accuracy: {acc}")

In [None]:
import numpy as np


def create_multiclass_cm(y_true_class, y_pred_binary):
    """
    Create a confusion matrix showing how each true class was classified.

    Parameters
    ----------
    y_true_class : array-like
        True class label of every sample (e.g. attack classes and normal).
    y_pred_binary : array-like
        Binary prediction per sample: -1 (anomaly) or 1 (normal).

    Returns
    -------
    cm : numpy.ndarray of float, shape (n_classes, 2)
        Column 0 counts samples predicted -1, column 1 counts samples
        predicted 1. For attack classes column 0 is the correct detection;
        for the normal class column 1 is.
    classes : numpy.ndarray
        Sorted unique class labels, one per row of `cm`.
    """
    # Work on plain arrays so masking is positional: lists are accepted and a
    # pandas index cannot affect which predictions are selected.
    true_labels = np.asarray(y_true_class)
    preds = np.asarray(y_pred_binary)

    classes = np.unique(true_labels)
    cm = np.zeros((len(classes), 2))

    for row, cls in enumerate(classes):
        cls_preds = preds[true_labels == cls]
        # The counting is the same for every class; only the interpretation
        # of the two columns differs between normal and attack rows.
        cm[row, 0] = np.sum(cls_preds == -1)
        cm[row, 1] = np.sum(cls_preds == 1)

    return cm, classes


# Build the per-class matrix, then draw it as an annotated heatmap.
cm_multi, classes = create_multiclass_cm(y_test_class, y_pred)

class_fig, class_ax = plt.subplots(figsize=(10, 8))
heatmap_options = dict(
    annot=True,
    fmt="g",
    cmap="Blues",
    xticklabels=["Detected as Anomaly", "Detected as Normal"],
    yticklabels=classes,
)
sns.heatmap(cm_multi, ax=class_ax, **heatmap_options)
class_ax.set_ylabel("True Attack Class")
class_ax.set_title("Confusion Matrix by Attack Class")
plt.tight_layout()
plt.show()

In [None]:
# Calculate detection rates for each class
print("Detection rates by class:")
class_metrics = {}
for cls in np.unique(y_test_class):
    # Positional boolean mask selecting the samples of this class
    class_indices = np.asarray(y_test_class == cls)

    # True values and predictions for this class
    y_true_cls = y_test[class_indices]
    y_pred_cls = y_pred[class_indices]

    # The normal class is "detected" when predicted 1; every attack class when
    # predicted -1. The comparison is case-insensitive because the rest of the
    # notebook spells the label "normal"; a case-sensitive check against
    # "Normal" would score the normal class as if it were an attack.
    pos_label = 1 if str(cls).lower() == "normal" else -1

    correct = np.sum(y_pred_cls == pos_label)
    precision = precision_score(
        y_true_cls, y_pred_cls, pos_label=pos_label, zero_division=0
    )
    recall = recall_score(y_true_cls, y_pred_cls, pos_label=pos_label, zero_division=0)
    f1 = f1_score(y_true_cls, y_pred_cls, pos_label=pos_label, zero_division=0)

    # `cls` comes from np.unique, so each class has at least one sample and
    # `total` is never zero.
    total = len(y_pred_cls)
    detection_rate = correct / total

    class_metrics[cls] = {
        "detection_rate": detection_rate,
        "precision": precision,
        "recall": recall,
        "f1_score": f1,
        "count": total,
        "correctly_detected": correct,
    }

    print(f"{cls}: {detection_rate:.4f} ({correct}/{total})")