In [1]:
"""
Data Cleaning and Preprocessing for UNSW-NB15 (concatenated) dataset.

VALIDATION SUMMARY (Task 4):
- The original pipeline is largely correct and complete. It implements all plan steps, with an intentional
  reordering where step 10 (zero-inflation indicators) is applied before step 8 (multicollinearity drops)
  to preserve useful zero-pattern information for features that are subsequently removed.
- One targeted change was made for robustness and to avoid unnecessary row loss:
  CHANGE: In Final Integrity (Step 14), instead of dropping rows with NaN in any column (including raw
          non-modeled string columns), we now only drop rows that have NaNs in numeric columns
          (i.e., model-ready features and targets). This aligns with the step’s intent and avoids removing rows
          due to missing values in non-numeric, uninterpreted identifiers or text columns.
  RATIONALE: The previous "drop if any NaN in df" could remove a large number of rows if any unused string
             column had NaNs. Our change keeps the feature matrix clean where it matters (numeric inputs
             and targets) while preserving rows that are otherwise valid for modeling.

All other logic is retained; additional comments were added for clarity.

This script implements the cleaning plan described in Task 3:
1) Schema lock and dtypes enforcement
2) Identifier removal
3) Exact duplicate handling
4) Label–category consistency audit and repair
5) Domain validity checks and hard constraints
6) Continuous feature skew mitigation (log1p)
7) Extreme outlier capping (winsorization)
10) Zero-inflation indicators for structural zeros (applied BEFORE dropping derived features)
8) Multicollinearity control among derived features
9) Categorical encoding for selected protocol/state flags
11) Scaling of continuous features
12) Rare category handling (applied within step 9)
13) Class distribution profiling and artifact check (non-modeling diagnostics)
14) Final integrity checks and export

Notes:
- This code avoids splitting or modeling; it prepares a single preprocessed DataFrame.
- It is defensive to slight schema variations by checking column existence before operations.
- All steps print concise diagnostics to assist auditability without interrupting execution.
"""

import os
import sys
import math
import warnings
from pathlib import Path

import numpy as np
import pandas as pd
from sklearn.preprocessing import RobustScaler

warnings.filterwarnings("ignore", category=FutureWarning)

# -------------------------------------------------------------------
# Configuration
# -------------------------------------------------------------------

# Input CSV path (must exist)
CSV_PATH = r"E:\Datasets\UNSW-NB15\Training and Testing Sets\UNSW_NB15_concatenated_dropped.csv"

# Output (optional) - will attempt to write in the same directory
OUTPUT_FILENAME = "UNSW_NB15_preprocessed.csv"

# Robust winsorization quantiles (after log1p transformation)
LOW_Q = 0.005
HIGH_Q = 0.995

# Rare category threshold (proportion of dataset); categories under this will be pooled into 'Other'
RARE_CAT_THRESHOLD = 0.001  # 0.1%

# -------------------------------------------------------------------
# Utilities
# -------------------------------------------------------------------

def normalize_attack_cat(val):
    """
    Normalize attack category string to canonical form if recognized.
    Returns (canonical_string, recognized_boolean)

    - Handles common UNSW-NB15 attack categories and "Normal"
    - Uses a lowercase map but returns canonical title-case labels.
    """
    if pd.isna(val):
        return val, False
    s = str(val).strip().lower()
    mapping = {
        "normal": "Normal",
        "fuzzers": "Fuzzers",
        "analysis": "Analysis",
        "backdoor": "Backdoor",
        "backdoors": "Backdoor",  # sometimes plural
        "dos": "DoS",
        "exploits": "Exploits",
        "generic": "Generic",
        "reconnaissance": "Reconnaissance",
        "shellcode": "Shellcode",
        "worms": "Worms",
    }
    if s in mapping:
        return mapping[s], True
    return val, False  # return original if not recognized


def print_step_header(step_text):
    """Helper to print readable step headers in the console for audit trail."""
    print("\n" + "-" * 80)
    print(step_text)
    print("-" * 80)


def safe_intersect(cols, df_columns):
    """Return the intersection of a candidate list with an existing DataFrame columns."""
    return [c for c in cols if c in df_columns]


def is_binary_series(s):
    """Check if a pandas Series contains only {0,1} (ignoring NaNs)."""
    vals = pd.unique(s.dropna())
    return set(vals).issubset({0, 1})


# -------------------------------------------------------------------
# Load data
# -------------------------------------------------------------------

print_step_header("Loading data")
if not os.path.exists(CSV_PATH):
    raise FileNotFoundError(f"Input CSV not found at path: {CSV_PATH}")

# low_memory=False to preserve column consistency and avoid mixed dtypes
df = pd.read_csv(CSV_PATH, low_memory=False)
print(f"Loaded shape: {df.shape}")
print(f"Columns: {list(df.columns)}")

# -------------------------------------------------------------------
# 1) Schema lock and dtypes enforcement
# -------------------------------------------------------------------

print_step_header("1) Schema lock and dtypes enforcement")

# Targets: binary 'label' is required; multiclass 'attack_cat' is optional.
target_label_col = "label"
target_multiclass_col = "attack_cat"

# Ensure 'label' is present; if not, raise error because it is a key target.
if target_label_col not in df.columns:
    raise KeyError("Expected binary target column 'label' not found.")

# Enforce 'label' as pandas nullable integer Int64 (allows NA during coercion) then validate later.
df[target_label_col] = pd.to_numeric(df[target_label_col], errors="coerce").astype("Int64")

# Ensure 'attack_cat' exists; if so, cast to string for normalization.
if target_multiclass_col in df.columns:
    df[target_multiclass_col] = df[target_multiclass_col].astype("string")
else:
    print("Warning: 'attack_cat' column not found; multiclass target-related checks will be skipped.")

# Convert object-typed non-target columns to numeric if the vast majority are numeric-representable.
# This prevents string categorical columns from being damaged while ensuring numeric strings become numerics.
non_target_cols = [c for c in df.columns if c not in [target_label_col, target_multiclass_col]]
for col in non_target_cols:
    if df[col].dtype == object:
        converted = pd.to_numeric(df[col], errors="coerce")
        # Convert only if >90% of values can be parsed as numbers (heuristic to preserve true categoricals).
        if converted.notna().mean() > 0.9:
            df[col] = converted

# -------------------------------------------------------------------
# 2) Identifier removal
# -------------------------------------------------------------------

print_step_header("2) Identifier removal")
# Drop only clear row-unique identifiers; keep potential features like IPs unless explicitly excluded.
drop_id_cols = [c for c in ["id"] if c in df.columns]
if drop_id_cols:
    df.drop(columns=drop_id_cols, inplace=True)
    print(f"Dropped identifier columns: {drop_id_cols}")
else:
    print("No identifier columns found to drop.")

# -------------------------------------------------------------------
# 3) Exact duplicate handling
# -------------------------------------------------------------------

print_step_header("3) Exact duplicate handling")
before = len(df)
df = df.drop_duplicates(ignore_index=True)
after = len(df)
print(f"Removed {before - after} exact duplicate rows. New shape: {df.shape}")

# -------------------------------------------------------------------
# 4) Label–category consistency audit and repair
# -------------------------------------------------------------------

print_step_header("4) Label–category consistency audit and repair")
if target_multiclass_col in df.columns:
    # Normalize attack_cat values to canonical names if recognized by our map
    normalized_vals = df[target_multiclass_col].apply(normalize_attack_cat)
    df[target_multiclass_col] = normalized_vals.apply(lambda x: x[0])
    recognized_mask = normalized_vals.apply(lambda x: x[1])

    # Derive the expected binary label from normalized attack_cat where recognized (Normal -> 0; else -> 1).
    derived_label = pd.Series(index=df.index, dtype="Int64")
    recognized_cats = df.loc[recognized_mask, target_multiclass_col]
    derived_label.loc[recognized_mask] = (recognized_cats != "Normal").astype("Int64")

    # Identify contradictions only on rows where both a label and a recognized cat exist.
    mismatch_mask = recognized_mask & df[target_label_col].notna() & (df[target_label_col] != derived_label)
    mismatches = int(mismatch_mask.sum())

    # Fill missing labels directly from recognized attack_cat-derived labels.
    fill_from_cat_mask = recognized_mask & df[target_label_col].isna()
    fills = int(fill_from_cat_mask.sum())

    # Apply fixes (mend contradictions and fill missing labels).
    df.loc[mismatch_mask, target_label_col] = derived_label[mismatch_mask]
    df.loc[fill_from_cat_mask, target_label_col] = derived_label[fill_from_cat_mask]

    print(f"Normalized recognized attack_cat values: {int(recognized_mask.sum())} rows.")
    print(f"Repaired label to match attack_cat in {mismatches} conflicting rows.")
    print(f"Filled missing labels from attack_cat in {fills} rows.")

# Ensure 'label' is strictly binary {0,1}; if non-binary values remain, coerce and enforce.
if not is_binary_series(df[target_label_col].astype("float").fillna(-1)):
    print("Warning: Non-binary values detected in 'label'. Coercing positive->1, zero/negative->0.")
    df[target_label_col] = (pd.to_numeric(df[target_label_col], errors="coerce").fillna(0) > 0).astype("Int64")

# Drop rows where label is still NaN or not in {0,1} after coercion to guarantee clean target.
valid_label_mask = df[target_label_col].isin([0, 1])
dropped_invalid_label = int((~valid_label_mask).sum())
df = df.loc[valid_label_mask].reset_index(drop=True)
if dropped_invalid_label > 0:
    print(f"Dropped {dropped_invalid_label} rows with invalid 'label'. New shape: {df.shape}")

# -------------------------------------------------------------------
# 5) Domain validity checks and hard constraints
# -------------------------------------------------------------------

print_step_header("5) Domain validity checks and hard constraints")

# Define domain rules for numeric columns with known physical constraints (non-negative, bounded)
non_negative_cols = [
    "dur","sbytes","dbytes","spkts","dpkts",
    "sload","dload","rate",
    "sinpkt","dinpkt","sjit","djit",
    "synack","ackdat","tcprtt",
    "response_body_len",
    "smean","dmean","smeansz","dmeansz",
    "stcpb","dtcpb",
    "trans_depth",
    "sttl","dttl","swin","dwin",
]

# TTL plausible bounds
ttl_cols = ["sttl", "dttl"]
ttl_min, ttl_max = 0, 255

# Known small-domain flag/category columns (will be one-hot encoded later)
small_cat_cols = ["ct_state_ttl", "ct_flw_http_mthd", "is_sm_ips_ports", "is_ftp_login", "ct_ftp_cmd"]

# Enforce numeric dtypes for numeric domain columns when present
present_nonneg_cols = safe_intersect(non_negative_cols, df.columns)
for col in present_nonneg_cols:
    df[col] = pd.to_numeric(df[col], errors="coerce")

# Non-negativity hard filter: drop any row with a negative value across constrained columns
neg_mask_any = pd.Series(False, index=df.index)
for col in present_nonneg_cols:
    neg_mask = df[col] < 0
    neg_mask_any = neg_mask_any | (neg_mask.fillna(False))

neg_count = int(neg_mask_any.sum())
if neg_count > 0:
    print(f"Filtering out {neg_count} rows with negative values in non-negative constrained columns.")
    df = df.loc[~neg_mask_any].reset_index(drop=True)

# TTL hard range check [0, 255]; drop rows outside bounds
present_ttl_cols = safe_intersect(ttl_cols, df.columns)
if present_ttl_cols:
    out_of_range_mask_any = pd.Series(False, index=df.index)
    for col in present_ttl_cols:
        out_of_range_mask = ~df[col].between(ttl_min, ttl_max)
        out_of_range_mask_any = out_of_range_mask_any | (out_of_range_mask.fillna(False))
    out_range_count = int(out_of_range_mask_any.sum())
    if out_range_count > 0:
        print(f"Filtering out {out_range_count} rows with TTL out of [{ttl_min},{ttl_max}].")
        df = df.loc[~out_of_range_mask_any].reset_index(drop=True)

# Binary domain enforcement for is_sm_ips_ports if present (must be {0,1}); drop invalid.
if "is_sm_ips_ports" in df.columns:
    df["is_sm_ips_ports"] = pd.to_numeric(df["is_sm_ips_ports"], errors="coerce").astype("Int64")
    valid_binary = df["is_sm_ips_ports"].isin([0, 1])
    invalid_binary_count = int((~valid_binary).sum())
    if invalid_binary_count > 0:
        print(f"Dropping {invalid_binary_count} rows with invalid 'is_sm_ips_ports' values (expect 0/1).")
        df = df.loc[valid_binary].reset_index(drop=True)

# Ensure we removed any NaNs introduced so far in strict columns (domain-constrained numeric fields)
strict_cols = present_nonneg_cols + present_ttl_cols + (["is_sm_ips_ports"] if "is_sm_ips_ports" in df.columns else [])
if strict_cols:
    nan_mask_any = df[strict_cols].isna().any(axis=1)
    nan_count = int(nan_mask_any.sum())
    if nan_count > 0:
        print(f"Dropping {nan_count} rows with NaN in strict domain columns after enforcement.")
        df = df.loc[~nan_mask_any].reset_index(drop=True)

print(f"Post domain checks shape: {df.shape}")

# -------------------------------------------------------------------
# 6) Continuous feature skew mitigation (log1p)
# -------------------------------------------------------------------

print_step_header("6) Continuous feature skew mitigation (log1p)")

# Columns suited for log1p (only if present and non-negative by design)
# These are often long-tailed in network traffic; log1p compresses scale and preserves zeros.
log1p_candidates = [
    "dur","rate","sload","dload",
    "sbytes","dbytes","spkts","dpkts",
    "sinpkt","dinpkt","sjit","djit",
    "synack","ackdat","tcprtt",
    "response_body_len",
    "smean","dmean","smeansz","dmeansz",
]

log1p_cols = safe_intersect(log1p_candidates, df.columns)

# Apply safe log1p transformation in-place (shifts up if any residual negatives appear unexpectedly)
for col in log1p_cols:
    min_val = df[col].min()
    if pd.notna(min_val) and min_val < 0:
        shift = abs(min_val)
        print(f"Warning: {col} has negative values after domain checks. Shifting by {shift} before log1p.")
        df[col] = df[col] + shift
    df[col] = np.log1p(df[col].astype(float))

print(f"Applied log1p to {len(log1p_cols)} columns: {log1p_cols}")

# -------------------------------------------------------------------
# 7) Extreme outlier capping (winsorization) on log-transformed features
# -------------------------------------------------------------------

print_step_header("7) Extreme outlier capping (winsorization)")
# Winsorization post-log stabilizes extremes while preserving relative ranks.
winsorize_candidates = [
    "rate","sload","dload","sinpkt","dinpkt","sjit","djit",
    "synack","ackdat","tcprtt","dur","sbytes","dbytes"
]
winsor_cols = safe_intersect(winsorize_candidates, df.columns)

for col in winsor_cols:
    lo = df[col].quantile(LOW_Q)
    hi = df[col].quantile(HIGH_Q)
    if pd.isna(lo) or pd.isna(hi):
        continue
    if lo > hi:
        lo, hi = hi, lo
    df[col] = df[col].clip(lower=lo, upper=hi)

print(f"Winsorized {len(winsor_cols)} columns at [{LOW_Q*100:.1f}%, {HIGH_Q*100:.1f}%] quantiles.")

# -------------------------------------------------------------------
# 10) Zero-inflation indicators for structural zeros (do BEFORE feature dropping)
# NOTE (intentional reordering): We place step 10 before step 8 to preserve zero-pattern information for features
# that are subsequently dropped for multicollinearity (e.g., tcprtt, rate, loads).
# -------------------------------------------------------------------

print_step_header("10) Zero-inflation indicators for structural zeros")

zero_indicator_candidates = [
    "dur","sinpkt","dinpkt","sjit","djit","synack","ackdat","tcprtt"
    # We omit rate/sload/dload indicators because they will be dropped for multicollinearity
]
present_zero_cols = safe_intersect(zero_indicator_candidates, df.columns)

zero_ind_cols = []
for col in present_zero_cols:
    ind_col = f"is_zero__{col}"
    # After log1p, a value equal to 0 implies the original value was exactly 0.
    zero_ind = (df[col] == 0).astype("int8")
    df[ind_col] = zero_ind
    zero_ind_cols.append(ind_col)

print(f"Added {len(zero_ind_cols)} zero-indicator columns: {zero_ind_cols}")

# -------------------------------------------------------------------
# 8) Multicollinearity control among derived features
# -------------------------------------------------------------------

print_step_header("8) Multicollinearity control among derived features")

# Drop highly collinear or redundant features (e.g., TCP RTT variants and rates) to reduce leakage risk
# and simplify models. We already captured zero-patterns for some of these above.
drop_for_collinearity = ["tcprtt", "rate", "sload", "dload", "stcpb", "dtcpb"]
drop_existing = safe_intersect(drop_for_collinearity, df.columns)
df.drop(columns=drop_existing, inplace=True, errors="ignore")
print(f"Dropped for multicollinearity (present): {drop_existing}")

# -------------------------------------------------------------------
# 9) Categorical encoding for protocol/state flags (+ step 12 rare cat handling inside)
# -------------------------------------------------------------------

print_step_header("9) Categorical encoding for selected small-domain features (+ rare handling)")

# Selected known small-domain categoricals; will be one-hot encoded with rare category pooling.
ohe_base_cols = ["ct_state_ttl", "ct_flw_http_mthd", "is_sm_ips_ports", "is_ftp_login", "ct_ftp_cmd"]
ohe_cols_present = safe_intersect(ohe_base_cols, df.columns)

created_dummies = []
for col in ohe_cols_present:
    # Treat values as strings for robust one-hot encoding; ensure missing becomes explicit "Unknown"
    col_as_str = df[col].astype("Int64") if pd.api.types.is_numeric_dtype(df[col]) else df[col].astype("string")
    col_as_str = col_as_str.astype("string").fillna("Unknown")

    # Rare category pooling: any category appearing fewer than threshold rows becomes "Other"
    vc = col_as_str.value_counts(dropna=False)
    threshold = max(2, int(math.floor(RARE_CAT_THRESHOLD * len(df))))  # at least 2 rows to keep as unique
    rare_cats = set(vc[vc < threshold].index)
    pooled = col_as_str.where(~col_as_str.isin(rare_cats), other="Other")

    # Create one-hot columns with deterministic naming
    dummies = pd.get_dummies(pooled, prefix=col, prefix_sep="=", dtype=np.uint8)
    df = pd.concat([df.drop(columns=[col]), dummies], axis=1)
    created_dummies.extend(list(dummies.columns))

print(f"One-hot encoded columns: {ohe_cols_present}")
print(f"Created {len(created_dummies)} dummy columns.")

# -------------------------------------------------------------------
# 11) Scaling of continuous features (RobustScaler)
# - Scale only continuous numeric features (exclude targets, OHE binaries, and zero-indicator binaries)
# -------------------------------------------------------------------

print_step_header("11) Scaling of continuous features (RobustScaler)")

# Identify numeric columns post-encoding (includes label, zero indicators, and OHE dummies)
numeric_cols = df.select_dtypes(include=[np.number]).columns.tolist()

# Build the exclusion set:
exclude_from_scaling = set()
# Targets
exclude_from_scaling.add(target_label_col)
# Zero indicators (binary features we don't scale)
exclude_from_scaling.update(zero_ind_cols)
# OHE dummy columns (uint8 dummies created in step 9)
exclude_from_scaling.update(created_dummies)

# Final list of continuous columns to scale
scale_cols = [c for c in numeric_cols if c not in exclude_from_scaling]

if scale_cols:
    scaler = RobustScaler(with_centering=True, with_scaling=True, quantile_range=(25.0, 75.0))
    df[scale_cols] = scaler.fit_transform(df[scale_cols].astype(float))
    print(f"Scaled {len(scale_cols)} continuous features with RobustScaler.")
else:
    print("No continuous columns identified for scaling.")

# -------------------------------------------------------------------
# 12) Rare category handling (handled in step 9 via pooling before encoding)
# -------------------------------------------------------------------
print_step_header("12) Rare category handling")
print("Rare category pooling performed during step 9 before one-hot encoding.")

# -------------------------------------------------------------------
# 13) Class distribution profiling and artifact check (no modeling)
# -------------------------------------------------------------------

print_step_header("13) Class distribution profiling and artifact check")

# Label distribution
label_counts = df[target_label_col].value_counts(dropna=False)
print("Label distribution:")
print(label_counts.to_string())

# Multiclass distribution if available
if target_multiclass_col in df.columns:
    print("\nattack_cat distribution (top 20):")
    print(df[target_multiclass_col].value_counts(dropna=False).head(20).to_string())

# Quick leakage-like check: high correlation with label for numeric features
print("\nTop 10 absolute correlations with label (numeric features):")
num_for_corr = [c for c in numeric_cols if c != target_label_col and c in df.columns]
if num_for_corr:
    corr = df[num_for_corr].corrwith(df[target_label_col].astype(float)).abs().sort_values(ascending=False)
    print(corr.head(10).to_string())
else:
    print("No numeric features available for correlation analysis.")

# -------------------------------------------------------------------
# 14) Final integrity and export of clean feature matrix
# -------------------------------------------------------------------

print_step_header("14) Final integrity and export of clean feature matrix")

# Replace inf values (should be none) as a safeguard, then check for NaNs
df.replace([np.inf, -np.inf], np.nan, inplace=True)

# VALIDATED CHANGE:
# Previously, code dropped rows if ANY column had NaN (including unused string columns).
# We now enforce NaN-free condition only across numeric columns (model features and targets),
# which aligns with the step's intent and avoids unnecessary row loss.
numeric_cols_final = df.select_dtypes(include=[np.number]).columns.tolist()
nan_mask_numeric = df[numeric_cols_final].isna().any(axis=1)
nan_rows_numeric = int(nan_mask_numeric.sum())
if nan_rows_numeric > 0:
    print(f"Warning: Found {nan_rows_numeric} rows with NaNs in numeric columns after processing; dropping those rows.")
    df = df.loc[~nan_mask_numeric].reset_index(drop=True)

print(f"Final preprocessed shape: {df.shape}")

# Expose the final preprocessed DataFrame for downstream usage (single fully-prepared matrix)
df_preprocessed = df.copy()

# Attempt to save to disk (optional)
try:
    out_dir = str(Path(CSV_PATH).parent)
    out_path = os.path.join(out_dir, OUTPUT_FILENAME)
    df_preprocessed.to_csv(out_path, index=False)
    print(f"Saved preprocessed dataset to: {out_path}")
except Exception as e:
    print(f"Could not save preprocessed CSV due to: {e}")

# For interactive sessions, you can inspect
# print(df_preprocessed.head())


--------------------------------------------------------------------------------
Loading data
--------------------------------------------------------------------------------
Loaded shape: (257673, 42)
Columns: ['id', 'dur', 'spkts', 'dpkts', 'sbytes', 'dbytes', 'rate', 'sttl', 'dttl', 'sload', 'dload', 'sloss', 'dloss', 'sinpkt', 'dinpkt', 'sjit', 'djit', 'swin', 'stcpb', 'dtcpb', 'dwin', 'tcprtt', 'synack', 'ackdat', 'smean', 'dmean', 'trans_depth', 'response_body_len', 'ct_srv_src', 'ct_state_ttl', 'ct_dst_ltm', 'ct_src_dport_ltm', 'ct_dst_sport_ltm', 'ct_dst_src_ltm', 'is_ftp_login', 'ct_ftp_cmd', 'ct_flw_http_mthd', 'ct_src_ltm', 'ct_srv_dst', 'is_sm_ips_ports', 'attack_cat', 'label']

--------------------------------------------------------------------------------
1) Schema lock and dtypes enforcement
--------------------------------------------------------------------------------

--------------------------------------------------------------------------------
2) Identifier rem

  c /= stddev[:, None]
  c /= stddev[None, :]


ct_state_ttl=0     0.675841
sttl               0.518058
ct_state_ttl=1     0.468094
dttl               0.392160
dmean              0.313891
ct_state_ttl=2     0.300556
ackdat             0.295254
dbytes             0.287104
is_zero__dinpkt    0.269314
dinpkt             0.269267

--------------------------------------------------------------------------------
14) Final integrity and export of clean feature matrix
--------------------------------------------------------------------------------
Final preprocessed shape: (160474, 56)
Saved preprocessed dataset to: E:\Datasets\UNSW-NB15\Training and Testing Sets\UNSW_NB15_preprocessed.csv


In [2]:
# Export the cleaned dataframe to a new CSV file
df.to_csv('cleaned_data.csv', index=False)

print("Cleaned DataFrame exported to 'cleaned_data.csv'")

Cleaned DataFrame exported to 'cleaned_data.csv'
