# 1 Predicting sales data using Spark Streaming

### 2.1 Create SparkSession


In [31]:
#importing the libraries and statements
import os
from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.sql import SparkSession
# Request the Kafka connector packages through PYSPARK_SUBMIT_ARGS; this is set
# before the session is created below.
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-10_2.12:3.3.0,org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 pyspark-shell'

# Master URL and application name used to configure Spark.
processing_cores = "local[4]"
application_name = "assignment2b"

# Build the configuration object step by step.
configuration = SparkConf()
configuration.setMaster(processing_cores)
configuration.setAppName(application_name)

# Create (or reuse) the Spark session with that configuration.
spark = SparkSession.builder.config(conf=configuration).getOrCreate()

# Grab the Spark context, only log errors, and set a checkpoint directory.
sc = spark.sparkContext
sc.setLogLevel("ERROR")
sc.setCheckpointDir("spark_checkpoint")

### 2.2 Define schema and load file



In [32]:
from pyspark.sql.types import *
from pyspark.sql.functions import *

# Schema of the stores file: store id, store type and store size.
stores_schema = StructType([
    StructField("Store", IntegerType(), True),
    StructField("Type", StringType(), True),
    StructField("Size", IntegerType(), True),
])

# Load the comma-separated stores file (with header row) using the explicit schema.
stores_df = (
    spark.read.format("csv")
    .option("header", True)
    .option("delimiter", ",")
    .schema(stores_schema)
    .load("stores.csv")
)
stores_df.printSchema()

root
 |-- Store: integer (nullable = true)
 |-- Type: string (nullable = true)
 |-- Size: integer (nullable = true)



### 2.3 Ingest Kafka data




In [33]:
# Host running the Kafka broker (port 9092 is appended below).
hostip = "192.168.62.158"

# Streaming reader subscribed to the producer topic, starting from the latest offsets.
# NOTE(review): "encoding" does not look like a documented Kafka source option —
# confirm whether it has any effect.
spark_consumer_df = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", f'{hostip}:9092')
    .option("encoding", "UTF-8")
    .option("subscribe", "kafka_producer_stream")
    .option("startingOffsets", "latest")
    .load()
)

In [34]:
# Each Kafka message value is parsed as a JSON array of records.
# Every field is ingested as a string except "ts", which is a long.
producer_schema = ArrayType(StructType(
    [
        StructField(field_name, StringType(), True)
        for field_name in (
            "Store", "Date", "Temperature", "Fuel_Price",
            "MarkDown1", "MarkDown2", "MarkDown3", "MarkDown4", "MarkDown5",
            "CPI", "Unemployment", "IsHoliday", "last_weekly_sales",
        )
    ]
    + [StructField("ts", LongType(), True)]
))

df = (
    spark_consumer_df
    # the Kafka value column is cast to a string
    .selectExpr("CAST(value AS STRING)")
    # parse the JSON text with the schema defined above
    .select(from_json(col("value"), producer_schema).alias('parsed_data'))
    # one output row per element of the parsed array
    .select(explode(col("parsed_data")).alias("data"))
    # promote the nested struct fields to top-level columns
    .selectExpr("data.*")
)

# Show the resulting flat schema.
df.printSchema()

root
 |-- Store: string (nullable = true)
 |-- Date: string (nullable = true)
 |-- Temperature: string (nullable = true)
 |-- Fuel_Price: string (nullable = true)
 |-- MarkDown1: string (nullable = true)
 |-- MarkDown2: string (nullable = true)
 |-- MarkDown3: string (nullable = true)
 |-- MarkDown4: string (nullable = true)
 |-- MarkDown5: string (nullable = true)
 |-- CPI: string (nullable = true)
 |-- Unemployment: string (nullable = true)
 |-- IsHoliday: string (nullable = true)
 |-- last_weekly_sales: string (nullable = true)
 |-- ts: long (nullable = true)



### 2.4 Persist raw data


In [35]:
# Persist the flattened raw records: append them as parquet files under
# "persisted_parquet_data", triggering a micro-batch every 5 seconds.
persisted_sink = (
    df.writeStream
    .format("parquet")
    .outputMode("append")
    .option("path", "persisted_parquet_data")
    .option("checkpointLocation", "persisted_parquet_data/checkpoint")
    .trigger(processingTime="5 seconds")
    .start()
)

In [36]:
# The sink only writes a file when its 5-second trigger fires, so reading straight
# after start() can find no parquet files at all (e.g. on "Run All").
# Wait up to 10 seconds first; awaitTermination also re-raises any error that
# terminated the query, so a broken stream is reported here.
# NOTE: if the producer has sent nothing within that time there is still nothing
# to read — re-run this cell once data is flowing.
persisted_sink.awaitTermination(10)

# Read back the parquet files written so far and show a few rows.
parquet_df = spark.read.parquet("persisted_parquet_data/*.snappy.parquet")
parquet_df.show(5, truncate = False)

+-----+----------+-----------+----------+---------+---------+---------+---------+---------+---------+------------+---------+------------------+----------+
|Store|Date      |Temperature|Fuel_Price|MarkDown1|MarkDown2|MarkDown3|MarkDown4|MarkDown5|CPI      |Unemployment|IsHoliday|last_weekly_sales |ts        |
+-----+----------+-----------+----------+---------+---------+---------+---------+---------+---------+------------+---------+------------------+----------+
|1    |2011-02-04|42.27      |2.989     |nan      |nan      |nan      |nan      |nan      |212.56688|7.742       |false    |1316899.3076019287|1675857813|
|10   |2011-02-04|44.88      |3.348     |nan      |nan      |nan      |nan      |nan      |127.71958|8.744       |false    |1715769.0571422577|1675857813|
|11   |2011-02-04|47.17      |2.989     |nan      |nan      |nan      |nan      |nan      |215.88634|7.551       |false    |1100418.6917500496|1675857813|
|12   |2011-02-04|45.14      |3.348     |nan      |nan      |nan      

In [37]:
# Stop the streaming query that persists the raw data to parquet.
persisted_sink.stop()

### 2.5 Transform data formats


In [38]:
# Build "formatted_df" by casting every column of "df" to its proper type.
# The (column, type) pairs are listed in the same order as the columns of "df".
formatted_df = df.select([
    df[column_name].cast(target_type).alias(column_name)
    for column_name, target_type in [
        ("Store", IntegerType()),
        ("Date", DateType()),
        ("Temperature", DoubleType()),
        ("Fuel_Price", DoubleType()),
        ("MarkDown1", DoubleType()),
        ("MarkDown2", DoubleType()),
        ("MarkDown3", DoubleType()),
        ("MarkDown4", DoubleType()),
        ("MarkDown5", DoubleType()),
        ("CPI", DoubleType()),
        ("Unemployment", DoubleType()),
        ("IsHoliday", BooleanType()),
        ("last_weekly_sales", DoubleType()),
        ("ts", TimestampType()),
    ]
])

# Show the schema after the casts.
formatted_df.printSchema()

root
 |-- Store: integer (nullable = true)
 |-- Date: date (nullable = true)
 |-- Temperature: double (nullable = true)
 |-- Fuel_Price: double (nullable = true)
 |-- MarkDown1: double (nullable = true)
 |-- MarkDown2: double (nullable = true)
 |-- MarkDown3: double (nullable = true)
 |-- MarkDown4: double (nullable = true)
 |-- MarkDown5: double (nullable = true)
 |-- CPI: double (nullable = true)
 |-- Unemployment: double (nullable = true)
 |-- IsHoliday: boolean (nullable = true)
 |-- last_weekly_sales: double (nullable = true)
 |-- ts: timestamp (nullable = true)



### 2.6 Prepare feature columns


In [39]:
from pyspark.sql import functions as F

# Derive calendar features from "Date": month, day of month, day of year, week of year.
formatted_df = (
    formatted_df
    .withColumn("Month", F.month("Date"))
    .withColumn("day_of_month", F.dayofmonth("Date"))
    .withColumn("day_of_year", F.dayofyear("Date"))
    .withColumn("week_of_year", F.weekofyear("Date"))
)

formatted_df.printSchema()

root
 |-- Store: integer (nullable = true)
 |-- Date: date (nullable = true)
 |-- Temperature: double (nullable = true)
 |-- Fuel_Price: double (nullable = true)
 |-- MarkDown1: double (nullable = true)
 |-- MarkDown2: double (nullable = true)
 |-- MarkDown3: double (nullable = true)
 |-- MarkDown4: double (nullable = true)
 |-- MarkDown5: double (nullable = true)
 |-- CPI: double (nullable = true)
 |-- Unemployment: double (nullable = true)
 |-- IsHoliday: boolean (nullable = true)
 |-- last_weekly_sales: double (nullable = true)
 |-- ts: timestamp (nullable = true)
 |-- Month: integer (nullable = true)
 |-- day_of_month: integer (nullable = true)
 |-- day_of_year: integer (nullable = true)
 |-- week_of_year: integer (nullable = true)



### 2.7 Join the local data


In [40]:
# Inner-join the streaming records with the static stores data on the store id.
joined_df = formatted_df.join(stores_df, on="Store", how="inner")

# Show the schema of the joined frame.
joined_df.printSchema()

root
 |-- Store: integer (nullable = true)
 |-- Date: date (nullable = true)
 |-- Temperature: double (nullable = true)
 |-- Fuel_Price: double (nullable = true)
 |-- MarkDown1: double (nullable = true)
 |-- MarkDown2: double (nullable = true)
 |-- MarkDown3: double (nullable = true)
 |-- MarkDown4: double (nullable = true)
 |-- MarkDown5: double (nullable = true)
 |-- CPI: double (nullable = true)
 |-- Unemployment: double (nullable = true)
 |-- IsHoliday: boolean (nullable = true)
 |-- last_weekly_sales: double (nullable = true)
 |-- ts: timestamp (nullable = true)
 |-- Month: integer (nullable = true)
 |-- day_of_month: integer (nullable = true)
 |-- day_of_year: integer (nullable = true)
 |-- week_of_year: integer (nullable = true)
 |-- Type: string (nullable = true)
 |-- Size: integer (nullable = true)



### 2.8 Perform predictions


In [41]:
from pyspark.ml import Pipeline
from pyspark.ml.pipeline import PipelineModel

# Load the saved pipeline model from "sales_estimation_pipeline_model".
model = PipelineModel.load("sales_estimation_pipeline_model")

# Run the joined stream through the pipeline to obtain the predictions frame.
predictions_df = model.transform(joined_df)

In [42]:
# Append the prediction stream as parquet files under "predictions_parquet_data",
# triggering a micro-batch every 5 seconds.
predictions_sink = (
    predictions_df.writeStream
    .format("parquet")
    .outputMode("append")
    .option("path", "predictions_parquet_data")
    .option("checkpointLocation", "predictions_parquet_data/checkpoint")
    .trigger(processingTime="5 seconds")
    .start()
)

In [43]:
# The sink only writes a file when its 5-second trigger fires, so reading straight
# after start() can find no parquet files at all (e.g. on "Run All").
# Wait up to 10 seconds first; awaitTermination also re-raises any error that
# terminated the query, so a broken stream is reported here.
# NOTE: if no data arrived within that time there is still nothing to read —
# re-run this cell once data is flowing.
predictions_sink.awaitTermination(10)

# Read back the prediction parquet files written so far and show a couple of rows.
predictions = spark.read.parquet("predictions_parquet_data/*.snappy.parquet")
predictions.show(2, truncate = False)

+-----+----------+-----------+----------+---------+---------+---------+---------+---------+---------+------------+---------+------------------+-------------------+-----+------------+-----------+------------+----+------+--------+-------------+------------------------------------------------------------------------------------------------------------------+-----------------+
|Store|Date      |Temperature|Fuel_Price|MarkDown1|MarkDown2|MarkDown3|MarkDown4|MarkDown5|CPI      |Unemployment|IsHoliday|last_weekly_sales |ts                 |Month|day_of_month|day_of_year|week_of_year|Type|Size  |Type_idx|Type_vec     |features                                                                                                          |prediction       |
+-----+----------+-----------+----------+---------+---------+---------+---------+---------+---------+------------+---------+------------------+-------------------+-----+------------+-----------+------------+----+------+--------+-------------+------

In [44]:
# Stop the streaming query that writes the predictions to parquet.
predictions_sink.stop()

### 2.9 Write code to process the data according to the following requirements


In [45]:
# Keep only rows whose (predicted weekly sales / store size) exceeds 8.5,
# apply a 3-second watermark on "ts", then count rows per store type in
# 10-second windows that slide every 5 seconds.
result_df = (
    predictions_df
    .filter(F.col("prediction") / F.col("Size") > 8.5)
    .withWatermark("ts", "3 seconds")
    .groupBy(
        F.window("ts", windowDuration="10 seconds", slideDuration="5 seconds"),
        "Type",
    )
    .count()
)

In [51]:
def print_function(df, epoch_id):
    """Show the first rows of a micro-batch, skipping empty batches.

    Args:
        df: the batch DataFrame passed in for this micro-batch.
        epoch_id: identifier of the micro-batch (unused).
    """
    # head(1) only needs a single row to decide whether the batch has data,
    # whereas count() would count every row of the batch first.
    if df.head(1):
        df.show(3, truncate = False)

In [52]:
# Hand every micro-batch of the complete result table to print_function,
# triggering every 5 seconds. The query is registered under the name "data".
# No .format(...) is set: foreachBatch is itself the sink, so a format chosen
# before it (previously "memory") would be replaced and never used.
result_sink = (
    result_df.writeStream
    .outputMode("complete")
    .queryName("data")
    .foreachBatch(print_function)
    .trigger(processingTime='5 seconds')
    .start()
)

+------------------------------------------+----+-----+
|window                                    |Type|count|
+------------------------------------------+----+-----+
|{2023-02-08 12:04:55, 2023-02-08 12:05:05}|C   |5    |
|{2023-02-08 12:04:55, 2023-02-08 12:05:05}|A   |9    |
|{2023-02-08 12:05:00, 2023-02-08 12:05:10}|C   |5    |
+------------------------------------------+----+-----+
only showing top 3 rows



In [53]:
# Stop the streaming query that prints the windowed counts.
result_sink.stop()

### 2.10 Compute the average weekly sales predictions for each store type and write the stream back to a Kafka sink using a different topic name

The data you send should look like this:

|  key   | value  |
|  ----  | ----  |
| timestamp of window start | JSON of store type and avg sales |
| '1673233646'  | '{"Type":"A","predict_weekly_sales":20000}' |

In [54]:
# Average prediction per store type: 3-second watermark on "ts", grouped in
# 10-second windows that slide every 5 seconds.
stream_df = (
    predictions_df
    .withWatermark("ts", "3 seconds")
    .groupBy(
        F.window("ts", windowDuration="10 seconds", slideDuration="5 seconds"),
        "Type",
    )
    .agg(F.avg("prediction").alias("predict_weekly_sales"))
)

# Shape the rows for Kafka:
#   value -> JSON holding the store type and its average predicted weekly sales
#   key   -> window start converted to a unix timestamp
kafka_df = (
    stream_df
    .withColumn("value", F.to_json(F.struct("Type", "predict_weekly_sales")))
    .withColumn("key", F.unix_timestamp(F.col("window.start")))
    .select("key", "value")
)

In [55]:
# Write the key/value stream back to Kafka under a different topic name,
# "pyspark_stream". Both columns are cast to strings before writing.
# "startingOffsets" is an option of the Kafka *source*; it has no effect on a
# sink, so it is not set here.
kafka_sink = (
    kafka_df
    .selectExpr("CAST(key AS STRING) AS key", "CAST(value AS STRING) AS value")
    .writeStream.format("kafka")
    .option("kafka.bootstrap.servers", f"{hostip}:9092")
    .option("topic", "pyspark_stream")
    .option("checkpointLocation", "pyspark_stream_checkpoint")
    .start()
)

In [56]:
# Stop the streaming query that writes to the Kafka sink.
kafka_sink.stop()


##### Cleaning up the queries and files

In [57]:
import shutil
# Best-effort removal of the output and checkpoint directories created above;
# any failure (for example a directory that does not exist) is printed and skipped.
for directory in (
    "persisted_parquet_data",
    "predictions_parquet_data",
    "pyspark_stream_checkpoint",
    "spark_checkpoint",
):
    try:
        shutil.rmtree(directory)
    except Exception as error:
        print(error)

## References
- Tutorials week- 9,10,11

- afsar (no date) How to convert spark streaming nested JSON coming on Kafka to flat dataframe?, Stack Overflow. Available at: https://stackoverflow.com/questions/46204750/how-to-convert-spark-streaming-nested-json-coming-on-kafka-to-flat-dataframe (Accessed: February 8, 2023). 

- Kukreja, M. (2020) Track real-time gold prices using Apache Kafka, Pandas &amp; matplotlib, Medium. Towards Data Science. Available at: https://towardsdatascience.com/track-real-time-gold-prices-using-apache-kafka-pandas-matplotlib-122a73728a88 (Accessed: February 8, 2023). 

- Structured Streaming + kafka integration guide (kafka broker version 0.10.0 or higher) (no date) Structured Streaming + Kafka Integration Guide (Kafka broker version 0.10.0 or higher) - Spark 3.3.1 Documentation. Available at: https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html (Accessed: February 8, 2023). 

- Structured Streaming Programming Guide (no date) Structured Streaming Programming Guide - Spark 3.3.1 Documentation. Available at: https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html (Accessed: February 8, 2023). 

