# Streaming application using Spark Structured Streaming

### 1. Write code to create a SparkSession, which uses four cores with a proper application name, use the Melbourne timezone, and make sure a checkpoint location has been set.


In [1]:
from pyspark import SparkConf
from pyspark import SparkContext # Spark
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split
from pyspark.sql import functions as F
from pyspark.sql.types import *
import os

os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-10_2.12:3.0.0,org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0 pyspark-shell'

spark = SparkSession.builder \
    .appName("A2B_Task2") \
    .config("spark.master", "local[4]")\
    .config("spark.sql.session.timeZone", "Australia/Melbourne")\
    .getOrCreate()

### 2. Similar to assignment 2A, write code to define the data schema for the data files, following the data types suggested in the metadata file. Load the static datasets (e.g. customer, product, category) into data frames. 



In [2]:
from pyspark.sql.types import StructType
from pyspark.sql.types import StringType, StructField,DateType, DoubleType, IntegerType, TimestampType

# static datasets
cat_schema = StructType([
    StructField("#", StringType(), True),
    # uniquie identifier - not nominal 
    StructField("category_id", StringType(), True),
    
    # hierachical level but the number will be not used to do maths operations
    StructField("cat_level1", StringType(), True),
    StructField("cat_level2", StringType(), True),
    StructField("cat_level3", StringType(), True)
])

cust_schema = StructType([
    StructField("#", StringType(), True),
     StructField("customer_id", StringType(), True),
    StructField("first_name", StringType(), True),
    StructField("last_name", StringType(), True),
    StructField("username", StringType(), True),
    StructField("email", StringType(), True),
    StructField("gender", StringType(), True),
    # date - dob of the customer
    StructField("birthdate", DateType(), True),
    StructField("device_type", StringType(), True),
    StructField("device_id", StringType(), True),
    StructField("device_version", StringType(), True),
    # double for long- and lat-itude
    StructField("home_location_lat", DoubleType(), True),
    StructField("home_location_long", DoubleType(), True),
    StructField("home_location", StringType(), True),
    StructField("home_country", StringType(), True),
    # date - date when the customer first joined or registered
    StructField("first_join_date", DateType(), True) 
])

prod_schema = StructType([
    StructField("#", StringType(), True),
    StructField("id", StringType(), True),
    StructField("gender", StringType(), True),
    StructField("baseColour", StringType(), True),
    StructField("season", StringType(), True),
    # year - product was introduced or became available. used for sorting or filtering 
    StructField("year", IntegerType(), True),
    StructField("usage", StringType(), True),
    StructField("productDisplayName", StringType(), True),
    StructField("category_id", StringType(), True)
])


In [3]:
# category
cat_df = spark.read.csv('a2a_dataset/category.csv',
                        header=True,schema = cat_schema)
cat_df = cat_df.drop(cat_df[0])
print(f"####### cat_df schema:")
print(f"Number of partitions: {cat_df.rdd.getNumPartitions()}")
cat_df.printSchema()


# customer
cust_df = spark.read.csv('a2a_dataset/customer.csv',
                         header = True, schema = cust_schema)
cust_df = cust_df.drop(cust_df[0])
print(f"####### cust_df schema:")
print(f"Number of partitions: {cust_df.rdd.getNumPartitions()}")
cust_df.printSchema()

# product
prod_df = spark.read.csv('a2a_dataset/product.csv', 
                         header=True, schema = prod_schema)
prod_df = prod_df.drop(prod_df[0])
print(f"####### prod_df schema:")
print(f"Number of partitions: {prod_df.rdd.getNumPartitions()}")
prod_df.printSchema()



####### cat_df schema:
Number of partitions: 1
root
 |-- category_id: string (nullable = true)
 |-- cat_level1: string (nullable = true)
 |-- cat_level2: string (nullable = true)
 |-- cat_level3: string (nullable = true)

####### cust_df schema:
Number of partitions: 4
root
 |-- customer_id: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- username: string (nullable = true)
 |-- email: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- birthdate: date (nullable = true)
 |-- device_type: string (nullable = true)
 |-- device_id: string (nullable = true)
 |-- device_version: string (nullable = true)
 |-- home_location_lat: double (nullable = true)
 |-- home_location_long: double (nullable = true)
 |-- home_location: string (nullable = true)
 |-- home_country: string (nullable = true)
 |-- first_join_date: date (nullable = true)

####### prod_df schema:
Number of partitions: 1
root
 |-- id: string (nullable = 

In [4]:
#     StructField("#", StringType(), True),
#     StructField("traffic_source", StringType(), True),
#     StructField("event_metadata", StringType(), True),
# clickstream_schema
click_schema = StructType([
    StructField("session_id", StringType(), True),
    StructField("event_name", StringType(), True),
    StructField("event_id", StringType(), True),
    StructField("customer_id", StringType(), True),
    StructField("event_metadata", StringType(), True),
    StructField("ts", IntegerType(), True)
])


### 3 Using the Kafka topic from the producer in Task 1, ingest the streaming data into Spark Streaming, assuming all data comes in the String format. Except for the 'ts' column, you shall receive it as an Int type.




In [5]:
# topic from Task1
topic = 'clickstream_realtime'

#configuration
hostip = "118.139.61.182" #change me


# connect to producer 
df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", f'{hostip}:9092') \
    .option("startingOffsets", "latest")\
    .option("subscribe", topic) \
    .load()

df = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
df.printSchema()

root
 |-- key: string (nullable = true)
 |-- value: string (nullable = true)



### 4 Then, the streaming data format should be transformed into the proper formats following the metadata file schema, similar to assignment 2A.  
Perform the following tasks:  
a) For the 'ts' column, convert it to the timestamp format, we will use it as event_time.  
b) If the data is late for more than 1 minute, discard it.  



In [6]:
df = df.select(F.from_json(F.col("value").cast("string"), 
                           click_schema).alias('parsed'))

df.printSchema()


root
 |-- parsed: struct (nullable = true)
 |    |-- session_id: string (nullable = true)
 |    |-- event_name: string (nullable = true)
 |    |-- event_id: string (nullable = true)
 |    |-- customer_id: string (nullable = true)
 |    |-- event_metadata: string (nullable = true)
 |    |-- ts: integer (nullable = true)



In [7]:
# #,session_id,event_name,event_id,traffic_source,event_metadata,customer_id
#                     F.col("parsed_value.traffic_source").alias("traffic_source"),
#                     F.col("parsed_value.event_metadata").alias("event_metadata"),
df_formatted = df.select(
                    F.col("parsed.session_id").alias("session_id"),
                    F.col("parsed.event_name").alias("event_name"),
                    F.col("parsed.event_id").alias("event_id"),
                    F.col("parsed.customer_id").alias("customer_id"),
                    F.col("parsed.event_metadata").alias("event_metadata"),
                    F.col("parsed.ts").cast("timestamp").alias("event_time")
                ).withWatermark("event_time", "2 minute")
df_formatted.printSchema()

root
 |-- session_id: string (nullable = true)
 |-- event_name: string (nullable = true)
 |-- event_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- event_metadata: string (nullable = true)
 |-- event_time: timestamp (nullable = true)



### 5  
Aggregate the streaming data frame by session id and create features you used in your assignment 2A model. (note: customer ID has already been included in the stream.)   
Then, join the static data frames with the streaming data frame as our final data for prediction.  
Perform data type/column conversion according to your ML model, and print out the Schema.


selected_df in A2A  
root  
 |-- highval%: double (nullable = true)  
 |-- num_cat_midvalue: long (nullable = true)  
 |-- season: string (nullable = true)  
 |-- gender: string (nullable = true)  
 |-- device_type: string (nullable = true)  
 |-- is_promotion: boolean (nullable = true)  
 |-- label: integer (nullable = false)  
 |-- months_since_joining: double (nullable = true)  


is_promotion is defined by event_name == "ADD_PROMO"  

Features relevant to click_stream data: highval%, num_cat_midvalue, is_promotion

In [8]:
df_joined = df_formatted\
    .join(cust_df,'customer_id','inner')\
    .select('session_id','customer_id', 'event_name','event_time','event_metadata','gender','device_type','first_join_date')\
    .withWatermark("event_time", "1 minute")
df_joined.printSchema()

root
 |-- session_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- event_name: string (nullable = true)
 |-- event_time: timestamp (nullable = true)
 |-- event_metadata: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- device_type: string (nullable = true)
 |-- first_join_date: date (nullable = true)



In [9]:
from pyspark.sql.functions import udf,col

# month to season
def season(month):
    if month in [3, 4, 5]:
        return"Spring"
    elif month in [6, 7, 8]:
        return "Summer"
    elif month in [9,10,11]:
        return "Autumn"
    else:
        return "Winter"

season_udf = udf(season, StringType())

In [10]:
# Extract the month of each event time with aliases
month_df = df_joined\
    .withColumn("event_month", F.month(df_joined["event_time"])) \
    .withColumn("season", season_udf(col("event_month"))) \
    .drop("event_month")
month_df.printSchema()


root
 |-- session_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- event_name: string (nullable = true)
 |-- event_time: timestamp (nullable = true)
 |-- event_metadata: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- device_type: string (nullable = true)
 |-- first_join_date: date (nullable = true)
 |-- season: string (nullable = true)



In [11]:
month_df_query= month_df \
    .writeStream \
    .outputMode("append") \
    .format("memory") \
    .queryName("month_df") \
    .trigger(processingTime='5 seconds') \
    .start()

In [13]:
spark.sql("select * from month_df").show()

+--------------------+-----------+-----------+-------------------+--------------------+------+-----------+---------------+------+
|          session_id|customer_id| event_name|         event_time|      event_metadata|gender|device_type|first_join_date|season|
+--------------------+-----------+-----------+-------------------+--------------------+------+-----------+---------------+------+
|7aba64ce-8f48-491...|      76356|      CLICK|2023-10-08 18:23:33|                    |     F|    Android|     2021-07-03|Autumn|
|7aba64ce-8f48-491...|      76356|     SCROLL|2023-10-08 18:23:33|                    |     F|    Android|     2021-07-03|Autumn|
|7aba64ce-8f48-491...|      76356|     SEARCH|2023-10-08 18:23:33|{'search_keywords...|     F|    Android|     2021-07-03|Autumn|
|7aba64ce-8f48-491...|      76356|      CLICK|2023-10-08 18:23:33|                    |     F|    Android|     2021-07-03|Autumn|
|7aba64ce-8f48-491...|      76356|      CLICK|2023-10-08 18:23:33|                    |   

In [28]:
month_df_query.stop()

In [14]:
from pyspark.sql.functions import window ,when

# grouped by session_id, and window
# considering windown time: 1min
# w/o watermark: not coming late
categories_df = df_formatted \
    .groupBy("session_id", window(df_formatted.event_time, "1 minute"))\
    .agg(
        F.sum(when(df_formatted["event_name"].isin(['ADD_PROMO', 'ADD_TO_CART']), 1).otherwise(0)).alias("num_cat_highvalue"),
        F.sum(when(df_formatted["event_name"].isin(['VIEW_PROMO', 'VIEW_ITEM', 'SEARCH']), 1).otherwise(0)).alias("num_cat_midvalue"),
        F.sum(when(df_formatted["event_name"].isin(['SCROLL', 'HOMEPAGE', 'CLICK']), 1).otherwise(0)).alias("num_cat_lowvalue"),
    )


# Show the schema of categories_df
categories_df.printSchema()

root
 |-- session_id: string (nullable = true)
 |-- window: struct (nullable = false)
 |    |-- start: timestamp (nullable = true)
 |    |-- end: timestamp (nullable = true)
 |-- num_cat_highvalue: long (nullable = true)
 |-- num_cat_midvalue: long (nullable = true)
 |-- num_cat_lowvalue: long (nullable = true)



In [15]:
promotion_df = df_formatted \
    .groupBy("session_id", window(df_formatted.event_time, "1 minute"))\
    .agg(
        (F.max(F.when(F.col("event_name") == "ADD_PROMO", 1).otherwise(0)) == 1).alias("is_promotion")
)
promotion_df.printSchema()

root
 |-- session_id: string (nullable = true)
 |-- window: struct (nullable = false)
 |    |-- start: timestamp (nullable = true)
 |    |-- end: timestamp (nullable = true)
 |-- is_promotion: boolean (nullable = true)



In [16]:
month_df.printSchema()
promotion_df.printSchema()
month_df.printSchema()

root
 |-- session_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- event_name: string (nullable = true)
 |-- event_time: timestamp (nullable = true)
 |-- event_metadata: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- device_type: string (nullable = true)
 |-- first_join_date: date (nullable = true)
 |-- season: string (nullable = true)

root
 |-- session_id: string (nullable = true)
 |-- window: struct (nullable = false)
 |    |-- start: timestamp (nullable = true)
 |    |-- end: timestamp (nullable = true)
 |-- is_promotion: boolean (nullable = true)

root
 |-- session_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- event_name: string (nullable = true)
 |-- event_time: timestamp (nullable = true)
 |-- event_metadata: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- device_type: string (nullable = true)
 |-- first_join_date: date (nullable = true)
 |-- season: string (nullable = true)



In [17]:
df_joined = month_df\
.select("session_id", "customer_id","event_name","event_time","event_metadata",
        "gender","device_type","first_join_date","season")\
.join(categories_df,'session_id','inner')\
.join(promotion_df.select("session_id", "window", "is_promotion"),'session_id','inner')

df_joined.printSchema()

root
 |-- session_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- event_name: string (nullable = true)
 |-- event_time: timestamp (nullable = true)
 |-- event_metadata: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- device_type: string (nullable = true)
 |-- first_join_date: date (nullable = true)
 |-- season: string (nullable = true)
 |-- window: struct (nullable = false)
 |    |-- start: timestamp (nullable = true)
 |    |-- end: timestamp (nullable = true)
 |-- num_cat_highvalue: long (nullable = true)
 |-- num_cat_midvalue: long (nullable = true)
 |-- num_cat_lowvalue: long (nullable = true)
 |-- window: struct (nullable = false)
 |    |-- start: timestamp (nullable = true)
 |    |-- end: timestamp (nullable = true)
 |-- is_promotion: boolean (nullable = true)



In [18]:
from pyspark.sql.functions import round
total_actions = df_joined.num_cat_highvalue + df_joined.num_cat_midvalue + df_joined.num_cat_lowvalue

ratio_df = df_joined.withColumn('highval%', 
                                round(df_joined.num_cat_highvalue / total_actions*100,2))
ratio_df.printSchema()

root
 |-- session_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- event_name: string (nullable = true)
 |-- event_time: timestamp (nullable = true)
 |-- event_metadata: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- device_type: string (nullable = true)
 |-- first_join_date: date (nullable = true)
 |-- season: string (nullable = true)
 |-- window: struct (nullable = false)
 |    |-- start: timestamp (nullable = true)
 |    |-- end: timestamp (nullable = true)
 |-- num_cat_highvalue: long (nullable = true)
 |-- num_cat_midvalue: long (nullable = true)
 |-- num_cat_lowvalue: long (nullable = true)
 |-- window: struct (nullable = false)
 |    |-- start: timestamp (nullable = true)
 |    |-- end: timestamp (nullable = true)
 |-- is_promotion: boolean (nullable = true)
 |-- highval%: double (nullable = true)



In [19]:
from pyspark.sql.functions import months_between, current_date

feature_df = ratio_df.withColumn(
    "months_since_joining",
    months_between(current_date(), "first_join_date")).drop("first_join_date")\
    .select("session_id", "customer_id",'event_name',"event_time",'event_metadata',"gender", "device_type", "season", "num_cat_midvalue", 
            "is_promotion","highval%","months_since_joining")
feature_df.printSchema()

root
 |-- session_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- event_name: string (nullable = true)
 |-- event_time: timestamp (nullable = true)
 |-- event_metadata: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- device_type: string (nullable = true)
 |-- season: string (nullable = true)
 |-- num_cat_midvalue: long (nullable = true)
 |-- is_promotion: boolean (nullable = true)
 |-- highval%: double (nullable = true)
 |-- months_since_joining: double (nullable = true)



In [20]:
feature_df_query= feature_df \
    .writeStream \
    .outputMode("append") \
    .format("memory") \
    .queryName("feature_df") \
    .trigger(processingTime='5 seconds') \
    .start()

In [33]:
spark.sql("select * from feature_df").show(truncate=False)

+------------------------------------+-----------+-----------+-------------------+----------------------------------------------------------+------+-----------+------+----------------+------------+--------+--------------------+
|session_id                          |customer_id|event_name |event_time         |event_metadata                                            |gender|device_type|season|num_cat_midvalue|is_promotion|highval%|months_since_joining|
+------------------------------------+-----------+-----------+-------------------+----------------------------------------------------------+------+-----------+------+----------------+------------+--------+--------------------+
|803c7826-ff62-4c82-aee1-9039686efba7|48765      |CLICK      |2023-10-08 18:24:57|                                                          |F     |Android    |Autumn|1               |false       |0.0     |63.22580645         |
|803c7826-ff62-4c82-aee1-9039686efba7|48765      |SEARCH     |2023-10-08 18:24:57|{'sear

In [34]:
feature_df_query.stop()

In [24]:
feature_df.printSchema()

root
 |-- session_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- event_name: string (nullable = true)
 |-- event_time: timestamp (nullable = true)
 |-- event_metadata: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- device_type: string (nullable = true)
 |-- season: string (nullable = true)
 |-- num_cat_midvalue: long (nullable = true)
 |-- is_promotion: boolean (nullable = true)
 |-- highval%: double (nullable = true)
 |-- months_since_joining: double (nullable = true)



Above feature_df match with the targeted schema for feature_df:

root  
|-- highval%: double (nullable = true)  
|-- num_cat_midvalue: long (nullable = true)  
|-- season: string (nullable = true)  
|-- gender: string (nullable = true)  
|-- device_type: string (nullable = true)  
|-- is_promotion: boolean (nullable = true)  
|-- label: integer (nullable = false)  
|-- months_since_joining: double (nullable = true)  



### 6 Load your ML model, and use the model to predict if each session will purchase according to the requirements below:
a) Every 10 seconds, show the total number of potential sales transactions (prediction = 1) in the last 1 minute.   
b) Every 30 seconds, show the total potential revenue in the last 30 seconds. “Potiential revenue” here is definded as: When prediction=1, extract customer shopping cart detail from metadata (sum of all items of ADD_TO_CART events).  
c) Every 1 minute, show the top 10 best-selling products by total quantity. (note: No historical data is required, only the top 10 in each 1 minute window.)  


In [32]:
#Loading the Pipeline Model From the filesystem
from pyspark.ml import PipelineModel
pipelineModel = PipelineModel.load('asg2a_prediction_model')
print(pipelineModel.stages[-1]._java_obj.paramMap())

{
	GBTClassifier_37f2b33bc82e-featuresCol: features,
	GBTClassifier_37f2b33bc82e-labelCol: label
}


In [35]:
# 6a
## Fit the pipeline to new data
# gbt
gbt_predictions = pipelineModel.transform(feature_df)

In [141]:
gbt_predictions_query = gbt_predictions \
    .writeStream \
    .outputMode("append") \
    .format("memory") \
    .queryName("gbt_predictions")\
    .trigger(processingTime="5 seconds") \
    .start()

In [174]:
spark.sql("select * from gbt_predictions").show(truncate=False)

+------------------------------------+-----------+-----------+-------------------+----------------------------------------------------------+------+-----------+------+----------------+------------+--------+--------------------+------------+--------------+------------+--------------+-----------------+-------------------+-----------------------------------+------------------------------------------+-----------------------------------------+----------+
|session_id                          |customer_id|event_name |event_time         |event_metadata                                            |gender|device_type|season|num_cat_midvalue|is_promotion|highval%|months_since_joining|season_index|season_encoded|gender_index|gender_encoded|device_type_index|device_type_encoded|features                           |rawPrediction                             |probability                              |prediction|
+------------------------------------+-----------+-----------+-------------------+----------

In [59]:
gbt_predictions_query.stop()

In [68]:
tt_sales= gbt_predictions\
.groupBy(F.col('prediction'), window("event_time", "1 minute"))\
.agg(
    F.count(F.when(gbt_predictions.prediction == 1, 1).otherwise(0)).alias("Tt_sales")
    )

In [69]:
tt_sales_query = tt_sales \
    .writeStream \
    .outputMode("append") \
    .format("memory") \
    .queryName("tt_sales")\
    .trigger(processingTime="10 seconds") \
    .start()

In [87]:
spark.sql("select * from tt_sales").show(truncate=False)

+----------+------------------------------------------+--------+
|prediction|window                                    |Tt_sales|
+----------+------------------------------------------+--------+
|0.0       |{2023-10-08 18:44:00, 2023-10-08 18:45:00}|947     |
|1.0       |{2023-10-08 18:44:00, 2023-10-08 18:45:00}|3413    |
|0.0       |{2023-10-08 18:46:00, 2023-10-08 18:47:00}|1715    |
|1.0       |{2023-10-08 18:45:00, 2023-10-08 18:46:00}|3219    |
|1.0       |{2023-10-08 18:46:00, 2023-10-08 18:47:00}|6399    |
|0.0       |{2023-10-08 18:45:00, 2023-10-08 18:46:00}|1013    |
+----------+------------------------------------------+--------+



In [90]:
tt_sales_query.stop()

In [75]:
# 6b
gbt_predictions.printSchema()

root
 |-- session_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- event_name: string (nullable = true)
 |-- event_time: timestamp (nullable = true)
 |-- event_metadata: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- device_type: string (nullable = true)
 |-- season: string (nullable = true)
 |-- num_cat_midvalue: long (nullable = true)
 |-- is_promotion: boolean (nullable = true)
 |-- highval%: double (nullable = true)
 |-- months_since_joining: double (nullable = true)
 |-- season_index: double (nullable = false)
 |-- season_encoded: vector (nullable = true)
 |-- gender_index: double (nullable = false)
 |-- gender_encoded: vector (nullable = true)
 |-- device_type_index: double (nullable = false)
 |-- device_type_encoded: vector (nullable = true)
 |-- features: vector (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = false)



In [73]:
from pyspark.sql.functions import from_json
# metadata is read as a string
# to extract item_price to cal rev -- parse the struct
metadata_schema = StructType([
    StructField("product_id", StringType(), True),
    StructField("quantity", IntegerType(), True),
    StructField("item_price", DoubleType(), True)
])

# condition: prediction=1 & add_to_cart
# potential rev = qty * item_price
potential_revenue = gbt_predictions \
    .filter((F.col('prediction') == 1) & (F.col('event_name') == 'ADD_TO_CART')) \
    .withColumn("metadata_struct", from_json(F.col("event_metadata"), metadata_schema)) \
    .groupBy(window(F.col('event_time'), '30 seconds')) \
    .agg(
        F.sum(F.when(F.col('prediction') == 1,
                     F.col("metadata_struct.item_price")* F.col("metadata_struct.quantity")).otherwise(0)).alias("potential_revenue")
    )

In [186]:
potential_revenue_query = potential_revenue \
    .writeStream \
    .outputMode("append") \
    .format("memory") \
    .queryName("potential_revenue")\
    .trigger(processingTime="10 seconds") \
    .start()

In [191]:
spark.sql('select * from potential_revenue').show(truncate=False)

+------------------------------------------+-----------------+
|window                                    |potential_revenue|
+------------------------------------------+-----------------+
|{2023-10-08 20:38:30, 2023-10-08 20:39:00}|1.04399511E8     |
|{2023-10-08 20:38:00, 2023-10-08 20:38:30}|7.0977936E7      |
|{2023-10-08 20:39:30, 2023-10-08 20:40:00}|2.28494711E8     |
|{2023-10-08 20:39:00, 2023-10-08 20:39:30}|4.8130296E7      |
+------------------------------------------+-----------------+



In [91]:
potential_revenue_query.stop()

In [88]:
# 6c
from pyspark.sql.functions import from_json,rank
from pyspark.sql.functions import pandas_udf, PandasUDFType

# metadata is read as a string
# to extract item_price to cal rev -- parse the struct
metadata_schema = StructType([
    StructField("product_id", StringType(), True),
    StructField("quantity", IntegerType(), True),
    StructField("item_price", DoubleType(), True)
])

# cond: prediction of purchase = 1
# gp by time window and prod_id
# sum over the qty to find tt_qty
top_10 = gbt_predictions \
    .filter(F.col('prediction') == 1) \
    .withColumn("metadata_struct", from_json(F.col("event_metadata"), metadata_schema)) \
    .groupBy(
            window(F.col('event_time'), "1 minute"), 
            F.col("metadata_struct.product_id").alias("product_id")
) \
    .agg(F.sum(F.col("metadata_struct.quantity")).alias("tt_quantity"))


In [89]:
top_10_query = top_10 \
    .writeStream \
    .outputMode("append") \
    .format("memory") \
    .queryName("top_10")\
    .trigger(processingTime="1 minute")\
    .start()

In [95]:
spark.sql("select product_id, tt_quantity from top_10 order by tt_quantity desc limit 10").show(truncate=False)

+----------+-----------+
|product_id|tt_quantity|
+----------+-----------+
|4889      |36         |
|39618     |20         |
|9772      |20         |
|47088     |17         |
|16496     |16         |
|48094     |16         |
|48038     |14         |
|25062     |13         |
|52142     |13         |
|1920      |12         |
+----------+-----------+



In [96]:
top_10_query.stop()

### 7  
a) Persist the prediction result along with cart metadata in parquet format; after that, read the parquet file and show the results to verify it is saved properly.  
b) Persist the 30-second sales prediction in another parquet file.

In [44]:
gbt_predictions.printSchema()

root
 |-- session_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- event_name: string (nullable = true)
 |-- event_time: timestamp (nullable = true)
 |-- event_metadata: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- device_type: string (nullable = true)
 |-- season: string (nullable = true)
 |-- num_cat_midvalue: long (nullable = true)
 |-- is_promotion: boolean (nullable = true)
 |-- highval%: double (nullable = true)
 |-- months_since_joining: double (nullable = true)
 |-- season_index: double (nullable = false)
 |-- season_encoded: vector (nullable = true)
 |-- gender_index: double (nullable = false)
 |-- gender_encoded: vector (nullable = true)
 |-- device_type_index: double (nullable = false)
 |-- device_type_encoded: vector (nullable = true)
 |-- features: vector (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = false)



In [123]:
pred_save = gbt_predictions.select('session_id','customer_id','event_name','event_time',
                                  'event_metadata', 'gender', 'device_type',
                                   'season', 'num_cat_midvalue', 'is_promotion',
                                   'highval%', 'months_since_joining','prediction')
pred_save.printSchema()

root
 |-- session_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- event_name: string (nullable = true)
 |-- event_time: timestamp (nullable = true)
 |-- event_metadata: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- device_type: string (nullable = true)
 |-- season: string (nullable = true)
 |-- num_cat_midvalue: long (nullable = true)
 |-- is_promotion: boolean (nullable = true)
 |-- highval%: double (nullable = true)
 |-- months_since_joining: double (nullable = true)
 |-- prediction: double (nullable = false)



In [220]:
# 7a
parquet_pred = pred_save\
        .writeStream\
        .format("parquet")\
        .outputMode("append")\
        .option("path", "parquet/clickstream_df/prediction")\
        .option("checkpointLocation", "parquet/clickstream_df/prediction/checkpoint")\
        .start()


In [221]:
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, BooleanType, DoubleType, LongType
parquet_pred_schema = StructType([
    StructField("session_id", StringType(), True),
    StructField("customer_id", StringType(), True),
    StructField("event_name", StringType(), True),
    StructField("event_time", TimestampType(), True),
    StructField("event_metadata", StringType(), True),
    StructField("gender", StringType(), True),
    StructField("device_type", StringType(), True),
    StructField("season", StringType(), True),
    StructField("num_cat_midvalue", LongType(), True),
    StructField("is_promotion", BooleanType(), True),
    StructField("highval%", DoubleType(), True),
    StructField("months_since_joining", DoubleType(), True),
    StructField("prediction", DoubleType(), False)
])


# Read the saved parquet data
parquet_pred = spark.readStream.schema(parquet_pred_schema).parquet("parquet/clickstream_df/prediction")
parquet_pred.printSchema()

root
 |-- session_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- event_name: string (nullable = true)
 |-- event_time: timestamp (nullable = true)
 |-- event_metadata: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- device_type: string (nullable = true)
 |-- season: string (nullable = true)
 |-- num_cat_midvalue: long (nullable = true)
 |-- is_promotion: boolean (nullable = true)
 |-- highval%: double (nullable = true)
 |-- months_since_joining: double (nullable = true)
 |-- prediction: double (nullable = true)



In [177]:
parquet_pred.isStreaming

True

In [222]:
parquet_pred_query = parquet_pred\
    .writeStream\
    .queryName("parquet_pred_df")\
    .outputMode("append")\
    .format("memory")\
    .trigger(processingTime="10 seconds")\
    .start()


In [282]:
spark.sql("SELECT * FROM parquet_pred_df ORDER BY event_time DESC").show(truncate=False)

+------------------------------------+-----------+-----------+-------------------+----------------------------------------------------------+------+-----------+------+----------------+------------+--------+--------------------+----------+
|session_id                          |customer_id|event_name |event_time         |event_metadata                                            |gender|device_type|season|num_cat_midvalue|is_promotion|highval%|months_since_joining|prediction|
+------------------------------------+-----------+-----------+-------------------+----------------------------------------------------------+------+-----------+------+----------------+------------+--------+--------------------+----------+
|608fdb2c-20f6-498d-bc47-d373b31f4a01|78079      |SCROLL     |2023-10-08 19:57:02|                                                          |M     |Android    |Autumn|1               |false       |25.0    |40.06451613         |0.0       |
|605f8bdd-2976-44d5-a62e-28a1f174f3fe|26248 

In [219]:
parquet_pred_query.stop()

In [102]:
# 7b
potential_revenue.printSchema()

root
 |-- window: struct (nullable = false)
 |    |-- start: timestamp (nullable = true)
 |    |-- end: timestamp (nullable = true)
 |-- potential_revenue: double (nullable = true)



In [192]:
sales_saved = potential_revenue\
        .writeStream\
        .format("parquet")\
        .outputMode("append")\
        .option("path", "parquet/clickstream_df/sales")\
        .option("checkpointLocation", "parquet/clickstream_df/sales/checkpoint")\
        .start()


In [296]:
from pyspark.sql.types import StructType, StructField, TimestampType, DoubleType

sales_parquet_schema = StructType([
    StructField("window", StructType([
        StructField("start", TimestampType(), nullable=True),
        StructField("end", TimestampType(), nullable=True)
    ]), nullable=False),
    StructField("potential_revenue", DoubleType(), nullable=True)
])


# Read the saved parquet data
sales_parquet= spark.readStream.schema(sales_parquet_schema).parquet("parquet/clickstream_df/sales")
sales_parquet.printSchema()


root
 |-- window: struct (nullable = true)
 |    |-- start: timestamp (nullable = true)
 |    |-- end: timestamp (nullable = true)
 |-- potential_revenue: double (nullable = true)



In [199]:
sales_parquet.isStreaming

True

In [194]:
sales_parquet_query = sales_parquet\
    .writeStream\
    .queryName("sales_parquet_df")\
    .outputMode("append")\
    .format("memory")\
    .trigger(processingTime="30 seconds")\
    .start()


In [205]:
spark.sql("SELECT * FROM sales_parquet_df ORDER BY window DESC").show(truncate=False)

+------------------------------------------+-----------------+
|window                                    |potential_revenue|
+------------------------------------------+-----------------+
|{2023-10-08 20:47:00, 2023-10-08 20:47:30}|147791.0         |
|{2023-10-08 20:46:30, 2023-10-08 20:47:00}|1.12027758E8     |
|{2023-10-08 20:45:30, 2023-10-08 20:46:00}|2.3073782E7      |
|{2023-10-08 20:44:30, 2023-10-08 20:45:00}|1.30211004E8     |
|{2023-10-08 20:44:00, 2023-10-08 20:44:30}|5.2950039E7      |
+------------------------------------------+-----------------+



In [214]:
sales_parquet_query.stop()

### 8  
Read the parquet files as a data stream, for 7a) join customer information and send to a Kafka topic with an appropriate name to the data visualisation. For 7b) Send the message directly to another Kafka topic.

In [111]:
cust_df.printSchema()
parquet_pred_saved.printSchema()

root
 |-- customer_id: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- username: string (nullable = true)
 |-- email: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- birthdate: date (nullable = true)
 |-- device_type: string (nullable = true)
 |-- device_id: string (nullable = true)
 |-- device_version: string (nullable = true)
 |-- home_location_lat: double (nullable = true)
 |-- home_location_long: double (nullable = true)
 |-- home_location: string (nullable = true)
 |-- home_country: string (nullable = true)
 |-- first_join_date: date (nullable = true)

root
 |-- session_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- event_name: string (nullable = true)
 |-- event_time: timestamp (nullable = true)
 |-- event_metadata: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- device_type: string (nullable = true)
 |-- season: string (nullable = true)
 |-- num_

In [293]:
# Stream 1
cust_pred = parquet_pred.join(cust_df, 'customer_id', 'inner')\
    .select("customer_id", "prediction", "home_location", 'home_location_lat', 
            'home_location_long',"event_time")\
    .groupBy("home_location", 'home_location_lat', 
            'home_location_long', "event_time")\
    .agg({"prediction": "sum"})

aggregated_df = cust_pred.withColumnRenamed("sum(prediction)", "cumulative_orders")\
    .withWatermark(("event_time"), "1 minute")  

aggregated_df.printSchema()

root
 |-- home_location: string (nullable = true)
 |-- home_location_lat: double (nullable = true)
 |-- home_location_long: double (nullable = true)
 |-- event_time: timestamp (nullable = true)
 |-- cumulative_orders: double (nullable = true)



In [294]:
aggregated_df.isStreaming

True

In [298]:
from kafka3 import KafkaProducer
from pyspark.sql.functions import to_json, struct

#configuration
hostip = "118.139.61.182" #change me
topic1 = "cumulative_orders"



sd_to_Kafka = aggregated_df\
    .writeStream \
    .format("kafka") \
    .outputMode("update") \
    .option("kafka.bootstrap.servers", f'{hostip}:9092') \
    .option("topic", topic) \
    .option("checkpointLocation", "parquet/clickstream_df/prediction/checkpoint")\
    .start()


try:
    while not sd_to_Kafka.isActive:
        pass
    print("Data is being sent to Kafka topic:", topic)
except KeyboardInterrupt:
    print("Producer interrupted")
    
# sd_to_Kafka.stop()

Data is being sent to Kafka topic: cumulative_orders


In [302]:
sd_to_Kafka.isActive

False

In [297]:
sales_parquet.printSchema()

root
 |-- window: struct (nullable = true)
 |    |-- start: timestamp (nullable = true)
 |    |-- end: timestamp (nullable = true)
 |-- potential_revenue: double (nullable = true)



In [301]:
# Stream 2
hostip = "118.139.61.182" #change me
topic2 = "tt_sales"


sd_to_Kafka2 = sales_parquet.writeStream \
    .format("kafka") \
    .outputMode("append") \
    .option("kafka.bootstrap.servers", f'{hostip}:9092') \
    .option("topic", topic) \
    .option("checkpointLocation", "parquet/clickstream_df/sales/checkpoint")\
    .start()

try:
    while not sd_to_Kafka2.isActive:
        pass
    print("Data is being sent to Kafka topic:", topic)
except KeyboardInterrupt:
    print("Producer interrupted")
    
# sd_to_Kafka2.stop()

StreamingQueryException: [STREAM_FAILED] Query [id = b5d3f539-b8ff-4252-87dd-f03b0a890872, runId = 6e143ea9-6c81-4f52-a1ff-f6ddb40008c7] terminated with exception: Failed to read log file file:/home/student/parquet/clickstream_df/sales/checkpoint/sources/0/0. Log file was malformed: failed to read correct log version from  v1.

In [300]:
sd_to_Kafka2.isActive

False