# 1Predicting sales data using Spark Streaming

### 2.1 Create SparkSession


In [69]:
from pyspark.sql.types import *
from pyspark.sql import functions as F
from pyspark.sql.functions import col, decode, expr, window
from pyspark.sql.functions import split
from pyspark.sql.functions import explode
from pyspark.sql import SparkSession
from pyspark import SparkConf
from pyspark.sql import SparkSession  # Spark SQL
import os
import json
from json import dumps
from kafka3 import KafkaProducer

os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-10_2.12:3.0.0,org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0 pyspark-shell'

# the below setup will run Spark in local mode with * working processors(equal to logical cores on the machine)
master = "local[4]"

# Setup `appName` field to be displayed at Spark cluster UI page
app_name = "FIT5202 Assignment 2b"
# Setup configuration parameters for Spark
spark_conf = (SparkConf().setMaster(master).set(
    "spark.sql.streaming.checkpointLocation",
    'data/streaming_checkpoint')
    .setAppName(app_name)
)

# Setup SparkSession and configure it with Melbourne timezone.
spark = (
    SparkSession
    .builder
    .config(conf=spark_conf)
    .getOrCreate()
)

### 2.2 Define schema and load file



In [70]:
# provide a schema to the valuees
produce_data_labels = [
    ("Store", StringType()),
    ("Date", StringType()),
    ("Temperature", StringType()),
    ("Fuel_Price", StringType()),
    ("MarkDown1", StringType()),
    ("MarkDown2", StringType()),
    ("MarkDown3", StringType()),
    ("MarkDown4", StringType()),
    ("MarkDown5", StringType()),
    ("CPI", StringType()),
    ("Unemployment", StringType()),
    ("IsHoliday", StringType()),
    ("last_weekly_sales", StringType()),
    ("ts", LongType())
]

# features schema
produce_data_schema = ArrayType(StructType(
    [StructField(x[0], x[1], True) for x in produce_data_labels]))

### 2.3 Injest Kafka data

In [71]:
# setup
hostip = "192.168.8.133"  # change me
topic = 'assignment2b'

# read df
df = (spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", f'{hostip}:9092')
      .option("subscribe", topic)
      .option("dateFormat", "d/M/y")
      .load()
      # re-hydrate binary
      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
      .select(F.from_json(F.col("value").cast("string"),
                          produce_data_schema).alias('parsed_value'))  # parse json
      # un-nest columns
      .select(F.explode(F.col("parsed_value")).alias('unnested_value'))
      )

# format dataframe and cast into proper data types
df_formatted = df.select(
    F.col("unnested_value.Store").alias("Store"),
    F.col("unnested_value.Date").alias("Date"),
    F.col("unnested_value.Temperature").alias("Temperature"),
    F.col("unnested_value.Fuel_Price").alias("Fuel_Price"),
    F.col("unnested_value.MarkDown1").alias("MarkDown1"),
    F.col("unnested_value.MarkDown2").alias("MarkDown2"),
    F.col("unnested_value.MarkDown3").alias("MarkDown3"),
    F.col("unnested_value.MarkDown4").alias("MarkDown4"),
    F.col("unnested_value.MarkDown5").alias("MarkDown5"),
    F.col("unnested_value.CPI").alias("CPI"),
    F.col("unnested_value.Unemployment").alias("Unemployment"),
    F.col("unnested_value.IsHoliday").alias("IsHoliday"),
    F.col("unnested_value.last_weekly_sales").alias("last_weekly_sales"),
    F.col("unnested_value.ts").alias("ts"),
)

### 2.4 Persist raw data


In [72]:
def foreach_batch_function(df, epoch_id):
    df.show(5, False)

In [73]:
# final dataframe
query_parquet = (df_formatted
                 .writeStream
                 .format("parquet")
                 .option("path", "data/parquet_output")
                 .option("checkpointLocation", "data/parquet_output/checkpoint")
                 .foreachBatch(foreach_batch_function)
                 .start()
                 )

+-----+----------+-----------+----------+---------+---------+---------+---------+---------+---------+------------+---------+------------------+----------+
|Store|Date      |Temperature|Fuel_Price|MarkDown1|MarkDown2|MarkDown3|MarkDown4|MarkDown5|CPI      |Unemployment|IsHoliday|last_weekly_sales |ts        |
+-----+----------+-----------+----------+---------+---------+---------+---------+---------+---------+------------+---------+------------------+----------+
|17   |2011-01-21|26.62      |2.934     |nan      |nan      |nan      |nan      |nan      |127.44048|6.866       |false    |758510.348657608  |1675676397|
|28   |2011-01-21|53.53      |3.223     |nan      |nan      |nan      |nan      |nan      |127.44048|14.021      |false    |1098286.6037750244|1675676397|
|11   |2011-01-21|51.51      |3.016     |nan      |nan      |nan      |nan      |nan      |215.12683|7.551       |false    |1194449.7854175568|1675676397|
|45   |2011-01-21|30.55      |3.229     |nan      |nan      |nan      

In [74]:
# stop query
query_parquet.stop()

### 2.6 Prepare feature columns


In [75]:
# format dataframe and cast into proper data types
df_final = (df_formatted
            # cast data types
            .select(
                F.col("Store").cast(IntegerType()),
                F.col("Date").cast(DateType()),
                F.col("Temperature").cast(FloatType()),
                F.col("Fuel_Price").cast(FloatType()),
                F.col("MarkDown1").cast(FloatType()),
                F.col("MarkDown2").cast(FloatType()),
                F.col("MarkDown3").cast(FloatType()),
                F.col("MarkDown4").cast(FloatType()),
                F.col("MarkDown5").cast(FloatType()),
                F.col("CPI").cast(FloatType()),
                F.col("Unemployment").cast(FloatType()),
                F.col("IsHoliday").cast(IntegerType()),
                F.col("last_weekly_sales").cast(FloatType()),
                F.col("ts").cast(TimestampType()))
            # create new columns
            .withColumn("Month", F.month("Date"))
            .withColumn("day_of_month", F.dayofmonth("Date"))
            .withColumn("day_of_year", F.dayofyear("Date"))
            .withColumn("week_of_year", F.weekofyear("Date"))
            )

### 2.7 Join the local data


In [76]:
# read stores dataset

# stores data type
stores_labels = [
    ("Store", IntegerType()),
    ("Type", StringType()),
    ("Size", IntegerType()),
]
# stores schema
stores_schema = StructType([StructField(x[0], x[1], True)
                           for x in stores_labels])

# load stores df
df_stores = (
    spark.read.format("csv")
    .option("header", True)
    .option("encoding", "UTF-8")
    .load("data/stores.csv", schema=stores_schema)
)

df_joined = (df_final
             .join(df_stores, df_final.Store == df_stores.Store, how="left")
             .drop(df_final.Store)
             )

### 2.8 Perform predictions


In [77]:
# load libraries
from pyspark.ml import Pipeline
from pyspark.ml import PipelineModel

In [78]:
# load persisted model
pipelineModel = PipelineModel.load('sales_estimation_pipeline_model')

In [79]:
# create predictions df
predictions_df = pipelineModel.transform(df_joined)

### 2.9 write code to process the data following requirements

In [15]:
# code to create a grouped query
query_29 = (predictions_df
            # transformations
            .withColumn("achieve_goal", col('last_weekly_sales')/col('Size'))
            .filter(col("achieve_goal") > 8.5)
            .withWatermark("ts", "3 seconds")
            .groupby(window(col("ts"),
                            windowDuration="10 seconds",
                            slideDuration="5 seconds"),
                     col("Type"))
            .agg(F.count("Store").alias("count"))
            # query parameters
            .writeStream
            .outputMode("update")
            .format("console")
            .option("truncate", False)
            # drop empty rows
            .option("dropEmptyRows", "true")
            # send the above dataframe to console every 5 seconds
            .trigger(processingTime='5 seconds')
            .start()
            )

In [16]:
query_29.stop()

In [None]:
# potentially write a foreach that filters out and only leave the last 3

### 2.10 average weekly sales predictions of different types of stores and write the stream back to Kafka sink using a different topic name

The data you sended should be like this:

|  key   | value  |
|  ----  | ----  |
| timestamp of window start | JSON of store type and avg sales |
| '1673233646'  | '{"Type":"A","predict_weekly_sales":20000}' |

In [92]:
# Define the function to process the data and write it back to the Kafka topic
def process_batch2(df,epoch_id):
    # Convert the processed data to a dict and serialize it as a string
    data = [{"key": row.key, "value": row.value} for row in df.collect()]
    serialized_data = json.dumps(data)
    # Write the serialized data back to the Kafka topic
    producer = KafkaProducer(
        bootstrap_servers=[f'{hostip}:9092'],
        value_serializer=lambda x: x.encode("ascii"),
    )
    producer.send("test_out", value=serialized_data)
    producer.flush()

In [93]:
# Define the function to process the data and write it back to the Kafka topic
def process_batch3(df,epoch_id):
    # Convert the processed data to a dict and serialize it as a string
    data = [{"key": row.key, "value": row.value} for row in df]
    serialized_data = json.dumps(data)
    # Write the serialized data back to the Kafka topic
    producer = KafkaProducer(
        bootstrap_servers=[f'{hostip}:9092'],
        value_serializer=lambda x: x.encode("ascii"),
    )
    producer.send("test_out", value=serialized_data)
    producer.flush()

In [94]:
# code to create a grouped query
# test
streaming_df = (predictions_df
                # transformations
                .withWatermark("ts", "3 seconds")
                .groupby(window(col("ts"),
                                windowDuration="10 seconds",
                                slideDuration="5 seconds"),
                         col("Type"))
                .agg(F.mean("prediction").alias("predict_weekly_sales"))
                .withColumn("key", F.unix_timestamp("window.start"))
                .withColumn("value", F.to_json(F.struct("Type", "predict_weekly_sales")))
                .select("key", "value")
                .selectExpr(
                    "cast(key as string) as key",
                    "cast(value as string) as value")
                )

In [95]:
streamingQuery = (streaming_df
                  .writeStream
                  .foreachBatch(process_batch3)
                  .start())

In [97]:
streamingQuery.stop()

In [98]:
streamingQuery.status

{'message': 'Terminated with exception: An exception was raised by the Python Proxy. Return Message: Traceback (most recent call last):\n  File "/opt/conda/lib/python3.8/site-packages/py4j/clientserver.py", line 617, in _call_proxy\n    return_value = getattr(self.pool[obj_id], method)(*params)\n  File "/opt/conda/lib/python3.8/site-packages/pyspark/sql/utils.py", line 272, in call\n    raise e\n  File "/opt/conda/lib/python3.8/site-packages/pyspark/sql/utils.py", line 269, in call\n    self.func(DataFrame(jdf, self.session), batch_id)\n  File "/tmp/ipykernel_101/3418698970.py", line 5, in process_batch3\n    serialized_data = json.dumps(data)\n  File "/opt/conda/lib/python3.8/json/__init__.py", line 231, in dumps\n    return _default_encoder.encode(obj)\n  File "/opt/conda/lib/python3.8/json/encoder.py", line 199, in encode\n    chunks = self.iterencode(o, _one_shot=True)\n  File "/opt/conda/lib/python3.8/json/encoder.py", line 257, in iterencode\n    return _iterencode(o, 0)\n  File 