In [12]:
import os
import datetime
import uuid
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType
from configs import kafka_config, SENSORS_TOPIC_NAME, COMMON_ALERT_TOPIC_NAME


In [13]:
# Spark/Kafka connector packages requested through the submit args; this is
# set before the session below is created.
os.environ["PYSPARK_SUBMIT_ARGS"] = (
    "--packages "
    "org.apache.spark:spark-streaming-kafka-0-10_2.12:3.5.1,"
    "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.1 "
    "pyspark-shell"
)

# Local Spark session shared by every cell in this notebook.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("KafkaStreaming")
    .config("spark.sql.debug.maxToStringFields", "200")
    .config("spark.sql.columnNameLengthThreshold", "200")
    .getOrCreate()
)

In [14]:
# Static table of alert rules: one row per rule with humidity/temperature
# bounds, an alert code and a message. No schema is inferred, so every
# column is read as a string.
alerts_df = spark.read.csv("alerts_conditions.csv", header=True)

# show() prints the table itself and returns None, so it must not be wrapped
# in print() (that only appends a stray "None" to the output).
alerts_df.show()

+---+------------+------------+---------------+---------------+----+-------------+
| id|humidity_min|humidity_max|temperature_min|temperature_max|code|      message|
+---+------------+------------+---------------+---------------+----+-------------+
|  1|           0|          40|           -999|           -999| 101| It's too dry|
|  2|          60|         100|           -999|           -999| 102| It's too wet|
|  3|        -999|        -999|           -300|             30| 103|It's too cold|
|  4|        -999|        -999|             40|            300| 104| It's too hot|
+---+------------+------------+---------------+---------------+----+-------------+

None


In [15]:
# Kafka credentials are resolved at run time instead of being committed with
# the notebook: environment variables win, then optional kafka_config entries.
# NOTE(review): assumes kafka_config is a dict; the 'username'/'password' keys
# are an assumption about configs.py - confirm or rely on the env variables.
# The password that used to be hardcoded here should be rotated.
kafka_username = os.environ.get("KAFKA_USERNAME") or kafka_config.get("username")
kafka_password = os.environ.get("KAFKA_PASSWORD") or kafka_config.get("password")
if not (kafka_username and kafka_password):
    raise RuntimeError(
        "Kafka credentials are missing: set KAFKA_USERNAME and KAFKA_PASSWORD "
        "or provide 'username'/'password' in kafka_config."
    )

# Streaming source over the sensors topic. Protocol and mechanism come from
# kafka_config, the same settings the Kafka sink cell uses.
df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", kafka_config['bootstrap_servers'][0])
    .option("kafka.security.protocol", kafka_config['security_protocol'])
    .option("kafka.sasl.mechanism", kafka_config['sasl_mechanism'])
    .option(
        "kafka.sasl.jaas.config",
        "org.apache.kafka.common.security.plain.PlainLoginModule required "
        f'username="{kafka_username}" password="{kafka_password}";',
    )
    .option("subscribe", SENSORS_TOPIC_NAME)
    # Start from the oldest available offsets, at most 300 offsets per trigger.
    .option("startingOffsets", "earliest")
    .option("maxOffsetsPerTrigger", "300")
    .load()
)

In [16]:
# Schema of the JSON payload carried in each Kafka message value.
# Every field is nullable; the timestamp is kept as a string here and
# converted to a real timestamp when the stream is parsed.
json_schema = StructType([
    StructField(field_name, field_type(), True)
    for field_name, field_type in (
        ("sensor_id", IntegerType),
        ("timestamp", StringType),
        ("temperature", IntegerType),
        ("humidity", IntegerType),
    )
])

In [17]:

# Average temperature and humidity per sliding event-time window.
avg_stats = (
    df.selectExpr(
        "CAST(key AS STRING) AS key_deserialized",
        "CAST(value AS STRING) AS value_deserialized",
        "*",
    )
    # Swap the original key/value columns for their string-cast versions.
    .drop("key", "value")
    .withColumnRenamed("key_deserialized", "key")
    # Parse the JSON payload with the declared schema.
    .withColumn("value_json", from_json(col("value_deserialized"), json_schema))
    # The payload timestamp (string -> double -> from_unixtime) overwrites the
    # "timestamp" column and becomes the event time.
    .withColumn(
        "timestamp",
        from_unixtime(col("value_json.timestamp").cast(DoubleType())).cast("timestamp"),
    )
    .withWatermark("timestamp", "10 seconds")
    # One-minute windows that slide every 30 seconds.
    .groupBy(window(col("timestamp"), "1 minute", "30 seconds"))
    .agg(
        avg(col("value_json.temperature")).alias("t_avg"),
        avg(col("value_json.humidity")).alias("h_avg"),
    )
    # Only window, t_avg and h_avg remain after the aggregation, so this drop
    # has nothing to remove; kept to mirror the original pipeline.
    .drop("topic")
)

In [18]:
# Pair every windowed average row with every alert rule; the rows whose
# averages actually fall inside a rule's bounds are filtered out afterwards.
all_alerts = avg_stats.crossJoin(other=alerts_df)

In [19]:
# Keep only the (window, rule) pairs whose averaged temperature or humidity
# lies strictly inside the rule's bounds.
#
# A single OR filter replaces the former self-union of two filtered copies of
# all_alerts, so the streaming aggregate is evaluated once instead of twice.
# A rule matching on both temperature and humidity now yields one alert row
# rather than two.
valid_alerts = (
    all_alerts
    .where(
        "(t_avg > temperature_min AND t_avg < temperature_max) "
        "OR (h_avg > humidity_min AND h_avg < humidity_max)"
    )
    # current_timestamp() is evaluated by Spark while the query runs. The old
    # lit(str(datetime.datetime.now())) was computed once, when this cell was
    # executed, so every alert carried the same stale time. The cast keeps the
    # column a string, as before.
    .withColumn("timestamp", current_timestamp().cast("string"))
    # Rule bounds and ids are not part of the outgoing alert.
    .drop("id", "humidity_min", "humidity_max", "temperature_min", "temperature_max")
)

In [20]:

# displaying_df = valid_alerts.writeStream \
#     .trigger(processingTime='10 seconds') \
#     .outputMode("update") \
#     .format("console") \
#     .start() \
#     .awaitTermination()

In [21]:
# Random message key generator. uuid4() returns a different value on every
# call, so the UDF is marked non-deterministic; otherwise Spark treats it as
# deterministic and may deduplicate or repeat its invocations.
uuid_udf = udf(lambda: str(uuid.uuid4()), StringType()).asNondeterministic()

# Shape the alerts into key/value rows for the Kafka sink:
# a fresh UUID string as the key and the alert fields serialized to JSON as the value.
prepare_to_kafka_df = (
    valid_alerts
    .withColumn("key", uuid_udf())
    .select(
        col("key"),
        to_json(
            struct(
                col("window"),
                col("t_avg"),
                col("h_avg"),
                col("code"),
                col("message"),
                col("timestamp"),
            )
        ).alias("value"),
    )
)

In [22]:
# Kafka credentials are resolved at run time instead of being committed with
# the notebook: environment variables win, then optional kafka_config entries.
# NOTE(review): assumes kafka_config is a dict; the 'username'/'password' keys
# are an assumption about configs.py - confirm or rely on the env variables.
# The password that used to be hardcoded here should be rotated.
kafka_username = os.environ.get("KAFKA_USERNAME") or kafka_config.get("username")
kafka_password = os.environ.get("KAFKA_PASSWORD") or kafka_config.get("password")
if not (kafka_username and kafka_password):
    raise RuntimeError(
        "Kafka credentials are missing: set KAFKA_USERNAME and KAFKA_PASSWORD "
        "or provide 'username'/'password' in kafka_config."
    )

# Start the streaming write of alerts to the common alert topic.
# `query` holds the running query handle: awaitTermination() returns None, so
# it is called separately instead of being chained onto start().
query = (
    prepare_to_kafka_df.writeStream
    .trigger(processingTime='30 seconds')
    .outputMode("update")
    .format("kafka")
    .option("kafka.bootstrap.servers", kafka_config['bootstrap_servers'][0])
    .option("topic", COMMON_ALERT_TOPIC_NAME)
    .option("kafka.security.protocol", kafka_config['security_protocol'])
    .option("kafka.sasl.mechanism", kafka_config['sasl_mechanism'])
    .option(
        "kafka.sasl.jaas.config",
        "org.apache.kafka.common.security.plain.PlainLoginModule required "
        f"username='{kafka_username}' password='{kafka_password}';",
    )
    .option("checkpointLocation", "/tmp/checkpoints")
    .start()
)

# Block until the query ends. If the cell is interrupted or the query fails,
# stop it explicitly so it does not keep running after the cell exits; the
# original exception still propagates.
try:
    query.awaitTermination()
finally:
    query.stop()


24/11/26 16:25:12 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
24/11/26 16:25:12 WARN AdminClientConfig: These configurations '[key.deserializer, value.deserializer, enable.auto.commit, max.poll.records, auto.offset.reset]' were supplied but are not used yet.
ERROR:root:KeyboardInterrupt while sending command.                             
Traceback (most recent call last):
  File "/home/msolonin/PycharmProjects/GoIT/Data-Engineering/.venv/lib/python3.8/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/home/msolonin/PycharmProjects/GoIT/Data-Engineering/.venv/lib/python3.8/site-packages/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/usr/lib/python3.8/socket.py", line 669, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt


KeyboardInterrupt: 