# **Assignment 4: PySpark Structured Streaming Using Kafka Source**

In [1]:
from pyspark.sql import SparkSession

spark = SparkSession.\
        builder.\
        appName("pyspark-kafka-streaming").\
        master("spark://spark-master:7077").\
        config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0"). \
        config("spark.executor.memory", "512m").\
        getOrCreate()

Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
:: loading settings :: url = jar:file:/usr/local/lib/python3.9/dist-packages/pyspark/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-0ccecbce-7531-4b28-a8a5-85e3ce5b917f;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.0.0 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.0.0 in central
	found org.apache.kafka#kafka-clients;2.4.1 in central
	found com.github.luben#zstd-jni;1.4.4-3 in central
	found org.lz4#lz4-java;1.7.1 in central
	found org.xerial.snappy#snappy-java;1.1.7.5 in central
	found org.slf4j#slf4j-api;1.7.30 in central
	found org.spark-project.spark#unused;1.0.0 in central
	found org.apache.commons#commons-pool2;2.6.2 in central
:: resolution report :: resolve 469ms :: artifacts dl 11

## ==== Q2 ====

#### **Q2.1:** All your code for 2.1 should be in the following cell

In [2]:
#Answer to 2.1
df_streamed_raw = (spark
  .readStream
  .format("kafka") \
  .option("kafka.bootstrap.servers", "kafka:9093") \
  .option("subscribe", "topic_test1") \
  .load())


In [3]:
from pyspark.sql.types import StringType
from pyspark.sql.functions import col

# convert byte stream to string
df_streamed_kv = (df_streamed_raw
    .withColumn("key", df_streamed_raw["key"].cast(StringType()))
    .withColumn("value", df_streamed_raw["value"].cast(StringType())))

test_query = (df_streamed_kv 
              .writeStream \
              .format("memory") # output to memory \
              .outputMode("update") # only write updated rows to the sink \
              .queryName("test_query_table")  # Name of the in memory table \
              .start())

23/04/12 13:00:31 WARN StreamingQueryManager: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-177d8fc9-cdfc-40e6-8e81-0cd0db791887. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
[Stage 3:>                                                          (0 + 1) / 1]

#### If all goes well, the following cell should display a table populated with values being streamed from you Kafka producer. NOTE: If you recently ran the producer, it may take a while before the table is populated. Keep rerunning the cell to check for updates:

In [5]:
spark.sql("select * from test_query_table").show()

+--------+--------------------+-----------+---------+------+--------------------+-------------+
|     key|               value|      topic|partition|offset|           timestamp|timestampType|
+--------+--------------------+-----------+---------+------+--------------------+-------------+
|dummykey|{"feel":"9.09","m...|topic_test1|        0|  2269|2023-04-12 13:00:...|            0|
|dummykey|{"feel":"24.11","...|topic_test1|        0|  2270|2023-04-12 13:00:...|            0|
|dummykey|{"feel":"7.73","m...|topic_test1|        0|  2271|2023-04-12 13:00:...|            0|
|dummykey|{"feel":"6.85","m...|topic_test1|        0|  2272|2023-04-12 13:00:...|            0|
+--------+--------------------+-----------+---------+------+--------------------+-------------+



In [6]:
test_query.stop()

#### The following cells contain code that take the streamed dataframe and formats it properly into a table. If any of the given cells fails, there might be a formatting issue with one of your previous solutions. 

In [7]:
from pyspark.sql.functions import from_json
from pyspark.sql.types import StructType, StructField, BooleanType, LongType, IntegerType, DoubleType

event_schema = StructType([
    StructField("station", StringType()),
    StructField("valid", StringType()),
    StructField("tmpf", StringType()),
    StructField("dwpf", StringType()),
    StructField("relh", StringType()),
    StructField("feel", StringType()),
    StructField("drct", StringType()),
    StructField("sped", StringType()),
    StructField("alti", StringType()),
    StructField("mslp", StringType()),
    StructField("p01m", StringType()),
    StructField("vsby", StringType()),
    StructField("skyc1", StringType()),
    StructField("skyl1", StringType()),
    StructField("wxcodes", StringType()),
    StructField("ice_acceretion_1hr", StringType()),
])

# Parse the events from JSON format
df_parsed = (df_streamed_kv
           # Sets schema for event data
           .withColumn("value", from_json("value", event_schema))
          )

In [8]:
from pyspark.sql.functions import to_timestamp, unix_timestamp

# Here, we need to convert date_time string to date_time object in the "dd/MMM/yyyy:HH:mm:ss Z" format.

df_formatted = (df_parsed.select(
    col("key").alias("event_key")
    ,col("topic").alias("event_topic")
    ,col("timestamp").alias("event_timestamp")
    ,col("value.station").alias("station")
    ,col("value.valid").alias("valid")
    ,col("value.tmpf").alias("tmpf")
    ,col("value.dwpf").alias("dwpf")
    ,col("value.relh").alias("relh")
    ,col("value.feel").alias("feel")
    ,col("value.drct").alias("drct")
    ,col("value.sped").alias("sped")
    ,col("value.alti").alias("alti")
    ,col("value.mslp").alias("mslp")
    ,col("value.p01m").alias("p01m")
    ,col("value.vsby").alias("vsby")
    ,col("value.skyc1").alias("skyc1")
    ,col("value.skyl1").alias("skyl1")
    ,col("value.wxcodes").alias("wxcodes")
    ,col("value.ice_acceretion_1hr").alias("ice_acceretion_1hr")
#     cast(IntegerType()).
))

#### **Q2.2:** All your code for 2.2 should be in the following cell


In [6]:
# Answer to 2.2
query = (df_formatted
            .writeStream
            .format("console")
            .trigger(processingTime='5 seconds')
            .outputMode("append")
            .option("truncate",'false')
            .start()
        )

23/04/12 04:17:22 WARN StreamingQueryManager: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-4640065d-2fdf-4528-8ec4-8c98de75017a. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.


-------------------------------------------
Batch: 0
-------------------------------------------
+---------+-----------+---------------+-------+-----+----+----+----+----+----+----+----+----+----+----+-----+-----+-------+------------------+
|event_key|event_topic|event_timestamp|station|valid|tmpf|dwpf|relh|feel|drct|sped|alti|mslp|p01m|vsby|skyc1|skyl1|wxcodes|ice_acceretion_1hr|
+---------+-----------+---------------+-------+-----+----+----+----+----+----+----+----+----+----+----+-----+-----+-------+------------------+
+---------+-----------+---------------+-------+-----+----+----+----+----+----+----+----+----+----+----+-----+-----+-------+------------------+

-------------------------------------------
Batch: 1
-------------------------------------------
+---------+-----------+----------------------+-------+----------------+----+-----+-----+-----+-----+----+-----+------+----+----+-----+-----+-------+------------------+
|event_key|event_topic|event_timestamp       |station|valid      

In [13]:
# Print the name of active streams (This may be useful during debugging)
for s in spark.streams.active:
    print(f"ID:{s.id} | NAME:{s.name}")

-------------------------------------------
Batch: 2
-------------------------------------------
+---------+-----------+-----------------------+-------+----------------+-----+----+-----+----+-----+----+-----+----+----+----+-----+-----+-------+------------------+
|event_key|event_topic|event_timestamp        |station|valid           |tmpf |dwpf|relh |feel|drct |sped|alti |mslp|p01m|vsby|skyc1|skyl1|wxcodes|ice_acceretion_1hr|
+---------+-----------+-----------------------+-------+----------------+-----+----+-----+----+-----+----+-----+----+----+----+-----+-----+-------+------------------+
|dummykey |topic_test1|2023-04-12 03:25:15.575|FSO    |2013-01-01 19:15|14.54|0.5 |52.99|3.5 |280.0|8.05|29.89|-1.0|0.00|10.0|CLR  |M    |M      |M                 |
+---------+-----------+-----------------------+-------+----------------+-----+----+-----+----+-----+----+-----+----+----+----+-----+-----+-------+------------------+

ID:34aa0644-e793-49a2-b1a3-7855377e6510 | NAME:None
ID:7cb2097c-0a38-459

In [18]:
query.stop()

-------------------------------------------
Batch: 9
-------------------------------------------
-------------------------------------------
Batch: 161
-------------------------------------------
+---------+-----------+----------------------+-------+----------------+----+-----+-----+----+----+----+-----+----+----+----+-----+------+-------+------------------+
|event_key|event_topic|event_timestamp       |station|valid           |tmpf|dwpf |relh |feel|drct|sped|alti |mslp|p01m|vsby|skyc1|skyl1 |wxcodes|ice_acceretion_1hr|
+---------+-----------+----------------------+-------+----------------+----+-----+-----+----+----+----+-----+----+----+----+-----+------+-------+------------------+
|dummykey |topic_test1|2023-04-12 03:38:31.95|EFK    |2013-01-02 04:15|7.16|-0.58|69.95|7.16|0.0 |0.0 |29.86|0   |0.00|5.0 |OVC  |3400.0|-SN    |M                 |
+---------+-----------+----------------------+-------+----------------+----+-----+-----+----+----+----+-----+----+----+----+-----+------+-------

# ==== Project - Start your feature extraction queries from here ====

In [15]:
!pip install numpy

Collecting numpy
  Downloading numpy-1.24.2-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (17.3 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m17.3/17.3 MB[0m [31m4.0 MB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m
[?25hInstalling collected packages: numpy
Successfully installed numpy-1.24.2
[0m

In [9]:
from pyspark.sql.functions import mean, when

# Calculate the mean of the column
# mean_val = df_formatted.select(mean("mslp")).collect()[0][0]

# Replace "X" with the mean of the column
df_new = df_formatted.withColumn("mslp", when(df_formatted.mslp == -1, 0).otherwise(df_formatted.mslp))


In [10]:
query_new = (df_new
            .writeStream
            .format("console")
            .trigger(processingTime='5 seconds')
            .outputMode("append")
            .option("truncate",'false')
            .start()
        )

23/04/12 13:01:18 WARN StreamingQueryManager: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-f451b2f9-b35d-485a-994a-39e09717d4db. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.


-------------------------------------------
Batch: 0
-------------------------------------------
+---------+-----------+---------------+-------+-----+----+----+----+----+----+----+----+----+----+----+-----+-----+-------+------------------+
|event_key|event_topic|event_timestamp|station|valid|tmpf|dwpf|relh|feel|drct|sped|alti|mslp|p01m|vsby|skyc1|skyl1|wxcodes|ice_acceretion_1hr|
+---------+-----------+---------------+-------+-----+----+----+----+----+----+----+----+----+----+----+-----+-----+-------+------------------+
+---------+-----------+---------------+-------+-----+----+----+----+----+----+----+----+----+----+----+-----+-----+-------+------------------+

-------------------------------------------
Batch: 1
-------------------------------------------
+---------+-----------+-----------------------+-------+----------------+----+----+-----+----+-----+----+----+----+----+----+-----+------+-------+------------------+
|event_key|event_topic|event_timestamp        |station|valid        

In [11]:
query_new.stop()

In [12]:
# Replace "X" with the mean of the column
df = df_new.withColumn("skyl1", when(df_new.skyl1 == "M", 0).otherwise(df_new.skyl1))

In [8]:
query_skyl1 = (df
            .writeStream
            .format("console")
            .trigger(processingTime='5 seconds')
            .outputMode("append")
            .option("truncate",'false')
            .start()
        )

23/04/12 04:08:43 WARN StreamingQueryManager: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-e806f283-b6e9-4d0f-8f84-025a04d1d355. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.


-------------------------------------------
Batch: 0
-------------------------------------------
+---------+-----------+---------------+-------+-----+----+----+----+----+----+----+----+----+----+----+-----+-----+-------+------------------+
|event_key|event_topic|event_timestamp|station|valid|tmpf|dwpf|relh|feel|drct|sped|alti|mslp|p01m|vsby|skyc1|skyl1|wxcodes|ice_acceretion_1hr|
+---------+-----------+---------------+-------+-----+----+----+----+----+----+----+----+----+----+----+-----+-----+-------+------------------+
+---------+-----------+---------------+-------+-----+----+----+----+----+----+----+----+----+----+----+-----+-----+-------+------------------+

-------------------------------------------
Batch: 1
-------------------------------------------
+---------+-----------+-----------------------+-------+----------------+----+----+-----+-----+-----+----+-----+----+----+----+-----+-----+-------+------------------+
|event_key|event_topic|event_timestamp        |station|valid       

In [9]:
query_skyl1.stop()

In [13]:
df_wxcodes1 = df.withColumn("wxcodes", when(df.wxcodes == "M", "CLR").otherwise(df.wxcodes))

In [9]:
query_wxcodes1 = (df_wxcodes1
            .writeStream
            .format("console")
            .trigger(processingTime='5 seconds')
            .outputMode("append")
            .option("truncate",'false')
            .start()
        )

23/04/12 04:23:41 WARN StreamingQueryManager: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-1f37d6c1-3e50-4279-8edd-4d07c62f188c. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.


-------------------------------------------
Batch: 0
-------------------------------------------
+---------+-----------+---------------+-------+-----+----+----+----+----+----+----+----+----+----+----+-----+-----+-------+------------------+
|event_key|event_topic|event_timestamp|station|valid|tmpf|dwpf|relh|feel|drct|sped|alti|mslp|p01m|vsby|skyc1|skyl1|wxcodes|ice_acceretion_1hr|
+---------+-----------+---------------+-------+-----+----+----+----+----+----+----+----+----+----+----+-----+-----+-------+------------------+
+---------+-----------+---------------+-------+-----+----+----+----+----+----+----+----+----+----+----+-----+-----+-------+------------------+

-------------------------------------------
Batch: 1
-------------------------------------------
+---------+-----------+-----------------------+-------+----------------+------+------+-----+------+-----+----+-----+----+----+----+-----+-----+-------+------------------+
|event_key|event_topic|event_timestamp        |station|valid  

In [11]:
query_wxcodes1.stop()

In [14]:
df_ice_acceretion = df_wxcodes1.withColumn("ice_acceretion_1hr", when(df_wxcodes1.ice_acceretion_1hr == "M", 0).otherwise(df_wxcodes1.ice_acceretion_1hr))

In [13]:
q_ice = (df_ice_acceretion
            .writeStream
            .format("console")
            .trigger(processingTime='5 seconds')
            .outputMode("append")
            .option("truncate",'false')
            .start()
        )

23/04/12 04:28:18 WARN StreamingQueryManager: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-1d09e70d-5ddb-446a-8721-25a627d6edee. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.


-------------------------------------------
Batch: 0
-------------------------------------------
+---------+-----------+---------------+-------+-----+----+----+----+----+----+----+----+----+----+----+-----+-----+-------+------------------+
|event_key|event_topic|event_timestamp|station|valid|tmpf|dwpf|relh|feel|drct|sped|alti|mslp|p01m|vsby|skyc1|skyl1|wxcodes|ice_acceretion_1hr|
+---------+-----------+---------------+-------+-----+----+----+----+----+----+----+----+----+----+----+-----+-----+-------+------------------+
+---------+-----------+---------------+-------+-----+----+----+----+----+----+----+----+----+----+----+-----+-----+-------+------------------+

-------------------------------------------
Batch: 1
-------------------------------------------
+---------+-----------+-----------------------+-------+----------------+------+------+-----+------+----+----+-----+----+----+----+-----+-----+-------+------------------+
|event_key|event_topic|event_timestamp        |station|valid   

In [14]:
q_ice.stop()

In [15]:
df_skyc1 = df_ice_acceretion.withColumn("skyc1", when(df_ice_acceretion.skyc1 == "M", "CLR").otherwise(df_ice_acceretion.skyc1))

In [16]:
q_skyc1 = (df_skyc1
            .writeStream
            .format("console")
            .trigger(processingTime='5 seconds')
            .outputMode("append")
            .option("truncate",'false')
            .start()
        )

23/04/12 04:32:40 WARN StreamingQueryManager: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-e7ecbca0-e12e-453d-bcca-c728e5a9329f. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.


-------------------------------------------
Batch: 0
-------------------------------------------
+---------+-----------+---------------+-------+-----+----+----+----+----+----+----+----+----+----+----+-----+-----+-------+------------------+
|event_key|event_topic|event_timestamp|station|valid|tmpf|dwpf|relh|feel|drct|sped|alti|mslp|p01m|vsby|skyc1|skyl1|wxcodes|ice_acceretion_1hr|
+---------+-----------+---------------+-------+-----+----+----+----+----+----+----+----+----+----+----+-----+-----+-------+------------------+
+---------+-----------+---------------+-------+-----+----+----+----+----+----+----+----+----+----+----+-----+-----+-------+------------------+

-------------------------------------------
Batch: 1
-------------------------------------------
+---------+-----------+-----------------------+-------+----------------+-----+-----+-----+------+-----+----+-----+----+----+----+-----+-----+-------+------------------+
|event_key|event_topic|event_timestamp        |station|valid    

In [17]:
q_skyc1.stop()

In [16]:
df_skyc1_encoded1 = df_skyc1.withColumn("skyc1", when(df_skyc1.skyc1 == "CLR", 0).otherwise(df_skyc1.skyc1))
df_skyc1_encoded2 = df_skyc1_encoded1.withColumn("skyc1", when(df_skyc1_encoded1.skyc1 == "VV", 1).otherwise(df_skyc1_encoded1.skyc1))
df_skyc1_encoded3 = df_skyc1_encoded2.withColumn("skyc1", when(df_skyc1_encoded2.skyc1 == "SCT", 2).otherwise(df_skyc1_encoded2.skyc1))
df_skyc1_encoded4 = df_skyc1_encoded3.withColumn("skyc1", when(df_skyc1_encoded3.skyc1 == "FEW", 3).otherwise(df_skyc1_encoded3.skyc1))
df_skyc1_encoded5 = df_skyc1_encoded4.withColumn("skyc1", when(df_skyc1_encoded4.skyc1 == "OVC", 4).otherwise(df_skyc1_encoded4.skyc1))
df_skyc1_encoded6 = df_skyc1_encoded5.withColumn("skyc1", when(df_skyc1_encoded5.skyc1 == "BKN", 5).otherwise(df_skyc1_encoded5.skyc1))

In [19]:
q_skyc1_enc = (df_skyc1_encoded6
            .writeStream
            .format("console")
            .trigger(processingTime='5 seconds')
            .outputMode("append")
            .option("truncate",'false')
            .start()
        )

23/04/12 05:08:12 WARN StreamingQueryManager: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-23fc8e2a-fcf0-46f2-9075-671ddc81aa8f. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.


-------------------------------------------
Batch: 0
-------------------------------------------
+---------+-----------+---------------+-------+-----+----+----+----+----+----+----+----+----+----+----+-----+-----+-------+------------------+
|event_key|event_topic|event_timestamp|station|valid|tmpf|dwpf|relh|feel|drct|sped|alti|mslp|p01m|vsby|skyc1|skyl1|wxcodes|ice_acceretion_1hr|
+---------+-----------+---------------+-------+-----+----+----+----+----+----+----+----+----+----+----+-----+-----+-------+------------------+
+---------+-----------+---------------+-------+-----+----+----+----+----+----+----+----+----+----+----+-----+-----+-------+------------------+

-------------------------------------------
Batch: 1
-------------------------------------------
+---------+-----------+-----------------------+-------+----------------+----+----+-----+----+----+----+-----+----+----+----+-----+------+-------+------------------+
|event_key|event_topic|event_timestamp        |station|valid        

In [20]:
q_skyc1_enc.stop()

In [17]:
dfse1 = df_skyc1_encoded6.withColumn("station", when(df_skyc1_encoded6.station == "CDA", 0).otherwise(df_skyc1_encoded6.station))
dfse2 = dfse1.withColumn("station", when(dfse1.station == "EFK", 1).otherwise(dfse1.station))
dfse3 = dfse2.withColumn("station", when(dfse2.station == "FSO", 2).otherwise(dfse2.station))
dfse4 = dfse3.withColumn("station", when(dfse3.station == "MVL", 3).otherwise(dfse3.station))
dfse5 = dfse4.withColumn("station", when(dfse4.station == "RUT", 4).otherwise(dfse4.station))
dfse6 = dfse5.withColumn("station", when(dfse5.station == "MPV", 5).otherwise(dfse5.station))
dfse7 = dfse6.withColumn("station", when(dfse6.station == "VSF", 6).otherwise(dfse6.station))
dfse8 = dfse7.withColumn("station", when(dfse7.station == "DOH", 7).otherwise(dfse7.station))
dfse9 = dfse8.withColumn("station", when(dfse8.station == "1V4", 8).otherwise(dfse8.station))
dfse10 = dfse9.withColumn("station", when(dfse9.station == "BTV", 9).otherwise(dfse9.station))
dfse11 = dfse10.withColumn("station", when(dfse10.station == "6B0", 10).otherwise(dfse10.station))


In [38]:
q_se = (dfse11
            .writeStream
            .format("console")
            .trigger(processingTime='5 seconds')
            .outputMode("append")
            .option("truncate",'false')
            .start()
        )

23/04/12 13:59:26 WARN StreamingQueryManager: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-f4340d77-41f2-4a9b-950f-d43cc4395634. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.


-------------------------------------------
Batch: 0
-------------------------------------------
+---------+-----------+---------------+-------+-----+----+----+----+----+----+----+----+----+----+----+-----+-----+-------+------------------+
|event_key|event_topic|event_timestamp|station|valid|tmpf|dwpf|relh|feel|drct|sped|alti|mslp|p01m|vsby|skyc1|skyl1|wxcodes|ice_acceretion_1hr|
+---------+-----------+---------------+-------+-----+----+----+----+----+----+----+----+----+----+----+-----+-----+-------+------------------+
+---------+-----------+---------------+-------+-----+----+----+----+----+----+----+----+----+----+----+-----+-----+-------+------------------+



                                                                                

-------------------------------------------
Batch: 1
-------------------------------------------
+---------+-----------+-----------------------+-------+----------------+----+----+-----+-----+-----+----+-----+----+----+----+-----+------+-------+------------------+
|event_key|event_topic|event_timestamp        |station|valid           |tmpf|dwpf|relh |feel |drct |sped|alti |mslp|p01m|vsby|skyc1|skyl1 |wxcodes|ice_acceretion_1hr|
+---------+-----------+-----------------------+-------+----------------+----+----+-----+-----+-----+----+-----+----+----+----+-----+------+-------+------------------+
|dummykey |topic_test1|2023-04-12 13:59:27.678|0      |2013-01-02 21:15|1   |-7.6|65.13|-15.4|230.0|11.5|30.01|0   |0.00|4.0 |5    |1700.0|HZ     |0                 |
+---------+-----------+-----------------------+-------+----------------+----+----+-----+-----+-----+----+-----+----+----+----+-----+------+-------+------------------+

-------------------------------------------
Batch: 2
--------------

[Stage 27:>                                                         (0 + 1) / 1]

In [39]:
q_se.stop()

23/04/12 13:59:47 ERROR WriteToDataSourceV2Exec: Data source write support org.apache.spark.sql.execution.streaming.sources.MicroBatchWrite@1d2838da is aborting.
23/04/12 13:59:47 ERROR WriteToDataSourceV2Exec: Data source write support org.apache.spark.sql.execution.streaming.sources.MicroBatchWrite@1d2838da aborted.
23/04/12 13:59:48 WARN TaskSetManager: Lost task 0.0 in stage 27.0 (TID 27, 172.18.0.7, executor 0): TaskKilled (Stage cancelled)


In [30]:
from pyspark.sql.functions import window
from pyspark.ml.feature import StandardScaler

# Create a StandardScaler transformer
scaler = StandardScaler(inputCol="tmpf", outputCol="tmpfNorm")

In [36]:
from pyspark.sql.types import ByteType
dfse11 = dfse11.withColumn("tmpf", col("tmpf").cast(ByteType()))

In [37]:
dftmpNorm = (dfse11
         .groupBy(window(dfse11.event_timestamp, "10 seconds", "10 seconds").alias("TimeWindow"))
         .agg(
             col("station"), col("valid"),
             scaler.fit(dfse11).transform(col("tmpf")).alias("tmpfNorm"),
             scaler.fit(dfse11).transform(col("dwpf")).alias("dwpfNorm"),
             scaler.fit(dfse11).transform(col("relh")).alias("relhNorm"),
             scaler.fit(dfse11).transform(col("feel")).alias("feelNorm"),
             scaler.fit(dfse11).transform(col("drct")).alias("drctNorm"),
             scaler.fit(dfse11).transform(col("sped")).alias("spedNorm"),
             scaler.fit(dfse11).transform(col("alti")).alias("altiNorm"),
             scaler.fit(dfse11).transform(col("mslp")).alias("mslpNorm"),
             scaler.fit(dfse11).transform(col("p01m")).alias("p01mNorm"),
             scaler.fit(dfse11).transform(col("vsby")).alias("vsbyNorm"),
             skyc1.alias("skyc1"),
             scaler.fit(dfse11).transform(col("skyl1")).alias("skyl1Norm"),
             wxcodes.alias("wxcodes"),
             scaler.fit(dfse11).transform(col("ice_acceretion_1hr")).alias("iceAccNorm"),
             ).select("TimeWindow", "station", "valid", "tmpfNorm", "dwpfNorm", "relhNorm", "feelNorm", "drctNorm", "spedNorm", "altiNorm", 
                     "mslpNorm", "p01mNorm", "vsbyNorm", "skyc1", "skyl1Norm", "wxcodes", "iceAccNorm")
         )

IllegalArgumentException: requirement failed: Column tmpf must be of type struct<type:tinyint,size:int,indices:array<int>,values:array<double>> but was actually tinyint.

In [None]:
from pyspark.sql.functions import *

df_cumulative_count = (df_formatted
            .groupBy("event_topic")
            .count()
            .orderBy("event_topic"))

In [None]:
final_count=(df_cumulative_count
                .writeStream
                .outputMode("complete")
                .format("console")
                .trigger(processingTime="5 seconds")
                .option("truncate",'false')
                .start()
            )

In [None]:
final_count.stop()

#### **Q3.2:** All your code for 3.2 should be in the following cell

In [None]:
from pyspark.sql.functions import *

df_request_type = (df_formatted
                    .groupBy(window(df_formatted.event_timestamp, "10 seconds", "10 seconds"),df_formatted.request_type)
                    .count()
                 )

In [None]:
final_request_type= (df_request_type
          .writeStream
          .outputMode("complete")
          .format("console")
          .option("truncate",'false')
          .trigger(processingTime="5 seconds")
          .start())

In [None]:
final_request_type.stop()

#### **Q3.3:** All your code for 3.3 should be in the following cell


In [None]:
from pyspark.sql.functions import *

smAvg = (df_formatted
               .groupBy(window(df_formatted.event_timestamp, "10 seconds", "10 seconds"))
               .agg(avg("response_size")
               .alias("moving_average"))
               .writeStream
               .outputMode("complete")
               .format("console")
               .option("truncate",'false')
               .trigger(processingTime="10 seconds")
               .start()
        )

In [None]:
smAvg.stop()

#### **Q3.4:** All your code for 3.4 should be in the following cell

#### 3.4.1

In [None]:
from pyspark.sql.functions import *
df_select = (df_formatted
               .groupBy(window("event_timestamp", "10 seconds").alias('Time_Window'))
               .agg(
                    round(avg("response_size"),4).alias("Avg"),
                    round(stddev_samp("response_size"),4).alias("Standard_Dev"),
                    count("*").alias("Count"),
                    collect_list("response_size").alias("List")
                   )
               .select("Time_Window", "Avg", "Standard_Dev", "Count", "List")
            )

In [None]:
df_select_final= (df_select
                    .writeStream
                    .outputMode("complete")
                    .format("console")
                    .trigger(processingTime="10 seconds")
                    .option("truncate",'false')
                    .start()
                 )

In [None]:
df_select_final.stop()

#### 3.4.2

In [None]:
from pyspark.sql.functions import explode

df_explode_query = df_select.select("Time_Window", "Avg", "Standard_Dev", explode("List").alias("ResponseSize"))

In [None]:
df_explode_final= (df_explode_query
                    .writeStream
                    .outputMode("complete")
                    .format("console")
                    .trigger(processingTime="10 seconds")
                    .option("truncate",'false')
                    .start()
                  )

In [None]:
df_explode_final.stop()

#### 3.4.3

In [None]:
df_score = (df_explode_query.withColumn('z_score', expr('(ResponseSize - Avg) / Standard_Dev'))
            .filter("z_score > 1 or z_score < -1")
            .filter(~isnan(col("z_score")))
            .select("Time_Window", "Avg", "Standard_Dev","ResponseSize"))

In [None]:
final_data= (df_score
            .writeStream
            .outputMode("complete")
            .format("console")
            .option("truncate",'false')
            .trigger(processingTime="10 seconds")
            .start())

In [None]:
final_data.stop()