In [7]:
from pyspark.sql import SparkSession
# Build (or reuse) a session against the Spark master, using HDFS as the
# default filesystem, a replication factor of 1 and at most 2 cores.
spark_session = (
    SparkSession.builder
    .master("spark://192.168.2.5:7077")
    .appName("Sentiment Analysis")
    .config("spark.hadoop.fs.defaultFS", "hdfs://192.168.2.5:9000")
    .config("spark.hadoop.dfs.replication", "1")
    .config("spark.cores.max", 2)
    .getOrCreate()
)

In [8]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
from textblob import TextBlob

# Define a user-defined function to perform sentiment analysis on a text
def get_sentiment(text):
    """Classify a piece of text as 'positive', 'negative' or 'neutral'.

    The label is derived from the sign of TextBlob's polarity score.

    Args:
        text: The text to classify. May be None, which is how Spark hands a
            null column value to a Python UDF.

    Returns:
        'positive' if the polarity is > 0, 'negative' if it is < 0,
        'neutral' if it is exactly 0 (or the text is blank), and None when
        ``text`` is None so that missing comments stay null instead of being
        counted as neutral.
    """
    # TextBlob rejects non-string input with a TypeError; without this guard
    # a single null body would fail the whole Spark job.
    if text is None:
        return None

    # Blank text has no sentiment to measure; skip the analyzer entirely.
    if not text.strip():
        return 'neutral'

    polarity = TextBlob(text).sentiment.polarity
    if polarity > 0:
        return 'positive'
    if polarity < 0:
        return 'negative'
    return 'neutral'

# Wrap the plain Python classifier as a Spark UDF that yields strings
get_sentiment_udf = udf(get_sentiment, returnType=StringType())

# HDFS location of the Reddit comments dataset
path = "hdfs://192.168.2.5:9000/user/ubuntu/RC_2010-03.json"

# Read the dataset and tag every comment body with its sentiment label
df = spark_session.read.json(path).withColumn(
    "sentiment", get_sentiment_udf("body")
)

# Count comments for every (subreddit, sentiment) pair
result = df.groupBy("subreddit", "sentiment").count()

# Display the 10 subreddits with the most positive comments
(
    result
    .where(result["sentiment"] == "positive")
    .sort(result["count"].desc())
    .show(10)
)

                                                                                

+-----------+---------+------+
|  subreddit|sentiment| count|
+-----------+---------+------+
|  AskReddit| positive|286525|
| reddit.com| positive|115120|
|       pics| positive| 87499|
|   politics| positive| 83559|
|     gaming| positive| 67410|
|       IAmA| positive| 58545|
|        WTF| positive| 44028|
|    atheism| positive| 33609|
|      funny| positive| 33493|
|programming| positive| 33157|
+-----------+---------+------+
only showing top 10 rows



In [None]:
# Shut down the Spark session now that the analysis is finished.
spark_session.stop()