In [None]:
# Mount Google Drive inside the Colab VM at /content/drive; the dataset path
# used by the loading cell lives under this mount point.
from google.colab import drive
drive.mount('/content/drive')


In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns

from sklearn.model_selection import train_test_split, cross_val_score, StratifiedKFold
from sklearn.preprocessing import LabelEncoder, StandardScaler
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.svm import SVC
from sklearn.neural_network import MLPClassifier
from sklearn.metrics import (
    classification_report, confusion_matrix,
    accuracy_score, f1_score, precision_score,
    recall_score, roc_auc_score, roc_curve, auc,
    precision_recall_curve, average_precision_score
)
from sklearn.inspection import permutation_importance

# Global plotting defaults for every figure in the notebook: the matplotlib
# style sheet is applied first, then seaborn's "deep" palette.
# NOTE(review): presumably the order matters (a style sheet applied afterwards
# could override the colour cycle set by seaborn) - keep the two calls in this order.
plt.style.use("seaborn-v0_8-whitegrid")
sns.set_palette("deep")


In [None]:
# Location of the raw network-traffic CSV on the mounted Drive.
file_path = "/content/drive/MyDrive/Colab Notebooks/cybersecurityAI/datasets/Cybersecurity_Network_Traffic_Dataset.csv"

# Parse the CSV into the working DataFrame that the following cells build on.
df = pd.read_csv(filepath_or_buffer=file_path)

In [None]:
# Structural overview of the raw frame: size, dtypes, nulls, summary statistics.
row_count, column_count = df.shape
print(f"Shape: {row_count} rows x {column_count} columns")

column_types = df.dtypes
print(f"\nColumn types:\n{column_types}")

missing_per_column = df.isnull().sum()
print(f"\nMissing values:\n{missing_per_column}")

summary_stats = df.describe()
print(f"\nBasic statistics:\n{summary_stats}")

# Frequency tables for the two targets and the main categorical fields.
# Each entry is (heading shown in the output, column to count).
distribution_columns = (
    ("Label", "label"),
    ("Attack type", "attack_type"),
    ("Protocol", "protocol"),
    ("Is internal traffic", "is_internal_traffic"),
)
for heading, column in distribution_columns:
    print(f"\n{heading} distribution:\n{df[column].value_counts()}")


In [None]:
def _split_octets(addresses):
    """Split dotted IPv4 strings into a DataFrame with one integer column per octet."""
    return addresses.str.split(".", expand=True).astype(int)


def _private_range_flag(octets):
    """Return 1 where the first two octets fall in 10/8, 172.16/12 or 192.168/16, else 0."""
    first, second = octets[0], octets[1]
    in_10 = first == 10
    in_172 = (first == 172) & (second >= 16) & (second <= 31)
    in_192 = (first == 192) & (second == 168)
    return (in_10 | in_172 | in_192).astype(int)


# --- Calendar / time-of-day features -------------------------------------
df["timestamp"] = pd.to_datetime(df["timestamp"])
event_time = df["timestamp"].dt
df["hour"] = event_time.hour
df["day_of_week"] = event_time.dayofweek
df["minute"] = event_time.minute
df["is_weekend"] = (df["day_of_week"] >= 5).astype(int)
late_evening = df["hour"] >= 22
early_morning = df["hour"] <= 5
df["is_night"] = (late_evening | early_morning).astype(int)

# --- Traffic volume features ---------------------------------------------
sent, received = df["bytes_sent"], df["bytes_received"]
df["total_bytes"] = sent + received
# +1 in the denominator avoids division by zero when nothing was received.
df["bytes_ratio"] = sent / (received + 1)

df["log_bytes_sent"] = np.log1p(sent)
df["log_bytes_received"] = np.log1p(received)
df["log_total_bytes"] = np.log1p(df["total_bytes"])

# --- Port features --------------------------------------------------------
src_port, dst_port = df["src_port"], df["dst_port"]
df["port_diff"] = (src_port - dst_port).abs()
df["src_port_is_well_known"] = src_port.lt(1024).astype(int)
df["dst_port_is_well_known"] = dst_port.lt(1024).astype(int)
df["src_port_is_ephemeral"] = src_port.ge(49152).astype(int)
df["dst_port_is_ephemeral"] = dst_port.ge(49152).astype(int)

common_ports = [20, 21, 22, 23, 25, 53, 80, 110, 143, 443, 445, 993, 995, 1433, 3306, 3389, 5432, 8080, 8443]
df["dst_port_common"] = dst_port.isin(common_ports).astype(int)
df["src_port_common"] = src_port.isin(common_ports).astype(int)

# --- URL features (missing URLs are treated as the empty string) ---------
url_text = df["url"].fillna("")
df["has_url"] = df["url"].notna().astype(int)
df["url_length"] = url_text.str.len()
df["url_has_login"] = url_text.str.contains("login|logon|auth|signin", case=False).astype(int)
df["url_has_admin"] = url_text.str.contains("admin|config|phpmyadmin|setup", case=False).astype(int)
df["url_has_id_param"] = url_text.str.contains(r"\?id=", case=False).astype(int)
# Parameter count = number of "&" separators, plus one if a "?" is present.
has_query_string = url_text.str.contains(r"\?").astype(int)
df["url_num_params"] = url_text.str.count("&") + has_query_string
df["url_depth"] = url_text.str.count("/")

# --- User-agent features (missing values treated as the empty string) ----
ua_col = df["user_agent"].fillna("")
df["ua_is_chrome"] = ua_col.str.contains("Chrome", case=False).astype(int)
df["ua_is_firefox"] = ua_col.str.contains("Firefox", case=False).astype(int)
df["ua_is_windows"] = ua_col.str.contains("Windows", case=False).astype(int)
df["ua_length"] = ua_col.str.len()

df["is_internal_traffic"] = df["is_internal_traffic"].astype(int)

# --- Protocol as an integer code ------------------------------------------
le_protocol = LabelEncoder().fit(df["protocol"])
df["protocol_encoded"] = le_protocol.transform(df["protocol"])

# --- IP address features ---------------------------------------------------
src_ip_parts = _split_octets(df["src_ip"])
df["src_ip_first_octet"] = src_ip_parts[0]
df["src_ip_second_octet"] = src_ip_parts[1]

dst_ip_parts = _split_octets(df["dst_ip"])
df["dst_ip_first_octet"] = dst_ip_parts[0]
df["dst_ip_second_octet"] = dst_ip_parts[1]

df["src_ip_is_private"] = _private_range_flag(src_ip_parts)
df["dst_ip_is_private"] = _private_range_flag(dst_ip_parts)

print(f"Features after engineering: {df.shape[1]} columns")

In [None]:
# Candidate model inputs: the raw numeric fields plus the engineered features.
feature_cols = [
    "src_port", "dst_port", "bytes_sent", "bytes_received", "protocol_encoded",
    "is_internal_traffic", "total_bytes", "bytes_ratio",
    "log_bytes_sent", "log_bytes_received", "log_total_bytes",
    "port_diff", "src_port_is_well_known", "dst_port_is_well_known",
    "src_port_is_ephemeral", "dst_port_is_ephemeral",
    "dst_port_common", "src_port_common",
    "hour", "day_of_week", "minute", "is_weekend", "is_night",
    "has_url", "url_length", "url_has_login", "url_has_admin",
    "url_has_id_param", "url_num_params", "url_depth",
    "ua_is_chrome", "ua_is_firefox", "ua_is_windows", "ua_length",
    "src_ip_first_octet", "src_ip_second_octet",
    "dst_ip_first_octet", "dst_ip_second_octet",
    "src_ip_is_private", "dst_ip_is_private"
]

# Keep only the candidates that exist in df, but report what was dropped:
# a silently skipped column (e.g. a typo in the list above) would otherwise
# shrink the feature set without anyone noticing.
missing_features = [c for c in feature_cols if c not in df.columns]
if missing_features:
    print(
        f"WARNING: {len(missing_features)} requested feature(s) not found in df "
        f"and skipped: {missing_features}"
    )
feature_cols = [c for c in feature_cols if c in df.columns]

# Feature matrix and the binary target (copies, so later edits do not touch df).
X = df[feature_cols].copy()
y_binary = df["label"].copy()

# Multi-class target: attack_type strings mapped to integer codes;
# attack_classes keeps the original names in code order.
le_attack = LabelEncoder()
y_multi = le_attack.fit_transform(df["attack_type"])
attack_classes = le_attack.classes_

print(f"Number of features: {len(feature_cols)}")

In [None]:
# Standardise every feature column and keep the result as a DataFrame with
# the original column names.
# NOTE(review): the scaler is fitted on all rows of X, i.e. before any
# train/test split, so its mean/variance include rows that are later held out.
scaler = StandardScaler()
X_scaled = pd.DataFrame(scaler.fit_transform(X), columns=feature_cols)

In [None]:
def _split_then_scale(features, target, test_size=0.2, random_state=42):
    """Stratified train/test split, then standardisation fitted on the training rows only.

    Splitting the *unscaled* features first and fitting the scaler on the
    training partition keeps test-set statistics out of preprocessing
    (scaling the full matrix before splitting leaks them).

    Parameters
    ----------
    features : pd.DataFrame
        Unscaled feature matrix.
    target : array-like
        Labels aligned row-for-row with ``features``; also used for stratification.
    test_size : float
        Fraction of rows held out for testing.
    random_state : int
        Seed for the shuffle, so the split is reproducible.

    Returns
    -------
    (X_train, X_test, y_train, y_test, fitted_scaler)
        ``X_train`` / ``X_test`` are DataFrames that keep the input's column
        names and row index.
    """
    X_tr, X_te, y_tr, y_te = train_test_split(
        features, target, test_size=test_size, random_state=random_state, stratify=target
    )
    fitted_scaler = StandardScaler().fit(X_tr)
    X_tr = pd.DataFrame(fitted_scaler.transform(X_tr), columns=X_tr.columns, index=X_tr.index)
    X_te = pd.DataFrame(fitted_scaler.transform(X_te), columns=X_te.columns, index=X_te.index)
    return X_tr, X_te, y_tr, y_te, fitted_scaler


# Binary task (label) and multi-class task (attack type) are stratified on
# different targets, so each gets its own split and its own scaler.
X_train_b, X_test_b, y_train_b, y_test_b, scaler_b = _split_then_scale(X, y_binary)
X_train_m, X_test_m, y_train_m, y_test_m, scaler_m = _split_then_scale(X, y_multi)

print(f"Binary - Train: {X_train_b.shape}, Test: {X_test_b.shape}")
print(f"Multi  - Train: {X_train_m.shape}, Test: {X_test_m.shape}")

In [None]:
def _score_binary(fitted_model, X_eval, y_eval):
    """Score a fitted classifier on (X_eval, y_eval).

    Returns a dict with accuracy, weighted F1 / precision / recall, and ROC-AUC
    computed from the predicted probability of the second class.
    """
    predicted = fitted_model.predict(X_eval)
    positive_proba = fitted_model.predict_proba(X_eval)[:, 1]
    return {
        "accuracy": accuracy_score(y_eval, predicted),
        "f1": f1_score(y_eval, predicted, average="weighted"),
        "precision": precision_score(y_eval, predicted, average="weighted"),
        "recall": recall_score(y_eval, predicted, average="weighted"),
        "roc_auc": roc_auc_score(y_eval, positive_proba),
    }


# Candidate classifiers for the binary label, keyed by display name.
binary_models = {
    "Random Forest": RandomForestClassifier(
        n_estimators=200,
        max_depth=20,
        min_samples_split=5,
        min_samples_leaf=2,
        random_state=42,
        n_jobs=-1,
        class_weight="balanced",
    ),
    "Gradient Boosting": GradientBoostingClassifier(
        n_estimators=200,
        max_depth=6,
        learning_rate=0.1,
        subsample=0.8,
        random_state=42,
    ),
    "Logistic Regression": LogisticRegression(
        max_iter=1000,
        random_state=42,
        class_weight="balanced",
        C=1.0,
    ),
    "SVM (RBF)": SVC(
        kernel="rbf",
        C=10,
        gamma="scale",
        random_state=42,
        probability=True,  # needed so predict_proba is available for ROC-AUC
        class_weight="balanced",
    ),
    "Neural Network": MLPClassifier(
        hidden_layer_sizes=(128, 64, 32),
        activation="relu",
        max_iter=500,
        random_state=42,
        early_stopping=True,
        validation_fraction=0.1,
        batch_size=256,
        learning_rate="adaptive",
    ),
}

# name -> metrics plus the fitted estimator (under "model") for the plotting cells.
binary_results = {}

for model_name, estimator in binary_models.items():
    print(f"\nTraining {model_name}...")
    estimator.fit(X_train_b, y_train_b)

    scores = _score_binary(estimator, X_test_b, y_test_b)
    binary_results[model_name] = {**scores, "model": estimator}

    print(f"Accuracy:  {scores['accuracy']:.4f}")
    print(f"F1-Score:  {scores['f1']:.4f}")
    print(f"Precision: {scores['precision']:.4f}")
    print(f"Recall:    {scores['recall']:.4f}")
    print(f"ROC-AUC:   {scores['roc_auc']:.4f}")

In [None]:
# =========================================
# Fig. 2 - Confusion Matrix (Gradient Boosting)
# =========================================

# confusion_matrix is already imported in the notebook's import cell.
gb_model = binary_results["Gradient Boosting"]["model"]
y_pred_gb = gb_model.predict(X_test_b)

# Pin the label order to the model's classes so rows/columns can be labelled.
class_labels = gb_model.classes_
cm = confusion_matrix(y_test_b, y_pred_gb, labels=class_labels)

fig, ax = plt.subplots(dpi=300)
ax.imshow(cm)

# The whitegrid style draws gridlines on top of images; switch them off here.
ax.grid(False)

# One tick per class, labelled with the class value (the defaults are fractional).
tick_positions = np.arange(len(class_labels))
ax.set_xticks(tick_positions)
ax.set_yticks(tick_positions)
ax.set_xticklabels(class_labels)
ax.set_yticklabels(class_labels)

ax.set_title("Confusion Matrix - Gradient Boosting")
ax.set_xlabel("Predicted Label")
ax.set_ylabel("True Label")

# Annotate each cell with its count; use white text on the dark (low-count)
# end of the colormap so the number stays readable.
half_max = cm.max() / 2
for i in range(cm.shape[0]):
    for j in range(cm.shape[1]):
        text_color = "white" if cm[i, j] < half_max else "black"
        ax.text(j, i, cm[i, j], ha="center", va="center", color=text_color)

fig.tight_layout()
plt.show()

In [None]:
# =========================================
# Fig. 3 - Precision-Recall Curves
# =========================================

# precision_recall_curve / average_precision_score come from the notebook's
# import cell; no per-cell import is needed.
fig, ax = plt.subplots(dpi=300)

for name, result in binary_results.items():
    # Probability of the positive class (second column of predict_proba).
    y_probs = result["model"].predict_proba(X_test_b)[:, 1]
    precision, recall, _ = precision_recall_curve(y_test_b, y_probs)
    # Summarise the curve with average precision rather than a trapezoidal
    # area: linear interpolation between PR points can be too optimistic.
    avg_precision = average_precision_score(y_test_b, y_probs)
    ax.plot(recall, precision, label=f"{name} (AP={avg_precision:.3f})")

ax.set_xlabel("Recall")
ax.set_ylabel("Precision")
ax.set_title("Precision-Recall Curve")
ax.legend()
fig.tight_layout()
plt.show()

In [None]:
# =========================================
# Fig. 4 - ROC Curves
# =========================================

# NOTE(review): this cell used to be a verbatim copy of the Precision-Recall
# cell above (also titled "Fig. 3"), while the figure numbering jumps from 3
# to 5. It now plots ROC curves, the metric the training loop reports as
# roc_auc but never visualises - confirm this is the intended Fig. 4.
# roc_curve / auc come from the notebook's import cell.
fig, ax = plt.subplots(dpi=300)

for name, result in binary_results.items():
    # Probability of the positive class (second column of predict_proba).
    y_probs = result["model"].predict_proba(X_test_b)[:, 1]
    fpr, tpr, _ = roc_curve(y_test_b, y_probs)
    ax.plot(fpr, tpr, label=f"{name} (AUC={auc(fpr, tpr):.3f})")

# Diagonal = performance of a random classifier, for reference.
ax.plot([0, 1], [0, 1], linestyle="--", color="grey", label="Chance")

ax.set_xlabel("False Positive Rate")
ax.set_ylabel("True Positive Rate")
ax.set_title("ROC Curve")
ax.legend()
fig.tight_layout()
plt.show()

In [None]:
!pip install shap
import shap

In [None]:
# =========================================
# SHAP Explainability - Binary Classification
# =========================================

# shap.TreeExplainer only supports tree-based models, so the model to explain
# is the best-F1 model among the tree ensembles. When the overall best model
# is tree-based (the usual case) this is the same choice as before.
TREE_MODEL_TYPES = (RandomForestClassifier, GradientBoostingClassifier)

overall_best_name = max(binary_results, key=lambda k: binary_results[k]["f1"])
tree_model_names = [
    k for k, res in binary_results.items() if isinstance(res["model"], TREE_MODEL_TYPES)
]
if not tree_model_names:
    raise ValueError(
        "No tree-based model found in binary_results; shap.TreeExplainer cannot be used."
    )

best_binary_name = max(tree_model_names, key=lambda k: binary_results[k]["f1"])
best_binary_model = binary_results[best_binary_name]["model"]

if best_binary_name != overall_best_name:
    print(
        f"Best overall model is {overall_best_name}, which TreeExplainer does not "
        f"support; explaining the best tree-based model instead."
    )
print(f"Using SHAP for model: {best_binary_name}")

# Create SHAP explainer (TreeExplainer for tree-based models)
explainer = shap.TreeExplainer(best_binary_model)

# Compute SHAP values on test set
shap_values = explainer.shap_values(X_test_b)

# Normalise to a 2-D (n_samples, n_features) array for the attack class (index 1).
# Depending on the model and SHAP version the raw result is either
#   - a list with one array per class,
#   - a 3-D array (n_samples, n_features, n_classes), or
#   - already a single 2-D array.
if isinstance(shap_values, list):
    shap_values_to_use = shap_values[1]
else:
    shap_array = np.asarray(shap_values)
    shap_values_to_use = shap_array[:, :, 1] if shap_array.ndim == 3 else shap_array

# Convert to DataFrame for safety (guarantees named columns for the plots)
X_test_df = pd.DataFrame(X_test_b, columns=feature_cols)

print("SHAP values shape:", np.array(shap_values_to_use).shape)

In [None]:
# =========================================
# Fig. 5 - SHAP Global Summary Plot
# =========================================

# Open a high-resolution figure before calling SHAP.
# NOTE(review): presumably summary_plot draws into the current figure, so the
# dpi set here carries over - confirm for the installed SHAP version.
plt.figure(dpi=300)

# show=False: presumably keeps SHAP from displaying the figure right away, so
# the title added below is part of what plt.show() renders.
shap.summary_plot(
    shap_values_to_use,
    X_test_df,
    feature_names=feature_cols,
    show=False
)

plt.title("SHAP Summary Plot - Binary Classification")
plt.tight_layout()
plt.show()

In [None]:
# Bar plot of feature importance: same SHAP values and features as the summary
# plot, rendered with plot_type="bar" (titled "Mean |SHAP|" below).
plt.figure()
# show=False: presumably defers display so the title below is applied before
# plt.show() renders the figure.
shap.summary_plot(
    shap_values_to_use,
    X_test_df,
    plot_type="bar",
    feature_names=feature_cols,
    show=False
)
plt.title("Mean |SHAP| Feature Importance - Binary Classification")
plt.tight_layout()
plt.show()

In [None]:
# Select one correctly predicted attack sample: the true label is 1 AND the
# explained model also predicts 1 (previously only the true label was checked).
is_attack = y_test_b.values == 1
predicted_attack = best_binary_model.predict(X_test_df) == 1
attack_indices = np.where(is_attack & predicted_attack)[0]

# explainer.expected_value may be a scalar or hold one value per class; in the
# binary case the last entry is the base value for the attack class.
base_value = float(np.ravel(explainer.expected_value)[-1])

if len(attack_indices) > 0:
    sample_index = attack_indices[0]

    print(f"Explaining sample index: {sample_index}")

    # If the SHAP values still carry a per-class axis, keep the attack class.
    sample_shap = np.asarray(shap_values_to_use[sample_index])
    if sample_shap.ndim == 2:
        sample_shap = sample_shap[:, -1]

    shap.force_plot(
        base_value,
        sample_shap,
        X_test_df.iloc[sample_index],
        matplotlib=True
    )
else:
    print("No correctly predicted attack sample in the test set; nothing to explain.")

In [None]:
# Find top important feature: the one with the largest mean |SHAP| over the test set.
shap_importance = np.abs(shap_values_to_use).mean(axis=0)
top_feature_index = int(np.argmax(shap_importance))
top_feature_name = feature_cols[top_feature_index]

print("Top SHAP feature:", top_feature_name)

# =========================================
# Fig. 6 - SHAP Dependence Plot
# =========================================

# shap.dependence_plot opens its own figure unless an Axes is supplied, which
# leaves a separately created plt.figure(dpi=300) blank and ignores its dpi.
# Create the Axes explicitly and hand it to SHAP instead.
fig, ax = plt.subplots(dpi=300)

shap.dependence_plot(
    top_feature_name,
    shap_values_to_use,
    X_test_df,
    ax=ax,
    show=False
)

ax.set_title(f"SHAP Dependence Plot - {top_feature_name}")
fig.tight_layout()
plt.show()