# <center> <img src="../labs/img/ITESOLogo.png" alt="ITESO" width="480" height="130"> </center>
# <center> **Departamento de Electrónica, Sistemas e Informática** </center>
---
## <center> **Procesamiento de Datos Masivos** </center>
---
### <center> **Primavera 2025** </center>
---
### <center> **Ejemplos de Spark: Structured Streaming (Kafka + Watermarking)** </center>

---
**Profesor**: Dr. Pablo Camarillo Ramirez

In [52]:
import findspark
findspark.init()

#### Creacion de la conexión con el cluster de spark


In [53]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("SparkSQLStructuredStreaming-Kafka-Watermarking") \
    .master("spark://be6296989c4d:7077") \
    .config("spark.ui.port","4040") \
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.13:3.5.4") \
    .getOrCreate()
sc = spark.sparkContext

### Creación del Kafka Stream

In [54]:
kafka_lines = spark \
                .readStream \
                .format("kafka") \
                .option("kafka.bootstrap.servers", "5a7ab902bd93:9093") \
                .option("subscribe", "kafka-spark-example") \
                .load()

kafka_lines.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



### Transform binary data into string

In [55]:
kafka_df = kafka_lines.withColumn("value_str", kafka_lines.value.cast("string"))

In [56]:
from pyspark.sql.functions import explode, split

words = kafka_df.select(explode(split(kafka_df.value, " ")).alias("word"), "timestamp")
words.printSchema()

root
 |-- word: string (nullable = false)
 |-- timestamp: timestamp (nullable = true)



### Aplicando el mecanismo para manejar datos tardios con marcas de agua (watermarking)

In [57]:
from pyspark.sql.functions import window
windowed_counts =  words \
                        .withWatermark("timestamp", "2 minutes") \
                        .groupBy(window(words.timestamp, 
                                        "20 seconds", # Window duration 
                                        "10 seconds"), # Slide duration
                                 words.word) \
                        .count()

### Configuración del "Sink" del stream

In [58]:
spark.conf.set("spark.sql.shuffle.partitions", "5")

query = windowed_counts \
                .writeStream \
                .outputMode("complete") \
                .trigger(processingTime='30 seconds') \
                .format("console") \
                .option("truncate", "false") \
                .start()

query.awaitTermination(50)

25/04/08 14:42:28 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-f77e547a-ca16-4ce1-af2a-43acb1d0dece. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
25/04/08 14:42:28 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
25/04/08 14:42:28 WARN AdminClientConfig: These configurations '[key.deserializer, value.deserializer, enable.auto.commit, max.poll.records, auto.offset.reset]' were supplied but are not used yet.
                                                                                

-------------------------------------------
Batch: 0
-------------------------------------------
+------+----+-----+
|window|word|count|
+------+----+-----+
+------+----+-----+



25/04/08 14:43:00 ERROR MicroBatchExecution: Query [id = 753d6490-35b3-4e03-acd7-ecec8daaaa88, runId = 2f3bcea9-df73-436c-a8d9-5621021993ce] terminated with error
java.lang.IllegalStateException: Cannot call methods on a stopped SparkContext.
This stopped SparkContext was created at:

org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:58)
java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:77)
java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
java.base/java.lang.reflect.Constructor.newInstanceWithCaller(Constructor.java:500)
java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:481)
py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
py4j.Gateway.invoke(Gateway.ja

-------------------------------------------
Batch: 1
-------------------------------------------
+------------------------------------------+-------+-----+
|window                                    |word   |count|
+------------------------------------------+-------+-----+
|{2025-04-08 14:42:40, 2025-04-08 14:43:00}|datos  |1    |
|{2025-04-08 14:42:30, 2025-04-08 14:42:50}|canse  |1    |
|{2025-04-08 14:42:20, 2025-04-08 14:42:40}|holi   |1    |
|{2025-04-08 14:42:30, 2025-04-08 14:42:50}|holi   |1    |
|{2025-04-08 14:42:40, 2025-04-08 14:43:00}|benja  |1    |
|{2025-04-08 14:42:50, 2025-04-08 14:43:10}|Luis   |1    |
|{2025-04-08 14:42:30, 2025-04-08 14:42:50}|benja  |1    |
|{2025-04-08 14:42:40, 2025-04-08 14:43:00}|       |1    |
|{2025-04-08 14:42:30, 2025-04-08 14:42:50}|candada|1    |
|{2025-04-08 14:42:40, 2025-04-08 14:43:00}|grandes|1    |
|{2025-04-08 14:42:40, 2025-04-08 14:43:00}|Luis   |1    |
|{2025-04-08 14:42:20, 2025-04-08 14:42:40}|candada|1    |
|{2025-04-08 14:42

False

-------------------------------------------
Batch: 2
-------------------------------------------
+------------------------------------------+-------+-----+
|window                                    |word   |count|
+------------------------------------------+-------+-----+
|{2025-04-08 14:42:30, 2025-04-08 14:42:50}|canse  |1    |
|{2025-04-08 14:43:00, 2025-04-08 14:43:20}|Luis   |1    |
|{2025-04-08 14:42:40, 2025-04-08 14:43:00}|benja  |1    |
|{2025-04-08 14:42:50, 2025-04-08 14:43:10}|Luis   |1    |
|{2025-04-08 14:42:40, 2025-04-08 14:43:00}|datos  |1    |
|{2025-04-08 14:42:20, 2025-04-08 14:42:40}|holi   |1    |
|{2025-04-08 14:42:30, 2025-04-08 14:42:50}|holi   |1    |
|{2025-04-08 14:42:30, 2025-04-08 14:42:50}|benja  |1    |
|{2025-04-08 14:42:40, 2025-04-08 14:43:00}|       |1    |
|{2025-04-08 14:43:10, 2025-04-08 14:43:30}|�Luis  |1    |
|{2025-04-08 14:42:20, 2025-04-08 14:42:40}|candada|1    |
|{2025-04-08 14:43:10, 2025-04-08 14:43:30}|Sam    |2    |
|{2025-04-08 14:43

In [59]:
query.stop()

In [60]:
from pyspark.sql.functions import lit

words = words.withColumn("count", lit(2))

In [61]:
from pyspark.sql.functions import sum
windowed_sum =  words \
                        .withWatermark("timestamp", "2 minutes") \
                        .groupBy(window(words.timestamp, 
                                        "30 seconds", # Window duration 
                                        "10 seconds"), # Slide duration
                                 words.word) \
                        .agg(sum("count").alias("Sum"))

In [None]:
spark.conf.set("spark.sql.shuffle.partitions", "5")

query2 = windowed_sum \
                .writeStream \
                .outputMode("complete") \
                .trigger(processingTime='30 seconds') \
                .format("console") \
                .option("truncate", "false") \
                .start()

query2.awaitTermination(60)

25/04/08 14:44:07 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-9c966d9b-fd11-4ed8-a7bf-3ccdd2df307c. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
25/04/08 14:44:07 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


True

25/04/08 14:44:07 WARN AdminClientConfig: These configurations '[key.deserializer, value.deserializer, enable.auto.commit, max.poll.records, auto.offset.reset]' were supplied but are not used yet.


-------------------------------------------
Batch: 0
-------------------------------------------
+------+----+---+
|window|word|Sum|
+------+----+---+
+------+----+---+

-------------------------------------------
Batch: 1
-------------------------------------------
+------------------------------------------+-----+---+
|window                                    |word |Sum|
+------------------------------------------+-----+---+
|{2025-04-08 14:43:50, 2025-04-08 14:44:20}|Luis |2  |
|{2025-04-08 14:44:20, 2025-04-08 14:44:50}|Luis |2  |
|{2025-04-08 14:44:20, 2025-04-08 14:44:50}|Benja|2  |
|{2025-04-08 14:44:10, 2025-04-08 14:44:40}|Sam  |2  |
|{2025-04-08 14:44:10, 2025-04-08 14:44:40}|Gatos|2  |
|{2025-04-08 14:44:00, 2025-04-08 14:44:30}|Sam  |2  |
|{2025-04-08 14:43:50, 2025-04-08 14:44:20}|Sam  |2  |
|{2025-04-08 14:44:00, 2025-04-08 14:44:30}|Benja|4  |
|{2025-04-08 14:44:10, 2025-04-08 14:44:40}|Luis |4  |
|{2025-04-08 14:44:10, 2025-04-08 14:44:40}|Benja|4  |
|{2025-04-08 14:44

In [63]:
query2.stop()


In [64]:
from pyspark.sql.functions import length
words = words.withColumn("len", length(words.word))

In [66]:
from pyspark.sql.functions import avg
windowed_sum =  words \
                        .withWatermark("timestamp", "2 minutes") \
                        .groupBy(window(words.timestamp, 
                                        "30 seconds", # Window duration 
                                        "10 seconds"), # Slide duration
                                 words.word) \
                        .agg(avg("len").alias("Avg len"))

In [67]:
spark.conf.set("spark.sql.shuffle.partitions", "5")

query3 = windowed_sum \
                .writeStream \
                .outputMode("complete") \
                .trigger(processingTime='30 seconds') \
                .format("console") \
                .option("truncate", "false") \
                .start()

query3.awaitTermination(60)

25/04/08 14:47:19 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-4138abb1-b4c7-4852-9de9-d57023e7de07. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
25/04/08 14:47:19 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
25/04/08 14:47:19 WARN AdminClientConfig: These configurations '[key.deserializer, value.deserializer, enable.auto.commit, max.poll.records, auto.offset.reset]' were supplied but are not used yet.


-------------------------------------------
Batch: 0
-------------------------------------------
+------+----+-------+
|window|word|Avg len|
+------+----+-------+
+------+----+-------+



                                                                                

-------------------------------------------
Batch: 3
-------------------------------------------
+------------------------------------------+-----+---+
|window                                    |word |Sum|
+------------------------------------------+-----+---+
|{2025-04-08 14:45:40, 2025-04-08 14:46:10}|Hola |2  |
|{2025-04-08 14:45:50, 2025-04-08 14:46:20}|datos|2  |
|{2025-04-08 14:45:30, 2025-04-08 14:46:00}|datos|2  |
|{2025-04-08 14:47:00, 2025-04-08 14:47:30}|Len  |2  |
|{2025-04-08 14:45:40, 2025-04-08 14:46:10}|avg  |2  |
|{2025-04-08 14:46:00, 2025-04-08 14:46:30}|perro|2  |
|{2025-04-08 14:45:40, 2025-04-08 14:46:10}|sum  |2  |
|{2025-04-08 14:45:50, 2025-04-08 14:46:20}|sum  |2  |
|{2025-04-08 14:45:50, 2025-04-08 14:46:20}|adios|2  |
|{2025-04-08 14:45:40, 2025-04-08 14:46:10}|datos|2  |
|{2025-04-08 14:45:30, 2025-04-08 14:46:00}|Hola |2  |
|{2025-04-08 14:45:50, 2025-04-08 14:46:20}|avg  |2  |
|{2025-04-08 14:46:00, 2025-04-08 14:46:30}|avg  |2  |
|{2025-04-08 14:45:40, 

                                                                                

-------------------------------------------
Batch: 4
-------------------------------------------
+------------------------------------------+---------+---+
|window                                    |word     |Sum|
+------------------------------------------+---------+---+
|{2025-04-08 14:45:40, 2025-04-08 14:46:10}|Hola     |2  |
|{2025-04-08 14:47:10, 2025-04-08 14:47:40}|promedio |2  |
|{2025-04-08 14:45:50, 2025-04-08 14:46:20}|datos    |2  |
|{2025-04-08 14:45:30, 2025-04-08 14:46:00}|datos    |2  |
|{2025-04-08 14:47:00, 2025-04-08 14:47:30}|Len      |2  |
|{2025-04-08 14:47:30, 2025-04-08 14:48:00}|promedio |2  |
|{2025-04-08 14:45:40, 2025-04-08 14:46:10}|avg      |2  |
|{2025-04-08 14:46:00, 2025-04-08 14:46:30}|perro    |2  |
|{2025-04-08 14:47:30, 2025-04-08 14:48:00}|queaplica|2  |
|{2025-04-08 14:45:40, 2025-04-08 14:46:10}|sum      |2  |
|{2025-04-08 14:45:50, 2025-04-08 14:46:20}|sum      |2  |
|{2025-04-08 14:45:50, 2025-04-08 14:46:20}|adios    |2  |
|{2025-04-08 14:45

False

In [68]:
query3.stop()

In [51]:
sc.stop()