In [5]:
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder, StandardScaler
from sklearn.metrics import classification_report, accuracy_score
import xgboost as xgb
import joblib

# --------------
# CONFIGURATION
TRAIN_CSV = '/content/UNSW_NB15_training-set.csv'
TEST_CSV = '/content/UNSW_NB15_testing-set.csv'
MODEL_PATH = '/content/threat_detection_xgb_model.pkl'
SCALER_PATH = '/content/threat_detection_scaler.pkl'
TARGET_ENCODER_PATH = '/content/threat_detection_target_encoder.pkl'
FEATURE_ENCODERS_PATH = '/content/threat_detection_feature_encoders.pkl'

# 1. Load datasets
train_df = pd.read_csv(TRAIN_CSV)
test_df = pd.read_csv(TEST_CSV)

# 2. Preprocessing for training data
def preprocess_train(df):
    df = df.copy()
    # Drop irrelevant columns
    df = df.drop(columns=['id', 'attack_id'], errors='ignore')
    # Extract and encode target
    y = df.pop('attack_cat')
    target_encoder = LabelEncoder()
    y_enc = target_encoder.fit_transform(y)
    # Features
    X = df
    # Encode categorical features
    cat_cols = X.select_dtypes(include=['object']).columns.tolist()
    feature_encoders = {}
    for col in cat_cols:
        le = LabelEncoder()
        X[col] = le.fit_transform(X[col].astype(str))
        feature_encoders[col] = {'classes': le.classes_.tolist(),
                                  'mapping': {cls: idx for idx, cls in enumerate(le.classes_)}}
    # Scale
    scaler = StandardScaler()
    X_scaled = scaler.fit_transform(X)
    return X_scaled, y_enc, scaler, target_encoder, feature_encoders, X.columns.tolist()

# 3. Preprocessing for test data
def preprocess_test(df, scaler, target_encoder, feature_encoders, feature_cols):
    df = df.copy()
    df = df.drop(columns=['id', 'attack_id'], errors='ignore')
    # Encode target if present
    if 'attack_cat' in df:
        y = df.pop('attack_cat')
        y_enc = target_encoder.transform(y)
    else:
        y_enc = None
    # Features
    X = df.reindex(columns=feature_cols)
    # Handle categorical
    for col, enc in feature_encoders.items():
        X[col] = X[col].astype(str).map(enc['mapping']).fillna(-1).astype(int)
    # Scale
    X_scaled = scaler.transform(X)
    return X_scaled, y_enc

# 4. Train & evaluate
print("Preprocessing training data...")
X_train, y_train, scaler, target_enc, feature_encoders, feature_cols = preprocess_train(train_df)
print("Preprocessing testing data...")
X_test, y_test = preprocess_test(test_df, scaler, target_enc, feature_encoders, feature_cols)

print("Training XGBoost classifier...")
clf = xgb.XGBClassifier(
    objective='multi:softmax',
    num_class=len(target_enc.classes_),
    n_estimators=200,
    learning_rate=0.1,
    max_depth=6,
    subsample=0.8,
    colsample_bytree=0.8,
    use_label_encoder=False,
    eval_metric='mlogloss'
)
clf.fit(X_train, y_train)

print("Evaluating on test data...")
y_pred = clf.predict(X_test)
acc = accuracy_score(y_test, y_pred)
print(f"Test Accuracy: {acc * 100:.2f}%")
print(classification_report(y_test, y_pred, target_names=target_enc.classes_))

# 5. Save artifacts
print("Saving model and preprocessors...")
joblib.dump(clf, MODEL_PATH)
joblib.dump(scaler, SCALER_PATH)
joblib.dump(target_enc, TARGET_ENCODER_PATH)
joblib.dump(feature_encoders, FEATURE_ENCODERS_PATH)
joblib.dump(feature_cols, '/content/feature_columns.pkl')
print("Artifacts saved.")

# 6. Inference function for real-world data without labels
def infer(input_df):
    """
    input_df: DataFrame with same feature columns (may include id, attack_id, but no attack_cat)
    returns: array of predicted attack categories
    """
    X_scaled, _ = preprocess_test(input_df, scaler, target_enc, feature_encoders, feature_cols)
    preds = clf.predict(X_scaled)
    return target_enc.inverse_transform(preds)

#  Example usage for inference:
# new_data = pd.read_csv('new_traffic.csv')  # must have same raw features
# predictions = infer(new_data)
# print(predictions)


Preprocessing training data...
Preprocessing testing data...
Training XGBoost classifier...


Parameters: { "use_label_encoder" } are not used.



Evaluating on test data...
Test Accuracy: 85.41%
                precision    recall  f1-score   support

      Analysis       0.00      0.00      0.00      2000
      Backdoor       0.96      0.07      0.13      1746
           DoS       0.34      0.58      0.43     12264
      Exploits       0.71      0.68      0.70     33393
       Fuzzers       0.96      0.86      0.91     18184
       Generic       0.99      0.98      0.99     40000
        Normal       1.00      1.00      1.00     56000
Reconnaissance       0.92      0.75      0.82     10491
     Shellcode       0.63      0.68      0.65      1133
         Worms       0.69      0.52      0.59       130

      accuracy                           0.85    175341
     macro avg       0.72      0.61      0.62    175341
  weighted avg       0.87      0.85      0.86    175341

Saving model and preprocessors...
Artifacts saved.


  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))


In [None]:
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split, RandomizedSearchCV
from sklearn.preprocessing import LabelEncoder, StandardScaler
from sklearn.metrics import classification_report, accuracy_score
import xgboost as xgb
from imblearn.over_sampling import SMOTE
import joblib

# --------------
# CONFIGURATION
TRAIN_CSV = '/content/UNSW_NB15_training-set.csv'
TEST_CSV = '/content/UNSW_NB15_testing-set.csv'
MODEL_PATH = '/content/new/threat_detection_xgb_model.pkl'
SCALER_PATH = '/content/new/threat_detection_scaler.pkl'
TARGET_ENCODER_PATH = '/content/new/threat_detection_target_encoder.pkl'
FEATURE_ENCODERS_PATH = '/content/new/threat_detection_feature_encoders.pkl'

# 1. Load datasets
def load_data(path):
    return pd.read_csv(path)

train_df = load_data(TRAIN_CSV)
test_df = load_data(TEST_CSV)

# 2. Preprocessing functions
def preprocess_train(df):
    df = df.copy()
    df.drop(columns=['id', 'attack_id'], errors='ignore', inplace=True)
    y = df.pop('attack_cat')
    target_enc = LabelEncoder()
    y_enc = target_enc.fit_transform(y)
    X = df
    cat_cols = X.select_dtypes(include=['object']).columns.tolist()
    feat_enc = {}
    for col in cat_cols:
        le = LabelEncoder()
        X[col] = le.fit_transform(X[col].astype(str))
        feat_enc[col] = {'classes': le.classes_.tolist(), 'mapping': {cls: idx for idx, cls in enumerate(le.classes_)}}
    scaler = StandardScaler()
    X_scaled = scaler.fit_transform(X)
    return X_scaled, y_enc, scaler, target_enc, feat_enc, X.columns.tolist()

def preprocess_test(df, scaler, target_enc, feat_enc, feat_cols):
    df = df.copy()
    df.drop(columns=['id', 'attack_id'], errors='ignore', inplace=True)
    if 'attack_cat' in df:
        y = df.pop('attack_cat')
        y_enc = target_enc.transform(y)
    else:
        y_enc = None
    X = df.reindex(columns=feat_cols)
    for col, enc in feat_enc.items():
        X[col] = X[col].astype(str).map(enc['mapping']).fillna(-1).astype(int)
    X_scaled = scaler.transform(X)
    return X_scaled, y_enc

# 3. Prepare training data
X_train_raw, y_train_raw, scaler, target_enc, feat_enc, feat_cols = preprocess_train(train_df)
# Apply SMOTE to balance classes
sm = SMOTE(random_state=42)
X_train, y_train = sm.fit_resample(X_train_raw, y_train_raw)

# 4. Hyperparameter tuning with RandomizedSearchCV
param_dist = {
    'n_estimators': [100, 200, 300],
    'max_depth': [4, 6, 8],
    'learning_rate': [0.01, 0.05, 0.1],
    'subsample': [0.7, 0.8, 1.0],
    'colsample_bytree': [0.7, 0.8, 1.0]
}

base_clf = xgb.XGBClassifier(objective='multi:softmax', num_class=len(target_enc.classes_),
                             use_label_encoder=False, eval_metric='mlogloss', random_state=42)
search = RandomizedSearchCV(base_clf, param_distributions=param_dist, n_iter=10,
                            scoring='accuracy', cv=3, verbose=1, random_state=42, n_jobs=-1)
print("Tuning hyperparameters...")
search.fit(X_train, y_train)
best_clf = search.best_estimator_
print(f"Best params: {search.best_params_}")

# 5. Evaluate on test set
X_test, y_test = preprocess_test(test_df, scaler, target_enc, feat_enc, feat_cols)
y_pred = best_clf.predict(X_test)
acc = accuracy_score(y_test, y_pred)
print(f"Test Accuracy after tuning: {acc * 100:.2f}%")
print(classification_report(y_test, y_pred, target_names=target_enc.classes_))

# 6. Save artifacts
joblib.dump(best_clf, MODEL_PATH)
joblib.dump(scaler, SCALER_PATH)
joblib.dump(target_enc, TARGET_ENCODER_PATH)
joblib.dump(feat_enc, FEATURE_ENCODERS_PATH)
joblib.dump(feat_cols, FEATURE_COLS_PATH)
print("Saved tuned model and preprocessors.")

# 7. Inference function for real-world (no labels)
def infer(input_df):
    X_scaled, _ = preprocess_test(input_df, scaler, target_enc, feat_enc, feat_cols)
    preds = best_clf.predict(X_scaled)
    return target_enc.inverse_transform(preds)

# Dataset recommendations:
# • UNSW-NB15: https://www.kaggle.com/datasets/ahmedbesbes/unsw-nb15
# • CIC-IDS2017 (more realistic, varied attack types): https://www.unb.ca/cic/datasets/ids-2017.html

Tuning hyperparameters...
Fitting 3 folds for each of 10 candidates, totalling 30 fits
