# In [None]:  (Jupyter cell marker left over from the notebook export)
# mqtt_ids_boost_train.py
# Batch + epoch training of a boosted decision tree (XGBoost) for multi-class attack-type detection.
# Processes packet / uniflow / biflow CSVs in streaming mode (chunks), creates a multi-class
# "attack_type" target (normal is one type), trains incrementally across batches and epochs,
# validates per-epoch on an in-memory sampled validation set, and evaluates finally on test set.
#
# IMPORTANT: adjust base_path and folders/filenames to point to your CSV files.
# This script deletes only temporary files it creates and does an additional cleanup of common temp files.

import os
import glob
import time
import tempfile
import shutil
from pathlib import Path
import numpy as np
import pandas as pd
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import accuracy_score, f1_score, classification_report
import xgboost as xgb
import random

# ------------------------------
# Config - adjust these paths & hyperparams
# ------------------------------
# Root directory that contains the per-level feature folders named in `folders`.
base_path = "./"

# Feature level -> name of the sub-folder (under base_path) holding its CSVs.
folders = {
    level: f"{level}_features"
    for level in ("packet", "uniflow", "biflow")
}

# Attack type -> CSV filename, in the unprefixed "<attack_type>.csv" form.
files = {
    attack: f"{attack}.csv"
    for attack in ("normal", "sparta", "scan_A", "mqtt_bruteforce", "scan_sU")
}

# If your uniflow/biflow files are prefixed differently, you can change these helper names
def build_filenames(prefix):
    """
    Build the attack-type -> CSV filename mapping for one feature level.

    prefix: text placed before each attack type, e.g. "uniflow"
    returns: dict whose values have the form "<prefix>_<attack_type>.csv"
    """
    attack_keys = ("normal", "sparta", "scan_A", "mqtt_bruteforce", "scan_sU")
    return {attack: f"{prefix}_{attack}.csv" for attack in attack_keys}

# Feature level -> {attack type -> CSV filename}. The packet level uses the
# unprefixed names in `files`; uniflow/biflow names carry their level as a prefix.
feature_files = {
    "packet": files,
    **{level: build_filenames(level) for level in ("uniflow", "biflow")},
}

# training hyperparams
# --- streaming / splitting ---
CHUNKSIZE = 200_000          # rows per CSV chunk handed to pandas.read_csv
# Row-level split fractions applied inside every chunk.
TRAIN_FRACTION, VAL_FRACTION, TEST_FRACTION = 0.80, 0.10, 0.10

# --- boosting schedule ---
EPOCHS = 3                   # passes over the entire dataset
ROUNDS_PER_BATCH = 1         # boosting rounds added per batch update (small = fine-grained updates)
XGB_PARAMS = dict(
    objective="multi:softprob",
    eval_metric="mlogloss",
    eta=0.1,
    max_depth=6,
    verbosity=0,
    # tree_method="hist" can optionally be added here depending on your environment
)

# --- in-memory evaluation pools ---
SAMPLE_VAL_MAX = 20_000      # upper bound on sampled validation rows kept in memory
SAMPLE_TEST_MAX = 20_000     # upper bound on sampled test rows kept in memory

# --- reproducibility: seed both the stdlib and the NumPy global generators ---
RANDOM_SEED = 42
random.seed(RANDOM_SEED)
np.random.seed(RANDOM_SEED)

# ------------------------------
# Utilities & bookkeeping
# ------------------------------
# Paths of temporary files written by this script, recorded so the cleanup
# step at the end can delete them.
TEMP_FILES_CREATED: list = []

def safe_remove(path):
    """
    Best-effort deletion of a single file.

    Does nothing when ``path`` does not exist. Any error raised while checking
    or deleting is reported on stdout instead of being propagated.
    """
    try:
        if not os.path.exists(path):
            return
        os.remove(path)
        print(f"[CLEANUP] Removed file: {path}")
    except Exception as err:
        print(f"[CLEANUP] Could not remove {path}: {err}")

# ------------------------------
# Build a mapping from filename key to textual attack type (target)
# ------------------------------
# The traffic classes the model distinguishes ("normal" counts as a class).
attack_type_names = ["normal", "sparta", "scan_A", "mqtt_bruteforce", "scan_sU"]
# Fitted once at import time so the text -> integer mapping is fixed for the whole run.
label_encoder = LabelEncoder().fit(attack_type_names)

# ------------------------------
# Streaming data generator
# ------------------------------
def stream_chunks_for_all_files(feature_files_map, base_path, chunksize=CHUNKSIZE):
    """
    Lazily read every configured CSV, one chunk at a time.

    Walks the feature levels in ``feature_files_map`` (each level's directory is
    looked up in the module-level ``folders`` mapping) and, for every file listed
    under a level, yields one tuple (level, file_key, filepath, chunk_df) per
    chunk of at most ``chunksize`` rows.

    A missing or empty file is reported and skipped. Any other error raised
    while reading is reported and ends the iteration of that file only.
    """
    for level, level_files in feature_files_map.items():
        level_dir = os.path.join(base_path, folders[level])
        for file_key, filename in level_files.items():
            csv_path = os.path.join(level_dir, filename)
            if os.path.isfile(csv_path):
                try:
                    reader = pd.read_csv(csv_path, chunksize=chunksize, low_memory=False)
                    for chunk_df in reader:
                        yield (level, file_key, csv_path, chunk_df)
                except pd.errors.EmptyDataError:
                    print(f"[WARN] Empty CSV: {csv_path}. Skipping.")
                except Exception as err:
                    print(f"[ERROR] Reading {csv_path} failed: {err}")
            else:
                print(f"[WARN] File not found: {csv_path}. Skipping.")

# ------------------------------
# Preprocess a chunk:
# - add attack_type target column (text), then encoded label (int)
# - select numeric columns only (drop non-numeric / non-finite)
# - fill NaNs with zero (or another strategy)
# - return X (ndarray) and y (ndarray) plus feature_names
# ------------------------------
def preprocess_chunk(df, file_key, expected_feature_names=None):
    """
    Turn one CSV chunk into a float32 feature matrix and an int32 label vector.

    Every row of the chunk gets the same label: the integer that the
    module-level ``label_encoder`` assigns to ``file_key``.

    df: pandas DataFrame chunk (left unmodified)
    file_key: filename key like 'normal', 'sparta', ...
    expected_feature_names: optional ordered list of feature names. When given,
        X has exactly these columns in this order; columns the chunk does not
        contain are filled with 0.0, so every batch has the same width as the
        feature list used to build the DMatrix.
    returns: X, y, feature_names
        X is (n_rows, n_features) float32 with NaN and +/-inf replaced by 0.0,
        y is (n_rows,) int32, feature_names lists the columns of X in order.
    raises: ValueError if ``file_key`` is not a known attack type, or if the
        chunk has no usable numeric feature column.
    """
    # The label is constant for the whole file, so encode the key once instead
    # of materialising and transforming a per-row string column.
    try:
        label = int(label_encoder.transform([str(file_key)])[0])
    except ValueError as err:
        # A silent -1 label would be rejected later by XGBoost and would
        # pollute the validation/test pools, so fail loudly here instead.
        raise ValueError(
            f"Unknown attack type {file_key!r}; expected one of {list(label_encoder.classes_)}"
        ) from err

    # Features are the numeric columns only.
    numeric_df = df.select_dtypes(include=[np.number])

    # Drop target-like numeric columns if they exist (avoid leakage).
    numeric_df = numeric_df.drop(columns=["label", "attack_type"], errors="ignore")

    if expected_feature_names is not None:
        feature_names = list(expected_feature_names)
        # A chunk sharing no column with the expected set carries no usable signal.
        if not any(name in numeric_df.columns for name in feature_names):
            raise ValueError("No numeric features found in this chunk. Consider checking CSV format.")
        # reindex selects and orders the expected columns and inserts NaN columns
        # for the missing ones (zero-filled below), keeping the width constant.
        numeric_df = numeric_df.reindex(columns=feature_names)
    else:
        feature_names = list(numeric_df.columns)
        if not feature_names:
            raise ValueError("No numeric features found in this chunk. Consider checking CSV format.")

    # copy=True guarantees a private, writable array so the in-place clean-up
    # below can never touch the caller's DataFrame.
    X = numeric_df.to_numpy(dtype=np.float32, copy=True)
    # Missing values and infinities (including float32 overflow from the cast)
    # become 0.0.
    np.nan_to_num(X, copy=False, nan=0.0, posinf=0.0, neginf=0.0)

    y = np.full(X.shape[0], label, dtype=np.int32)

    return X, y, feature_names

# ------------------------------
# Main training loop
# ------------------------------
def _merge_into_sample_pool(pool, X_new, y_new, keys_new, max_rows):
    """
    Fold new rows into a bounded sample pool and return the updated pool.

    pool: None, or the (X, y, keys) tuple returned by an earlier call
    keys_new: one independent uniform random number per new row
    Only the ``max_rows`` rows with the smallest keys are kept. Because every
    row's key is drawn independently, the pool is a uniform random sample of all
    rows offered so far, regardless of which chunk they arrived in.
    """
    if pool is None:
        X_all, y_all, keys_all = X_new, y_new, keys_new
    else:
        X_all = np.vstack([pool[0], X_new])
        y_all = np.concatenate([pool[1], y_new])
        keys_all = np.concatenate([pool[2], keys_new])

    if X_all.shape[0] > max_rows:
        keep = np.argpartition(keys_all, max_rows - 1)[:max_rows]
        X_all, y_all, keys_all = X_all[keep], y_all[keep], keys_all[keep]
    return X_all, y_all, keys_all


def _predict_and_score(booster, X, y, feature_names):
    """Predict class ids for X and return (predictions, accuracy, macro-F1)."""
    dmat = xgb.DMatrix(X, label=y, feature_names=feature_names)
    preds = np.argmax(booster.predict(dmat), axis=1)  # probabilities -> class id
    return preds, accuracy_score(y, preds), f1_score(y, preds, average="macro")


def train_boosted_tree_batchwise(feature_files_map, base_path, epochs=EPOCHS):
    """
    Trains an XGBoost booster incrementally over batches and epochs.

    Each streamed chunk is split row-wise into train / validation / test parts.
    The split is derived from RANDOM_SEED and the chunk's position in the
    stream, so (as long as the CSV files do not change between epochs) a row
    lands in the same part in every epoch and held-out rows are never trained
    on. Validation and test rows are collected during the first epoch only, into
    pools capped at SAMPLE_VAL_MAX / SAMPLE_TEST_MAX rows. Validation metrics
    are printed after every epoch and test metrics after the last one.

    feature_files_map: level -> {file_key -> filename}, forwarded to
        stream_chunks_for_all_files
    base_path: root folder, forwarded to stream_chunks_for_all_files
    epochs: number of passes over all chunks
    returns: (booster, feature_names_master). booster is None when no batch
        could be trained; feature_names_master is None when no chunk could be
        preprocessed.
    """
    booster = None
    feature_names_master = None  # feature list fixed by the first processed chunk
    val_pool = None              # (X, y, sampling keys) or None
    test_pool = None

    total_batches = 0
    train_params = {**XGB_PARAMS, "num_class": len(label_encoder.classes_)}
    val_cutoff = TRAIN_FRACTION + VAL_FRACTION

    for epoch in range(1, epochs + 1):
        epoch_start = time.time()
        print(f"\n=== EPOCH {epoch}/{epochs} ===")

        # fresh generator per epoch; chunk_idx restarts at 0 so it identifies
        # the same chunk in every epoch
        gen = stream_chunks_for_all_files(feature_files_map, base_path)
        for chunk_idx, (level, file_key, _filepath, chunk) in enumerate(gen):
            total_batches += 1
            batch_id_str = f"ep{epoch}_b{total_batches}"
            print(f"[BATCH {batch_id_str}] level={level} file_key={file_key} shape={chunk.shape}")

            # feature_names_master is None only until the first chunk succeeds
            try:
                X_batch, y_batch, feature_names = preprocess_chunk(
                    chunk, file_key, expected_feature_names=feature_names_master
                )
            except ValueError as e:
                print(f"[SKIP] {e} -- skipping this chunk.")
                continue
            except Exception as e:
                print(f"[ERROR] Preprocessing failed: {e} -- skipping chunk.")
                continue

            if feature_names_master is None:
                feature_names_master = feature_names
                print(f"[INFO] Master feature count: {len(feature_names_master)}")

            n_rows = X_batch.shape[0]
            if n_rows == 0:
                continue

            # A batch narrower or wider than the master feature list cannot be
            # stacked into the pools or wrapped in a DMatrix; skip it rather
            # than abort the whole run.
            if X_batch.shape[1] != len(feature_names_master):
                print(
                    f"[SKIP] chunk has {X_batch.shape[1]} feature columns, "
                    f"expected {len(feature_names_master)} -- skipping this chunk."
                )
                continue

            # Per-chunk generator: identical draws in every epoch -> identical split.
            rng = np.random.default_rng([RANDOM_SEED, chunk_idx])
            split_draw = rng.random(n_rows)
            train_mask = split_draw < TRAIN_FRACTION

            if epoch == 1:
                # Collect held-out rows once; later epochs would only re-add
                # the same rows.
                pool_keys = rng.random(n_rows)
                val_mask = (split_draw >= TRAIN_FRACTION) & (split_draw < val_cutoff)
                test_mask = split_draw >= val_cutoff
                if val_mask.any():
                    val_pool = _merge_into_sample_pool(
                        val_pool, X_batch[val_mask], y_batch[val_mask],
                        pool_keys[val_mask], SAMPLE_VAL_MAX,
                    )
                if test_mask.any():
                    test_pool = _merge_into_sample_pool(
                        test_pool, X_batch[test_mask], y_batch[test_mask],
                        pool_keys[test_mask], SAMPLE_TEST_MAX,
                    )

            X_train = X_batch[train_mask]
            if X_train.shape[0] == 0:
                print(f"[BATCH {batch_id_str}] no training rows, skipping update.")
                continue

            # Passing xgb_model=booster continues from the prior booster (or
            # starts fresh when it is None). DMatrix construction is inside the
            # try so one bad batch is reported instead of ending training.
            try:
                dtrain = xgb.DMatrix(
                    X_train, label=y_batch[train_mask], feature_names=feature_names_master
                )
                booster = xgb.train(
                    params=train_params,
                    dtrain=dtrain,
                    num_boost_round=ROUNDS_PER_BATCH,
                    xgb_model=booster,
                    verbose_eval=False,
                )
                print(f"[BATCH {batch_id_str}] booster updated with {X_train.shape[0]} training rows.")
            except Exception as e:
                print(f"[ERROR] xgboost.train failed on batch {batch_id_str}: {e}")

        # End of epoch: evaluate on validation pool if available
        epoch_time = time.time() - epoch_start
        print(f"--- Completed epoch {epoch} in {epoch_time:.1f}s. Running validation...")

        if val_pool is not None and val_pool[0].shape[0] > 0 and booster is not None:
            _, acc, f1 = _predict_and_score(booster, val_pool[0], val_pool[1], feature_names_master)
            print(f"[EPOCH {epoch}] Validation Accuracy: {acc:.4f}  Macro-F1: {f1:.4f}")
        else:
            print(f"[EPOCH {epoch}] No validation data or booster unavailable to evaluate.")

    # Done all epochs -> final evaluation on test set
    if test_pool is not None and test_pool[0].shape[0] > 0 and booster is not None:
        test_X, test_y = test_pool[0], test_pool[1]
        test_preds, test_acc, test_f1 = _predict_and_score(booster, test_X, test_y, feature_names_master)
        # Report only the classes that occur in truth or prediction.
        # classification_report raises when target_names is longer than the
        # label set it infers, which happens if a class is missing from the pool.
        present = np.unique(np.concatenate([test_y, test_preds]))
        report = classification_report(
            test_y,
            test_preds,
            labels=present,
            target_names=label_encoder.classes_[present],
            zero_division=0,
        )
        print("\n=== FINAL TEST METRICS ===")
        print(f"Test Accuracy: {test_acc:.4f}  Macro-F1: {test_f1:.4f}")
        print("Classification report:\n")
        print(report)
    else:
        print("No test data or booster unavailable; skipping final test evaluation.")

    return booster, feature_names_master

# ------------------------------
# Run training
# ------------------------------
# Script entry point: train, report timing, write the final booster to a
# temporary file, then delete every temporary file this script created.
if __name__ == "__main__":
    import traceback  # local import: only the script entry point needs it

    start_all = time.time()
    print("[START] Batch-epoch boosting training started.")
    try:
        final_booster, final_features = train_boosted_tree_batchwise(feature_files, base_path, epochs=EPOCHS)
    except Exception as e:
        print(f"[FATAL] Training aborted due to error: {e}")
        # Keep the stack trace: the one-line message alone rarely locates the failure.
        traceback.print_exc()
        final_booster = None
        final_features = None

    elapsed_total = time.time() - start_all
    print(f"[DONE] Total elapsed time: {elapsed_total:.1f} seconds")

    # ------------------------------
    # Save final booster to disk temporarily and then remove it.
    # To keep the model, copy tmp_model_path somewhere permanent before the cleanup below.
    # ------------------------------
    if final_booster is not None:
        try:
            # mkstemp creates a uniquely named file atomically; a timestamp-based
            # name in a shared temp directory can collide or be pre-created.
            fd, tmp_model_path = tempfile.mkstemp(prefix="xgb_booster_", suffix=".model")
            os.close(fd)
            # Register before saving so a partially written file is still cleaned up.
            TEMP_FILES_CREATED.append(tmp_model_path)
            final_booster.save_model(tmp_model_path)
            print(f"[INFO] Saved booster temporarily to {tmp_model_path}")
        except Exception as e:
            print(f"[WARN] Could not save booster: {e}")

    # ------------------------------
    # Cleanup: remove ONLY the temporary files this script created.
    # The system temp directory is deliberately not swept by pattern: files such as
    # tmp* or *.tmp there may belong to other running programs.
    # ------------------------------
    print("\n[START CLEANUP] Removing temporary files created during processing...")
    for tmp_path in list(TEMP_FILES_CREATED):
        safe_remove(tmp_path)
    TEMP_FILES_CREATED.clear()

    print("[CLEANUP] Completed temporary file cleanup.")
    print("[EXIT] Script finished.")