# FIT5202 Data processing for Big data

##  Activity: Assignment 2 Part B

### Task 2

##### Student ID: `31265154`
##### Student Name: `Vivekkumar Chaudhari`


### Table of Contents


* [1. Spark Structured Streaming with Spark DataStreams](#one)
    * [1.1 Initialization of SparkSession](#oneone)
    * [1.2 Define the data schema for the sensor location CSV](#onetwo)
    * [1.3 Ingest the streaming data into Spark Streaming](#onethree)
    * [1.4 Persist the raw streaming data in parquet format](#onefour)
    * [1.5 Transform the streaming data into the proper formats](#onefive)
    * [1.6 Transformations to prepare the columns for model prediction](#onesix)
    * [1.7 Derive model predictions and saving it into parquet format](#oneseven)
    * [1.8 (a) Counting above threshold Hourly counts from predicted result](#oneeighta)
    * [1.8 (b) Getting above threshold for next day from prediction result and write to new Kafka topic](#oneeightb)

<a class="anchor" name="one"></a>
# 1. Spark Structured Streaming with Spark DataStreams
<a class="anchor" name="oneone"></a>
## 1.1 Initialization of SparkSession

In [1]:
#2.1
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-10_2.12:3.0.0,org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0 pyspark-shell'
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import types as T
from pyspark.ml import feature as FT
from pyspark.ml import classification as C
from pyspark.ml import regression as R
from pyspark.ml import evaluation as E
from pyspark.ml import Pipeline, PipelineModel
from matplotlib import pyplot as plt
from matplotlib import style
from time import sleep
import geohash

## Creating Spark Configuration
# Use all available cores
master = 'local[*]'
# Spark Application name
app_name = 'Streaming Pedestrian Predictions'
# Creating spark session object with default UTC timezone. 
spark = SparkSession.\
    builder.\
    config(conf=SparkConf().\
           setMaster(master).\
           setAppName(app_name).\
           set("spark.sql.session.timeZone", "UTC")).\
    getOrCreate()

<a class="anchor" name="onetwo"></a>
## 1.2 Define the data schema for the sensor location CSV

In [2]:
#2.2
# defining the schema of sensor location dataframe
sensor_location_schema = T.StructType([
    T.StructField('sensor_id', T.IntegerType(), True),
    T.StructField('sensor_description', T.StringType(), True),
    T.StructField('sensor_name', T.StringType(), True),
    T.StructField('installation_date', T.DateType(), True),
    T.StructField('status', T.StringType(), True),
    T.StructField('note', T.StringType(), True),
    T.StructField('direction_1', T.StringType(), True),
    T.StructField('direction_2', T.StringType(), True),
    T.StructField('latitude', T.FloatType(), True),
    T.StructField('longitude', T.FloatType(), True),
    T.StructField('location', T.StringType(), True),
])

# reading sensor locations csv file to dataframe with header and schema.
df_sensors = spark.read.\
            format('csv').\
            option('header', 'true').\
            option('escape', '"').\
            option("dateFormat", 'yyyy/MM/dd').\
            schema(sensor_location_schema).\
            load('Pedestrian_Counting_System_-_Sensor_Locations.csv')

# print the schema of sensor location csv dataframe
df_sensors.printSchema()

root
 |-- sensor_id: integer (nullable = true)
 |-- sensor_description: string (nullable = true)
 |-- sensor_name: string (nullable = true)
 |-- installation_date: date (nullable = true)
 |-- status: string (nullable = true)
 |-- note: string (nullable = true)
 |-- direction_1: string (nullable = true)
 |-- direction_2: string (nullable = true)
 |-- latitude: float (nullable = true)
 |-- longitude: float (nullable = true)
 |-- location: string (nullable = true)



<a class="anchor" name="onethree"></a>
## 1.3 Ingest the streaming data into Spark Streaming

In [3]:
# 2.3
# Use the same topic as in Task 1.
topic = 'Pedestrian_data'
# Create Spark's read stream at the same address and port as the producer.
# Set the starting offset of the read stream to "earliest" so that all data
# already available on the topic is consumed from the start, in case the read stream starts late.
pedestrian_stream = spark.\
    readStream.\
    format("kafka").\
    option("kafka.bootstrap.servers", "127.0.0.1:9092").\
    option("subscribe", topic).\
    option("startingOffsets", "earliest").\
    load()

pedestrian_stream.printSchema()
# Convert byte value to string data
df_pedestrian = pedestrian_stream.selectExpr("CAST(value AS STRING) AS day_data", 'timestamp')
df_pedestrian.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)

root
 |-- day_data: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)



<a class="anchor" name="onefour"></a>
## 1.4 Persist the raw streaming data in parquet format

In [4]:
#2.4
# Write the raw data of the read stream to a file sink in parquet format.
# Each batch of newly available data is written with the append output mode.
# With the same trigger as in Task 1. (daywise)
pedestrian_stream.\
    writeStream.\
    format('parquet').\
    outputMode("append").\
    option("path", "parquet/raw_data_stream.parquet").\
    option("checkpointLocation", "parquet/raw_data_stream_checkpoint").\
    trigger(processingTime='5 seconds').\
    start()

<pyspark.sql.streaming.StreamingQuery at 0x7f7293e87130>

<a class="anchor" name="onefive"></a>
## 1.5 Transform the streaming data into the proper formats

In [5]:
#2.5
# defining the schema of monthly ped. count dataframe
count_schema = T.ArrayType( 
    T.StructType([
        T.StructField('ID', T.IntegerType(), True),
        T.StructField('Date_Time', T.StringType(), True),
        T.StructField('Year', T.IntegerType(), True),
        T.StructField('Month', T.StringType(), True),
        T.StructField('Mdate', T.IntegerType(), True),
        T.StructField('Day', T.StringType(), True),
        T.StructField('Time', T.IntegerType(), True),
        T.StructField('Sensor_ID', T.IntegerType(), True),
        T.StructField('Sensor_Name', T.StringType(), True),
        T.StructField('Hourly_Counts', T.IntegerType(), True)
    ]))

# Parse the JSON string into structured data using the schema defined above.
df_counts = df_pedestrian.\
    select(
        F.from_json(F.col("day_data").cast("string"), count_schema).alias('parsed_value'))
# Flatten the nested array so that each record becomes its own row.
df_counts = df_counts.select(F.explode(F.col("parsed_value")).alias('unnested_value'))
# Keep hours 9 to 23, rename the columns, and convert the date-time column to a timestamp.
df_counts = df_counts.\
    filter(F.col('unnested_value.Time').between(9, 23)).\
    withColumn("Date_Time", F.to_timestamp(F.col('unnested_value.Date_Time'), 'MM/dd/yyyy hh:mm:ss a').cast('timestamp')).\
    select(
        F.col("unnested_value.ID").alias("ID"),
        'Date_Time',
        F.col("unnested_value.Year").alias("Year"),
        F.col("unnested_value.Month").alias("Month"),
        F.col("unnested_value.Mdate").alias("Mdate"),
        F.col("unnested_value.Day").alias("Day"),
        F.col("unnested_value.Time").alias("Time"),
        F.col("unnested_value.Sensor_ID").alias("Sensor_ID"),
        F.col("unnested_value.Sensor_Name").alias("Sensor_Name"),
        F.col("unnested_value.Hourly_Counts").alias("Hourly_Counts")
        )

# Printing final streaming dataframe schema. 
df_counts.printSchema()

root
 |-- ID: integer (nullable = true)
 |-- Date_Time: timestamp (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Month: string (nullable = true)
 |-- Mdate: integer (nullable = true)
 |-- Day: string (nullable = true)
 |-- Time: integer (nullable = true)
 |-- Sensor_ID: integer (nullable = true)
 |-- Sensor_Name: string (nullable = true)
 |-- Hourly_Counts: integer (nullable = true)
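
As a rough, Spark-free illustration of the steps in this cell, the sketch below applies the same idea in plain Python: parse a JSON array, keep the records whose `Time` lies between 9 and 23 inclusive, and convert `Date_Time` to a datetime. The sample payload and the helper name `parse_day_payload` are assumptions made for illustration only; `%m/%d/%Y %I:%M:%S %p` is the Python `strptime` counterpart of the Spark pattern `MM/dd/yyyy hh:mm:ss a`.

```python
import json
from datetime import datetime

# Hypothetical payload: one Kafka message holding a JSON array of hourly records.
sample_payload = json.dumps([
    {"ID": 1, "Date_Time": "12/01/2020 08:00:00 AM", "Time": 8,
     "Sensor_ID": 5, "Hourly_Counts": 310},
    {"ID": 2, "Date_Time": "12/01/2020 05:00:00 PM", "Time": 17,
     "Sensor_ID": 5, "Hourly_Counts": 2450},
])


def parse_day_payload(payload):
    """Parse the JSON array, keep hours 9 to 23 inclusive, convert Date_Time."""
    rows = []
    for record in json.loads(payload):       # plays the role of from_json + explode
        if 9 <= record["Time"] <= 23:        # between(9, 23) is inclusive on both ends
            parsed = dict(record)
            parsed["Date_Time"] = datetime.strptime(
                record["Date_Time"], "%m/%d/%Y %I:%M:%S %p")
            rows.append(parsed)
    return rows


rows = parse_day_payload(sample_payload)
print(rows)
```

Only the 5 PM record survives the filter here, because the 8 AM record falls outside the 9 to 23 range.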



<a class="anchor" name="onesix"></a>
## 1.6 Transformations to prepare the columns for model prediction

In [6]:
#2.6
# UDF that converts Spark's dayofweek number (1 = Sunday, ..., 7 = Saturday) to Monday = 1, ..., Sunday = 7.
get_week_day = F.udf(lambda cellVal: cellVal - 1 if cellVal > 1 else 7, T.IntegerType())
# Getting columns not required for predictions.
remove_cols = [c for c in df_counts.columns if c not in ['Date_Time', 'Sensor_ID', 'Time', 'Hourly_Counts']]

# Reduce the streaming dataframe to the columns the model needs and derive the next-day features.
df_filtered_counts = df_counts.\
    drop(*remove_cols).\
    withColumn('next_date', F.date_add(F.col('Date_Time'), 1)).\
    withColumn('next_Mdate', F.dayofmonth('next_date')).\
    withColumn('next_day_week', F.weekofyear('next_date')).\
    withColumn('next_day_of_week', get_week_day(F.dayofweek('next_date'))).\
    drop('Date_Time').\
    withColumnRenamed('Hourly_Counts', 'prev_count')

# Printing filtered dataframe's schema.
df_filtered_counts.printSchema()

root
 |-- Time: integer (nullable = true)
 |-- Sensor_ID: integer (nullable = true)
 |-- prev_count: integer (nullable = true)
 |-- next_date: date (nullable = true)
 |-- next_Mdate: integer (nullable = true)
 |-- next_day_week: integer (nullable = true)
 |-- next_day_of_week: integer (nullable = true)
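
The next-day columns can be checked by hand with a small plain-Python sketch. It assumes that Spark's `dayofweek` numbers days from 1 = Sunday to 7 = Saturday and that `weekofyear` follows the ISO 8601 week definition; the helper names `spark_dayofweek`, `to_monday_first` and `next_day_features` are hypothetical and exist only for this illustration.

```python
from datetime import date, timedelta


def spark_dayofweek(d):
    """Mimic Spark's dayofweek numbering: 1 = Sunday, 2 = Monday, ..., 7 = Saturday."""
    return d.isoweekday() % 7 + 1


def to_monday_first(day_number):
    """Same rule as the get_week_day UDF: Monday = 1, ..., Sunday = 7."""
    return day_number - 1 if day_number > 1 else 7


def next_day_features(current):
    """Derive the next-day columns for a single date."""
    next_date = current + timedelta(days=1)
    return {
        "next_date": next_date,
        "next_Mdate": next_date.day,
        "next_day_week": next_date.isocalendar()[1],  # ISO week number
        "next_day_of_week": to_monday_first(spark_dayofweek(next_date)),
    }


features = next_day_features(date(2020, 12, 1))
print(features)
```

For a reading taken on Tuesday 1 December 2020, the features describe Wednesday 2 December 2020. Under these assumptions the UDF's result coincides with Python's `isoweekday()`.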



<a class="anchor" name="oneseven"></a>
## 1.7 Derive model predictions and saving it into parquet format

In [7]:
#2.7
# Load the model.
model = PipelineModel.load('count_estimation_pipeline_model')
# Get predictions from the model by passing formatted streaming dataframe.
df_predictions = model.transform(df_filtered_counts)
# Printing the schema of the output prediction dataframe.
df_predictions.printSchema()

# Writing streaming prediction dataframe results to file sink using parquet format.
# With the same trigger as in Task 1. (daywise)
df_predictions.\
    writeStream.\
    format('parquet').\
    outputMode("append").\
    option("path", "parquet/prediction_stream.parquet").\
    option("checkpointLocation", "parquet/prediction_stream_checkpoint").\
    trigger(processingTime='5 seconds').\
    start()

root
 |-- Time: integer (nullable = true)
 |-- Sensor_ID: integer (nullable = true)
 |-- prev_count: integer (nullable = true)
 |-- next_date: date (nullable = true)
 |-- next_Mdate: integer (nullable = true)
 |-- next_day_week: integer (nullable = true)
 |-- next_day_of_week: integer (nullable = true)
 |-- features: vector (nullable = true)
 |-- prediction: double (nullable = false)



<pyspark.sql.streaming.StreamingQuery at 0x7f7293e64160>

<a class="anchor" name="oneeighta"></a>
## 1.8 (a) Counting above threshold Hourly counts from predicted result

In [8]:
#2.8 a
# UDF that counts the values greater than 2000 in the input list.
above_threshold_count = F.udf(lambda cellVal: sum(map(lambda x: x > 2000, cellVal)), T.IntegerType())

# Group the streaming predictions by sensor and by a one-day window on next_date.
# For each window, select its start and end dates and the number of predictions that exceed 2000.
df_grouped_by_threshold = df_predictions.\
    groupby(df_predictions.Sensor_ID, 
            F.window(df_predictions.next_date, '1 day').alias('date_window')).\
    agg(above_threshold_count(F.collect_list('prediction')).alias('above_threshold_count')).\
    select(\
        F.col('date_window.start').alias('start_date'), \
        F.col('date_window.end').alias('end_date'), \
        F.col('Sensor_ID'), \
        F.col('above_threshold_count')).\
    sort('date_window.start')

df_grouped_by_threshold.printSchema()

# Create function to show values received from input dataframe
def foreach_batch_function(df, epoch_id):
    df.\
    show(10, False)
    print(f'Number of Rows processed: {df.count()}')

# Write the output of the grouping on the streaming prediction dataframe to the notebook,
# using the foreach-batch function defined above.
# With the same trigger as in Task 1. (daywise)
group_by_query = df_grouped_by_threshold.\
    writeStream.\
    outputMode('complete').\
    foreachBatch(foreach_batch_function).\
    trigger(processingTime='5 seconds').\
    start()

root
 |-- start_date: timestamp (nullable = true)
 |-- end_date: timestamp (nullable = true)
 |-- Sensor_ID: integer (nullable = true)
 |-- above_threshold_count: integer (nullable = true)
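
The counting logic of this cell can be sketched without Spark as follows. The rows below are hypothetical and are not taken from the actual stream; `count_above_threshold` and `group_counts` are illustrative names, not part of the notebook.

```python
from collections import defaultdict

THRESHOLD = 2000


def count_above_threshold(predictions, threshold=THRESHOLD):
    """Count the predictions strictly greater than the threshold, as the UDF does."""
    return sum(1 for value in predictions if value > threshold)


def group_counts(rows, threshold=THRESHOLD):
    """Group (sensor_id, next_date, prediction) rows per sensor and day, then count."""
    grouped = defaultdict(list)
    for sensor_id, next_date, prediction in rows:
        grouped[(sensor_id, next_date)].append(prediction)
    return {key: count_above_threshold(values, threshold)
            for key, values in grouped.items()}


# Hypothetical predictions used only to exercise the functions.
sample_rows = [
    (5, "2020-12-02", 2100.5),
    (5, "2020-12-02", 1800.0),
    (5, "2020-12-02", 2000.0),   # equal to the threshold, so not counted
    (34, "2020-12-02", 950.0),
]
counts = group_counts(sample_rows)
print(counts)
```

In Spark itself, the same count can probably be expressed with built-in functions, for example `F.sum(F.when(F.col('prediction') > 2000, 1).otherwise(0))`, which avoids collecting every prediction into a list and calling a Python UDF. That is a possible alternative, not what the notebook does.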



<a class="anchor" name="oneeightb"></a>
## 1.8 (b) Getting above threshold for next day from prediction result and write to new Kafka topic 

In [9]:
#2.8 b

# Geohash the latitude and longitude with a precision of 5 characters,
# which corresponds to a cell of roughly 4.9 km x 4.9 km at the equator.
# This key may be used later to group data from nearby location points.
def get_geohash_id(latitude_str, longitude_str):
    return geohash.encode(float(latitude_str), float(longitude_str), 5)

# Build value dictionary to send through kafka topic.
def build_values(mdate, time, sen_id, pred, lat, lon, sn):
    return str({'mdate': mdate, 'time': time, 'sid': sen_id, 'latitude': lat, 'longitude': lon, 'sname': sn, 'prediction': pred})

# udf function to get "key" parameter to set in kafka topic
get_geohash = F.udf(get_geohash_id, T.StringType())

# udf function to get "value" parameter to set in kafka topic
get_value = F.udf(build_values, T.StringType())

# Retrieve all prediction results that exceed the threshold count (>2000).
df_filtered = df_predictions.\
    filter(F.col('prediction')>2000)

join_condition2 = [df_filtered.Sensor_ID==df_sensors.sensor_id]
    

# New sensor 73 (Bourke St - Spencer St (South)) is not available in the sensor data.
# Sensor 73 has pedestrian counts above 2000, but we do not have its location information,
# so the inner join below excludes this sensor.
# Combine the result with the sensor data to retrieve the sensor name and location.
df_joined = df_filtered.\
    join(df_sensors, join_condition2, 'inner').\
    select(df_sensors.sensor_id, 
           df_filtered.next_date, 
           df_filtered.Time,
           df_filtered.prediction,
           df_sensors.sensor_description,
           df_sensors.latitude,df_sensors.longitude).\
    withColumn('next_date', F.col('next_date').cast('string')).\
    withColumn('geohashkey', F.lit(get_geohash('latitude', 'longitude'))).\
    withColumn('data', 
               F.lit(get_value(
                   'next_date', 'Time', 'sensor_id', 'prediction',
                   'latitude', 'longitude', 'sensor_description'))).\
    select('geohashkey', 'data')

# Declare new topic name to create new kafka topic that holds prediction results.
new_topic_name = f'{topic}_Prediction_Location'

# Create a new Kafka stream with the key and value retrieved from the join streaming query.
# Also, only new or updated data is sent, using the update output mode.
join_query = df_joined.\
    selectExpr("CAST(geohashkey AS STRING) as key", "CAST(data AS STRING) as value").\
    writeStream.\
    format("kafka").\
    option("kafka.bootstrap.servers","localhost:9092").\
    option("topic", new_topic_name).\
    option("checkpointLocation", f'{new_topic_name}_checkpoint').\
    outputMode("update").\
    start()
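
To make the meaning of the geohash precision argument concrete, here is a minimal pure-Python sketch of the standard geohash encoding. It is not the code of the `geohash` package used above, and the names `geohash_encode` and `cell_size_degrees` are hypothetical. It assumes that the precision is the number of base-32 characters in the key, each of which carries 5 bits alternating between longitude and latitude.

```python
_BASE32 = "0123456789bcdefghjkmnpqrstuvwxyz"


def geohash_encode(latitude, longitude, precision=5):
    """Encode a coordinate as a geohash string of `precision` characters."""
    lat_lo, lat_hi = -90.0, 90.0
    lon_lo, lon_hi = -180.0, 180.0
    chars = []
    bits = 0
    bit_count = 0
    use_lon = True                      # bits alternate, starting with longitude
    while len(chars) < precision:
        if use_lon:
            mid = (lon_lo + lon_hi) / 2
            if longitude >= mid:
                bits = (bits << 1) | 1
                lon_lo = mid
            else:
                bits <<= 1
                lon_hi = mid
        else:
            mid = (lat_lo + lat_hi) / 2
            if latitude >= mid:
                bits = (bits << 1) | 1
                lat_lo = mid
            else:
                bits <<= 1
                lat_hi = mid
        use_lon = not use_lon
        bit_count += 1
        if bit_count == 5:              # every 5 bits become one base-32 character
            chars.append(_BASE32[bits])
            bits = 0
            bit_count = 0
    return "".join(chars)


def cell_size_degrees(precision):
    """Return (latitude height, longitude width) of one cell in degrees."""
    total_bits = 5 * precision
    lon_bits = (total_bits + 1) // 2    # longitude receives the first bit
    lat_bits = total_bits // 2
    return 180.0 / 2 ** lat_bits, 360.0 / 2 ** lon_bits


print(geohash_encode(57.64911, 10.40744, 5))
print(cell_size_degrees(5))
```

With 5 characters, a cell spans about 0.044 degrees in each direction, so sensors that sit close together are likely to share the same key.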

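Because `build_values` applies `str()` to a dictionary, the Kafka value is a Python literal with single quotes, not JSON. The sketch below, which uses a hypothetical record with made-up values, shows how a consumer could decode such a value with `ast.literal_eval`. It also shows `json.dumps` as an alternative encoding that a non-Python consumer could read. The helper names `decode_repr` and `decode_json` are assumptions for illustration.

```python
import ast
import json

# Hypothetical record shaped like the dictionary built in build_values.
record = {'mdate': '2020-12-02', 'time': 17, 'sid': 5, 'latitude': -37.8,
          'longitude': 144.96, 'sname': 'Example Sensor', 'prediction': 2450.7}

as_repr = str(record)          # the encoding build_values produces
as_json = json.dumps(record)   # an alternative, language-neutral encoding


def decode_repr(text):
    """Decode a str(dict) payload safely, without using eval."""
    return ast.literal_eval(text)


def decode_json(text):
    """Decode a JSON payload."""
    return json.loads(text)


print(as_repr)
print(as_json)
print(decode_repr(as_repr) == decode_json(as_json))
```

Passing the `str()` form to `json.loads` raises an error, so the consumer has to know which encoding the producer used.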
+-------------------+-------------------+---------+---------------------+
|start_date         |end_date           |Sensor_ID|above_threshold_count|
+-------------------+-------------------+---------+---------------------+
|2020-12-02 00:00:00|2020-12-03 00:00:00|34       |0                    |
|2020-12-02 00:00:00|2020-12-03 00:00:00|29       |0                    |
|2020-12-02 00:00:00|2020-12-03 00:00:00|19       |0                    |
|2020-12-02 00:00:00|2020-12-03 00:00:00|73       |0                    |
|2020-12-02 00:00:00|2020-12-03 00:00:00|5        |34                   |
|2020-12-02 00:00:00|2020-12-03 00:00:00|68       |0                    |
|2020-12-02 00:00:00|2020-12-03 00:00:00|63       |68                   |
|2020-12-02 00:00:00|2020-12-03 00:00:00|28       |0                    |
|2020-12-02 00:00:00|2020-12-03 00:00:00|67       |51                   |
|2020-12-02 00:00:00|2020-12-03 00:00:00|52       |0                    |
+-------------------+-----------------