In [34]:
from typing import List
import findspark
from my_model import BiLSTM_CNN
findspark.init()
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from settings import CATERGORIES, CHECKPOINT_PATH, KAFKA_SERVERS, LABELS, MAX_LEN, TOPIC_NAME
from pyspark.sql.types import StructType,StructField, StringType, ArrayType, IntegerType
import tensorflow as tf
import tensorflow_addons as tfa
import numpy as np

scala_version = '2.13'
spark_version = '3.3.1'
packages = [
    f'org.apache.spark:spark-sql-kafka-0-10_{scala_version}:{spark_version}',
     'org.apache.kafka:kafka-clients:3.3.1'
]

spark = SparkSession.builder.master("local")\
        .appName("ABSA")\
        .config("spark.jars.packages", ",".join(packages))\
        .getOrCreate()
# Reduce logging
sc = spark.sparkContext 

sc.setLogLevel("OFF")


model = BiLSTM_CNN()
model.build((None, MAX_LEN))
optimizer = tfa.optimizers.RectifiedAdam(total_steps=10000, warmup_proportion=0.1, min_lr=1e-5)
list_loss = ['categorical_crossentropy' for _ in range(len(CATERGORIES))]
model.compile(loss = list_loss, optimizer=optimizer, metrics=['accuracy'])
model.load_weights(CHECKPOINT_PATH).expect_partial()

model_bc = sc.broadcast(model)
# tokenizer_bc = sc.broadcast(INPUT_TOKERNIZER)
settings_bc = sc.broadcast({
        'MAX_LEN':MAX_LEN,
        'CATERGORIES':CATERGORIES,
        'LABELS':LABELS
})

def get_result(vector:List[int]):
    # text = clean_doc(text)
    # origin = text
    # text = tokenizer_bc.value.texts_to_sequences([text])
    # text = pad_sequences(text, settings_bc.value['MAX_LEN'], padding="post").tolist()
    vector = tf.convert_to_tensor([vector])
    #predict
    predict = model_bc.value.predict(vector, verbose=0)
    predict = ['{'+ f'{settings_bc.value["CATERGORIES"][id]}, {settings_bc.value["LABELS"][np.argmax(label)]}' +'}'
                     for id, label in enumerate(predict) if np.argmax(label)]
    
    predict = "{" + ", ".join(predict) +"}"
    # print(f'\n{predict}\n =====================\n')
    return predict

udf_get_result = F.udf(get_result,StringType())


Keras weights file (<HDF5 file "variables.h5" (mode r+)>) saving:
...BiLSTM
......vars
...BiLSTM\backward_layer
......vars
...BiLSTM\backward_layer\cell
......vars
.........0
.........1
.........2
...BiLSTM\forward_layer
......vars
...BiLSTM\forward_layer\cell
......vars
.........0
.........1
.........2
...BiLSTM\layer
......vars
...BiLSTM\layer\cell
......vars
...classifiers\dense
......vars
.........0
.........1
...classifiers\dense_1
......vars
.........0
.........1
...classifiers\dense_10
......vars
.........0
.........1
...classifiers\dense_11
......vars
.........0
.........1
...classifiers\dense_2
......vars
.........0
.........1
...classifiers\dense_3
......vars
.........0
.........1
...classifiers\dense_4
......vars
.........0
.........1
...classifiers\dense_5
......vars
.........0
.........1
...classifiers\dense_6
......vars
.........0
.........1
...classifiers\dense_7
......vars
.........0
.........1
...classifiers\dense_8
......vars
.........0
.........1
...classifiers\dense

In [35]:
spark

In [36]:
from settings import INPUT_TOKERNIZER
from typing import List
tokenizer_bc = sc.broadcast(INPUT_TOKERNIZER)

def short_vector(vector:List[int]):
    while vector[-1] == 0:
        vector.pop()
    return vector

udf_short_vector = F.udf(short_vector,ArrayType(IntegerType()))

def vector2text(vector:List[int]):
    text:str = tokenizer_bc.value.sequences_to_texts([vector])
    # return " ".join(text[0])
    return text

udf_vector2text = F.udf(vector2text,StringType())

In [37]:
from utils import clean_doc
from keras.utils import pad_sequences

def test(text:str):
    text = clean_doc(text)
    text = INPUT_TOKERNIZER.texts_to_sequences([text])
    text = pad_sequences(text, 96, padding="post").tolist()
    text = tf.convert_to_tensor(text)
    #predict
    predict = model.predict(text, verbose=0)
    predict = ['{'+ f'{settings_bc.value["CATERGORIES"][id]}, {settings_bc.value["LABELS"][np.argmax(label)]}' +'}'
                     for id, label in enumerate(predict) if np.argmax(label)]
    
    predict = "{" + ", ".join(predict) +"}"
    return predict

In [38]:
# Thức ăn rất ngon, nêm nếm vừa ăn, khẩu phần ăn nhiều.

In [39]:
test('Nhân viên nhiệt tình nhưng mà quán đông nên cũng phải chờ khá lâu nếu kêu thêm gì đó.')

'{{SERVICE#GENERAL, negative}, {RESTAURANT#GENERAL, negative}}'

In [40]:
tokenizer_bc.value.sequences_to_texts([[6, 51, 347, 482, 270, 406, 20, 610, 1114,1,1,1,1]])

['không gian ấm cúng sang trọng mà cổ điển UNK UNK UNK UNK']

In [41]:
kafkaDf = spark.read.format("kafka")\
    .option("kafka.bootstrap.servers", KAFKA_SERVERS)\
    .option("subscribe", TOPIC_NAME)\
    .option("startingOffsets", "earliest")\
    .load()

query = kafkaDf\
    .select(
        F.col('key').cast('string').alias('review_id'),
        F.split(F.col('value').cast('string'),",").cast(ArrayType(IntegerType())).alias('vector'),
        # udf_get_result(F.col('value').cast('string')).alias('predict')
    )#.printSchema()



In [42]:
query1 = query.select(
    F.col('review_id'),
    udf_short_vector(F.col('vector')).alias('short_vector'),
    udf_get_result(F.col('vector')).alias('predict'),
)#.printSchema()
query2 = query1.select(
    F.col('review_id'),
    udf_vector2text(F.col('short_vector')).alias('text'),
    F.col('predict'),
    # F.col('short_vector'),
)

In [43]:
print(query2.show(21, truncate=False))

+----------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------------------------------------------------------+
|review_id |text                                                                                                                                                                                                             |predict                                                     |
+----------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------------------------------------------------------+
|1871981006|[thức ăn khá ngon và các bạn phục vụ nhiệt tình đặc biệt các bạn UNK nhất tinh nhất và hiếu phục vụ vui vẻ]                             