In [None]:
import time
import warnings

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
from sklearn.base import clone
from sklearn.compose import ColumnTransformer
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import classification_report, confusion_matrix, ConfusionMatrixDisplay
from sklearn.model_selection import train_test_split
from sklearn.neighbors import KNeighborsClassifier
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.tree import DecisionTreeClassifier

In [None]:
warnings.filterwarnings("ignore")
print("Starting Anomaly Detection System Setup...")

In [None]:
print("Loading datasets...")
# Read the two CSV files (training file first, testing file second) from the
# current working directory.
df1, df2 = (
    pd.read_csv(csv_name)
    for csv_name in ('UNSW_NB15_training-set.csv', 'UNSW_NB15_testing-set.csv')
)

In [None]:
# Stack both frames row-wise and renumber the index from 0.
df = pd.concat((df1, df2), axis=0, ignore_index=True)
print(f"Datasets loaded and concatenated. Total records: {df.shape[0]}")

In [None]:
print("Dropping irrelevant columns: 'id' and 'attack_cat'")
# Rebind instead of mutating in place, and tolerate columns that are already gone,
# so re-running this cell does not raise KeyError.
df = df.drop(columns=['id', 'attack_cat'], errors='ignore')

In [None]:
print("Clamping extreme values in numeric columns...")
# Numeric feature columns only. The target is excluded on purpose: clamping a
# binary label at its own 95th percentile can rewrite the ground truth when one
# class is rare.
df_numeric = df.select_dtypes(include=[np.number]).drop(columns=['label'], errors='ignore')
for feature in df_numeric.columns:
    # Heuristic for a heavy right tail: the maximum dwarfs the median.
    if df[feature].max() > 10 * df[feature].median():
        upper_cap = df[feature].quantile(0.95)  # computed once per column
        # clip() caps values above the 95th percentile and leaves NaN untouched
        # (a `col < cap` comparison would silently turn NaN into the cap).
        df[feature] = df[feature].clip(upper=upper_cap)
# NOTE(review): the caps are estimated on the full concatenated data, before the
# train/test split performed later in the notebook.
print("Clamping complete.")

In [None]:
print("Applying log transformation on skewed numeric features...")
# Selection is by cardinality: every numeric column with more than 50 distinct
# values is replaced by log(1 + x).
log_targets = [col for col in df_numeric.columns if df[col].nunique() > 50]
for col in log_targets:
    df[col] = np.log1p(df[col])
print("Log transformation complete.")

In [None]:
print("Reducing cardinality of categorical features...")
# Every non-numeric column keeps its five most frequent values; anything else
# is replaced by the placeholder '-'.
df_cat = df.select_dtypes(exclude=[np.number])
for col_name in df_cat.columns:
    frequent = df[col_name].value_counts().nlargest(5).index
    df[col_name] = df[col_name].map(
        lambda value, keep=frequent: value if value in keep else '-'
    )
print("Categorical cardinality reduction done.")


In [None]:
print("Encoding categorical features using OneHotEncoder...")
# Features are every column except the target.
X = df.drop(columns='label')
y = df['label']
# Use the same "non-numeric" rule as the cardinality-reduction step so that every
# column reduced there is also one-hot encoded here, whatever its exact dtype.
cat_features = X.select_dtypes(exclude=[np.number]).columns
preprocessor = ColumnTransformer(
    transformers=[
        ('cat', OneHotEncoder(handle_unknown='ignore'), cat_features)
    ],
    # Numeric columns are forwarded unchanged.
    remainder='passthrough',
    # Always return a dense array: the StandardScaler that follows centres the data
    # and rejects sparse input, which ColumnTransformer would otherwise produce
    # whenever the stacked output's density falls below its default threshold.
    sparse_threshold=0
)

In [None]:
print("Splitting dataset into train and test sets...")
# Stratified 80/20 hold-out split with a fixed seed.
X_train, X_test, y_train, y_test = train_test_split(
    X,
    y,
    test_size=0.2,
    random_state=42,
    stratify=y,
)
print("Data split complete:")
print(f"  Training samples: {len(X_train)}")
print(f"  Testing samples: {len(X_test)}")

In [None]:
# Standardisation step; the training loop places it between the preprocessor and
# the classifier in every model pipeline.
scaler = StandardScaler()

In [None]:
# Candidate classifiers, keyed by the display name used in reports and plots.
# The tree-based models are seeded (same seed as the train/test split) so that
# repeated runs of the notebook give the same fitted models and metrics.
models = {
    "Logistic Regression": LogisticRegression(max_iter=1000),
    "Decision Tree": DecisionTreeClassifier(random_state=42),
    "KNN": KNeighborsClassifier(n_neighbors=5, n_jobs=-1),
    "Gradient Boosting": GradientBoostingClassifier(random_state=42)
}

In [None]:
# Training and evaluating models
# `results` maps each model name to its fitted pipeline, the classification report
# (as a dict), the class probabilities (or None) and the hard predictions on X_test.
results = {}
for name, model in models.items():
    print("\n============================================")
    print("Training and Evaluating Model:", name)
    print("============================================")

    # Clone the shared transformers so each stored pipeline owns its own fitted
    # preprocessor/scaler instead of all pipelines pointing at the same objects.
    pipeline = Pipeline([
        ('preprocessor', clone(preprocessor)),
        ('scaler', clone(scaler)),
        ('classifier', model)
    ])

    # perf_counter is monotonic, so the duration cannot be skewed by clock changes.
    start_time = time.perf_counter()
    pipeline.fit(X_train, y_train)
    train_time = time.perf_counter() - start_time
    print(f"Training completed in {train_time:.2f} seconds")

    y_pred = pipeline.predict(X_test)
    # Not every classifier exposes probabilities; store None when it does not.
    y_proba = pipeline.predict_proba(X_test) if hasattr(pipeline.named_steps['classifier'], 'predict_proba') else None

    print("\nClassification Report:")
    print(classification_report(y_test, y_pred))

    print("Confusion Matrix:")
    # Reuse the predictions computed above rather than running inference on
    # X_test a second time (which from_estimator would do).
    ConfusionMatrixDisplay.from_predictions(y_test, y_pred, cmap=plt.cm.Blues)
    plt.title(f"Confusion Matrix - {name}")
    plt.show()

    results[name] = {
        "model": pipeline,
        "report": classification_report(y_test, y_pred, output_dict=True),
        "proba": y_proba,
        "preds": y_pred
    }

print("\nAll models have been trained and evaluated.")

In [None]:
# Anomaly Alert System with Voting
def alert_intrusions(model_results=None, y_true=None, confidence_threshold=0.75, max_alerts=None):
    """Print an alert for every sample that the model ensemble confirms as an anomaly.

    A sample is confirmed when a strict majority of the models predict class 1 and
    its ensemble confidence is at least ``confidence_threshold``. The ensemble
    confidence is the sum of P(class 1) over the models that voted for the anomaly,
    divided by the total number of models; a model without probabilities counts as
    1.0 when it votes for the anomaly.

    Parameters
    ----------
    model_results : dict, optional
        Maps a model name to a dict holding ``'preds'`` (hard predictions) and
        ``'proba'`` (two-column probability array, or None). Defaults to the
        notebook-level ``results``.
    y_true : sequence, optional
        True labels, positionally aligned with the predictions. Defaults to the
        notebook-level ``y_test``.
    confidence_threshold : float, default 0.75
        Minimum ensemble confidence required to confirm an anomaly.
    max_alerts : int, optional
        Maximum number of individual alerts to print; the total is always
        reported. ``None`` prints every alert.

    Returns
    -------
    None
        All output goes to stdout.
    """
    if model_results is None:
        model_results = results
    if y_true is None:
        y_true = y_test

    print("\nANOMALY ALERTS using Voting System")
    print("Scanning predictions for detected anomalies...")

    labels = np.asarray(y_true)
    n_samples = len(labels)
    # One source of truth for the ensemble size (used for both the average and
    # the majority rule).
    n_models = len(model_results)

    # Integer vote counter so alerts read "3/4" rather than "3.0/4".
    sample_votes = np.zeros(n_samples, dtype=int)
    confidence_sum = np.zeros(n_samples)

    # Collect votes and confidences from each model, one vectorised pass per model
    for model_info in model_results.values():
        voted_anomaly = np.asarray(model_info['preds']) == 1
        proba = model_info['proba']
        if proba is None:
            anomaly_proba = np.ones(n_samples)
        else:
            # assumes column 1 of the probability array is class 1 — TODO confirm
            anomaly_proba = np.asarray(proba)[:, 1]
        sample_votes += voted_anomaly
        # NOTE(review): models that vote "normal" add nothing here, so the value
        # reported as "Average Confidence" is lower than the mean probability of
        # the models that did vote for the anomaly.
        confidence_sum += np.where(voted_anomaly, anomaly_proba, 0.0)

    # Average over all models; an empty ensemble simply confirms nothing.
    sample_confidences = confidence_sum / n_models if n_models else confidence_sum

    confirmed = np.flatnonzero(
        (sample_votes > n_models / 2) & (sample_confidences >= confidence_threshold)
    )
    shown = confirmed if max_alerts is None else confirmed[:max_alerts]

    for count, i in enumerate(shown, start=1):
        print(f"\nALERT #{count}")
        print("-------------------------")
        print(f"Sample Index      : {i}")
        print(f"Actual Label      : {labels[i]}")
        print(f"Votes for Anomaly : {sample_votes[i]}/{n_models}")
        print(f"Average Confidence: {sample_confidences[i]:.2%}")
        print("-------------------------")

    total = len(confirmed)
    if total == 0:
        print("No confirmed anomalies detected in the current test set")
    else:
        if len(shown) < total:
            print(f"\n({total - len(shown)} further alerts not shown)")
        print(f"\nTotal Confirmed Anomalies: {total}")
# Run the voting-based anomaly detection over the predictions collected in `results`
alert_intrusions()