In [None]:
# Pipeline: Transfer Learning + Concept Drift Detection + Explainable AI (SHAP)

import pandas as pd
import numpy as np
import os, glob

from pathlib import Path

from sklearn.preprocessing import LabelEncoder, StandardScaler
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, confusion_matrix

from sklearn.ensemble import RandomForestClassifier, AdaBoostClassifier, ExtraTreesClassifier, VotingClassifier
from sklearn.tree import DecisionTreeClassifier

from xgboost import XGBClassifier
from lightgbm import LGBMClassifier
from catboost import CatBoostClassifier

import matplotlib.pyplot as plt
import seaborn as sns

In [None]:
# Step 1: Load and preprocess dataset from a structured path
def load_dataset_from_structure(root_path):
    """Load every CSV under ``root_path/<category>/<attack>/`` into one DataFrame.

    The two directory levels above each CSV file are used as labels and are
    added to the rows of that file as three extra columns:

    * ``category`` - name of the grandparent directory of the file
    * ``attack``   - name of the parent directory of the file
    * ``class``    - ``'Benign'`` when ``category`` equals ``BENIGN``
      (case-insensitive), otherwise ``'Attack'``

    Parameters
    ----------
    root_path : str or pathlib.Path
        Directory that contains the ``<category>/<attack>/*.csv`` layout.

    Returns
    -------
    pandas.DataFrame
        Concatenation of all readable files with a fresh ``RangeIndex``.

    Raises
    ------
    ValueError
        If no CSV file under ``root_path`` could be read.
    """
    root_path = Path(root_path)  # accept plain strings as well as Path objects
    frames = []

    # Sort the matches so row order does not depend on filesystem enumeration
    # order; this keeps the later seeded split reproducible across machines.
    for file in sorted(root_path.glob('*/*/*.csv')):
        try:
            df = pd.read_csv(file)
        except Exception as e:
            # Best-effort loading: report the unreadable file and keep going.
            print(f"[ERROR] Failed to read file {file}: {e}")
            continue

        category = file.parents[1].name
        attack = file.parent.name
        df['category'] = category
        df['attack'] = attack
        df['class'] = 'Benign' if category.upper() == 'BENIGN' else 'Attack'
        frames.append(df)

    if not frames:
        # pd.concat([]) would fail with an unhelpful "No objects to concatenate".
        raise ValueError(
            f"No readable CSV files found under {root_path} "
            "(expected layout: <category>/<attack>/*.csv)"
        )
    return pd.concat(frames, ignore_index=True)

# Resolve the train/test roots of the dataset, then load each split in turn
train_root = Path('../../../Data/CICIoMT2024/train')
test_root = Path('../../../Data/CICIoMT2024/test')

train_df, test_df = (
    load_dataset_from_structure(split_root)
    for split_root in (train_root, test_root)
)

In [None]:
# Summary: shape and per-label row counts for each split
label_columns = ['category', 'attack', 'class']
for heading, split_df in (("Train set:", train_df), ("\nTest set:", test_df)):
    print(heading, split_df.shape)
    print(split_df[label_columns].value_counts())

In [None]:
# Combine train and test
# NOTE(review): this discards the train/test partition given by the directory
# layout; the combined frame is re-split by train_test_split further down.
df = pd.concat([train_df, test_df], ignore_index=True)

# Label encoding
# Keep the fitted encoder (instead of a throwaway instance) so integer labels
# and predictions can be mapped back to attack names through
# label_encoder.classes_ / label_encoder.inverse_transform.
label_encoder = LabelEncoder()
y_encoded = label_encoder.fit_transform(df['attack'])

# Feature selection + scaling
# Drop the three label columns, then keep only numeric feature columns.
X = df.drop(['class', 'category', 'attack'], axis=1, errors='ignore').select_dtypes(include=[np.number])
# NOTE(review): the scaler is fit on all rows, including those that later end
# up in X_test, so test-set statistics leak into the scaled features. Fitting
# on the training rows only would require splitting before this step.
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)


In [None]:
# Split data
X_train, X_test, y_train, y_test = train_test_split(X_scaled, y_encoded, test_size=0.2, random_state=42)

# Step 2: Concept Drift Detection (Manual Sliding Window Approach)
# NOTE(review): train_test_split shuffles rows by default, so consecutive
# windows of X_test carry no temporal order - confirm this is intended for a
# drift check.
window_size = 100
threshold = 0.85  # if accuracy in window < threshold, consider drift

# The training data never changes between windows, so fit the model once and
# score the whole test set once instead of refitting XGBoost for every window.
clf = XGBClassifier().fit(X_train, y_train)
test_preds = clf.predict(X_test)
is_correct = (test_preds == y_test)

# i is the exclusive end of the window [i - window_size, i). The upper bound is
# len(X_test) + 1 so the final window, which contains the last test sample, is
# evaluated as well.
for i in range(window_size, len(X_test) + 1):
    acc = np.mean(is_correct[i - window_size:i])
    if acc < threshold:
        print(f"🔺 Possible Concept Drift Detected at index {i}, Accuracy in window: {acc:.2f}")
