## Initial Data Combination Pipeline

In [14]:
# python
import os
import glob
import csv
import sys
import re
import quopri
import argparse
import multiprocessing
from typing import List, Dict, Optional

import pandas as pd
from bs4 import BeautifulSoup
from tqdm.contrib.concurrent import process_map
from bs4 import MarkupResemblesLocatorWarning
import warnings
warnings.filterwarnings("ignore", category=MarkupResemblesLocatorWarning)

# =========================
# Configuration (edit here)
# =========================

COLUMN_CONFIG = {
    "text_columns": [
        "Email Text",
        "Text",
        "body",
    ],
    "label_columns": [
        "Email Type",
        "Class",
        "label",
    ],
}

LABEL_MAP = {
    "Phishing Email": 1,
    "1.0": 1,
    "1": 1,
    "spam": 1,  # <-- ADDED
    "Safe Email": 0,
    "0.0": 0,
    "0": 0,
    "ham": 0,  # <-- ADDED
}


# =========================
# Helpers
# =========================

def set_max_csv_field_size_limit() -> None:
    """Set the csv field size limit as high as possible."""
    print("Setting maximum CSV field size limit...")
    max_int = sys.maxsize
    while True:
        try:
            csv.field_size_limit(max_int)
            break
        except OverflowError:
            max_int = int(max_int / 10)
    print(f" - CSV field size limit set to {max_int}")


def safe_read_csv(path: str) -> Optional[pd.DataFrame]:
    """Read a CSV using robust fallbacks."""
    try:
        df = pd.read_csv(path, engine="c")
        print(f"   - Read OK (C): {path}")
        return df
    except Exception as e:
        print(f"   - C engine failed for {path}: {e}")

    try:
        print("     - Retrying C + latin1 + on_bad_lines='skip'...")
        df = pd.read_csv(path, engine="c", encoding="latin1", on_bad_lines="skip")
        print(f"     - Read OK (C+latin1+skip): {path}")
        return df
    except Exception as e:
        print(f"     - Fallback 1 failed: {e}")

    try:
        print("     - Retrying PYTHON engine...")
        df = pd.read_csv(path, engine="python")
        print(f"     - Read OK (python): {path}")
        return df
    except Exception as e:
        print(f"     - Fallback 2 failed: {e}")

    try:
        print("     - Retrying PYTHON + latin1...")
        df = pd.read_csv(path, engine="python", encoding="latin1")
        print(f"     - Read OK (python+latin1): {path}")
        return df
    except Exception as e:
        print(f"     - All fallbacks failed for {path}: {e}")
        return None


def consolidate_column(df: pd.DataFrame, candidates: List[str]) -> pd.Series:
    """Overlay multiple columns left-to-right to produce one series."""
    existing = [c for c in candidates if c in df.columns]
    if not existing:
        print(f"   - WARNING: None of {candidates} found. Returning empty column.")
        return pd.Series(index=df.index, dtype=object)

    print(f"   - Consolidating columns: {existing}")
    col = df[existing[0]].copy()
    for c in existing[1:]:
        col = col.fillna(df[c])
    return col


# =========================
# Step 1: Combine raw CSVs
# =========================

def combine_csvs_from_directory(root_directory: str, output_filename: str) -> pd.DataFrame:
    if not os.path.isdir(root_directory):
        raise FileNotFoundError(f"Directory not found: {root_directory}")

    print(f"Scanning for CSVs under: {root_directory}")
    pattern = os.path.join(root_directory, "**", "*.csv")
    files = glob.glob(pattern, recursive=True)
    if not files:
        raise FileNotFoundError(f"No CSV files found in: {root_directory}")

    print(f"Found {len(files)} CSV files. Reading...")
    dfs: List[pd.DataFrame] = []

    for f in files:
        df = safe_read_csv(f)
        if df is not None:
            # --- MODIFICATION: Added source_name and record_id ---
            df["source_file"] = os.path.basename(f)
            df["record_id"] = df.index

            # Get relative path of the file's directory from the root
            relative_dir_path = os.path.relpath(os.path.dirname(f), root_directory)

            if relative_dir_path == ".":
                # File is in the root, use root directory's name as source_name
                df["source_name"] = os.path.basename(os.path.normpath(root_directory))
            else:
                # File is in a subdirectory, use the first-level subdirectory name
                df["source_name"] = relative_dir_path.split(os.sep)[0]
            # --- END MODIFICATION ---

            dfs.append(df)

    if not dfs:
        raise RuntimeError("No data could be read from any CSVs.")

    print("Combining dataframes...")
    combined = pd.concat(dfs, ignore_index=True)  # Creates new 0...N index
    combined.to_csv(output_filename, index=False)
    print(f" - Combined rows: {len(combined)}")
    print(f" - Saved: {output_filename}")
    return combined


# ============================================
# Step 2: Consolidate text/labels and standardize
# ============================================

def process_to_master(
    df: pd.DataFrame,
    output_filename: str,
    column_config: Dict[str, List[str]],
    label_map: Dict[str, int],
) -> pd.DataFrame:
    print(f"Processing to master dataset...")
    print("[STEP 1] Consolidating TEXT columns...")
    master_text = consolidate_column(df, column_config["text_columns"])

    print("[STEP 2] Consolidating LABEL columns...")
    master_label_src = consolidate_column(df, column_config["label_columns"])

    print("[STEP 3] Standardizing labels...")
    normalized_map = {str(k).lower().strip(): v for k, v in label_map.items()}
    normalized_labels = master_label_src.astype(str).str.lower().str.strip()
    master_label = normalized_labels.map(normalized_map)
    print("   - Label mapping done.")

    print("[STEP 4] Creating final frame and cleaning...")
    final_df = pd.DataFrame({"text": master_text, "label": master_label})

    # --- MODIFICATION: Propagate all provenance columns ---
    for col in ["source_file", "source_name", "record_id"]:
        if col in df.columns:
            final_df[col] = df[col]
    # --- END MODIFICATION ---

    total = len(final_df)
    na_text = final_df["text"].isna().sum()
    na_label = final_df["label"].isna().sum()
    print("\n--- Processing Report ---")
    print(f"Total rows read: {total}")
    print(f"Rows with missing text: {na_text}")
    print(f"Rows with unmapped labels: {na_label}")

    # Show unmapped original labels (help extend LABEL_MAP)
    if na_label > 0:
        unmapped_vals = master_label_src[final_df["label"].isna()].dropna().unique()
        print("\n  > Unmapped label values (add to LABEL_MAP if needed):")
        for v in unmapped_vals[:20]:
            print(f"    - '{v}' (Type: {type(v)})")
        if len(unmapped_vals) > 20:
            print(f"    ... and {len(unmapped_vals) - 20} more")

    print("\n[STEP 5] Dropping rows with missing text or labels...")
    before = len(final_df)
    final_df = final_df.dropna(subset=["text", "label"])
    after = len(final_df)
    print(f"   - Dropped {before - after} rows.")
    if after == 0:
        raise RuntimeError("Master dataset empty after cleaning.")

    final_df["label"] = final_df["label"].astype(int)

    final_df.to_csv(output_filename, index=False)
    print(f" - Saved master: {output_filename} ({len(final_df)} rows)")
    return final_df


# =========================
# Step 3: Text cleaning
# =========================

def clean_email_text(text: object) -> str:
    if not isinstance(text, str):
        return ""

    # Decode quoted-printable artifacts
    try:
        text_bytes = text.encode("latin-1", errors="ignore")
        decoded_bytes = quopri.decodestring(text_bytes)
        text = decoded_bytes.decode("utf-8", errors="ignore")
    except Exception:
        pass

    # Strip HTML
    soup = BeautifulSoup(text, "html.parser")
    text = soup.get_text(separator=" ")

    # Lowercase
    text = text.lower()

    # Replace URLs
    text = re.sub(r"(https?://\S+|www\.\S+)", "[url]", text)

    # Replace emails
    text = re.sub(r"\b[a-z0.9._%+-]+@[a-z0.9.-]+\.[a-z]{2,}\b", "[email]", text)

    # Keep alnum, space, and []
    text = re.sub(r"[^a-z0-9\s\[\]]", "", text)

    # Normalize whitespace
    text = re.sub(r"\s+", " ", text).strip()

    return text


def clean_dataset(
    df: pd.DataFrame,
    output_filename: str,
    workers: int,
) -> pd.DataFrame:
    print(f"Cleaning text using {workers} workers...")
    df = df.copy()
    df["text"] = df["text"].fillna("")
    results = process_map(
        clean_email_text,
        df["text"],
        max_workers=workers,
        chunksize=500,
        desc="Cleaning Emails",
    )
    df["text"] = results

    before = len(df)
    df = df[df["text"].str.strip().str.len() > 0]
    dropped = before - len(df)
    if dropped > 0:
        print(f" - Dropped {dropped} rows that became empty after cleaning.")

    # --- MODIFICATION: Reorder columns to include new provenance cols ---
    first_cols = ["text", "label"]
    for col in ["source_file", "source_name", "record_id"]:
        if col in df.columns:
            first_cols.append(col)
    # --- END MODIFICATION ---

    other_cols = [c for c in df.columns if c not in first_cols]
    df = df[first_cols + other_cols]

    df.to_csv(output_filename, index=False)
    print(f" - Saved cleaned: {output_filename} ({len(df)} rows)")
    return df


# =========================
# Step 4: Deduplicate
# =========================

def deduplicate_dataset(
    df: pd.DataFrame,
    output_filename: str,
) -> pd.DataFrame:
    print("Deduplicating by 'text' column...")
    df = df.dropna(subset=["text"])
    before = len(df)
    # Keeps the first occurrence and its metadata (source_file, etc.)
    df = df.drop_duplicates(subset=["text"], keep="first")
    after = len(df)

    print("\n==================================")
    print("Deduplication Report")
    print(f"  Rows before: {before}")
    print(f"  Rows after:  {after}")
    print(f"  Removed:     {before - after}")
    print("==================================")

    df.to_csv(output_filename, index=False)
    print(f" - Saved final: {output_filename} ({after} unique rows)")
    return df


# ===================================================================
# --- SCRIPT EXECUTION ---
# This part runs the pipeline. It's no longer in a 'main' function.
# ===================================================================

# Manually define your paths and settings here
class Args:
    # --- YOUR PATH IS HERE ---
    # Use 'r' prefix for Windows/WSL paths to handle backslashes
    root = r"Dataset/raw - DO NOT OVERWRITE"

    # --- Default output files ---
    combined = "combined_emails.csv"
    master = "master_email_dataset.csv"
    cleaned = "master_email_dataset_cleaned.csv"
    final = "master_email_dataset_final.csv"
    workers = multiprocessing.cpu_count()

args = Args()

# This is needed for multiprocessing to work correctly in some environments
multiprocessing.freeze_support()

# --- Run the pipeline ---

set_max_csv_field_size_limit()

# Step 1: Combine
combined_df = combine_csvs_from_directory(args.root, args.combined)

# Step 2: Consolidate + map labels
master_df = process_to_master(
    combined_df, args.master, COLUMN_CONFIG, LABEL_MAP
)

# Step 3: Clean text
cleaned_df = clean_dataset(master_df, args.cleaned, args.workers)

# Step 4: Deduplicate
deduplicate_dataset(cleaned_df, args.final)

print("\nPipeline complete.")

Setting maximum CSV field size limit...
 - CSV field size limit set to 9223372036854775807
Scanning for CSVs under: Dataset/raw - DO NOT OVERWRITE
Found 14 CSV files. Reading...
   - Read OK (C): Dataset/raw - DO NOT OVERWRITE/Miltchev, R. (2025)/Phishing_validation_emails.csv
   - Read OK (C): Dataset/raw - DO NOT OVERWRITE/Chakraborty, S. (2023)/Phishing_Email.csv
   - Read OK (C): Dataset/raw - DO NOT OVERWRITE/Radev, D. (2008)/fraud_email_.csv
   - Read OK (C): Dataset/raw - DO NOT OVERWRITE/Arifa Islam, C. (2023)/Ling.csv
   - Read OK (C): Dataset/raw - DO NOT OVERWRITE/Arifa Islam, C. (2023)/CEAS-08.csv
   - Read OK (C): Dataset/raw - DO NOT OVERWRITE/Arifa Islam, C. (2023)/Enron.csv
   - Read OK (C): Dataset/raw - DO NOT OVERWRITE/Arifa Islam, C. (2023)/Nazario.csv
   - C engine failed for Dataset/raw - DO NOT OVERWRITE/Arifa Islam, C. (2023)/TREC-05.csv: Error tokenizing data. C error: Buffer overflow caught - possible malformed input file.

     - Retrying C + latin1 + on_bad_

Cleaning Emails: 100%|██████████| 249873/249873 [00:05<00:00, 48623.95it/s]


 - Dropped 287 rows that became empty after cleaning.
 - Saved cleaned: master_email_dataset_cleaned.csv (249586 rows)
Deduplicating by 'text' column...

Deduplication Report
  Rows before: 249586
  Rows after:  220495
  Removed:     29091
 - Saved final: master_email_dataset_final.csv (220495 unique rows)

Pipeline complete.


## Traditional ML Model Pipeline

In [16]:
# python
import os
import re
import quopri
import multiprocessing
import warnings
from typing import List, Dict, Optional

import pandas as pd
from bs4 import BeautifulSoup, MarkupResemblesLocatorWarning
from tqdm.contrib.concurrent import process_map

# Suppress the BeautifulSoup URL warning
warnings.filterwarnings("ignore", category=MarkupResemblesLocatorWarning)

# =========================
# Configuration
# =========================

# --- Step A1 Configuration ---
# Use the master file from your *first* pipeline as the input
ML_INPUT_FILE = "master_email_dataset.csv"
ML_CLEANED_OUTPUT_FILE = "ml_dataset_cleaned.csv"

# --- Step A2 Configuration ---
ML_DEDUPED_OUTPUT_FILE = "ml_dataset_deduped.csv"

# --- Step A3 Configuration ---
MIN_TOKEN_LENGTH = 5
MAX_TOKEN_LENGTH = 2000
ML_FINAL_OUTPUT_FILE = "ml_dataset_final.csv"


# ===================================================================
# Step A1: Text Cleaning & Normalization (Aggressive)
# ===================================================================

def clean_email_text_ml(text: object) -> str:
    """
    Aggressive cleaning function for the ML pipeline.
    """
    if not isinstance(text, str):
        return ""

    # 1. Fix Encoding Artifacts (Quoted-Printable)
    try:
        text_bytes = text.encode("latin-1", errors="ignore")
        decoded_bytes = quopri.decodestring(text_bytes)
        text = decoded_bytes.decode("utf-8", errors="ignore")
    except Exception:
        pass

    # 2. Strip HTML Tags
    soup = BeautifulSoup(text, "html.parser")
    text = soup.get_text(separator=" ")

    # 3. Expand common contractions (before lowercasing)
    text = re.sub(r"n't", " not", text)
    text = re.sub(r"'re", " are", text)
    text = re.sub(r"'s", " is", text)
    text = re.sub(r"'d", " would", text)
    text = re.sub(r"'ll", " will", text)
    text = re.sub(r"'ve", " have", text)
    text = re.sub(r"'m", " am", text)

    # 4. Convert to lowercase
    text = text.lower()

    # 5. Replace URLs with [URL]
    text = re.sub(r"(https?://\S+|www\.\S+)", "[URL]", text)

    # 6. Replace Emails with [EMAIL]
    text = re.sub(r"\b[a-z0-9._%+-]+@[a-z0-9.-]+\.[a-z]{2,}\b", "[EMAIL]", text)

    # 7. Replace digits with [NUM]
    text = re.sub(r"\d+", " [NUM] ", text)

    # 8. Remove non-ASCII and punctuation (keep letters, spaces, and our tokens)
    text = re.sub(r"[^a-z\s\[\]]", "", text)

    # 9. Normalize whitespace to a single space
    text = re.sub(r"\s+", " ", text).strip()

    return text


def clean_dataset_ml(
    input_filename: str,
    output_filename: str,
    workers: int,
) -> pd.DataFrame:
    """
    Loads the master dataset and applies the aggressive ML cleaning.
    """
    print(f"--- [Step A1] Running ML Text Cleaning ---")
    print(f"Loading: {input_filename}")
    try:
        df = pd.read_csv(input_filename)
    except Exception as e:
        print(f"Error loading {input_filename}. Make sure it exists. Error: {e}")
        return pd.DataFrame()

    df = df.dropna(subset=["text"])
    df = df.copy()

    print(f"Cleaning text using {workers} workers...")
    df["text"] = df["text"].fillna("")
    results = process_map(
        clean_email_text_ml,
        df["text"],
        max_workers=workers,
        chunksize=500,
        desc="ML Cleaning",
    )
    df["text"] = results

    # Text Integrity Validation
    before = len(df)
    df = df[df["text"].str.strip().str.len() > 0]
    dropped = before - len(df)
    if dropped > 0:
        print(f" - Dropped {dropped} rows that became empty after cleaning.")

    # Reorder columns
    first_cols = ["text", "label"]
    for col in ["source_file", "source_name", "record_id"]:
        if col in df.columns:
            first_cols.append(col)

    other_cols = [c for c in df.columns if c not in first_cols]
    df = df[first_cols + other_cols]

    df.to_csv(output_filename, index=False)
    print(f" - Saved ML cleaned: {output_filename} ({len(df)} rows)")
    print("--- [Step A1] Complete ---")
    return df


# ===================================================================
# Step A2: Deduplication
# ===================================================================

def deduplicate_dataset_ml(
    df: pd.DataFrame,
    output_filename: str,
) -> pd.DataFrame:
    """
    Deduplicates the cleaned ML dataset.
    """
    print(f"\n--- [Step A2] Running Deduplication ---")
    df = df.dropna(subset=["text"])
    before = len(df)

    df = df.drop_duplicates(subset=["text"], keep="first")
    after = len(df)

    print("\n==================================")
    print("Deduplication Report")
    print(f"  Rows before: {before}")
    print(f"  Rows after:  {after}")
    print(f"  Removed:     {before - after}")
    print("==================================")

    df.to_csv(output_filename, index=False)
    print(f" - Saved ML deduplicated: {output_filename} ({after} unique rows)")
    print("--- [Step A2] Complete ---")
    return df


# ===================================================================
# Step A3: Outlier & Length Filtering
# ===================================================================

def filter_by_length_ml(
    df: pd.DataFrame,
    output_filename: str,
    min_len: int,
    max_len: int,
) -> pd.DataFrame:
    """
    Filters the dataset based on token count.
    """
    print(f"\n--- [Step A3] Running Length Filtering ---")
    print(f"Filtering rows with token count < {min_len} or > {max_len}...")

    df["token_count"] = df["text"].str.split().str.len()
    before = len(df)

    df = df[
        (df["token_count"] >= min_len) & (df["token_count"] <= max_len)
    ]

    after = len(df)
    dropped = before - after

    print("\n==================================")
    print("Length Filtering Report")
    print(f"  Rows before: {before}")
    print(f"  Rows after:  {after}")
    print(f"  Removed:     {dropped}")
    print("==================================")

    df = df.drop(columns=["token_count"])

    df.to_csv(output_filename, index=False)
    print(f" - Saved ML final: {output_filename} ({after} rows)")
    print("--- [Step A3] Complete ---")
    return df


# ===================================================================
# --- SCRIPT EXECUTION ---
# This runs the full ML preprocessing pipeline (A1-A3).
# ===================================================================

multiprocessing.freeze_support()

print("Starting Machine Learning (ML) Preprocessing Pipeline (A1-A3)...")

# Step A1: Clean
cleaned_ml_df = clean_dataset_ml(
    input_filename=ML_INPUT_FILE,
    output_filename=ML_CLEANED_OUTPUT_FILE,
    workers=multiprocessing.cpu_count()
)

if not cleaned_ml_df.empty:
    # Step A2: Deduplicate
    deduped_ml_df = deduplicate_dataset_ml(
        cleaned_ml_df,
        output_filename=ML_DEDUPED_OUTPUT_FILE
    )

    # Step A3: Filter by Length
    final_ml_df = filter_by_length_ml(
        deduped_ml_df,
        output_filename=ML_FINAL_OUTPUT_FILE,
        min_len=MIN_TOKEN_LENGTH,
        max_len=MAX_TOKEN_LENGTH
    )

    print("\nML Preprocessing Pipeline (A1-A3) complete.")
else:
    print("\nML Pipeline stopped: Could not load or clean input file.")

Starting Machine Learning (ML) Preprocessing Pipeline (A1-A3)...
--- [Step A1] Running ML Text Cleaning ---
Loading: master_email_dataset.csv
Cleaning text using 16 workers...


ML Cleaning: 100%|██████████| 249873/249873 [00:03<00:00, 62544.03it/s] 


 - Dropped 287 rows that became empty after cleaning.
 - Saved ML cleaned: ml_dataset_cleaned.csv (249586 rows)
--- [Step A1] Complete ---

--- [Step A2] Running Deduplication ---

Deduplication Report
  Rows before: 249586
  Rows after:  211795
  Removed:     37791
 - Saved ML deduplicated: ml_dataset_deduped.csv (211795 unique rows)
--- [Step A2] Complete ---

--- [Step A3] Running Length Filtering ---
Filtering rows with token count < 5 or > 2000...

Length Filtering Report
  Rows before: 211795
  Rows after:  206883
  Removed:     4912
 - Saved ML final: ml_dataset_final.csv (206883 rows)
--- [Step A3] Complete ---

ML Preprocessing Pipeline (A1-A3) complete.


## DL Preprocessing pipeline

In [17]:
# python
import os
import re
import quopri
import multiprocessing
import warnings
from typing import List, Dict, Optional

import pandas as pd
from bs4 import BeautifulSoup, MarkupResemblesLocatorWarning
from tqdm.contrib.concurrent import process_map

# Suppress the BeautifulSoup URL warning
warnings.filterwarnings("ignore", category=MarkupResemblesLocatorWarning)

# =========================
# Configuration
# =========================

# --- Step B1 Configuration ---
# Use the master file from your *first* pipeline as the input
DL_INPUT_FILE = "master_email_dataset.csv"
DL_CLEANED_OUTPUT_FILE = "dl_dataset_cleaned.csv"

# --- Step B2 Configuration ---
DL_DEDUPED_OUTPUT_FILE = "dl_dataset_deduped.csv"

# --- Step B3 Configuration ---
# Filter settings: remove texts with < 5 tokens or > 2000 tokens.
# We use 2000 as a loose upper bound. The tokenizer will handle
# the final truncation to 512 tokens.
MIN_TOKEN_LENGTH = 5
MAX_TOKEN_LENGTH = 2000
DL_FINAL_OUTPUT_FILE = "dl_dataset_final.csv"


# ===================================================================
# Step B1: Text Cleaning (Transformer Path)
# ===================================================================

def clean_email_text_dl(text: object) -> str:
    """
    Gentle cleaning function for the Transformer (DL) pipeline.

    CRITICAL: Does NOT lowercase or remove punctuation.
    """
    if not isinstance(text, str):
        return ""

    # 1. Fix Encoding Artifacts (Quoted-Printable)
    try:
        text_bytes = text.encode("latin-1", errors="ignore")
        decoded_bytes = quopri.decodestring(text_bytes)
        text = decoded_bytes.decode("utf-8", errors="ignore")
    except Exception:
        pass

    # 2. Strip HTML Tags
    soup = BeautifulSoup(text, "html.parser")
    text = soup.get_text(separator=" ")

    # 3. Replace URLs with [URL]
    # We add spaces around tokens to ensure they are tokenized correctly
    text = re.sub(r"(https?://\S+|www\.\S+)", " [URL] ", text)

    # 4. Replace Emails with [EMAIL]
    text = re.sub(r"\b[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}\b", " [EMAIL] ", text)

    # 5. Normalize Whitespace (replace \n, \t, etc. with a single space)
    text = re.sub(r"\s+", " ", text).strip()

    # --- NO lowercasing, NO punctuation removal ---

    return text


def clean_dataset_dl(
    input_filename: str,
    output_filename: str,
    workers: int,
) -> pd.DataFrame:
    """
    Loads the master dataset and applies the gentle DL cleaning.
    """
    print(f"--- [Step B1] Running DL (Transformer) Text Cleaning ---")
    print(f"Loading: {input_filename}")
    try:
        df = pd.read_csv(input_filename)
    except Exception as e:
        print(f"Error loading {input_filename}. Make sure it exists. Error: {e}")
        return pd.DataFrame()

    df = df.dropna(subset=["text"])
    df = df.copy()

    print(f"Cleaning text using {workers} workers...")
    df["text"] = df["text"].fillna("")
    results = process_map(
        clean_email_text_dl,
        df["text"],
        max_workers=workers,
        chunksize=500,
        desc="DL Cleaning",
    )
    df["text"] = results

    # Text Integrity Validation
    before = len(df)
    df = df[df["text"].str.strip().str.len() > 0]
    dropped = before - len(df)
    if dropped > 0:
        print(f" - Dropped {dropped} rows that became empty after cleaning.")

    # Reorder columns
    first_cols = ["text", "label"]
    for col in ["source_file", "source_name", "record_id"]:
        if col in df.columns:
            first_cols.append(col)

    other_cols = [c for c in df.columns if c not in first_cols]
    df = df[first_cols + other_cols]

    df.to_csv(output_filename, index=False)
    print(f" - Saved DL cleaned: {output_filename} ({len(df)} rows)")
    print("--- [Step B1] Complete ---")
    return df


# ===================================================================
# Step B2: Deduplication
# ===================================================================

def deduplicate_dataset_dl(
    df: pd.DataFrame,
    output_filename: str,
) -> pd.DataFrame:
    """
    Deduplicates the cleaned DL dataset.
    """
    print(f"\n--- [Step B2] Running Deduplication ---")
    df = df.dropna(subset=["text"])
    before = len(df)

    df = df.drop_duplicates(subset=["text"], keep="first")
    after = len(df)

    print("\n==================================")
    print("Deduplication Report")
    print(f"  Rows before: {before}")
    print(f"  Rows after:  {after}")
    print(f"  Removed:     {before - after}")
    print("==================================")

    df.to_csv(output_filename, index=False)
    print(f" - Saved DL deduplicated: {output_filename} ({after} unique rows)")
    print("--- [Step B2] Complete ---")
    return df


# ===================================================================
# Step B3: Outlier & Length Filtering
# ===================================================================

def filter_by_length_dl(
    df: pd.DataFrame,
    output_filename: str,
    min_len: int,
    max_len: int,
) -> pd.DataFrame:
    """
    Filters the dataset based on token count.
    """
    print(f"\n--- [Step B3] Running Length Filtering ---")
    print(f"Filtering rows with token count < {min_len} or > {max_len}...")

    # Calculate token count (simple split on space)
    df["token_count"] = df["text"].str.split().str.len()
    before = len(df)

    df = df[
        (df["token_count"] >= min_len) & (df["token_count"] <= max_len)
    ]

    after = len(df)
    dropped = before - after

    print("\n==================================")
    print("Length Filtering Report")
    print(f"  Rows before: {before}")
    print(f"  Rows after:  {after}")
    print(f"  Removed:     {dropped}")
    print("==================================")

    df = df.drop(columns=["token_count"])

    df.to_csv(output_filename, index=False)
    print(f" - Saved DL final: {output_filename} ({after} rows)")
    print("--- [Step B3] Complete ---")
    return df


# ===================================================================
# --- SCRIPT EXECUTION ---
# This runs the full DL preprocessing pipeline (B1-B3).
# ===================================================================

multiprocessing.freeze_support()

print("Starting Deep Learning (Transformer) Preprocessing Pipeline (B1-B3)...")

# Step B1: Clean
cleaned_dl_df = clean_dataset_dl(
    input_filename=DL_INPUT_FILE,
    output_filename=DL_CLEANED_OUTPUT_FILE,
    workers=multiprocessing.cpu_count()
)

if not cleaned_dl_df.empty:
    # Step B2: Deduplicate
    deduped_dl_df = deduplicate_dataset_dl(
        cleaned_dl_df,
        output_filename=DL_DEDUPED_OUTPUT_FILE
    )

    # Step B3: Filter by Length
    final_dl_df = filter_by_length_dl(
        deduped_dl_df,
        output_filename=DL_FINAL_OUTPUT_FILE,
        min_len=MIN_TOKEN_LENGTH,
        max_len=MAX_TOKEN_LENGTH
    )

    print("\nDL (Transformer) Preprocessing Pipeline (B1-B3) complete.")
else:
    print("\nDL Pipeline stopped: Could not load or clean input file.")

Starting Deep Learning (Transformer) Preprocessing Pipeline (B1-B3)...
--- [Step B1] Running DL (Transformer) Text Cleaning ---
Loading: master_email_dataset.csv
Cleaning text using 16 workers...


DL Cleaning: 100%|██████████| 249873/249873 [00:04<00:00, 55606.35it/s]


 - Dropped 161 rows that became empty after cleaning.
 - Saved DL cleaned: dl_dataset_cleaned.csv (249712 rows)
--- [Step B1] Complete ---

--- [Step B2] Running Deduplication ---

Deduplication Report
  Rows before: 249712
  Rows after:  221899
  Removed:     27813
 - Saved DL deduplicated: dl_dataset_deduped.csv (221899 unique rows)
--- [Step B2] Complete ---

--- [Step B3] Running Length Filtering ---
Filtering rows with token count < 5 or > 2000...

Length Filtering Report
  Rows before: 221899
  Rows after:  216259
  Removed:     5640
 - Saved DL final: dl_dataset_final.csv (216259 rows)
--- [Step B3] Complete ---

DL (Transformer) Preprocessing Pipeline (B1-B3) complete.
