In [None]:
!git clone https://github.com/pt1127/Streaming-Data-With-Spark.git

In [None]:
# Install PySpark and Spark NLP
! pip install -q pyspark==3.1.2 spark-nlp

In [None]:
from pyspark.ml import PipelineModel
from pyspark.sql import SparkSession
from pyspark.sql import functions as f
from pyspark.sql.functions import when

In [None]:
best_model = PipelineModel.load('/content/Streaming-Data-With-Spark/best_model')

In [None]:
spark = (
    SparkSession.builder.appName("SentimentAnalysis")
    .config("spark.executor.memory", "8g")
    .getOrCreate()
)

In [None]:
example = spark.createDataFrame([['Tôi là một con người tốt bụng']]).toDF("text")

In [None]:
user_regex = r"(@\w{1,15})"
hashtag_replace_regex = "#(\w{1,})"
url_regex = r"((https?|ftp|file):\/{2,3})+([-\w+&@#/%=~|$?!:,.]*)|(www.)+([-\w+&@#/%=~|$?!:,.]*)"
email_regex = r"[\w.-]+@[\w.-]+\.[a-zA-Z]{1,}"
i_regex = r"i "

def cleaning_process(data):
            # Loại bỏ @Mention khỏi text
    data=(data.withColumn("text",f.regexp_replace(f.col("text"), user_regex, "")) 
            # Loại bỏ #Hashtag khỏi text
             .withColumn("text",f.regexp_replace(f.col("text"), hashtag_replace_regex, "$1"))
            # Loại bỏ URL khỏi text
            .withColumn("text",f.regexp_replace(f.col("text"), url_regex, "")) 
            # Loại bỏ Email khỏi text
            .withColumn("text",f.regexp_replace(f.col("text"), email_regex, ""))
             # Chuẩn hoá viết thường
            .withColumn("text",f.lower(f.col("text")))
            # Loại bỏ số cũng như các ký tự khỏi đoạn text
            .withColumn("text",f.regexp_replace(f.col("text"), '[^a-záàảãạăắằẳẵặâấầẩẫậéèẻẽẹêếềểễệóòỏõọôốồổỗộơớờởỡợíìỉĩịúùủũụưứừửữựýỳỷỹỵđ]', " "))
            # Loại bỏ các khoảng trắng thừa trong câu
            .withColumn("text",f.regexp_replace(f.col("text"), " +", " "))
            # Loại bỏ các khoảng trắng đầu và cuối câu
            .withColumn("text",f.trim(f.col("text")))
            # Giữ lại các dòng mà đoạn text có nội dung 
            .filter(f.col("text") != ""))
    
    return data

In [None]:
example = cleaning_process(example).withColumnRenamed('text', 'clean_text')

In [None]:
result = best_model.transform(example).select('prediction')\
                  .withColumn('prediction_text', when(f.col('prediction') == 0, 'CLEAN').when(f.col('prediction') == 1, 'OFFENSIVE').otherwise('HATE'))\
                  .select('prediction_text')

In [None]:
result.collect()[0][0]