In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

from pyspark.ml.feature import (
    Tokenizer,
    StopWordsRemover,
    HashingTF,
    IDF
)

from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator


1. INITIALIZE SPARK SESSION

In [3]:
# khởi tạo spark session
spark = SparkSession.builder.appName("SentimentAnalysis").getOrCreate()

2. LOAD DATA AND PREPROCESS

In [4]:
data_path = "C:\\Users\\DoubleDD\\HUS\\NLP&DL\\datasets\\sentiments.csv"
df = spark.read.csv(data_path, header=True, inferSchema=True)
df.printSchema

<bound method DataFrame.printSchema of DataFrame[text: string, sentiment: int]>

In [5]:
df.show(5)

+--------------------+---------+
|                text|sentiment|
+--------------------+---------+
|Kickers on my wat...|        1|
|user: AAP MOVIE. ...|        1|
|user I'd be afrai...|        1|
|   MNTA Over 12.00  |        1|
|    OI  Over 21.37  |        1|
+--------------------+---------+
only showing top 5 rows



In [6]:
# Convert -1/1 labels to 0/1: Normalize sentiment labels (Spark yêu cầu nhãn phải là số 0 hoặc 1 cho Logistic Regression.)
df = df.withColumn("label", (col("sentiment").cast("integer") + 1) / 2)

initial_row_count = df.count()
df = df.dropna(subset=["sentiment"])
initial_row_count, df.count()

(5792, 5791)

In [7]:
# Chia train, test tỉ lệ 80:20
train_df, test_df = df.randomSplit([0.8, 0.2], seed=42)
train_df.count(), test_df.count()

(4682, 1109)

3. BUILD PRE-PROCESSING PIPELINE

In [63]:
# A Pipeline in Spark ML consists of a sequence of Transformer and Estimator.

# (1) Tokenizer - Tách từ 
tokenizer = Tokenizer(inputCol="text", outputCol="words", )

# (2) StopWordsRemover – Loại bỏ stopwords
stopwordsRemover = StopWordsRemover(inputCol="words", outputCol="filtered_words")

# (3) HashingTF – Vector hóa văn bản. Là 1 Vectorizer
# Biến danh sách token thành vector theo kỹ thuật:
    # Hashing trick
    # Kích thước cố định (10.000 chiều)
    # Không cần từ điển (dictionary-free)
hashingTF = HashingTF(
    inputCol="filtered_words", 
    outputCol="raw_features",
    numFeatures=10000
)

# (4) IDF – Inverse Document Frequency
# IDF điều chỉnh trọng số:
    # Giảm tầm quan trọng từ xuất hiện nhiều
    # Tăng trọng số từ có tính phân biệt
    # → Kết quả cuối cùng là vector TF-IDF.
idf = IDF(inputCol="raw_features", outputCol="features")



In [74]:
# (5) Huấn luyện mô hình
lr = LogisticRegression(
    maxIter=10, 
    regParam=0.001,  # regParam điều chỉnh regularization để giảm overfitting.
    featuresCol="features", 
    labelCol="label"
)


In [75]:
# (6) Ghép thành pipeline
pipeline = Pipeline(stages=[tokenizer, stopwordsRemover, hashingTF, idf, lr])

4. TRAIN MODEL

In [76]:
model = pipeline.fit(train_df)

In [77]:
# Predict on test data
predictions = model.transform(test_df)
predictions.show(10, truncate=False)

+---------------------------------------------------------------------------------------------------------------------------------------------------+---------+-----+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

5. EVALUATING

In [78]:
evaluator = MulticlassClassificationEvaluator(metricName="accuracy")
accuracy = evaluator.evaluate(predictions)

print(accuracy)

0.7294860234445446


6. MODEL IMPROVEMENT

```
Một chút nhận xét về mô hình phía trên: Với accuracy khá khiêm tốn là 0.729, em triển khai 1 số kĩ thuật sau với kỳ vọng sẽ cải tiến được độ chính xác của mô hình:
    
    - thay thế simple Tokenizer bằng RegexTokenizer (-> noise filtering)
    - thay thế HashingTF bằng CounterVectorizer (-> vocabulary reduction. Cụ thể vocab sẽ không thêm vào các từ xuất hiện quá hiếm, 
        thông qua 2 params tương ứng là minDF, minTF. Ví dụ: 
            minDF=5,          # remove very rare words (<5 documents)
            minTF=2           # remove words appearing <2 times per doc
        This helps reduce the dimensionality of the feature space and combat noise.)
    - giảm chiều: thông qua tham số vocabSize của CountVectorizer (-> Reduce TF-IDF dimensionality)
```   

In [87]:
from pyspark.ml.feature import RegexTokenizer, CountVectorizer

tokenizer2 = RegexTokenizer(
    inputCol="text",
    outputCol="words",
    pattern=r"\W+"         # split by any non-word (better than simple Tokenizer)
)

# ============================================================
# (2) StopWordsRemover
# ============================================================
stopwordsRemover2 = StopWordsRemover(
    inputCol="words",
    outputCol="filtered_words"
)

# ============================================================
# (B) VOCABULARY REDUCTION (replace HashingTF)
#    CountVectorizer allows: minDF, maxVocabSize
# ============================================================
vectorizer2 = CountVectorizer(
    inputCol="filtered_words",
    outputCol="raw_features",
    vocabSize=10000,   # reduce dimensionality
    minDF=5,          # remove very rare words (<5 documents)
    minTF=2           # remove words appearing <2 times per doc
)

# ============================================================
# (4) IDF
# ============================================================
idf2 = IDF(
    inputCol="raw_features",
    outputCol="features"
)

# ============================================================
# (5) Logistic Regression model
# ============================================================
lr2 = LogisticRegression(
    maxIter=20,
    regParam=0.01,
    featuresCol="features",
    labelCol="label"
)

# ============================================================
# (6) Pipeline
# ============================================================
pipeline2 = Pipeline(stages=[
    tokenizer2,
    stopwordsRemover2,
    vectorizer2,
    idf2,
    lr2
])

In [88]:
model2 = pipeline2.fit(train_df)

# Predict on test data
predictions2 = model2.transform(test_df)
predictions2.show(10, truncate=False)

evaluator2 = MulticlassClassificationEvaluator(metricName="accuracy")
accuracy2 = evaluator2.evaluate(predictions2)

print(accuracy2)

+---------------------------------------------------------------------------------------------------------------------------------------------------+---------+-----+---------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------+------------+------------+----------------------------------------+---------------------------------------+----------+
|text                                                                                                                                               |sentiment|label|words                                                                                                                                                          |filtered_words                                                                               