In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import StringType
from transformers import AutoTokenizer, AutoModelForSequenceClassification
import torch
import numpy as np
from pyspark.sql.functions import udf

In [2]:
# Cluster connection settings, applied to the builder in this order.
# Host names ("jupyter", "spark-master") are presumably docker-compose service
# names — NOTE(review): confirm against the deployment.
SPARK_CONF = {
    "spark.driver.bindAddress": "0.0.0.0",   # driver listens on all interfaces
    "spark.driver.host": "jupyter",          # address the driver advertises to the cluster
    "spark.ui.port": "4040",
    "spark.cores.max": "2",                  # cap on cores this app takes from the cluster
    "spark.executor.memory": "512m",
}

_builder = SparkSession.builder.appName("SentimentCommentsYoutube").master("spark://spark-master:7077")
for _key, _value in SPARK_CONF.items():
    _builder = _builder.config(_key, _value)
spark = _builder.getOrCreate()

In [3]:
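# Alternative: a local-mode session for running without the Spark cluster.
# Swap it in for the cluster cell above (do not run both).
# spark = SparkSession.builder \
#     .appName("SentimentCommentsYoutube") \
#     .master("local[*]") \
#     .config("spark.driver.host", "localhost") \
#     .getOrCreate()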

In [4]:
# Echo the session so the notebook shows which master/app it is connected to.
spark

In [5]:
# Path to the input comments CSV in HDFS.
# Alternative address via localhost:9010, presumably for running outside the
# cluster network (NOTE(review): confirm the port mapping):
# hdfs_path = "hdfs://localhost:9010/user/data/youtube_comments.csv"
# Inside the cluster network, address the namenode by its hostname instead of localhost.
hadoop_path = "hdfs://namenode:9000/user/data/youtube_comments.csv"

In [6]:
# Read the comments CSV from HDFS.
# multiLine=True: comments can contain line breaks; without it Spark splits a
#   quoted multi-line field into several broken rows. It is harmless when no
#   field spans lines.
# escape='"': treat "" inside a quoted field as a literal quote (RFC 4180 style,
#   as written by pandas / Python's csv module). Spark's default escape is a backslash.
#   NOTE(review): confirm how the CSV was produced; drop this if it uses backslash escapes.
df = spark.read.csv(
    hadoop_path,
    header=True,
    inferSchema=True,
    multiLine=True,
    escape='"',
)

In [7]:
# Preview the first 20 rows; string cells longer than 20 chars are truncated.
df.show(n=20, truncate=True)

+-------------------------------+--------------------+--------------------+-------+
|                           text|              author|                date|  likes|
+-------------------------------+--------------------+--------------------+-------+
|           Like I said in th...|            @MrBeast|2021-11-24T21:02:45Z|1058903|
|            imagine finding 061|       @Players_1500|2024-12-22T12:12:02Z|      0|
|晚安  希望 韓國女明星(秀智) ...|         @許閔翔-i5r|2024-12-22T10:48:47Z|      0|
|                      Beneran😮|     @AhmadYusri-v3u|2024-12-22T10:26:03Z|      0|
|           Who is here after...|      @Hoyadeeduroon|2024-12-22T10:08:46Z|      0|
|           ผมกดติดตามพี่แล้ว...|     @ภูริณัฐภูมิรัง|2024-12-22T08:52:53Z|      0|
|           I think there is ...|   @pushpakjadav4321|2024-12-22T08:06:32Z|      0|
|                        Hey bro|    @AlzafranHanifah|2024-12-22T07:14:39Z|      0|
|                          happy|@SengathithINTHANANY|2024-12-22T06:32:31Z|      0|
|      

In [8]:
# Load the tokenizer and the classifier from the same checkpoint so they match.
MODEL_NAME = "cardiffnlp/twitter-roberta-base-sentiment-latest"
tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)
model = AutoModelForSequenceClassification.from_pretrained(MODEL_NAME)

  return self.fget.__get__(instance, owner)()
Some weights of the model checkpoint at cardiffnlp/twitter-roberta-base-sentiment-latest were not used when initializing RobertaForSequenceClassification: ['roberta.pooler.dense.bias', 'roberta.pooler.dense.weight']
- This IS expected if you are initializing RobertaForSequenceClassification from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing RobertaForSequenceClassification from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).


In [9]:
def analyze_sentiment_standalone(text, tokenizer, model, max_length=512, labels=None):
    """Classify the sentiment of a single piece of text.

    Args:
        text: The text to classify. ``None`` (a null cell in the Spark column)
            yields ``None`` instead of raising inside the tokenizer, so one
            null row does not fail the whole Spark job.
        tokenizer: Hugging Face tokenizer called with ``return_tensors="pt"``.
        model: Sequence-classification model whose output exposes ``.logits``.
        max_length: Inputs longer than this many tokens are truncated.
        labels: Optional mapping from class index to label. Defaults to
            0/1/2 -> "negative"/"neutral"/"positive", the order this notebook
            assumes for the cardiffnlp sentiment checkpoint.

    Returns:
        The label of the highest-scoring class, or ``None`` if ``text`` is ``None``.
    """
    if text is None:
        return None
    if labels is None:
        labels = {0: "negative", 1: "neutral", 2: "positive"}

    inputs = tokenizer(text, return_tensors="pt", truncation=True, max_length=max_length)
    # Inference only: skip autograd graph construction to save memory and time.
    with torch.no_grad():
        logits = model(**inputs).logits
    # Softmax is monotonic, so the argmax of the raw logits selects the same class.
    predicted = torch.argmax(logits[0]).item()
    return labels[predicted]

In [10]:
# Wrap the per-row classifier as a Spark UDF. The driver-side `tokenizer` and
# `model` referenced here are serialized together with the function.
def _classify_comment(text):
    return analyze_sentiment_standalone(text, tokenizer, model)


sentiment_udf = udf(_classify_comment, StringType())

In [11]:
# Add a "sentiment" column computed by the UDF (lazy: evaluated on the next action).
results = df.withColumn("sentiment", sentiment_udf(df["text"]))

In [12]:
# Preview each comment next to its predicted label, without truncating long text.
results.select(["text", "sentiment"]).show(n=20, truncate=False)

+-------------------------------------------------------------------------------------+---------+
|text                                                                                 |sentiment|
+-------------------------------------------------------------------------------------+---------+
|Like I said in the video, subscribe if you haven’t already and you could win $10,000!|positive |
|imagine finding 061                                                                  |neutral  |
|晚安  希望 韓國女明星(秀智) 和 (潤娥) 可以嫁個好男人 一輩子幸福+458（每天情人節快樂）|neutral  |
|Beneran😮                                                                            |neutral  |
|Who is here after watching Ronald&#39;s video 🙋🏻‍♀️❤                               |positive |
|ผมกดติดตามพี่แล้วครับผมกดติดตามพี่แล้วนะครับจริงๆครับจริงๆ                           |neutral  |
|I think there is a lana Rhodes ❤                                                     |positive |
|Hey bro                                                  

In [None]:
# Write every comment with its predicted sentiment back to HDFS. Spark writes a
# directory of part files at this path, not a single CSV file.
# mode("overwrite") keeps the cell re-runnable. The default save mode raises an
# error when the path already exists from a previous run.
# NOTE(review): the output holds per-comment labels, not counts, so the
# "sentiment_counts.csv" name looks misleading. Rename it only if no downstream
# consumer reads this path.
# Alternative target when HDFS is reached via localhost:
# results.write.csv("hdfs://localhost:9010/user/data/youtube_comments_sentiment.csv", header=True)
results.write.mode("overwrite").csv("hdfs://namenode:9000/user/data/sentiment_counts.csv", header=True)

In [38]:
# Stop the SparkSession (and its SparkContext), ending this application on the cluster.
spark.stop()