# PySpark 快速上手教程

## 第一步：安装 PySpark

在 Jupyter Notebook 中，我们使用虚拟环境或直接安装到用户目录

In [None]:
# 方法 1：安装到用户目录（推荐）
!pip install --user pyspark pandas numpy matplotlib

In [None]:
# ============================================================
# 环境设置：修复 Python 路径问题
# ============================================================
import sys

# 添加必要的路径到 sys.path
paths_to_add = [
    '/Users/zhengzhang/.local/lib/python3.9/site-packages',  # PySpark 位置
    '/Users/zhengzhang/opt/anaconda3/lib/python3.9/site-packages'  # Matplotlib, Pandas, Numpy 位置
]

for path in paths_to_add:
    if path not in sys.path:
        sys.path.insert(0, path)
        print(f"✓ 已添加路径: {path}")

print("\n验证所有依赖包...")

# 验证 PySpark
import pyspark
print(f"✓ PySpark 版本: {pyspark.__version__}")

# 验证其他依赖
import pandas as pd
print(f"✓ Pandas 版本: {pd.__version__}")

import numpy as np
print(f"✓ Numpy 版本: {np.__version__}")

import matplotlib
print(f"✓ Matplotlib 版本: {matplotlib.__version__}")

print("\n所有依赖包已就绪！")

In [None]:
# 验证安装
import pyspark
print(f"✓ PySpark 版本: {pyspark.__version__}")

## 第二步：创建 Spark Session

这是使用 PySpark 的第一步

In [None]:
from pyspark.sql import SparkSession

# 创建 Spark Session
spark = SparkSession.builder \
    .appName("PySpark 学习") \
    .master("local[*]") \
    .config("spark.driver.memory", "2g") \
    .getOrCreate()

print(f"✓ Spark Session 创建成功！")
print(f"✓ Spark 版本: {spark.version}")
print(f"✓ Spark Web UI: http://localhost:4040")

## 示例 1：星型模式（Star Schema）数据仓库

这是数据仓库中最常用的模型，包含：
- **1个事实表**（Fact Table）：存储业务度量值和外键
- **多个维度表**（Dimension Tables）：存储描述性信息

In [None]:
# ============================================================
# 星型模式（Star Schema）示例 - 电商销售数据仓库
# ============================================================

# 1. 事实表（Fact Table）- 销售事实表
# 包含度量值（销售额、数量等）和维度表的外键
sales_fact_data = [
    (1, 101, 201, 301, 401, "2024-01-15", 2, 299.98, 59.99, 359.97),
    (2, 102, 202, 302, 402, "2024-01-15", 1, 899.00, 0.00, 899.00),
    (3, 103, 203, 303, 401, "2024-01-16", 3, 149.97, 15.00, 164.97),
    (4, 101, 204, 304, 403, "2024-01-16", 1, 599.00, 59.90, 658.90),
    (5, 104, 205, 301, 402, "2024-01-17", 5, 249.95, 0.00, 249.95),
    (6, 105, 201, 305, 404, "2024-01-17", 2, 1799.98, 180.00, 1979.98),
    (7, 102, 206, 302, 401, "2024-01-18", 1, 1299.00, 129.90, 1428.90),
    (8, 106, 207, 303, 403, "2024-01-18", 4, 199.96, 0.00, 199.96),
    (9, 103, 208, 306, 402, "2024-01-19", 1, 79.99, 0.00, 79.99),
    (10, 107, 209, 301, 404, "2024-01-19", 2, 399.98, 40.00, 439.98)
]

sales_fact_columns = [
    "订单ID", "客户ID", "产品ID", "商店ID", "时间ID", 
    "订单日期", "数量", "销售金额", "折扣金额", "总金额"
]

fact_sales = spark.createDataFrame(sales_fact_data, sales_fact_columns)

print("=" * 60)
print("事实表：销售事实表 (Fact_Sales)")
print("=" * 60)
fact_sales.show()

# 2. 维度表 1 - 客户维度（Dim_Customer）
customer_dim_data = [
    (101, "张三", "男", 28, "北京", "海淀区", "VIP"),
    (102, "李四", "女", 35, "上海", "浦东新区", "普通"),
    (103, "王五", "男", 42, "广州", "天河区", "VIP"),
    (104, "赵六", "女", 25, "深圳", "南山区", "普通"),
    (105, "钱七", "男", 31, "杭州", "西湖区", "黄金"),
    (106, "孙八", "女", 29, "成都", "武侯区", "VIP"),
    (107, "周九", "男", 38, "北京", "朝阳区", "黄金")
]

customer_dim_columns = ["客户ID", "客户姓名", "性别", "年龄", "城市", "区域", "会员等级"]
dim_customer = spark.createDataFrame(customer_dim_data, customer_dim_columns)

print("\n" + "=" * 60)
print("维度表 1：客户维度 (Dim_Customer)")
print("=" * 60)
dim_customer.show()

# 3. 维度表 2 - 产品维度（Dim_Product）
product_dim_data = [
    (201, "iPhone 15", "电子产品", "手机", "Apple", 6999.00),
    (202, "MacBook Pro", "电子产品", "笔记本", "Apple", 12999.00),
    (203, "小米手环", "电子产品", "可穿戴", "小米", 199.00),
    (204, "戴尔显示器", "电子产品", "显示器", "Dell", 2999.00),
    (205, "罗技鼠标", "电子产品", "外设", "罗技", 149.00),
    (206, "索尼耳机", "电子产品", "音频", "Sony", 1899.00),
    (207, "键盘机械", "电子产品", "外设", "Cherry", 699.00),
    (208, "充电宝", "电子产品", "配件", "Anker", 199.00),
    (209, "iPad Air", "电子产品", "平板", "Apple", 4999.00)
]

product_dim_columns = ["产品ID", "产品名称", "产品类别", "产品子类别", "品牌", "单价"]
dim_product = spark.createDataFrame(product_dim_data, product_dim_columns)

print("\n" + "=" * 60)
print("维度表 2：产品维度 (Dim_Product)")
print("=" * 60)
dim_product.show()

# 4. 维度表 3 - 商店维度（Dim_Store）
store_dim_data = [
    (301, "北京旗舰店", "北京", "海淀区", "中关村大街1号", "大型", "张经理"),
    (302, "上海体验店", "上海", "浦东新区", "陆家嘴环路999号", "中型", "李经理"),
    (303, "广州专卖店", "广州", "天河区", "天河路123号", "小型", "王经理"),
    (304, "深圳科技店", "深圳", "南山区", "科技园路88号", "大型", "赵经理"),
    (305, "杭州电商仓", "杭州", "余杭区", "文一西路999号", "仓库", "钱经理"),
    (306, "成都门店", "成都", "武侯区", "科华北路66号", "中型", "孙经理")
]

store_dim_columns = ["商店ID", "商店名称", "城市", "区域", "地址", "商店类型", "店长"]
dim_store = spark.createDataFrame(store_dim_data, store_dim_columns)

print("\n" + "=" * 60)
print("维度表 3：商店维度 (Dim_Store)")
print("=" * 60)
dim_store.show()

# 5. 维度表 4 - 时间维度（Dim_Time）
time_dim_data = [
    (401, "2024-01-15", 2024, 1, "一月", 3, 1, "星期一", "工作日", "Q1"),
    (402, "2024-01-16", 2024, 1, "一月", 3, 2, "星期二", "工作日", "Q1"),
    (403, "2024-01-17", 2024, 1, "一月", 3, 3, "星期三", "工作日", "Q1"),
    (404, "2024-01-18", 2024, 1, "一月", 3, 4, "星期四", "工作日", "Q1")
]

time_dim_columns = [
    "时间ID", "日期", "年", "月", "月份名称", 
    "周数", "星期几", "星期名称", "日类型", "季度"
]
dim_time = spark.createDataFrame(time_dim_data, time_dim_columns)

print("\n" + "=" * 60)
print("维度表 4：时间维度 (Dim_Time)")
print("=" * 60)
dim_time.show()

print("\n" + "=" * 60)
print("星型模式结构说明")
print("=" * 60)
print("""
中心：事实表 (Fact_Sales) - 包含业务度量值
  ├─ 客户ID → Dim_Customer (客户维度)
  ├─ 产品ID → Dim_Product (产品维度)
  ├─ 商店ID → Dim_Store (商店维度)
  └─ 时间ID → Dim_Time (时间维度)
""")

## 示例 2：星型模式查询分析

下面演示如何对星型模式进行多维分析

In [None]:
# Import necessary functions
from pyspark.sql.functions import count, sum, avg, desc

# 查询 1：按城市统计销售额（事实表 JOIN 商店维度）
print("=" * 60)
print("查询 1：各城市销售统计")
print("=" * 60)

city_sales = fact_sales.join(dim_store, "商店ID") \
    .groupBy("城市") \
    .agg(
        count("订单ID").alias("订单数量"),
        sum("总金额").alias("总销售额"),
        avg("总金额").alias("平均订单金额")
    ) \
    .orderBy(desc("总销售额"))

city_sales.show()

In [None]:
# 查询 2：按产品品牌统计（事实表 JOIN 产品维度）
from pyspark.sql.functions import col, round as spark_round

print("\n" + "=" * 60)
print("查询 2：各品牌销售排行")
print("=" * 60)

brand_sales = fact_sales.join(dim_product, "产品ID") \
    .groupBy("品牌") \
    .agg(
        count("订单ID").alias("销售次数"),
        sum("数量").alias("销售数量"),
        spark_round(sum("总金额"), 2).alias("总销售额")
    ) \
    .orderBy(desc("总销售额"))

brand_sales.show()

In [None]:
# 查询 3：VIP 客户消费分析（事实表 JOIN 客户维度）
print("\n" + "=" * 60)
print("查询 3：按会员等级统计消费")
print("=" * 60)

customer_level_sales = fact_sales.join(dim_customer, "客户ID") \
    .groupBy("会员等级") \
    .agg(
        count("订单ID").alias("订单数"),
        spark_round(sum("总金额"), 2).alias("总消费"),
        spark_round(avg("总金额"), 2).alias("平均消费")
    ) \
    .orderBy(desc("总消费"))

customer_level_sales.show()

In [None]:
# 查询 4：多维度联合查询 - 完整的星型模式 JOIN
print("\n" + "=" * 60)
print("查询 4：综合销售明细报表（JOIN 所有维度表）")
print("=" * 60)

# 将事实表与所有维度表 JOIN
full_report = fact_sales \
    .join(dim_customer, "客户ID") \
    .join(dim_product, "产品ID") \
    .join(dim_store, "商店ID") \
    .join(dim_time, "时间ID") \
    .select(
        "订单ID",
        "客户姓名",
        "会员等级",
        "产品名称",
        "品牌",
        col("数量"),
        col("总金额"),
        dim_store["城市"].alias("购买城市"),
        "商店名称",
        "星期名称",
        "日类型"
    )

full_report.show(truncate=False)

In [None]:
# 查询 5：使用 SQL 进行星型模式查询
print("\n" + "=" * 60)
print("查询 5：SQL 方式查询星型模式")
print("=" * 60)

# 注册所有表为临时视图
fact_sales.createOrReplaceTempView("fact_sales")
dim_customer.createOrReplaceTempView("dim_customer")
dim_product.createOrReplaceTempView("dim_product")
dim_store.createOrReplaceTempView("dim_store")
dim_time.createOrReplaceTempView("dim_time")

# 复杂 SQL 查询：按城市、产品类别统计
sql_result = spark.sql("""
    SELECT 
        s.城市,
        p.产品类别,
        COUNT(f.订单ID) as 订单数,
        SUM(f.数量) as 销售数量,
        ROUND(SUM(f.总金额), 2) as 总销售额,
        ROUND(AVG(f.总金额), 2) as 平均订单金额
    FROM fact_sales f
    JOIN dim_store s ON f.商店ID = s.商店ID
    JOIN dim_product p ON f.产品ID = p.产品ID
    GROUP BY s.城市, p.产品类别
    ORDER BY 总销售额 DESC
""")

sql_result.show(truncate=False)

## 示例 3：星型模式的优势

通过上面的示例，你可以看到星型模式的优势：

1. **查询性能好**：简单的 JOIN 关系，查询速度快
2. **易于理解**：结构清晰，业务人员容易理解
3. **灵活分析**：可以从任意维度进行分析
4. **数据冗余少**：维度表数据不重复

下面继续其他 PySpark 示例...

In [None]:
from pyspark.sql.functions import count, avg, sum, max, min

# 基本统计
print(f"总员工数: {df.count()}")
print("\n描述性统计：")
df.describe().show()

In [None]:
# 按部门分组统计
print("按部门统计：")
dept_stats = df.groupBy("部门").agg(
    count("*").alias("员工数"),
    avg("薪资").alias("平均薪资"),
    max("薪资").alias("最高薪资"),
    min("薪资").alias("最低薪资"),
    sum("薪资").alias("总薪资")
)

dept_stats.show()

## 示例 4：使用 Spark SQL

In [None]:
# 注册为临时表
df.createOrReplaceTempView("员工表")

# 使用 SQL 查询
result = spark.sql("""
    SELECT 部门,
           COUNT(*) as 员工数,
           ROUND(AVG(薪资), 2) as 平均薪资,
           MAX(薪资) as 最高薪资
    FROM 员工表
    WHERE 年龄 > 25
    GROUP BY 部门
    ORDER BY 平均薪资 DESC
""")

print("SQL 查询结果：")
result.show()

## 示例 5：JOIN 操作

In [None]:
# 创建部门信息表
dept_data = [
    ("工程部", "A栋", "张经理"),
    ("销售部", "B栋", "李经理"),
    ("市场部", "C栋", "王经理"),
    ("人力资源部", "D栋", "赵经理")
]

dept_df = spark.createDataFrame(dept_data, ["部门", "办公地点", "经理"])

print("部门信息：")
dept_df.show()

# JOIN 操作
print("\n员工和部门信息 JOIN 结果：")
joined_df = df.join(dept_df, "部门", "left")
joined_df.select("姓名", "部门", "薪资", "办公地点", "经理").show()

## 示例 6：自定义函数 (UDF)

In [None]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# 定义 Python 函数
def categorize_age(age):
    if age < 25:
        return "新人"
    elif age < 30:
        return "骨干"
    else:
        return "资深"

# 注册为 UDF
categorize_udf = udf(categorize_age, StringType())

# 使用 UDF
print("添加员工级别分类：")
df.withColumn("级别", categorize_udf(col("年龄"))).show()

## 示例 7：窗口函数（部门内排名）

In [None]:
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, rank, desc

# 定义窗口：按部门分区，按薪资降序
window_spec = Window.partitionBy("部门").orderBy(desc("薪资"))

# 添加排名
print("部门内薪资排名：")
df.withColumn("部门内排名", rank().over(window_spec)) \
  .orderBy("部门", "部门内排名") \
  .show()

## 示例 8：经典 WordCount（词频统计）

In [None]:
from pyspark.sql.functions import explode, split

# 创建文本数据
text_data = [
    ("Apache Spark is a unified analytics engine for big data processing",),
    ("PySpark is the Python API for Apache Spark",),
    ("Spark provides high level APIs in Python Java Scala and R",),
    ("Spark runs on Hadoop YARN Kubernetes and standalone mode",)
]

text_df = spark.createDataFrame(text_data, ["text"])

print("原始文本：")
text_df.show(truncate=False)

# 分词和统计
words = text_df.select(explode(split(col("text"), " ")).alias("word"))
word_count = words.groupBy("word").count().orderBy(desc("count"))

print("\n词频统计结果（Top 10）：")
word_count.show(10)

## 示例 9：读写文件

In [None]:
# 保存为 CSV
output_path = "/tmp/pyspark_demo"

print("保存为 CSV...")
df.write.mode("overwrite").option("header", "true").csv(f"{output_path}/employees.csv")
print("✓ CSV 保存成功")

# 保存为 Parquet（推荐格式）
print("\n保存为 Parquet...")
df.write.mode("overwrite").parquet(f"{output_path}/employees.parquet")
print("✓ Parquet 保存成功")

# 读取 Parquet
print("\n从 Parquet 读取：")
read_df = spark.read.parquet(f"{output_path}/employees.parquet")
read_df.show()

## 示例 10：数据可视化（配合 Pandas）

In [None]:
import matplotlib.pyplot as plt
import pandas as pd

# 将 Spark DataFrame 转换为 Pandas DataFrame
dept_stats_pd = dept_stats.toPandas()

# 绘制柱状图
plt.figure(figsize=(10, 6))

# 员工数
plt.subplot(1, 2, 1)
plt.bar(dept_stats_pd['部门'], dept_stats_pd['员工数'])
plt.title('各部门员工数')
plt.xlabel('部门')
plt.ylabel('员工数')
plt.xticks(rotation=45)

# 平均薪资
plt.subplot(1, 2, 2)
plt.bar(dept_stats_pd['部门'], dept_stats_pd['平均薪资'])
plt.title('各部门平均薪资')
plt.xlabel('部门')
plt.ylabel('平均薪资 (元)')
plt.xticks(rotation=45)

plt.tight_layout()
plt.show()

print("\n统计数据：")
print(dept_stats_pd)

In [None]:

import pandas as pd

# 将 Spark DataFrame 转换为 Pandas DataFrame
dept_stats_pd = dept_stats.toPandas()

# 绘制柱状图
plt.figure(figsize=(10, 6))

# 员工数
plt.subplot(1, 2, 1)
plt.bar(dept_stats_pd['部门'], dept_stats_pd['员工数'])
plt.title('各部门员工数')
plt.xlabel('部门')
plt.ylabel('员工数')
plt.xticks(rotation=45)

# 平均薪资
plt.subplot(1, 2, 2)
plt.bar(dept_stats_pd['部门'], dept_stats_pd['平均薪资'])
plt.title('各部门平均薪资')
plt.xlabel('部门')
plt.ylabel('平均薪资 (元)')
plt.xticks(rotation=45)

plt.tight_layout()
plt.show()

print("\n统计数据：")
print(dept_stats_pd)

## 练习题

### 练习 1：数据筛选
找出薪资在 65000 到 75000 之间的员工

In [None]:
# 你的代码


### 练习 2：复杂查询
计算每个部门最年轻和最年长员工的年龄差

In [None]:
# 你的代码


### 练习 3：数据转换
创建一个新列，将薪资转换为薪资等级：
- < 65000: "初级"
- 65000-75000: "中级"
- > 75000: "高级"

In [None]:
# 你的代码


## 最后：关闭 Spark Session

In [None]:
# 完成所有操作后关闭 Spark Session
# spark.stop()
# print("✓ Spark Session 已关闭")

In [None]:
# 查看 Spark 配置信息
print("=" * 60)
print("Spark 运行环境信息")
print("=" * 60)

# 1. Master 模式
print(f"运行模式: {spark.sparkContext.master}")

# 2. 应用名称
print(f"应用名称: {spark.sparkContext.appName}")

# 3. 默认并行度（决定分区数）
print(f"默认并行度: {spark.sparkContext.defaultParallelism}")

# 4. 查看你的 CPU 核心数
import os
cpu_count = os.cpu_count()
print(f"CPU 核心数: {cpu_count}")

# 5. 内存配置
print(f"Driver 内存: {spark.sparkContext._conf.get('spark.driver.memory')}")

print("\n" + "=" * 60)
print("分区和 Stage 示例")
print("=" * 60)

# 6. 查看 DataFrame 的分区数
print(f"fact_sales 分区数: {fact_sales.rdd.getNumPartitions()}")
print(f"dim_store 分区数: {dim_store.rdd.getNumPartitions()}")

# 7. 解释查询执行计划（可以看到 Shuffle）
print("\n查询执行计划（Physical Plan）：")
city_sales.explain(mode="simple")

## Spark 执行详解

## 下一步学习

1. **进阶主题**
   - Spark Streaming（实时数据处理）
   - Spark MLlib（机器学习）
   - 性能优化技巧

2. **实战项目**
   - 日志分析系统
   - 推荐系统
   - 实时监控仪表板

3. **学习资源**
   - [官方文档](https://spark.apache.org/docs/latest/)
   - [PySpark API](https://spark.apache.org/docs/latest/api/python/)
   - [Databricks 免费平台](https://community.cloud.databricks.com/)

加油学习！有问题随时查看文档或提问。

In [None]:
# 1. 查看工作目录
pwd
# 输出: /Users/zhengzhang

# 2. 清理 Notebook 输出
jupyter nbconvert --clear-output --inplace PySpark_Setup.ipynb

# 3. 查看修改状态
git status
# 输出: modified: PySpark_Setup.ipynb

# 4. 查看具体改动（可选）
git diff PySpark_Setup.ipynb

# 5. 添加到暂存区
git add PySpark_Setup.ipynb

# 6. 提交
git commit -m "Add: 新增按时间维度的销售分析查询"

# 7. 推送到 GitHub
git push
# Username: lydiazz0517
# Password: [粘贴 token]


In [8]:
pwd

'/Users/zhengzhang'