# **Assignment 4: PySpark Structured Streaming Using Kafka Source**

In [1]:
from pyspark.sql import SparkSession

spark = SparkSession.\
        builder.\
        appName("pyspark-kafka-streaming").\
        master("spark://spark-master:7077").\
        config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0"). \
        config("spark.executor.memory", "512m").\
        getOrCreate()

Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
:: loading settings :: url = jar:file:/usr/local/lib/python3.9/dist-packages/pyspark/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-827a1b59-0058-4541-8bbe-1726ca509d9f;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.0.0 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.0.0 in central
	found org.apache.kafka#kafka-clients;2.4.1 in central
	found com.github.luben#zstd-jni;1.4.4-3 in central
	found org.lz4#lz4-java;1.7.1 in central
	found org.xerial.snappy#snappy-java;1.1.7.5 in central
	found org.slf4j#slf4j-api;1.7.30 in central
	found org.spark-project.spark#unused;1.0.0 in central
	found org.apache.commons#commons-pool2;2.6.2 in central
:: resolution report :: resolve 1084ms :: artifacts dl 5

## ==== Q2 ====

#### **Q2.1:** All your code for 2.1 should be in the following cell

In [2]:
#Answer to 2.1
df_streamed_raw = (spark
  .readStream
  .format("kafka") \
  .option("kafka.bootstrap.servers", "kafka:9093") \
  .option("subscribe", "topic_test1") \
  .load())


In [3]:
from pyspark.sql.types import StringType
from pyspark.sql.functions import col

# convert byte stream to string
df_streamed_kv = (df_streamed_raw
    .withColumn("key", df_streamed_raw["key"].cast(StringType()))
    .withColumn("value", df_streamed_raw["value"].cast(StringType())))

test_query = (df_streamed_kv 
              .writeStream \
              .format("memory") # output to memory \
              .outputMode("update") # only write updated rows to the sink \
              .queryName("test_query_table")  # Name of the in memory table \
              .start())

23/04/12 06:38:51 WARN StreamingQueryManager: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-47351cc7-3d4d-4584-9f83-d15057113f75. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.


#### If all goes well, the following cell should display a table populated with values being streamed from you Kafka producer. NOTE: If you recently ran the producer, it may take a while before the table is populated. Keep rerunning the cell to check for updates:

In [4]:
spark.sql("select * from test_query_table").show()

+---+-----+-----+---------+------+---------+-------------+
|key|value|topic|partition|offset|timestamp|timestampType|
+---+-----+-----+---------+------+---------+-------------+
+---+-----+-----+---------+------+---------+-------------+



In [5]:
test_query.stop()

23/04/12 06:38:57 ERROR WriteToDataSourceV2Exec: Data source write support org.apache.spark.sql.execution.streaming.sources.MicroBatchWrite@3616af31 is aborting.
23/04/12 06:38:57 ERROR WriteToDataSourceV2Exec: Data source write support org.apache.spark.sql.execution.streaming.sources.MicroBatchWrite@3616af31 aborted.


#### The following cells contain code that take the streamed dataframe and formats it properly into a table. If any of the given cells fails, there might be a formatting issue with one of your previous solutions. 

In [6]:
from pyspark.sql.functions import from_json
from pyspark.sql.types import StructType, StructField, BooleanType, LongType, IntegerType

event_schema = StructType([
    StructField("station", StringType()),
    StructField("valid", StringType()),
    StructField("tmpf", StringType()),
    StructField("dwpf", StringType()),
    StructField("relh", StringType()),
    StructField("feel", StringType()),
    StructField("drct", StringType()),
    StructField("sped", StringType()),
    StructField("alti", StringType()),
    StructField("mslp", StringType()),
    StructField("p01m", StringType()),
    StructField("vsby", StringType()),
    StructField("skyc1", StringType()),
    StructField("skyl1", StringType()),
    StructField("wxcodes", StringType()),
    StructField("ice_acceretion_1hr", StringType()),
])

# Parse the events from JSON format
df_parsed = (df_streamed_kv
           # Sets schema for event data
           .withColumn("value", from_json("value", event_schema))
          )

In [7]:
from pyspark.sql.functions import to_timestamp, unix_timestamp

# Here, we need to convert date_time string to date_time object in the "dd/MMM/yyyy:HH:mm:ss Z" format.

df_formatted = (df_parsed.select(
    col("key").alias("event_key")
    ,col("topic").alias("event_topic")
    ,col("timestamp").alias("event_timestamp")
    ,col("value.station").alias("station")
#     ,col("value.valid").alias("valid")
    ,to_timestamp(col("value.valid"), "yyyy-MM-dd HH:mm").alias("valid")
    ,col("value.tmpf").alias("tmpf")
    ,col("value.dwpf").alias("dwpf")
    ,col("value.relh").alias("relh")
    ,col("value.feel").alias("feel")
    ,col("value.drct").alias("drct")
    ,col("value.sped").alias("sped")
    ,col("value.alti").alias("alti")
    ,col("value.mslp").alias("mslp")
    ,col("value.p01m").alias("p01m")
    ,col("value.vsby").alias("vsby")
    ,col("value.skyc1").alias("skyc1")
    ,col("value.skyl1").alias("skyl1")
    ,col("value.wxcodes").alias("wxcodes")
    ,col("value.ice_acceretion_1hr").alias("ice_acceretion_1hr")
#     cast(IntegerType()).
))

#### **Q2.2:** All your code for 2.2 should be in the following cell


In [8]:
# Answer to 2.2
query = (df_formatted
            .writeStream
            .format("console")
            .trigger(processingTime='5 seconds')
            .outputMode("append")
            .option("truncate",'false')
            .start()
        )

23/04/12 06:38:58 WARN StreamingQueryManager: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-db146522-2460-4cb2-8a4e-8ccabfd3c47e. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.


In [9]:
# Print the name of active streams (This may be useful during debugging)
for s in spark.streams.active:
    print(f"ID:{s.id} | NAME:{s.name}")

ID:6d9fda1d-108c-4c1d-9e75-15271de0dc27 | NAME:None


In [10]:
query.stop()

# ==== Project - Start your feature extraction queries from here ====

#### **Q3.1:** All your code for 3.1 should be in the following cell

In [11]:
from pyspark.sql.functions import *
df_day = df_formatted.filter((hour(df_formatted.valid) >= 8) & (hour(df_formatted.valid) < 19))

23/04/12 06:38:59 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, 172.18.0.7, executor 1): TaskKilled (Stage cancelled)


In [12]:
from pyspark.sql.functions import col, date_add, to_date
# Subtract 7 days from the current date to get the date 1 week ago
one_week_ago = date_add(to_date(col("valid"), "yyyy-MM-dd HH:mm"), -7)
# Filter out the rows where the valid timestamp is one week old or older
filtered_df = df_day.where(col("valid").cast("date") >= one_week_ago)

In [13]:
query1 = (filtered_df
            .writeStream
            .format("console")
            .trigger(processingTime='5 seconds')
            .outputMode("append")
            .option("truncate",'false')
            .start()
        )

23/04/12 06:38:59 WARN StreamingQueryManager: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-75b6131c-30f9-4569-9e53-1b80045713db. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.


In [14]:
query1.stop()

23/04/12 06:39:00 WARN Shell: Interrupted while joining on: Thread[Thread-67,5,main]
java.lang.InterruptedException
	at java.lang.Object.wait(Native Method)
	at java.lang.Thread.join(Thread.java:1257)
	at java.lang.Thread.join(Thread.java:1331)
	at org.apache.hadoop.util.Shell.joinThread(Shell.java:629)
	at org.apache.hadoop.util.Shell.runCommand(Shell.java:580)
	at org.apache.hadoop.util.Shell.run(Shell.java:482)
	at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:776)
	at org.apache.hadoop.util.Shell.execCommand(Shell.java:869)
	at org.apache.hadoop.util.Shell.execCommand(Shell.java:852)
	at org.apache.hadoop.fs.RawLocalFileSystem.setPermission(RawLocalFileSystem.java:733)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkOneDirWithMode(RawLocalFileSystem.java:491)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirsWithOptionalPermission(RawLocalFileSystem.java:532)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:509)
	at org.apache.hadoop.f

-------------------------------------------
Batch: 0
-------------------------------------------
+---------+-----------+---------------+-------+-----+----+----+----+----+----+----+----+----+----+----+-----+-----+-------+------------------+
|event_key|event_topic|event_timestamp|station|valid|tmpf|dwpf|relh|feel|drct|sped|alti|mslp|p01m|vsby|skyc1|skyl1|wxcodes|ice_acceretion_1hr|
+---------+-----------+---------------+-------+-----+----+----+----+----+----+----+----+----+----+----+-----+-----+-------+------------------+
+---------+-----------+---------------+-------+-----+----+----+----+----+----+----+----+----+----+----+-----+-----+-------+------------------+



In [15]:
df_night=df_formatted.filter(
    ((hour(df_formatted.valid) >= 19) & (hour(df_formatted.valid) <=23)) |
    ((hour(df_formatted.valid) >= 0) & (hour(df_formatted.valid) < 8))
)

In [16]:
from pyspark.sql.functions import col, date_add, to_date
# Subtract 7 days from the current date to get the date 1 week ago
one_week_ago = date_add(to_date(col("valid"), "yyyy-MM-dd HH:mm"), -7)
# Filter out the rows where the valid timestamp is one week old or older
filtered_df1 = df_night.where(col("valid").cast("date") >= one_week_ago)

In [17]:
query2 = (filtered_df1
            .writeStream
            .format("console")
            .trigger(processingTime='5 seconds')
            .outputMode("append")
            .option("truncate",'false')
            .start()
        )

23/04/12 06:39:03 WARN StreamingQueryManager: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-fcf1b5ef-aec9-47d9-94f6-99182339d412. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.


In [18]:
query2.stop()

## Weekly

In [19]:
from pyspark.sql.functions import avg, col, window,stddev
# assuming your input stream is named `input_stream`
df_stream_day = (df_day
   .withWatermark("valid", "7 days")\
    .groupBy("station", window("valid", "7 days"))\
    .agg(avg("tmpf").alias("avg_temp"), stddev("tmpf").alias("stddev_temp")))


In [20]:
query_3 = df_stream_day\
    .writeStream\
    .format("console")\
    .trigger(processingTime="10 seconds")\
    .outputMode("complete")\
    .option("truncate", "false")\
    .start()

23/04/12 06:39:04 WARN StreamingQueryManager: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-ef9d9c6c-0f15-4431-aef9-f0e65cde9994. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.


In [21]:
query_3.stop()

In [22]:
from pyspark.sql.functions import avg, col, window,stddev
# assuming your input stream is named `input_stream`
df_stream_night = (df_night
   .withWatermark("valid", "15 minutes")\
    .groupBy("station", window("valid", "7 days"))\
    .agg(avg("tmpf").alias("avg_temp"), stddev("tmpf").alias("stddev_temp")))


In [41]:
query_4 = df_stream_night\
    .writeStream\
    .format("console")\
    .trigger(processingTime="10 seconds")\
    .outputMode("complete")\
    .option("truncate", "false")\
    .start()

23/04/12 06:41:39 WARN StreamingQueryManager: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-a657be28-0150-478b-816a-38b23d2a396b. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
                                                                                

-------------------------------------------
Batch: 0
-------------------------------------------
+-------+------+--------+-----------+
|station|window|avg_temp|stddev_temp|
+-------+------+--------+-----------+
+-------+------+--------+-----------+



23/04/12 06:41:55 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 10000 milliseconds, but spent 16266 milliseconds
23/04/12 06:42:08 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 10000 milliseconds, but spent 12368 milliseconds


-------------------------------------------
Batch: 1
-------------------------------------------
+-------+------+--------+-----------+
|station|window|avg_temp|stddev_temp|
+-------+------+--------+-----------+
+-------+------+--------+-----------+





In [42]:
query_4.stop()

23/04/12 06:42:13 ERROR WriteToDataSourceV2Exec: Data source write support org.apache.spark.sql.execution.streaming.sources.MicroBatchWrite@329f5e51 is aborting.
23/04/12 06:42:13 ERROR WriteToDataSourceV2Exec: Data source write support org.apache.spark.sql.execution.streaming.sources.MicroBatchWrite@329f5e51 aborted.
23/04/12 06:42:13 WARN TaskSetManager: Lost task 78.0 in stage 15.0 (TID 1086, 172.18.0.7, executor 1): TaskKilled (Stage cancelled)


In [43]:
df_day_night=df_stream_day.union(df_stream_night)

In [44]:
query_day_night = df_day_night\
    .writeStream\
    .format("console")\
    .trigger(processingTime="10 seconds")\
    .outputMode("append")\
    .option("truncate", "false")\
    .start()

23/04/12 06:42:20 WARN StreamingQueryManager: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-627c5742-0fda-4538-a649-7c99e4743ada. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.


AnalysisException: Multiple streaming aggregations are not supported with streaming DataFrames/Datasets;;
Union
:- Aggregate [station#113, window#377-T604800000ms], [station#113, window#377-T604800000ms AS window#337-T604800000ms, avg(cast(tmpf#115 as double)) AS avg_temp#358, stddev_samp(cast(tmpf#115 as double)) AS stddev_temp#368]
:  +- Filter isnotnull(valid#114-T604800000ms)
:     +- Project [named_struct(start, precisetimestampconversion(((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(valid#114-T604800000ms, TimestampType, LongType) - 0) as double) / cast(604800000000 as double))) as double) = (cast((precisetimestampconversion(valid#114-T604800000ms, TimestampType, LongType) - 0) as double) / cast(604800000000 as double))) THEN (CEIL((cast((precisetimestampconversion(valid#114-T604800000ms, TimestampType, LongType) - 0) as double) / cast(604800000000 as double))) + cast(1 as bigint)) ELSE CEIL((cast((precisetimestampconversion(valid#114-T604800000ms, TimestampType, LongType) - 0) as double) / cast(604800000000 as double))) END + cast(0 as bigint)) - cast(1 as bigint)) * 604800000000) + 0), LongType, TimestampType), end, precisetimestampconversion((((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(valid#114-T604800000ms, TimestampType, LongType) - 0) as double) / cast(604800000000 as double))) as double) = (cast((precisetimestampconversion(valid#114-T604800000ms, TimestampType, LongType) - 0) as double) / cast(604800000000 as double))) THEN (CEIL((cast((precisetimestampconversion(valid#114-T604800000ms, TimestampType, LongType) - 0) as double) / cast(604800000000 as double))) + cast(1 as bigint)) ELSE CEIL((cast((precisetimestampconversion(valid#114-T604800000ms, TimestampType, LongType) - 0) as double) / cast(604800000000 as double))) END + cast(0 as bigint)) - cast(1 as bigint)) * 604800000000) + 0) + 604800000000), LongType, TimestampType)) AS window#377-T604800000ms, event_key#110, event_topic#111, event_timestamp#112, station#113, valid#114-T604800000ms, tmpf#115, dwpf#116, relh#117, feel#118, drct#119, sped#120, alti#121, mslp#122, p01m#123, vsby#124, skyc1#125, skyl1#126, wxcodes#127, ice_acceretion_1hr#128]
:        +- EventTimeWatermark valid#114: timestamp, 7 days
:           +- Filter ((hour(valid#114, Some(Etc/UTC)) >= 8) AND (hour(valid#114, Some(Etc/UTC)) < 19))
:              +- Project [key#21 AS event_key#110, topic#9 AS event_topic#111, timestamp#12 AS event_timestamp#112, value#102.station AS station#113, to_timestamp('value.valid, Some(yyyy-MM-dd HH:mm)) AS valid#114, value#102.tmpf AS tmpf#115, value#102.dwpf AS dwpf#116, value#102.relh AS relh#117, value#102.feel AS feel#118, value#102.drct AS drct#119, value#102.sped AS sped#120, value#102.alti AS alti#121, value#102.mslp AS mslp#122, value#102.p01m AS p01m#123, value#102.vsby AS vsby#124, value#102.skyc1 AS skyc1#125, value#102.skyl1 AS skyl1#126, value#102.wxcodes AS wxcodes#127, value#102.ice_acceretion_1hr AS ice_acceretion_1hr#128]
:                 +- Project [key#21, from_json(StructField(station,StringType,true), StructField(valid,StringType,true), StructField(tmpf,StringType,true), StructField(dwpf,StringType,true), StructField(relh,StringType,true), StructField(feel,StringType,true), StructField(drct,StringType,true), StructField(sped,StringType,true), StructField(alti,StringType,true), StructField(mslp,StringType,true), StructField(p01m,StringType,true), StructField(vsby,StringType,true), StructField(skyc1,StringType,true), StructField(skyl1,StringType,true), StructField(wxcodes,StringType,true), StructField(ice_acceretion_1hr,StringType,true), value#29, Some(Etc/UTC)) AS value#102, topic#9, partition#10, offset#11L, timestamp#12, timestampType#13]
:                    +- Project [key#21, cast(value#8 as string) AS value#29, topic#9, partition#10, offset#11L, timestamp#12, timestampType#13]
:                       +- Project [cast(key#7 as string) AS key#21, value#8, topic#9, partition#10, offset#11L, timestamp#12, timestampType#13]
:                          +- StreamingRelationV2 org.apache.spark.sql.kafka010.KafkaSourceProvider@2f1d20c3, kafka, org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@179a7088, org.apache.spark.sql.util.CaseInsensitiveStringMap@e20c70b1, [key#7, value#8, topic#9, partition#10, offset#11L, timestamp#12, timestampType#13], StreamingRelation DataSource(org.apache.spark.sql.SparkSession@68afa7b5,kafka,List(),None,List(),None,Map(subscribe -> topic_test1, kafka.bootstrap.servers -> kafka:9093),None), kafka, [key#0, value#1, topic#2, partition#3, offset#4L, timestamp#5, timestampType#6]
+- Project [station#113 AS station#1033, window#402-T900000ms AS window#1034-T900000ms, avg_temp#423 AS avg_temp#1035, stddev_temp#433 AS stddev_temp#1036]
   +- Aggregate [station#113, window#442-T900000ms], [station#113, window#442-T900000ms AS window#402-T900000ms, avg(cast(tmpf#115 as double)) AS avg_temp#423, stddev_samp(cast(tmpf#115 as double)) AS stddev_temp#433]
      +- Filter isnotnull(valid#114-T900000ms)
         +- Project [named_struct(start, precisetimestampconversion(((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(valid#114-T900000ms, TimestampType, LongType) - 0) as double) / cast(604800000000 as double))) as double) = (cast((precisetimestampconversion(valid#114-T900000ms, TimestampType, LongType) - 0) as double) / cast(604800000000 as double))) THEN (CEIL((cast((precisetimestampconversion(valid#114-T900000ms, TimestampType, LongType) - 0) as double) / cast(604800000000 as double))) + cast(1 as bigint)) ELSE CEIL((cast((precisetimestampconversion(valid#114-T900000ms, TimestampType, LongType) - 0) as double) / cast(604800000000 as double))) END + cast(0 as bigint)) - cast(1 as bigint)) * 604800000000) + 0), LongType, TimestampType), end, precisetimestampconversion((((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(valid#114-T900000ms, TimestampType, LongType) - 0) as double) / cast(604800000000 as double))) as double) = (cast((precisetimestampconversion(valid#114-T900000ms, TimestampType, LongType) - 0) as double) / cast(604800000000 as double))) THEN (CEIL((cast((precisetimestampconversion(valid#114-T900000ms, TimestampType, LongType) - 0) as double) / cast(604800000000 as double))) + cast(1 as bigint)) ELSE CEIL((cast((precisetimestampconversion(valid#114-T900000ms, TimestampType, LongType) - 0) as double) / cast(604800000000 as double))) END + cast(0 as bigint)) - cast(1 as bigint)) * 604800000000) + 0) + 604800000000), LongType, TimestampType)) AS window#442-T900000ms, event_key#110, event_topic#111, event_timestamp#112, station#113, valid#114-T900000ms, tmpf#115, dwpf#116, relh#117, feel#118, drct#119, sped#120, alti#121, mslp#122, p01m#123, vsby#124, skyc1#125, skyl1#126, wxcodes#127, ice_acceretion_1hr#128]
            +- EventTimeWatermark valid#114: timestamp, 15 minutes
               +- Filter (((hour(valid#114, Some(Etc/UTC)) >= 19) AND (hour(valid#114, Some(Etc/UTC)) <= 23)) OR ((hour(valid#114, Some(Etc/UTC)) >= 0) AND (hour(valid#114, Some(Etc/UTC)) < 8)))
                  +- Project [key#21 AS event_key#110, topic#9 AS event_topic#111, timestamp#12 AS event_timestamp#112, value#102.station AS station#113, to_timestamp('value.valid, Some(yyyy-MM-dd HH:mm)) AS valid#114, value#102.tmpf AS tmpf#115, value#102.dwpf AS dwpf#116, value#102.relh AS relh#117, value#102.feel AS feel#118, value#102.drct AS drct#119, value#102.sped AS sped#120, value#102.alti AS alti#121, value#102.mslp AS mslp#122, value#102.p01m AS p01m#123, value#102.vsby AS vsby#124, value#102.skyc1 AS skyc1#125, value#102.skyl1 AS skyl1#126, value#102.wxcodes AS wxcodes#127, value#102.ice_acceretion_1hr AS ice_acceretion_1hr#128]
                     +- Project [key#21, from_json(StructField(station,StringType,true), StructField(valid,StringType,true), StructField(tmpf,StringType,true), StructField(dwpf,StringType,true), StructField(relh,StringType,true), StructField(feel,StringType,true), StructField(drct,StringType,true), StructField(sped,StringType,true), StructField(alti,StringType,true), StructField(mslp,StringType,true), StructField(p01m,StringType,true), StructField(vsby,StringType,true), StructField(skyc1,StringType,true), StructField(skyl1,StringType,true), StructField(wxcodes,StringType,true), StructField(ice_acceretion_1hr,StringType,true), value#29, Some(Etc/UTC)) AS value#102, topic#9, partition#10, offset#11L, timestamp#12, timestampType#13]
                        +- Project [key#21, cast(value#8 as string) AS value#29, topic#9, partition#10, offset#11L, timestamp#12, timestampType#13]
                           +- Project [cast(key#7 as string) AS key#21, value#8, topic#9, partition#10, offset#11L, timestamp#12, timestampType#13]
                              +- StreamingRelationV2 org.apache.spark.sql.kafka010.KafkaSourceProvider@2f1d20c3, kafka, org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@179a7088, org.apache.spark.sql.util.CaseInsensitiveStringMap@e20c70b1, [key#7, value#8, topic#9, partition#10, offset#11L, timestamp#12, timestampType#13], StreamingRelation DataSource(org.apache.spark.sql.SparkSession@68afa7b5,kafka,List(),None,List(),None,Map(subscribe -> topic_test1, kafka.bootstrap.servers -> kafka:9093),None), kafka, [key#0, value#1, topic#2, partition#3, offset#4L, timestamp#5, timestampType#6]


In [27]:
query_day_night.stop()

## 2-hour Window 

In [28]:
from pyspark.sql.functions import window
df_windowed_2hrs = (df_formatted
    .withWatermark("valid", "30 minutes")
    .groupBy("station", window("valid", "2 hours"))\
    .agg(collect_list("tmpf").alias("temp_list")))

In [29]:
query_5=df_windowed_2hrs\
    .writeStream\
    .format("console")\
    .trigger(processingTime="10 seconds")\
    .outputMode("complete")\
    .option("truncate", "false")\
    .start()

23/04/12 06:39:09 WARN StreamingQueryManager: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-2e9516df-1dc8-452c-9801-29b3a3681323. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.


In [30]:
query_5.stop()

In [31]:
from pyspark.sql.functions import explode

df_exploded = df_windowed_2hrs.select("Window", "station", explode("temp_list").alias("temperature"))

In [32]:
query_6= (df_exploded
            .writeStream
            .outputMode("complete")
            .format("console")
            .option("truncate",'false')
            .trigger(processingTime="10 seconds")
            .start())

23/04/12 06:39:11 WARN StreamingQueryManager: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-2399f2d2-9628-4ca6-9110-90de9f831185. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.


In [33]:
query_6.stop()

In [38]:
joined_df = df_day_night.join(
    df_exploded,
    on=(df_day_night["station"] == df_exploded["station"]) 
    & (df_day_night["valid"] >= df_exploded["Window.start"]) 
    & (df_day_night["valid"] <= df_exploded["Window.end"])
)

In [39]:
query_7= (joined_df
            .writeStream
            .outputMode("append")
            .format("console")
            .option("truncate",'false')
            .trigger(processingTime="10 seconds")
            .start())

23/04/12 06:39:42 WARN StreamingQueryManager: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-ee1b765c-c311-4fdc-a82c-afed4072e95a. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
23/04/12 06:39:42 WARN UnsupportedOperationChecker: Detected pattern of possible 'correctness' issue due to global watermark. The query contains stateful operation which can emit rows older than the current watermark plus allowed late record delay, which are "late rows" in downstream stateful operations and these rows can be discarded. Please refer the programming guide doc for more details.;
Join Inner, (((station#113 = station#701) AND (valid#114 >= Window#524-T1800000ms.start)) AND (valid#114 <= Window#524-T1800000ms.end))
:- Union
:  :- Filter ((hour(valid#114, Some(Etc/UTC)) >= 8) AND (hour(valid#114, 

-------------------------------------------
Batch: 0
-------------------------------------------
+---------+-----------+---------------+-------+-----+----+----+----+----+----+----+----+----+----+----+-----+-----+-------+------------------+------+-------+-----------+
|event_key|event_topic|event_timestamp|station|valid|tmpf|dwpf|relh|feel|drct|sped|alti|mslp|p01m|vsby|skyc1|skyl1|wxcodes|ice_acceretion_1hr|Window|station|temperature|
+---------+-----------+---------------+-------+-----+----+----+----+----+----+----+----+----+----+----+-----+-----+-------+------------------+------+-------+-----------+
+---------+-----------+---------------+-------+-----+----+----+----+----+----+----+----+----+----+----+-----+-----+-------+------------------+------+-------+-----------+



23/04/12 06:40:40 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 10000 milliseconds, but spent 57819 milliseconds

In [40]:
query_7.stop()

23/04/12 06:41:06 ERROR WriteToDataSourceV2Exec: Data source write support org.apache.spark.sql.execution.streaming.sources.MicroBatchWrite@2379cd94 is aborting.
23/04/12 06:41:06 ERROR WriteToDataSourceV2Exec: Data source write support org.apache.spark.sql.execution.streaming.sources.MicroBatchWrite@2379cd94 aborted.
23/04/12 06:41:07 WARN TaskSetManager: Lost task 1.0 in stage 9.0 (TID 606, 172.18.0.7, executor 1): TaskKilled (Stage cancelled)
23/04/12 06:41:07 WARN TaskSetManager: Lost task 0.0 in stage 9.0 (TID 605, 172.18.0.6, executor 0): TaskKilled (Stage cancelled)


In [37]:
df_with_zscore = joined_df.withColumn('z_score', expr('(tmpf - avg_temp) / stddev_temp'))

AnalysisException: cannot resolve '`avg_temp`' given input columns: [Window, alti, drct, dwpf, event_key, event_timestamp, event_topic, feel, ice_acceretion_1hr, mslp, p01m, relh, skyc1, skyl1, sped, station, station, temperature, tmpf, valid, vsby, wxcodes]; line 1 pos 8;
'Project [event_key#110, event_topic#111, event_timestamp#112, station#113, valid#114, tmpf#115, dwpf#116, relh#117, feel#118, drct#119, sped#120, alti#121, mslp#122, p01m#123, vsby#124, skyc1#125, skyl1#126, wxcodes#127, ice_acceretion_1hr#128, Window#524-T1800000ms, station#569, temperature#556, ((tmpf#115 - 'avg_temp) / 'stddev_temp) AS z_score#695]
+- Join Inner, (((station#113 = station#569) AND (valid#114 >= Window#524-T1800000ms.start)) AND (valid#114 <= Window#524-T1800000ms.end))
   :- Union
   :  :- Filter ((hour(valid#114, Some(Etc/UTC)) >= 8) AND (hour(valid#114, Some(Etc/UTC)) < 19))
   :  :  +- Project [key#21 AS event_key#110, topic#9 AS event_topic#111, timestamp#12 AS event_timestamp#112, value#102.station AS station#113, to_timestamp('value.valid, Some(yyyy-MM-dd HH:mm)) AS valid#114, value#102.tmpf AS tmpf#115, value#102.dwpf AS dwpf#116, value#102.relh AS relh#117, value#102.feel AS feel#118, value#102.drct AS drct#119, value#102.sped AS sped#120, value#102.alti AS alti#121, value#102.mslp AS mslp#122, value#102.p01m AS p01m#123, value#102.vsby AS vsby#124, value#102.skyc1 AS skyc1#125, value#102.skyl1 AS skyl1#126, value#102.wxcodes AS wxcodes#127, value#102.ice_acceretion_1hr AS ice_acceretion_1hr#128]
   :  :     +- Project [key#21, from_json(StructField(station,StringType,true), StructField(valid,StringType,true), StructField(tmpf,StringType,true), StructField(dwpf,StringType,true), StructField(relh,StringType,true), StructField(feel,StringType,true), StructField(drct,StringType,true), StructField(sped,StringType,true), StructField(alti,StringType,true), StructField(mslp,StringType,true), StructField(p01m,StringType,true), StructField(vsby,StringType,true), StructField(skyc1,StringType,true), StructField(skyl1,StringType,true), StructField(wxcodes,StringType,true), StructField(ice_acceretion_1hr,StringType,true), value#29, Some(Etc/UTC)) AS value#102, topic#9, partition#10, offset#11L, timestamp#12, timestampType#13]
   :  :        +- Project [key#21, cast(value#8 as string) AS value#29, topic#9, partition#10, offset#11L, timestamp#12, timestampType#13]
   :  :           +- Project [cast(key#7 as string) AS key#21, value#8, topic#9, partition#10, offset#11L, timestamp#12, timestampType#13]
   :  :              +- StreamingRelationV2 org.apache.spark.sql.kafka010.KafkaSourceProvider@2f1d20c3, kafka, org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@179a7088, org.apache.spark.sql.util.CaseInsensitiveStringMap@e20c70b1, [key#7, value#8, topic#9, partition#10, offset#11L, timestamp#12, timestampType#13], StreamingRelation DataSource(org.apache.spark.sql.SparkSession@68afa7b5,kafka,List(),None,List(),None,Map(subscribe -> topic_test1, kafka.bootstrap.servers -> kafka:9093),None), kafka, [key#0, value#1, topic#2, partition#3, offset#4L, timestamp#5, timestampType#6]
   :  +- Project [event_key#110 AS event_key#467, event_topic#111 AS event_topic#468, event_timestamp#112 AS event_timestamp#469, station#113 AS station#470, valid#114 AS valid#471, tmpf#115 AS tmpf#472, dwpf#116 AS dwpf#473, relh#117 AS relh#474, feel#118 AS feel#475, drct#119 AS drct#476, sped#120 AS sped#477, alti#121 AS alti#478, mslp#122 AS mslp#479, p01m#123 AS p01m#480, vsby#124 AS vsby#481, skyc1#125 AS skyc1#482, skyl1#126 AS skyl1#483, wxcodes#127 AS wxcodes#484, ice_acceretion_1hr#128 AS ice_acceretion_1hr#485]
   :     +- Filter (((hour(valid#114, Some(Etc/UTC)) >= 19) AND (hour(valid#114, Some(Etc/UTC)) <= 23)) OR ((hour(valid#114, Some(Etc/UTC)) >= 0) AND (hour(valid#114, Some(Etc/UTC)) < 8)))
   :        +- Project [key#21 AS event_key#110, topic#9 AS event_topic#111, timestamp#12 AS event_timestamp#112, value#102.station AS station#113, to_timestamp('value.valid, Some(yyyy-MM-dd HH:mm)) AS valid#114, value#102.tmpf AS tmpf#115, value#102.dwpf AS dwpf#116, value#102.relh AS relh#117, value#102.feel AS feel#118, value#102.drct AS drct#119, value#102.sped AS sped#120, value#102.alti AS alti#121, value#102.mslp AS mslp#122, value#102.p01m AS p01m#123, value#102.vsby AS vsby#124, value#102.skyc1 AS skyc1#125, value#102.skyl1 AS skyl1#126, value#102.wxcodes AS wxcodes#127, value#102.ice_acceretion_1hr AS ice_acceretion_1hr#128]
   :           +- Project [key#21, from_json(StructField(station,StringType,true), StructField(valid,StringType,true), StructField(tmpf,StringType,true), StructField(dwpf,StringType,true), StructField(relh,StringType,true), StructField(feel,StringType,true), StructField(drct,StringType,true), StructField(sped,StringType,true), StructField(alti,StringType,true), StructField(mslp,StringType,true), StructField(p01m,StringType,true), StructField(vsby,StringType,true), StructField(skyc1,StringType,true), StructField(skyl1,StringType,true), StructField(wxcodes,StringType,true), StructField(ice_acceretion_1hr,StringType,true), value#29, Some(Etc/UTC)) AS value#102, topic#9, partition#10, offset#11L, timestamp#12, timestampType#13]
   :              +- Project [key#21, cast(value#8 as string) AS value#29, topic#9, partition#10, offset#11L, timestamp#12, timestampType#13]
   :                 +- Project [cast(key#7 as string) AS key#21, value#8, topic#9, partition#10, offset#11L, timestamp#12, timestampType#13]
   :                    +- StreamingRelationV2 org.apache.spark.sql.kafka010.KafkaSourceProvider@2f1d20c3, kafka, org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@179a7088, org.apache.spark.sql.util.CaseInsensitiveStringMap@e20c70b1, [key#7, value#8, topic#9, partition#10, offset#11L, timestamp#12, timestampType#13], StreamingRelation DataSource(org.apache.spark.sql.SparkSession@68afa7b5,kafka,List(),None,List(),None,Map(subscribe -> topic_test1, kafka.bootstrap.servers -> kafka:9093),None), kafka, [key#0, value#1, topic#2, partition#3, offset#4L, timestamp#5, timestampType#6]
   +- Project [Window#524-T1800000ms, station#569, temperature#556]
      +- Generate explode(temp_list#545), false, [temperature#556]
         +- Aggregate [station#569, window#546-T1800000ms], [station#569, window#546-T1800000ms AS window#524-T1800000ms, collect_list(tmpf#571, 0, 0) AS temp_list#545]
            +- Filter isnotnull(valid#570-T1800000ms)
               +- Project [named_struct(start, precisetimestampconversion(((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(valid#570-T1800000ms, TimestampType, LongType) - 0) as double) / cast(7200000000 as double))) as double) = (cast((precisetimestampconversion(valid#570-T1800000ms, TimestampType, LongType) - 0) as double) / cast(7200000000 as double))) THEN (CEIL((cast((precisetimestampconversion(valid#570-T1800000ms, TimestampType, LongType) - 0) as double) / cast(7200000000 as double))) + cast(1 as bigint)) ELSE CEIL((cast((precisetimestampconversion(valid#570-T1800000ms, TimestampType, LongType) - 0) as double) / cast(7200000000 as double))) END + cast(0 as bigint)) - cast(1 as bigint)) * 7200000000) + 0), LongType, TimestampType), end, precisetimestampconversion((((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(valid#570-T1800000ms, TimestampType, LongType) - 0) as double) / cast(7200000000 as double))) as double) = (cast((precisetimestampconversion(valid#570-T1800000ms, TimestampType, LongType) - 0) as double) / cast(7200000000 as double))) THEN (CEIL((cast((precisetimestampconversion(valid#570-T1800000ms, TimestampType, LongType) - 0) as double) / cast(7200000000 as double))) + cast(1 as bigint)) ELSE CEIL((cast((precisetimestampconversion(valid#570-T1800000ms, TimestampType, LongType) - 0) as double) / cast(7200000000 as double))) END + cast(0 as bigint)) - cast(1 as bigint)) * 7200000000) + 0) + 7200000000), LongType, TimestampType)) AS window#546-T1800000ms, event_key#566, event_topic#567, event_timestamp#568, station#569, valid#570-T1800000ms, tmpf#571, dwpf#572, relh#573, feel#574, drct#575, sped#576, alti#577, mslp#578, p01m#579, vsby#580, skyc1#581, skyl1#582, wxcodes#583, ice_acceretion_1hr#584]
                  +- EventTimeWatermark valid#570: timestamp, 30 minutes
                     +- Project [key#21 AS event_key#566, topic#9 AS event_topic#567, timestamp#12 AS event_timestamp#568, value#102.station AS station#569, to_timestamp('value.valid, Some(yyyy-MM-dd HH:mm)) AS valid#570, value#102.tmpf AS tmpf#571, value#102.dwpf AS dwpf#572, value#102.relh AS relh#573, value#102.feel AS feel#574, value#102.drct AS drct#575, value#102.sped AS sped#576, value#102.alti AS alti#577, value#102.mslp AS mslp#578, value#102.p01m AS p01m#579, value#102.vsby AS vsby#580, value#102.skyc1 AS skyc1#581, value#102.skyl1 AS skyl1#582, value#102.wxcodes AS wxcodes#583, value#102.ice_acceretion_1hr AS ice_acceretion_1hr#584]
                        +- Project [key#21, from_json(StructField(station,StringType,true), StructField(valid,StringType,true), StructField(tmpf,StringType,true), StructField(dwpf,StringType,true), StructField(relh,StringType,true), StructField(feel,StringType,true), StructField(drct,StringType,true), StructField(sped,StringType,true), StructField(alti,StringType,true), StructField(mslp,StringType,true), StructField(p01m,StringType,true), StructField(vsby,StringType,true), StructField(skyc1,StringType,true), StructField(skyl1,StringType,true), StructField(wxcodes,StringType,true), StructField(ice_acceretion_1hr,StringType,true), value#29, Some(Etc/UTC)) AS value#102, topic#9, partition#10, offset#11L, timestamp#12, timestampType#13]
                           +- Project [key#21, cast(value#8 as string) AS value#29, topic#9, partition#10, offset#11L, timestamp#12, timestampType#13]
                              +- Project [cast(key#7 as string) AS key#21, value#8, topic#9, partition#10, offset#11L, timestamp#12, timestampType#13]
                                 +- StreamingRelationV2 org.apache.spark.sql.kafka010.KafkaSourceProvider@2f1d20c3, kafka, org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@179a7088, org.apache.spark.sql.util.CaseInsensitiveStringMap@e20c70b1, [key#7, value#8, topic#9, partition#10, offset#11L, timestamp#12, timestampType#13], StreamingRelation DataSource(org.apache.spark.sql.SparkSession@68afa7b5,kafka,List(),None,List(),None,Map(subscribe -> topic_test1, kafka.bootstrap.servers -> kafka:9093),None), kafka, [key#0, value#1, topic#2, partition#3, offset#4L, timestamp#5, timestampType#6]


In [None]:
df_anomalous_requests = (df_with_zscore.filter(~isnan(col("z_score")))
                      .filter("z_score > 2 or z_score < -2"))

In [None]:
final_query= (df_anomalous_requests
            .writeStream
            .outputMode("complete")
            .format("console")
            .option("truncate",'false')
            .trigger(processingTime="10 seconds")
            .start())

In [None]:
final_query.stop()