# 分散式環境數據處理流水線示範

本教學展示如何在分散式環境中建立完整的數據處理流水線，包含：
1. 環境設置
2. 資料擷取
3. 資料清理和預處理
4. 特徵工程
5. 模型訓練
6. 模型評估
7. 預測和結果儲存
8. 模型部署準備
9. 資源清理

In [None]:
# 導入所需套件
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.ml.feature import StringIndexer, VectorAssembler, StandardScaler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml import Pipeline
import pandas as pd
import numpy as np
from datetime import datetime

## 1. 環境設置

In [None]:
# Session-level Spark settings for this demo. The values are sized for a small
# example run and should be tuned for a real cluster.
_spark_conf = {
    "spark.memory.offHeap.enabled": "true",
    "spark.memory.offHeap.size": "10g",
    "spark.sql.shuffle.partitions": "10",
}

# Apply each setting to the builder in declaration order, then create (or reuse)
# the session.
_builder = SparkSession.builder.appName("DataPipelineDemo")
for _conf_key, _conf_value in _spark_conf.items():
    _builder = _builder.config(_conf_key, _conf_value)
spark = _builder.getOrCreate()

# Only surface WARN-and-above messages from Spark's own logging.
spark.sparkContext.setLogLevel("WARN")

## 2. 資料擷取
### 2.1 從多個來源讀取資料

In [None]:
def _nullable_schema(*fields):
    """Build a StructType from (name, type) pairs, marking every field nullable."""
    return StructType([StructField(name, dtype, True) for name, dtype in fields])


# Explicit schemas pin the column types instead of relying on CSV inference.
customer_schema = _nullable_schema(
    ("customer_id", StringType()),
    ("age", IntegerType()),
    ("gender", StringType()),
    ("income", DoubleType()),
    ("registration_date", DateType()),
)

transaction_schema = _nullable_schema(
    ("transaction_id", StringType()),
    ("customer_id", StringType()),
    ("transaction_date", DateType()),
    ("amount", DoubleType()),
    ("product_category", StringType()),
)


def _read_csv(path, schema):
    """Read a CSV file that has a header row, applying the given schema."""
    return spark.read.option("header", "true").schema(schema).csv(path)


# Customer master data and transaction records.
customers_df = _read_csv("data/customers.csv", customer_schema)
transactions_df = _read_csv("data/transactions.csv", transaction_schema)

### 2.2 資料初步檢查

In [None]:
# Print the schema and the first 5 rows of each input DataFrame.
for _heading, _frame in (
    ("客戶資料概覽：", customers_df),
    ("\n交易資料概覽：", transactions_df),
):
    print(_heading)
    _frame.printSchema()
    _frame.show(5)

# Row counts (each count() triggers a full Spark job).
print("\n資料筆數統計：")
for _count_label, _frame in (
    ("客戶資料筆數", customers_df),
    ("交易資料筆數", transactions_df),
):
    print(f"{_count_label}: {_frame.count()}")

## 3. 資料清理和預處理
### 3.1 處理缺失值

In [None]:
def check_null_values(df, df_name):
    """Print the number of null values in every column of ``df``.

    Args:
        df: Spark DataFrame to inspect.
        df_name: Label used in the printed header line.
    """
    print(f"\n{df_name} 缺失值統計：")
    # when() yields a non-null value only for null cells, so count() over it is
    # the number of nulls in that column.
    df.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns]).show()

check_null_values(customers_df, "客戶資料")
check_null_values(transactions_df, "交易資料")

# Impute missing customer values: column mean for age/income, "unknown" for
# gender. Both means come from a single aggregation job rather than one Spark
# job per column.
_column_means = customers_df.agg(
    avg("age").alias("age"),
    avg("income").alias("income"),
).first()

_fill_values = {"gender": "unknown"}
for _mean_col in ("age", "income"):
    # avg() is None when the column is entirely null; na.fill only accepts
    # int/float/str/bool replacement values, so skip such a column instead of
    # failing the whole cell.
    if _column_means[_mean_col] is not None:
        _fill_values[_mean_col] = _column_means[_mean_col]

customers_cleaned = customers_df.na.fill(_fill_values)

# Transactions with any missing field are dropped outright.
transactions_cleaned = transactions_df.na.drop()

### 3.2 異常值處理

In [None]:
def detect_outliers(df, numeric_cols, relative_error=0.05, iqr_multiplier=1.5):
    """Remove rows whose values fall outside the IQR fences of each column.

    For every column in ``numeric_cols`` the kept range is
    ``[Q1 - iqr_multiplier * IQR, Q3 + iqr_multiplier * IQR]``. All quartiles
    are computed in a single ``approxQuantile`` pass over the *input*
    DataFrame. Previously each later column's quartiles were recomputed on data
    already filtered by earlier columns, which made the result depend on the
    order of ``numeric_cols``.

    Rows with a null in a column that gets bounds are dropped too, because a
    comparison with null never satisfies the filter.

    Args:
        df: Spark DataFrame to filter.
        numeric_cols: Names of the numeric columns to check.
        relative_error: Precision passed to ``approxQuantile`` (0 means exact).
        iqr_multiplier: Fence width in IQR units (1.5 matches the original).

    Returns:
        The filtered DataFrame. It is lazy, so rows are only removed when an
        action runs; computing the quartiles, however, runs a Spark job now.
    """
    numeric_cols = list(numeric_cols)
    if not numeric_cols:
        return df

    # One list of [Q1, Q3] per column, all computed on the unfiltered input.
    quantiles_per_col = df.approxQuantile(numeric_cols, [0.25, 0.75], relative_error)

    keep_condition = None
    for col_name, quantiles in zip(numeric_cols, quantiles_per_col):
        if len(quantiles) < 2:
            # approxQuantile returns an empty list for an empty or all-null
            # column; there are no bounds to apply, so leave it unfiltered.
            print(f"\n{col_name} 沒有非空值，跳過異常值過濾")
            continue

        q1, q3 = quantiles
        iqr = q3 - q1
        lower_bound = q1 - iqr_multiplier * iqr
        upper_bound = q3 + iqr_multiplier * iqr

        print(f"\n{col_name} 的異常值界限：")
        print(f"Lower bound: {lower_bound}")
        print(f"Upper bound: {upper_bound}")

        in_range = (col(col_name) >= lower_bound) & (col(col_name) <= upper_bound)
        keep_condition = in_range if keep_condition is None else keep_condition & in_range

    return df if keep_condition is None else df.filter(keep_condition)

# Remove outliers from both the customer and the transaction data.
customers_cleaned = detect_outliers(customers_cleaned, ["age", "income"])
transactions_cleaned = detect_outliers(transactions_cleaned, ["amount"])

## 4. 特徵工程
### 4.1 特徵創建

In [None]:
# Per-customer transaction statistics.
# NOTE: count/sum/max/min here are pyspark.sql.functions (pulled in by the
# star import at the top of the notebook), not the Python builtins.
customer_features = (
    transactions_cleaned
    .groupBy("customer_id")
    .agg(
        count("transaction_id").alias("transaction_count"),
        avg("amount").alias("avg_transaction_amount"),
        sum("amount").alias("total_spend"),
        stddev("amount").alias("transaction_amount_std"),
        datediff(max("transaction_date"), min("transaction_date")).alias("customer_lifetime_days"),
    )
    # Sample stddev is undefined for a customer with a single transaction
    # (Spark yields null or NaN depending on version). VectorAssembler rejects
    # null/NaN by default, so treat "no spread" as 0.
    .fillna(0.0, subset=["transaction_amount_std"])
)

# Spend per product category, expressed as a share of the customer's total
# spend, as the original comment intended (消費比例). Raw per-category sums
# would add up exactly to total_spend.
category_spend = (
    transactions_cleaned
    .groupBy("customer_id")
    .pivot("product_category")
    .agg(sum("amount"))
    .fillna(0)
)
category_cols = [c for c in category_spend.columns if c != "customer_id"]
category_pivot = (
    category_spend
    .join(customer_features.select("customer_id", "total_spend"), "customer_id")
    .select(
        "customer_id",
        *[
            # Guard against division by zero (e.g. offsetting refunds), which
            # would otherwise produce null.
            when(col("total_spend") != 0, col(c) / col("total_spend"))
            .otherwise(lit(0.0))
            .alias(c)
            for c in category_cols
        ],
    )
)

# Combine customer attributes with the transaction features. These are inner
# joins, so customers with no remaining transactions are dropped.
final_features = (
    customers_cleaned
    .join(customer_features, "customer_id")
    .join(category_pivot, "customer_id")
)

### 4.2 特徵轉換

In [None]:
# Categorical inputs that get index-encoded before assembly.
categorical_cols = ["gender"]

# Columns that must never be used as numeric model features:
#  - identifiers / raw dates, plus the categorical columns (encoded separately);
#  - "total_spend": the label in section 5 is derived from it, so using it as a
#    feature would leak the target;
#  - "label": excluded in case this cell is re-run after section 5 has added
#    the label column to final_features.
_non_feature_cols = {"customer_id", "registration_date", "total_spend", "label", *categorical_cols}
numeric_cols = [c for c in final_features.columns if c not in _non_feature_cols]

# String -> index encoders. handleInvalid="keep" maps category values unseen
# during fit to an extra index instead of failing at transform time (e.g. a
# gender value that only appears in the test split).
indexers = [
    StringIndexer(inputCol=c, outputCol=f"{c}_idx", handleInvalid="keep")
    for c in categorical_cols
]

# Assemble encoded categoricals and numeric columns into one feature vector.
assembler = VectorAssembler(
    inputCols=[f"{c}_idx" for c in categorical_cols] + numeric_cols,
    outputCol="features",
)

# Standardize the assembled feature vector.
scaler = StandardScaler(inputCol="features", outputCol="scaled_features")

## 5. 模型訓練
### 5.1 準備訓練資料

In [None]:
# Target (example: high-value customer classification). A customer is labelled
# 1 when their total spend exceeds the approximate 80th percentile, else 0.
_spend_p80 = final_features.select(percentile_approx("total_spend", 0.8)).first()[0]
_is_high_value = col("total_spend") > _spend_p80
final_features = final_features.withColumn("label", when(_is_high_value, 1).otherwise(0))

# 80/20 train/test split with a fixed seed so the split is reproducible.
train_data, test_data = final_features.randomSplit([0.8, 0.2], seed=42)

### 5.2 建立和訓練模型

In [None]:
# Random forest trained on the standardized feature vector.
rf = RandomForestClassifier(
    featuresCol="scaled_features",
    labelCol="label",
    numTrees=100,
)

# Stages run in order: string indexing -> vector assembly -> scaling -> model.
_pipeline_stages = [*indexers, assembler, scaler, rf]
pipeline = Pipeline(stages=_pipeline_stages)

# Fit every stage on the training split.
model = pipeline.fit(train_data)

## 6. 模型評估

In [None]:
# Score the held-out test split with the fitted pipeline.
predictions = model.transform(test_data)

# One evaluator instance is reused for several metrics.
evaluator = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="accuracy"
)


def _score(metric_name):
    """Switch the shared evaluator to ``metric_name`` and evaluate ``predictions``."""
    evaluator.setMetricName(metric_name)
    return evaluator.evaluate(predictions)


accuracy = _score("accuracy")
print(f"模型準確率: {accuracy}")

# Weighted precision/recall average the per-class scores by class frequency.
precision = _score("weightedPrecision")
print(f"加權精確率: {precision}")

recall = _score("weightedRecall")
print(f"加權召回率: {recall}")

### 6.1 檢視詳細預測結果

In [None]:
# Show a sample of individual predictions next to their true labels.
_sample_cols = ["customer_id", "label", "prediction", "probability"]
predictions.select(*_sample_cols).show(10)

# Confusion matrix as (label, prediction, count) rows.
confusion_matrix = (
    predictions
    .groupBy("label", "prediction")
    .count()
    .orderBy("label", "prediction")
)
print("混淆矩陣:")
confusion_matrix.show()

## 7. 預測和結果儲存

In [None]:
def predict_new_data(new_data, model):
    """Score ``new_data`` with a fitted pipeline model.

    Args:
        new_data: DataFrame with the same input columns used during training.
        model: Fitted pipeline model (anything with a ``transform`` method).

    Returns:
        DataFrame with only ``customer_id``, ``prediction`` and ``probability``.
    """
    return model.transform(new_data).select("customer_id", "prediction", "probability")


# Persist the fitted pipeline under a date-stamped directory; an existing model
# saved earlier the same day is overwritten.
_run_date = datetime.now().strftime('%Y%m%d')
model_path = f"models/customer_classification_{_run_date}"
model.write().overwrite().save(model_path)
print(f"模型已儲存至: {model_path}")

### 7.1 儲存預測結果

In [None]:
# Write the test-set predictions as Parquet, replacing any previous output.
_predictions_path = "output/predictions"
(
    predictions
    .select("customer_id", "prediction", "probability")
    .write
    .mode("overwrite")
    .parquet(_predictions_path)
)

print(f"預測結果已儲存至: {_predictions_path}")

## 8. 模型部署準備

In [None]:
# Metadata needed to deploy / audit the saved model.
# The version is taken from model_path (its trailing date stamp) rather than
# from a second datetime.now() call, so the version always matches the
# directory the model was saved to, even if this cell runs on a later day.
# The path itself is recorded so the metadata points at the actual artifact.
model_metadata = {
    "model_version": model_path.rsplit("_", 1)[-1],
    "model_path": model_path,
    "features": {
        "categorical": categorical_cols,
        "numerical": numeric_cols,
    },
    # precision/recall are the weighted (class-frequency averaged) metrics.
    "performance": {
        "accuracy": accuracy,
        "precision": precision,
        "recall": recall,
    },
}

print("模型元數據：")
print(model_metadata)

## 9. 資源清理

In [None]:
# Stop the SparkSession created in section 1. Cells that use `spark` (or
# DataFrames/models created from it) need the session re-created before they
# can run again.
spark.stop()
print("資源已清理完成")

## 10. 總結與建議

本示範展示了完整的分散式數據處理流水線，包括：
1. 資料擷取和整合
2. 資料清理和預處理
3. 特徵工程
4. 模型訓練和評估
5. 結果儲存和部署準備

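建議在實際應用時：
- 根據實際數據量調整 Spark 配置（例如 `spark.sql.shuffle.partitions` 與 off-heap 記憶體大小）
- 增加更多的數據質量檢查
- 實作更完整的錯誤處理機制
- 添加詳細的日誌記錄
- 建立自動化測試流程
- 留意目標洩漏：本例的標籤由 `total_spend` 推導而來，用於訓練的特徵不應包含此欄位或可直接還原它的欄位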