#### Import libraries

In [2]:
# load in all required libraries
from pyspark.sql.functions import isnan, when, count, col
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType, DoubleType, IntegerType, ArrayType
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import OneHotEncoder
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline
from pyspark.sql.functions import col
from pyspark.ml import PipelineModel

from pyspark.sql import SparkSession
from pyspark import SparkConf 
from pyspark.sql.functions import explode
from pyspark.sql.functions import split
from pyspark.sql import functions as F
from pyspark.sql.types import *

import math 
import statistics as stat
from collections import Counter
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.dates as md
import time
import os




#### Initialize Spark session

In [4]:
# Kafka connector packages for spark-submit; presumably read when the JVM is
# launched, so this assignment has to precede the session creation below.
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-10_2.12:3.0.0,org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0 pyspark-shell'

# Spark configuration: local mode with 2 threads, session time zone fixed to UTC.
spark_conf = (
    SparkConf()
    .setMaster('local[2]')
    .set('spark.sql.session.timeZone', 'UTC')
)

# Build (or reuse, via getOrCreate) the session for this notebook.
spark = (
    SparkSession.builder
    .config(conf=spark_conf)
    .appName("Flight Data Streaming")
    .getOrCreate()
)

# Underlying SparkContext; keep the log output to errors only.
sc = spark.sparkContext
sc.setLogLevel('ERROR')


#### Connect to Kafka

In [3]:
# Kafka topic that the producer publishes flight records to.
topic = 'flight-topic'

# Subscribe to the topic as a streaming source (local broker on port 9092).
flight_df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "127.0.0.1:9092")
    .option("subscribe", topic)
    .load()
)

In [4]:
# Print the schema of the raw Kafka stream: binary key/value columns plus
# topic, partition, offset and timestamp metadata.
flight_df.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



#### Data preprocessing

In [5]:
# Kafka delivers key and value as binary; cast both columns to strings.
string_casts = [f"CAST({field} AS STRING)" for field in ("key", "value")]
flight_df = flight_df.selectExpr(*string_casts)

In [6]:
# Print the schema after the cast: only the string `key` and `value` columns remain.
flight_df.printSchema()

root
 |-- key: string (nullable = true)
 |-- value: string (nullable = true)



In [7]:
# Break each comma-separated `value` string into an array column named `values`.
split_expression = "split(value, ',') AS values"
flight_df = flight_df.selectExpr(split_expression)
    

In [8]:
# Positional layout of the comma-separated payload: (output column, Spark SQL type).
# The list position of each entry is its index in the `values` array, so the
# index/name pairing cannot drift. A type of None keeps the element as a string.
FLIGHT_FIELDS = [
    ('YEAR', 'INT'),
    ('MONTH', 'INT'),
    ('DAY', 'INT'),
    ('DAY_OF_WEEK', 'INT'),
    ('AIRLINE', None),
    ('FLIGHT_NUMBER', 'INT'),
    ('TAIL_NUMBER', None),
    ('ORIGIN_AIRPORT', None),
    ('DESTINATION_AIRPORT', None),
    ('SCHEDULED_DEPARTURE', 'INT'),
    ('DEPARTURE_TIME', 'INT'),
    ('DEPARTURE_DELAY', 'INT'),
    ('TAXI_OUT', 'INT'),
    ('WHEELS_OFF', 'INT'),
    ('SCHEDULED_TIME', 'INT'),
    ('ELAPSED_TIME', 'INT'),
    ('AIR_TIME', 'INT'),
    ('DISTANCE', 'INT'),
    ('WHEELS_ON', 'INT'),
    ('TAXI_IN', 'INT'),
    ('SCHEDULED_ARRIVAL', 'INT'),
    ('ARRIVAL_TIME', 'INT'),
    ('ARRIVAL_DELAY', 'INT'),
    ('DIVERTED', 'INT'),
    ('CANCELLED', 'INT'),
    ('CANCELLATION_REASON', None),
    ('AIR_SYSTEM_DELAY', 'INT'),
    ('SECURITY_DELAY', 'INT'),
    ('AIRLINE_DELAY', 'INT'),
    ('LATE_AIRCRAFT_DELAY', 'INT'),
    ('WEATHER_DELAY', 'INT'),
]


def _field_expr(index, name, sql_type):
    """Return the selectExpr string that extracts values[index] as column `name`.

    When `sql_type` is None the array element is kept as a string; otherwise it
    is CAST to `sql_type`.
    """
    if sql_type is None:
        return f"values[{index}] AS {name}"
    return f"CAST(values[{index}] AS {sql_type}) AS {name}"


# Format each field to a suitable data type.
flight_df = flight_df.selectExpr(
    *[_field_expr(i, name, sql_type) for i, (name, sql_type) in enumerate(FLIGHT_FIELDS)]
)
    


In [9]:
# Print the schema to confirm each parsed field received its intended type.
flight_df.printSchema()

root
 |-- YEAR: integer (nullable = true)
 |-- MONTH: integer (nullable = true)
 |-- DAY: integer (nullable = true)
 |-- DAY_OF_WEEK: integer (nullable = true)
 |-- AIRLINE: string (nullable = true)
 |-- FLIGHT_NUMBER: integer (nullable = true)
 |-- TAIL_NUMBER: string (nullable = true)
 |-- ORIGIN_AIRPORT: string (nullable = true)
 |-- DESTINATION_AIRPORT: string (nullable = true)
 |-- SCHEDULED_DEPARTURE: integer (nullable = true)
 |-- DEPARTURE_TIME: integer (nullable = true)
 |-- DEPARTURE_DELAY: integer (nullable = true)
 |-- TAXI_OUT: integer (nullable = true)
 |-- WHEELS_OFF: integer (nullable = true)
 |-- SCHEDULED_TIME: integer (nullable = true)
 |-- ELAPSED_TIME: integer (nullable = true)
 |-- AIR_TIME: integer (nullable = true)
 |-- DISTANCE: integer (nullable = true)
 |-- WHEELS_ON: integer (nullable = true)
 |-- TAXI_IN: integer (nullable = true)
 |-- SCHEDULED_ARRIVAL: integer (nullable = true)
 |-- ARRIVAL_TIME: integer (nullable = true)
 |-- ARRIVAL_DELAY: integer (null

#### Data cleaning

In [10]:
# Columns excluded before scoring: the cancellation reason and the
# per-cause delay breakdown fields.
removed_columns = [
    'CANCELLATION_REASON',
    'AIR_SYSTEM_DELAY',
    'SECURITY_DELAY',
    'AIRLINE_DELAY',
    'LATE_AIRCRAFT_DELAY',
    'WEATHER_DELAY',
]

In [11]:
# helper that removes unwanted columns
def eliminate_columns(removed_columns, df):
    """Drop the listed columns from a DataFrame.

    Parameters
    ----------
    removed_columns : iterable of str
        Names of the columns to remove.
    df : DataFrame
        DataFrame to remove them from.

    Returns
    -------
    The result of ``df.drop(*removed_columns)``.
    """
    return df.drop(*removed_columns)

In [12]:
# Strip the unwanted columns from the parsed stream.
flightsRawDf = eliminate_columns(removed_columns=removed_columns, df=flight_df)

In [13]:
# Drop every row that has a missing value in any column (how="any").
# DataFrame.na.drop and DataFrame.dropna are aliases of the same operation,
# so a single call is sufficient; repeating it with dropna would be a no-op.
flightsDf = flightsRawDf.na.drop("any")

#### Make predictions using the trained models

##### Model 1: Decision Tree and arrival delay

In [None]:
# Model 1: decision-tree pipeline for arrival delay, applied to the cleaned stream.
dt_arr_model_path = 'models/dt_arr'
dt_arr_model = PipelineModel.load(dt_arr_model_path)
dt_arr_pred = dt_arr_model.transform(flightsDf)

In [29]:
# Append scored rows to the in-memory table "predictions1",
# firing a micro-batch every 2 seconds.
see_results1 = (
    dt_arr_pred.writeStream
    .outputMode("append")
    .format("memory")
    .queryName("predictions1")
    .trigger(processingTime='2 seconds')
    .start()
)

In [28]:
# Stop the streaming query that feeds the "predictions1" in-memory table.
# NOTE(review): in top-to-bottom order this cell runs before the display loop
# below, so Restart & Run All may stop the stream before any rows reach the
# table; run it only once you are finished watching predictions.
see_results1.stop()

In [31]:
# Re-print the latest predictions every 5 seconds; the loop runs until the
# kernel is interrupted.
# DataFrame.show() prints the table itself and returns None, so it is called
# directly; wrapping it in display() would only render a stray "None".
from IPython.display import clear_output
while True:
    clear_output(wait=True)
    spark.sql('SELECT prediction, probability FROM predictions1').show()
    time.sleep(5)

+----------+--------------------+
|prediction|         probability|
+----------+--------------------+
|       0.0|[0.70365735671538...|
|       0.0|[0.70365735671538...|
|       0.0|[0.83846775707005...|
|       0.0|[0.70365735671538...|
|       1.0|[0.40971299495083...|
|       0.0|[0.83846775707005...|
|       0.0|[0.70365735671538...|
|       0.0|[0.70365735671538...|
|       1.0|[0.14783537177247...|
|       0.0|[0.70365735671538...|
|       0.0|[0.83846775707005...|
|       0.0|[0.83846775707005...|
|       1.0|[0.40971299495083...|
|       0.0|[0.83846775707005...|
|       0.0|[0.70365735671538...|
|       0.0|[0.83846775707005...|
|       0.0|[0.83846775707005...|
|       0.0|[0.83846775707005...|
|       0.0|[0.83846775707005...|
|       0.0|[0.70365735671538...|
+----------+--------------------+
only showing top 20 rows



None

##### Model 2: Decision Tree and departure delay

In [17]:
# Model 2: decision-tree pipeline for departure delay, applied to the cleaned stream.
dt_dept_model_path = 'models/dt_dep'
dt_dept_model = PipelineModel.load(dt_dept_model_path)
dt_dept_pred = dt_dept_model.transform(flightsDf)

In [18]:
# Append scored rows to the in-memory table "predictions2",
# firing a micro-batch every 2 seconds.
see_results2 = (
    dt_dept_pred.writeStream
    .outputMode("append")
    .format("memory")
    .queryName("predictions2")
    .trigger(processingTime='2 seconds')
    .start()
)

In [None]:
# Stop the streaming query that feeds the "predictions2" in-memory table.
# NOTE(review): in top-to-bottom order this cell runs before the display loop
# below, so Restart & Run All may stop the stream before any rows reach the
# table; run it only once you are finished watching predictions.
see_results2.stop()

In [32]:
# Re-print the latest predictions every 5 seconds; the loop runs until the
# kernel is interrupted.
# DataFrame.show() prints the table itself and returns None, so it is called
# directly; wrapping it in display() would only render a stray "None".
from IPython.display import clear_output
while True:
    clear_output(wait=True)
    spark.sql('SELECT prediction, probability FROM predictions2').show()
    time.sleep(5)

+----------+--------------------+
|prediction|         probability|
+----------+--------------------+
|       0.0|[0.70365735671538...|
|       0.0|[0.83846775707005...|
|       0.0|[0.83846775707005...|
|       0.0|[0.83846775707005...|
|       0.0|[0.83846775707005...|
|       0.0|[0.70365735671538...|
|       0.0|[0.70365735671538...|
|       0.0|[0.70365735671538...|
|       0.0|[0.83846775707005...|
|       0.0|[0.70365735671538...|
|       0.0|[0.83846775707005...|
|       0.0|[0.70365735671538...|
|       0.0|[0.83846775707005...|
|       0.0|[0.70365735671538...|
|       0.0|[0.83846775707005...|
|       1.0|[0.14783537177247...|
|       0.0|[0.70365735671538...|
|       0.0|[0.83846775707005...|
|       0.0|[0.70365735671538...|
|       0.0|[0.70365735671538...|
+----------+--------------------+
only showing top 20 rows



None

##### Model 3: Gradient-boosted trees and arrival delay

In [20]:
# Model 3: gradient-boosted-tree pipeline for arrival delay, applied to the cleaned stream.
gbt_arr_model_path = 'models/gbt_arr'
gbt_arr_model = PipelineModel.load(gbt_arr_model_path)
gbt_arr_pred = gbt_arr_model.transform(flightsDf)

In [21]:
# Append scored rows to the in-memory table "predictions3",
# firing a micro-batch every 2 seconds.
see_results3 = (
    gbt_arr_pred.writeStream
    .outputMode("append")
    .format("memory")
    .queryName("predictions3")
    .trigger(processingTime='2 seconds')
    .start()
)

In [None]:
# Stop the streaming query that feeds the "predictions3" in-memory table.
# NOTE(review): in top-to-bottom order this cell runs before the display loop
# below, so Restart & Run All may stop the stream before any rows reach the
# table; run it only once you are finished watching predictions.
see_results3.stop()

In [33]:
# Re-print the latest predictions every 5 seconds; the loop runs until the
# kernel is interrupted.
# DataFrame.show() prints the table itself and returns None, so it is called
# directly; wrapping it in display() would only render a stray "None".
from IPython.display import clear_output
while True:
    clear_output(wait=True)
    spark.sql('SELECT prediction, probability FROM predictions3').show()
    time.sleep(5)

+----------+--------------------+
|prediction|         probability|
+----------+--------------------+
|       0.0|[0.83846775707005...|
|       0.0|[0.70365735671538...|
|       0.0|[0.83846775707005...|
|       0.0|[0.70365735671538...|
|       0.0|[0.83846775707005...|
|       1.0|[0.14783537177247...|
|       0.0|[0.70365735671538...|
|       0.0|[0.83846775707005...|
|       0.0|[0.70365735671538...|
|       0.0|[0.70365735671538...|
|       0.0|[0.83846775707005...|
|       0.0|[0.70365735671538...|
|       0.0|[0.83846775707005...|
|       0.0|[0.70365735671538...|
|       0.0|[0.83846775707005...|
|       0.0|[0.70365735671538...|
|       0.0|[0.70365735671538...|
|       0.0|[0.83846775707005...|
|       0.0|[0.70365735671538...|
|       1.0|[0.40971299495083...|
+----------+--------------------+
only showing top 20 rows



None

##### Model 4: Gradient-boosted trees and departure delay

In [23]:
# Model 4: gradient-boosted-tree pipeline for departure delay, applied to the cleaned stream.
# NOTE(review): this directory is 'abt_dep', while model 3 uses 'gbt_arr'; confirm
# that this is the intended saved-model path and not a typo for 'gbt_dep'.
gbt_dept_model_path = 'models/abt_dep'
gbt_dept_model = PipelineModel.load(gbt_dept_model_path)
gbt_dept_pred = gbt_dept_model.transform(flightsDf)

In [36]:
# Append scored rows to the in-memory table "predictions4",
# firing a micro-batch every 2 seconds.
see_results4 = (
    gbt_dept_pred.writeStream
    .outputMode("append")
    .format("memory")
    .queryName("predictions4")
    .trigger(processingTime='2 seconds')
    .start()
)

In [26]:
# Stop the streaming query that feeds the "predictions4" in-memory table.
# NOTE(review): in top-to-bottom order this cell runs before the display loop
# below, so Restart & Run All may stop the stream before any rows reach the
# table; run it only once you are finished watching predictions.
see_results4.stop()

In [39]:
# Re-print the latest predictions every 5 seconds; the loop runs until the
# kernel is interrupted.
# DataFrame.show() prints the table itself and returns None, so it is called
# directly; wrapping it in display() would only render a stray "None".
from IPython.display import clear_output
while True:
    clear_output(wait=True)
    spark.sql('SELECT prediction, probability FROM predictions4').show()
    time.sleep(5)

+----------+--------------------+
|prediction|         probability|
+----------+--------------------+
|       0.0|[0.70365735671538...|
|       0.0|[0.70365735671538...|
|       0.0|[0.83846775707005...|
|       0.0|[0.70365735671538...|
|       0.0|[0.83846775707005...|
|       0.0|[0.70365735671538...|
|       0.0|[0.83846775707005...|
|       0.0|[0.70365735671538...|
|       0.0|[0.83846775707005...|
|       1.0|[0.14783537177247...|
|       0.0|[0.70365735671538...|
|       0.0|[0.83846775707005...|
|       0.0|[0.70365735671538...|
|       0.0|[0.70365735671538...|
|       0.0|[0.83846775707005...|
|       0.0|[0.70365735671538...|
|       0.0|[0.83846775707005...|
|       0.0|[0.70365735671538...|
|       0.0|[0.83846775707005...|
|       0.0|[0.83846775707005...|
+----------+--------------------+
only showing top 20 rows



None