# **Assignment 4: PySpark Structured Streaming Using Kafka Source**

In [1]:
from pyspark.sql import SparkSession

# Create (or reuse) the Spark session for this notebook. It targets the
# standalone master at spark://spark-master:7077, requests the Kafka source
# package through spark.jars.packages, and sets executor memory to 512m.
# NOTE(review): the connector coordinates are pinned to Scala 2.12 / 3.0.0;
# presumably this must match the cluster's Spark build -- confirm.
spark = (
    SparkSession.builder
    .appName("pyspark-kafka-streaming")
    .master("spark://spark-master:7077")
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0")
    .config("spark.executor.memory", "512m")
    .getOrCreate()
)

Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
:: loading settings :: url = jar:file:/usr/local/lib/python3.9/dist-packages/pyspark/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-b32c72eb-c581-4937-9010-f8a50738ccef;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.0.0 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.0.0 in central
	found org.apache.kafka#kafka-clients;2.4.1 in central
	found com.github.luben#zstd-jni;1.4.4-3 in central
	found org.lz4#lz4-java;1.7.1 in central
	found org.xerial.snappy#snappy-java;1.1.7.5 in central
	found org.slf4j#slf4j-api;1.7.30 in central
	found org.spark-project.spark#unused;1.0.0 in central
	found org.apache.commons#commons-pool2;2.6.2 in central
:: resolution report :: resolve 303ms :: artifacts dl 5m

# RDD -- DStreams

In [2]:
from pyspark.sql.types import StringType, StructType ,StructField, BooleanType, LongType, IntegerType


# Schema used to parse the JSON event payload: five fields, all strings.
event_schema = (
    StructType()
    .add("timestamp", StringType())
    .add("code", StringType())
    .add("message", StringType())
    .add("ip_address", StringType())
    .add("label", StringType())
)




In [None]:
# # Map the RDD to a new format
# formatted_rdd = rdd.map(lambda x: (x[0], x[1], x[2], x[3], x[4], int(x[5]), int(x[6]), x[7], x[8]))

# # Convert the RDD of tuples to a DataFrame with appropriate schema and column names
# formatted_df = formatted_rdd.toDF(["key", "ip_address", "date_time", "request_type", "request_arg",
#                                    "status_code", "response_size", "referrer", "user_agent"])

# # Write the DataFrame to a persistent storage system
# formatted_df.write.format("parquet").mode("append").save("hdfs://path/to/destination")

In [3]:

def process_batch(batch_df, batch_id):
    """Project one micro-batch onto a tuple-per-row RDD.

    Args:
        batch_df: Object exposing ``.rdd``; each row must support lookup by
            the field names 'timestamp', 'code', 'message', 'ip_address'
            and 'label'.
        batch_id: Identifier of the micro-batch; not used by this function.

    Returns:
        None. The mapped RDD is only bound to a local name.
    """
    # NOTE(review): `rdd` is never returned, stored, or acted upon in this
    # function, so as written the projection has no observable effect.
    event_fields = ('timestamp', 'code', 'message', 'ip_address', 'label')
    rdd = batch_df.rdd.map(lambda row: tuple(row[name] for name in event_fields))

In [4]:
from pyspark.sql.functions import from_json, col

# Streaming DataFrame over the "ssh" Kafka topic (broker kafka:9093).
# Each record's `value` is cast to a string, parsed as JSON with
# `event_schema`, and the resulting struct is flattened into top-level columns.
df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka:9093")
    .option("subscribe", "ssh")
    .load()
    .select(from_json(col("value").cast("string"), event_schema).alias("data"))
    .select("data.*")
)

In [5]:
# Print every micro-batch of parsed events to the console, triggering every
# 5 seconds.
#
# Exactly one sink is configured here. The earlier version chained
# .foreachBatch(process_batch) and then .format("console"); the later
# .format(...) call replaces the sink chosen by .foreachBatch(...), so the
# callback was never invoked and the console sink was the one actually
# running. The "kafka.bootstrap.servers" and "topic" options were also dropped:
# they are Kafka-sink settings that the console sink does not read.
query = (
    df.writeStream
    .outputMode("update")
    .format("console")
    .trigger(processingTime="5 seconds")
    .start()
)

23/04/25 14:53:29 WARN StreamingQueryManager: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-9f9c4ddc-2994-498d-b673-59b00c41f9cb. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
                                                                                

-------------------------------------------
Batch: 0
-------------------------------------------
+---------+----+-------+----------+-----+
|timestamp|code|message|ip_address|label|
+---------+----+-------+----------+-----+
+---------+----+-------+----------+-----+



                                                                                

-------------------------------------------
Batch: 1
-------------------------------------------
+---------------+-----------+--------------------+------------+---------------+
|      timestamp|       code|             message|  ip_address|          label|
+---------------+-----------+--------------------+------------+---------------+
|Dec 10 07:28:05|sshd[24245]|Failed password f...|112.95.230.3|failed_password|
+---------------+-----------+--------------------+------------+---------------+



                                                                                

-------------------------------------------
Batch: 2
-------------------------------------------
+---------------+-----------+--------------------+------------+---------------+
|      timestamp|       code|             message|  ip_address|          label|
+---------------+-----------+--------------------+------------+---------------+
|Dec 10 07:28:05|sshd[24245]|Received disconne...|112.95.230.3|     disconnect|
|Dec 10 07:28:06|sshd[24247]|pam_unix(sshd:aut...|112.95.230.3|   auth_failure|
|Dec 10 07:28:08|sshd[24247]|Failed password f...|112.95.230.3|failed_password|
+---------------+-----------+--------------------+------------+---------------+

-------------------------------------------
Batch: 3
-------------------------------------------
+---------------+-----------+--------------------+------------+---------------+
|      timestamp|       code|             message|  ip_address|          label|
+---------------+-----------+--------------------+------------+---------------+
|Dec 

In [6]:
# Stop the console streaming query held in `query`.
query.stop()

In [None]:
from pyspark.sql.functions import from_json
from pyspark.sql.types import StructType, StructField, BooleanType, LongType, IntegerType

# Schema for the request-log events: eight fields, all read as strings.
# Note that this rebinds `event_schema`, replacing the five-field schema
# assigned earlier in the notebook.
event_schema = StructType([
    StructField(field_name, StringType())
    for field_name in (
        "ip_address",
        "date_time",
        "request_type",
        "request_arg",
        "status_code",
        "response_size",
        "referrer",
        "user_agent",
    )
])

# Replace the `value` column with the struct obtained by parsing it as JSON
# against the schema above.
# NOTE(review): `df_streamed_kv` is not assigned in any visible cell;
# presumably it is the Kafka stream with key/value already cast to strings --
# confirm, otherwise this cell fails on a fresh kernel.
df_parsed = df_streamed_kv.withColumn("value", from_json("value", event_schema))

In [None]:
# Flatten the parsed stream into one column per attribute.
df_formatted = df_parsed.select(
    # Record-level columns, renamed with an `event_` prefix.
    col("key").alias("event_key"),
    col("topic").alias("event_topic"),
    col("timestamp").alias("event_timestamp"),
    # Fields pulled out of the parsed `value` struct.
    col("value.ip_address").alias("ip_address"),
    col("value.date_time").alias("date_time"),
    col("value.request_type").alias("request_type"),
    col("value.request_arg").alias("request_arg"),
    col("value.status_code").alias("status_code"),
    # response_size is parsed as a string; cast it to an integer column.
    col("value.response_size").cast(IntegerType()).alias("response_size"),
    col("value.referrer").alias("referrer"),
    col("value.user_agent").alias("user_agent"),
)

In [None]:
# Print the string form of the DataFrame object itself; this does not start
# a streaming query.
print(df_formatted)

#### **Q2.2:** All your code for 2.2 should be in the following cell


In [None]:
# Answer to 2.2
# Print every micro-batch of the formatted events to the console, triggering
# every 5 seconds.
#
# The "kafka.bootstrap.servers" and "topic" options that used to be set here
# were removed: they are Kafka-sink settings and the console sink does not
# read them, so they only obscured which sink is in use. The chain is also
# written purely inside parentheses instead of mixing in backslash
# continuations.
query = (
    df_formatted.writeStream
    .outputMode("update")
    .format("console")
    .trigger(processingTime="5 seconds")
    .start()
)

In [None]:
# List every streaming query currently active on this session, one per line,
# showing its id and name (handy while debugging).
for stream in spark.streams.active:
    print(f"ID:{stream.id} | NAME:{stream.name}")

In [None]:
# Stop the streaming query currently held in `query`.
query.stop()

## ==== Q3 ====

#### **Q3.1:** All your code for 3.1 should be in the following cell

In [None]:
# Running count of events per value of `event_topic`.
cnt = df_formatted.groupBy("event_topic").count()

In [None]:
# Start a streaming query named "count" that writes the per-topic counts to
# the memory sink in update mode, triggering every 5 seconds. The query name
# is what the following spark.sql(...) cell selects from.
# NOTE(review): in update mode the memory table presumably keeps every
# emitted update rather than only the latest count per key -- confirm that
# this (and not "complete") is what the exercise expects.
q = (
    cnt.writeStream
    .queryName("count")
    .format("memory")
    .outputMode("update")
    .trigger(processingTime="5 seconds")
    .start()
)

In [None]:
# Show up to 10 rows of the table named `count`.
spark.sql("Select * from count").show(10)

In [None]:
# Stop the streaming query held in `q`.
q.stop()

#### **Q3.2:** All your code for 3.2 should be in the following cell

In [None]:
from pyspark.sql.functions import *


# Count events per request type within 10-second windows over
# `event_timestamp` (window length and slide are both "10 seconds").
windowedCounts = (
    df_formatted
    .groupBy(
        window(df_formatted["event_timestamp"], "10 seconds", "10 seconds"),
        df_formatted["request_type"],
    )
    .count()
)

In [None]:
# Start a streaming query named "windowed_count" that writes the windowed
# counts to the memory sink in update mode, triggering every 5 seconds.
q1 = (
    windowedCounts.writeStream
    .queryName("windowed_count")
    .format("memory")
    .outputMode("update")
    .trigger(processingTime="5 seconds")
    .start()
)

In [None]:
# Show up to 10 rows of the table named `windowed_count`.
spark.sql("Select * from windowed_count").show(10)

In [None]:
# Stop the streaming query held in `q1`.
q1.stop()

#### **Q3.3:** All your code for 3.3 should be in the following cell


In [None]:
# Reach the SQL functions through a module alias instead of the bare names.
# This cell binds its result to the name `avg`; when the aggregate was also
# called as a bare `avg(...)`, running the cell a second time called the
# DataFrame produced by the first run and failed with
# "'DataFrame' object is not callable". With the alias the cell can be re-run.
from pyspark.sql import functions as F

# Average response size per request type within 10-second windows over
# `event_timestamp`. The result stays bound to `avg` because the writer cell
# that follows reads that name.
avg = (
    df_formatted
    .groupBy(
        F.window(df_formatted.event_timestamp, "10 seconds", "10 seconds"),
        df_formatted.request_type,
    )
    .agg(F.avg("response_size").alias("response_size_average"))
)

In [None]:
# Start a streaming query named "response_size_average" that writes the
# DataFrame bound to `avg` to the memory sink in update mode, triggering
# every 5 seconds.
q2 = (
    avg.writeStream
    .queryName("response_size_average")
    .format("memory")
    .outputMode("update")
    .trigger(processingTime="5 seconds")
    .start()
)

In [None]:
# Show up to 100 rows of the table named `response_size_average`.
spark.sql("Select * from response_size_average").show(100)

In [None]:
# Stop the streaming query held in `q2`.
q2.stop()

#### **Q3.4:** All your code for 3.4 should be in the following cell

In [None]:
from pyspark.sql.functions import *

# Per request type and 10-second window over `event_timestamp`, compute the
# average, standard deviation and count of `response_size`, and also gather
# the individual values into a list column.
std = (
    df_formatted
    .groupBy(
        window(df_formatted["event_timestamp"], "10 seconds", "10 seconds"),
        df_formatted["request_type"],
    )
    .agg(
        avg("response_size").alias("Average"),
        stddev("response_size").alias("SDev"),
        count("response_size").alias("Count"),
        collect_list("response_size").alias("List"),
    )
)


In [None]:
# Start a streaming query named "response_size_std" that writes the
# statistics in `std` to the memory sink in update mode, triggering every
# 5 seconds.
# NOTE(review): no q3.stop() call appears in the visible cells.
q3 = (
    std.writeStream
    .queryName("response_size_std")
    .format("memory")
    .outputMode("update")
    .trigger(processingTime="5 seconds")
    .start()
)

In [None]:
# Show up to 100 rows of the table named `response_size_std`.
spark.sql("Select * from response_size_std").show(100)

In [None]:
# Keep `window` and `request_type` and expand the "List" array column into
# one row per element.
# NOTE(review): the name `exp` presumably shadows the `exp` function brought
# in by the wildcard import from pyspark.sql.functions -- verify nothing
# later needs that function.
exp = std.select("window","request_type",explode("List"))

In [None]:
# Start a streaming query named "exp_table" that writes the exploded rows in
# `exp` to the memory sink in update mode, triggering every 5 seconds.
# NOTE(review): no q4.stop() call appears in the visible cells.
q4 = (
    exp.writeStream
    .queryName("exp_table")
    .format("memory")
    .outputMode("update")
    .trigger(processingTime="5 seconds")
    .start()
)

In [None]:
# Show up to 100 rows of the table named `exp_table`.
spark.sql("Select * from exp_table").show(100)

In [None]:
from pyspark.sql.functions import *

# Per request type and 10-second window over `event_timestamp`, compute the
# average and standard deviation of `response_size`, then emit one row per
# individual response size ("Exploded_list") next to its group's statistics.
# `request_type` is a grouping key but is not among the selected columns.
expr1 = (
    df_formatted
    .groupBy(
        window(df_formatted["event_timestamp"], "10 seconds", "10 seconds"),
        df_formatted["request_type"],
    )
    .agg(
        avg("response_size").alias("Average"),
        stddev("response_size").alias("SDev"),
        collect_list("response_size").alias("List"),
    )
    .select("window", "Average", "SDev", explode("List").alias("Exploded_list"))
)


In [None]:
# Keep only the rows whose individual response size exceeds its group's
# average plus standard deviation.
filter1 = expr1.where("Exploded_list > Average +SDev")

In [None]:
# Start a streaming query named "expr_table" that writes the filtered rows in
# `filter1` to the memory sink in update mode, triggering every 5 seconds.
q5 = (
    filter1.writeStream
    .queryName("expr_table")
    .format("memory")
    .outputMode("update")
    .trigger(processingTime="5 seconds")
    .start()
)

In [None]:
# Show up to 100 rows of the table named `expr_table`.
spark.sql("Select * from expr_table").show(100)

In [None]:
# Stop the streaming query held in `q5`.
q5.stop()