# 🧠 Sentiment Analysis on Amazon Reviews (PySpark + PyTorch)

Goals:
- Clean product review data with PySpark.
- Convert the processed data into tensors for training a PyTorch model.

Tools:
- PySpark (large-scale data processing)
- PyTorch (deep learning model training)


## Exploratory Data Analysis (EDA)

In this section, we will:
- Read the raw data from a `.csv` file
- Inspect the schema and the number of rows and columns
- Check for missing data (null or empty values)
- Normalize the `Rating` column
- Analyze the distribution of ratings and sentiment
- Compute review lengths
- Analyze reviews over time (if a review date column is present)


### Import libraries

In [20]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType
from pyspark.ml.feature import Tokenizer, StopWordsRemover

import torch
from torch.utils.data import Dataset, DataLoader
import torch.nn as nn
import torch.optim as optim

### Create the Spark session

In [21]:
# ==============================================================
# 🔹 Create the SparkSession
# ==============================================================

spark = SparkSession.builder \
    .appName("Amazon_Review_Sentiment") \
    .getOrCreate()

spark.sparkContext.setLogLevel("WARN")

### Read the raw data

In [22]:
# ==============================================================
# 🔹 Read the raw data
# ==============================================================

data_path = "../data/raw/Amazon_Reviews.csv"

df = spark.read.option("header", True).option("inferSchema", True).csv(data_path)

print("Data loaded successfully.")
df.show(5, truncate=False)


Data loaded successfully.

### Dataset overview

In [23]:
# ==============================================================
# 🔹 Dataset overview
# ==============================================================

print("Schema:")
df.printSchema()

print(f"Total rows: {df.count():,}")
print(f"Total columns: {len(df.columns)}")
print("Columns:", df.columns)


Schema:
root
 |-- Reviewer Name: string (nullable = true)
 |-- Profile Link: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Review Count: string (nullable = true)
 |-- Review Date: string (nullable = true)
 |-- Rating: string (nullable = true)
 |-- Review Title: string (nullable = true)
 |-- Review Text: string (nullable = true)
 |-- Date of Experience: string (nullable = true)

Total rows: 21,656
Total columns: 9
Columns: ['Reviewer Name', 'Profile Link', 'Country', 'Review Count', 'Review Date', 'Rating', 'Review Title', 'Review Text', 'Date of Experience']


### Check for missing values

In [24]:
# ==============================================================
# 🔹 Check for missing values
# ==============================================================

null_df = df.select([
    F.count(F.when(F.col(c).isNull() | (F.col(c) == ''), c)).alias(c)
    for c in df.columns
])
print("Number of null or empty values per column:")
null_df.show(vertical=True)

Number of null or empty values per column:
-RECORD 0-----------------
 Reviewer Name      | 0   
 Profile Link       | 141 
 Country            | 436 
 Review Count       | 523 
 Review Date        | 565 
 Rating             | 581 
 Review Title       | 590 
 Review Text        | 595 
 Date of Experience | 920 
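
The null counts above grow steadily from the first column to the last. One situation that can produce this pattern is a quoted field containing a line break being split into several physical lines, so the later columns of the broken rows come out empty. Whether that happened here cannot be confirmed from the output alone. The sketch below uses Python's standard `csv` module on a made-up two-column sample (the sample text is illustrative and not taken from the dataset) to show the difference between splitting on newlines and quote-aware parsing.

```python
import csv
import io

# Hypothetical sample: one header row and ONE review whose text spans two lines.
raw = 'Rating,Review Text\n"Rated 1 out of 5 stars","Late delivery.\nNever again."\n'

# Naive approach: treat every physical line as a record.
naive_rows = raw.strip().split("\n")

# Quote-aware approach: the csv module keeps the embedded newline inside the field.
parsed_rows = list(csv.reader(io.StringIO(raw)))

print(len(naive_rows))   # 3 physical lines (header + 2 fragments)
print(len(parsed_rows))  # 2 logical rows (header + 1 review)
```

Spark's CSV reader exposes a `multiLine` option for the same purpose, usually combined with matching `quote` and `escape` settings. If it were enabled for this file, the row and null counts reported in this notebook could change.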



### Rating statistics

In [25]:
# ==============================================================
# 🔹 Rating statistics
# ==============================================================

df = df.withColumn(
    "Rating",
    F.regexp_extract(F.col("Rating"), r"(\d)(?=\s*out of 5)", 1).cast(IntegerType())
)

# Count reviews per rating
rating_stats = df.groupBy("Rating").agg(
    F.count("*").alias("count")
).orderBy("Rating")

print("\n Rating statistics:")
rating_stats.show()


 Rating statistics:
+------+-----+
|Rating|count|
+------+-----+
|  NULL|  601|
|     1|13123|
|     2| 1227|
|     3|  885|
|     4| 1292|
|     5| 4528|
+------+-----+
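
To make the extraction pattern easier to follow, here is a minimal pure-Python sketch of the same regular expression. It assumes the raw values look like `"Rated 4 out of 5 stars"`, which is a guess based on the pattern itself; `extract_rating` is a hypothetical helper, not part of the notebook.

```python
import re
from typing import Optional

# Same pattern as the Spark cell: one digit followed by "out of 5".
RATING_PATTERN = re.compile(r"(\d)(?=\s*out of 5)")

def extract_rating(raw: Optional[str]) -> Optional[int]:
    """Return the star rating as an int, or None when no rating is found."""
    if not raw:
        return None
    match = RATING_PATTERN.search(raw)
    return int(match.group(1)) if match else None

print(extract_rating("Rated 4 out of 5 stars"))  # 4
print(extract_rating("2023-01-05"))              # None
```

Values that do not match the pattern end up as NULL, just like missing values. That is consistent with the table above showing 601 NULL ratings while the missing-value check counted 581 null or empty `Rating` cells: the remaining 20 rows held some other text.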



### Review length statistics

In [26]:
# ==============================================================
# 🔹 Review text length statistics
# ==============================================================

if "Review Text" in df.columns:
    df = df.withColumn("ReviewLength", F.length(F.col("Review Text")))

    review_stats = df.select(
        F.mean("ReviewLength").alias("mean_len"),
        F.min("ReviewLength").alias("min_len"),
        F.max("ReviewLength").alias("max_len")
    ).collect()[0]

    print(f"Mean length: {review_stats['mean_len']:.2f}")
    print(f"Shortest: {review_stats['min_len']}, Longest: {review_stats['max_len']}")


Mean length: 422.60
Shortest: 6, Longest: 8166


### Analysis over time

In [27]:
# ==============================================================
# 🔹 Analysis over time (if a Review Date column exists)
# ==============================================================

if "Review Date" in df.columns:
    df = df.withColumn("Review Date", F.to_date(F.col("Review Date")))
    time_dist = df.groupBy(F.year("Review Date").alias("year")).count().orderBy("year")
    print("Number of reviews per year:")
    time_dist.show()

Number of reviews per year:
+----+-----+
|year|count|
+----+-----+
|NULL|  601|
|2007|    1|
|2008|    3|
|2009|    9|
|2010|   22|
|2011|  360|
|2012| 1166|
|2013|   88|
|2014|  166|
|2015|  175|
|2016|  317|
|2017|  616|
|2018| 1041|
|2019| 2314|
|2020| 2638|
|2021| 2750|
|2022| 2551|
|2023| 4017|
|2024| 2821|
+----+-----+



## Data Preprocessing

In this section, we will:
- Normalize the **Rating** column to numeric values (1–5)
- Assign a **Sentiment** label (`positive`, `negative`)
- Clean the text: remove special characters, lowercase, and remove stopwords
- Combine the review title and review text
- Save the normalized data to a `.parquet` file for model training.


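### Normalize the Rating column

In [28]:
# ==============================================================
# 🔹 Normalize the Rating column
# ==============================================================
# Rating was already parsed to an integer in the EDA section, so this
# extraction leaves the values unchanged; the filter below drops NULL ratings.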
df = df.withColumn(
    "Rating",
    F.regexp_extract(F.col("Rating"), r"(\d+)", 1).cast(IntegerType())
)

df = df.filter((F.col("Rating") >= 1) & (F.col("Rating") <= 5))
print("Rating column normalized to the range [1–5].")

df.select("Rating").distinct().orderBy("Rating").show()

Rating column normalized to the range [1–5].
+------+
|Rating|
+------+
|     1|
|     2|
|     3|
|     4|
|     5|
+------+



### Assign Sentiment labels

In [29]:
# ==============================================================
# 🔹 Assign sentiment labels based on Rating
# (ratings 1-2 -> negative; ratings 3-5 -> positive)
# ==============================================================

df = df.withColumn(
    "Sentiment",
    F.when(F.col("Rating") <= 2, "negative")
     .otherwise("positive")
)

df.groupBy("Sentiment").count().show()


+---------+-----+
|Sentiment|count|
+---------+-----+
| positive| 6705|
| negative|14350|
+---------+-----+
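
The labelling rule and the resulting class balance can be checked with a few lines of plain Python. This is only a sketch: `rating_to_sentiment` is a hypothetical helper that mirrors the Spark expression, and the counts are copied from the table above.

```python
def rating_to_sentiment(rating: int) -> str:
    """Mirror the Spark rule: ratings of 1 or 2 are negative, everything else positive."""
    return "negative" if rating <= 2 else "positive"

# Class counts copied from the table above.
counts = {"positive": 6705, "negative": 14350}
total = sum(counts.values())
shares = {label: round(n / total, 3) for label, n in counts.items()}

print(rating_to_sentiment(3))  # positive
print(shares)                  # {'positive': 0.318, 'negative': 0.682}
```

Two things stand out: 3-star reviews are counted as positive, and roughly two thirds of the reviews are negative, so the classes are imbalanced.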



### Clean the text

In [30]:
# ==============================================================
# 🔹 Clean the review content
# ==============================================================

if "Review Title" in df.columns and "Review Text" in df.columns:
    df = df.withColumn("FullReview", F.concat_ws(". ", "Review Title", "Review Text"))
elif "Review Text" in df.columns:
    df = df.withColumnRenamed("Review Text", "FullReview")

df = (
    df.withColumn("FullReview", F.lower("FullReview"))
      .withColumn("FullReview", F.regexp_replace("FullReview", r"[^a-zA-Z0-9\s]", " "))
      .withColumn("FullReview", F.regexp_replace("FullReview", r"\s+", " "))
      .filter(F.length("FullReview") > 10)
)

print("Text cleaned.")
df.select("FullReview").show(5, truncate=False)


Text cleaned.
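
The three cleaning steps can be reproduced on a single string with Python's `re` module. The sketch below applies them in the same order as the Spark cell; `clean_text` is a hypothetical helper and the sample sentence is made up.

```python
import re

def clean_text(text: str) -> str:
    """Apply the same three steps as the Spark cell, in the same order."""
    text = text.lower()
    text = re.sub(r"[^a-zA-Z0-9\s]", " ", text)  # punctuation and symbols become spaces
    text = re.sub(r"\s+", " ", text)             # collapse runs of whitespace
    return text

print(clean_text("Great!! Product, 10/10"))  # great product 10 10
```

Because the character class is ASCII-only, accented letters and other non-English characters are replaced with spaces as well.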

### Remove stopwords

In [31]:
# ==============================================================
# 🔹 Tokenization & Stopword Removal
# ==============================================================

tokenizer = Tokenizer(inputCol="FullReview", outputCol="Words")
remover = StopWordsRemover(inputCol="Words", outputCol="FilteredWords")

df = tokenizer.transform(df)
df = remover.transform(df)
df = df.withColumn("CleanText", F.concat_ws(" ", "FilteredWords"))
df = df.filter(F.length("CleanText") > 10)

df.select("CleanText").show(5, truncate=False)
print("Stopwords removed.")


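
As a rough illustration of what the tokenizer and stopword remover do to one review, here is a pure-Python sketch. The `STOPWORDS` set is a tiny made-up subset chosen for the example; Spark's default English list is much longer, so real results will differ.

```python
# Tiny illustrative stopword set; Spark's default English list is much longer.
STOPWORDS = {"the", "a", "is", "and", "to", "of", "it", "was"}

def remove_stopwords(text: str) -> str:
    """Lowercase, split on whitespace, drop stopwords, and re-join with spaces."""
    tokens = text.lower().split()
    kept = [tok for tok in tokens if tok not in STOPWORDS]
    return " ".join(kept)

print(remove_stopwords("the delivery was late and the box was damaged"))
# delivery late box damaged
```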

### Remove duplicate rows

In [32]:
# ==============================================================
# Remove duplicate rows
# ==============================================================

before_count = df.count()
df = df.dropDuplicates(["CleanText"])
after_count = df.count()

print(f"Removed {before_count - after_count} duplicate rows.")
print(f"{after_count} rows remain after cleaning and deduplication.")
df.select("CleanText").show(5, truncate=False)


Removed 81 duplicate rows.
20967 rows remain after cleaning and deduplication.




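
Deduplicating on a single column keeps one row per distinct `CleanText` value and discards the rest, even if the other columns differ. A minimal sketch of that idea in plain Python follows; `drop_duplicates_by_text` and the sample rows are hypothetical, and unlike this sketch, Spark does not promise which of the duplicate rows survives.

```python
from typing import Dict, List

def drop_duplicates_by_text(rows: List[Dict]) -> List[Dict]:
    """Keep one row per distinct CleanText value (here: the first one seen)."""
    seen = set()
    unique_rows = []
    for row in rows:
        key = row["CleanText"]
        if key not in seen:
            seen.add(key)
            unique_rows.append(row)
    return unique_rows

# Made-up rows: the first and third share the same text but not the same rating.
rows = [
    {"CleanText": "good product", "Rating": 5},
    {"CleanText": "bad service", "Rating": 1},
    {"CleanText": "good product", "Rating": 4},
]
unique_rows = drop_duplicates_by_text(rows)
print(len(rows) - len(unique_rows))  # 1 duplicate removed
```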

                                                                                

### Keep the important columns and save the data

In [33]:
# ==============================================================
# 🔹 Keep the required columns and save the data
# ==============================================================

keep_cols = ["Reviewer Name", "Country", "Review Date", "Rating", "CleanText", "Sentiment"]
df = df.select([c for c in keep_cols if c in df.columns])

print("Processed, trimmed-down data:")
df.show(5, truncate=False)

output_path = "../data/processed"
df.write.mode("overwrite").parquet(output_path)
print(f"Preprocessed data saved to: {output_path}")


Processed, trimmed-down data:

Preprocessed data saved to: ../data/processed


                                                                                

### Data validation after preprocessing

In [34]:
# ==============================================================
# 🔹 Data validation after preprocessing
# ==============================================================

from pyspark.sql import functions as F

# Check for nulls
print("Null check:")
df.select([F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in df.columns]).show(vertical=True)

# Check for duplicates
dup_count = df.groupBy("CleanText").count().filter(F.col("count") > 1).count()
print(f"Duplicate rows: {dup_count}")

# Check the rating distribution
df.groupBy("Rating").count().orderBy("Rating").show()

print("Validation complete: the data is ready for feature engineering.")


Null check:

-RECORD 0------------
 Reviewer Name | 0   
 Country       | 0   
 Review Date   | 0   
 Rating        | 0   
 CleanText     | 0   
 Sentiment     | 0   

Duplicate rows: 0

+------+-----+
|Rating|count|
+------+-----+
|     1|13105|
|     2| 1225|
|     3|  879|
|     4| 1274|
|     5| 4484|
+------+-----+

Validation complete: the data is ready for feature engineering.
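
One more consistency check is possible with the numbers already printed: the per-rating counts should add up to the row count reported after deduplication. The sketch below copies those figures from the outputs above.

```python
# Rating counts copied from the validation output above.
rating_counts = {1: 13105, 2: 1225, 3: 879, 4: 1274, 5: 4484}
rows_after_dedup = 20967  # reported by the deduplication step

total = sum(rating_counts.values())
assert total == rows_after_dedup, f"expected {rows_after_dedup}, got {total}"

# Ratings 1-2 are labelled negative, 3-5 positive.
negative = rating_counts[1] + rating_counts[2]
positive = total - negative
print(total, negative, positive)  # 20967 14330 6637
```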


                                                                                

## Building the Machine Learning Models

### Import libraries

In [None]:
import pandas as pd
import numpy as np
from pyspark.sql import functions as F
from sklearn.model_selection import train_test_split, GridSearchCV
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.linear_model import LogisticRegression
from sklearn.svm import LinearSVC
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import classification_report, accuracy_score
import joblib

### Load the data and extract features

In [36]:
df = spark.read.parquet("../data/processed/")

# Convert to pandas to train scikit-learn models
pandas_df = df.select("CleanText", "Sentiment").toPandas()
pandas_df = pandas_df.dropna()

print("Data ready for training:", pandas_df.shape)
pandas_df.head()

Data ready for training: (20967, 2)


|   | CleanText | Sentiment |
|---|-----------|-----------|
| 0 | 0 win complaint 0 win complaint amazon right ... | negative |
| 1 | 100 shipping didn tell 100 shipping didn tell... | negative |
| 2 | 10000000000 stars bad company 10000000000 sta... | negative |
| 3 | 514 billion revenues least come good customer... | negative |
| 4 | 600 order missing ve lot amazon orders incorr... | negative |


### Prepare the data for the models

In [38]:
X = pandas_df["CleanText"]
y = pandas_df["Sentiment"]

X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42, stratify=y
)

# TF-IDF vectorization
tfidf = TfidfVectorizer(max_features=5000, ngram_range=(1,2))
X_train_tfidf = tfidf.fit_transform(X_train)
X_test_tfidf = tfidf.transform(X_test)

print("TF-IDF shape:", X_train_tfidf.shape)

TF-IDF shape: (16773, 5000)
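
The 16,773 training rows follow from the 80/20 split of the 20,967 available rows. The short calculation below reproduces the sizes, assuming scikit-learn rounds the test count up for a fractional `test_size` and gives the remainder to training; that assumption matches the numbers reported in this notebook.

```python
import math

n_samples = 20967   # rows in pandas_df, from the shape printed earlier
test_size = 0.2

# Assumed rule: the test count is rounded up, the rest goes to training.
n_test = math.ceil(test_size * n_samples)
n_train = n_samples - n_test

print(n_train, n_test)  # 16773 4194
```

The second dimension of the TF-IDF matrix, 5000, is simply the `max_features` cap passed to `TfidfVectorizer`.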


### Define three models and their cross-validation parameter grids

In [39]:
models = {
    "logistic_regression": {
        "model": LogisticRegression(max_iter=200),
        "params": {"C": [0.1, 1, 5]}
    },
    "svm": {
        "model": LinearSVC(),
        "params": {"C": [0.1, 1, 5]}
    },
    "random_forest": {
        "model": RandomForestClassifier(),
        "params": {"n_estimators": [100, 200], "max_depth": [10, 20]}
    }
}
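
The size of each grid determines how many models a grid search has to fit. As a quick sketch, the number of candidates is the product of the list lengths in each grid, and the number of cross-validation fits is that product times the number of folds. The value `n_folds = 5` is an assumption made for this example.

```python
from math import prod

# Parameter grids copied from the cell above.
param_grids = {
    "logistic_regression": {"C": [0.1, 1, 5]},
    "svm": {"C": [0.1, 1, 5]},
    "random_forest": {"n_estimators": [100, 200], "max_depth": [10, 20]},
}
n_folds = 5  # assumed number of cross-validation folds

fits = {}
for name, grid in param_grids.items():
    n_candidates = prod(len(values) for values in grid.values())
    fits[name] = (n_candidates, n_candidates * n_folds)

print(fits)
# {'logistic_regression': (3, 15), 'svm': (3, 15), 'random_forest': (4, 20)}
```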

### Train and cross-validate each model

In [40]:
best_models = {}

for name, cfg in models.items():
    print(f"\n Training {name} ...")
    
    grid = GridSearchCV(
        estimator=cfg["model"],
        param_grid=cfg["params"],
        cv=5,
        scoring="accuracy",
        n_jobs=-1,
        verbose=1
    )
    
    grid.fit(X_train_tfidf, y_train)
    
    best_model = grid.best_estimator_
    y_pred = best_model.predict(X_test_tfidf)
    acc = accuracy_score(y_test, y_pred)
    
    print(f" {name} accuracy: {acc:.4f}")
    print(classification_report(y_test, y_pred))
    
    # Save the best model
    joblib.dump(best_model, f"../models/{name}_model.pkl")
    best_models[name] = (best_model, acc)


 Training logistic_regression ...
Fitting 5 folds for each of 3 candidates, totalling 15 fits
 logistic_regression accuracy: 0.9266
              precision    recall  f1-score   support

    negative       0.93      0.97      0.95      2866
    positive       0.93      0.83      0.88      1328

    accuracy                           0.93      4194
   macro avg       0.93      0.90      0.91      4194
weighted avg       0.93      0.93      0.93      4194


 Training svm ...
Fitting 5 folds for each of 3 candidates, totalling 15 fits
 svm accuracy: 0.9220
              precision    recall  f1-score   support

    negative       0.91      0.98      0.94      2866
    positive       0.95      0.80      0.87      1328

    accuracy                           0.92      4194
   macro avg       0.93      0.89      0.91      4194
weighted avg       0.92      0.92      0.92      4194


 Training random_forest ...
Fitting 5 folds for each of 4 candidates, totalling 20 fits
 random_forest accuracy

In [41]:
best_name = max(best_models, key=lambda x: best_models[x][1])
print(f"\nBest model: {best_name} (Accuracy = {best_models[best_name][1]:.4f})")


Best model: logistic_regression (Accuracy = 0.9266)
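
Since the classes are imbalanced, it helps to read the accuracy next to a simple baseline. The sketch below uses only numbers from the logistic regression report above: it recovers the accuracy as the support-weighted average of per-class recall (approximately, because the report rounds recall to two decimals) and compares it with always predicting the majority class.

```python
# Test-set support and per-class recall for logistic regression,
# copied from the classification report above (recall is rounded to 2 decimals).
support = {"negative": 2866, "positive": 1328}
recall = {"negative": 0.97, "positive": 0.83}

total = sum(support.values())

# Accuracy equals the support-weighted average of per-class recall.
approx_accuracy = sum(recall[c] * support[c] for c in support) / total

# Baseline: always predict the majority class ("negative").
majority_baseline = max(support.values()) / total

print(round(approx_accuracy, 3))    # 0.926
print(round(majority_baseline, 3))  # 0.683
```

The model therefore improves on the majority-class baseline by roughly 24 percentage points, and the lower recall on the positive class (0.83) is where most of the remaining errors sit.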
