In [29]:
!pip install --user -U numpy==1.26.4 pandas==2.1.4 pyarrow==14.0.1 pyspark==3.5.1 findspark==2.0.1

Collecting pandas==2.1.4
  Downloading pandas-2.1.4-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (18 kB)
Collecting pyarrow==14.0.1
  Downloading pyarrow-14.0.1-cp311-cp311-manylinux_2_28_x86_64.whl.metadata (3.0 kB)
Collecting pyspark==3.5.1
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m466.6 kB/s[0m eta [36m0:00:00[0m00:01[0m00:03[0m
[?25h  Preparing metadata (setup.py) ... [?25ldone
Downloading pandas-2.1.4-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (12.2 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m12.2/12.2 MB[0m [31m3.9 MB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m
[?25hDownloading pyarrow-14.0.1-cp311-cp311-manylinux_2_28_x86_64.whl (38.0 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m38.0/38.0 MB[0m [31m2.2 MB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m
[?25hBuilding wheels for collected packages: pys

In [42]:
!pip install --user -U tensorflow==2.13.1



In [43]:
import findspark
findspark.init()

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json, pandas_udf, udf
from pyspark.sql.types import StructType, StructField, StringType, ArrayType, FloatType, IntegerType

import numpy as np
import pandas as pd
from typing import Iterator

In [44]:
import os
import warnings
import logging

# Tắt warning chung của Python
warnings.filterwarnings('ignore')

# Tắt các thông báo INFO và WARNING của TensorFlow (chỉ hiện ERROR)
# Mức 1: Filter INFO. Mức 2: Filter WARNING. Mức 3: Filter ERROR.
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2' 

# Tắt logging DEBUG và INFO của TensorFlow (khi sử dụng tf.get_logger)
logging.getLogger('tensorflow').setLevel(logging.ERROR)

# --- Thêm dòng này SAU KHI bạn tạo SparkSession ---
# Ví dụ: spark = SparkSession.builder...getOrCreate()
# spark.sparkContext.setLogLevel("ERROR") # Chỉ hiện lỗi của Spark

In [45]:
# Cell 4: Khởi tạo Spark Session (CHẠY CPU, KHÔNG DÙNG GPU)

KAFKA_SERVER = "kafka:9092"
TOPIC_NAME = "review_stream"

scala_version = '2.12'
spark_version = '3.5.1'  # Đảm bảo đúng phiên bản Spark của bạn

packages = [
    f'org.apache.spark:spark-sql-kafka-0-10_{scala_version}:{spark_version}',
    'org.apache.kafka:kafka-clients:3.5.0'
]

# Ép tất cả worker và driver không nhìn thấy GPU
spark = (
    SparkSession.builder
    .master("local[2]")
    .appName("Streaming_ABSA_CNN")
    .config("spark.jars.packages", ",".join(packages))
    .config("spark.sql.execution.arrow.pyspark.enabled", "true")
    # bật reuse worker để không load lại model nhiều lần
    .config("spark.python.worker.reuse", "true")
    .getOrCreate()
)

spark

In [46]:
# Các hằng số từ notebook huấn luyện của bạn
MODEL_PATH = 'cnn_multi_aspect_model.h5'
TOKENIZER_PATH = 'tokenizer.pickle'
MAX_SEQUENCE_LENGTH = 100

ASPECT_COLUMNS = ['Price', 'Shipping', 'Outlook', 'Quality', 'Size', 'Shop_Service', 'General', 'Others']

# Ánh xạ ngược từ index (0-3) về nhãn gốc (-1, 0, 1, 2)
label_map = {-1: 0, 0: 1, 1: 2, 2: 3}
inverse_label_map = {v: k for k, v in label_map.items()}

# Broadcast đường dẫn file để các executor có thể thấy
sc = spark.sparkContext
broadcasted_model_path = sc.broadcast(MODEL_PATH)
broadcasted_tokenizer_path = sc.broadcast(TOKENIZER_PATH)

# Schema cho đầu ra của UDF: một mảng chứa 8 mảng con (mỗi mảng con 4 xác suất)
schema_output = ArrayType(ArrayType(FloatType()))

In [47]:
@pandas_udf(schema_output)
def predict_sentiments_udf(iterator: Iterator[pd.Series]) -> Iterator[pd.Series]:
    # Các import này là BẮT BUỘC bên trong UDF
    from tensorflow.keras.models import load_model
    from tensorflow.keras.preprocessing.sequence import pad_sequences
    import pickle
    import re
    import numpy as np
    import os

    # Tải mô hình và tokenizer MỘT LẦN cho mỗi worker
    model_path = broadcasted_model_path.value
    tokenizer_path = broadcasted_tokenizer_path.value
    
    if not os.path.exists(model_path) or not os.path.exists(tokenizer_path):
        # Xử lý lỗi nếu file không tồn tại trên worker
        raise FileNotFoundError(f"Model/Tokenizer không tìm thấy trên worker. Đảm bảo {model_path} và {tokenizer_path} có thể truy cập được.")
        
    model = load_model(model_path)
    with open(tokenizer_path, 'rb') as handle:
        tokenizer = pickle.load(handle)
    
    # Hàm làm sạch văn bản (cần định nghĩa lại bên trong UDF)
    def clean_text_udf(text):
        text = str(text).lower()
        text = re.sub(r'[^\w\s]', '', text) # Xóa dấu câu
        text = re.sub(r'\d+', '', text)      # Xóa số
        return text.strip()

    # Xử lý từng batch (pd.Series) được Spark gửi đến
    for comments_batch in iterator:
        # 1. Làm sạch văn bản
        cleaned_comments = comments_batch.apply(clean_text_udf)
        
        # 2. Tokenize và Pad
        sequences = tokenizer.texts_to_sequences(cleaned_comments)
        padded_sequences = pad_sequences(sequences, maxlen=MAX_SEQUENCE_LENGTH, padding='post')
        
        # 3. Dự đoán
        if len(padded_sequences) > 0:
            # model.predict trả về một list 8 mảng (mỗi mảng cho 1 khía cạnh)
            predictions = model.predict(padded_sequences, verbose=0)
            
            # 4. Định dạng lại đầu ra: [[aspect1_probs], [aspect2_probs], ...]
            # Chúng ta cần zip chúng lại để mỗi *hàng* chứa 8 mảng dự đoán
            # predictions là [arr_price, arr_shipping, ...]
            # zip(*predictions) sẽ nhóm [ (price_row1, ship_row1, ...), (price_row2, ship_row2, ...), ... ]
            # Chuyển đổi sang list để serialize
            result = [list(map(lambda x: x.tolist(), p)) for p in zip(*predictions)]
        else:
            result = []

        # 5. Trả về batch kết quả
        yield pd.Series(result)


A module that was compiled using NumPy 1.x cannot be run in
NumPy 2.1.3 as it may crash. To support both 1.x and 2.x
versions of NumPy, modules must be compiled with NumPy 2.0.
Some module may need to rebuild instead e.g. with 'pybind11>=2.12'.

If you are a user of the module, the easiest solution will be to
downgrade to 'numpy<2' or try to upgrade the affected module.
We expect that some modules will need time to support NumPy 2.

Traceback (most recent call last):  File "<frozen runpy>", line 198, in _run_module_as_main
  File "<frozen runpy>", line 88, in _run_code
  File "/opt/conda/lib/python3.11/site-packages/ipykernel_launcher.py", line 17, in <module>
    app.launch_new_instance()
  File "/opt/conda/lib/python3.11/site-packages/traitlets/config/application.py", line 1053, in launch_instance
    app.start()
  File "/opt/conda/lib/python3.11/site-packages/ipykernel/kernelapp.py", line 736, in start
    self.io_loop.start()
  File "/opt/conda/lib/python3.11/site-packages/tornado

AttributeError: _ARRAY_API not found

ImportError: PyArrow >= 4.0.0 must be installed; however, it was not found.

In [None]:
# Schema cho tin nhắn JSON từ Kafka (Thêm các cột nhãn)
json_fields = [StructField("review_text", StringType())]
for aspect in ASPECT_COLUMNS:
    # Nhãn gốc có thể là null nếu dùng None trong producer
    json_fields.append(StructField(aspect, IntegerType(), True)) # True = nullable

json_schema = StructType(json_fields)


# 1. Đọc từ Kafka
kafka_df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", KAFKA_SERVER) \
    .option("subscribe", TOPIC_NAME) \
    .option("startingOffsets", "latest") \
    .load()

# 2. Parse JSON và lấy các cột
parsed_df = kafka_df.select(
    col("value").cast("string").alias("json_value")
).select(
    from_json(col("json_value"), json_schema).alias("data")
#).select("data.review_text") # Lấy tất cả các cột con từ 'data'
).select("data.*") # Sử dụng .* để lấy tất cả các trường trong struct 'data'


# 3. Áp dụng Pandas UDF để dự đoán (chỉ cần 'review_text')
predictions_df = parsed_df.withColumn(
    "predictions_prob",
    predict_sentiments_udf(col("review_text"))
)

# 4. Giải nén dự đoán và giữ lại nhãn thật
# Bắt đầu với dataframe chứa nhãn thật và xác suất dự đoán
result_df = predictions_df # predictions_df giờ đã chứa cả cột nhãn thật và predictions_prob

# UDF để map ngược index (0, 1, 2, 3) về nhãn (-1, 0, 1, 2)
udf_inverser = udf(lambda idx: inverse_label_map.get(idx, -99), IntegerType())

for i, aspect in enumerate(ASPECT_COLUMNS):
    # UDF để lấy index có xác suất cao nhất
    udf_extractor = udf(lambda prob_array: int(np.argmax(prob_array[i])), IntegerType())

    # Lấy ra index dự đoán (0-3)
    result_df = result_df.withColumn(
        f"pred_idx_{aspect}",
        udf_extractor(col("predictions_prob"))
    )
    # Ánh xạ ngược index về nhãn gốc (-1, 0, 1, 2)
    result_df = result_df.withColumn(
        f"pred_{aspect}",
        udf_inverser(col(f"pred_idx_{aspect}"))
    ).drop(f"pred_idx_{aspect}") # Xóa cột index trung gian

# Xóa cột xác suất không cần thiết nữa
result_df = result_df.drop("predictions_prob")

print("Đã định nghĩa luồng xử lý (bao gồm nhãn thật).")

In [None]:
# Chọn các cột cuối cùng để hiển thị
final_output_df = result_df.select(
    "review_text",
    "pred_Quality",
    "pred_Price",
    "pred_Shipping",
    "pred_Shop_Service",
    "pred_Size",
    "pred_Outlook",
    "pred_General",
    "pred_Others"
)

# Chạy stream và hiển thị ra console
query = final_output_df.writeStream \
    .outputMode("append") \
    .format("console") \
    .option("truncate", "false") \
    .start()

print("Query đã bắt đầu. Đang chờ dữ liệu từ Kafka...")
print("Chạy cell producer 'sendStream_reviews.ipynb' để gửi dữ liệu.")
print("Nhấn Interrupt Kernel (nút Stop) để dừng stream.")

try:
    query.awaitTermination()
except KeyboardInterrupt:
    print("Đang dừng query...")
    query.stop()
    print("Query đã dừng.")

In [None]:
# Chọn các cột cuối cùng để ghi vào memory (bao gồm nhãn thật và dự đoán)
display_columns = ["review_text"]
for aspect in ASPECT_COLUMNS:
    display_columns.append(aspect) # Cột nhãn thật
    display_columns.append(f"pred_{aspect}") # Cột dự đoán

final_output_df_mem = result_df.select(*display_columns) # Dùng *

# Nếu bạn muốn hiển thị kết quả trong một bảng (table) mà bạn có thể query:
query_memory = final_output_df_mem.writeStream \
    .outputMode("append") \
    .format("memory") \
    .queryName("review_predictions_table") \
    .start()

print("Query (memory) đã bắt đầu. Chạy cell tiếp theo để xem kết quả và đánh giá.")

In [None]:
from IPython.display import display, clear_output
import time

try:
    while True:
        clear_output(wait=True)
        print("Đang làm mới... (Nhấn Interrupt Kernel để dừng)")
        # Hiển thị bảng từ memory
        display(spark.sql("SELECT * FROM review_predictions_table").toPandas())
        time.sleep(5) # Làm mới sau mỗi 5 giây
except KeyboardInterrupt:
    print("Đã dừng hiển thị.")
    query_memory.stop()

In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql.functions import col
import pandas as pd

print("--- Bắt đầu Đánh giá Mô hình trên Dữ liệu Đã Thu thập từ Stream ---")

# --- 1. Đọc dữ liệu dự đoán và nhãn thật từ memory sink ---
try:
    # Bảng này giờ đã chứa cả nhãn thật (vd: 'Price') và dự đoán (vd: 'pred_Price')
    eval_df = spark.sql("SELECT * FROM review_predictions_table")

    if eval_df.count() == 0:
        print("Chưa có dữ liệu trong bảng 'review_predictions_table'. Hãy đợi stream chạy.")
    else:
        print(f"Đã đọc {eval_df.count()} bản ghi từ memory sink để đánh giá.")

        # --- 2. Đánh giá từng khía cạnh ---
        evaluator_f1 = MulticlassClassificationEvaluator(metricName="f1")
        evaluator_accuracy = MulticlassClassificationEvaluator(metricName="accuracy")

        print("\n--- KẾT QUẢ ĐÁNH GIÁ ---")
        total_accuracy = 0
        total_f1 = 0
        valid_aspects = 0

        for aspect in ASPECT_COLUMNS:
            true_col = aspect
            pred_col = f"pred_{aspect}"

            # Chọn cột nhãn và dự đoán, đổi tên, bỏ null
            aspect_eval_df = eval_df.select(
                col(true_col).cast("double").alias("label"),
                col(pred_col).cast("double").alias("prediction")
            ).na.drop() # Rất quan trọng: Bỏ qua nếu nhãn thật là null (-99 hoặc None)

            count = aspect_eval_df.count()
            if count > 0:
                f1_score = evaluator_f1.evaluate(aspect_eval_df)
                accuracy = evaluator_accuracy.evaluate(aspect_eval_df)
                print(f"Khía cạnh: {aspect} ({count} bản ghi)")
                print(f"  Accuracy: {accuracy:.4f}")
                print(f"  F1-Score: {f1_score:.4f}")
                total_accuracy += accuracy
                total_f1 += f1_score
                valid_aspects += 1
            else:
                 print(f"Khía cạnh: {aspect} - Không có dữ liệu hợp lệ (non-null) để đánh giá.")

        # Tính trung bình nếu có khía cạnh hợp lệ
        if valid_aspects > 0:
            avg_accuracy = total_accuracy / valid_aspects
            avg_f1 = total_f1 / valid_aspects
            print("\n--- Trung bình ---")
            print(f"  Average Accuracy: {avg_accuracy:.4f}")
            print(f"  Average F1-Score: {avg_f1:.4f}")
        print("--------------------")


except Exception as e:
    print(f"Lỗi khi truy vấn hoặc đánh giá bảng 'review_predictions_table': {e}")
    print("Hãy đảm bảo query ghi vào memory sink đang chạy và đã xử lý dữ liệu.")

print("\n--- Đánh giá Hoàn tất ---")