In [18]:
import datetime
import uuid

from pyspark.sql.functions import *
from pyspark.sql.types import StructType, StructField, IntegerType, DoubleType
from pyspark.sql import SparkSession
from configs import kafka_config
import os


In [19]:

os.environ[
    'PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-10_2.12:3.5.1,org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.1 pyspark-shell'


In [20]:

spark = (SparkSession.builder
         .appName("KafkaStreaming")
         .master("local[*]")
         .config("spark.sql.debug.maxToStringFields", "200")
         .config("spark.sql.columnNameLengthThreshold", "200")
         .getOrCreate())


In [21]:

alerts_df = spark.read.csv("../data/alerts_conditions.csv", header=True)


In [22]:

window_duration = "1 minute"
sliding_interval = "30 seconds"


In [23]:

df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafka_config['bootstrap_servers'][0]) \
    .option("kafka.security.protocol", "SASL_PLAINTEXT") \
    .option("kafka.sasl.mechanism", "PLAIN") \
    .option("kafka.sasl.jaas.config",
            'org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="VawEzo1ikLtrA8Ug8THa";') \
    .option("subscribe", "building_sensors_greenmoon") \
    .option("startingOffsets", "earliest") \
    .option("maxOffsetsPerTrigger", "300") \
    .load()


In [24]:

json_schema = StructType([
    StructField("sensor_id", IntegerType(), True),
    StructField("timestamp", StringType(), True),
    StructField("temperature", IntegerType(), True),
    StructField("humidity", IntegerType(), True)
])


In [25]:

avg_stats = df.selectExpr("CAST(key AS STRING) AS key_deserialized", "CAST(value AS STRING) AS value_deserialized", "*") \
    .drop('key', 'value') \
    .withColumnRenamed("key_deserialized", "key") \
    .withColumn("value_json", from_json(col("value_deserialized"), json_schema)) \
    .withColumn("timestamp", from_unixtime(col("value_json.timestamp").cast(DoubleType())).cast("timestamp")) \
    .withWatermark("timestamp", "10 seconds") \
    .groupBy(window(col("timestamp"), window_duration, sliding_interval)) \
    .agg(
    avg("value_json.temperature").alias("t_avg"),
    avg("value_json.humidity").alias("h_avg")
) \
    .drop("topic")


In [26]:

all_alerts = avg_stats.crossJoin(alerts_df)


In [27]:

valid_alerts = all_alerts \
    .where("t_avg > temperature_min AND t_avg < temperature_max") \
    .unionAll(
    all_alerts
    .where("h_avg > humidity_min AND h_avg < humidity_max")
) \
    .withColumn("timestamp", lit(str(datetime.datetime.now()))) \
    .drop("id", "humidity_min", "humidity_max", "temperature_min", "temperature_max")


In [28]:

# Для дебагінгу. Принт проміжного резульату.
# displaying_df = valid_alerts.writeStream \
#     .trigger(processingTime='10 seconds') \
#     .outputMode("update") \
#     .format("console") \
#     .start() \
#     .awaitTermination()

uuid_udf = udf(lambda: str(uuid.uuid4()), StringType())


In [29]:

prepare_to_kafka_df = valid_alerts \
    .withColumn("key", uuid_udf()) \
    .select(
    col("key"),
    to_json(struct(col("window"),
                   col("t_avg"),
                   col("h_avg"),
                   col("code"),
                   col("message"),
                   col("timestamp"))).alias("value")
)


In [31]:
# Для дебагінгу. Принт проміжного резульату.
displaying_df = valid_alerts.writeStream \
    .trigger(processingTime='10 seconds') \
    .outputMode("update") \
    .format("console") \
    .start() \
    .awaitTermination()

24/11/21 20:19:36 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-7002ad4b-f79b-45c1-a4d6-6edff093874b. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
24/11/21 20:19:36 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
24/11/21 20:19:36 WARN AdminClientConfig: These configurations '[key.deserializer, value.deserializer, enable.auto.commit, max.poll.records, auto.offset.reset]' were supplied but are not used yet.
24/11/21 20:19:51 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 30000 milliseconds, but spent 68401 milliseconds
                                                                                

-------------------------------------------
Batch: 0
-------------------------------------------
+--------------------+-----------------+-----+----+------------+--------------------+
|              window|            t_avg|h_avg|code|     message|           timestamp|
+--------------------+-----------------+-----+----+------------+--------------------+
|{2024-11-21 18:33...|36.36666666666667| 60.1| 102|It's too wet|2024-11-21 20:02:...|
+--------------------+-----------------+-----+----+------------+--------------------+



24/11/21 20:20:11 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 10000 milliseconds, but spent 35481 milliseconds
24/11/21 20:21:09 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 30000 milliseconds, but spent 77578 milliseconds
                                                                                

-------------------------------------------
Batch: 1
-------------------------------------------
+--------------------+-----------------+------------------+----+------------+--------------------+
|              window|            t_avg|             h_avg|code|     message|           timestamp|
+--------------------+-----------------+------------------+----+------------+--------------------+
|{2024-11-21 18:44...|             39.0|39.666666666666664| 101|It's too dry|2024-11-21 20:02:...|
|{2024-11-21 20:02...|35.78947368421053| 60.31578947368421| 102|It's too wet|2024-11-21 20:02:...|
+--------------------+-----------------+------------------+----+------------+--------------------+



24/11/21 20:21:28 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 10000 milliseconds, but spent 76250 milliseconds
24/11/21 20:22:20 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 30000 milliseconds, but spent 71297 milliseconds
                                                                                

-------------------------------------------
Batch: 2
-------------------------------------------
+--------------------+-----------------+-----------------+----+------------+--------------------+
|              window|            t_avg|            h_avg|code|     message|           timestamp|
+--------------------+-----------------+-----------------+----+------------+--------------------+
|{2024-11-21 20:12...|37.54545454545455|38.81818181818182| 101|It's too dry|2024-11-21 20:02:...|
+--------------------+-----------------+-----------------+----+------------+--------------------+



24/11/21 20:22:34 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 10000 milliseconds, but spent 66237 milliseconds
24/11/21 20:23:23 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 30000 milliseconds, but spent 63285 milliseconds
                                                                                

-------------------------------------------
Batch: 3
-------------------------------------------
+--------------------+-----------------+------------------+----+------------+--------------------+
|              window|            t_avg|             h_avg|code|     message|           timestamp|
+--------------------+-----------------+------------------+----+------------+--------------------+
|{2024-11-21 20:22...|             34.5|              64.5| 102|It's too wet|2024-11-21 20:02:...|
|{2024-11-21 20:22...|32.27777777777778|60.388888888888886| 102|It's too wet|2024-11-21 20:02:...|
+--------------------+-----------------+------------------+----+------------+--------------------+



24/11/21 20:23:38 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 10000 milliseconds, but spent 64229 milliseconds
24/11/21 20:24:25 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 30000 milliseconds, but spent 61532 milliseconds
                                                                                

-------------------------------------------
Batch: 4
-------------------------------------------
+--------------------+-----------------+-----------------+----+------------+--------------------+
|              window|            t_avg|            h_avg|code|     message|           timestamp|
+--------------------+-----------------+-----------------+----+------------+--------------------+
|{2024-11-21 20:22...|35.76190476190476|60.38095238095238| 102|It's too wet|2024-11-21 20:02:...|
+--------------------+-----------------+-----------------+----+------------+--------------------+



24/11/21 20:24:40 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 10000 milliseconds, but spent 61492 milliseconds
24/11/21 20:25:28 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 30000 milliseconds, but spent 63757 milliseconds
                                                                                

-------------------------------------------
Batch: 5
-------------------------------------------
+--------------------+------------------+------------------+----+------------+--------------------+
|              window|             t_avg|             h_avg|code|     message|           timestamp|
+--------------------+------------------+------------------+----+------------+--------------------+
|{2024-11-21 20:24...|36.166666666666664|36.833333333333336| 101|It's too dry|2024-11-21 20:02:...|
+--------------------+------------------+------------------+----+------------+--------------------+



24/11/21 20:25:43 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 10000 milliseconds, but spent 63335 milliseconds
                                                                                

-------------------------------------------
Batch: 6
-------------------------------------------
+------+-----+-----+----+-------+---------+
|window|t_avg|h_avg|code|message|timestamp|
+------+-----+-----+----+-------+---------+
+------+-----+-----+----+-------+---------+



24/11/21 20:26:05 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 30000 milliseconds, but spent 36337 milliseconds
24/11/21 20:26:05 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 10000 milliseconds, but spent 21790 milliseconds
                                                                                

-------------------------------------------
Batch: 7
-------------------------------------------
+--------------------+------------------+-----------------+----+------------+--------------------+
|              window|             t_avg|            h_avg|code|     message|           timestamp|
+--------------------+------------------+-----------------+----+------------+--------------------+
|{2024-11-21 20:26...|30.666666666666668|76.33333333333333| 102|It's too wet|2024-11-21 20:02:...|
|{2024-11-21 20:25...|30.666666666666668|76.33333333333333| 102|It's too wet|2024-11-21 20:02:...|
+--------------------+------------------+-----------------+----+------------+--------------------+





-------------------------------------------
Batch: 8
-------------------------------------------
+--------------------+------+------+----+------------+--------------------+
|              window| t_avg| h_avg|code|     message|           timestamp|
+--------------------+------+------+----+------------+--------------------+
|{2024-11-21 20:26...|31.125|65.375| 102|It's too wet|2024-11-21 20:02:...|
|{2024-11-21 20:25...|31.125|65.375| 102|It's too wet|2024-11-21 20:02:...|
+--------------------+------+------+----+------------+--------------------+



24/11/21 20:26:41 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 10000 milliseconds, but spent 11915 milliseconds
24/11/21 20:27:28 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 30000 milliseconds, but spent 58387 milliseconds
24/11/21 20:27:39 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 10000 milliseconds, but spent 57189 milliseconds


-------------------------------------------
Batch: 9
-------------------------------------------
+--------------------+------------------+------------------+----+------------+--------------------+
|              window|             t_avg|             h_avg|code|     message|           timestamp|
+--------------------+------------------+------------------+----+------------+--------------------+
|{2024-11-21 20:26...|31.928571428571427|62.642857142857146| 102|It's too wet|2024-11-21 20:02:...|
+--------------------+------------------+------------------+----+------------+--------------------+



24/11/21 20:28:24 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 30000 milliseconds, but spent 55637 milliseconds
24/11/21 20:28:35 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 10000 milliseconds, but spent 56643 milliseconds


-------------------------------------------
Batch: 10
-------------------------------------------
+--------------------+-----+-----+----+-------------+--------------------+
|              window|t_avg|h_avg|code|      message|           timestamp|
+--------------------+-----+-----+----+-------------+--------------------+
|{2024-11-21 20:27...| 29.0|63.25| 103|It's too cold|2024-11-21 20:02:...|
|{2024-11-21 20:27...| 29.0|63.25| 102| It's too wet|2024-11-21 20:02:...|
+--------------------+-----+-----+----+-------------+--------------------+



24/11/21 20:29:15 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 30000 milliseconds, but spent 51893 milliseconds
                                                                                

-------------------------------------------
Batch: 11
-------------------------------------------
+------+-----+-----+----+-------+---------+
|window|t_avg|h_avg|code|message|timestamp|
+------+-----+-----+----+-------+---------+
+------+-----+-----+----+-------+---------+



24/11/21 20:29:27 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 10000 milliseconds, but spent 51774 milliseconds
                                                                                

-------------------------------------------
Batch: 12
-------------------------------------------
+------+-----+-----+----+-------+---------+
|window|t_avg|h_avg|code|message|timestamp|
+------+-----+-----+----+-------+---------+
+------+-----+-----+----+-------+---------+



24/11/21 20:29:50 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 10000 milliseconds, but spent 23080 milliseconds
24/11/21 20:29:50 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 30000 milliseconds, but spent 34809 milliseconds
ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/home/gamelt/.local/lib/python3.10/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/home/gamelt/.local/lib/python3.10/site-packages/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/usr/lib/python3.10/socket.py", line 705, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt


KeyboardInterrupt: 

24/11/21 23:01:56 WARN KafkaOffsetReaderAdmin: Error in attempt 1 getting Kafka offsets: 
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Call(callName=describeTopics, deadlineMs=1732219480008, tries=1, nextAllowedTryMs=1732222916146) timed out at 1732222916046 after 1 attempt(s)
	at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
	at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2005)
	at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:165)
	at org.apache.spark.sql.kafka010.ConsumerStrategy.retrieveAllPartitions(ConsumerStrategy.scala:66)
	at org.apache.spark.sql.kafka010.ConsumerStrategy.retrieveAllPartitions$(ConsumerStrategy.scala:65)
	at org.apache.spark.sql.kafka010.SubscribeStrategy.retrieveAllPartitions(ConsumerStrategy.scala:102)
	at org.apache.spark.sql.kafka010.SubscribeStrategy.assignedTopicPartitions(ConsumerStrategy.scala:113)
	at org.a

In [None]:

# query = prepare_to_kafka_df.writeStream \
#     .trigger(processingTime='30 seconds') \
#     .outputMode("update") \
#     .format("kafka") \
#     .option("kafka.bootstrap.servers", "77.81.230.104:9092") \
#     .option("topic", "avg_alerts") \
#     .option("kafka.security.protocol", "SASL_PLAINTEXT") \
#     .option("kafka.sasl.mechanism", "PLAIN") \
#     .option("kafka.sasl.jaas.config",
#             "org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='VawEzo1ikLtrA8Ug8THa';") \
#     .option("checkpointLocation", "/tmp/checkpoints-7") \
#     .start() \
#     .awaitTermination()


24/11/21 20:02:03 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
24/11/21 20:02:03 WARN StreamingQueryManager: Stopping existing streaming query [id=bd929ca2-6269-41ea-bc73-a19cee7c73a8, runId=d7a7e731-e3f3-40ab-92a4-8f989f2af06e], as a new run is being started.
24/11/21 20:02:03 ERROR WriteToDataSourceV2Exec: Data source write support MicroBatchWrite[epoch: 4, writer: org.apache.spark.sql.kafka010.KafkaStreamingWrite@626516e5] is aborting.
24/11/21 20:02:03 ERROR WriteToDataSourceV2Exec: Data source write support MicroBatchWrite[epoch: 4, writer: org.apache.spark.sql.kafka010.KafkaStreamingWrite@626516e5] aborted.
24/11/21 20:02:03 WARN TaskSetManager: Lost task 0.0 in stage 20.0 (TID 1616) (10.255.255.254 executor driver): TaskKilled (Stage cancelled: Job 16 cancelled part of cancelled job group d7a7e731-e3f3-40ab-92a4-8f989f2af06e)
24/11/21 20:02:06 WARN HDFSBackedStateStoreProvider: The state for version

KeyboardInterrupt: 

24/11/21 20:18:00 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 30000 milliseconds, but spent 55211 milliseconds
24/11/21 20:18:43 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 30000 milliseconds, but spent 42289 milliseconds