In [8]:
from pyspark.sql import SparkSession
from pyspark.ml import PipelineModel
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType, DoubleType

# Initialize Spark session
spark = SparkSession.builder \
    .appName("KafkaSparkStreaming") \
    .config("spark.jars.packages","org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.4")\
    .getOrCreate()

# Kafka configuration
kafka_broker = "127.0.0.1:9092"
kafka_topic = "Item_Oulet_Sales_Prediction"

# Define the schema for incoming Kafka messages
schema = StructType([
    StructField("Item_Identifier", StringType(), True),  # Corrected to StringType
    StructField("Item_Weight", DoubleType(), True),
    StructField("Item_Visibility", DoubleType(), True),
    StructField("Item_MRP", DoubleType(), True),
    StructField("Outlet_Establishment_Year", DoubleType(), True),
    # StructField("Item_Outlet_Sales", DoubleType(), True),
    StructField("Item_Fat_Content", StringType(), True),[]
    # StructField("Item_Type", StringType(), True),
    # StructField("Outlet_Identifier", StringType(), True),
    # StructField("Outlet_Size", StringType(), True),
    # StructField("Outlet_Location_Type", StringType(), True),
    # StructField("Outlet_Type", StringType(), True),
])

# Load the trained model
model_path = "model"
try:
    model = PipelineModel.load(model_path)
    print("Model loaded successfully!")
except Exception as e:
    print(f"Failed to load model: {e}")

# Read data from Kafka
# kafka_stream = spark.readStream \
#     .format("kafka") \
#     .option("kafka.bootstrap.servers", kafka_broker) \
#     .option("subscribe", kafka_topic) \
#     .option("startingOffsets", "latest") \
#     .load()

# kafka_stream = spark.readStream \
#     .format("kafka") \
#     .option("kafka.bootstrap.servers", kafka_broker) \
#     .option("subscribe", kafka_topic) \
#     .option("startingOffsets", "latest") \
#     .option("failOnDataLoss", "false") \
#     .load()

kafka_stream = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafka_broker) \
    .option("subscribe", kafka_topic) \
    .option("startingOffsets", "earliest") \
    .load()



# Deserialize and parse the Kafka messages
parsed_stream = kafka_stream.selectExpr("CAST(value AS STRING) as json") \
    .select(from_json(col("json"), schema).alias("data")) \
    .select("data.*")  # Extract nested fields

# Handle null values in the parsed stream
parsed_stream = parsed_stream.na.drop()  # Remove rows with null values



# Apply the trained model to the streaming data
predictions = model.transform(parsed_stream)

# Rename and select relevant columns
predictions = predictions.withColumnRenamed("prediction", "Item_Outlet_Sales_Prediction") \
    .select(
        col("Item_Identifier"),
        col("Item_Weight"),
        col("Item_Visibility"),
        col("Item_MRP"),
        col("Outlet_Establishment_Year"),
        col("Item_Outlet_Sales_Prediction")
    )

# Output predictions to the console in a tabular format
query = predictions.writeStream \
    .outputMode("append") \
    .format("console") \
    .option("truncate", "false") \
    .option("numRows", 20) \
    .trigger(processingTime="3 seconds") \
    .start()

query.awaitTermination()


Model loaded successfully!


25/01/14 01:15:41 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-598e6932-10d7-4263-855e-3d39393abcee. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
25/01/14 01:15:41 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
25/01/14 01:15:41 WARN AdminClientConfig: These configurations '[key.deserializer, value.deserializer, enable.auto.commit, max.poll.records, auto.offset.reset]' were supplied but are not used yet.


-------------------------------------------
Batch: 0
-------------------------------------------
+---------------+-----------+---------------+--------+-------------------------+----------------------------+
|Item_Identifier|Item_Weight|Item_Visibility|Item_MRP|Outlet_Establishment_Year|Item_Outlet_Sales_Prediction|
+---------------+-----------+---------------+--------+-------------------------+----------------------------+
|FDW58          |20.75      |0.007564836    |107.8622|1999.0                   |1689.1573689790748          |
|FDW14          |8.3        |0.038427677    |87.3198 |2007.0                   |1358.827055733047           |
|NCN55          |14.6       |0.099574908    |241.7538|1998.0                   |3892.4079412046194          |
|FDQ58          |7.315      |0.015388393    |155.034 |2007.0                   |2471.6206128916456          |
|FDH56          |9.8        |0.063817206    |117.1492|1997.0                   |1910.0236110322057          |
|FDL48          |19.35 

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/home/lamp/Projects/big-data/venv/lib/python3.12/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/lamp/Projects/big-data/venv/lib/python3.12/site-packages/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
                          ^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.12/socket.py", line 720, in readinto
    return self._sock.recv_into(b)
           ^^^^^^^^^^^^^^^^^^^^^^^
KeyboardInterrupt


KeyboardInterrupt: 

In [None]:
parsed_stream.describe

<bound method DataFrame.describe of DataFrame[Item_Identifier: string, Item_Weight: double, Item_Visibility: double, Item_MRP: double, Outlet_Establishment_Year: double, Item_Outlet_Sales: double, Item_Fat_Content: string, Item_Type: string, Outlet_Identifier: string, Outlet_Size: string, Outlet_Location_Type: string, Outlet_Type: string]>

In [None]:
parsed_stream.printSchema()


root
 |-- Item_Identifier: string (nullable = true)
 |-- Item_Weight: double (nullable = true)
 |-- Item_Visibility: double (nullable = true)
 |-- Item_MRP: double (nullable = true)
 |-- Outlet_Establishment_Year: double (nullable = true)
 |-- Item_Outlet_Sales: double (nullable = true)
 |-- Item_Fat_Content: string (nullable = true)
 |-- Item_Type: string (nullable = true)
 |-- Outlet_Identifier: string (nullable = true)
 |-- Outlet_Size: string (nullable = true)
 |-- Outlet_Location_Type: string (nullable = true)
 |-- Outlet_Type: string (nullable = true)



In [9]:
from pyspark.sql import Row

# Tạo dữ liệu đầu vào giả lập
test_data = spark.createDataFrame([
    Row(Item_Weight=5.0, Item_Visibility=0.2, Item_MRP=249.0, Outlet_Establishment_Year=2005, Item_Fat_Content = 'Low Fat', Item_Type= 'Snack Foods', Outlet_Identifier = 'OUT049', Outlet_Size='Medium', Outlet_Location_Type='Tier 1', Outlet_Type='Supermarket Type1'),
    # Row(Item_Weight=8.5, Item_Visibility=0.1, Item_MRP=150.0, Outlet_Establishment_Year=2010 , Item_Fat_Content = 'Low Fat',Item_Type= 'Snack Foods')
])

# Áp dụng mô hình
try:
    predictions = model.transform(test_data)
    predictions.select("prediction").show()
except Exception as e:
    print(f"Error testing model: {e}")


+-----------------+
|       prediction|
+-----------------+
|3989.619983548526|
+-----------------+

