In [1]:
import os
# Set HADOOP_HOME and expose its bin directory on PATH (Windows-style paths
# and ';' separator, as used throughout this notebook).
os.environ['HADOOP_HOME'] = "C:\\hadoop"

# Append the bin directory only if it is not already present, so re-running
# this cell does not keep growing PATH with duplicate entries. Using .get()
# also avoids a KeyError when PATH is not set at all.
_hadoop_bin = "C:\\hadoop\\bin"
_current_path = os.environ.get('PATH', '')
if _hadoop_bin not in _current_path.split(";"):
    os.environ['PATH'] = _current_path + ";" + _hadoop_bin

In [2]:
import findspark
findspark.init()

from pyspark.sql import SparkSession  
from pyspark.sql import functions as f
from pyspark.conf import SparkConf
from pyspark.sql.streaming import DataStreamReader
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType, FloatType, IntegerType

import json

In [3]:
# Maven coordinates requested through spark.jars.packages. The Kafka SQL
# connector artifact name embeds the Scala and Spark versions set here.
scala_version = '2.12'
spark_version = '3.5.1'
packages = [
    f'org.apache.spark:spark-sql-kafka-0-10_{scala_version}:{spark_version}',
    'org.apache.kafka:kafka-clients:3.3.1',
]

# Build (or reuse) a session on the "local" master with the settings below.
spark = (
    SparkSession.builder
    .appName("BigData")
    .master("local")
    .config("spark.executor.memory", "16g")
    .config("spark.driver.memory", "16g")
    .config("spark.python.worker.reuse", "true")
    .config("spark.sql.execution.arrow.pyspark.enabled", "true")
    .config("spark.sql.execution.arrow.maxRecordsPerBatch", "16")
    .config("spark.jars.packages", ",".join(packages))
    .getOrCreate()
)

# Only log errors from here on.
spark.sparkContext.setLogLevel("ERROR")

# NOTE(review): this SparkConf is created but not used anywhere visible in
# this notebook; kept as-is in case something else relies on the name.
conf = SparkConf()

# Last expression of the cell: display the session summary.
spark

In [4]:
# Kafka source settings.
topic_name = 'doanBD_3'
kafka_server = 'localhost:9092'

# Streaming DataFrame subscribed to the topic, with startingOffsets set to
# "latest".
streamRawDf = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", kafka_server)
    .option("subscribe", topic_name)
    .option("startingOffsets", "latest")
    .load()
)

# Import model

In [5]:
import joblib
# Load the saved model components from the local "model" directory.
# NOTE: joblib.load unpickles the file contents, so only load files from a
# trusted source.

# Artifacts passed to predict_entities: label list, model and vectorizer.
dct_objects, dct_model, dct_vectorizer = (
    joblib.load('model/all_objects.pkl'),
    joblib.load('model/multi_target_lr_model.pkl'),
    joblib.load('model/vectorizer.pkl'),
)

# Artifacts passed to classify_comment: model and its own vectorizer.
clf_model, clf_vectorizer = (
    joblib.load('model/random_forest_model.pkl'),
    joblib.load('model/rf_vectorizer.pkl'),
)

In [6]:
def predict_entities(comment, model, vectorizer, all_objects):
    """Return the entities predicted for ``comment`` as one string.

    The comment is vectorized as a single-item batch and passed to
    ``model.predict``. Every entry of ``all_objects`` whose position in the
    first prediction row equals 1 is kept.

    Args:
        comment: The comment text to analyse.
        model: Object exposing ``predict(features)``; the result must support
            ``result[0, i]`` indexing.
        vectorizer: Object exposing ``transform(list_of_comments)``.
        all_objects: Indexable collection of entity labels, aligned by
            position with the prediction columns.

    Returns:
        The selected labels joined with ``', '`` (empty string if none).
    """
    features = vectorizer.transform([comment])
    prediction = model.predict(features)

    detected = []
    for idx in range(len(all_objects)):
        if prediction[0, idx] == 1:
            detected.append(all_objects[idx])
    return ', '.join(detected)

def classify_comment(comment, model, vectorizer):
    """Classify a single comment and return the predicted label as an int.

    Args:
        comment: The comment text to classify.
        model: Object exposing ``predict(features)``.
        vectorizer: Object exposing ``transform(list_of_comments)``.

    Returns:
        ``int`` of the first (and only) prediction for the comment.
    """
    features = vectorizer.transform([comment])
    return int(model.predict(features)[0])

In [7]:
# Quick sanity check: classify one sample comment with the loaded classifier.
comment = '[hay qua]'
classify_comment(comment=comment, model=clf_model, vectorizer=clf_vectorizer)

1

In [8]:
# Wrap the prediction helpers as Spark UDFs (column functions; they are not
# registered under a SQL name). The lambdas close over the loaded models.

# predict_entities returns a comma-separated string of labels.
predict_entities_udf = udf(
    lambda comment: predict_entities(comment, dct_model, dct_vectorizer, dct_objects),
    StringType(),
)

# classify_comment returns a Python int, so the declared Spark return type is
# IntegerType rather than StringType to match what the function produces.
classify_comment_udf = udf(
    lambda comment: classify_comment(comment, clf_model, clf_vectorizer),
    IntegerType(),
)

# Streaming

In [9]:
def decode_json_column(col):
    """Return a column expression that JSON-decodes ``col`` through a UDF.

    Each value is parsed with ``json.loads``; the UDF's declared Spark return
    type is ``StringType``.

    Args:
        col: The column (or column name) holding JSON text.

    Returns:
        The column produced by applying the decoding UDF to ``col``.
    """
    parse_json = f.udf(lambda raw: json.loads(raw), StringType())
    return parse_json(col)

In [10]:
# Cast the Kafka value to a string and add the JSON-decoded "Comment" column.
df = (
    streamRawDf
    .selectExpr("CAST(value AS STRING) as value")
    .withColumn("Comment", decode_json_column("value"))
)

# Append each micro-batch (triggered every 5 seconds) to the in-memory table
# named "Table", which can then be queried with spark.sql.
stream_writer = (
    df.writeStream
    .trigger(processingTime="5 seconds")
    .outputMode("append")
    .format("memory")
    .queryName("Table")
)
query = stream_writer.start()

In [11]:
from time import sleep
from IPython.display import clear_output

# Live viewer: every 5 seconds, show the newest 20 comments from the in-memory
# sink table together with their predicted entities and class.
x = 0
while True:
    try:
        # Fetch only the last 20 comments BEFORE scoring. The memory table
        # keeps growing, so scoring and collecting the whole table on every
        # refresh (just to display 20 rows) gets slower with every batch.
        comments_df = spark.sql("SELECT * FROM Table").select("Comment")
        last_20_rows = comments_df.tail(20)

        # Rebuild a small DataFrame from those rows and attach the predictions.
        tail_df = spark.createDataFrame(last_20_rows, comments_df.schema)
        tail_df = tail_df.withColumn("Entities", predict_entities_udf(f.col("Comment")))
        tail_df = tail_df.withColumn("Classified", classify_comment_udf(f.col("Comment")))
        tail_df = tail_df.select("Comment", "Entities", "Classified")

        # Replace the previous output with the refreshed view.
        clear_output(wait=True)
        print('Showing live view refreshed every 5 seconds')
        print(f'Seconds passed: {x*5}')

        # Show the last 20 rows.
        tail_df.show(truncate=100)

        sleep(5)
        x += 1
    except KeyboardInterrupt:
        # Interrupting the kernel only ends this viewer loop. The streaming
        # query started earlier is not stopped here; call query.stop() when
        # the stream itself should end.
        print("Streaming query interrupted.")
        break
    

Showing live view refreshed every 5 seconds
Seconds passed: 20
+----------------------------------------------------------------------------------------------------+--------+----------+
|                                                                                             Comment|Entities|Classified|
+----------------------------------------------------------------------------------------------------+--------+----------+
|                                                                                [kết mỗi di dimaria]| dimaria|         1|
|                                                                 [chân ricardo quaresma dẻo vãi cặc]|        |         1|
|                                    [hầu như những người thành công nhất đều xuất thân từ nghèo khổ]|        |         1|
|                                                                     [quang hai se tim lai phong độ]|        |         0|
|                                                              [chưa thấy nh

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "C:\spark_3.5.1\python\lib\py4j-0.10.9.7-src.zip\py4j\java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\spark_3.5.1\python\lib\py4j-0.10.9.7-src.zip\py4j\clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
                          ^^^^^^^^^^^^^^^^^^^^^^
  File "c:\Users\hoang\AppData\Local\Programs\Python\Python311\Lib\socket.py", line 706, in readinto
    return self._sock.recv_into(b)
           ^^^^^^^^^^^^^^^^^^^^^^^
KeyboardInterrupt


Streaming query interrupted.
