# 第七章：性能調優

Apache Spark 的性能調優是確保應用程式高效運行的關鍵。本章將介紹各種調優技術和最佳實踐。

## 學習目標
- 理解 Spark 的執行模型和性能瓶頸
- 學習記憶體管理和垃圾回收調優
- 掌握分區策略和數據傾斜處理
- 了解快取和持久化最佳實踐
- 學習 SQL 查詢優化技術
- 掌握監控和診斷工具的使用

In [None]:
# 導入必要的庫
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window
from pyspark.storagelevel import StorageLevel
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import time
import random
from datetime import datetime, timedelta

# Create the SparkSession with the tuned configuration used throughout this chapter.
spark = (
    SparkSession.builder
    .appName("Spark Performance Tuning")
    # Adaptive Query Execution and its sub-features.
    .config("spark.sql.adaptive.enabled", "true")
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
    .config("spark.sql.adaptive.skewJoin.enabled", "true")
    .config("spark.sql.adaptive.localShuffleReader.enabled", "true")
    # Arrow settings.
    .config("spark.sql.execution.arrow.pyspark.enabled", "true")
    .config("spark.sql.execution.arrow.maxRecordsPerBatch", "10000")
    # Serializer and shuffle partition count.
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .config("spark.sql.shuffle.partitions", "200")
    .getOrCreate()
)

# Set the log level.
spark.sparkContext.setLogLevel("WARN")

# Report the version, the default parallelism and the shuffle partition setting.
print(
    f"Spark 版本: {spark.version}",
    f"可用核心數: {spark.sparkContext.defaultParallelism}",
    f"默認分區數: {spark.conf.get('spark.sql.shuffle.partitions')}",
    sep="\n",
)

## 1. 理解 Spark 執行模型

了解 Spark 的基本執行模型對於性能調優至關重要。

In [None]:
# Inspect the Spark configuration
def show_spark_config():
    """Print the current value of each tuning-related Spark setting.

    Every key is read through ``spark.conf.get``. A key whose lookup raises
    is reported as "未設定" instead of aborting the listing.
    """
    important_configs = [
        'spark.sql.adaptive.enabled',
        'spark.sql.adaptive.coalescePartitions.enabled',
        'spark.sql.adaptive.skewJoin.enabled',
        'spark.sql.shuffle.partitions',
        'spark.sql.execution.arrow.pyspark.enabled',
        'spark.serializer',
        'spark.sql.adaptive.advisoryPartitionSizeInBytes',
        'spark.sql.adaptive.maxShuffledHashJoinLocalMapThreshold'
    ]

    print("重要的 Spark 配置:")
    for config in important_configs:
        try:
            value = spark.conf.get(config)
        except Exception:
            # Best-effort listing: a failed lookup is shown as "not set".
            # ``Exception`` (not a bare ``except``) keeps KeyboardInterrupt and
            # SystemExit propagating so the cell can still be interrupted.
            print(f"{config}: 未設定")
        else:
            print(f"{config}: {value}")

# Print the tuning-related settings of the active session.
show_spark_config()

In [None]:
# Build the test dataset
def create_test_data(num_records=1000000, seed=None):
    """Build the synthetic sales DataFrame used by the performance experiments.

    Each row is ``(transaction_id, customer_id, category, region, amount,
    quantity, date)``. Rows are generated in driver memory and passed to
    ``spark.createDataFrame`` together with an explicit schema.

    Args:
        num_records: Number of rows to generate. Defaults to 1,000,000, the
            size that was previously hard-coded.
        seed: Optional seed. When given, a private ``random.Random(seed)``
            drives the generation so the random columns are reproducible;
            when ``None`` the shared ``random`` module state is used.

    Returns:
        The DataFrame returned by ``spark.createDataFrame``.

    Raises:
        ValueError: If ``num_records`` is negative.
    """
    if num_records < 0:
        raise ValueError(f"num_records must be non-negative, got {num_records}")

    print("建立測試資料集...")

    rng = random if seed is None else random.Random(seed)
    categories = ['Electronics', 'Clothing', 'Books', 'Sports', 'Home']
    regions = ['North', 'South', 'East', 'West', 'Central']

    # One reference instant for the whole dataset instead of one clock read per row.
    now = datetime.now()

    # customer ids cycle through 50,000 distinct values.
    sales_data = [
        (
            i,
            f"customer_{i % 50000}",
            rng.choice(categories),
            rng.choice(regions),
            rng.uniform(10, 1000),
            rng.randint(1, 10),
            now - timedelta(days=rng.randint(0, 365)),
        )
        for i in range(num_records)
    ]

    schema = StructType([
        StructField("transaction_id", IntegerType(), True),
        StructField("customer_id", StringType(), True),
        StructField("category", StringType(), True),
        StructField("region", StringType(), True),
        StructField("amount", DoubleType(), True),
        StructField("quantity", IntegerType(), True),
        StructField("date", TimestampType(), True)
    ])

    sales_df = spark.createDataFrame(sales_data, schema)

    print(f"建立了 {sales_df.count()} 筆銷售記錄")
    print(f"資料分區數: {sales_df.rdd.getNumPartitions()}")

    return sales_df

# Build the test data, then preview the first 10 rows and print the schema.
sales_df = create_test_data()
sales_df.show(10)
sales_df.printSchema()

## 2. 分區策略優化

正確的分區策略是 Spark 性能調優的基礎。

In [None]:
# Partition analysis
def analyze_partitions(df, name):
    """Print and plot how many rows each partition of ``df`` holds.

    Args:
        df: DataFrame to inspect; its ``rdd`` is used to count rows per
            partition.
        name: Label used in the printed report and the chart title.

    Returns:
        None. The report is printed and a bar chart is shown.
    """
    # The file does ``from pyspark.sql.functions import *``, which rebinds the
    # names ``sum``/``max``/``min`` to Spark column functions. The Python
    # builtins are therefore referenced explicitly through ``builtins``.
    import builtins

    def _count_rows(rows):
        # Consume the partition iterator and emit a single row count.
        return [builtins.sum(1 for _ in rows)]

    num_partitions = df.rdd.getNumPartitions()
    partition_sizes = df.rdd.mapPartitions(_count_rows).collect()

    print(f"\n{name} 分區分析:")
    print(f"分區數: {num_partitions}")
    print(f"每個分區大小: {partition_sizes}")

    if not partition_sizes:
        # max()/min()/mean are undefined for an empty list; skip stats and plot.
        print("沒有任何分區，略過統計與繪圖")
        return

    print(f"最大分區大小: {builtins.max(partition_sizes)}")
    print(f"最小分區大小: {builtins.min(partition_sizes)}")
    print(f"平均分區大小: {builtins.sum(partition_sizes) / len(partition_sizes):.2f}")

    # Visualise the partition size distribution.
    plt.figure(figsize=(12, 4))
    plt.bar(range(len(partition_sizes)), partition_sizes)
    plt.xlabel('分區編號')
    plt.ylabel('記錄數')
    plt.title(f'{name} 分區大小分佈')
    plt.show()

# Report the partition layout of the freshly created sales data.
analyze_partitions(sales_df, "原始資料")

In [None]:
# Repartitioning strategies
def test_repartitioning_strategies():
    """Time three ways of changing the partitioning of ``sales_df``.

    For each strategy the DataFrame is built, ``count()`` is timed, the
    resulting partition layout is reported via ``analyze_partitions`` and the
    elapsed time is printed.

    Returns:
        Tuple ``(repartitioned_by_number, repartitioned_by_region, coalesced)``.
    """
    print("測試不同的重新分區策略...")

    # (builder, report label) in the order the experiments run:
    # 1. repartition by number, 2. repartition by column, 3. coalesce.
    experiments = (
        (lambda: sales_df.repartition(8), "按數量重新分區 (8)"),
        (lambda: sales_df.repartition("region"), "按地區重新分區"),
        (lambda: sales_df.coalesce(4), "合併分區 (4)"),
    )

    produced = []
    for build, label in experiments:
        began = time.time()
        candidate = build()
        candidate.count()  # action that triggers the actual work
        elapsed = time.time() - began

        analyze_partitions(candidate, label)
        print(f"執行時間: {elapsed:.2f} 秒")
        produced.append(candidate)

    by_number, by_region, merged = produced
    return by_number, by_region, merged

# Run the repartitioning experiments and keep the three resulting DataFrames.
repartitioned_df, partitioned_by_region, coalesced_df = test_repartitioning_strategies()

## 3. 快取和持久化優化

正確使用快取可以顯著提升重複計算的性能。

In [None]:
# Compare caching strategies
def test_caching_strategies():
    """Benchmark a derived DataFrame under several storage levels.

    For each storage level the derived DataFrame is persisted, two ``count()``
    calls and one ``avg`` aggregation are timed, and the persisted data is
    released again. The timings are then plotted.

    Returns:
        pandas.DataFrame with one row per storage level and the columns
        ``storage_level``, ``first_run``, ``second_run``, ``third_run`` and
        ``speedup`` (first run divided by second run; NaN when the second run
        measured as zero seconds).
    """
    print("測試不同的快取策略...")

    # A DataFrame that needs some computation before it can be cached.
    complex_df = (
        sales_df.filter(col("amount") > 100)
        .withColumn("total_value", col("amount") * col("quantity"))
        .withColumn("month", month(col("date")))
        .withColumn("year", year(col("date")))
    )

    # Storage levels under test.
    storage_levels = [
        (StorageLevel.MEMORY_ONLY, "MEMORY_ONLY"),
        (StorageLevel.MEMORY_AND_DISK, "MEMORY_AND_DISK"),
        # PySpark 3.x does not define StorageLevel.MEMORY_AND_DISK_SER, so the
        # level is built explicitly:
        # (useDisk, useMemory, useOffHeap, deserialized).
        (StorageLevel(True, True, False, False), "MEMORY_AND_DISK_SER"),
        (StorageLevel.DISK_ONLY, "DISK_ONLY"),
    ]

    results = []

    for storage_level, name in storage_levels:
        print(f"\n測試 {name} 存儲級別:")

        # Drop whatever was cached before so each round starts cold.
        spark.catalog.clearCache()

        cached_df = complex_df.persist(storage_level)
        try:
            # First run: materialises the cache.
            start_time = time.time()
            cached_df.count()
            first_run_time = time.time() - start_time

            # Second run: served from the cache.
            start_time = time.time()
            cached_df.count()
            second_run_time = time.time() - start_time

            # Third run: a different action on the cached data.
            start_time = time.time()
            cached_df.agg(avg("amount")).collect()
            third_run_time = time.time() - start_time
        finally:
            # Release the data so the last level does not stay persisted
            # after the benchmark (or after a failure).
            cached_df.unpersist()

        # Guard against a zero denominator on very fast runs / coarse clocks.
        if second_run_time > 0:
            speedup = first_run_time / second_run_time
        else:
            speedup = float("nan")

        results.append({
            'storage_level': name,
            'first_run': first_run_time,
            'second_run': second_run_time,
            'third_run': third_run_time,
            'speedup': speedup
        })

        print(f"第一次執行: {first_run_time:.2f} 秒")
        print(f"第二次執行: {second_run_time:.2f} 秒")
        print(f"第三次執行: {third_run_time:.2f} 秒")
        print(f"加速比: {speedup:.2f}x")

    # Visualise the results.
    results_df = pd.DataFrame(results)

    fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(15, 6))

    # Execution time comparison (grouped bars).
    x = np.arange(len(results_df))
    width = 0.25

    ax1.bar(x - width, results_df['first_run'], width, label='第一次執行', alpha=0.8)
    ax1.bar(x, results_df['second_run'], width, label='第二次執行', alpha=0.8)
    ax1.bar(x + width, results_df['third_run'], width, label='第三次執行', alpha=0.8)

    ax1.set_xlabel('存儲級別')
    ax1.set_ylabel('執行時間 (秒)')
    ax1.set_title('不同存儲級別的執行時間比較')
    ax1.set_xticks(x)
    ax1.set_xticklabels(results_df['storage_level'], rotation=45)
    ax1.legend()

    # Speed-up comparison.
    ax2.bar(results_df['storage_level'], results_df['speedup'], color='green', alpha=0.7)
    ax2.set_xlabel('存儲級別')
    ax2.set_ylabel('加速比')
    ax2.set_title('快取加速比')
    # Rotate the existing category ticks; calling set_xticklabels without
    # fixing the tick positions first triggers a matplotlib warning.
    ax2.tick_params(axis='x', rotation=45)

    plt.tight_layout()
    plt.show()

    return results_df

# Run the caching benchmark and keep the timing table.
cache_results = test_caching_strategies()

## 4. Join 優化

Join 操作是 Spark 中最消耗資源的操作之一，需要特別注意優化。

In [None]:
# Build the data used by the join experiments
def create_join_test_data():
    """Create the customer and product DataFrames for the join tests.

    Returns:
        Tuple ``(customers_df, products_df)``. Customers has 50,000 rows
        (customer_id, customer_name, tier, region); products has 1,000 rows
        for each of five categories (product_id, product_name, category,
        price).
    """
    tiers = ['Premium', 'Standard', 'Basic']
    regions = ['North', 'South', 'East', 'West', 'Central']

    # Customer data (the small table).
    customer_rows = [
        (
            f"customer_{idx}",
            f"Customer {idx}",
            random.choice(tiers),
            random.choice(regions),
        )
        for idx in range(50000)
    ]
    customer_schema = StructType([
        StructField("customer_id", StringType(), True),
        StructField("customer_name", StringType(), True),
        StructField("tier", StringType(), True),
        StructField("region", StringType(), True),
    ])
    customers_df = spark.createDataFrame(customer_rows, customer_schema)

    # Product data (the medium-sized table).
    categories = ['Electronics', 'Clothing', 'Books', 'Sports', 'Home']
    product_rows = [
        (
            f"product_{category}_{idx}",
            f"Product {idx} in {category}",
            category,
            random.uniform(10, 500),
        )
        for category in categories
        for idx in range(1000)
    ]
    product_schema = StructType([
        StructField("product_id", StringType(), True),
        StructField("product_name", StringType(), True),
        StructField("category", StringType(), True),
        StructField("price", DoubleType(), True),
    ])
    products_df = spark.createDataFrame(product_rows, product_schema)

    return customers_df, products_df

# Build the join inputs.
customers_df, products_df = create_join_test_data()

# Report the row counts of both tables.
print(f"客戶資料: {customers_df.count()} 筆")
print(f"產品資料: {products_df.count()} 筆")

# Preview the first five rows of each table.
customers_df.show(5)
products_df.show(5)

In [None]:
# Try out different join strategies
def test_join_strategies():
    """Time four ways of joining ``sales_df`` with ``customers_df``.

    The strategies are a plain join, an explicit broadcast join, a join of
    inputs repartitioned on the key, and a SQL join with a BROADCAST hint.
    Each is measured by timing ``count()`` on the joined result.

    Returns:
        pandas.DataFrame with the columns ``Join Type``, ``Time (seconds)``
        and ``Count``, one row per strategy.
    """
    print("測試不同的 Join 策略...")

    durations = []
    row_counts = []

    def _measure(began, joined, speedup_label=None):
        # Trigger the join, stop the clock and print the section report.
        n_rows = joined.count()
        took = time.time() - began
        print(f"結果數量: {n_rows}")
        print(f"執行時間: {took:.2f} 秒")
        if speedup_label is not None:
            # Speed-up is always relative to the first (plain) join.
            print(f"{speedup_label}: {durations[0] / took:.2f}x")
        durations.append(took)
        row_counts.append(n_rows)

    # 1. Plain join (no optimisation).
    print("\n1. 普通 Join:")
    began = time.time()
    _measure(began, sales_df.join(customers_df, "customer_id", "inner"))

    # 2. Broadcast join (small-table optimisation).
    print("\n2. 廣播 Join:")
    began = time.time()
    _measure(
        began,
        sales_df.join(broadcast(customers_df), "customer_id", "inner"),
        "加速比",
    )

    # 3. Join of inputs repartitioned on the same key.
    print("\n3. 預分區 Join:")
    began = time.time()
    sales_by_key = sales_df.repartition("customer_id")
    customers_by_key = customers_df.repartition("customer_id")
    _measure(
        began,
        sales_by_key.join(customers_by_key, "customer_id", "inner"),
        "相對普通 Join 加速比",
    )

    # 4. SQL join with a hint; view registration is part of the timed section.
    print("\n4. 使用 Hint 的 Join:")
    began = time.time()
    sales_df.createOrReplaceTempView("sales")
    customers_df.createOrReplaceTempView("customers")
    hinted = spark.sql("""
        SELECT /*+ BROADCAST(c) */ s.*, c.customer_name, c.tier
        FROM sales s
        JOIN customers c ON s.customer_id = c.customer_id
    """)
    _measure(began, hinted, "相對普通 Join 加速比")

    # Summary table.
    join_results = pd.DataFrame({
        'Join Type': ['Normal', 'Broadcast', 'Pre-partitioned', 'With Hint'],
        'Time (seconds)': durations,
        'Count': row_counts,
    })

    plt.figure(figsize=(12, 6))
    plt.bar(
        join_results['Join Type'],
        join_results['Time (seconds)'],
        color=['red', 'green', 'blue', 'orange'],
    )
    plt.xlabel('Join 類型')
    plt.ylabel('執行時間 (秒)')
    plt.title('不同 Join 策略的性能比較')
    plt.xticks(rotation=45)

    # Annotate each bar with its value.
    for position, seconds in enumerate(join_results['Time (seconds)']):
        plt.text(position, seconds + 0.01, f'{seconds:.2f}s', ha='center', va='bottom')

    plt.tight_layout()
    plt.show()

    return join_results

# Run the join benchmark and keep the timing table.
join_results = test_join_strategies()

## 5. 數據傾斜處理

數據傾斜是 Spark 性能問題的常見原因。

In [None]:
# Simulate a skewed dataset
def create_skewed_data():
    """Create a DataFrame whose ``key`` column is heavily skewed.

    Of the 100,000 rows, 80% use the key "popular_key" and the remaining 20%
    are spread at random over ``key_0`` .. ``key_9``. The key distribution is
    printed and plotted.

    Returns:
        The skewed DataFrame with columns ``key``, ``value`` and ``amount``.
    """
    print("建立有數據傾斜的資料集...")

    num_records = 100000

    # 80% of the rows share the single hot key.
    hot_rows = [
        ("popular_key", f"value_{idx}", random.randint(1, 100))
        for idx in range(int(num_records * 0.8))
    ]

    # The remaining 20% are spread over ten other keys.
    other_keys = [f"key_{idx}" for idx in range(10)]
    cold_rows = [
        (random.choice(other_keys), f"value_{idx}", random.randint(1, 100))
        for idx in range(int(num_records * 0.2))
    ]

    schema = StructType([
        StructField("key", StringType(), True),
        StructField("value", StringType(), True),
        StructField("amount", IntegerType(), True),
    ])
    skewed_df = spark.createDataFrame(hot_rows + cold_rows, schema)

    # Show how skewed the keys are, largest group first.
    per_key = skewed_df.groupBy("key").count()
    key_distribution = per_key.orderBy(col("count").desc())
    print("\nKey 分佈情況:")
    key_distribution.show()

    # Plot the skew.
    distribution_pd = key_distribution.toPandas()
    plt.figure(figsize=(12, 6))
    plt.bar(distribution_pd['key'], distribution_pd['count'])
    plt.xlabel('Key')
    plt.ylabel('記錄數')
    plt.title('數據傾斜情況')
    plt.xticks(rotation=45)
    plt.show()

    return skewed_df

# Build the skewed dataset used by the next experiment.
skewed_df = create_skewed_data()

In [None]:
# Techniques for handling data skew
def handle_data_skew():
    """Time three aggregations over the skewed ``skewed_df``.

    1. A direct ``groupBy("key")`` aggregation.
    2. A salted aggregation: rows get a random salt, are aggregated per
       (salted key, key), and the partial results are aggregated per key.
    3. The same ``groupBy("key")`` aggregation as step 1, run after setting
       ``spark.sql.adaptive.skewJoin.enabled`` to "true".

    Returns:
        pandas.DataFrame with the columns ``Method`` and ``Time (seconds)``.
    """
    print("測試數據傾斜處理技術...")

    # 1. Direct aggregation (exposed to the skew).
    print("\n1. 直接聚合:")
    began = time.time()
    direct_agg = skewed_df.groupBy("key").agg(
        sum("amount").alias("total_amount"),
        count("*").alias("count"),
    )
    direct_rows = direct_agg.count()
    direct_secs = time.time() - began
    print(f"結果數量: {direct_rows}")
    print(f"執行時間: {direct_secs:.2f} 秒")

    # 2. Salting.
    print("\n2. 加鹽技術:")
    began = time.time()

    # Attach a random salt in [0, salt_factor) and derive the salted key.
    salt_factor = 10
    salt_column = (rand() * salt_factor).cast(IntegerType())
    salted_df = skewed_df.withColumn("salt", salt_column)
    salted_df = salted_df.withColumn(
        "salted_key", concat(col("key"), lit("_"), col("salt"))
    )

    # First aggregation: partial results per salted key.
    partials = salted_df.groupBy("salted_key", "key").agg(
        sum("amount").alias("partial_sum"),
        count("*").alias("partial_count"),
    )

    # Second aggregation: combine the partial results per original key.
    final_agg = partials.groupBy("key").agg(
        sum("partial_sum").alias("total_amount"),
        sum("partial_count").alias("count"),
    )

    salted_rows = final_agg.count()
    salted_secs = time.time() - began
    print(f"結果數量: {salted_rows}")
    print(f"執行時間: {salted_secs:.2f} 秒")
    print(f"加速比: {direct_secs / salted_secs:.2f}x")

    # 3. "Two-stage aggregation".
    print("\n3. 兩階段聚合:")
    began = time.time()

    # Uses AQE (Adaptive Query Execution).
    # NOTE(review): the query below is the same groupBy/agg as step 1; the
    # only difference is this setting, whose name refers to skewed *joins*.
    # Confirm whether a distinct two-stage plan was intended here.
    spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")

    staged_agg = skewed_df.groupBy("key").agg(
        sum("amount").alias("total_amount"),
        count("*").alias("count"),
    )

    staged_rows = staged_agg.count()
    staged_secs = time.time() - began
    print(f"結果數量: {staged_rows}")
    print(f"執行時間: {staged_secs:.2f} 秒")
    print(f"相對直接聚合加速比: {direct_secs / staged_secs:.2f}x")

    # Show the first rows of both results so they can be compared by eye.
    print("\n驗證結果一致性:")
    direct_result = direct_agg.orderBy("key").collect()
    salted_result = final_agg.orderBy("key").collect()

    print(f"直接聚合結果: {direct_result[:3]}")
    print(f"加鹽聚合結果: {salted_result[:3]}")

    # Summary table and chart.
    skew_results = pd.DataFrame({
        'Method': ['Direct Aggregation', 'Salt Technique', 'Two-Stage Aggregation'],
        'Time (seconds)': [direct_secs, salted_secs, staged_secs],
    })

    plt.figure(figsize=(10, 6))
    plt.bar(
        skew_results['Method'],
        skew_results['Time (seconds)'],
        color=['red', 'green', 'blue'],
    )
    plt.xlabel('處理方法')
    plt.ylabel('執行時間 (秒)')
    plt.title('數據傾斜處理方法性能比較')
    plt.xticks(rotation=45)

    for position, seconds in enumerate(skew_results['Time (seconds)']):
        plt.text(position, seconds + 0.01, f'{seconds:.2f}s', ha='center', va='bottom')

    plt.tight_layout()
    plt.show()

    return skew_results

# Run the skew-handling comparison and keep the timing table.
skew_results = handle_data_skew()

## 6. SQL 查詢優化

使用 Catalyst 優化器和 SQL 技巧來提升查詢性能。

In [None]:
# SQL query optimisation techniques
def sql_optimization_techniques():
    """Time four formulations of a sales-by-tier-and-category SQL query.

    The variants are: a plain join, the same query with a BROADCAST hint, a
    CTE formulation, and a window-function query that keeps the top three
    categories per tier. Each query is timed through ``collect()``; the plan
    of the BROADCAST variant is printed with ``explain(mode="cost")``.

    Returns:
        pandas.DataFrame with the columns ``Query Type``, ``Time (seconds)``
        and ``Result Count``.
    """
    print("SQL 查詢優化技術...")

    # Register the temp views the queries refer to.
    sales_df.createOrReplaceTempView("sales")
    customers_df.createOrReplaceTempView("customers")

    # 1. Unoptimised query.
    print("\n1. 未優化的查詢:")
    start_time = time.time()

    unoptimized_query = spark.sql("""
        SELECT 
            c.tier,
            s.category,
            SUM(s.amount) as total_sales,
            COUNT(*) as transaction_count
        FROM sales s
        JOIN customers c ON s.customer_id = c.customer_id
        WHERE s.amount > 50
        GROUP BY c.tier, s.category
        ORDER BY total_sales DESC
    """)

    result1 = unoptimized_query.collect()
    time1 = time.time() - start_time
    print(f"結果數量: {len(result1)}")
    print(f"執行時間: {time1:.2f} 秒")

    # 2. Same query with a BROADCAST hint.
    print("\n2. 使用 BROADCAST hint:")
    start_time = time.time()

    optimized_query = spark.sql("""
        SELECT /*+ BROADCAST(c) */
            c.tier,
            s.category,
            SUM(s.amount) as total_sales,
            COUNT(*) as transaction_count
        FROM sales s
        JOIN customers c ON s.customer_id = c.customer_id
        WHERE s.amount > 50
        GROUP BY c.tier, s.category
        ORDER BY total_sales DESC
    """)

    result2 = optimized_query.collect()
    time2 = time.time() - start_time
    print(f"結果數量: {len(result2)}")
    print(f"執行時間: {time2:.2f} 秒")
    print(f"加速比: {time1 / time2:.2f}x")

    # 3. CTE (Common Table Expression) formulation.
    print("\n3. 使用 CTE 優化:")
    start_time = time.time()

    cte_query = spark.sql("""
        WITH filtered_sales AS (
            SELECT customer_id, category, amount
            FROM sales
            WHERE amount > 50
        ),
        sales_summary AS (
            SELECT /*+ BROADCAST(c) */
                c.tier,
                fs.category,
                SUM(fs.amount) as total_sales,
                COUNT(*) as transaction_count
            FROM filtered_sales fs
            JOIN customers c ON fs.customer_id = c.customer_id
            GROUP BY c.tier, fs.category
        )
        SELECT * FROM sales_summary
        ORDER BY total_sales DESC
    """)

    result3 = cte_query.collect()
    time3 = time.time() - start_time
    print(f"結果數量: {len(result3)}")
    print(f"執行時間: {time3:.2f} 秒")
    print(f"相對未優化查詢加速比: {time1 / time3:.2f}x")

    # 4. Window function: top three categories per tier.
    print("\n4. 使用窗口函數:")
    start_time = time.time()

    # The ranking is computed in a subquery and filtered one level up: a
    # WHERE clause cannot reference a window-function alias defined in the
    # select list of the same query level.
    window_query = spark.sql("""
        SELECT
            tier,
            category,
            total_sales,
            transaction_count,
            rank_in_tier
        FROM (
            SELECT
                tier,
                category,
                total_sales,
                transaction_count,
                ROW_NUMBER() OVER (PARTITION BY tier ORDER BY total_sales DESC) as rank_in_tier
            FROM (
                SELECT /*+ BROADCAST(c) */
                    c.tier,
                    s.category,
                    SUM(s.amount) as total_sales,
                    COUNT(*) as transaction_count
                FROM sales s
                JOIN customers c ON s.customer_id = c.customer_id
                WHERE s.amount > 50
                GROUP BY c.tier, s.category
            ) t
        ) ranked
        WHERE rank_in_tier <= 3
    """)

    result4 = window_query.collect()
    time4 = time.time() - start_time
    print(f"結果數量: {len(result4)}")
    print(f"執行時間: {time4:.2f} 秒")

    # Show the plan of the BROADCAST variant.
    print("\n查看優化後的查詢計劃:")
    optimized_query.explain(mode="cost")

    # Summary table and chart.
    sql_results = pd.DataFrame({
        'Query Type': ['Unoptimized', 'With BROADCAST', 'With CTE', 'With Window Function'],
        'Time (seconds)': [time1, time2, time3, time4],
        'Result Count': [len(result1), len(result2), len(result3), len(result4)]
    })

    plt.figure(figsize=(12, 6))
    plt.bar(sql_results['Query Type'], sql_results['Time (seconds)'], color=['red', 'green', 'blue', 'orange'])
    plt.xlabel('查詢類型')
    plt.ylabel('執行時間 (秒)')
    plt.title('SQL 查詢優化技術性能比較')
    plt.xticks(rotation=45)

    for i, v in enumerate(sql_results['Time (seconds)']):
        plt.text(i, v + 0.01, f'{v:.2f}s', ha='center', va='bottom')

    plt.tight_layout()
    plt.show()

    return sql_results

# Run the SQL comparison and keep the timing table.
sql_results = sql_optimization_techniques()

## 7. 監控和診斷

使用 Spark UI 和其他工具來監控和診斷性能問題。

In [None]:
# Monitoring and diagnostics tooling
def _fetch_executor_summaries(sc, timeout=5):
    """Return the executor summaries served by the Spark UI REST API.

    The list is read from
    ``<uiWebUrl>/api/v1/applications/<applicationId>/executors``; each entry
    is a dict with keys such as ``id``, ``hostPort``, ``totalCores``,
    ``maxMemory``, ``memoryUsed``, ``diskUsed``, ``activeTasks``,
    ``completedTasks`` and ``failedTasks``.

    Args:
        sc: The SparkContext whose ``uiWebUrl`` and ``applicationId`` are used.
        timeout: Seconds to wait for the HTTP response.

    Returns:
        A list of dicts, or an empty list when the UI URL is not available or
        the request/decoding fails (a message is printed in that case).
    """
    import json
    import urllib.request

    base_url = sc.uiWebUrl
    if not base_url:
        print("Spark UI 未啟用，無法取得執行器資訊")
        return []

    url = f"{base_url}/api/v1/applications/{sc.applicationId}/executors"
    try:
        with urllib.request.urlopen(url, timeout=timeout) as response:
            return json.loads(response.read().decode("utf-8"))
    except (OSError, ValueError) as exc:
        # URLError is an OSError; malformed JSON raises a ValueError subclass.
        print(f"無法取得執行器資訊: {exc}")
        return []


def monitoring_and_diagnostics():
    """Print configuration, application and executor diagnostics.

    Executor figures come from the Spark UI REST API because PySpark's
    ``StatusTracker`` object does not provide ``getExecutorInfos()``.
    As a side effect ``sales_df`` is cached (and left cached) to make the
    memory figures change.

    Returns:
        dict with ``execution_time`` (seconds of the sample aggregation),
        ``result_count`` (rows it returned) and ``executor_count`` (number of
        executor entries seen in the first listing).
    """
    print("監控和診斷技術...")

    sc = spark.sparkContext
    bytes_per_mb = 1024 * 1024

    # 1. Current Spark configuration.
    print("\n1. 當前 Spark 配置:")
    settings = dict(sc.getConf().getAll())

    important_configs = [
        'spark.app.name',
        'spark.sql.shuffle.partitions',
        'spark.sql.adaptive.enabled',
        'spark.sql.adaptive.coalescePartitions.enabled',
        'spark.sql.adaptive.skewJoin.enabled',
        'spark.serializer'
    ]

    for config in important_configs:
        if config in settings:
            print(f"{config}: {settings[config]}")

    # 2. Application information.
    print("\n2. 應用程式資訊:")
    print(f"應用程式 ID: {sc.applicationId}")
    print(f"應用程式名稱: {sc.appName}")
    print(f"Spark UI URL: {sc.uiWebUrl}")

    # 3. Executor information.
    print("\n3. 執行器資訊:")
    executor_infos = _fetch_executor_summaries(sc)

    for executor in executor_infos:
        print(f"執行器 {executor.get('id')}:")
        print(f"  主機: {executor.get('hostPort')}")
        print(f"  核心數: {executor.get('totalCores')}")
        print(f"  最大記憶體: {executor.get('maxMemory', 0) / bytes_per_mb:.2f} MB")
        print(f"  已用記憶體: {executor.get('memoryUsed', 0) / bytes_per_mb:.2f} MB")
        print(f"  活躍任務數: {executor.get('activeTasks')}")
        print(f"  已完成任務數: {executor.get('completedTasks')}")
        print(f"  失敗任務數: {executor.get('failedTasks')}")
        print()

    # 4. Memory usage monitoring.
    print("\n4. 記憶體使用監控:")

    # Cache the sales data so storage memory is actually in use.
    large_df = sales_df.cache()
    large_df.count()  # action that materialises the cache

    # Read the figures again; the driver entry is skipped.
    for executor in _fetch_executor_summaries(sc):
        if executor.get('id') != 'driver':
            memory_used_mb = executor.get('memoryUsed', 0) / bytes_per_mb
            max_memory_mb = executor.get('maxMemory', 0) / bytes_per_mb
            # Avoid dividing by zero when no storage memory is reported.
            usage_percent = (memory_used_mb / max_memory_mb) * 100 if max_memory_mb else 0.0
            print(f"執行器 {executor.get('id')}: {memory_used_mb:.2f} MB / {max_memory_mb:.2f} MB ({usage_percent:.1f}%)")

    # 5. Task execution statistics.
    print("\n5. 任務執行統計:")

    # A grouped, sorted aggregation used as the sample workload.
    complex_operation = sales_df.groupBy("region", "category") \
                               .agg(sum("amount").alias("total_sales")) \
                               .orderBy("total_sales", ascending=False)

    start_time = time.time()
    result = complex_operation.collect()
    execution_time = time.time() - start_time

    print(f"操作執行時間: {execution_time:.2f} 秒")
    print(f"結果數量: {len(result)}")

    # 6. Cache statistics.
    print("\n6. 快取統計:")
    for executor in _fetch_executor_summaries(sc):
        if executor.get('id') != 'driver':
            print(f"執行器 {executor.get('id')}:")
            print(f"  快取記憶體使用: {executor.get('memoryUsed', 0) / bytes_per_mb:.2f} MB")
            print(f"  磁碟使用: {executor.get('diskUsed', 0) / bytes_per_mb:.2f} MB")

    return {
        'execution_time': execution_time,
        'result_count': len(result),
        'executor_count': len(executor_infos)
    }

# Run the diagnostics and keep the returned summary dict.
monitoring_stats = monitoring_and_diagnostics()

## 8. 性能調優最佳實踐

總結性能調優的最佳實踐和建議。

In [None]:
# Summary of performance tuning best practices
def performance_tuning_best_practices():
    """Print the best-practice catalogue and a tuning checklist.

    Returns:
        dict mapping each category name to its list of four recommendations,
        in the order they are printed.
    """
    print("性能調優最佳實踐總結:")

    best_practices = {
        "資料結構優化": [
            "使用 Parquet 格式儲存資料",
            "合理選擇資料類型（避免使用 String 代替數值類型）",
            "使用列式儲存格式",
            "壓縮資料以減少 I/O",
        ],
        "分區策略": [
            "合理設定分區數量（通常每個分區 100-200MB）",
            "使用 coalesce() 而非 repartition() 減少分區數",
            "按查詢模式進行分區（如按日期分區）",
            "避免小檔案問題",
        ],
        "快取策略": [
            "對重複使用的 DataFrame 進行快取",
            "選擇合適的儲存級別",
            "及時清理不需要的快取",
            "監控記憶體使用情況",
        ],
        "Join 優化": [
            "使用 broadcast join 處理小表",
            "預先按 join key 分區",
            "使用 SQL hints 指導優化器",
            "避免不必要的 shuffle 操作",
        ],
        "數據傾斜處理": [
            "使用加鹽技術分散熱點資料",
            "採用兩階段聚合",
            "啟用 AQE 的傾斜處理",
            "監控任務執行時間分佈",
        ],
        "配置調優": [
            "啟用 Adaptive Query Execution (AQE)",
            "使用 Kryo 序列化器",
            "調整 shuffle 分區數",
            "優化記憶體分配",
        ],
        "SQL 優化": [
            "使用列剪裁和謂詞下推",
            "避免使用 UDF，優先使用內建函數",
            "合理使用 CTE 和子查詢",
            "使用窗口函數替代複雜的 join",
        ],
        "監控和診斷": [
            "定期檢查 Spark UI",
            "監控記憶體和 CPU 使用情況",
            "分析任務執行時間",
            "記錄和分析慢查詢",
        ],
    }

    # One heading per category followed by its numbered recommendations.
    for heading in best_practices:
        print(f"\n{heading}:")
        numbered = (
            f"  {position}. {text}"
            for position, text in enumerate(best_practices[heading], start=1)
        )
        print("\n".join(numbered))

    # Tuning checklist.
    print("\n性能調優檢查清單:")
    checklist = (
        "✓ 資料格式是否為 Parquet 或其他列式格式？",
        "✓ 分區數是否合理（每個分區 100-200MB）？",
        "✓ 是否對重複使用的 DataFrame 進行快取？",
        "✓ Join 操作是否使用了 broadcast hint？",
        "✓ 是否啟用了 AQE？",
        "✓ 是否使用了 Kryo 序列化器？",
        "✓ 是否存在數據傾斜問題？",
        "✓ SQL 查詢是否使用了列剪裁？",
        "✓ 是否監控了記憶體使用情況？",
        "✓ 是否分析了任務執行時間？",
    )
    print("\n".join(f"  {entry}" for entry in checklist))

    return best_practices

# Print the best-practice catalogue and keep it as a dict.
best_practices = performance_tuning_best_practices()

## 9. 實際案例分析

通過一個實際的性能問題案例來展示調優過程。

In [None]:
# Case study: optimising a slow query
def case_study_slow_query_optimization():
    """Walk through optimising a customer-spending query and time each stage.

    Three variants are timed through ``collect()``: the original join-then-
    aggregate query, a CTE version that aggregates sales per customer before
    a broadcast join, and a version that joins against a cached per-customer
    aggregate (the time to build that cache is not part of the measurement).

    The ``sales`` and ``customers`` temp views are registered here, so the
    function does not rely on an earlier cell having created them. The
    ``customer_spending_cached`` view is registered as well; the cached
    aggregate behind it is released before returning.

    Returns:
        pandas.DataFrame with the columns ``Query Type``, ``Execution Time``
        and ``Speedup`` (relative to the original query).
    """
    print("案例研究：慢查詢優化")
    print("="*50)

    # Make the function self-contained: the queries below need these views.
    sales_df.createOrReplaceTempView("sales")
    customers_df.createOrReplaceTempView("customers")

    # Scenario: analyse customer purchasing behaviour.
    print("\n場景：分析客戶購買行為（原始慢查詢）")
    print("目標：找出每個地區的高價值客戶及其購買偏好")

    # The problematic (unoptimised) query.
    print("\n1. 原始查詢（存在性能問題）：")
    start_time = time.time()

    # Joins first, then aggregates over the joined rows.
    slow_query = spark.sql("""
        SELECT 
            c.region,
            c.customer_id,
            c.customer_name,
            c.tier,
            SUM(s.amount) as total_spent,
            COUNT(s.transaction_id) as transaction_count,
            COLLECT_LIST(s.category) as categories,
            AVG(s.amount) as avg_transaction_amount
        FROM sales s
        JOIN customers c ON s.customer_id = c.customer_id
        GROUP BY c.region, c.customer_id, c.customer_name, c.tier
        HAVING SUM(s.amount) > 1000
        ORDER BY c.region, total_spent DESC
    """)

    slow_result = slow_query.collect()
    slow_time = time.time() - start_time

    print(f"執行時間: {slow_time:.2f} 秒")
    print(f"結果數量: {len(slow_result)}")
    print("\n執行計劃分析：")
    slow_query.explain()

    # Optimisation steps.
    print("\n2. 優化步驟：")
    print("   a. 使用 broadcast join")
    print("   b. 預先過濾資料")
    print("   c. 合理使用 CTE")
    print("   d. 最佳化聚合操作")

    # The optimised query: aggregate per customer first, then broadcast-join.
    print("\n3. 優化後的查詢：")
    start_time = time.time()

    optimized_query = spark.sql("""
        WITH customer_spending AS (
            SELECT 
                customer_id,
                SUM(amount) as total_spent,
                COUNT(transaction_id) as transaction_count,
                AVG(amount) as avg_transaction_amount,
                COLLECT_LIST(category) as categories
            FROM sales
            GROUP BY customer_id
            HAVING SUM(amount) > 1000
        ),
        high_value_customers AS (
            SELECT /*+ BROADCAST(c) */
                c.region,
                c.customer_id,
                c.customer_name,
                c.tier,
                cs.total_spent,
                cs.transaction_count,
                cs.categories,
                cs.avg_transaction_amount
            FROM customer_spending cs
            JOIN customers c ON cs.customer_id = c.customer_id
        )
        SELECT *
        FROM high_value_customers
        ORDER BY region, total_spent DESC
    """)

    optimized_result = optimized_query.collect()
    optimized_time = time.time() - start_time

    print(f"執行時間: {optimized_time:.2f} 秒")
    print(f"結果數量: {len(optimized_result)}")
    print(f"性能提升: {slow_time / optimized_time:.2f}x")

    print("\n優化後的執行計劃：")
    optimized_query.explain()

    # Further optimisation: cache the intermediate aggregate.
    print("\n4. 進一步優化：使用快取")

    customer_spending_cached = spark.sql("""
        SELECT 
            customer_id,
            SUM(amount) as total_spent,
            COUNT(transaction_id) as transaction_count,
            AVG(amount) as avg_transaction_amount,
            COLLECT_LIST(category) as categories
        FROM sales
        GROUP BY customer_id
        HAVING SUM(amount) > 1000
    """).cache()

    try:
        # Materialise the cache before the timed section.
        customer_spending_cached.count()

        customer_spending_cached.createOrReplaceTempView("customer_spending_cached")

        start_time = time.time()

        cached_query = spark.sql("""
        SELECT /*+ BROADCAST(c) */
            c.region,
            c.customer_id,
            c.customer_name,
            c.tier,
            cs.total_spent,
            cs.transaction_count,
            cs.categories,
            cs.avg_transaction_amount
        FROM customer_spending_cached cs
        JOIN customers c ON cs.customer_id = c.customer_id
        ORDER BY region, total_spent DESC
    """)

        cached_result = cached_query.collect()
        cached_time = time.time() - start_time
    finally:
        # The results are already collected; free the cached aggregate.
        customer_spending_cached.unpersist()

    print(f"使用快取後執行時間: {cached_time:.2f} 秒")
    print(f"相對原始查詢加速比: {slow_time / cached_time:.2f}x")

    # Result verification: the three variants should return the same number of rows.
    print("\n5. 結果驗證：")
    print(f"原始查詢結果數量: {len(slow_result)}")
    print(f"優化查詢結果數量: {len(optimized_result)}")
    print(f"快取查詢結果數量: {len(cached_result)}")

    # Performance summary.
    performance_summary = pd.DataFrame({
        'Query Type': ['Original (Slow)', 'Optimized', 'With Cache'],
        'Execution Time': [slow_time, optimized_time, cached_time],
        'Speedup': [1.0, slow_time/optimized_time, slow_time/cached_time]
    })

    print("\n6. 性能總結：")
    print(performance_summary.to_string(index=False))

    # Visual comparison.
    plt.figure(figsize=(12, 6))

    # Execution times.
    plt.subplot(1, 2, 1)
    plt.bar(performance_summary['Query Type'], performance_summary['Execution Time'], 
            color=['red', 'green', 'blue'])
    plt.xlabel('查詢類型')
    plt.ylabel('執行時間 (秒)')
    plt.title('查詢執行時間比較')
    plt.xticks(rotation=45)

    # Speed-ups.
    plt.subplot(1, 2, 2)
    plt.bar(performance_summary['Query Type'], performance_summary['Speedup'], 
            color=['red', 'green', 'blue'])
    plt.xlabel('查詢類型')
    plt.ylabel('加速比')
    plt.title('相對原始查詢的加速比')
    plt.xticks(rotation=45)

    plt.tight_layout()
    plt.show()

    return performance_summary

# Run the case study and keep the performance summary table.
case_study_results = case_study_slow_query_optimization()

## 10. 總結

本章涵蓋了 Spark 性能調優的各個方面，從基礎概念到實際應用。

In [None]:
# Performance tuning summary
def performance_tuning_summary():
    """Print the chapter recap: key points, toolbox and suggested next steps.

    Returns:
        None. Everything is written to standard output.
    """
    print("Spark 性能調優總結")
    print("="*50)

    def _print_sections(sections):
        # Each section: blank line, heading, then one bullet per entry.
        for heading, entries in sections.items():
            print(f"\n{heading}:")
            print("\n".join(f"  • {entry}" for entry in entries))

    summary_points = {
        "關鍵學習要點": [
            "理解 Spark 的執行模型是調優的基礎",
            "分區策略直接影響並行性能",
            "適當的快取策略可以顯著提升重複計算性能",
            "Join 優化是大資料處理的重點",
            "數據傾斜是常見的性能瓶頸",
            "SQL 查詢優化可以自動提升性能",
            "監控和診斷是持續優化的基礎",
        ],
        "實用技巧": [
            "使用 .explain() 分析查詢計劃",
            "啟用 AQE 獲得自動優化",
            "使用 broadcast() 優化小表 Join",
            "採用加鹽技術處理數據傾斜",
            "合理設定分區數（通常為核心數的 2-3 倍）",
            "使用列式儲存格式（Parquet）",
            "定期清理不需要的快取",
        ],
        "常見錯誤": [
            "過度分區導致任務開銷過大",
            "不適當的快取策略浪費記憶體",
            "忽略數據傾斜問題",
            "使用過多的 UDF 函數",
            "不合理的 Join 順序",
            "忽略資料本地性",
            "不監控資源使用情況",
        ],
        "調優流程": [
            "1. 監控和識別性能瓶頸",
            "2. 分析查詢執行計劃",
            "3. 識別具體問題類型",
            "4. 應用相應的優化策略",
            "5. 測試和驗證優化效果",
            "6. 持續監控和調整",
        ],
    }
    _print_sections(summary_points)

    # Performance tuning toolbox.
    print("\n性能調優工具箱:")
    toolbox = {
        "內建工具": [
            "Spark UI (http://localhost:4040)",
            "History Server",
            "SQL Tab",
            "Storage Tab",
            "Executors Tab",
        ],
        "程式化工具": [
            "df.explain()",
            "df.cache()",
            "df.repartition()",
            "broadcast()",
            "spark.sql.functions.*",
        ],
        "配置選項": [
            "spark.sql.adaptive.enabled",
            "spark.sql.adaptive.coalescePartitions.enabled",
            "spark.sql.adaptive.skewJoin.enabled",
            "spark.sql.shuffle.partitions",
            "spark.serializer",
        ],
        "第三方工具": [
            "Dr. Elephant (LinkedIn)",
            "Sparklens (Qubole)",
            "Spark Profiler",
            "Ganglia",
            "Grafana + Prometheus",
        ],
    }
    _print_sections(toolbox)

    # Suggested next steps, numbered from 1.
    print("\n下一步建議:")
    next_steps = (
        "在生產環境中實踐這些調優技術",
        "建立性能監控和報警機制",
        "學習更高階的調優技術（如 Tungsten、Catalyst）",
        "了解特定工作負載的最佳實踐",
        "探索雲端 Spark 服務的調優選項",
        "參與 Spark 社群，學習最新的優化技術",
    )
    print("\n".join(
        f"  {position}. {step}" for position, step in enumerate(next_steps, start=1)
    ))

# Print the chapter recap.
performance_tuning_summary()

In [None]:
# Clean up resources
print("清理資源...")

# Drop everything cached in this session
spark.catalog.clearCache()

# Stop the Spark session; `spark` cannot be used after this point
spark.stop()

print("Spark 會話已結束")
print("\n感謝您完成 Spark 性能調優課程！")
print("記住：性能調優是一個持續的過程，需要根據具體的工作負載和環境進行調整。")