In [1]:
import findspark

# Locate the local Spark installation and make `pyspark` importable
# before the next cell imports it.
findspark.init()

In [20]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_csv, col
from modelRegression import RegressionModel
import random
from pyspark.ml import PipelineModel
from pyspark.sql.functions import when, col, collect_set, mean

In [3]:
# Versions used to pick the Kafka connector artifact; they must match the
# installed Spark build (Scala binary version + Spark version).
scala_version = '2.12'
spark_version = '3.5.0'

# Maven coordinates resolved by Spark at session start (spark.jars.packages).
packages = [
    f'org.apache.spark:spark-sql-kafka-0-10_{scala_version}:{spark_version}',
    'org.apache.kafka:kafka-clients:3.6.0',
]

spark = (
    SparkSession.builder
    .master("local")
    .appName("Bigmart Regresion System")
    .config("spark.jars.packages", ",".join(packages))
    .getOrCreate()
)

23/11/26 16:10:12 WARN Utils: Your hostname, dothinh.local resolves to a loopback address: 127.0.0.1; using 192.168.1.2 instead (on interface en0)
23/11/26 16:10:12 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


:: loading settings :: url = jar:file:/usr/local/Cellar/apache-spark/3.5.0/libexec/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /Users/dothinhtpr247gmai.com/.ivy2/cache
The jars for the packages stored in: /Users/dothinhtpr247gmai.com/.ivy2/jars
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
org.apache.kafka#kafka-clients added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-f61b797f-115c-4b22-919a-3ab0e925afab;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.5.0 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.5.0 in central
	found org.apache.hadoop#hadoop-client-runtime;3.3.4 in central
	found org.apache.hadoop#hadoop-client-api;3.3.4 in central
	found org.xerial.snappy#snappy-java;1.1.10.3 in central
	found org.slf4j#slf4j-api;2.0.7 in central
	found commons-logging#commons-logging;1.1.3 in central
	found com.google.code.findbugs#jsr305;3.0.0 in central
	found org.apache.commons#commons-pool2;2.11.1 in central
	found org.apache.kafka#kafka-clients;3.6.0 in central
	found com.github.

## PySpark consumer for the Kafka stream of Bigmart data

In [4]:
# Kafka connection settings for the topic the Bigmart producer writes to.
KAFKA_TOPIC_NAME_CONS = "BigmartTopic"
KAFKA_BOOTSTRAP_SERVERS_CONS = 'localhost:9092'

# Streaming DataFrame over the BigmartTopic Kafka topic. With "latest", a fresh
# query only sees records produced after it starts.
bigmart_df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP_SERVERS_CONS)
    .option("subscribe", KAFKA_TOPIC_NAME_CONS)
    .option("startingOffsets", "latest")
    .load()
)

# The Kafka `value` column is binary; keep it as text together with the record timestamp.
bigmart_df1 = bigmart_df.selectExpr("CAST(value AS STRING)", "timestamp")

# DDL schema for one CSV record. Field order must match the column order of the
# CSV lines sent by the producer.
bigmart_schema_string = ", ".join([
    "Item_Identifier STRING",
    "Item_Weight DOUBLE",
    "Item_Fat_Content STRING",
    "Item_Visibility DOUBLE",
    "Item_Type STRING",
    "Item_MRP DOUBLE",
    "Outlet_Identifier STRING",
    "Outlet_Establishment_Year INT",
    "Outlet_Size STRING",
    "Outlet_Location_Type STRING",
    "Outlet_Type STRING",
])

# Parse each CSV line into a struct, then flatten it into top-level columns.
bigmart_df2 = bigmart_df1.select(
    from_csv(col("value"), bigmart_schema_string).alias("bigmart"),
    "timestamp",
)
bigmart_df3 = bigmart_df2.select("bigmart.*", "timestamp")

# Expose the parsed stream as a temp view so it can also be queried with Spark SQL.
bigmart_df3.createOrReplaceTempView("bigmart_data")
bigmart_view_df = spark.sql("SELECT * FROM bigmart_data")
song_find_text = bigmart_view_df  # legacy name, kept for any cell that still refers to it

# Append every 3-second micro-batch to the in-memory table `getTableBigmartData`.
# NOTE: per the Spark docs the memory sink keeps its whole output in driver memory,
# so this table keeps growing for as long as the query runs (debug/demo use only).
# "truncate" is a console-sink option and presumably has no effect on this sink.
bigmart_agg_write_stream = (
    bigmart_view_df.writeStream
    .trigger(processingTime='3 seconds')
    .outputMode("append")
    .option("truncate", "false")
    .format("memory")
    .queryName("getTableBigmartData")
    .start()
)

# Wait at most 1 second; this returns False while the query is still running,
# which is expected — the query keeps running in the background.
bigmart_agg_write_stream.awaitTermination(1)

23/11/26 16:10:21 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /private/var/folders/n4/b5xh84d97qzb6sptmp370phr0000gn/T/temporary-597f7f52-3903-4aa4-905f-2836642608f1. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
23/11/26 16:10:21 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


False

In [36]:
def preprocess(df, item_weight_fill=12.857645339263398, seed=None):
    """Clean a raw Bigmart batch before it is passed to the saved pipeline.

    Steps:
      * cast the numeric columns (float / int);
      * impute a missing ``Outlet_Size`` from ``Outlet_Type`` first, then from
        ``Outlet_Location_Type``;
      * fill a missing ``Item_Weight`` with ``item_weight_fill``;
      * normalise ``Item_Fat_Content`` spelling variants
        ('low fat' / 'LF' -> 'Low Fat', 'reg' -> 'Regular').

    Args:
        df: Spark DataFrame with the Bigmart columns parsed from Kafka.
        item_weight_fill: replacement for a null ``Item_Weight``. The default is the
            constant this notebook has always used (presumably the training-set
            mean weight — TODO confirm).
        seed: optional seed for the per-row Medium/Small draw used for Tier 1
            outlets of unknown size; ``None`` lets Spark choose one.

    Returns:
        A new DataFrame; ``df`` itself is not modified.
    """
    from pyspark.sql.functions import rand

    df = (
        df.withColumn('Item_Weight', col('Item_Weight').cast('float'))
        .withColumn('Item_Visibility', col('Item_Visibility').cast('float'))
        .withColumn('Item_MRP', col('Item_MRP').cast('float'))
        .withColumn('Outlet_Establishment_Year', col('Outlet_Establishment_Year').cast('int'))
    )

    # The store-type rules must test Outlet_Type: this expression is only used when
    # Outlet_Size is null, so comparing Outlet_Size to store types could never match.
    # NOTE(review): if the saved model was trained on data imputed with the old
    # Outlet_Size comparison, confirm the training preprocessing agrees with this.
    imputed_size = (
        when(col('Outlet_Type').isin('Supermarket Type3', 'Supermarket Type2'), 'Medium')
        .when(col('Outlet_Type') == 'Grocery Store', 'Small')
        .when(col('Outlet_Location_Type') == 'Tier 3', 'High')
        .when(col('Outlet_Location_Type') == 'Tier 2', 'Small')
        # Draw per row: random.choice() would be evaluated once while building the
        # plan and give every such row in the batch the same size.
        .when(col('Outlet_Location_Type') == 'Tier 1',
              when(rand(seed) < 0.5, 'Medium').otherwise('Small'))
    )
    df = df.withColumn(
        'Outlet_Size',
        when(col('Outlet_Size').isNull(), imputed_size).otherwise(col('Outlet_Size')),
    )

    df = df.na.fill(float(item_weight_fill), ['Item_Weight'])
    df = df.replace(
        to_replace={'low fat': 'Low Fat', 'LF': 'Low Fat', 'reg': 'Regular'},
        subset=['Item_Fat_Content'],
    )
    return df


In [52]:
from time import monotonic, sleep
from IPython.display import display, clear_output

# Saved Spark ML PipelineModel; its transform() adds the `prediction` column selected below.
resModel = PipelineModel.load('REPROCESSING_PIPLINE_MODEL1')

REFRESH_SECONDS = 3    # pause between refreshes (matches the stream's 3 s trigger)
MAX_REFRESHES = 2000   # upper bound on refreshes; interrupt the kernel to stop earlier
TOP_N = 20             # number of most recent records to score and show

start_time = monotonic()
for _refresh in range(MAX_REFRESHES):
    try:
        print(f"Showing live view refreshed every {REFRESH_SECONDS} seconds")
        # Real elapsed time: each refresh takes longer than the sleep alone.
        print(f"Seconds passed: {int(monotonic() - start_time)}")
        # Limit right after sorting so only the rows actually displayed are
        # preprocessed, scored and collected to the driver (the memory table grows
        # without bound).
        latest = (
            spark.table(bigmart_agg_write_stream.name)
            .sort(col('timestamp').desc())
            .limit(TOP_N)
            .drop('timestamp')
        )
        prediction = resModel.transform(preprocess(latest)).select(
            'Item_Identifier', 'Item_Weight', 'Item_Visibility', 'Item_MRP',
            col('prediction').alias('Item_Outlet_Sales_Prediction'),
        )
        display(prediction.toPandas())
        sleep(REFRESH_SECONDS)
        clear_output(wait=True)
    except KeyboardInterrupt:
        print("break")
        break
print("Live view ended...")

Showing live view refreshed every 3 seconds
Seconds passed: 12


Unnamed: 0,Item_Identifier,Item_Weight,Item_Visibility,Item_MRP,Item_Outlet_Sales_Prediction
0,FDO33,14.75,0.089248,113.151802,4215.39066
1,FDK33,17.85,0.011235,211.856003,4248.270068
2,FDV13,17.35,0.027606,88.985603,1992.140173
3,FDC23,18.0,0.017903,178.2686,5933.409839
4,FDQ37,20.75,0.089399,193.9478,5951.053552
5,FDD28,10.695,0.053514,59.190399,1345.992405
6,FDW15,15.35,0.055338,150.473404,4960.343164
7,FDG29,17.6,0.056406,42.545399,668.463197
8,NCH42,6.86,0.036537,230.600998,6060.777319
9,FDK41,12.857645,0.223309,85.5224,786.219918


break
Live view ended...


                                                                                

In [51]:
# Snapshot of everything the memory sink has collected so far, including the
# Kafka `timestamp` column. `df_stream` is an alias of the same DataFrame.
df = spark.table(bigmart_agg_write_stream.name)
df_stream = df
display(df_stream.toPandas())

Unnamed: 0,Item_Identifier,Item_Weight,Item_Fat_Content,Item_Visibility,Item_Type,Item_MRP,Outlet_Identifier,Outlet_Establishment_Year,Outlet_Size,Outlet_Location_Type,Outlet_Type,timestamp
0,DRK35,8.365,low fat,0.072139,Hard Drinks,36.7506,OUT018,2009,Medium,Tier 3,Supermarket Type2,2023-11-26 16:10:23.049
1,FDD47,,Regular,0.249343,Starchy Foods,168.5448,OUT019,1985,Small,Tier 1,Grocery Store,2023-11-26 16:10:24.051
2,NCM53,18.750,Low Fat,0.052031,Health and Hygiene,106.2280,OUT035,2004,Small,Tier 2,Supermarket Type1,2023-11-26 16:10:25.054
3,FDO45,13.150,Regular,0.037921,Snack Foods,87.3856,OUT013,1987,High,Tier 3,Supermarket Type1,2023-11-26 16:10:26.055
4,NCM42,6.130,LF,0.047404,Household,109.3912,OUT010,1998,,Tier 3,Grocery Store,2023-11-26 16:10:27.061
...,...,...,...,...,...,...,...,...,...,...,...,...
3240,FDA33,6.480,Low Fat,0.034038,Snack Foods,147.2076,OUT018,2009,Medium,Tier 3,Supermarket Type2,2023-11-26 17:04:34.990
3241,FDN45,19.350,Low Fat,0.118004,Snack Foods,223.3088,OUT013,1987,High,Tier 3,Supermarket Type1,2023-11-26 17:04:35.995
3242,FDT45,15.850,Low Fat,0.095931,Snack Foods,56.0956,OUT010,1998,,Tier 3,Grocery Store,2023-11-26 17:04:37.000
3243,FDR32,6.780,Regular,0.085792,Fruits and Vegetables,228.9694,OUT035,2004,Small,Tier 2,Supermarket Type1,2023-11-26 17:04:38.006


                                                                                