In [3]:
import os

# Make the Spark driver and its Python workers use the absa_env interpreter.
# NOTE(review): absolute, machine-specific path; the next cell sets the same values again.
_absa_python_exe = r"C:\Users\admin\anaconda3\envs\absa_env\python.exe"
for _pyspark_var in ('PYSPARK_PYTHON', 'PYSPARK_DRIVER_PYTHON'):
    os.environ[_pyspark_var] = _absa_python_exe

In [1]:
# ===============================================
# Spark Streaming + Kafka + PhoBERT ABSA Inference
# ===============================================

import json
import os
import re
import unicodedata

import pandas as pd
import torch
import torch.nn as nn
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StructType, StructField, StringType
from transformers import AutoTokenizer, AutoModel

# =================== Environment ===================
# Driver and workers must run the same interpreter (the absa_env conda env).
ABSA_PYTHON_EXE = r"C:\Users\admin\anaconda3\envs\absa_env\python.exe"
os.environ.update({
    "PYSPARK_PYTHON": ABSA_PYTHON_EXE,
    "PYSPARK_DRIVER_PYTHON": ABSA_PYTHON_EXE,
})

# =================== Kafka / Spark ===================
KAFKA_SERVER = "localhost:9092"
TOPIC_NAME = "absa-topic-luanlt"

scala_version = "2.12"
spark_version = "3.5.1"
# Maven coordinates resolved by Spark at session start-up (spark.jars.packages).
kafka_connector_pkg = f"org.apache.spark:spark-sql-kafka-0-10_{scala_version}:{spark_version}"
packages = [kafka_connector_pkg, "org.apache.kafka:kafka-clients:3.5.1"]

# Local single-core Spark session with the Kafka connector on the classpath.
spark_conf = {
    "spark.driver.host": "127.0.0.1",
    "spark.jars.packages": ",".join(packages),
    "spark.sql.execution.arrow.pyspark.enabled": "false",
}
spark_builder = SparkSession.builder.appName("Kafka_PhoBERT_ABSA_Streaming").master("local[1]")
for conf_key, conf_value in spark_conf.items():
    spark_builder = spark_builder.config(conf_key, conf_value)
spark = spark_builder.getOrCreate()
spark.sparkContext.setLogLevel("ERROR")


  from .autonotebook import tqdm as notebook_tqdm


In [None]:

# =================== Model / Tokenizer ===================
SAVE_DIR = "saved_absa_model"
# Use the GPU when torch can see one, otherwise fall back to CPU.
DEVICE = "cpu"
if torch.cuda.is_available():
    DEVICE = "cuda"

# Settings saved next to the weights; this notebook reads
# aspect_cols, model_name, num_classes and max_len from it.
config_path = f"{SAVE_DIR}/absa_config.json"
with open(config_path, encoding="utf-8") as config_file:
    cfg = json.load(config_file)

ASPECT_COLS = cfg["aspect_cols"]

class MultiTaskPhoBERT(nn.Module):
    """PhoBERT encoder with one independent linear classification head per aspect.

    The sentence vector is the encoder's first-token hidden state, passed through
    dropout and then fed to every head.

    Args:
        model_name: Identifier/path handed to ``AutoModel.from_pretrained``.
        num_aspects: Number of aspect heads to create.
        num_classes: Number of output classes per head.
    """

    def __init__(self, model_name, num_aspects, num_classes):
        super().__init__()
        # The attribute names encoder / dropout / classifiers form the state_dict
        # keys, so they must stay in sync with the saved checkpoint.
        self.encoder = AutoModel.from_pretrained(model_name)
        width = self.encoder.config.hidden_size
        self.num_aspects = num_aspects
        self.num_classes = num_classes
        self.dropout = nn.Dropout(0.1)
        heads = (nn.Linear(width, num_classes) for _ in range(num_aspects))
        self.classifiers = nn.ModuleList(heads)

    def forward(self, input_ids, attention_mask):
        """Return stacked logits of shape (B, num_aspects, num_classes)."""
        encoded = self.encoder(input_ids=input_ids, attention_mask=attention_mask, return_dict=True)
        sentence_vec = self.dropout(encoded.last_hidden_state[:, 0, :])
        per_head = [classifier(sentence_vec) for classifier in self.classifiers]
        return torch.stack(per_head, dim=1)

# Load model & tokenizer once: build the architecture, then overwrite its
# weights with the fine-tuned checkpoint and switch to inference mode.
model = MultiTaskPhoBERT(cfg["model_name"], len(ASPECT_COLS), cfg["num_classes"])
checkpoint_path = f"{SAVE_DIR}/pytorch_model.bin"
model.load_state_dict(torch.load(checkpoint_path, map_location=DEVICE))
model = model.to(DEVICE).eval()  # Module.to/.eval return the same module

tokenizer = AutoTokenizer.from_pretrained(SAVE_DIR)

# =================== Text normalization (optional) ===================
def normalize_text(text: str) -> str:
    """Normalize a raw review before tokenization.

    Steps: coerce to ``str``, compose Unicode to NFC, lowercase, replace every
    character that is neither a word character nor whitespace with a space,
    collapse whitespace runs, and trim.

    Args:
        text: Raw review text; non-``str`` values are converted with ``str()``.

    Returns:
        The normalized review (possibly ``""``).
    """
    if not isinstance(text, str):
        text = str(text)
    # Decomposed (NFD) input stores Vietnamese diacritics as separate combining
    # marks, which `\w` does not match; without composing first, words such as
    # "điểm" would be split apart by the punctuation substitution below.
    text = unicodedata.normalize("NFC", text).lower()
    # For str patterns `\w` is Unicode-aware and already covers every precomposed
    # Vietnamese letter (á, ơ, ư, đ, ...) plus digits and "_".
    text = re.sub(r"[^\w\s]", " ", text)
    # Collapse whitespace and trim; punctuation at either end would otherwise
    # leave a dangling space after the substitution above.
    return re.sub(r"\s+", " ", text).strip()

# =================== Inference function ===================
def infer_aspects(texts, batch_size=32):
    """Predict one class index per aspect for each review.

    Texts are normalized with ``normalize_text``, tokenized (padded/truncated to
    ``cfg["max_len"]``) and scored by the module-level ``model`` on ``DEVICE``.

    Args:
        texts: Iterable of review texts.
        batch_size: Maximum number of texts per forward pass, so a large
            micro-batch does not have to fit in memory at once. ``None`` or a
            value <= 0 scores everything in a single pass.

    Returns:
        numpy int64 array of shape (len(texts), len(ASPECT_COLS)) holding the
        argmax class index for each aspect.
    """
    texts = [normalize_text(t) for t in texts]
    if not texts:
        # Skip the tokenizer/model entirely and keep the documented 2-D shape.
        return torch.empty((0, len(ASPECT_COLS)), dtype=torch.long).numpy()
    if batch_size is None or batch_size <= 0:
        batch_size = len(texts)

    chunk_preds = []
    with torch.no_grad():
        for start in range(0, len(texts), batch_size):
            enc = tokenizer(texts[start:start + batch_size], padding=True, truncation=True,
                            max_length=cfg["max_len"], return_tensors="pt")
            logits = model(enc["input_ids"].to(DEVICE), enc["attention_mask"].to(DEVICE))
            chunk_preds.append(torch.argmax(logits, dim=-1).cpu())
    return torch.cat(chunk_preds, dim=0).numpy()  # (batch_size, num_aspects)

# =================== Kafka Stream Schema ===================
# Each Kafka message value is parsed as JSON with a single "Review" string field.
# NOTE(review): malformed messages presumably come through from_json as a null
# Review; foreach_batch maps those to "".
schema = StructType([
    StructField("Review", StringType())
])

# Use the connection constants from the setup cell rather than repeating the
# literals, so changing the broker/topic there takes effect here.
df_raw = (spark.readStream
          .format("kafka")
          .option("kafka.bootstrap.servers", KAFKA_SERVER)
          .option("subscribe", TOPIC_NAME)
          .option("startingOffsets", "latest")  # only messages produced after start (handy for testing)
          .load())

# Kafka delivers the payload as binary `value`; cast it to a string, parse the JSON, flatten.
stream_df = (df_raw.selectExpr("CAST(value AS STRING) AS json")
             .select(from_json(col("json"), schema).alias("d"))
             .select("d.*"))

# =================== foreachBatch ===================
def foreach_batch(batch_df, batch_id):
    """Score one streaming micro-batch and print a preview of the predictions.

    Registered through ``writeStream.foreachBatch``; receives the micro-batch
    DataFrame and its id. Collects the batch to pandas, predicts every aspect
    for each review, and prints the first five result rows.
    """
    batch_pdf = batch_df.toPandas()
    if batch_pdf.empty:
        print(f"\n=== Batch {batch_id} EMPTY ===\n")
        return

    review_texts = batch_pdf["Review"].fillna("").tolist()
    aspect_labels = infer_aspects(review_texts)

    pred_columns = [f"{aspect}_pred" for aspect in ASPECT_COLS]
    # One record per review: raw text, the StudentName column, then one prediction per aspect.
    records = [
        {"Review": text, "StudentName": "LeTrongLuan", **dict(zip(pred_columns, labels))}
        for text, labels in zip(review_texts, aspect_labels)
    ]

    result_pdf = pd.DataFrame(records)
    print(f"\n=== Batch {batch_id} ===")
    print(result_pdf.head(5))

# =================== Start streaming ===================
query = (stream_df.writeStream
         .foreachBatch(foreach_batch)
         .trigger(processingTime="5 seconds")
         .start())

# awaitTermination() blocks this cell. Interrupting the kernel only interrupts the
# wait, so without an explicit stop() the query keeps running, and re-running the
# cell starts a second query next to it.
try:
    query.awaitTermination()
except KeyboardInterrupt:
    print("Interrupted - stopping streaming query")
finally:
    query.stop()


=== Batch 0 EMPTY ===


=== Batch 1 ===
                                              Review  StudentName  Price_pred  \
0  điểm yếu k có hộp về giầy bị móp đi hơi đau ở ...  LeTrongLuan           0   
1                     Rẻ mà ngon nha.. 100/100 điểm.  LeTrongLuan           2   

   Shipping_pred  Outlook_pred  Quality_pred  Size_pred  Shop_Service_pred  \
0              0             0             1          0                  1   
1              0             0             0          0                  0   

   General_pred  Others_pred  
0             0            0  
1             2            0  

=== Batch 2 ===
                                              Review  StudentName  Price_pred  \
0            Hàng đúng như mô tả, giao hàng hơi lâu.  LeTrongLuan           0   
1        Giao hàng nhanh nha,giày đẹp nên mua nha mn  LeTrongLuan           0   
2  Hàng giao nhanh giày vừa chân nhưng mà mùi hôi...  LeTrongLuan           0   

   Shipping_pred  Outlook_pred  Quality_pred 