In [2]:
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import Tokenizer, StopWordsRemover, CountVectorizer, IDF
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql.functions import col, regexp_replace
from pyspark.ml.feature import StringIndexer
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext


In [5]:
spark = SparkSession.builder \
    .appName("SentimentAnalysis") \
    .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:3.0.1") \
    .getOrCreate()

25/02/26 06:44:01 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
25/02/26 06:44:01 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.


In [None]:
# if 'spark' in locals():
#     spark.stop()

In [6]:
import os
import pymongo
from pymongo import MongoClient
mongo_uri = os.environ.get('MONGO_URI')
client = MongoClient(mongo_uri)
db = client.get_database()
collection = db['clean_news']

In [None]:
#import data from clean_news collection to spark dataframe
connection_string = f"{mongo_uri}.clean_news"
df = spark.read.format("mongo").option("uri", connection_string).load()

In [None]:
df.printSchema()
df.show(5, truncate=True)

In [12]:
# Tokenization
tokenizer = Tokenizer(inputCol="clean_content", outputCol="words")

# Loại bỏ stop words
remover = StopWordsRemover(inputCol="words", outputCol="filtered_words")

# Tạo feature vector
countVectorizer = CountVectorizer(inputCol="filtered_words", 
                                outputCol="raw_features", 
                                minDF=2.0)
# TF-IDF
idf = IDF(inputCol="raw_features", outputCol="features")

# Label Indexing
labelIndexer = StringIndexer(inputCol="sentiment", 
                            outputCol="label")

# Mô hình Logistic Regression
lr = LogisticRegression(maxIter=20, regParam=0.3, elasticNetParam=0)

# Tạo pipeline
pipeline = Pipeline(stages=[
    tokenizer,
    remover,
    countVectorizer,
    idf,
    labelIndexer,
    lr
])

In [13]:
# Chia dataset
train_data, test_data = df.randomSplit([0.8, 0.2], seed=42)

# Tạo và huấn luyện pipeline
model = pipeline.fit(train_data)

# Dự đoán trên tập test
predictions = model.transform(test_data)

# Đánh giá mô hình
evaluator = MulticlassClassificationEvaluator(
    labelCol="label", 
    predictionCol="prediction", 
    metricName="accuracy"
)
accuracy = evaluator.evaluate(predictions)


25/02/26 06:50:53 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
25/02/26 06:50:53 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.VectorBLAS


In [14]:
print(f"Model Accuracy: {accuracy}")

# Hiển thị một số dự đoán mẫu
predictions.select("clean_content", "sentiment", "prediction").show(5)

Model Accuracy: 0.723404255319149
+--------------------+---------+----------+
|       clean_content|sentiment|prediction|
+--------------------+---------+----------+
|nvl đi trên tham ...| positive|       1.0|
|trong báo cáo kết...| positive|       2.0|
|vn index hôm nay ...|  neutral|       1.0|
|dòng tiền tham gi...| negative|       0.0|
|sàn hose đóng cửa...| positive|       1.0|
+--------------------+---------+----------+
only showing top 5 rows



In [15]:
model.write().overwrite().save("models/sentiment_model_spark")

                                                                                