# 02 - Feature Engineering with Spark

本 Notebook 聚焦於：
- 使用 PySpark 進行大規模資料處理
- 時間窗口聚合特徵工程（Source IP × 1 分鐘）
- 計算 Count、Diversity、Bytes Ratio、Port Entropy 等特徵
- 證明 Raw NetFlow 不可直接進模型，需要特徵工程
- 產製特徵表並儲存至 `data/processed/features.parquet`


## 1. 環境設定與 Spark Session 初始化


In [None]:
import os
import sys
from pathlib import Path

# 設定 Python 路徑（避免 Spark 找不到 Python）
python_exe = sys.executable
os.environ['PYSPARK_PYTHON'] = python_exe
os.environ['PYSPARK_DRIVER_PYTHON'] = python_exe

from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col, count, sum as spark_sum, min as spark_min, max as spark_max, avg, countDistinct,
    window, size, expr,  when, to_timestamp, split, log2, isnan, isnull
)
from pyspark.sql.types import StringType, DoubleType
import math

# 設定專案路徑
PROJECT_ROOT = Path.cwd().resolve()
if PROJECT_ROOT.name == "notebooks":
    PROJECT_ROOT = PROJECT_ROOT.parent

DATA_DIR = PROJECT_ROOT / "data" / "processed"
INPUT_PATH = DATA_DIR / "capture20110817_cleaned_spark.parquet"
OUTPUT_PATH = DATA_DIR / "features.parquet"


In [None]:
# 先關閉現有的 SparkSession
try:
    spark.stop()
    print("已關閉現有的 SparkSession")
except:
    print("沒有現有的 SparkSession 需要關閉")

In [None]:
# 建立 SparkSession
# 設定臨時目錄（使用專案目錄，路徑更短，避免 Windows 路徑長度限制）
import os
import tempfile
from pathlib import Path

# 使用專案目錄下的臨時資料夾（路徑更短）
PROJECT_ROOT = Path.cwd().resolve()
if PROJECT_ROOT.name == "notebooks":
    PROJECT_ROOT = PROJECT_ROOT.parent

# 建立臨時目錄
spark_temp_dir = str(PROJECT_ROOT / "spark_temp")
os.makedirs(spark_temp_dir, exist_ok=True)

# 設定 Hadoop 環境變數（Windows 上避免 HADOOP_HOME 錯誤）
os.environ['HADOOP_HOME'] = spark_temp_dir
os.environ['hadoop.home.dir'] = spark_temp_dir

print(f"設定 Spark 臨時目錄: {spark_temp_dir}")

spark = SparkSession.builder \
    .appName("NetworkAnomalyFeatureEngineering") \
    .master("local[*]") \
    .config("spark.driver.memory", "4g") \
    .config("spark.executor.memory", "4g") \
    .config("spark.sql.shuffle.partitions", "200") \
    .config("spark.local.dir", spark_temp_dir) \
    .config("spark.sql.warehouse.dir", spark_temp_dir) \
    .config("spark.executor.tempDir", spark_temp_dir) \
    .config("spark.driver.tempDir", spark_temp_dir) \
    .config("spark.hadoop.fs.file.impl", "org.apache.hadoop.fs.LocalFileSystem") \
    .config("spark.hadoop.fs.defaultFS", "file:///") \
    .getOrCreate()

print(f"    SparkSession 建立成功！")
print(f"    Spark 版本: {spark.version}")
print(f"    臨時目錄: {spark_temp_dir}")
print(f"    Spark Context: {spark.sparkContext}")

## 2. 讀取資料與基本檢查


In [None]:
# 讀取 Spark 相容的 Parquet 檔案
if not INPUT_PATH.exists():
    raise FileNotFoundError(
        f"找不到 {INPUT_PATH}. 請先執行 scripts/load_raw_data_pyspark.py 產生 Parquet 檔。"
    )

print(f"正在讀取: {INPUT_PATH}")

# 直接使用 Spark 讀取 Parquet 檔案
df = spark.read.parquet(str(INPUT_PATH))

print(f"✅ Spark 讀取完成！")
print(f"   總筆數: {df.count():,}")
print(f"   欄位數: {len(df.columns)}")
print(f"   欄位名稱: {df.columns}")
print("\n資料結構（Schema）：")
df.printSchema()

print("\n前 5 筆資料：")
df.show(5, truncate=False)

# 檢查並轉換時間欄位（從字串轉為 timestamp）
if "Date_Flow_Start" in df.columns:
    from pyspark.sql.types import TimestampType, StringType
    
    # 檢查當前資料型別
    current_type = df.schema["Date_Flow_Start"].dataType
    print(f"\n時間欄位資訊：")
    print(f"   目前型別: {current_type}")
    
    # 如果是字串，轉換為 timestamp
    if isinstance(current_type, StringType):
        print("   轉換 Date_Flow_Start 為 timestamp 類型...")
        df = df.withColumn("Date_Flow_Start", to_timestamp(col("Date_Flow_Start")))
        print("   ✅ 轉換完成")
    
    # 顯示時間範圍
    print("\n時間範圍統計：")
    time_stats = df.agg(
        spark_min("Date_Flow_Start").alias("最早時間"),
        spark_max("Date_Flow_Start").alias("最晚時間"),
        count("Date_Flow_Start").alias("有效記錄數")
    )
    time_stats.show(truncate=False)
    
    # 計算時間跨度（使用 unix_timestamp 轉換為秒數）
    from pyspark.sql.functions import unix_timestamp
    time_span_result = df.agg(
        (unix_timestamp(spark_max("Date_Flow_Start")) - unix_timestamp(spark_min("Date_Flow_Start"))).alias("時間跨度_秒")
    ).collect()[0]["時間跨度_秒"]
    if time_span_result is not None:
        hours = time_span_result / 3600
        days = hours / 24
        print(f"\n   時間跨度: {days:.2f} 天 ({hours:.2f} 小時, {time_span_result:,} 秒)")

## 3. 資料預處理

### 3.1 拆分 IP:Port 欄位


In [None]:
# 拆分 Src_IP_Port 和 Dst_IP_Port
# 格式: "IP:Port" -> 提取 IP 和 Port
# 同一 IP 的不同 Port 應歸為同一來源，需要按 src_ip 分組，而不是 Src_IP_Port

# 拆分 Source IP:Port
df = df.withColumn("src_ip", split(col("Src_IP_Port"), ":").getItem(0)) \
       .withColumn("src_port", split(col("Src_IP_Port"), ":").getItem(1).cast("int"))

# 拆分 Destination IP:Port
df = df.withColumn("dst_ip", split(col("Dst_IP_Port"), ":").getItem(0)) \
       .withColumn("dst_port", split(col("Dst_IP_Port"), ":").getItem(1).cast("int"))

print("✅ IP:Port 欄位拆分完成")
print("\n檢查拆分結果（前 5 筆）：")
df.select("Src_IP_Port", "src_ip", "src_port", "Dst_IP_Port", "dst_ip", "dst_port").show(5, truncate=False)


### 3.2 標籤二元化


In [None]:
print("=" * 60)
print("開始標籤二元化處理...")
print("=" * 60)

# 步驟 1: 使用 sample 取得 Label 樣本（避免 shuffle）
print("\n[步驟 1] 檢查 Label 欄位的樣本值...")
try:
    # 使用 sample 取得少量樣本來檢查 Label 值
    sample_df = df.select("Label").sample(False, 0.001, seed=42).distinct().limit(20)
    sample_labels = [row["Label"] for row in sample_df.collect()]
    print(f"✅ 從樣本中找到 {len(sample_labels)} 個 Label 值")
    print(f"   Label 樣本: {sample_labels}")
except Exception as e:
    print(f"⚠️ 樣本檢查失敗: {e}")
    sample_labels = []

# 步驟 2: 識別 Botnet 標籤
print("\n[步驟 2] 識別 Botnet 標籤...")
botnet_labels = []
if sample_labels:
    # 方法 1: 尋找包含 "botnet" 或 "bot" 的標籤
    botnet_labels = [label for label in sample_labels if label and ("botnet" in str(label).lower() or "bot" in str(label).lower())]
    print(f"   方法 1 - 找到 Botnet 標籤: {botnet_labels}")
    
    # 方法 2: 如果沒有找到，使用更寬鬆的匹配（非正常標籤）
    if not botnet_labels:
        normal_labels = ["Background", "background", "LEGIT", "legit", "Normal", "normal"]
        botnet_labels = [label for label in sample_labels if label and str(label) not in normal_labels]
        print(f"   方法 2 - 使用更寬鬆匹配，找到異常標籤: {botnet_labels}")

# 步驟 3: 執行標籤二元化（使用簡單的條件判斷，避免需要知道所有標籤）
print("\n[步驟 3] 執行標籤二元化...")
try:
    # 使用簡單的條件判斷：包含 "botnet" 或 "bot" 的標籤 → 1，否則 → 0
    # 這樣不需要知道所有標籤值，避免 shuffle
    df = df.withColumn(
        "label_binary",
        when(
            col("Label").rlike("(?i).*botnet.*") | col("Label").rlike("(?i).*bot.*"),
            1
        ).otherwise(0)
    )
    
    if botnet_labels:
        print(f"✅ 標籤二元化完成（預期 Botnet 標籤: {botnet_labels}）")
    else:
        print("✅ 標籤二元化完成（使用模式匹配：包含 'botnet' 或 'bot' → 1）")
except Exception as e:
    print(f"❌ 錯誤: {e}")

# 步驟 4: 顯示結果（使用 sample 避免 shuffle）
print("\n[步驟 4] 顯示二元化結果（使用樣本）...")
try:
    # 使用 sample 來顯示結果，避免對整個資料集進行 groupBy
    sample_result = df.sample(False, 0.01, seed=42)
    
    print("\n二元化標籤分布摘要（樣本）：")
    sample_result.groupBy("label_binary").count().orderBy("label_binary").show()
    
    print("\n原始標籤與二元化標籤對應（樣本，前 10 個）：")
    sample_result.groupBy("Label", "label_binary").count().orderBy("count", ascending=False).show(10, truncate=False)
    
    # 使用簡單的 count 來顯示整體分布（不需要 shuffle）
    print("\n整體二元化標籤分布（使用近似計數）：")
    total_count = df.count()
    botnet_count = df.filter(col("label_binary") == 1).count()
    normal_count = total_count - botnet_count
    print(f"   總筆數: {total_count:,}")
    print(f"   異常 (label_binary=1): {botnet_count:,} ({botnet_count/total_count*100:.2f}%)")
    print(f"   正常 (label_binary=0): {normal_count:,} ({normal_count/total_count*100:.2f}%)")
    
    print("\n" + "=" * 60)
    print("✅ 標籤二元化處理完成！")
    print("=" * 60)
except Exception as e:
    print(f"❌ 錯誤: {e}")


### 3.3 時間欄位處理


In [None]:
# 確保時間欄位是 timestamp 類型 (抽樣檢查即可)
# capture20110817_cleaned_spark.parquet 已經將 datetime 轉換為字串，讓 Spark 可以正確讀取

print("=" * 60)
print("時間欄位處理")
print("=" * 60)

if "Date_Flow_Start" in df.columns:
    df = df.withColumn("timestamp", to_timestamp(col("Date_Flow_Start")))
    print("✅ 時間欄位轉換完成")
    
    # 顯示樣本（不觸發 shuffle，快速）
    print("\n時間資料樣本（前 10 筆）：")
    time_samples = df.select("timestamp").head(10)
    for i, row in enumerate(time_samples, 1):
        print(f"   {i:2d}. {row['timestamp']}")
    
    # 使用較大樣本進行時間範圍估算（不觸發 shuffle）
    print("\n時間範圍估算（基於前 10000 筆樣本）：")
    try:
        sample_size = 10000
        time_samples = df.select("timestamp").head(sample_size)
        
        if time_samples:
            import pandas as pd
            timestamps = [row['timestamp'] for row in time_samples if row['timestamp'] is not None]
            
            if timestamps:
                timestamps_pd = pd.Series(timestamps)
                print(f"   樣本大小: {len(timestamps)} 筆")
                print(f"   樣本最早時間: {timestamps_pd.min()}")
                print(f"   樣本最晚時間: {timestamps_pd.max()}")
                time_span = timestamps_pd.max() - timestamps_pd.min()
                print(f"   樣本時間跨度: {time_span}")
                print(f"\n   ⚠️ 注意：這是樣本統計，不是完整資料集的統計")
    except Exception as e:
        print(f"   ⚠️ 計算樣本統計時發生錯誤: {e}")
    
    print("\n✅ 時間欄位處理完成！")
else:
    print("⚠️ 找不到 Date_Flow_Start 欄位")

## 4. 時間窗口聚合特徵工程

### 4.1 基礎統計特徵（Source IP × 1 分鐘窗口）
原始資料：
- 每個 NetFlow 記錄 = 1 筆
- 例如：IP A 在 12:01:00-12:02:00 有 100 個連接 → 100 筆原始記錄

特徵表：
- 每個 (src_ip, 1分鐘窗口) = 1 筆特徵
- 例如：IP A 在 12:01:00-12:02:00 → 1 筆特徵（聚合了 100 個連接）

壓縮比 = 原始筆數 / 特徵筆數
       = 8,087,512 / 1,650,021
       ≈ 4.9
這表示平均每個 (src_ip, 1分鐘窗口) 約有 4.9 筆原始記錄

In [None]:
# 以 Source IP + 1 分鐘窗口進行聚合
# 計算：Flow Count, Total Bytes, Total Packets, Average Duration

print("=" * 60)
print("基礎統計特徵計算")
print("=" * 60)

# 步驟 1: 檢查必要的欄位是否存在
print("\n[步驟 1] 檢查必要欄位...")
required_columns = ["src_ip", "timestamp", "Bytes", "Packets", "Duration", "label_binary"]
missing_columns = [col for col in required_columns if col not in df.columns]
if missing_columns:
    print(f"❌ 缺少必要欄位: {missing_columns}")
    print(f"   當前欄位: {df.columns}")
    raise ValueError(f"缺少必要欄位: {missing_columns}")
else:
    print(f"✅ 所有必要欄位都存在: {required_columns}")

# 步驟 2: 檢查 timestamp 欄位的型別
print("\n[步驟 2] 檢查 timestamp 欄位型別...")
try:
    timestamp_type = df.schema["timestamp"].dataType
    print(f"   timestamp 型別: {timestamp_type}")
    if "Timestamp" not in str(timestamp_type):
        print("   ⚠️ 警告: timestamp 欄位可能不是 TimestampType")
except Exception as e:
    print(f"   ❌ 檢查 timestamp 欄位時發生錯誤: {e}")

# 步驟 3: 測試 window() 函數
print("\n[步驟 3] 測試 window() 函數...")
try:
    from pyspark.sql.functions import window
    test_window = window(col("timestamp"), "1 minute")
    print("✅ window() 函數可用")
except Exception as e:
    print(f"❌ window() 函數不可用: {e}")
    import traceback
    traceback.print_exc()
    raise

# 步驟 4: 執行 groupBy 和聚合
print("\n[步驟 4] 執行 groupBy 和聚合（這可能需要一些時間）...")
try:
    features_base = df.groupBy(
        "src_ip",
        window(col("timestamp"), "1 minute").alias("time_window")
    ).agg(
        count("*").alias("flow_count"),  # 流量數量
        spark_sum("Bytes").alias("total_bytes"),  # 總位元組數
        spark_sum("Packets").alias("total_packets"),  # 總封包數
        avg("Duration").alias("avg_duration"),  # 平均持續時間
        spark_min("Duration").alias("min_duration"),
        spark_max("Duration").alias("max_duration"),
        # ==========================================
        # [新增] 聚合 Label 作為ML模型訓練的標籤 (aka 答案)
        # 如果窗口內有任一 Botnet 流量 (1)，則該窗口視為異常 (1)
        spark_max("label_binary").alias("label") 
        # ==========================================
    )
    print("✅ groupBy 和聚合操作完成")
except Exception as e:
    print(f"❌ groupBy 和聚合操作失敗: {e}")
    print(f"   錯誤類型: {type(e).__name__}")
    import traceback
    traceback.print_exc()
    raise

# 步驟 5: 計算特徵表筆數（這會觸發 action）
print("\n[步驟 5] 計算特徵表筆數（這可能需要一些時間）...")
try:
    feature_count = features_base.count()
    print(f"✅ 基礎統計特徵計算完成")
    print(f"   特徵表筆數: {feature_count:,}")
except Exception as e:
    print(f"❌ 計算特徵表筆數時發生錯誤: {e}")
    import traceback
    traceback.print_exc()
    raise

# 步驟 6: 顯示前 5 筆特徵
print("\n[步驟 6] 顯示前 5 筆特徵...")
try:
    print("\n前 5 筆特徵：")
    features_base.show(5, truncate=False)
    print("\n✅ 所有操作完成！")
except Exception as e:
    print(f"❌ 顯示特徵時發生錯誤: {e}")
    import traceback
    traceback.print_exc()

print("\n" + "=" * 60)
print("基礎統計特徵計算完成！")
print("=" * 60)

### 4.2 Diversity 特徵
#### Diversity 特徵的意義：計算不同值的數量（countDistinct）
- dst_ip_diversity（目標 IP 多樣性）：
    - 高值：短時間內連接多個不同目標 IP，可能表示：掃描行為、DDoS 攻擊
- dst_port_diversity（目標 Port 多樣性）：
  - 高值：短時間內連接多個不同 Port，可能表示：端口掃描行為
- protocol_diversity（協定多樣性）：
  - 高值：使用多種不同協定，可能表示：複雜的網路行為


In [None]:
# 功能：資料預處理
# 拆分 IP:Port 欄位
# 處理只有 IP 沒有 Port 的情況，以及 Port 為非數字的情況

print("=" * 60)
print("拆分 IP:Port 欄位（安全版本 - 使用 try_cast）")
print("=" * 60)

from pyspark.sql.functions import split, size, when, expr

print("\n[步驟 1] 拆分 Source IP:Port...")
try:
    # 使用 try_cast，如果轉換失敗會返回 NULL
    df = df.withColumn(
        "src_ip", 
        split(col("Src_IP_Port"), ":").getItem(0)
    ).withColumn(
        "src_port",
        when(
            size(split(col("Src_IP_Port"), ":")) > 1,
            expr("try_cast(split(Src_IP_Port, ':')[1] as int)")
        ).otherwise(None)
    )
    print("✅ Source IP:Port 拆分完成")
except Exception as e:
    print(f"❌ 拆分 Source IP:Port 時發生錯誤: {e}")
    import traceback
    traceback.print_exc()
    raise

print("\n[步驟 2] 拆分 Destination IP:Port...")
try:
    df = df.withColumn(
        "dst_ip",
        split(col("Dst_IP_Port"), ":").getItem(0)
    ).withColumn(
        "dst_port",
        when(
            size(split(col("Dst_IP_Port"), ":")) > 1,
            expr("try_cast(split(Dst_IP_Port, ':')[1] as int)")
        ).otherwise(None)
    )
    print("✅ Destination IP:Port 拆分完成")
except Exception as e:
    print(f"❌ 拆分 Destination IP:Port 時發生錯誤: {e}")
    import traceback
    traceback.print_exc()
    raise

print("\n[步驟 3] 檢查拆分結果...")
try:
    # 統計 NULL 值
    src_port_null = df.filter(col("src_port").isNull()).count()
    dst_port_null = df.filter(col("dst_port").isNull()).count()
    
    print(f"   src_port NULL 值: {src_port_null:,}")
    print(f"   dst_port NULL 值: {dst_port_null:,}")
    
    # 顯示拆分結果樣本
    print("\n   拆分結果樣本（前 10 筆）：")
    samples = df.select(
        "Src_IP_Port", "src_ip", "src_port",
        "Dst_IP_Port", "dst_ip", "dst_port"
    ).head(10)
    for i, row in enumerate(samples, 1):
        print(f"   {i}. Src: {row['Src_IP_Port']} -> {row['src_ip']}:{row['src_port']}")
        print(f"      Dst: {row['Dst_IP_Port']} -> {row['dst_ip']}:{row['dst_port']}")
    
    # 顯示有 NULL port 的樣本（包括無法轉換的）
    if dst_port_null > 0:
        print(f"\n   有 NULL port 的樣本（前 5 筆）：")
        null_samples = df.filter(col("dst_port").isNull()).select(
            "Dst_IP_Port", "dst_ip", "dst_port"
        ).head(5)
        for i, row in enumerate(null_samples, 1):
            print(f"   {i}. {row['Dst_IP_Port']} -> {row['dst_ip']}:{row['dst_port']}")
    
    print("\n✅ IP:Port 欄位拆分完成！")
except Exception as e:
    print(f"❌ 檢查拆分結果時發生錯誤: {e}")
    import traceback
    traceback.print_exc()

print("\n" + "=" * 60)
print("拆分完成")
print("=" * 60)

In [None]:
# 計算每個 Source IP 在每個時間窗口內的 Diversity
# - Destination IP Diversity: 目標 IP 的多樣性（掃描行為指標）
# - Destination Port Diversity: 目標 Port 的多樣性（端口掃描指標）
# - Protocol Diversity: 協定的多樣性（協議使用多樣性）

print("=" * 60)
print("Diversity 特徵計算")
print("=" * 60)

# 步驟 1: 檢查必要欄位
print("\n[步驟 1] 檢查必要欄位...")
required_columns = ["src_ip", "timestamp", "dst_ip", "dst_port", "Prot"]
missing_columns = [col for col in required_columns if col not in df.columns]
if missing_columns:
    print(f"❌ 缺少必要欄位: {missing_columns}")
    print(f"   當前欄位: {df.columns}")
    raise ValueError(f"缺少必要欄位: {missing_columns}")
else:
    print(f"✅ 所有必要欄位都存在: {required_columns}")

# 步驟 2: 檢查並清理資料（過濾掉 NULL 值，避免 countDistinct 問題）
print("\n[步驟 2] 檢查並清理資料...")
try:
    # 檢查 NULL 值數量
    null_counts = df.filter(
        (col("dst_ip").isNull()) | 
        (col("dst_port").isNull()) | 
        (col("Prot").isNull())
    ).count()
    
    if null_counts > 0:
        print(f"   ⚠️ 發現 {null_counts:,} 筆資料有 NULL 值")
        print(f"   注意：countDistinct 會自動忽略 NULL 值，不需要過濾")
    else:
        print("   ✅ 沒有發現 NULL 值")
    
    # countDistinct 會自動忽略 NULL 值，所以不需要過濾
    df_clean = df
    print(f"   使用資料筆數: {df_clean.count():,}")
except Exception as e:
    print(f"   ⚠️ 檢查資料時發生錯誤: {e}")
    df_clean = df

# 步驟 3: 計算 Diversity 特徵
print("\n[步驟 3] 計算 Diversity 特徵（這可能需要一些時間）...")
try:
    features_diversity = df_clean.groupBy(
        "src_ip",
        window(col("timestamp"), "1 minute").alias("time_window")
    ).agg(
        countDistinct("dst_ip").alias("dst_ip_diversity"),      # 目標 IP 多樣性
        countDistinct("dst_port").alias("dst_port_diversity"),  # 目標 Port 多樣性
        countDistinct("Prot").alias("protocol_diversity")       # 協定多樣性
    )
    print("✅ groupBy 和聚合操作完成")
except Exception as e:
    print(f"❌ 計算 Diversity 特徵時發生錯誤: {e}")
    import traceback
    traceback.print_exc()
    raise

# 步驟 4: 計算特徵表筆數
print("\n[步驟 4] 計算特徵表筆數（這可能需要一些時間）...")
try:
    diversity_count = features_diversity.count()
    print("✅ Diversity 特徵計算完成")
    print(f"   特徵表筆數: {diversity_count:,}")
    
    # 驗證筆數是否與 features_base 一致
    if 'features_base' in dir():
        base_count = features_base.count()
        if diversity_count == base_count:
            print(f"   ✅ 筆數與 features_base 一致: {base_count:,}")
        else:
            print(f"   ⚠️ 筆數與 features_base 不一致:")
            print(f"      features_base: {base_count:,}")
            print(f"      features_diversity: {diversity_count:,}")
except Exception as e:
    print(f"❌ 計算特徵表筆數時發生錯誤: {e}")
    import traceback
    traceback.print_exc()
    raise

# 步驟 5: 顯示前 5 筆特徵
print("\n[步驟 5] 顯示前 5 筆特徵...")
try:
    print("\n前 5 筆 Diversity 特徵：")
    # 使用 head() 避免可能的顯示問題
    diversity_samples = features_diversity.head(5)
    for i, row in enumerate(diversity_samples, 1):
        print(f"\n   {i}. src_ip: {row['src_ip']}")
        print(f"      time_window: {row['time_window']}")
        print(f"      dst_ip_diversity: {row['dst_ip_diversity']}")
        print(f"      dst_port_diversity: {row['dst_port_diversity']}")
        print(f"      protocol_diversity: {row['protocol_diversity']}")
    
    # 顯示統計摘要（使用樣本）
    print("\nDiversity 特徵統計摘要（基於樣本）...")
    sample_size = 10000
    diversity_sample = features_diversity.head(sample_size)
    
    if diversity_sample:
        import pandas as pd
        diversity_pd = pd.DataFrame([row.asDict() for row in diversity_sample])
        print("\n統計摘要：")
        print(diversity_pd[["dst_ip_diversity", "dst_port_diversity", "protocol_diversity"]].describe())
    
    print("\n✅ 所有操作完成！")
except Exception as e:
    print(f"❌ 顯示特徵時發生錯誤: {e}")
    import traceback
    traceback.print_exc()
    # 即使顯示失敗，特徵計算可能已經成功
    print("\n⚠️ 注意：雖然顯示失敗，但 features_diversity 可能已經成功建立")
    print("   可以嘗試: features_diversity.head(5) 來查看資料")

print("\n" + "=" * 60)
print("Diversity 特徵計算完成！")
print("=" * 60)

### 4.3 Ratio 特徵
原始資料的問題：
- 單一 NetFlow 記錄只是一個時間點的流量
- 無法直接看出「平均行為模式」
- 無法區分「正常的大量流量」和「異常的大量流量」

Ratio 特徵：
- 時間窗口聚合：將多個流量記錄聚合為一個特徵
- 標準化：除以流量數量，消除「流量多 = 異常」的誤判
- 捕捉模式：識別異常的流量模式，而不只是流量大小
#### Ratio 特徵意義
1. bytes_per_flow（平均每個流量的位元組數）
意義：
- 衡量每個網路連接的平均資料傳輸量
- 反映流量的「大小」特徵
異常檢測用途：
- 異常高值：可能表示
  - 大檔案傳輸（正常但值得關注）
  - DDoS 攻擊（大量資料傳輸）
  - 資料外洩（異常大量資料傳輸）
- 異常低值：可能表示
    -掃描行為（很多小連接，每個連接資料量很小）
    - 心跳包（維持連接的小封包）
1. packets_per_flow（平均每個流量的封包數）
意義：
- 衡量每個網路連接的平均封包數量
- 反映流量的「頻率」特徵
異常檢測用途：
- 異常高值：可能表示
  - 頻繁的資料交換（正常但值得關注）
  - 攻擊行為（大量小封包）
- 異常低值：可能表示
  - 掃描行為（每個連接只有少量封包）
  - 連接失敗（只有初始封包，沒有後續資料）

In [None]:
# 計算 Bytes Ratio 和 Packets Ratio
# 這裡我們計算每個 Source IP 的平均 Bytes/Flow 和 Packets/Flow

print("=" * 60)
print("Ratio 特徵計算")
print("=" * 60)

# 步驟 1: 檢查必要欄位
print("\n[步驟 1] 檢查必要欄位...")
required_columns = ["src_ip", "timestamp", "Bytes", "Packets"]
missing_columns = [col for col in required_columns if col not in df.columns]
if missing_columns:
    print(f"❌ 缺少必要欄位: {missing_columns}")
    print(f"   當前欄位: {df.columns}")
    raise ValueError(f"缺少必要欄位: {missing_columns}")
else:
    print(f"✅ 所有必要欄位都存在: {required_columns}")

# 步驟 2: 檢查資料是否有 NULL 值
print("\n[步驟 2] 檢查資料品質...")
try:
    bytes_null = df.filter(col("Bytes").isNull()).count()
    packets_null = df.filter(col("Packets").isNull()).count()
    print(f"   Bytes NULL 值: {bytes_null:,}")
    print(f"   Packets NULL 值: {packets_null:,}")
    
    if bytes_null > 0 or packets_null > 0:
        print("   ⚠️ 發現 NULL 值，聚合時會自動忽略")
except Exception as e:
    print(f"   ⚠️ 檢查資料品質時發生錯誤: {e}")

# 步驟 3: 計算 Ratio 特徵
print("\n[步驟 3] 計算 Ratio 特徵（這可能需要一些時間）...")
print("   正在執行 groupBy 和聚合操作...")
try:
    features_ratio = df.groupBy(
        "src_ip",
        window(col("timestamp"), "1 minute").alias("time_window")
    ).agg(
        (spark_sum("Bytes") / count("*")).alias("bytes_per_flow"),  # 平均每個流量的位元組數
        (spark_sum("Packets") / count("*")).alias("packets_per_flow")  # 平均每個流量的封包數
    )
    print("✅ groupBy 和聚合操作完成")
except Exception as e:
    print(f"❌ 計算 Ratio 特徵時發生錯誤: {e}")
    import traceback
    traceback.print_exc()
    raise

# 步驟 4: 計算特徵表筆數（這會觸發 action）
print("\n[步驟 4] 計算特徵表筆數（這可能需要一些時間）...")
print("   正在執行 count() 操作...")
try:
    ratio_count = features_ratio.count()
    print("✅ Ratio 特徵計算完成")
    print(f"   特徵表筆數: {ratio_count:,}")
    
    # 驗證筆數是否與 features_base 一致
    if 'features_base' in dir():
        base_count = features_base.count()
        if ratio_count == base_count:
            print(f"   ✅ 筆數與 features_base 一致: {base_count:,}")
        else:
            print(f"   ⚠️ 筆數與 features_base 不一致:")
            print(f"      features_base: {base_count:,}")
            print(f"      features_ratio: {ratio_count:,}")
except Exception as e:
    print(f"❌ 計算特徵表筆數時發生錯誤: {e}")
    import traceback
    traceback.print_exc()
    raise

# 步驟 5: 顯示前 5 筆特徵
print("\n[步驟 5] 顯示前 5 筆特徵...")
try:
    print("\n前 5 筆 Ratio 特徵：")
    # 使用 head() 避免可能的顯示問題
    ratio_samples = features_ratio.head(5)
    for i, row in enumerate(ratio_samples, 1):
        print(f"\n   {i}. src_ip: {row['src_ip']}")
        print(f"      time_window: {row['time_window']}")
        print(f"      bytes_per_flow: {row['bytes_per_flow']:.2f}")
        print(f"      packets_per_flow: {row['packets_per_flow']:.2f}")
    
    print("\n✅ 所有操作完成！")
except Exception as e:
    print(f"❌ 顯示特徵時發生錯誤: {e}")
    import traceback
    traceback.print_exc()
    # 即使顯示失敗，特徵計算可能已經成功
    print("\n⚠️ 注意：雖然顯示失敗，但 features_ratio 可能已經成功建立")
    print("   可以嘗試: features_ratio.head(5) 來查看資料")

print("\n" + "=" * 60)
print("Ratio 特徵計算完成！")
print("=" * 60)


### 4.4 Entropy 特徵
#### Entropy 數學意義
Entropy 使用 Shannon Entropy 公式計算：

$Entropy = -Σ(p(x) * log₂(p(x)))$

其中 p(x) 是某個值出現的機率。Entropy 衡量分布的隨機性或多樣性：
- 高 Entropy：分布較均勻、多樣性高
- 低 Entropy：分布集中、多樣性低

#### 特徵意義
1. Port Entropy (port_entropy)：衡量目標 Port 分布的隨機性。
   - 高 Port Entropy（接近 log₂(不同 port 數量)）：
     - 表示 Port 分布較均勻
     - 可能表示：端口掃描、隨機連接嘗試、異常行為
   - 低 Port Entropy（接近 0）：
     - 表示集中在少數 Port
     - 可能表示：正常服務連接（如只連 80、443）
2. Protocol Entropy (protocol_entropy)：衡量協定（TCP、UDP 等）分布的隨機性。
   - 高 Protocol Entropy：
     - 表示使用多種協定且分布均勻
     - 可能表示：複雜攻擊、多協定掃描
   - 低 Protocol Entropy：
     - 表示主要使用單一協定
     - 可能表示：正常服務（如只使用 TCP）

In [None]:
# 計算 Port Entropy 和 Protocol Entropy
# Entropy = -Σ(p(x) * log2(p(x)))
# Entropy 衡量分布的隨機性/多樣性

# 使用 Spark SQL 內建函數，避免 UDF 和 collect_list 的記憶體問題

print("=" * 60)
print("Entropy 特徵計算（優化版本 - 使用 Spark SQL）")
print("=" * 60)

# 步驟 1: 檢查必要欄位
print("\n[步驟 1] 檢查必要欄位...")
required_columns = ["src_ip", "timestamp", "dst_port", "Prot"]
missing_columns = [col for col in required_columns if col not in df.columns]
if missing_columns:
    print(f"❌ 缺少必要欄位: {missing_columns}")
    raise ValueError(f"缺少必要欄位: {missing_columns}")
else:
    print(f"✅ 所有必要欄位都存在: {required_columns}")

# 步驟 2: 計算每個值的出現次數和總數
print("\n[步驟 2] 計算每個值的出現次數（這可能需要一些時間）...")
try:
    # 先計算每個 (src_ip, time_window, dst_port) 的出現次數
    port_counts = df.filter(col("dst_port").isNotNull()).groupBy(
        "src_ip",
        window(col("timestamp"), "1 minute").alias("time_window"),
        "dst_port"
    ).agg(
        count("*").alias("port_count")
    )
    
    # 計算每個窗口的總數
    port_totals = port_counts.groupBy("src_ip", "time_window").agg(
        spark_sum("port_count").alias("total_count")
    )
    
    # 計算每個窗口的 Entropy
    # Entropy = -Σ(p * log2(p)) where p = count / total
    port_entropy_df = port_counts.join(
        port_totals, ["src_ip", "time_window"], "inner"
    ).withColumn(
        "p", col("port_count") / col("total_count")
    ).withColumn(
        "entropy_component", 
        -col("p") * log2(col("p"))
    ).groupBy("src_ip", "time_window").agg(
        spark_sum("entropy_component").alias("port_entropy")
    )
    
    print("✅ Port Entropy 計算完成")
except Exception as e:
    print(f"❌ 計算 Port Entropy 時發生錯誤: {e}")
    import traceback
    traceback.print_exc()
    raise

# 步驟 3: 計算 Protocol Entropy
print("\n[步驟 3] 計算 Protocol Entropy（這可能需要一些時間）...")
try:
    protocol_counts = df.filter(col("Prot").isNotNull()).groupBy(
        "src_ip",
        window(col("timestamp"), "1 minute").alias("time_window"),
        "Prot"
    ).agg(
        count("*").alias("protocol_count")
    )
    
    protocol_totals = protocol_counts.groupBy("src_ip", "time_window").agg(
        spark_sum("protocol_count").alias("total_count")
    )
    
    protocol_entropy_df = protocol_counts.join(
        protocol_totals, ["src_ip", "time_window"], "inner"
    ).withColumn(
        "p", col("protocol_count") / col("total_count")
    ).withColumn(
        "entropy_component",
        -col("p") * log2(col("p"))
    ).groupBy("src_ip", "time_window").agg(
        spark_sum("entropy_component").alias("protocol_entropy")
    )
    
    print("✅ Protocol Entropy 計算完成")
except Exception as e:
    print(f"❌ 計算 Protocol Entropy 時發生錯誤: {e}")
    import traceback
    traceback.print_exc()
    raise

# 步驟 4: 合併兩個 Entropy 特徵
print("\n[步驟 4] 合併 Entropy 特徵...")
try:
    features_entropy = port_entropy_df.join(
        protocol_entropy_df, ["src_ip", "time_window"], "outer"
    ).fillna(0.0, subset=["port_entropy", "protocol_entropy"])
    
    print("✅ Entropy 特徵合併完成")
except Exception as e:
    print(f"❌ 合併 Entropy 特徵時發生錯誤: {e}")
    import traceback
    traceback.print_exc()
    raise

# 步驟 5: 驗證結果
print("\n[步驟 5] 驗證結果...")
try:
    entropy_count = features_entropy.count()
    print(f"✅ Entropy 特徵計算完成")
    print(f"   特徵表筆數: {entropy_count:,}")
    
    # 顯示前 5 筆（使用 head 避免可能的顯示問題）
    print("\n前 5 筆 Entropy 特徵：")
    entropy_samples = features_entropy.head(5)
    for i, row in enumerate(entropy_samples, 1):
        print(f"\n   {i}. src_ip: {row['src_ip']}")
        print(f"      time_window: {row['time_window']}")
        print(f"      port_entropy: {row['port_entropy']:.4f}")
        print(f"      protocol_entropy: {row['protocol_entropy']:.4f}")
    
    print("\n✅ 所有操作完成！")
except Exception as e:
    print(f"❌ 驗證結果時發生錯誤: {e}")
    import traceback
    traceback.print_exc()
    raise

print("\n" + "=" * 60)
print("Entropy 特徵計算完成！")
print("=" * 60)

## 5. 合併所有特徵


In [None]:
# 合併所有特徵表
# 使用 src_ip 和 time_window 作為 key

# ===== 過濾異常的 src_ip =====
print("=" * 60)
print("過濾異常的 src_ip")
print("=" * 60)

# 定義有效的 IP 格式（基本驗證：4 個數字用點分隔）
# 過濾掉：NULL、空字串、"0"、不符合 IP 格式的值

# 定義過濾條件（重複使用）
ip_filter = (
    col("src_ip").isNotNull() &
    (col("src_ip") != "") &
    (col("src_ip") != "0") &
    col("src_ip").rlike(r"^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$")
)

print("\n[步驟 1] 過濾 features_base...")
try:
    features_base_clean = features_base.filter(ip_filter)
    base_before = features_base.count()
    base_after = features_base_clean.count()
    print(f"   ✅ features_base: {base_before:,} -> {base_after:,} (過濾 {base_before - base_after:,} 筆)")
except Exception as e:
    print(f"   ❌ features_base 過濾失敗: {e}")
    print("   ⚠️ 使用原始 features_base")
    features_base_clean = features_base
    base_before = base_after = features_base.count()

print("\n[步驟 2] 過濾 features_diversity...")
try:
    features_diversity_clean = features_diversity.filter(ip_filter)
    diversity_before = features_diversity.count()
    diversity_after = features_diversity_clean.count()
    print(f"   ✅ features_diversity: {diversity_before:,} -> {diversity_after:,} (過濾 {diversity_before - diversity_after:,} 筆)")
except Exception as e:
    print(f"   ❌ features_diversity 過濾失敗: {e}")
    print("   ⚠️ 使用原始 features_diversity")
    features_diversity_clean = features_diversity
    diversity_before = diversity_after = features_diversity.count()

print("\n[步驟 3] 過濾 features_ratio...")
try:
    features_ratio_clean = features_ratio.filter(ip_filter)
    ratio_before = features_ratio.count()
    ratio_after = features_ratio_clean.count()
    print(f"   ✅ features_ratio: {ratio_before:,} -> {ratio_after:,} (過濾 {ratio_before - ratio_after:,} 筆)")
except Exception as e:
    print(f"   ❌ features_ratio 過濾失敗: {e}")
    print("   ⚠️ 使用原始 features_ratio")
    features_ratio_clean = features_ratio
    ratio_before = ratio_after = features_ratio.count()

print("\n[步驟 4] 過濾 features_entropy...")
try:
    features_entropy_clean = features_entropy.filter(ip_filter)
    entropy_before = features_entropy.count()
    entropy_after = features_entropy_clean.count()
    print(f"   ✅ features_entropy: {entropy_before:,} -> {entropy_after:,} (過濾 {entropy_before - entropy_after:,} 筆)")
except Exception as e:
    print(f"   ❌ features_entropy 過濾失敗: {e}")
    print("   ⚠️ 使用原始 features_entropy")
    features_entropy_clean = features_entropy
    entropy_before = entropy_after = features_entropy.count()

print("\n✅ 異常值過濾完成")
print("=" * 60)
# ===== 過濾結束 =====

# 使用過濾後的特徵表進行合併
print("\n[步驟 5] 合併所有特徵...")
try:
    features_final = features_base_clean \
        .join(features_diversity_clean, ["src_ip", "time_window"], "outer") \
        .join(features_ratio_clean, ["src_ip", "time_window"], "outer") \
        .join(features_entropy_clean, ["src_ip", "time_window"], "outer")
    
    print("✅ 所有特徵合併完成")
    print(f"   最終特徵表筆數: {features_final.count():,}")
    print(f"   特徵欄位數: {len(features_final.columns)}")
    print(f"   特徵欄位: {features_final.columns}")
except Exception as e:
    print(f"❌ 合併特徵時發生錯誤: {e}")
    import traceback
    traceback.print_exc()
    raise

# 驗證沒有異常值
print("\n[步驟 6] 驗證過濾結果...")
try:
    remaining_abnormal = features_final.filter(
        (col("src_ip").isNull()) | 
        (col("src_ip") == "") | 
        (col("src_ip") == "0")
    ).count()
    
    if remaining_abnormal == 0:
        print("   ✅ 驗證：所有異常值已過濾")
    else:
        print(f"   ⚠️ 警告：仍有 {remaining_abnormal:,} 筆異常值")
except Exception as e:
    print(f"   ⚠️ 驗證時發生錯誤: {e}")

# 使用 head() 代替 show() 避免可能的顯示問題
print("\n前 5 筆完整特徵：")
try:
    samples = features_final.head(5)
    for i, row in enumerate(samples, 1):
        print(f"\n   {i}. src_ip: {row['src_ip']}")
        print(f"      time_window: {row['time_window']}")
        flow_count = row['flow_count'] if 'flow_count' in row else None
        print(f"      flow_count: {flow_count}")
        port_ent = row['port_entropy'] if 'port_entropy' in row else None
        protocol_ent = row['protocol_entropy'] if 'protocol_entropy' in row else None
        print(f"      port_entropy: {port_ent:.4f if port_ent is not None else 'N/A'}")
        print(f"      protocol_entropy: {protocol_ent:.4f if protocol_ent is not None else 'N/A'}")
except Exception as e:
    print(f"❌ 顯示特徵時發生錯誤: {e}")
    import traceback
    traceback.print_exc()

## 6. 特徵統計摘要與視覺化


In [None]:
# 特徵統計摘要
print("特徵統計摘要：")
features_final.describe().show()

## 7. 儲存特徵表


In [None]:
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
import shutil
import os
from pathlib import Path

# 確保 OUTPUT_PATH 是 Path 物件
OUTPUT_PATH = Path(r"C:\MyVS\NetworkAnomalyDetection\data\processed\features.parquet")

print("=" * 60)
print(f"正在使用 Pandas 替代方案儲存至: {OUTPUT_PATH}")
print("=" * 60)

try:
    # -------------------------------------------------------
    # 檢查並刪除舊的衝突路徑
    # -------------------------------------------------------
    if OUTPUT_PATH.exists():
        print(f"⚠️ 發現舊的輸出路徑: {OUTPUT_PATH}")
        try:
            if OUTPUT_PATH.is_dir():
                print("   正在刪除舊的 Spark 資料夾...")
                shutil.rmtree(OUTPUT_PATH)  # 強制刪除資料夾
            else:
                print("   正在刪除舊的檔案...")
                os.remove(OUTPUT_PATH)      # 刪除檔案
            print("   ✅ 舊路徑清理完成")
        except PermissionError:
            print("   ❌ 無法刪除舊檔，可能被佔用。請手動到檔案總管刪除該資料夾，或重啟 Kernel。")
            raise

    # 1. 將 Spark DataFrame 轉為 Pandas DataFrame
    # (如果你已經轉過了，變數 pandas_df 還在記憶體裡，可以註解掉下面這一行直接跑第 2 步)
    if 'pandas_df' not in locals():
        print("1. 正在將資料從 Spark 轉換為 Pandas...")
        pandas_df = features_final.toPandas()
        print(f"   ✅ 轉換完成，資料形狀: {pandas_df.shape}")
    else:
        print("1. Pandas DataFrame 已存在記憶體中，跳過轉換。")

    # 2. 儲存為 Parquet
    print("2. 正在寫入 Parquet 檔案...")
    # 確保父目錄存在
    OUTPUT_PATH.parent.mkdir(parents=True, exist_ok=True)
    
    # 使用 pyarrow 引擎寫入
    pandas_df.to_parquet(str(OUTPUT_PATH), engine='pyarrow', index=False)
    
    print(f"✅ 特徵表儲存成功！")
    print(f"   路徑: {OUTPUT_PATH}")

except Exception as e:
    print(f"❌ 儲存失敗: {e}")
    import traceback
    traceback.print_exc()

# 確保 Spark 關閉
# try:
#     spark.stop()
#     print("\n✅ SparkSession 已確認關閉")
# except:
#     pass

## 7.1 檢查特徵表

In [None]:
import pandas as pd
from pathlib import Path

# 讀取 Parquet 檔案
file_path = Path(r"C:\MyVS\NetworkAnomalyDetection\data\processed\features.parquet")
df = pd.read_parquet(file_path)

print("=" * 60)
print("特徵統計摘要：")
print("=" * 60)
print(df.describe())

print("\n" + "=" * 60)
print("資料基本資訊：")
print("=" * 60)
print(f"資料筆數: {len(df):,}")
print(f"欄位數: {len(df.columns)}")
print(f"\n欄位清單:")
for i, col in enumerate(df.columns, 1):
    print(f"  {i:2d}. {col}")

# 檢查 label 欄位
if 'label' in df.columns:
    print(f"\n✅ label 欄位存在")
    print(f"\nlabel 值分布:")
    print(df['label'].value_counts().sort_index())
else:
    print(f"\n❌ label 欄位不存在")

## 8. 結論與下一步

### 特徵工程總結

1. **時間窗口聚合**：以 Source IP × 1 分鐘窗口進行聚合，將原始 NetFlow 記錄轉換為特徵向量
2. **基礎統計特徵**：Flow Count, Total Bytes, Total Packets, Average Duration
3. **Diversity 特徵**：Destination IP/Port/Protocol 的多樣性
4. **Ratio 特徵**：Bytes/Flow, Packets/Flow 等比例特徵
5. **Entropy 特徵**：Port 和 Protocol 分布的熵值

### 為何 Raw NetFlow 不可直接進模型？

- **資料粒度問題**：原始 NetFlow 是單一流量記錄，缺乏時間上下文
- **特徵稀疏性**：單一記錄的特徵不足以捕捉異常行為模式
- **時間序列特性**：網路異常通常表現在時間序列模式中（如短時間內大量連接）
- **聚合特徵的必要性**：透過時間窗口聚合，我們可以捕捉到：
  - 異常的流量模式（高 flow_count）
  - 掃描行為（高 dst_ip_diversity, dst_port_diversity）
  - 異常的流量比例（異常的 bytes_per_flow）

### 下一步

- 在 `03_Model_Training_IsoForest_vs_XGB.ipynb` 中使用這些特徵進行模型訓練
- 將特徵工程邏輯封裝到 `src/features.py` 以便重複使用


In [None]:
# 關閉 SparkSession
spark.stop()
print("✅ SparkSession 已關閉")
