In this task, we implement Spark Structured Streaming to consume the data produced in Task 1 and perform streaming classification.

1. Create a SparkSession from a SparkConf object that uses two local cores, a proper application name, and UTC as the session timezone.

In [1]:
import os

# PySpark reads PYSPARK_SUBMIT_ARGS when it launches the JVM, so this has to be
# in the environment before the SparkSession below is created. The --packages
# list pulls the Kafka connector jars from Maven.
# NOTE(review): spark-streaming-kafka is the DStream connector; only
# spark-sql-kafka looks necessary for readStream.format("kafka") — confirm
# before trimming it.
SUBMIT_ARGS = '--packages org.apache.spark:spark-streaming-kafka-0-10_2.12:3.0.0,org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0 pyspark-shell'
os.environ.update(PYSPARK_SUBMIT_ARGS=SUBMIT_ARGS)

from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
from datetime import datetime
import zipfile as zf
from pyspark.ml import PipelineModel

# Session configuration: two local cores, a named application, and UTC as the
# SQL session timezone so timestamps (e.g. window bounds) are shown in UTC.
spark_conf = (
    SparkConf()
    .setMaster("local[2]")
    .setAppName("Task3")
    .set("spark.sql.session.timeZone", "UTC")
)

spark = (
    SparkSession.builder
    .config(conf=spark_conf)
    .getOrCreate()
)
# Only show errors from Spark's logger in the notebook output.
spark.sparkContext.setLogLevel('ERROR')

2. Ingest the streaming data from the Task 1 Kafka producers into Spark Structured Streaming.

In [2]:
# Streaming source: subscribe to the topic written by the Task 1 producer on
# the local Kafka broker. Each row carries the raw message in `value`.
topic = 'flightTopic'
flightsDf = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", topic)
    .load()
)

3. Transform the streaming data into the proper formats, following the file schema in the metadata.

In [3]:
from pyspark.sql import functions as F

# Flight columns in the order of the file schema in the metadata. Every field
# is parsed from JSON as a string and cast afterwards; `ts` is appended last as
# a timestamp.
FLIGHT_COLUMNS = [
    'YEAR', 'MONTH', 'DAY', 'DAY_OF_WEEK', 'AIRLINE', 'FLIGHT_NUMBER',
    'TAIL_NUMBER', 'ORIGIN_AIRPORT', 'DESTINATION_AIRPORT',
    'SCHEDULED_DEPARTURE', 'DEPARTURE_TIME', 'DEPARTURE_DELAY', 'TAXI_OUT',
    'WHEELS_OFF', 'SCHEDULED_TIME', 'ELAPSED_TIME', 'AIR_TIME', 'DISTANCE',
    'WHEELS_ON', 'TAXI_IN', 'SCHEDULED_ARRIVAL', 'ARRIVAL_TIME',
    'ARRIVAL_DELAY', 'DIVERTED', 'CANCELLED', 'CANCELLATION_REASON',
    'AIR_SYSTEM_DELAY', 'SECURITY_DELAY', 'AIRLINE_DELAY',
    'LATE_AIRCRAFT_DELAY', 'WEATHER_DELAY',
]
# Columns kept as strings; every other flight column is cast to int.
STRING_COLUMNS = {
    'AIRLINE', 'FLIGHT_NUMBER', 'TAIL_NUMBER', 'ORIGIN_AIRPORT',
    'DESTINATION_AIRPORT', 'CANCELLATION_REASON',
}

# NOTE(review): each Kafka message value is expected to be a JSON array of
# flight records (hence ArrayType + explode) — confirm against the Task 1 producer.
schema = ArrayType(StructType(
    [StructField(name, StringType(), True) for name in FLIGHT_COLUMNS]
    + [StructField('ts', TimestampType(), True)]
))


def _typed_column(name):
    """Project parsed_value.<name>, cast to int unless it is a string column."""
    column = F.col("parsed_value." + name)
    if name not in STRING_COLUMNS:
        column = column.cast("int")
    return column.alias(name)


flightsDf = (
    flightsDf
    .select(F.from_json(F.col("value").cast("string"), schema).alias('parsed_value'))
    # One output row per element of the parsed array.
    .select(F.explode(F.col("parsed_value")).alias('parsed_value'))
    # Generated from FLIGHT_COLUMNS so the output order matches the schema
    # (the hand-written projection had AIR_TIME and ELAPSED_TIME swapped).
    .select([_typed_column(name) for name in FLIGHT_COLUMNS]
            + [F.col("parsed_value.ts").alias("ts")])
)
flightsDf.printSchema()

root
 |-- YEAR: integer (nullable = true)
 |-- MONTH: integer (nullable = true)
 |-- DAY: integer (nullable = true)
 |-- DAY_OF_WEEK: integer (nullable = true)
 |-- AIRLINE: string (nullable = true)
 |-- FLIGHT_NUMBER: string (nullable = true)
 |-- TAIL_NUMBER: string (nullable = true)
 |-- ORIGIN_AIRPORT: string (nullable = true)
 |-- DESTINATION_AIRPORT: string (nullable = true)
 |-- SCHEDULED_DEPARTURE: integer (nullable = true)
 |-- DEPARTURE_TIME: integer (nullable = true)
 |-- DEPARTURE_DELAY: integer (nullable = true)
 |-- TAXI_OUT: integer (nullable = true)
 |-- WHEELS_OFF: integer (nullable = true)
 |-- SCHEDULED_TIME: integer (nullable = true)
 |-- AIR_TIME: integer (nullable = true)
 |-- ELAPSED_TIME: integer (nullable = true)
 |-- DISTANCE: integer (nullable = true)
 |-- WHEELS_ON: integer (nullable = true)
 |-- TAXI_IN: integer (nullable = true)
 |-- SCHEDULED_ARRIVAL: integer (nullable = true)
 |-- ARRIVAL_TIME: integer (nullable = true)
 |-- ARRIVAL_DELAY: integer (nulla

4. Create the output sink for the flight data stream by viewing the output in the console:
    
    a. Use a trigger interval of 5 seconds.
    
    b. Take a screenshot of the console output and paste it as an image into this Jupyter notebook.

In [4]:
# Console sink: print each micro-batch of newly parsed flight records every
# 5 seconds. NOTE(review): this query keeps running alongside later queries on
# the two local cores; consider query.stop() once the screenshot is captured.
query = (
    flightsDf.writeStream
    .format("console")
    .outputMode("append")
    .trigger(processingTime='5 seconds')
    .start()
)

![Console output of the flight data stream](image.png)

5. As in Assignment 2A, add and drop the following columns:
    
    a. Add the label column "binaryArrDelay".
    
    b. Drop the columns "CANCELLATION_REASON", "AIR_SYSTEM_DELAY", "SECURITY_DELAY", "AIRLINE_DELAY", "LATE_AIRCRAFT_DELAY" and "WEATHER_DELAY".
    
    c. Add the DEPT_TIME_FLAG column.
    
    d. Drop records with null values.

In [5]:
import pyspark.sql.functions as F

# a. Label: 1 if the flight arrived late (ARRIVAL_DELAY > 0), otherwise 0.
#    A null ARRIVAL_DELAY also yields 0, but such rows are removed in step d.
arr_delay_label = F.when(F.col("ARRIVAL_DELAY") > 0, 1).otherwise(0)

# b. Columns to remove, as listed in the task.
columnsToDrop = [
    'CANCELLATION_REASON',
    'AIR_SYSTEM_DELAY',
    'SECURITY_DELAY',
    'AIRLINE_DELAY',
    'LATE_AIRCRAFT_DELAY',
    'WEATHER_DELAY',
]

# c. Time-of-day bucket for SCHEDULED_DEPARTURE (presumably an hhmm clock time,
#    given the 0-2359 bounds). Lower bounds are inclusive and upper bounds
#    exclusive, except "Night", which includes 2359; anything else is
#    labelled "Invalid value".
sched_dep = F.col('SCHEDULED_DEPARTURE')
dept_time_flag = (
    F.when((sched_dep >= 0) & (sched_dep < 500), "Midnight")
    .when((sched_dep >= 500) & (sched_dep < 1100), "Morning")
    .when((sched_dep >= 1100) & (sched_dep < 1600), "Afternoon")
    .when((sched_dep >= 1600) & (sched_dep < 2000), "Evening")
    .when((sched_dep >= 2000) & (sched_dep <= 2359), "Night")
    .otherwise("Invalid value")
)

# Apply a-c in order, then d. drop every record that still contains a null.
flightsDf = (
    flightsDf
    .withColumn("binaryArrDelay", arr_delay_label)
    .drop(*columnsToDrop)
    .withColumn('DEPT_TIME_FLAG', dept_time_flag)
    .dropna()
)

6. Load the given machine learning model (“binary_arrival_delay_pipeline_model.zip”) and use it to classify whether each flight record is delayed. This assumes the data has already been labelled, as in the "binaryArrDelay" column. 

Hint: you can use the `zipfile` module (e.g. `import zipfile as zf`) to extract the model archive before loading it.

In [6]:
from zipfile import ZipFile

MODEL_ARCHIVE = 'binary_arrival_delay_pipeline_model.zip'
MODEL_DIR = 'binary_arrival_delay_pipeline_model'

# Unpack the saved pipeline into the working directory. The `with` block closes
# the archive handle (previously left open), and naming it `archive` avoids
# shadowing the built-in zip().
# NOTE(review): assumes the archive's top-level folder is MODEL_DIR — confirm.
with ZipFile(MODEL_ARCHIVE) as archive:
    archive.extractall()

# Load the fitted pipeline and apply it to the streaming frame. The result keeps
# the input columns and adds the `prediction` column used by the monitoring step.
model = PipelineModel.load(MODEL_DIR)
prediction = model.transform(flightsDf)

7. Using the classification results, monitor the data as follows: for each of DAY_OF_WEEK = 1, DAY_OF_WEEK = 2 and DAY_OF_WEEK = 3, keep track of the classification accuracy in every 20-second window.

i. Create an output sink that stores the prediction results in memory.

ii. Show the prediction results for every 20-second window (partial results are acceptable). The results should include the timestamps of each 20-second window, DAY_OF_WEEK (1, 2 and 3), the prediction accuracy, and the count (number of flights).

In [7]:
from pyspark.sql import functions as F

# Monitoring settings from the task: tumbling 20-second windows over the event
# timestamp `ts`, restricted to DAY_OF_WEEK 1, 2 and 3. (The previous version
# used "2 seconds", which did not match the 20-second requirement.)
WINDOW_DURATION = "20 seconds"
MONITORED_DAYS = [1, 2, 3]
# How often the in-memory result table is refreshed.
MONITOR_TRIGGER = "2 seconds"

# 1 when the model's prediction matches the binaryArrDelay label, else 0; its
# mean over a group is the classification accuracy for that group.
is_correct = (F.col("prediction") == F.col("binaryArrDelay")).cast("int")

# Build a new frame instead of rebinding `prediction`, so re-running this cell
# still starts from the per-flight predictions (which still carry `ts`).
windowed_accuracy = (
    prediction
    # Filter before aggregating so no state is kept for unmonitored days.
    .filter(F.col("DAY_OF_WEEK").isin(MONITORED_DAYS))
    .groupBy(F.window(F.col("ts"), WINDOW_DURATION), "DAY_OF_WEEK")
    .agg(
        F.avg(is_correct).alias("accuracy"),
        F.count("*").alias("count"),
    )
    # Sorting a streaming aggregate is only supported in complete output mode.
    .sort(F.desc("window"), "DAY_OF_WEEK")
)

# i. In-memory sink; the results are queried below as the table "prediction".
prediction_query = (
    windowed_accuracy.writeStream
    .outputMode("complete")
    .format("memory")
    .queryName("prediction")
    .trigger(processingTime=MONITOR_TRIGGER)
    .start()
)
# The query is left running in the background so the table keeps filling.
# Stopping it right after start() (as before) could leave the table empty.
# Call prediction_query.stop() once the results have been inspected.

In [None]:
# ii. Snapshot of the in-memory sink table populated by the monitoring query;
#     truncate=False keeps the full window start/end timestamps visible.
prediction_results = spark.sql("select * from prediction")
prediction_results.show(truncate=False)