## 2 Streaming application using Spark Structured Streaming

Implement Spark Structured Streaming to consume the data from the producer and perform predictive analytics.

### 2.1 Import library

In [None]:
# import library
from pyspark import SparkConf
from pyspark import SparkContext # Spark
from pyspark.sql import SparkSession # Spark SQL
import os
from pyspark.sql.types import StructType, IntegerType, TimestampType,StringType,DateType, ArrayType, StructField, FloatType
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.sql.functions import year, month, dayofmonth, weekofyear, dayofweek
from pyspark.sql.functions import when
from pyspark.sql.functions import window, col
from pyspark.sql.functions import to_json, struct
from pyspark.sql.functions import collect_list
from pyspark.sql.functions import unix_timestamp

### 2.2 Create SparkSession

Create a SparkSession (the SparkContext can be obtained from it), using a SparkConf object to configure the Spark app with the following settings (Sangat, P., 2021):

- Application name: Pedestrain Traffic Predictive Analysis
- Session timezone: UTC
- Master: run locally with 2 cores (`local[2]`)

In [None]:

# Pull in the Kafka connectors via --packages. This only takes effect if it is
# set before the JVM / SparkContext is launched, i.e. before getOrCreate().
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-10_2.12:3.0.0,org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0 pyspark-shell'

# local[2]: run Spark in local mode with 2 worker threads (logical cores)
master = "local[2]"

# The `appName` field is the name shown on the Spark UI page
app_name = "Pedestrain Traffic Predictive Analysis"

# Session time zone used when parsing and rendering timestamps. This notebook
# works in UTC (not Melbourne local time).
session_timezone = 'UTC'

# Configure the Spark app (master, name and session time zone) through SparkConf
spark_conf = (
    SparkConf()
    .setMaster(master)
    .setAppName(app_name)
    .set("spark.sql.session.timeZone", session_timezone)
)

# Create (or reuse) the SparkSession with the configuration above
spark = SparkSession.builder.config(conf=spark_conf).getOrCreate()
# Also set the time zone explicitly on the live session, so it holds even when
# getOrCreate() hands back a session that was already running.
spark.conf.set("spark.sql.session.timeZone", session_timezone)

### 2.2 Define the data schema (for sensor location CSV)
Define the data schema for the sensor location CSV file, following the data types in the metadata file except for the location column (use StringType for the “location” column).

In [None]:
# Schema of the static sensor-location CSV. Every field is nullable; the
# "location" column is kept as a raw string instead of being parsed.
schema_B = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in (
        ("sensor_id", IntegerType()),
        ("sensor_description", StringType()),
        ("sensor_name", StringType()),
        ("installation_date", DateType()),
        ("status", StringType()),
        ("note", StringType()),
        ("direction_1", StringType()),
        ("direction_2", StringType()),
        ("latitude", FloatType()),
        ("longitude", FloatType()),
        ("location", StringType()),
    )
])

In [None]:
# Load the static sensor-location table with the explicit schema_B
# (the file has a header row and is comma-separated).
df_senloc = (
    spark.read
    .schema(schema_B)
    .options(header='True', delimiter=',')
    .csv("Pedestrian_Counting_System_-_Sensor_Locations.csv")
)

### 2.3 Read Stream from the Kafka Topic

Connect to the Kafka broker, subscribe to the topic `pedestrain_count`, and load data from the topic with <code>readStream</code>.

In [None]:
# Subscribe to the producer's Kafka topic as an unbounded streaming source.
topic = "pedestrain_count"
df = (
    spark.readStream
    .format("kafka")
    .option("subscribe", topic)
    .option("kafka.bootstrap.servers", "127.0.0.1:9092")
    .load()
)

Convert the key/value columns of the Kafka data stream to strings.

In [None]:
# Cast the Kafka key and value columns to strings, so the JSON payload in
# `value` can be parsed further down, then show the resulting schema.
df = df.selectExpr(
    "CAST(key AS STRING)",
    "CAST(value AS STRING)",
)
df.printSchema()

### 2.4 Persist the data in parquet format

In [None]:
# Archive the raw (string key/value) Kafka records as parquet files.
# The query's checkpoint is stored under the output directory.
query_file_sink = (
    df.writeStream
    .format("parquet")
    .outputMode("append")
    .option("checkpointLocation", "parquet/pedcount_df/checkpoint")
    .option("path", "parquet/pedcount_df")
    .start()
)


In [None]:
# Stop the raw Kafka-to-parquet sink query (query_file_sink) so it no longer
# consumes the topic.
query_file_sink.stop()

In [None]:
# Schema of one Kafka message payload: a JSON array of hourly-count records.
# Every field is nullable; Date_Time and date arrive as strings.
schema1 = ArrayType(
    StructType([
        StructField(field_name, field_type, True)
        for field_name, field_type in (
            ('ID', IntegerType()),
            ('Date_Time', StringType()),
            ('Year', IntegerType()),
            ('Month', StringType()),
            ('Mdate', IntegerType()),
            ('Day', StringType()),
            ('Time', IntegerType()),
            ('Sensor_ID', IntegerType()),
            ('Sensor_Name', StringType()),
            ('Hourly_Counts', IntegerType()),
            ('date', StringType()),
            ('time1', StringType()),
            ('am/pm', StringType()),
        )
    ])
)

Use the `explode` function to flatten the nested array, then rename the columns.

In [None]:
# Each Kafka `value` holds a JSON array of pedestrian-count records; parse it
# with schema1 into an array<struct> column.
df_pedcount = df.select(
    F.from_json(F.col("value").cast("string"), schema1).alias('parsed_value')
)

# explode(): one output row per array element.
df_pedcount = df_pedcount.select(
    F.explode("parsed_value").alias('unnested_value')
)

# Promote every struct field to a top-level column. The struct's field names
# (from schema1) are exactly the desired column names, so "*" expansion yields
# the same columns, in the same order, as aliasing each field by hand.
df_formatted = df_pedcount.select("unnested_value.*")

### 2.6 Add required columns for the model prediction

Perform the following transformations to prepare the columns for model prediction.

In [None]:
from pyspark.sql.functions import to_timestamp

# Parse the raw strings into timestamps: month/day/year with a 12-hour clock
# and an AM/PM marker for Date_Time, and month/day/year for date.
df_formatted = (
    df_formatted
    .withColumn('Date_Time', to_timestamp(col('Date_Time'), 'MM/dd/yyyy hh:mm:ss a'))
    .withColumn('date', to_timestamp(col('date'), 'MM/dd/yyyy'))
)

# Calendar features of the day being predicted (the day after Date_Time).
df_nextdate = df_formatted.withColumn('next_date', F.date_add(col('Date_Time'), 1))
df_mdate = df_nextdate.withColumn('next_Mdate', F.dayofmonth(col('next_date')))
df_nextdayweek = df_mdate.withColumn('next_day_week', F.weekofyear(col('next_date')))

# Spark's dayofweek() numbers Sunday=1 .. Saturday=7. Subtracting 1 and then
# mapping Sunday's 0 to 7 gives Monday=1 .. Sunday=7. 'next_day_of_weekk'
# holds the intermediate (Sunday=0) value.
df_nextdayofweek = df_nextdayweek.withColumn(
    'next_day_of_weekk', F.dayofweek(col('next_date')) - 1
)
new_pedcount = (
    df_nextdayofweek
    .withColumn(
        'next_day_of_week',
        F.when(col('next_day_of_weekk') == 0, 7).otherwise(col('next_day_of_weekk')),
    )
    # This hour's count is the "previous day's count" feature for the model.
    .withColumnRenamed('Hourly_Counts', 'prev_count')
)



In [None]:
# check
# query = new_pedcount \
#     .writeStream \
#     .outputMode("append") \
#     .format("console") \
#     .trigger(processingTime='5 seconds') \
#     .start()

In [None]:
#query.stop()

### 2.7 Load prediction model

Load the given prediction model to predict the next day's pedestrian count between 9am and 11:59pm. The provided model “count_estimation_pipeline_model” is a simplified version that predicts the hourly count given the sensor ID, week of the year, day of the month, day of the week, time, and the previous day’s hourly count at the same hour.

In [None]:
# Restore the fitted count-estimation pipeline saved on disk.
from pyspark.ml import PipelineModel

pipelineModel = PipelineModel.load(path='count_estimation_pipeline_model')

# Print the parameter map of the final stage. `_java_obj` is a private PySpark
# attribute (the wrapped JVM object), so this line relies on PySpark internals.
print(pipelineModel.stages[-1]._java_obj.paramMap())

Keep only the hours from **9am to 11:59pm** (filter out earlier hours).

In [None]:
# Keep only rows whose hour is 9 or later (9am - 11:59pm, assuming Time is the
# hour of day 0-23).
pred_pedcount = new_pedcount.where(col('Time') >= 9)

In [None]:
# Score the filtered stream with the loaded pipeline; downstream cells read
# the model output from its `prediction` column.
predictions_df = pipelineModel.transform(dataset=pred_pedcount)

Persist the prediction result in parquet format

In [None]:
# Persist every scored row (input columns plus `prediction`) as parquet.
# The query's checkpoint is stored under the output directory.
query_file_prediction = (
    predictions_df.writeStream
    .format("parquet")
    .outputMode("append")
    .option("checkpointLocation", "parquet/prediction_df/checkpoint1")
    .option("path", "parquet/prediction_df")
    .start()
)

In [None]:
# Stop the prediction parquet sink query (query_file_prediction).
query_file_prediction.stop()

### 2.8 Data processing
Process the data with the prediction result, following the below requirements:

    a. Get the number of hours in which the predicted pedestrian count would exceed 2000, on each day, for each sensor.

In [None]:
# Flag each predicted hour whose count exceeds 2000 (1 = above, 0 = not).
# Spark ML writes `prediction` as a double, so compare against 2000 directly:
# a `< 2001` test would treat predictions in (2000, 2001) as not exceeding.
# Rows with a null prediction are flagged 0 instead of being counted as above.
predictions_df = predictions_df.withColumn(
    'above_threshold',
    F.when(F.col('prediction') > 2000, 1).otherwise(0),
)

In [None]:
# Count, per sensor and per 1-day tumbling window of `date`, how many predicted
# hours exceed 2000 (the sum of the 0/1 above_threshold flag). The 1-day
# watermark bounds how long window state is kept for late rows.
windowedCounts = (
    predictions_df
    .withWatermark("date", "1 Day")
    .groupBy(
        window(predictions_df["date"], "1 Day", "1 Day"),
        predictions_df["Sensor_ID"],
    )
    .agg(F.sum("above_threshold").alias("no_of_hours"))
    .select("window", "Sensor_ID", "no_of_hours")
)

In [None]:
# foreachBatch callback used to display each micro-batch in the notebook.
def foreach_batch_function(df, epoch_id):
    """Print up to 20 rows of one micro-batch without truncating cell values.

    Args:
        df: the micro-batch DataFrame handed over by ``foreachBatch``.
        epoch_id: the batch identifier supplied by Spark (unused here).
    """
    rows_to_show = 20
    df.show(rows_to_show, truncate=False)

In [None]:
# Display the per-sensor daily counts in the notebook, batch by batch.
# "update" mode emits only the windows whose counts changed in each trigger.
query = (
    windowedCounts.writeStream
    .queryName("hour_count")
    .outputMode("update")
    .foreachBatch(foreach_batch_function)
    .option("truncate", "false")
    .start()
)

In [None]:
# Stop the in-notebook `hour_count` display query (the StreamingQuery most
# recently assigned to `query`).
query.stop()

    b. Combine the predicted results that exceed 2000 with the sensor longitude and latitude.

In [None]:
# Keep only the join key and coordinates of each sensor. sensor_id is renamed
# to sen_id, presumably to avoid ambiguity with the stream's Sensor_ID column
# (Spark resolves column names case-insensitively by default).
df_senloc = (
    df_senloc
    .withColumnRenamed('sensor_id', 'sen_id')
    .select("sen_id", "latitude", "longitude")
)

Join the streaming DataFrame `predictions_df` with the static DataFrame `df_senloc`.

In [None]:
# Attach each prediction's sensor coordinates (stream-static inner join).
ped_location = predictions_df.join(
    df_senloc, predictions_df.Sensor_ID == df_senloc.sen_id, how='inner'
)

# Keep only the hours predicted above 2000 plus the fields to publish.
# Each field is selected exactly once; a repeated 'prediction' would put a
# duplicate "prediction" key into every JSON record built below.
ped_2000 = (
    ped_location
    .filter(col("above_threshold") == 1)
    .select('Date_Time', 'next_date', 'next_Mdate', 'next_day_week',
            'next_day_of_week', 'prediction', 'Sensor_ID', 'latitude', 'longitude')
)

# key = predicted day, value = the whole record serialised as JSON,
# datee = event time used for watermarking/windowing.
new_ped = ped_2000.select(
    col("next_date"), to_json(struct("*")), col('Date_Time')
).toDF("key", "value", "datee")

# Collect the JSON records that share the same key within each 1-day event-time
# window; the 1-day watermark lets append mode emit a window once it is final.
windowedCounts1 = (
    new_ped
    .withWatermark("datee", "1 Day")
    .groupBy(window("datee", "1 Day"), "key")
    .agg(collect_list('value').alias("value"))
)

Write the stream back to Kafka.

In [None]:
# Publish the aggregated per-day records to Kafka. The Kafka sink reads string
# (or binary) `key`/`value` columns, so both are cast to strings first.
topic1 = 'ped_count_2000'
new_ped1 = windowedCounts1.selectExpr(
    "CAST(key AS STRING) AS key",
    "CAST(value AS STRING) AS value",
)
query = (
    new_ped1.writeStream
    .format("kafka")
    .outputMode("append")
    .option("kafka.bootstrap.servers", "127.0.0.1:9092")
    .option('topic', topic1)
    .option("checkpointLocation", "kafka/ped_count_2000/checkpoint2")
    .start()
)

In [None]:
# Stop the query most recently assigned to `query` (the Kafka sink publishing
# to 'ped_count_2000').
query.stop()

In [None]:
#check
# query = windowedCounts1 \
#     .writeStream \
#     .outputMode("append") \
#     .format("console") \
#     .trigger(processingTime='5 seconds') \
#     .start()