# Streaming application using Spark Structured Streaming

### 1. Write code to create a SparkSession, which uses four cores with a proper application name, use the Melbourne timezone, and make sure a checkpoint location has been set.


In [1]:
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-10_2.12:3.0.0,org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0 pyspark-shell'

from pyspark.sql import SparkSession
from pyspark import SparkConf
from pyspark.sql.functions import *
from pyspark.sql.types import *


current_dir = os.getcwd()

master = "local[4]"
app_name = "MOTH_streaming"

spark_conf = SparkConf().setMaster(master).setAppName(app_name)\
            .set("spark.sql.session.timeZone", "Australia/Melbourne")\
            .set("spark.sql.streaming.checkpointLocation", current_dir)

spark = SparkSession.builder.config(conf=spark_conf).getOrCreate()

### 2. Similar to assignment 2A, write code to define the data schema for the data files, following the data types suggested in the metadata file. Load the static datasets (e.g. customer, product, category) into data frames. 



In [2]:
category_schema = StructType([
    StructField("#", StringType(), True),
    StructField("category_id", StringType(), True),
    StructField("cat_level1", StringType(), True),
    StructField("cat_level2", StringType(), True),
    StructField("cat_level3", StringType(), True)
])


customer_schema = StructType([
    StructField("#", StringType(), True),  
    StructField("customer_id", StringType(), True),
    StructField("first_name", StringType(), True),
    StructField("last_name", StringType(), True),
    StructField("user_name", StringType(), True),
    StructField("email", StringType(), True),
    StructField("gender", StringType(), True),
    StructField("birthdate", DateType(), True),
    StructField("device_type", StringType(), True),
    StructField("device_id", StringType(), True),
    StructField("device_version", StringType(), True),
    StructField("home_location_lat", FloatType(), True),
    StructField("home_location_long", FloatType(), True),
    StructField("home_location", StringType(), True),
    StructField("home_country", StringType(), True),
    StructField("first_join_date", DateType(), True)
])

product_schema = StructType([
    StructField("#", StringType(), True),
    StructField("id", StringType(), True),
    StructField("gender", StringType(), True),
    StructField("baseColour", StringType(), True),
    StructField("season", StringType(), True),
    StructField("year", IntegerType(), True),
    StructField("usage", StringType(), True),
    StructField("productDisplayName", StringType(), True),
    StructField("category_id", StringType(), True)
])


transaction_schema = StructType([
    StructField("#", StringType(), True),
    StructField("created_at", TimestampType(), True),
    StructField("customer_id", StringType(), True),
    StructField("booking_id", StringType(), True),
    StructField("session_id", StringType(), True),
    StructField("product_metadata", StringType(), True),
    StructField("payment_method", StringType(), True),
    StructField("payment_status", StringType(), True),
    StructField("promo_amount", FloatType(), True),
    StructField("promo_code", StringType(), True),
    StructField("shipment_fee", FloatType(), True),
    StructField("shipment_date_limit", TimestampType(), True),
    StructField("shipment_location_lat", FloatType(), True),
    StructField("shipment_location_long", FloatType(), True),
    StructField("total_amount", FloatType(), True)
])


rt_schema = ArrayType(StructType([
    StructField("#", StringType(), True),
    StructField("session_id", StringType(), True),
    StructField("event_name", StringType(), True),
    StructField("event_id", StringType(), True),
    StructField("traffic_source", StringType(), True),
    StructField("event_metadata", StringType(), True),
    StructField("customer_id", StringType(), True),
    StructField("ts", IntegerType(), True)
])
)

In [3]:
# loading all files and use predefined schemas
df_category = spark.read.csv("category.csv", header= True, schema= category_schema, escape= '"')
df_customer = spark.read.csv("customer.csv", header= True, schema= customer_schema, escape= '"')
df_product = spark.read.csv("product.csv", header= True, schema= product_schema, escape= '"')
df_transactions = spark.read.csv("new_transactions.csv", header= True, schema= transaction_schema, escape= '"')

### 3 Using the Kafka topic from the producer in Task 1, ingest the streaming data into Spark Streaming, assuming all data comes in the String format. Except for the 'ts' column, you shall receive it as an Int type.




In [4]:
hostip = "118.139.87.176"
topic = "MOTH_Realtime_Data"

# read read stream from task1
df = spark\
    .readStream\
    .format("kafka")\
    .option("kafka.bootstrap.servers", f"{hostip}:9092")\
    .option("subscribe", topic)\
    .load()

# turn value to structured format column
batch_df = df.select(from_json(col("value").cast("string"), rt_schema).alias('parsed_value'))

# get data in structured data
batch_df = batch_df.select(explode(col("parsed_value")).alias("unnested_value"))

# select only needed data from batch_df
df_click_stream = batch_df.select(
                            col("unnested_value.#").alias("#"),
                            col("unnested_value.session_id").alias("session_id"),
                            col("unnested_value.event_name").alias("event_name"),
                            col("unnested_value.event_id").alias("event_id"),
                            col("unnested_value.traffic_source").alias("traffic_source"),
                            col("unnested_value.event_metadata").alias("event_metadata"),
                            col("unnested_value.customer_id").alias("customer_id"),
                            col("unnested_value.ts").alias("event_time"))

### 4 Then, the streaming data format should be transformed into the proper formats following the metadata file schema, similar to assignment 2A.  
Perform the following tasks:  
a) For the 'ts' column, convert it to the timestamp format, we will use it as event_time.  

**Define and register UDF function**

In [5]:
# define udf for converting event names to categorical values 
def cat_event(data):
    """
    This function is use for converting event name to categorical values
    """
    cat_1 = ["ADD_PROMO", "ADD_TO_CART"]
    cat_2 = ["VIEW_PROMO", "VIEW_ITEM", "SEARCH"]
    cat_3 = ["SCROLL", "HOMEPAGE", "CLICK"]
    
    # create conditions for converting
    if data in cat_1:
        return "Category 1"
    elif data in cat_2:
        return "Category 2"
    elif data in cat_3:
        return "Category 3"
    

import calendar
# create udf for converting moths to seasons
def convert_month_season(data):
    """
    input: date
    output: season for particular month
    description: The function is used for converting month to season
    """
    # extract month from date
    month = data.month
    
    # create conditions for each month
    if month in (3, 4, 5):
        return "Spring"
    elif month in (6, 7, 8):
        return "Summer"
    elif month in (9, 10, 11):
        return "Autumn"
    elif month in (12, 1, 2):
        return "Winter"


# register udf function to pyspark dataframe
cat_event_udf = udf(cat_event, StringType())

# register convert_month_season to dataframe
convert_month_season_udf = udf(convert_month_season, StringType())

In [6]:
# cast event_time as Integer
df_click_stream = df_click_stream.withColumn("event_time", col("event_time").cast(IntegerType()))

# change it to timestamp
df_click_stream = df_click_stream.withColumn("event_time", to_timestamp("event_time"))

b) If the data is late for more than 1 minute, discard it.  

In [7]:
# drop "#"
df_click_stream = df_click_stream.drop("#")

# only consider data within one minuet
df_click_stream = df_click_stream.withWatermark("event_time", "1 minute")

In [8]:
df_click_stream.printSchema()

root
 |-- session_id: string (nullable = true)
 |-- event_name: string (nullable = true)
 |-- event_id: string (nullable = true)
 |-- traffic_source: string (nullable = true)
 |-- event_metadata: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- event_time: timestamp (nullable = true)



### 5  
Aggregate the streaming data frame by session id and create features you used in your assignment 2A model. (note: customer ID has already been included in the stream.)   
Then, join the static data frames with the streaming data frame as our final data for prediction.  
Perform data type/column conversion according to your ML model, and print out the Schema.


In [9]:
# apply udf function to the data frame
df_click_stream = df_click_stream\
            .withColumn("event_cat", cat_event_udf(df_click_stream["event_name"]))
df_click_stream = df_click_stream.filter(col("event_name") != "PURCHASE")

In [10]:
# create columns of num_cat_values
feature_df = df_click_stream \
    .groupBy(
        window("event_time", "5 seconds"),
        col("session_id")).agg(
            sum(when(col("event_cat") == "Category 1", 1).otherwise(0)).alias("num_cat_highvalue"),
            sum(when(col("event_cat") == "Category 2", 1).otherwise(0)).alias("num_cat_midvalue"),
            sum(when(col("event_cat") == "Category 3", 1).otherwise(0)).alias("num_cat_lowvalue"),
            count(when(col("event_name") == "ADD_PROMO", 1).cast(IntegerType())).alias("is_promotion"),
            max("customer_id").alias("customer_id"),
            max("event_time").alias("event_time")
)

# add ratios columns
feature_df  = feature_df.withColumn("high_value_ratio", expr("num_cat_highvalue/(num_cat_highvalue + num_cat_midvalue + num_cat_lowvalue)"))
feature_df  = feature_df.withColumn("low_value_ratio", expr("num_cat_lowvalue/(num_cat_highvalue + num_cat_midvalue + num_cat_lowvalue)"))


# fill all product and revenue with 0
feature_df = feature_df.fillna(0)

# extrac season 
feature_df = feature_df.withColumn("season", convert_month_season_udf(col("event_time")))


# extrac needed info from df_custoemr
customer_info = df_customer\
            .select("customer_id", "gender", "birthdate", "device_type", "home_location", "first_join_date", "device_version")

# create new column age then drop birthdate
customer_info = customer_info.withColumn("age", floor(datediff(current_date(), col("birthdate")) / 365.25)\
                                        .cast("integer")).drop("birthdate")

# extract first_join_year from first_join_date
customer_info = customer_info.withColumn("first_join_year", year(col("first_join_date")))\
                .drop("first_join_date")

# join to get customer detail
feature_df = feature_df.join(customer_info, on="customer_id", how= "inner")

# create is_puchase column
feature_df = feature_df.withColumn("is_purchase", lit(0))

In [11]:
feature_df.printSchema()

root
 |-- customer_id: string (nullable = true)
 |-- window: struct (nullable = false)
 |    |-- start: timestamp (nullable = true)
 |    |-- end: timestamp (nullable = true)
 |-- session_id: string (nullable = true)
 |-- num_cat_highvalue: long (nullable = true)
 |-- num_cat_midvalue: long (nullable = true)
 |-- num_cat_lowvalue: long (nullable = true)
 |-- is_promotion: long (nullable = false)
 |-- event_time: timestamp (nullable = true)
 |-- high_value_ratio: double (nullable = false)
 |-- low_value_ratio: double (nullable = false)
 |-- season: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- device_type: string (nullable = true)
 |-- home_location: string (nullable = true)
 |-- device_version: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- first_join_year: integer (nullable = true)
 |-- is_purchase: integer (nullable = false)



In [12]:
# extrac quantity and product from event metadata for latter questions
event_metadata_schema = StructType([
    StructField("product_id", StringType(), True),
    StructField("quantity", IntegerType(), True),
    StructField("item_price", IntegerType(), True)
])

metadata_info = df_click_stream.filter(col("event_name") == "ADD_TO_CART")
metadata_info = metadata_info.withColumn("parsed_value", from_json(col("event_metadata").cast("string"), event_metadata_schema))
metadata_info = metadata_info.withColumn("quantity", col("parsed_value.quantity"))
metadata_info = metadata_info.withColumn("product_id", col("parsed_value.product_id"))
metadata_info = metadata_info.withColumn("item_price", col("parsed_value.item_price"))

metadata_info = metadata_info.drop("parsed_value")

In [13]:
metadata_info.printSchema()

root
 |-- session_id: string (nullable = true)
 |-- event_name: string (nullable = true)
 |-- event_id: string (nullable = true)
 |-- traffic_source: string (nullable = true)
 |-- event_metadata: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- event_time: timestamp (nullable = true)
 |-- event_cat: string (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- product_id: string (nullable = true)
 |-- item_price: integer (nullable = true)



### 6 Load your ML model, and use the model to predict if each session will purchase according to the requirements below:

**load library and model from Task2A**

In [14]:
from pyspark.ml import PipelineModel

In [15]:
gbt_model = PipelineModel.load("gbt_pipeline_model")

# feed streamed data to model
pred_df = gbt_model.transform(feature_df)

# select only needed columns
pred_df= pred_df.select("window", "session_id", "prediction", "customer_id")

**a) Every 10 seconds, show the total number of potential sales transactions (prediction = 1) in the last 1 minute.**

In [16]:
# select all data from pred_df for _6a
_6a = pred_df.select("*")
_6a = _6a.filter(col("prediction") == 1)

In [17]:
_6a = _6a\
    .groupBy(window("window", "1 minute", "10 seconds"))\
    .agg(sum(col("prediction")).alias("potential_sales"))

In [18]:
query = _6a\
        .writeStream\
        .outputMode("append")\
        .format("console")\
        .option("truncate", False)\
        .trigger(processingTime="10 seconds")\
        .start()

In [19]:
query.stop()

![\_6a](_6a.png)

**b) Every 30 seconds, show the total potential revenue in the last 30 seconds. “Potiential revenue” here is definded as: When prediction=1, extract customer shopping cart detail from metadata (sum of all items of ADD_TO_CART events).**

In [20]:
# selelet everything from perd_df
_6b = pred_df.select("*")

# filter only prediction== 1
_6b = _6b.filter(col("prediction") == 1)

# _6b = _6b.withWatermark("window", "1 minute")
metadata_info = metadata_info.drop("customer_id").withWatermark("event_time", "1 minute")

In [21]:
# join _6b and metadata
_6b = _6b\
    .join(metadata_info, on= "session_id", how= "inner")

# calculate revenue
_6b = _6b.withColumn("revenue", expr("quantity * item_price"))

# query 30 seconds, potential revenue
_6b = _6b\
    .groupBy(window("window", "30 seconds"))\
    .agg(sum(col("revenue")).alias("potential_revenue"))

In [22]:
query = _6b\
        .writeStream\
        .outputMode("append")\
        .format("console")\
        .option("truncate", False)\
        .trigger(processingTime="30 seconds")\
        .start()

In [23]:
query.stop()

![\_6b](_6b.png)

**c) Every 1 minute, show the top 10 best-selling products by total quantity. (note: No historical data is required, only the top 10 in each 1 minute window.)**


In [24]:
# prepare data
_6c = pred_df.filter(col("prediction") == 1)\
    .join(metadata_info.select("session_id", "quantity", "product_id", "item_price"), on="session_id", how= "inner")\
    .join(df_product.select("id", "productDisplayName"), on= col("product_id") == col("id") , how= "left")\
    .select("window", "productDisplayName", "quantity")

In [25]:
# group by window and productname and sum quantity
_6c = _6c\
    .groupBy(window("window", "1 minute"), "productDisplayName")\
    .agg(sum(col("quantity")).alias("total_quantity_1min"))

In [26]:
# query_1 
query_top_10 = _6c\
        .writeStream\
        .outputMode("append")\
        .format("memory")\
        .option("truncate", False)\
        .queryName("top_10")\
        .trigger(processingTime="60 seconds")\
        .start()

In [52]:
# use sql query to show result
spark.sql("""
select * 
from top_10
order by window, total_quantity_1min desc
limit 10""").show(truncate= False)

+------------------------------------------+-----------------------------------------------+-------------------+
|window                                    |productDisplayName                             |total_quantity_1min|
+------------------------------------------+-----------------------------------------------+-------------------+
|{2023-10-17 21:51:00, 2023-10-17 21:52:00}|U.S. Polo Assn. Men Navy Blue Flip Flops       |13                 |
|{2023-10-17 21:51:00, 2023-10-17 21:52:00}|Nike Men Free Run+ 2 Shield Grey Sports Shoes  |10                 |
|{2023-10-17 21:51:00, 2023-10-17 21:52:00}|ADIDAS Men's Chat Black T-shirt                |6                  |
|{2023-10-17 21:51:00, 2023-10-17 21:52:00}|Just Natural Unisex Brown Rain Jacket          |6                  |
|{2023-10-17 21:51:00, 2023-10-17 21:52:00}|Puma Men Jiff II Sandals                       |5                  |
|{2023-10-17 21:51:00, 2023-10-17 21:52:00}|American Tourister Unisex Casual Olive Backpack|4   

In [51]:
query_top_10.stop()

### 7  Write a Parquet file for the following:


**a) Persist the prediction result along with cart metadata in parquet format; after that, read the parquet file and show the results to verify it is saved properly.**  

In [29]:
# 7a
# join pred with metadata
_7a = pred_df\
    .join(metadata_info, on= "session_id", how= "inner")

# select needed columns
_7a = _7a.select("window", "session_id", "prediction", "quantity", "item_price", "product_id", "customer_id")

In [30]:
# Write into parquet files
query_file_sink = _7a.writeStream.format("parquet")\
        .outputMode("append")\
        .option("path", "parquet/_7a")\
        .option("checkpointLocation", "parquet/_7a/checkpoint")\
        .start()

In [31]:
# define sche
_7a_schema = StructType([
    StructField("window", StructType([
        StructField("start", TimestampType(), nullable=True),
        StructField("end", TimestampType(), nullable=True)
    ]), nullable=False),
    StructField("session_id", StringType(), True),
    StructField("prediction", DoubleType(), True),
    StructField("item_price", IntegerType(), True),
    StructField("quantity", IntegerType(), True),
    StructField("product_id", StringType(), True),
    StructField("customer_id", StringType(), True),
])

In [32]:
query_file_sink_df = spark.read.schema(_7a_schema).parquet("parquet/_7a")
query_file_sink_df.printSchema()
query_file_sink_df.show()

root
 |-- window: struct (nullable = false)
 |    |-- start: timestamp (nullable = true)
 |    |-- end: timestamp (nullable = true)
 |-- session_id: string (nullable = true)
 |-- prediction: double (nullable = true)
 |-- item_price: integer (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- product_id: string (nullable = true)
 |-- customer_id: string (nullable = true)

+------+----------+----------+----------+--------+----------+-----------+
|window|session_id|prediction|item_price|quantity|product_id|customer_id|
+------+----------+----------+----------+--------+----------+-----------+
+------+----------+----------+----------+--------+----------+-----------+



In [33]:
query_file_sink.stop()

**b) Persist the 30-second sales prediction in another parquet file.**

In [34]:
# 7b
_7b = pred_df.select("*")\
    .filter(col("prediction") == 1)\
    .join(metadata_info, on= "session_id", how= "inner")\
    .withColumn("revenue", expr("quantity * item_price"))\
    .groupBy(window("window", "30 seconds"))\
    .agg(sum(col("revenue")).alias("potential_revenue"))

In [35]:
_7b_schema = StructType([
    StructField("window", StructType([
        StructField("start", TimestampType(), nullable=True),
        StructField("end", TimestampType(), nullable=True)
    ]), nullable=False),
    StructField("potential_revenue", LongType(), nullable=True)
])

In [36]:
query_file_sink_7b =  _7b.writeStream.format("parquet")\
        .outputMode("append")\
        .option("path", "parquet/7b")\
        .option("checkpointLocation", "parquet/7b/checkpoint")\
        .start()

In [37]:
query_file_sink_df = spark.read.schema(_7b_schema).parquet("parquet/7b")
query_file_sink_df.printSchema()
query_file_sink_df.show()

root
 |-- window: struct (nullable = false)
 |    |-- start: timestamp (nullable = true)
 |    |-- end: timestamp (nullable = true)
 |-- potential_revenue: long (nullable = true)

+------+-----------------+
|window|potential_revenue|
+------+-----------------+
+------+-----------------+



![\_7b](_7b.png)

In [38]:
query_file_sink_7b.stop()

### 8  
Read the parquet files as a data stream, for 7a) join customer information and send to a Kafka topic with an appropriate name to the data visualisation. For 7b) Send the message directly to another Kafka topic.

In [39]:
# Stream 1
# read data from parquet file
stream_1 = spark\
        .readStream\
        .format("parquet")\
        .schema(_7a.schema)\
        .load("parquet/_7a")

In [40]:
# join read data with df_customer
stream_1 = stream_1.join(df_customer, on= "customer_id", how= "left")

# select needed cols
stream_1 = stream_1.select("window", "prediction", "item_price", "quantity", "home_location")

# turn stream_1 to dictionary
key = "window"
value = [col for col in stream_1.columns if col != key]

# turn df to key value columns
stream_1_key_value = stream_1.select(col("window").alias('key'), to_json(struct(*[col(c) for c in value])).alias('value'))

In [41]:
# streaming data to task3
stream_1_to_send = stream_1_key_value\
                .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")\
                .writeStream\
                .format("kafka")\
                .option("kafka.bootstrap.servers", "localhost:9092")\
                .option("checkpointLocation", "parquet/8a/checkpoint")\
                .option("topic", "pred_cart")\
                .start()

In [42]:
stream_1_to_send.stop()

----------------------

In [43]:
# Stream 2
# read data from parquet file
stream_2 = spark\
        .readStream\
        .format("parquet")\
        .schema(_7b.schema)\
        .load("parquet/7b")

# get value colums headers
value_list = [col for col in stream_2.columns if col != key]

# turn df to key value columns
stream_2_key_value = stream_2.select(col("window").alias('key'), to_json(struct(*[col(c) for c in value_list])).alias('value'))

In [44]:
# streaming data to task3
stream_2_to_send = stream_1_key_value\
                .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")\
                .writeStream\
                .format("kafka")\
                .option("kafka.bootstrap.servers", "localhost:9092")\
                .option("checkpointLocation", "parquet/8b/checkpoint")\
                .option("topic", "potential_revenue")\
                .start()

In [45]:
stream_2_to_send.stop()

--------------------

The above streams take very long time to process and stream, so I decided to use the below approcach to send data to task3.

In [46]:
# import libaries
from time import sleep, time
from json import dumps
from kafka3 import KafkaProducer
import random
import datetime as dt
import pandas as pd

In [47]:
def publish_message(producer_instance, topic_name, data):
    try:
        producer_instance.send(topic_name, data)
        print('Message published successfully. Data: ' + str(data))
    except Exception as ex:
        print('Exception in publishing message.')
        print(str(ex))
        
        
def connect_kafka_producer():
    _producer = None
    try:
        _producer = KafkaProducer(bootstrap_servers=[f'{hostip}:9092'],
                                  value_serializer=lambda x: dumps(x).encode('ascii'),
                                  api_version=(0, 10))
    except Exception as ex:
        print('Exception while connecting Kafka.')
        print(str(ex))
    finally:
        return _producer

In [48]:
# subpplemetary
potential_rev =  _7b\
        .writeStream\
        .outputMode("append")\
        .format("memory")\
        .option("truncate", False)\
        .queryName("potential_revenue")\
        .start()

In [49]:
potential_rev.isActive

True

In [53]:
producer = connect_kafka_producer()
topic_rev= "potential_revenue"
count = 0

while True:
    count += 1
    print(f"This is batch {count}")
    spark.sql("select * from potential_revenue").show(truncate=False)
    to_send = spark.sql("select * from potential_revenue").toJSON().collect()
    publish_message(producer, topic_rev, to_send)
    sleep(30)

This is batch 1
+------------------------------------------+-----------------+
|window                                    |potential_revenue|
+------------------------------------------+-----------------+
|{2023-10-17 21:52:30, 2023-10-17 21:53:00}|39994582         |
|{2023-10-17 21:52:00, 2023-10-17 21:52:30}|73680798         |
|{2023-10-17 21:51:00, 2023-10-17 21:51:30}|6464304          |
|{2023-10-17 21:51:30, 2023-10-17 21:52:00}|38581230         |
|{2023-10-17 21:53:00, 2023-10-17 21:53:30}|48825848         |
|{2023-10-17 21:54:30, 2023-10-17 21:55:00}|53680881         |
|{2023-10-17 21:54:00, 2023-10-17 21:54:30}|45558668         |
|{2023-10-17 21:56:00, 2023-10-17 21:56:30}|41002244         |
|{2023-10-17 21:53:30, 2023-10-17 21:54:00}|65082912         |
|{2023-10-17 21:55:30, 2023-10-17 21:56:00}|49933103         |
|{2023-10-17 21:55:00, 2023-10-17 21:55:30}|69206557         |
|{2023-10-17 21:56:30, 2023-10-17 21:57:00}|48192856         |
|{2023-10-17 21:57:30, 2023-10-17 21:58

This is batch 4
+------------------------------------------+-----------------+
|window                                    |potential_revenue|
+------------------------------------------+-----------------+
|{2023-10-17 21:52:30, 2023-10-17 21:53:00}|39994582         |
|{2023-10-17 21:52:00, 2023-10-17 21:52:30}|73680798         |
|{2023-10-17 21:51:00, 2023-10-17 21:51:30}|6464304          |
|{2023-10-17 21:51:30, 2023-10-17 21:52:00}|38581230         |
|{2023-10-17 21:53:00, 2023-10-17 21:53:30}|48825848         |
|{2023-10-17 21:54:30, 2023-10-17 21:55:00}|53680881         |
|{2023-10-17 21:54:00, 2023-10-17 21:54:30}|45558668         |
|{2023-10-17 21:56:00, 2023-10-17 21:56:30}|41002244         |
|{2023-10-17 21:53:30, 2023-10-17 21:54:00}|65082912         |
|{2023-10-17 21:55:30, 2023-10-17 21:56:00}|49933103         |
|{2023-10-17 21:55:00, 2023-10-17 21:55:30}|69206557         |
|{2023-10-17 21:56:30, 2023-10-17 21:57:00}|48192856         |
|{2023-10-17 21:57:30, 2023-10-17 21:58

This is batch 6
+------------------------------------------+-----------------+
|window                                    |potential_revenue|
+------------------------------------------+-----------------+
|{2023-10-17 21:52:30, 2023-10-17 21:53:00}|39994582         |
|{2023-10-17 21:52:00, 2023-10-17 21:52:30}|73680798         |
|{2023-10-17 21:51:00, 2023-10-17 21:51:30}|6464304          |
|{2023-10-17 21:51:30, 2023-10-17 21:52:00}|38581230         |
|{2023-10-17 21:53:00, 2023-10-17 21:53:30}|48825848         |
|{2023-10-17 21:54:30, 2023-10-17 21:55:00}|53680881         |
|{2023-10-17 21:54:00, 2023-10-17 21:54:30}|45558668         |
|{2023-10-17 21:56:00, 2023-10-17 21:56:30}|41002244         |
|{2023-10-17 21:53:30, 2023-10-17 21:54:00}|65082912         |
|{2023-10-17 21:55:30, 2023-10-17 21:56:00}|49933103         |
|{2023-10-17 21:55:00, 2023-10-17 21:55:30}|69206557         |
|{2023-10-17 21:56:30, 2023-10-17 21:57:00}|48192856         |
|{2023-10-17 21:57:30, 2023-10-17 21:58

This is batch 8


ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/opt/conda/lib/python3.10/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/opt/conda/lib/python3.10/site-packages/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/opt/conda/lib/python3.10/socket.py", line 705, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt


KeyboardInterrupt: 