# Spark结构化流处理 (Structured Streaming)

本笔记本介绍Spark的结构化流处理，这是Spark 2.0引入的新的流处理引擎，建立在Spark SQL引擎之上。

## 什么是结构化流处理？

结构化流处理是一个可扩展且容错的流处理引擎，建立在Spark SQL引擎之上。您可以像表达静态数据上的批处理计算一样表达流计算，Spark SQL引擎将负责增量地、连续地运行它并更新最终结果。

### 核心概念

- **无界表**：将数据流视为一个不断增长的表
- **增量查询**：对流数据的查询被转换为增量执行计划
- **输出模式**：控制如何将结果写入输出接收器
- **触发器**：控制何时检查新数据并更新结果
- **检查点和WAL**：用于容错的机制

## 1. 创建SparkSession

结构化流处理使用SparkSession作为入口点。

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
import time
import threading
import random
import os

# 创建SparkSession
spark = SparkSession.builder \
    .appName("结构化流处理") \
    .getOrCreate()

# 设置日志级别
spark.sparkContext.setLogLevel("WARN")

print(f"Spark版本: {spark.version}")
print("结构化流处理环境准备完成")

## 2. 创建流数据源

结构化流处理支持多种数据源，包括文件、套接字、Kafka等。

### 2.1 从文件创建流

In [None]:
# 创建监控目录
streaming_dir = "/home/jovyan/data/streaming_input"
os.makedirs(streaming_dir, exist_ok=True)

# 定义schema
schema = StructType([
    StructField("timestamp", LongType(), True),
    StructField("product", StringType(), True),
    StructField("category", StringType(), True),
    StructField("price", DoubleType(), True),
    StructField("quantity", IntegerType(), True),
    StructField("customer_id", StringType(), True),
    StructField("region", StringType(), True)
])

# 从CSV文件创建流
file_stream = spark.readStream \
    .option("header", "true") \
    .schema(schema) \
    .csv(streaming_dir)

print(f"监控目录: {streaming_dir}")
print("文件流创建成功")
print("Schema:")
file_stream.printSchema()

### 2.2 从Rate Source创建流（用于测试）

In [None]:
# Rate source生成测试数据
rate_stream = spark.readStream \
    .format("rate") \
    .option("rowsPerSecond", 5) \
    .load()

print("Rate流创建成功")
print("Schema:")
rate_stream.printSchema()

### 2.3 创建模拟数据生成器

In [None]:
# 创建模拟销售数据生成器
def generate_sales_data(num_records=10):
    """生成模拟销售数据"""
    products = ['laptop', 'phone', 'tablet', 'watch', 'headphones', 'camera', 'speaker']
    categories = ['Electronics', 'Accessories', 'Computing']
    regions = ['North', 'South', 'East', 'West', 'Central']
    
    data = []
    for i in range(num_records):
        timestamp = int(time.time()) + i
        product = random.choice(products)
        category = random.choice(categories)
        price = round(random.uniform(50, 1000), 2)
        quantity = random.randint(1, 5)
        customer_id = f"C{random.randint(1000, 9999)}"
        region = random.choice(regions)
        
        data.append(f"{timestamp},{product},{category},{price},{quantity},{customer_id},{region}")
    
    return data

# 测试数据生成器
sample_data = generate_sales_data(5)
print("示例销售数据:")
for item in sample_data:
    print(item)

## 3. 基本流处理操作

结构化流处理支持大部分DataFrame操作。

In [None]:
# 使用rate stream进行基本操作演示
# 添加计算列
enhanced_stream = rate_stream.select(
    col("timestamp"),
    col("value"),
    (col("value") * 2).alias("double_value"),
    when(col("value") % 2 == 0, "even").otherwise("odd").alias("parity")
)

print("增强后的流Schema:")
enhanced_stream.printSchema()

In [None]:
# 过滤操作
filtered_stream = enhanced_stream.filter(col("value") > 10)

# 聚合操作
aggregated_stream = enhanced_stream.groupBy("parity").count()

print("流处理操作配置完成")

## 4. 输出操作和输出模式

结构化流处理支持三种输出模式：
- **Append**：只有新行被添加到结果表
- **Complete**：整个结果表被写入外部存储
- **Update**：只有结果表中被更新的行被写入外部存储

In [None]:
# 输出到控制台 - Append模式
console_query = enhanced_stream \
    .writeStream \
    .outputMode("append") \
    .format("console") \
    .option("truncate", "false") \
    .trigger(processingTime='5 seconds') \
    .start()

print("控制台输出查询已启动")

# 等待一段时间后停止
time.sleep(15)
console_query.stop()
print("控制台输出查询已停止")

In [None]:
# 聚合查询 - Complete模式
aggregation_query = aggregated_stream \
    .writeStream \
    .outputMode("complete") \
    .format("console") \
    .trigger(processingTime='5 seconds') \
    .start()

print("聚合查询已启动")

# 等待一段时间后停止
time.sleep(15)
aggregation_query.stop()
print("聚合查询已停止")

## 5. 窗口操作

结构化流处理支持基于时间的窗口操作。

In [None]:
# 创建带时间戳的流
timestamped_stream = rate_stream.select(
    col("value"),
    col("timestamp").cast("timestamp").alias("event_time")
)

# 窗口聚合：每10秒窗口，每5秒滑动
windowed_counts = timestamped_stream \
    .groupBy(
        window(col("event_time"), "10 seconds", "5 seconds")
    ) \
    .count()

# 输出窗口结果
window_query = windowed_counts \
    .writeStream \
    .outputMode("complete") \
    .format("console") \
    .option("truncate", "false") \
    .trigger(processingTime='5 seconds') \
    .start()

print("窗口查询已启动")

# 等待一段时间后停止
time.sleep(20)
window_query.stop()
print("窗口查询已停止")

## 6. 水印和延迟数据处理

水印用于处理延迟到达的数据。

In [None]:
# 使用水印处理延迟数据
watermarked_stream = timestamped_stream \
    .withWatermark("event_time", "10 seconds")

# 带水印的窗口聚合
watermarked_counts = watermarked_stream \
    .groupBy(
        window(col("event_time"), "10 seconds")
    ) \
    .count()

# 输出带水印的结果
watermark_query = watermarked_counts \
    .writeStream \
    .outputMode("update") \
    .format("console") \
    .option("truncate", "false") \
    .trigger(processingTime='5 seconds') \
    .start()

print("带水印的查询已启动")

# 等待一段时间后停止
time.sleep(15)
watermark_query.stop()
print("带水印的查询已停止")

## 7. 流-流连接

结构化流处理支持流与流之间的连接操作。

In [None]:
# 创建两个流用于连接演示
stream1 = spark.readStream \
    .format("rate") \
    .option("rowsPerSecond", 2) \
    .load() \
    .select(
        col("value").alias("id"),
        col("timestamp").cast("timestamp").alias("timestamp1")
    )

stream2 = spark.readStream \
    .format("rate") \
    .option("rowsPerSecond", 2) \
    .option("numPartitions", 1) \
    .load() \
    .select(
        col("value").alias("id"),
        col("timestamp").cast("timestamp").alias("timestamp2")
    )

# 添加水印
stream1_watermarked = stream1.withWatermark("timestamp1", "10 seconds")
stream2_watermarked = stream2.withWatermark("timestamp2", "10 seconds")

# 流-流内连接
joined_stream = stream1_watermarked.join(
    stream2_watermarked,
    expr("""
        id = id AND
        timestamp1 >= timestamp2 - interval 5 seconds AND
        timestamp1 <= timestamp2 + interval 5 seconds
    """)
)

# 输出连接结果
join_query = joined_stream \
    .writeStream \
    .outputMode("append") \
    .format("console") \
    .option("truncate", "false") \
    .trigger(processingTime='5 seconds') \
    .start()

print("流-流连接查询已启动")

# 等待一段时间后停止
time.sleep(20)
join_query.stop()
print("流-流连接查询已停止")

## 8. 实际案例：实时销售分析系统

让我们创建一个完整的实时销售分析系统。

In [None]:
# 创建销售数据文件用于演示
def create_sales_files():
    """创建销售数据文件"""
    for i in range(3):
        filename = f"{streaming_dir}/sales_{int(time.time())}_{i}.csv"
        
        # 创建CSV内容
        header = "timestamp,product,category,price,quantity,customer_id,region\n"
        data_lines = generate_sales_data(20)
        
        with open(filename, 'w') as f:
            f.write(header)
            for line in data_lines:
                f.write(line + "\n")
        
        print(f"创建文件: {filename}")
        time.sleep(2)  # 间隔2秒创建下一个文件

# 在后台线程中创建文件
file_thread = threading.Thread(target=create_sales_files)
file_thread.start()

print("开始创建销售数据文件...")

In [None]:
# 等待文件创建完成
file_thread.join()

# 从文件创建销售数据流
sales_stream = spark.readStream \
    .option("header", "true") \
    .schema(schema) \
    .csv(streaming_dir)

# 添加计算列
enhanced_sales = sales_stream.select(
    col("*"),
    (col("price") * col("quantity")).alias("total_amount"),
    from_unixtime(col("timestamp")).cast("timestamp").alias("event_time")
)

print("销售数据流创建完成")

In [None]:
# 1. 实时销售总额统计
sales_summary = enhanced_sales \
    .groupBy() \
    .agg(
        sum("total_amount").alias("total_sales"),
        count("*").alias("total_orders"),
        avg("total_amount").alias("avg_order_value")
    )

summary_query = sales_summary \
    .writeStream \
    .outputMode("complete") \
    .format("console") \
    .option("truncate", "false") \
    .queryName("sales_summary") \
    .trigger(processingTime='5 seconds') \
    .start()

print("销售总额统计查询已启动")

In [None]:
# 2. 按地区统计销售额
region_sales = enhanced_sales \
    .groupBy("region") \
    .agg(
        sum("total_amount").alias("region_sales"),
        count("*").alias("region_orders")
    ) \
    .orderBy(desc("region_sales"))

region_query = region_sales \
    .writeStream \
    .outputMode("complete") \
    .format("console") \
    .option("truncate", "false") \
    .queryName("region_sales") \
    .trigger(processingTime='5 seconds') \
    .start()

print("地区销售统计查询已启动")

In [None]:
# 3. 时间窗口销售趋势
windowed_sales = enhanced_sales \
    .withWatermark("event_time", "10 seconds") \
    .groupBy(
        window(col("event_time"), "30 seconds", "15 seconds")
    ) \
    .agg(
        sum("total_amount").alias("window_sales"),
        count("*").alias("window_orders")
    )

window_sales_query = windowed_sales \
    .writeStream \
    .outputMode("update") \
    .format("console") \
    .option("truncate", "false") \
    .queryName("windowed_sales") \
    .trigger(processingTime='5 seconds') \
    .start()

print("时间窗口销售趋势查询已启动")

In [None]:
# 4. 异常检测：检测大额订单
large_orders = enhanced_sales.filter(col("total_amount") > 2000)

alert_query = large_orders \
    .writeStream \
    .outputMode("append") \
    .format("console") \
    .option("truncate", "false") \
    .queryName("large_orders_alert") \
    .trigger(processingTime='3 seconds') \
    .start()

print("大额订单告警查询已启动")

In [None]:
# 让所有查询运行一段时间
print("所有查询正在运行，等待30秒...")
time.sleep(30)

# 停止所有查询
queries = [summary_query, region_query, window_sales_query, alert_query]
for query in queries:
    if query.isActive:
        query.stop()
        print(f"查询 {query.name} 已停止")

print("所有查询已停止")

## 9. 检查点和容错

检查点是结构化流处理容错机制的核心。

In [None]:
# 设置检查点目录
checkpoint_dir = "/home/jovyan/data/checkpoint_structured"

# 创建带检查点的查询
checkpointed_query = enhanced_sales \
    .groupBy("category") \
    .agg(sum("total_amount").alias("category_sales")) \
    .writeStream \
    .outputMode("complete") \
    .format("console") \
    .option("checkpointLocation", checkpoint_dir) \
    .trigger(processingTime='5 seconds') \
    .start()

print(f"带检查点的查询已启动，检查点目录: {checkpoint_dir}")

# 运行一段时间
time.sleep(15)

# 停止查询
checkpointed_query.stop()
print("带检查点的查询已停止")

# 检查检查点目录
if os.path.exists(checkpoint_dir):
    print(f"检查点目录内容: {os.listdir(checkpoint_dir)}")
else:
    print("检查点目录不存在")

## 10. 监控和调试

结构化流处理提供了丰富的监控和调试功能。

In [None]:
# 创建一个查询用于监控演示
monitoring_stream = spark.readStream \
    .format("rate") \
    .option("rowsPerSecond", 10) \
    .load()

monitoring_query = monitoring_stream \
    .groupBy() \
    .count() \
    .writeStream \
    .outputMode("complete") \
    .format("console") \
    .trigger(processingTime='3 seconds') \
    .start()

print("监控查询已启动")

# 等待几个批次
time.sleep(10)

# 获取查询状态
print("\n=== 查询状态 ===")
print(f"查询ID: {monitoring_query.id}")
print(f"查询名称: {monitoring_query.name}")
print(f"是否活跃: {monitoring_query.isActive}")

# 获取最近的进度信息
progress = monitoring_query.lastProgress
if progress:
    print(f"\n=== 最近进度 ===")
    print(f"批次ID: {progress.get('batchId', 'N/A')}")
    print(f"批次持续时间: {progress.get('batchDuration', 'N/A')} ms")
    print(f"输入行数: {progress.get('inputRowsPerSecond', 'N/A')}")
    print(f"处理行数: {progress.get('processedRowsPerSecond', 'N/A')}")

# 停止查询
monitoring_query.stop()
print("\n监控查询已停止")

## 11. 练习

现在，让我们通过一些练习来巩固所学知识。

### 练习1：实时用户活动分析

创建一个实时用户活动分析系统，分析用户的点击行为。

In [None]:
# 创建用户活动数据生成器
def generate_user_activity():
    """生成用户活动数据"""
    users = [f"user_{i}" for i in range(1, 21)]
    pages = ['home', 'product', 'cart', 'checkout', 'profile', 'search']
    actions = ['view', 'click', 'scroll', 'purchase']
    
    data = []
    for _ in range(50):
        timestamp = int(time.time())
        user_id = random.choice(users)
        page = random.choice(pages)
        action = random.choice(actions)
        session_id = f"session_{random.randint(1000, 9999)}"
        
        data.append(f"{timestamp},{user_id},{page},{action},{session_id}")
    
    return data

# 创建用户活动文件
activity_dir = "/home/jovyan/data/user_activity"
os.makedirs(activity_dir, exist_ok=True)

def create_activity_files():
    for i in range(2):
        filename = f"{activity_dir}/activity_{int(time.time())}_{i}.csv"
        
        header = "timestamp,user_id,page,action,session_id\n"
        data_lines = generate_user_activity()
        
        with open(filename, 'w') as f:
            f.write(header)
            for line in data_lines:
                f.write(line + "\n")
        
        print(f"创建活动文件: {filename}")
        time.sleep(3)

# 在后台创建文件
activity_thread = threading.Thread(target=create_activity_files)
activity_thread.start()

print("开始创建用户活动文件...")

In [None]:
# 等待文件创建完成
activity_thread.join()

# 定义用户活动schema
activity_schema = StructType([
    StructField("timestamp", LongType(), True),
    StructField("user_id", StringType(), True),
    StructField("page", StringType(), True),
    StructField("action", StringType(), True),
    StructField("session_id", StringType(), True)
])

# 创建用户活动流
activity_stream = spark.readStream \
    .option("header", "true") \
    .schema(activity_schema) \
    .csv(activity_dir)

# 添加事件时间
activity_with_time = activity_stream.select(
    col("*"),
    from_unixtime(col("timestamp")).cast("timestamp").alias("event_time")
)

print("用户活动流创建完成")

In [None]:
# 1. 实时页面访问统计
page_stats = activity_with_time \
    .groupBy("page") \
    .count() \
    .orderBy(desc("count"))

page_query = page_stats \
    .writeStream \
    .outputMode("complete") \
    .format("console") \
    .queryName("page_stats") \
    .trigger(processingTime='5 seconds') \
    .start()

# 2. 用户行为分析
user_behavior = activity_with_time \
    .groupBy("user_id", "action") \
    .count() \
    .orderBy("user_id", desc("count"))

behavior_query = user_behavior \
    .writeStream \
    .outputMode("complete") \
    .format("console") \
    .queryName("user_behavior") \
    .trigger(processingTime='5 seconds') \
    .start()

print("用户活动分析查询已启动")

# 运行查询
time.sleep(20)

# 停止查询
page_query.stop()
behavior_query.stop()
print("用户活动分析查询已停止")

### 练习2：实时异常检测

创建一个实时异常检测系统，检测异常的用户行为模式。

In [None]:
# 异常检测：检测短时间内大量点击的用户
suspicious_activity = activity_with_time \
    .withWatermark("event_time", "10 seconds") \
    .groupBy(
        "user_id",
        window(col("event_time"), "30 seconds")
    ) \
    .count() \
    .filter(col("count") > 10)  # 30秒内超过10次活动视为异常

anomaly_query = suspicious_activity \
    .writeStream \
    .outputMode("update") \
    .format("console") \
    .option("truncate", "false") \
    .queryName("anomaly_detection") \
    .trigger(processingTime='5 seconds') \
    .start()

print("异常检测查询已启动")

# 运行查询
time.sleep(15)

# 停止查询
anomaly_query.stop()
print("异常检测查询已停止")

## 12. 总结

在本笔记本中，我们学习了：

1. 结构化流处理的基本概念和优势
2. 如何创建流数据源（文件、Rate source等）
3. 基本流处理操作（选择、过滤、聚合）
4. 输出操作和输出模式（Append、Complete、Update）
5. 窗口操作和时间处理
6. 水印和延迟数据处理
7. 流-流连接操作
8. 实际案例：实时销售分析系统
9. 检查点和容错机制
10. 监控和调试技术
11. 实践练习：用户活动分析和异常检测

结构化流处理是Spark中推荐的流处理方式，它提供了更高级的API、更好的性能和更强的容错能力。它建立在Spark SQL引擎之上，可以利用Catalyst优化器的所有优化功能。

## 下一步

接下来，我们将学习如何将Spark Streaming与Kafka集成，处理真实的流数据。请继续学习 `kafka-integration.ipynb` 笔记本。

In [None]:
# 清理资源
spark.stop()
print("SparkSession已停止")