# [structured streaming](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#quick-example)

The main data source formats supported by Apache Spark Structured Streaming, with usage examples:

1. **Text files**:
   ```python
   text_df = spark.readStream.format("text").load("/path/to/directory")
   ```

2. **CSV files**:
   ```python
   csv_df = spark.readStream.format("csv").option("header", "true").load("/path/to/directory")
   ```

3. **JSON files**:
   ```python
   json_df = spark.readStream.format("json").load("/path/to/directory")
   ```

4. **ORC files**:
   ```python
   orc_df = spark.readStream.format("orc").load("/path/to/directory")
   ```

5. **Parquet files**:
   ```python
   parquet_df = spark.readStream.format("parquet").load("/path/to/directory")
   ```

6. **Reading from Kafka**:
   ```python
   kafka_df = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "host:port").option("subscribe", "topic").load()
   ```

7. **Socket (for testing)**:
   ```python
   socket_df = spark.readStream.format("socket").option("host", "localhost").option("port", 9999).load()
   ```

8. **Rate (for testing)**:
   ```python
   rate_df = spark.readStream.format("rate").load()
   ```

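In a distributed environment, **make sure the specified paths or resources are accessible from all Spark nodes**. File sources other than `text` (CSV, JSON, ORC, Parquet) also need an explicit schema passed with `.schema(...)`, as in the cells below. The only exception is when `spark.sql.streaming.schemaInference` is set to `true`.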
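The socket source in item 7 connects as a TCP client, so something must already be listening on `localhost:9999` before the query starts. The Spark programming guide uses `nc -lk 9999` for this. If netcat is not available, the sketch below is a minimal Python stand-in. The helper name `serve_lines` is hypothetical. Unlike `nc -lk`, it serves a single client and then closes the connection, which ends the socket stream.

```python
import socket
import threading

def serve_lines(lines, host="localhost", port=9999):
    """Hypothetical stand-in for `nc -lk 9999`.

    Accepts one client, sends each line terminated by a newline, then
    closes. Returns the port actually bound (pass port=0 to let the OS
    choose a free port).
    """
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server.bind((host, port))
    server.listen(1)  # listening before returning, so clients can connect at once
    bound_port = server.getsockname()[1]

    def _serve():
        conn, _ = server.accept()
        with conn:
            for line in lines:
                conn.sendall((line + "\n").encode("utf-8"))
        server.close()

    threading.Thread(target=_serve, daemon=True).start()
    return bound_port
```

Call `serve_lines([...])` before starting the query that uses `spark.readStream.format("socket")`. Each line should then arrive as one row in the source's `value` column.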

In [54]:
from pyspark.sql.types import *

# Schema of the CSV files in data/test
schema1 = StructType([
    StructField('time', TimestampType()),
    StructField('app_id', StringType()),
    StructField('store', StringType()),
    StructField('adid', StringType()),
    StructField('openid', StringType()),
    StructField('activity_kind', StringType()),
    StructField('created_at', StringType()),
    StructField('installed_at', StringType()),
    StructField('reattributed_at', StringType()),
    StructField('network_name', StringType()),
    StructField('country', StringType()),
    StructField('device_name', StringType()),
    StructField('device_type', StringType()),
    StructField('os_name', StringType()),
    StructField('timezone', StringType()),
    StructField('event_name', StringType()),
    StructField('revenue_float', StringType()),
    StructField('revenue', StringType()),
    StructField('currency', StringType()),
    StructField('revenue_usd', StringType()),
    StructField('reporting_revenue', StringType())
])

# Schema of the CSV files in data/adjust
schema2 = StructType([
    StructField('adid', StringType()),
    StructField('store', StringType()),
    StructField('time', TimestampNTZType()),
    StructField('zone_offset', StringType()),
    StructField('activity_kind', StringType()),
    StructField('event_name', StringType())
])


# csv

## csv1

In [55]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *

In [56]:
# Create the SparkSession
spark = SparkSession.builder.appName("StructuredStreamingCSV1").getOrCreate()

file = 'file:///home/jupyter/data/test'

# Read the CSV files
csvDF = spark.readStream \
    .option("sep", ",") \
    .schema(schema1) \
    .csv(file)

# Define the processing logic here,
# e.g. simple transformations or aggregations

# Define the output sink, here the console
query = csvDF.writeStream \
    .outputMode("append") \
    .format("console") \
    .start()

# Wait up to 3 seconds for the query to stop; returns False if it is still running
query.awaitTermination(timeout=3)
query.status
query.stop()
# query.lastProgress
query.status

spark.stop()

23/12/07 06:01:14 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.
23/12/07 06:01:14 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-10daf6f6-70a4-4dab-ac7d-ca6bb9a55d2b. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
23/12/07 06:01:14 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


-------------------------------------------
Batch: 0
-------------------------------------------
+-------------------+--------------------+------+--------------------+------+-------------+----------+------------+---------------+--------------------+-------+-----------+-----------+-------+--------+----------+-------------+-------+--------+-----------+-----------------+
|               time|              app_id| store|                adid|openid|activity_kind|created_at|installed_at|reattributed_at|        network_name|country|device_name|device_type|os_name|timezone|event_name|revenue_float|revenue|currency|revenue_usd|reporting_revenue|
+-------------------+--------------------+------+--------------------+------+-------------+----------+------------+---------------+--------------------+-------+-----------+-----------+-------+--------+----------+-------------+-------+--------+-----------+-----------------+
|2023-10-01 00:00:00|          1456241577|itunes|041bf78c9dc6dd5f5...|  NULL|    

False

{'message': 'Waiting for data to arrive',
 'isDataAvailable': False,
 'isTriggerActive': False}

{'message': 'Stopped', 'isDataAvailable': False, 'isTriggerActive': False}
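The file source treats each new file in the watched directory as new input. The Spark guide asks that files be placed there atomically, for example by a move, so a query never reads a half-written file. The sketch below shows one way to feed `data/test` from Python. It assumes a local filesystem and relies on Spark skipping dot-prefixed (hidden) files while the temporary file is being written. `drop_csv` is a hypothetical helper.

```python
import csv
import os
import tempfile

def drop_csv(rows, directory, name):
    """Write rows as CSV to directory/name so the file appears atomically.

    The rows go to a dot-prefixed temporary file in the same directory
    first (assumed to be ignored by Spark as a hidden file), then the file
    is renamed into place with os.replace, which is atomic on POSIX when
    source and target are on the same filesystem.
    """
    fd, tmp_path = tempfile.mkstemp(dir=directory, prefix=".", suffix=".tmp")
    try:
        with os.fdopen(fd, "w", newline="") as f:
            csv.writer(f).writerows(rows)
        final_path = os.path.join(directory, name)
        os.replace(tmp_path, final_path)
    except BaseException:
        # Do not leave a half-written temporary file behind
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
        raise
    return final_path
```

Give each drop a new `name`. Spark tracks processed files by path, so writing more rows into a file it has already processed is generally not picked up.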

## csv2 

In [57]:
# Create the SparkSession
spark = SparkSession.builder.appName("StructuredStreamingCSV2").getOrCreate()

file = 'file:///home/jupyter/data/test'
# Read the CSV files
csvdf = spark.readStream.format("csv") \
        .option("header", "false") \
        .schema(schema1) \
        .load(file)

In [58]:
query = csvdf.writeStream.format('console').start()
query.awaitTermination(timeout=10)

query.status
query.stop()
# query.lastProgress
query.status

spark.stop()

23/12/07 06:01:44 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-9347e172-6369-482e-bfd2-169cdff52d68. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
23/12/07 06:01:44 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
                                                                                

-------------------------------------------
Batch: 0
-------------------------------------------
+-------------------+--------------------+------+--------------------+------+-------------+----------+------------+---------------+--------------------+-------+-----------+-----------+-------+--------+----------+-------------+-------+--------+-----------+-----------------+
|               time|              app_id| store|                adid|openid|activity_kind|created_at|installed_at|reattributed_at|        network_name|country|device_name|device_type|os_name|timezone|event_name|revenue_float|revenue|currency|revenue_usd|reporting_revenue|
+-------------------+--------------------+------+--------------------+------+-------------+----------+------------+---------------+--------------------+-------+-----------+-----------+-------+--------+----------+-------------+-------+--------+-----------+-----------------+
|2023-10-01 00:00:00|          1456241577|itunes|041bf78c9dc6dd5f5...|  NULL|    

False

{'message': 'Waiting for data to arrive',
 'isDataAvailable': False,
 'isTriggerActive': False}

{'message': 'Stopped', 'isDataAvailable': False, 'isTriggerActive': False}

# output

In [59]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *

## outhive

In [60]:
from pyspark.sql import functions as F

spark = SparkSession.builder \
    .appName("adjust") \
    .config("spark.sql.session.timeZone", "UTC") \
    .enableHiveSupport() \
    .getOrCreate()

In [76]:
file = 'file:///home/jupyter/data/adjust'
# Read the CSV files
csvdf = spark.readStream.format("csv") \
        .option("header", "false") \
        .schema(schema2) \
        .load(file)

csvdf = csvdf.withColumn("time", F.to_timestamp(csvdf["time"]))
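# Refer to columns by name so the window is built on the watermarked `time`.
# The AnalysisException below was raised with F.window(csvdf.time, ...):
# csvdf.time is the column from before withWatermark, so the resulting
# window column carried no watermark and append mode was rejected.
_csvdf = csvdf.withWatermark('time', '10 seconds').groupby(
    F.window("time", "1440 minutes", "720 minutes"),
    "adid"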
).agg(
    F.count("time").alias("count"),
    F.min('time').alias('min_time'),
    F.max('time').alias('max_time'),
    F.last("event_name").alias("last_adid"),
    F.max("event_name").alias("max_adid")
)
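`F.window(time, "1440 minutes", "720 minutes")` is a sliding window. Each window is one day long and a new one starts every 12 hours, so every event lands in two windows. The `Expand` node in the plan printed below computes these windows by aligning the timestamp down to a multiple of the slide, then stepping back one slide at a time.

The pure-Python sketch below mirrors that arithmetic. It assumes the default `startTime` of 0 and UTC, which this session sets through `spark.sql.session.timeZone`. It is an illustration, not Spark's code, and `sliding_windows` is a hypothetical name.

```python
from datetime import datetime, timedelta

def sliding_windows(ts, duration, slide):
    """Return the [start, end) windows that contain ts, latest start first.

    Mirrors window(time, duration, slide) with startTime 0: align ts down
    to a multiple of `slide` since the epoch, then step back one slide at
    a time. Naive datetimes are treated as UTC.
    """
    epoch = datetime(1970, 1, 1)
    one_us = timedelta(microseconds=1)
    ts_us = (ts - epoch) // one_us
    dur_us = duration // one_us
    slide_us = slide // one_us
    # Python's % is non-negative for a positive divisor, which is what the
    # CASE WHEN in the plan's Expand node ensures for negative values
    last_start = ts_us - ts_us % slide_us
    max_overlapping = -(-dur_us // slide_us)  # ceil(duration / slide)
    windows = []
    for k in range(max_overlapping):
        start = last_start - k * slide_us
        end = start + dur_us
        if start <= ts_us < end:  # keep only windows that really contain ts
            windows.append((epoch + start * one_us, epoch + end * one_us))
    return windows
```

For `2023-10-01 00:00:12`, this returns `[2023-10-01 00:00, 2023-10-02 00:00)` and `[2023-09-30 12:00, 2023-10-01 12:00)`, so the event is counted once in each window.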


In [77]:
def write_to_hive(batch_df, batch_id):
    # Append each micro-batch to the Hive table
    batch_df.write.mode('append').saveAsTable("adjust_update")

query = _csvdf.writeStream \
    .foreachBatch(write_to_hive) \
    .outputMode("append") \
    .start()

query.awaitTermination(timeout=10)
query.status
# query.lastProgress
# query.stop()  # stopping before the current batch has been written raises an error

23/12/07 06:05:21 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-ef3ad6dc-78bb-426f-b8df-7fbb6d817d7b. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
23/12/07 06:05:21 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


AnalysisException: Append output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets without watermark;
Aggregate [window#5060, adid#5023], [window#5060 AS window#5043, adid#5023, count(time#5035-T10000ms) AS count#5051L, min(time#5035-T10000ms) AS min_time#5053, max(time#5035-T10000ms) AS max_time#5055, last(event_name#5028, false) AS last_adid#5057, max(event_name#5028) AS max_adid#5059]
+- Filter isnotnull(time#5035)
   +- Expand [[named_struct(start, knownnullable(precisetimestampconversion(((precisetimestampconversion(time#5035, TimestampType, LongType) - CASE WHEN (((precisetimestampconversion(time#5035, TimestampType, LongType) - 0) % 43200000000) < cast(0 as bigint)) THEN (((precisetimestampconversion(time#5035, TimestampType, LongType) - 0) % 43200000000) + 43200000000) ELSE ((precisetimestampconversion(time#5035, TimestampType, LongType) - 0) % 43200000000) END) - 0), LongType, TimestampType)), end, knownnullable(precisetimestampconversion((((precisetimestampconversion(time#5035, TimestampType, LongType) - CASE WHEN (((precisetimestampconversion(time#5035, TimestampType, LongType) - 0) % 43200000000) < cast(0 as bigint)) THEN (((precisetimestampconversion(time#5035, TimestampType, LongType) - 0) % 43200000000) + 43200000000) ELSE ((precisetimestampconversion(time#5035, TimestampType, LongType) - 0) % 43200000000) END) - 0) + 86400000000), LongType, TimestampType))), adid#5023, store#5024, time#5035-T10000ms, zone_offset#5026, activity_kind#5027, event_name#5028], [named_struct(start, knownnullable(precisetimestampconversion(((precisetimestampconversion(time#5035, TimestampType, LongType) - CASE WHEN (((precisetimestampconversion(time#5035, TimestampType, LongType) - 0) % 43200000000) < cast(0 as bigint)) THEN (((precisetimestampconversion(time#5035, TimestampType, LongType) - 0) % 43200000000) + 43200000000) ELSE ((precisetimestampconversion(time#5035, TimestampType, LongType) - 0) % 43200000000) END) - 43200000000), LongType, TimestampType)), end, knownnullable(precisetimestampconversion((((precisetimestampconversion(time#5035, TimestampType, LongType) - CASE WHEN (((precisetimestampconversion(time#5035, TimestampType, LongType) - 0) % 43200000000) < cast(0 as bigint)) THEN (((precisetimestampconversion(time#5035, TimestampType, LongType) - 0) % 43200000000) + 43200000000) ELSE ((precisetimestampconversion(time#5035, TimestampType, LongType) - 0) % 43200000000) END) - 
43200000000) + 86400000000), LongType, TimestampType))), adid#5023, store#5024, time#5035-T10000ms, zone_offset#5026, activity_kind#5027, event_name#5028]], [window#5060, adid#5023, store#5024, time#5035-T10000ms, zone_offset#5026, activity_kind#5027, event_name#5028]
      +- EventTimeWatermark time#5035: timestamp, 10 seconds
         +- Project [adid#5023, store#5024, to_timestamp(time#5025, None, TimestampType, Some(GMT), false) AS time#5035, zone_offset#5026, activity_kind#5027, event_name#5028]
            +- StreamingRelation DataSource(org.apache.spark.sql.SparkSession@40fdb743,csv,List(),Some(StructType(StructField(adid,StringType,true),StructField(store,StringType,true),StructField(time,TimestampNTZType,true),StructField(zone_offset,StringType,true),StructField(activity_kind,StringType,true),StructField(event_name,StringType,true))),List(),None,Map(header -> false, path -> file:///home/jupyter/data/adjust),None), FileSource[file:///home/jupyter/data/adjust], [adid#5023, store#5024, time#5025, zone_offset#5026, activity_kind#5027, event_name#5028]
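Once the window column does carry the watermark, append mode writes a window only after it is final. The sketch below is a simplified model, and it ignores the fact that Spark advances the watermark between micro-batches. In this model, the watermark is the latest event time seen minus the delay (`10 seconds` here). A window is emitted, and its state dropped, once its end is at or before the watermark.

With one-day windows, this means `adjust_update` receives nothing until an event at least 10 seconds past a window's end has been seen. The helper name `emitted_in_append` is hypothetical.

```python
from datetime import datetime, timedelta

def emitted_in_append(windows, max_event_time, delay):
    """Simplified model of append mode for a windowed aggregation.

    watermark = latest event time seen - delay; a window's result is
    emitted (and its state dropped) once window end <= watermark.
    `windows` is a list of (start, end) datetime pairs.
    """
    watermark = max_event_time - delay
    return [w for w in windows if w[1] <= watermark]
```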


In [64]:
query.stop()
spark.stop()

## outmysql

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql import functions as F

spark = SparkSession.builder \
    .appName("adjust_outmysql") \
    .config("spark.jars", "/home/jupyter/data/jbdc/mysql-connector-j-8.2.0.jar") \
    .getOrCreate()

file = 'file:///home/jupyter/data/adjust'

# Define the schema of the CSV files
schema = StructType([
    StructField('adid', StringType()),
    StructField('store', StringType()),
    StructField('time', TimestampNTZType()),
    StructField('zone_offset', StringType()),
    StructField('activity_kind', StringType()),
    StructField('event_name', StringType())
])

# Read the CSV files
csvdf = spark.readStream.format("csv") \
        .option("header", "false") \
        .schema(schema) \
        .load(file)

csvdf = csvdf.withColumn("time", F.to_timestamp(csvdf["time"]))
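# Refer to columns by name so the window is built on the watermarked `time`
# (csvdf.time would refer to the column from before withWatermark)
_csvdf = csvdf.withWatermark('time', '10 seconds').groupby(
    F.window("time", "1440 minutes", "720 minutes"),
    "adid"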
).agg(
    F.count("time").alias("count"),
    F.min('time').alias('min_time'),
    F.max('time').alias('max_time'),
    F.last("event_name").alias("last_adid"),
    F.max("event_name").alias("max_adid")
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [2]:
def write_to_mysql(batch_df, batch_id):
    # JDBC URL
    jdbc_url = "jdbc:mysql://172.18.0.2:3306/test"
    properties = {
        "user": "longfengpili",
        "password": "123456abc",
        "driver": "com.mysql.cj.jdbc.Driver"
    }

    # Write each micro-batch to MySQL
    batch_df.write.jdbc(url=jdbc_url, table="sparkstream", mode="append", properties=properties)

# Start the streaming write
query = _csvdf.writeStream \
    .foreachBatch(write_to_mysql) \
    .outputMode("update") \
    .start()

query.awaitTermination(timeout=10)
query.status
# query.lastProgress
# query.stop()  # stopping before the current batch has been written raises an error

23/12/05 04:09:15 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-3c3808c3-f0be-4e64-97e6-3be7c2470dbd. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
23/12/05 04:09:15 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


False

{'message': 'Processing new data',
 'isDataAvailable': True,
 'isTriggerActive': True}
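With `outputMode("update")`, a (window, adid) group is re-emitted every time its aggregates change. Because `write_to_mysql` appends each micro-batch, `sparkstream` can end up with several rows for the same group. The foreachBatch documentation suggests using the `batch_id` passed to the function to deduplicate output.

The sketch below shows that reduction in plain Python: for each group, it keeps the row from the highest `batch_id`. It works over hypothetical `(batch_id, key, value)` tuples. The current code does not store `batch_id`, so using this approach would mean writing it as an extra column, or upserting on the key instead.

```python
def latest_per_key(rows):
    """Keep the value from the highest batch_id for each key.

    rows: iterable of hypothetical (batch_id, key, value) tuples, as a
    foreachBatch sink running in update mode might append them.
    """
    latest = {}
    for batch_id, key, value in rows:
        # Later (or equal) batches overwrite earlier versions of the same group
        if key not in latest or batch_id >= latest[key][0]:
            latest[key] = (batch_id, value)
    return {key: value for key, (_, value) in latest.items()}
```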

## outputMode
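In Spark Structured Streaming, `outputMode` determines which rows of the result table are written to the sink after each trigger. There are three modes:

1. **Append Mode**: only the rows added to the result table since the last trigger are output, and rows already written never change. Suited to queries whose existing results do not change, such as real-time log processing. With aggregations, append mode requires a watermark, and a group is output only once the watermark guarantees it is final.

2. **Complete Mode**: the entire result table is output after every trigger. Suited to results that change frequently, such as real-time aggregate statistics. It is only supported for queries with aggregations.

3. **Update Mode**: only the rows that changed since the last trigger are output. Suited to scenarios that only care about changed data.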
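A running count per key shows the difference between complete and update mode. The sketch below is a simplified model in plain Python. The helper `simulate` is hypothetical and does not reflect Spark's implementation. Complete mode re-emits the whole table on every trigger, while update mode emits only the keys touched in that trigger. Append mode is left out because an aggregation without a watermark never produces final rows, which is why Spark rejects that combination.

```python
from collections import Counter

def simulate(batches, mode):
    """Return the rows a running count-by-key would emit after each trigger.

    A simplified model of two output modes:
    - "complete": the whole result table, every trigger
    - "update": only keys whose count changed in this trigger
    """
    if mode not in ("complete", "update"):
        raise ValueError("this sketch only models 'complete' and 'update'")
    state = Counter()            # the result table: key -> running count
    outputs = []
    for batch in batches:        # one micro-batch of incoming keys per trigger
        state.update(batch)
        if mode == "complete":
            outputs.append(dict(state))
        else:
            outputs.append({key: state[key] for key in set(batch)})
    return outputs
```

For batches `[["a", "b", "a"], ["b", "c"]]`, complete mode emits `{"a": 2, "b": 1}` and then `{"a": 2, "b": 2, "c": 1}`. Update mode emits the same first table, but then only `{"b": 2, "c": 1}`.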

In [36]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql import functions as F

spark = SparkSession.builder \
    .appName("output") \
    .getOrCreate()

In [52]:

file = 'file:///home/jupyter/data/test'



### complete

In [53]:
# Read the CSV files (same layout as in the csv section, hence schema1)
csvdf = spark.readStream.format("csv") \
        .option("header", "false") \
        .schema(schema1) \
        .load(file)

# Aggregate into a new name so csvdf stays the raw stream that later cells reuse
agg_df = csvdf.withWatermark('time', '10 seconds') \
    .groupby("adid") \
    .agg(
        F.count("time").alias("count"),
        F.min("time").alias("min_time"),
        F.max("time").alias("max_time")
    )

# Complete mode rewrites the whole result table on every trigger
query_complete = agg_df.writeStream \
    .outputMode("complete") \
    .format("console") \
    .start()

query_complete.awaitTermination(timeout=30)

23/12/07 05:57:15 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-1aebd474-e56c-4e54-a0a3-e3de7b50934a. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
23/12/07 05:57:15 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
                                                                                

-------------------------------------------
Batch: 0
-------------------------------------------
+--------------------+-----+-------------------+-------------------+
|                adid|count|           min_time|           max_time|
+--------------------+-----+-------------------+-------------------+
|884ca98e510fb4a1a...|    4|2023-10-01 00:00:12|2023-10-01 00:00:24|
|041bf78c9dc6dd5f5...|    2|2023-10-01 00:00:00|2023-10-01 00:00:15|
|4c8abb5e79e5291c4...|    3|2023-10-01 00:00:09|2023-10-01 00:00:29|
+--------------------+-----+-------------------+-------------------+



False

### append

In [None]:
_csvdf = csvdf.withWatermark("time", "60 second")
_csvdf = _csvdf.groupby('time').agg(
    F.count("time").alias("count"),
    F.last("adid").alias("last_adid"),
    F.max("adid").alias("max_adid")
)
# query = _csvdf.writeStream.outputMode('complete').format('console').start()
query = _csvdf.writeStream \
        .outputMode('append') \
        .format('csv') \
        .option('path', 'file:///home/jupyter/notebook/output/path') \
        .option('checkpointLocation', 'file:///home/jupyter/notebook/output/checkpointLocation') \
        .start()
query.awaitTermination(timeout=10)

query.status
# query.lastProgress
query.stop()
query.status

### update

In [None]:
from pyspark.sql import functions as F

_csvdf = csvdf.groupby('time').agg(
    F.count("time").alias("count"),
    F.last("adid").alias("last_adid"),
    F.max("adid").alias("max_adid")
)
query = _csvdf.writeStream.outputMode('update').format('console').start()

query.awaitTermination(timeout=10)

query.status
# query.lastProgress
query.stop()
query.status

In [None]:
spark.stop()

# window 