In [42]:
import findspark
from pyspark.sql import SparkSession
import pandas as pd
import re
from textblob import TextBlob

In [43]:
# Create the SparkSession
spark = (
    SparkSession.builder
    .appName("SparkSentimentAnalyze")
    .master("local[*]")
    .getOrCreate()
)

# Read the CSV file with pandas
path = "/content/All_Comments_Final.csv"
df = pd.read_csv(path)


def _clean_comment(text):
    """Return ``text`` with non-ASCII characters removed, whitespace runs
    collapsed to a single space, and leading/trailing whitespace trimmed."""
    ascii_only = re.sub(r'[^\x00-\x7F]', '', text)
    # Trim last: removing non-ASCII characters can expose whitespace at the ends.
    return re.sub(r'\s+', ' ', ascii_only).strip()


# Pre-process the comments.
# Non-string cells (e.g. missing values) are skipped. The "contains at least
# one word character" test is applied to the *cleaned* text, so a comment that
# consists only of non-ASCII characters is dropped instead of being kept as an
# empty or punctuation-only string.
comments = [
    cleaned
    for cleaned in (
        _clean_comment(raw) for raw in df["Comments"] if isinstance(raw, str)
    )
    if re.search(r'\w', cleaned)
]

comments_rdd = spark.sparkContext.parallelize(comments)

In [None]:
# Paths to the input word lists
positive_path = "../SentimentAnalyzeMapreduce/InputData/positive.txt"
negative_path = "../SentimentAnalyzeMapreduce/InputData/negative.txt"
keywords_path = "../SentimentAnalyzeMapreduce/InputData/keyword.txt"


def _read_word_set(file_path, encoding):
    """Load a word list file into a set.

    Each line is stripped of surrounding whitespace and lower-cased; lines that
    are blank after stripping are skipped.
    """
    with open(file_path, "r", encoding=encoding) as handle:
        trimmed = (raw_line.strip() for raw_line in handle)
        return {entry.lower() for entry in trimmed if entry}


# Read the word lists: keywords first, then positive, then negative.
keywords = _read_word_set(keywords_path, "utf-8")
positive_keywords = _read_word_set(positive_path, "utf-8")
# The negative list is decoded as ISO-8859-1, unlike the other two files.
negative_keywords = _read_word_set(negative_path, "ISO-8859-1")

# Print the word sets to check them
print("Keywords:", keywords)
print("Positive Keywords:", positive_keywords)
print("Negative Keywords:", negative_keywords)


Keywords: {'music'}
Positive Keywords: {'rightness', 'satisfied', 'dignity', 'upheld', 'spellbind', 'usable', 'zippy', 'loveliness', 'lighter', 'affectation', 'effectiveness', 'fastest', 'lavish', 'lower-priced', 'kindness', 'rejuvenated', 'conscientious', 'bolster', 'kudos', 'poetic', 'valiant', 'spellbinding', 'accomplishments', 'complementary', 'beckons', 'passionate', 'responsibly', 'affirm', 'amazingly', 'lovably', 'vigilance', 'keen', 'navigable', 'humourous', 'perfection', 'complement', 'capably', 'equitable', 'exhilarate', 'sincere', 'astonishingly', 'famous', 'effectual', 'hilarious', 'fortunately', 'obsession', 'elatedly', 'sleek', 'supple', 'darling', 'homage', 'flexibility', 'idealize', 'dazzled', 'boost', 'intimate', 'posh', 'accessable', 'enchant', 'supurbly', 'winner', 'intuitive', 'commendable', 'fantastically', 'dirt-cheap', 'exultant', 'orderly', 'compassion', 'auspicious', 'lyrical', 'enhanced', 'charming', 'infallible', 'succeeded', 'justly', 'revives', 'inexpensive

In [36]:
# Keep only comments that contain at least one keyword
def contains_keyword(line):
    """Return True if any entry of the global ``keywords`` occurs in ``line``.

    The line is lower-cased before the check and each keyword is tested with
    the ``in`` operator, i.e. as a substring (the keyword "music" also matches
    "musician"). Keywords themselves are compared as stored.

    Args:
        line: Comment text (str).

    Returns:
        bool: True on the first keyword found, False if none match or if
        ``keywords`` is empty.
    """
    # Lower-case once instead of once per keyword.
    lowered = line.lower()
    return any(keyword in lowered for keyword in keywords)
# Keep only the comments for which contains_keyword returns True.
filtered_rdd = comments_rdd.filter(contains_keyword)

# Sentiment scoring function
def keyword_analysis(line):
    """Score one comment as +1 (positive), -1 (negative) or 0 (tie).

    The text before the first '~' is lower-cased and split on whitespace. Each
    token is looked up verbatim in the global ``positive_keywords`` and
    ``negative_keywords`` sets, so a token with punctuation attached (for
    example "great!") only counts if that exact string is in a set.

    Args:
        line: Comment text (str).

    Returns:
        None if ``line`` starts with "Comments" (treated as a header row);
        otherwise 1 if positive tokens outnumber negative ones, -1 if negative
        tokens outnumber positive ones, and 0 when the counts are equal.
    """
    if line.startswith("Comments"):
        return None

    # str.split with an explicit separator always yields at least one field,
    # so the first field can be taken without a length check.
    # NOTE(review): the header check and the '~' split look like handling for a
    # raw '~'-delimited input line; confirm they are still wanted when the
    # input is already a single comment string.
    words = line.split('~', 1)[0].lower().split()

    # Count lexicon hits
    positive_count = sum(1 for word in words if word in positive_keywords)
    negative_count = sum(1 for word in words if word in negative_keywords)

    if positive_count > negative_count:
        return 1
    if positive_count < negative_count:
        return -1
    return 0

# Compute the overall score across the comments that contain a keyword.
# Scoring and summation run inside Spark instead of on a list collected to the
# driver; None results (skipped lines) are dropped before summing.
overall = (
    filtered_rdd
    .map(keyword_analysis)
    .filter(lambda score: score is not None)
    .sum()
)

# `keywords` is a set, so its repr (e.g. {'music'}) is what gets formatted in.
if overall > 0:
    keyword_overall = '{keyword} is used positively overall.'.format(keyword = keywords)
elif overall < 0:
    keyword_overall = '{keyword} is used negatively overall.'.format(keyword = keywords)
else:
    keyword_overall = '{keyword} is used neutral overall.'.format(keyword = keywords)
print(keyword_overall)

{'music'} is used positively overall.


In [37]:
# Comment processing function
def process_comment(line):
    """Build a two-line text report with a comment and its sentiment label.

    The text before the first '~' is lower-cased and split on whitespace; the
    tokens are counted against the global ``positive_keywords`` and
    ``negative_keywords`` sets (exact string membership).

    Args:
        line: Comment text (str).

    Returns:
        None if ``line`` starts with "Comments" (treated as a header row);
        otherwise a string of the form
        "Comment = <lower-cased text>\\nComment Sentiment = <label>\\n" where the
        label is POSITIVE, NEGATIVE or NEUTRAL together with the deciding count.
    """
    if line.startswith("Comments"):
        return None

    # str.split with an explicit separator always yields at least one field,
    # so the first field can be taken without a length check.
    comment = line.split("~", 1)[0].lower()
    words = comment.split()

    # Count lexicon hits
    positive_count = sum(1 for word in words if word in positive_keywords)
    negative_count = sum(1 for word in words if word in negative_keywords)

    # Decide the sentiment label
    if positive_count > negative_count:
        sentiment = f"The comment is POSITIVE as number of positive words are = {positive_count}"
    elif positive_count < negative_count:
        sentiment = f"The comment is NEGATIVE as number of negative words are = {negative_count}"
    else:
        # Counts are equal here, so reporting positive_count covers both.
        sentiment = f"The comment is NEUTRAL as number of negative and positive words are the same = {positive_count}"

    return f"Comment = {comment}\nComment Sentiment = {sentiment}\n"

In [38]:
# Run process_comment over the filtered comments and drop the None results
processed_rdd = (
    filtered_rdd
    .map(process_comment)
    .filter(lambda report: report is not None)
)

In [39]:
# Preview a handful of results instead of pulling the whole RDD into the
# notebook output; the full result set is written to disk in a later cell.
processed_rdd.take(5)

["Comment = watching in early 2022. i really like this breakdown of process and how to think about solving problems. i'm a musician, producer and recording engineer and a lot of these....learn-by-doing and divergent thought processes remind me a lot of how to get a good mix or composing. really excited on this journey\nComment Sentiment = The comment is POSITIVE as number of positive words are = 3\n",
 'Comment = nice simple beginning without the fancy graphics and music :-)\nComment Sentiment = The comment is POSITIVE as number of positive words are = 2\n',
 'Comment = i am impressed by this video, especially the parts about habits and "learning about learning." as to sound during study, i don\'t do well with any music or speaking that i understand; random sounds (beaches, rain, wildlife, radio noise) is better. lol, if there\'s silence, my mind wanders into some distracting worries...\nComment Sentiment = The comment is POSITIVE as number of positive words are = 2\n',
 "Comment = gre

In [None]:
# Save the results to a file.
# NOTE(review): coalesce(1) is presumably here so the output is written as a
# single part file; also confirm how this behaves when ./Output/ already exists.
output_dir = "./Output/"
processed_rdd.coalesce(1).saveAsTextFile(output_dir)

In [45]:
# Stop the SparkSession created at the start of the notebook.
spark.stop()