# PySpark SQL 進階教學 🚀

這份教學專為已經掌握 SQL 基礎的你設計！
我們將深入探討進階的 SQL 技巧，包括視窗函數、CTE、效能優化等主題。
讓你的 SQL 技能更上一層樓！✨

作者：QChoice AI 教學團隊  
日期：2025-01-15

## 🎯 環境設定

In [None]:
from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import *
from pyspark.sql.types import *
import time

# 創建 Spark Session
spark = SparkSession.builder \
    .appName("SQL進階教學") \
    .config("spark.sql.shuffle.partitions", "4") \
    .config("spark.sql.adaptive.enabled", "true") \
    .getOrCreate()

print("🚀 歡迎來到 SQL 進階課程！")

In [None]:
# 🎯 環境設定 - 準備進階開發環境


In [None]:
from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import *
from pyspark.sql.types import *
import time
# 創建 Spark Session - 配置進階效能參數
spark = SparkSession.builder \
    .appName("SQL進階教學") \
    .config("spark.sql.shuffle.partitions", "4") \
    .config("spark.sql.adaptive.enabled", "true") \
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
    .getOrCreate()
print("🚀 歡迎來到 SQL 進階課程！")
print("=" * 60)


## 📚 第一章：視窗函數 (Window Functions)

In [None]:
# 📚 第一章：視窗函數 (Window Functions)


🎈 概念解釋：
ROW_NUMBER() 為分區內的每一列分配一個唯一的序號。
常用於排名、去重、分頁等場景。

🎯 AI Prompt 範例：
請幫我寫一個 PySpark 程式，為每個部門的員工按薪資排序並分配序號。


In [None]:
print("\n📌 範例 1: ROW_NUMBER - 排名與編號")
# 建立員工資料
employees_data = [
    (1, "Alice", "工程部", 75000),
    (2, "Bob", "工程部", 82000),
    (3, "Charlie", "工程部", 68000),
    (4, "David", "行銷部", 65000),
    (5, "Eve", "行銷部", 72000),
    (6, "Frank", "行銷部", 70000),
    (7, "Grace", "人資部", 60000),
    (8, "Henry", "人資部", 63000)
]
employees_df = spark.createDataFrame(
    employees_data, 
    ["員工編號", "姓名", "部門", "薪資"]
)
# 定義視窗規格：按部門分組，按薪資降序排列
windowSpec = Window.partitionBy("部門").orderBy(desc("薪資"))
# 使用 ROW_NUMBER
result = employees_df.withColumn("部門排名", row_number().over(windowSpec))
print("\n使用 ROW_NUMBER 進行部門內排名：")
result.orderBy("部門", "部門排名").show()
# SQL 寫法
employees_df.createOrReplaceTempView("employees")


In [None]:
sql_result = spark.sql("""
    SELECT 
        `員工編號`,
        `姓名`,
        `部門`,
        `薪資`,
        ROW_NUMBER() OVER (PARTITION BY `部門` ORDER BY `薪資` DESC) as `部門排名`
    FROM employees
    ORDER BY `部門`, `部門排名`
""")
print("\nSQL 寫法結果：")
sql_result.show()


🎈 概念解釋：
RANK() 和 DENSE_RANK() 都用於排名，但處理相同值的方式不同：
- RANK(): 相同值得到相同排名，下一個排名會跳號
- DENSE_RANK(): 相同值得到相同排名，下一個排名連續

🎯 應用場景：
成績排名、銷售業績排名等需要處理並列情況的場景


In [None]:
print("\n📌 範例 2: RANK vs DENSE_RANK - 並列排名處理")
# 建立考試成績資料
scores_data = [
    (1, "Alice", 95),
    (2, "Bob", 95),
    (3, "Charlie", 90),
    (4, "David", 90),
    (5, "Eve", 85),
    (6, "Frank", 80),
]
scores_df = spark.createDataFrame(scores_data, ["學號", "姓名", "分數"])
# 定義視窗規格
windowSpec = Window.orderBy(desc("分數"))
result = scores_df.withColumn("RANK", rank().over(windowSpec)) \
                  .withColumn("DENSE_RANK", dense_rank().over(windowSpec)) \
                  .withColumn("ROW_NUMBER", row_number().over(windowSpec))
print("\n比較三種排名函數：")
result.show()


💡 說明：
- Alice 和 Bob 都是 95 分，RANK 和 DENSE_RANK 都是 1
- Charlie 和 David 都是 90 分：
  * RANK 是 3（因為前面有 2 個人）
  * DENSE_RANK 是 2（連續編號）
- ROW_NUMBER 為每個人分配唯一編號


🎈 概念解釋：
LEAD() 和 LAG() 允許你訪問當前列之前或之後的資料：
- LAG(): 訪問前面的列
- LEAD(): 訪問後面的列

🎯 應用場景：
計算增長率、同比環比分析、時間序列分析


In [None]:
print("\n📌 範例 3: LEAD & LAG - 時間序列分析")
# 建立月度銷售資料
sales_data = [
    ("2024-01", 100000),
    ("2024-02", 120000),
    ("2024-03", 115000),
    ("2024-04", 130000),
    ("2024-05", 135000),
    ("2024-06", 140000),
]
sales_df = spark.createDataFrame(sales_data, ["月份", "銷售額"])
# 定義視窗規格
windowSpec = Window.orderBy("月份")
result = sales_df.withColumn("上月銷售額", lag("銷售額", 1).over(windowSpec)) \
                 .withColumn("下月銷售額", lead("銷售額", 1).over(windowSpec)) \
                 .withColumn("月增長額", col("銷售額") - lag("銷售額", 1).over(windowSpec)) \
                 .withColumn("月增長率%", 
                            round((col("銷售額") - lag("銷售額", 1).over(windowSpec)) / 
                                  lag("銷售額", 1).over(windowSpec) * 100, 2))
print("\n銷售額月度分析：")
result.show()


🎈 概念解釋：
視窗函數配合聚合函數可以進行累計計算，而不會折疊資料列。

🎯 應用場景：
累計銷售額、移動平均、滾動統計


In [None]:
print("\n📌 範例 4: 累計計算與移動平均")
# 繼續使用銷售資料
windowSpec = Window.orderBy("月份").rowsBetween(Window.unboundedPreceding, Window.currentRow)
movingAvgSpec = Window.orderBy("月份").rowsBetween(-2, 0)  # 3個月移動平均
result = sales_df.withColumn("累計銷售額", sum("銷售額").over(windowSpec)) \
                 .withColumn("3月移動平均", round(avg("銷售額").over(movingAvgSpec), 2))
print("\n累計銷售額與移動平均：")
result.show()


## 💪 練習題 1

**題目：**

請建立一個銷售業績分析系統：
1. 建立銷售員月度銷售資料
2. 使用 ROW_NUMBER 為每個區域的銷售員排名
3. 使用 LAG 計算每月環比增長率
4. 計算每個銷售員的累計業績
5. 計算 3 個月移動平均

**提示：綜合運用 Window Functions：row_number、lag、sum、avg**

<details>
<summary>📝 點擊查看參考答案</summary>

```python
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, lag, col, round, sum, avg

# 1. 建立銷售資料
sales_data = [
    ("張三", "北區", "2024-01", 150000),
    ("張三", "北區", "2024-02", 180000),
    ("張三", "北區", "2024-03", 165000),
    ("李四", "北區", "2024-01", 140000),
    ("李四", "北區", "2024-02", 155000),
    ("李四", "北區", "2024-03", 170000),
    ("王五", "南區", "2024-01", 200000),
    ("王五", "南區", "2024-02", 220000),
    ("王五", "南區", "2024-03", 210000)
]

sales_df = spark.createDataFrame(
    sales_data,
    ["銷售員", "區域", "月份", "銷售額"]
)

print("=== 1. 原始銷售資料 ===")
sales_df.orderBy("區域", "銷售員", "月份").show()

# 2. 區域排名（按總業績）
print("\n=== 2. 各區域銷售員排名 ===")
window_rank = Window.partitionBy("區域").orderBy(col("銷售額").desc())
ranked_df = sales_df.withColumn("區域排名", row_number().over(window_rank))
ranked_df.orderBy("區域", "區域排名", "月份").show()

# 3. 環比增長率
print("\n=== 3. 月度環比增長率 ===")
window_lag = Window.partitionBy("銷售員").orderBy("月份")
growth_df = sales_df \
    .withColumn("上月銷售額", lag("銷售額", 1).over(window_lag)) \
    .withColumn(
        "環比增長率%",
        round((col("銷售額") - col("上月銷售額")) / col("上月銷售額") * 100, 2)
    )
growth_df.orderBy("銷售員", "月份").show()

# 4. 累計業績
print("\n=== 4. 累計業績 ===")
window_cumsum = Window.partitionBy("銷售員").orderBy("月份") \
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)
cumsum_df = sales_df \
    .withColumn("累計銷售額", sum("銷售額").over(window_cumsum))
cumsum_df.orderBy("銷售員", "月份").show()

# 5. 3個月移動平均
print("\n=== 5. 3個月移動平均 ===")
window_ma = Window.partitionBy("銷售員").orderBy("月份") \
    .rowsBetween(-2, Window.currentRow)
ma_df = sales_df \
    .withColumn("3月移動平均", round(avg("銷售額").over(window_ma), 0))
ma_df.orderBy("銷售員", "月份").show()
```

</details>

---


## 📚 第二章：通用表表達式 (Common Table Expressions - CTE)

In [None]:
# 📚 第二章：通用表表達式 (Common Table Expressions - CTE)


🎈 概念解釋：
CTE (WITH 子句) 讓你建立臨時的命名結果集，提高查詢的可讀性和可維護性。
可以把複雜查詢分解為多個步驟。

🎯 優勢：
1. 提高可讀性
2. 可重複使用
3. 便於除錯
4. 支援遞迴查詢


In [None]:
print("\n📌 範例 5: CTE - 簡化複雜查詢")
# 建立訂單資料
orders_data = [
    (1, "Alice", "2024-01-15", 1500),
    (2, "Bob", "2024-01-20", 2300),
    (3, "Alice", "2024-02-10", 1800),
    (4, "Charlie", "2024-02-15", 3200),
    (5, "Bob", "2024-03-05", 2100),
    (6, "Alice", "2024-03-20", 2500),
]
orders_df = spark.createDataFrame(
    orders_data,
    ["訂單編號", "客戶", "訂單日期", "金額"]
)
orders_df.createOrReplaceTempView("orders")
# 使用 CTE 計算客戶總消費和平均消費


In [None]:
sql_with_cte = spark.sql("""
    WITH customer_stats AS (
        SELECT 
            `客戶`,
            COUNT(*) as `訂單數`,
            SUM(`金額`) as `總消費`,
            AVG(`金額`) as `平均消費`
        FROM orders
        GROUP BY `客戶`
    ),
    high_value_customers AS (
        SELECT 
            `客戶`,
            `訂單數`,
            `總消費`,
            ROUND(`平均消費`, 2) as `平均消費`
        FROM customer_stats
        WHERE `總消費` > 5000
    )
    SELECT * FROM high_value_customers
    ORDER BY `總消費` DESC
""")
print("\n高價值客戶分析（使用 CTE）：")
sql_with_cte.show()


🎈 概念解釋：
多層 CTE 可以將複雜的業務邏輯分解為多個清晰的步驟。

🎯 應用場景：
多步驟的資料轉換、複雜的業務指標計算


In [None]:
print("\n📌 範例 6: 多層 CTE - 客戶分級分析")

In [None]:
sql_multi_cte = spark.sql("""
    WITH monthly_sales AS (
        -- 第一層：計算每個客戶的月度銷售
        SELECT 
            `客戶`,
            DATE_FORMAT(TO_DATE(`訂單日期`), 'yyyy-MM') as `月份`,
            SUM(`金額`) as `月銷售額`
        FROM orders
        GROUP BY `客戶`, DATE_FORMAT(TO_DATE(`訂單日期`), 'yyyy-MM')
    ),
    customer_summary AS (
        -- 第二層：匯總客戶統計
        SELECT 
            `客戶`,
            COUNT(DISTINCT `月份`) as `活躍月數`,
            SUM(`月銷售額`) as `總銷售額`,
            AVG(`月銷售額`) as `平均月銷售額`
        FROM monthly_sales
        GROUP BY `客戶`
    ),
    customer_level AS (
        -- 第三層：客戶分級
        SELECT 
            `客戶`,
            `活躍月數`,
            ROUND(`總銷售額`, 2) as `總銷售額`,
            ROUND(`平均月銷售額`, 2) as `平均月銷售額`,
            CASE 
                WHEN `總銷售額` >= 6000 THEN '白金客戶'
                WHEN `總銷售額` >= 4000 THEN '金牌客戶'
                ELSE '一般客戶'
            END as `客戶等級`
        FROM customer_summary
    )
    SELECT * FROM customer_level
    ORDER BY `總銷售額` DESC
""")

print("\n客戶分級結果：")
sql_multi_cte.show()


## 💪 練習題 2

**題目：**

請使用 CTE 建立一個完整的客戶價值分析系統：
1. 計算每個客戶的 RFM 指標（最近購買、購買頻率、購買金額）
2. 根據 RFM 分數對客戶分級
3. 分析各級別客戶的特徵
4. 給出營銷建議

**提示：使用多層 CTE，結合 CASE WHEN 進行分級**

<details>
<summary>📝 點擊查看參考答案</summary>

```python
from datetime import date

# 建立訂單資料
orders_data = [
    (1, "Alice", "2024-01-05", 1500),
    (2, "Alice", "2024-02-10", 2000),
    (3, "Alice", "2024-03-15", 1800),
    (4, "Bob", "2024-01-08", 3000),
    (5, "Bob", "2024-01-20", 2500),
    (6, "Charlie", "2024-02-01", 5000),
    (7, "David", "2023-12-15", 1000),
    (8, "Eve", "2024-03-10", 4000),
    (9, "Eve", "2024-03-20", 3500)
]

orders_df = spark.createDataFrame(
    orders_data,
    ["訂單編號", "客戶", "訂單日期", "金額"]
)
orders_df.createOrReplaceTempView("orders")

# 使用多層 CTE 進行 RFM 分析
result = spark.sql("""
WITH rfm_base AS (
    -- 計算基礎 RFM 指標
    SELECT 
        客戶,
        DATEDIFF('2024-03-31', MAX(訂單日期)) as 最近購買天數,
        COUNT(*) as 購買頻率,
        SUM(金額) as 購買金額,
        ROUND(AVG(金額), 0) as 平均訂單金額
    FROM orders
    GROUP BY 客戶
),
rfm_score AS (
    -- 計算 RFM 分數（1-5分）
    SELECT 
        客戶,
        最近購買天數,
        購買頻率,
        購買金額,
        平均訂單金額,
        CASE 
            WHEN 最近購買天數 <= 15 THEN 5
            WHEN 最近購買天數 <= 30 THEN 4
            WHEN 最近購買天數 <= 60 THEN 3
            WHEN 最近購買天數 <= 90 THEN 2
            ELSE 1
        END as R分數,
        CASE 
            WHEN 購買頻率 >= 3 THEN 5
            WHEN 購買頻率 >= 2 THEN 4
            ELSE 3
        END as F分數,
        CASE 
            WHEN 購買金額 >= 5000 THEN 5
            WHEN 購買金額 >= 4000 THEN 4
            WHEN 購買金額 >= 3000 THEN 3
            ELSE 2
        END as M分數
    FROM rfm_base
),
customer_segment AS (
    -- 客戶分級
    SELECT 
        客戶,
        最近購買天數,
        購買頻率,
        購買金額,
        平均訂單金額,
        R分數,
        F分數,
        M分數,
        (R分數 + F分數 + M分數) as 總分,
        CASE 
            WHEN R分數 >= 4 AND F分數 >= 4 AND M分數 >= 4 THEN '重要價值客戶'
            WHEN R分數 >= 4 AND F分數 < 4 THEN '重要喚回客戶'
            WHEN R分數 < 4 AND F分數 >= 4 THEN '重要保持客戶'
            WHEN M分數 >= 4 THEN '重要發展客戶'
            ELSE '一般客戶'
        END as 客戶等級
    FROM rfm_score
)
SELECT 
    客戶,
    客戶等級,
    最近購買天數 || '天前' as 最後購買,
    購買頻率 || '次' as 購買次數,
    購買金額,
    平均訂單金額,
    總分
FROM customer_segment
ORDER BY 總分 DESC, 購買金額 DESC
""")

print("=== RFM 客戶價值分析 ===")
result.show(truncate=False)

# 統計各等級客戶分布
print("\n=== 客戶等級分布 ===")
segment_stats = spark.sql("""
WITH rfm_base AS (
    SELECT 
        客戶,
        DATEDIFF('2024-03-31', MAX(訂單日期)) as 最近購買天數,
        COUNT(*) as 購買頻率,
        SUM(金額) as 購買金額
    FROM orders
    GROUP BY 客戶
),
rfm_score AS (
    SELECT 
        客戶,
        CASE WHEN 最近購買天數 <= 15 THEN 5 WHEN 最近購買天數 <= 30 THEN 4 WHEN 最近購買天數 <= 60 THEN 3 WHEN 最近購買天數 <= 90 THEN 2 ELSE 1 END as R分數,
        CASE WHEN 購買頻率 >= 3 THEN 5 WHEN 購買頻率 >= 2 THEN 4 ELSE 3 END as F分數,
        CASE WHEN 購買金額 >= 5000 THEN 5 WHEN 購買金額 >= 4000 THEN 4 WHEN 購買金額 >= 3000 THEN 3 ELSE 2 END as M分數
    FROM rfm_base
),
customer_segment AS (
    SELECT 
        CASE 
            WHEN R分數 >= 4 AND F分數 >= 4 AND M分數 >= 4 THEN '重要價值客戶'
            WHEN R分數 >= 4 AND F分數 < 4 THEN '重要喚回客戶'
            WHEN R分數 < 4 AND F分數 >= 4 THEN '重要保持客戶'
            WHEN M分數 >= 4 THEN '重要發展客戶'
            ELSE '一般客戶'
        END as 客戶等級
    FROM rfm_score
)
SELECT 
    客戶等級,
    COUNT(*) as 客戶數量
FROM customer_segment
GROUP BY 客戶等級
ORDER BY 客戶數量 DESC
""")
segment_stats.show(truncate=False)

print("""
💡 營銷建議：
1. 重要價值客戶：提供 VIP 服務，專屬優惠
2. 重要喚回客戶：發送個性化促銷，新品推薦
3. 重要保持客戶：會員積分獎勵，購買提醒
4. 重要發展客戶：交叉銷售，提升購買頻率
5. 一般客戶：大眾促銷活動
""")
```

</details>

---


## 📚 第三章：子查詢優化

In [None]:
# 📚 第三章：子查詢優化


🎈 概念解釋：
- 非相關子查詢：內部查詢獨立執行，只執行一次
- 相關子查詢：內部查詢依賴外部查詢，可能執行多次（效能較差）

🎯 優化建議：
盡量使用 JOIN 或視窗函數替代相關子查詢


In [None]:
print("\n📌 範例 7: 子查詢優化 - 找出高於部門平均薪資的員工")

In [None]:
# 方法 1：使用相關子查詢（較慢）
sql_correlated = spark.sql("""
    SELECT 
        e1.`姓名`,
        e1.`部門`,
        e1.`薪資`,
        (SELECT AVG(e2.`薪資`) 
         FROM employees e2 
         WHERE e2.`部門` = e1.`部門`) as `部門平均薪資`
    FROM employees e1
    WHERE e1.`薪資` > (
        SELECT AVG(e2.`薪資`) 
        FROM employees e2 
        WHERE e2.`部門` = e1.`部門`
    )
    ORDER BY e1.`部門`, e1.`薪資` DESC
""")

print("\n方法 1 - 相關子查詢：")
sql_correlated.show()

In [None]:
# 方法 2：使用 JOIN（較快）
sql_join = spark.sql("""
    WITH dept_avg AS (
        SELECT `部門`, AVG(`薪資`) as `平均薪資`
        FROM employees
        GROUP BY `部門`
    )
    SELECT 
        e.`姓名`,
        e.`部門`,
        e.`薪資`,
        ROUND(d.`平均薪資`, 2) as `部門平均薪資`
    FROM employees e
    JOIN dept_avg d ON e.`部門` = d.`部門`
    WHERE e.`薪資` > d.`平均薪資`
    ORDER BY e.`部門`, e.`薪資` DESC
""")

print("\n方法 2 - 使用 JOIN（推薦）：")
sql_join.show()
# 方法 3：使用視窗函數（最快）
windowSpec = Window.partitionBy("部門")
result_window = employees_df.withColumn("部門平均薪資", round(avg("薪資").over(windowSpec), 2)) \
                            .filter(col("薪資") > col("部門平均薪資")) \
                            .orderBy("部門", desc("薪資"))
print("\n方法 3 - 使用視窗函數（最推薦）：")
result_window.show()


## 💪 練習題 3

**題目：**

請完成一個綜合查詢優化練習：
1. 找出銷售額超過部門平均值的員工（用3種方法實現）
2. 比較子查詢、CTE、JOIN 三種方法的執行計畫
3. 使用 broadcast join 優化小表連接
4. 總結效能優化建議

**提示：使用 explain()、broadcast() 函數**

<details>
<summary>📝 點擊查看參考答案</summary>

```python
from pyspark.sql.functions import broadcast, col, avg

# 建立員工銷售資料
emp_sales_data = [
    (1, "張三", "業務部", 150000),
    (2, "李四", "業務部", 200000),
    (3, "王五", "業務部", 120000),
    (4, "趙六", "工程部", 180000),
    (5, "錢七", "工程部", 160000),
    (6, "孫八", "工程部", 140000)
]

emp_sales_df = spark.createDataFrame(
    emp_sales_data,
    ["員工編號", "姓名", "部門", "銷售額"]
)
emp_sales_df.createOrReplaceTempView("employee_sales")

# ====================================================================
# 方法 1: 使用相關子查詢
# ====================================================================
print("=== 方法 1: 相關子查詢 ===")
result1 = spark.sql("""
    SELECT 
        e.姓名,
        e.部門,
        e.銷售額,
        (SELECT AVG(銷售額) FROM employee_sales WHERE 部門 = e.部門) as 部門平均
    FROM employee_sales e
    WHERE e.銷售額 > (
        SELECT AVG(銷售額) 
        FROM employee_sales 
        WHERE 部門 = e.部門
    )
    ORDER BY e.部門, e.銷售額 DESC
""")
result1.show()
print("\n執行計畫:")
result1.explain(mode="simple")

# ====================================================================
# 方法 2: 使用 CTE
# ====================================================================
print("\n=== 方法 2: CTE ===")
result2 = spark.sql("""
    WITH dept_avg AS (
        SELECT 
            部門,
            AVG(銷售額) as 平均銷售額
        FROM employee_sales
        GROUP BY 部門
    )
    SELECT 
        e.姓名,
        e.部門,
        e.銷售額,
        d.平均銷售額
    FROM employee_sales e
    INNER JOIN dept_avg d ON e.部門 = d.部門
    WHERE e.銷售額 > d.平均銷售額
    ORDER BY e.部門, e.銷售額 DESC
""")
result2.show()
print("\n執行計畫:")
result2.explain(mode="simple")

# ====================================================================
# 方法 3: 使用 Window Function（最優）
# ====================================================================
print("\n=== 方法 3: Window Function（推薦）===")
from pyspark.sql.window import Window

window_spec = Window.partitionBy("部門")

result3 = emp_sales_df \
    .withColumn("部門平均", avg("銷售額").over(window_spec)) \
    .filter(col("銷售額") > col("部門平均")) \
    .orderBy("部門", col("銷售額").desc())

result3.show()
print("\n執行計畫:")
result3.explain(mode="simple")

# ====================================================================
# 方法 4: Broadcast Join 優化（小表場景）
# ====================================================================
print("\n=== 方法 4: Broadcast Join ===")

# 計算部門平均（小表）
dept_avg_df = emp_sales_df.groupBy("部門") \
    .agg(avg("銷售額").alias("平均銷售額"))

# 使用 broadcast 優化
result4 = emp_sales_df.join(
    broadcast(dept_avg_df),
    "部門"
).filter(col("銷售額") > col("平均銷售額")) \
    .select("姓名", "部門", "銷售額", "平均銷售額") \
    .orderBy("部門", col("銷售額").desc())

result4.show()
print("\n執行計畫（注意 BroadcastHashJoin）:")
result4.explain(mode="simple")

# ====================================================================
# 效能比較總結
# ====================================================================
print("""
\n📊 效能優化總結：

1. **相關子查詢**：
   - ❌ 效能最差，每行都執行一次子查詢
   - ❌ 避免在大數據集使用

2. **CTE + JOIN**：
   - ✓ 效能較好，子查詢只執行一次
   - ✓ 程式碼可讀性高
   
3. **Window Function**：
   - ✅ 效能最佳，單次掃描完成
   - ✅ Spark 內部優化良好
   - ✅ **推薦使用**

4. **Broadcast Join**：
   - ✅ 適用於小表（<10MB）
   - ✅ 避免 shuffle，效能提升明顯
   - ⚠️  需確認表大小

💡 最佳實踐：
1. 優先使用 Window Functions
2. 避免相關子查詢
3. 小表使用 broadcast
4. 合理分區（partition）
5. 使用 cache() 暫存中間結果
6. 查看執行計畫優化查詢
""")
```

</details>

---


## 📚 第四章：進階 JOIN 技巧

In [None]:
# 📚 第四章：進階 JOIN 技巧


🎈 概念解釋：
CROSS JOIN 產生兩個表的笛卡爾積，即所有可能的組合。

🎯 應用場景：
生成測試資料、建立時間序列、產生所有可能的組合


In [None]:
print("\n📌 範例 8: CROSS JOIN - 產生所有可能的配對")
# 建立產品和顏色資料
products = spark.createDataFrame([("手機",), ("平板",), ("筆電",)], ["產品"])
colors = spark.createDataFrame([("黑色",), ("白色",), ("銀色",)], ["顏色"])
# CROSS JOIN
cross_result = products.crossJoin(colors)
print("\n產品與顏色的所有組合：")
cross_result.show()


🎈 概念解釋：
SELF JOIN 是表與自己進行連接，常用於查找層級關係或比較同表內的記錄。

🎯 應用場景：
員工-主管關係、組織架構、尋找重複記錄


In [None]:
print("\n📌 範例 9: SELF JOIN - 員工與主管關係")
# 建立包含主管資訊的員工資料
emp_manager_data = [
    (1, "Alice", None),      # CEO，沒有主管
    (2, "Bob", 1),           # Bob 的主管是 Alice
    (3, "Charlie", 1),       # Charlie 的主管是 Alice
    (4, "David", 2),         # David 的主管是 Bob
    (5, "Eve", 2),           # Eve 的主管是 Bob
    (6, "Frank", 3),         # Frank 的主管是 Charlie
]
emp_manager_df = spark.createDataFrame(
    emp_manager_data,
    ["員工編號", "姓名", "主管編號"]
)
emp_manager_df.createOrReplaceTempView("emp_manager")
# 使用 SELF JOIN 查詢員工和其主管


In [None]:
# 使用 SELF JOIN 查詢員工和其主管
sql_self_join = spark.sql("""
    SELECT 
        e.`員工編號`,
        e.`姓名` as `員工姓名`,
        COALESCE(m.`姓名`, '無主管') as `主管姓名`
    FROM emp_manager e
    LEFT JOIN emp_manager m ON e.`主管編號` = m.`員工編號`
    ORDER BY e.`員工編號`
""")

print("\n員工與主管關係：")
sql_self_join.show()


In [None]:
# 🔟 ANTI JOIN & SEMI JOIN - 高效過濾


🎈 概念解釋：
- SEMI JOIN (LEFT SEMI): 返回左表中在右表中有匹配的記錄
- ANTI JOIN (LEFT ANTI): 返回左表中在右表中沒有匹配的記錄

🎯 優勢：
比 IN / NOT IN 子查詢效能更好


In [None]:
print("\n📌 範例 10: ANTI JOIN & SEMI JOIN - 客戶訂單分析")
# 建立客戶資料
customers_data = [
    (1, "Alice"),
    (2, "Bob"),
    (3, "Charlie"),
    (4, "David"),
    (5, "Eve"),
]
customers_df = spark.createDataFrame(customers_data, ["客戶編號", "客戶姓名"])
# 建立訂單資料（只有部分客戶有訂單）
order_customers = orders_df.select("客戶").distinct()
# SEMI JOIN - 有訂單的客戶
customers_with_orders = customers_df.join(
    order_customers,
    customers_df["客戶姓名"] == order_customers["客戶"],
    "leftsemi"
)
print("\n有訂單的客戶（SEMI JOIN）：")
customers_with_orders.show()
# ANTI JOIN - 沒有訂單的客戶
customers_without_orders = customers_df.join(
    order_customers,
    customers_df["客戶姓名"] == order_customers["客戶"],
    "leftanti"
)
print("\n沒有訂單的客戶（ANTI JOIN）：")
customers_without_orders.show()


## 📚 第五章：效能優化技巧

In [None]:
# 📚 第五章：效能優化技巧


🎈 概念解釋：
當一個大表和一個小表進行 JOIN 時，可以將小表廣播到所有節點，
避免 shuffle 操作，大幅提升效能。

🎯 適用場景：
小表 < 10MB，大表與維度表 JOIN


In [None]:
print("\n📌 範例 11: Broadcast Join - 優化大小表連接")
# 建立部門資訊（小表）
departments_data = [
    ("工程部", "技術大樓"),
    ("行銷部", "行政大樓"),
    ("人資部", "行政大樓"),
]
departments_df = spark.createDataFrame(departments_data, ["部門", "辦公地點"])
# 一般 JOIN
normal_join = employees_df.join(departments_df, "部門")
print("\n一般 JOIN：")
normal_join.select("姓名", "部門", "辦公地點").show(5)
# Broadcast JOIN
broadcast_join = employees_df.join(
    broadcast(departments_df),
    "部門"
)
print("\nBroadcast JOIN（效能更好）：")
broadcast_join.select("姓名", "部門", "辦公地點").show(5)
# 查看執行計畫
print("\n執行計畫差異：")
print("一般 JOIN 會有 SortMergeJoin")
print("Broadcast JOIN 會有 BroadcastHashJoin")


🎈 概念解釋：
- Partitioning: 按欄位值將資料分割成多個目錄
- Bucketing: 按 hash 值將資料分割成固定數量的檔案

🎯 優勢：
減少掃描的資料量，提升查詢效能


In [None]:
print("\n📌 範例 12: 分區與過濾優化")
# 重新分區資料
employees_partitioned = employees_df.repartition(2, "部門")
print("\n重新分區後的分區數：", employees_partitioned.rdd.getNumPartitions())
# 使用分區欄位過濾（效能更好）
filtered = employees_partitioned.filter(col("部門") == "工程部")
print("\n過濾後的結果：")
filtered.show()


🎈 概念解釋：
將常用的 DataFrame 快取到記憶體中，避免重複計算。

🎯 適用場景：
多次使用相同的中間結果、迭代計算


In [None]:
print("\n📌 範例 13: 快取優化")
# 建立一個複雜的計算
complex_df = employees_df.groupBy("部門") \
                        .agg(
                            count("*").alias("人數"),
                            avg("薪資").alias("平均薪資"),
                            max("薪資").alias("最高薪資")
                        )
# 快取結果
complex_df.cache()
# 第一次執行（會觸發計算並快取）
print("\n第一次查詢（計算並快取）：")
start = time.time()
complex_df.show()
print(f"執行時間: {time.time() - start:.4f} 秒")
# 第二次執行（從快取讀取）
print("\n第二次查詢（從快取讀取）：")
start = time.time()
complex_df.show()
print(f"執行時間: {time.time() - start:.4f} 秒（應該更快）")
# 釋放快取
complex_df.unpersist()


## 📚 第六章：複雜資料轉換

In [None]:
# 📚 第六章：複雜資料轉換


🎈 概念解釋：
PIVOT 將行資料轉換為列，常用於建立交叉表。

🎯 應用場景：
銷售報表、資料透視表、趨勢分析


In [None]:
print("\n📌 範例 14: PIVOT - 建立部門薪資透視表")
# 使用 PIVOT 建立部門-統計指標透視表
employees_df.createOrReplaceTempView("employees")


In [None]:
pivot_result = spark.sql("""
    SELECT * FROM (
        SELECT `部門`, `薪資`
        FROM employees
    )
    PIVOT (
        COUNT(*) as `人數`,
        AVG(`薪資`) as `平均薪資`
        FOR `部門` IN ('工程部', '行銷部', '人資部')
    )
""")

print("\n部門薪資透視表：")
pivot_result.show()


🎈 概念解釋：
UNPIVOT 將列資料轉換為行，是 PIVOT 的反向操作。

🎯 應用場景：
將寬表轉換為長表、資料正規化


In [None]:
print("\n📌 範例 15: UNPIVOT - 列轉行")
# 建立寬表格式的季度銷售資料
quarterly_sales = spark.createDataFrame([
    ("產品A", 100, 120, 115, 130),
    ("產品B", 80, 90, 95, 100),
], ["產品", "Q1", "Q2", "Q3", "Q4"])
quarterly_sales.createOrReplaceTempView("quarterly_sales")
# 使用 UNPIVOT（Spark 3.4+）或使用 stack 函數
unpivot_result = quarterly_sales.selectExpr(
    "`產品`",
    "stack(4, 'Q1', `Q1`, 'Q2', `Q2`, 'Q3', `Q3`, 'Q4', `Q4`) as (`季度`, `銷售額`)"
)
print("\n轉換為長表格式：")
unpivot_result.show()


🎈 概念解釋：
PySpark 支援複雜的資料型態如陣列、結構體、地圖等。

🎯 應用場景：
處理 JSON 資料、巢狀結構、多值欄位


In [None]:
print("\n📌 範例 16: 陣列操作")
# 建立包含陣列的資料
students_data = [
    (1, "Alice", ["數學", "物理", "化學"]),
    (2, "Bob", ["英文", "歷史"]),
    (3, "Charlie", ["數學", "英文", "體育"]),
]
students_df = spark.createDataFrame(
    students_data,
    ["學號", "姓名", "選修課程"]
)
print("\n原始資料（包含陣列）：")
students_df.show(truncate=False)
# 展開陣列
exploded = students_df.select(
    "學號",
    "姓名",
    explode("選修課程").alias("課程")
)
print("\n展開陣列後：")
exploded.show()
# 陣列相關函數
array_operations = students_df.select(
    "姓名",
    "選修課程",
    size("選修課程").alias("課程數"),
    array_contains("選修課程", "數學").alias("是否選修數學")
)
print("\n陣列操作：")
array_operations.show(truncate=False)


## 📚 第七章：資料品質與清理

In [None]:
# 📚 第七章：資料品質與清理


🎈 概念解釋：
NULL 值處理是資料清理的重要環節。

🎯 常用方法：
fillna, dropna, coalesce, nvl


In [None]:
print("\n📌 範例 17: NULL 值處理")
# 建立包含 NULL 的資料
data_with_null = [
    (1, "Alice", 75000, "工程部"),
    (2, "Bob", None, "行銷部"),
    (3, "Charlie", 68000, None),
    (4, None, 65000, "人資部"),
]
df_null = spark.createDataFrame(
    data_with_null,
    ["員工編號", "姓名", "薪資", "部門"]
)
print("\n原始資料（包含 NULL）：")
df_null.show()
# 方法 1: 填充預設值
filled = df_null.fillna({
    "姓名": "未知",
    "薪資": 60000,
    "部門": "待分配"
})
print("\n填充預設值後：")
filled.show()
# 方法 2: 刪除包含 NULL 的列
dropped = df_null.dropna()
print("\n刪除 NULL 列後：")
dropped.show()
# 方法 3: 使用 COALESCE
coalesced = df_null.select(
    "員工編號",
    coalesce(col("姓名"), lit("未知")).alias("姓名"),
    coalesce(col("薪資"), lit(60000)).alias("薪資"),
    coalesce(col("部門"), lit("待分配")).alias("部門")
)
print("\n使用 COALESCE 處理：")
coalesced.show()


🎈 概念解釋：
找出並移除重複的記錄。

🎯 方法：
distinct, dropDuplicates, 視窗函數


In [None]:
print("\n📌 範例 18: 資料去重")
# 建立包含重複資料
duplicate_data = [
    (1, "Alice", "工程部"),
    (2, "Bob", "行銷部"),
    (1, "Alice", "工程部"),  # 完全重複
    (3, "Alice", "人資部"),  # 姓名重複
]
df_dup = spark.createDataFrame(
    duplicate_data,
    ["員工編號", "姓名", "部門"]
)
print("\n原始資料（包含重複）：")
df_dup.show()
# 方法 1: 完全去重
dedup_all = df_dup.distinct()
print("\n完全去重：")
dedup_all.show()
# 方法 2: 按特定欄位去重（保留第一筆）
dedup_by_name = df_dup.dropDuplicates(["姓名"])
print("\n按姓名去重：")
dedup_by_name.show()
# 方法 3: 使用視窗函數保留特定規則的記錄
windowSpec = Window.partitionBy("姓名").orderBy("員工編號")
dedup_window = df_dup.withColumn("rn", row_number().over(windowSpec)) \
                     .filter(col("rn") == 1) \
                     .drop("rn")
print("\n使用視窗函數去重（保留編號最小的）：")
dedup_window.show()


## 📚 第八章：進階分析函數

In [None]:
# 📚 第八章：進階分析函數


🎈 概念解釋：
NTILE 將資料分成 N 個大致相等的組。

🎯 應用場景：
客戶分層、ABC 分析、分位數分析


In [None]:
print("\n📌 範例 19: NTILE - 薪資四分位數")
windowSpec = Window.orderBy("薪資")
ntile_result = employees_df.withColumn(
    "薪資分組",
    ntile(4).over(windowSpec)
).withColumn(
    "分組說明",
    when(col("薪資分組") == 1, "低薪組")
    .when(col("薪資分組") == 2, "中低薪組")
    .when(col("薪資分組") == 3, "中高薪組")
    .otherwise("高薪組")
)
print("\n薪資四分位數分組：")
ntile_result.orderBy("薪資").show()


🎈 概念解釋：
PERCENT_RANK 計算值在資料集中的相對位置（0 到 1 之間）。

🎯 應用場景：
績效評估、成績分析、相對排名


In [None]:
print("\n📌 範例 20: PERCENT_RANK - 薪資百分位")
windowSpec = Window.orderBy("薪資")
percent_result = employees_df.withColumn(
    "薪資百分位",
    round(percent_rank().over(windowSpec) * 100, 2)
).withColumn(
    "說明",
    concat(
        lit("薪資超過 "),
        round(percent_rank().over(windowSpec) * 100, 0).cast("int"),
        lit("% 的員工")
    )
)
print("\n薪資百分位排名：")
percent_result.orderBy("薪資").select("姓名", "薪資", "薪資百分位", "說明").show()


## 🎓 課程總結

In [None]:
# 🎓 課程總結


In [None]:
print("\n" + "=" * 60)
print("🎊 恭喜！你已經完成了 SQL 進階課程！")
print("=" * 60)


📝 進階技能總結：

1️⃣ 視窗函數：
   - ROW_NUMBER, RANK, DENSE_RANK：排名與編號
   - LEAD, LAG：訪問前後列資料
   - SUM/AVG OVER：累計計算與移動平均
   - NTILE, PERCENT_RANK：分組與百分位

2️⃣ 通用表表達式 (CTE)：
   - WITH 子句：提高查詢可讀性
   - 多層 CTE：分解複雜邏輯
   - 遞迴 CTE：處理層級關係

3️⃣ 子查詢優化：
   - 避免相關子查詢
   - 使用 JOIN 替代
   - 使用視窗函數優化

4️⃣ 進階 JOIN：
   - CROSS JOIN：笛卡爾積
   - SELF JOIN：自我連接
   - SEMI/ANTI JOIN：高效過濾

5️⃣ 效能優化：
   - Broadcast Join：廣播小表
   - 分區與分桶：資料組織
   - 快取與持久化：避免重複計算

6️⃣ 複雜資料轉換：
   - PIVOT/UNPIVOT：行列轉換
   - 陣列與結構：處理複雜型態
   - 資料展開與聚合

7️⃣ 資料品質：
   - NULL 值處理
   - 資料去重
   - 資料驗證

💡 進階學習建議：
- 理解執行計畫，學會效能分析
- 掌握資料分區策略
- 熟悉 Spark UI 的使用
- 實踐 Delta Lake 進階功能
- 學習 Spark Streaming

🚀 下一步：
- 深入研究 Spark 內部機制
- 學習分散式系統原理
- 實踐大規模資料處理專案
- 探索機器學習與 Spark MLlib

記住：進階技能需要大量實踐，多做專案、多解決實際問題才能真正掌握！


In [None]:
print("=" * 60)
print("📚 進階教學文件結束 - 繼續精進你的技能！🚀")
print("=" * 60)
# 關閉 Spark Session
# spark.stop()  # 取消註解以關閉 Spark
