# Reddit Comment Analysis
### Data size = 14.77 GB (RC_2010-01 + RC_2010-02 + RC_2010-03 + RC_2010-04 + RC_2010-05 + RC_2010-06 + RC_2010-07 + RC_2010-08)
### Nodes = 4 (1 master + 3 workers)

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
import time
# Packages that need to be installed on all nodes:
from textblob import TextBlob 
import nltk
from nltk.corpus import stopwords

In [2]:
spark_session = SparkSession\
        .builder\
        .master("spark://192.168.2.197:7077") \
        .appName("rc_analysis")\
        .config("spark.dynamicAllocation.enabled", True)\
        .config("spark.dynamicAllocation.shuffleTracking.enabled",True)\
        .config("spark.shuffle.service.enabled", False)\
        .config("spark.dynamicAllocation.executorIdleTimeout","30s")\
        .config("spark.executor.cores",2)\
        .config("spark.driver.port",9998)\
        .config("spark.blockManager.port",10005)\
        .getOrCreate()

# Old API (RDD)
spark_context = spark_session.sparkContext

spark_context.setLogLevel("ERROR")

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/03/19 07:45:35 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
22/03/19 07:45:37 WARN ExecutorAllocationManager: Dynamic allocation without a shuffle service is an experimental feature.


## Start time count

In [3]:
start_time = time.time()

In [4]:
# JSON input has no header row, so no "header" option is needed
data_frame = spark_session.read\
    .json('hdfs://192.168.2.197:9000/user/hadoop/RC_2010-*')\
    .cache()


                                                                                

120

## Preprocessing

In [5]:
data_frame = data_frame.select('subreddit','body','score')
data_frame = data_frame.dropna(how='any')

## Sentiment analysis

In [6]:
# Cleaning text before analysis

rcb_df = data_frame.filter(data_frame['body'] != '[deleted]')

#removing stopwords

nltk.download('stopwords')

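# A set gives constant-time membership checks
stop_words = set(stopwords.words("english"))

def remove_stopwords_fnc(text):
    # Keep only the words that are not stop words; each kept word is followed by a space
    return ''.join(word + ' ' for word in text.split(' ') if word.lower() not in stop_words)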

remove_stopwords_udf = udf(remove_stopwords_fnc)
spark_session.udf.register("remove_stopwords_udf", remove_stopwords_udf)
rcb_df = rcb_df.withColumn('body',remove_stopwords_udf('body'))

# rcb_df.show()

[nltk_data] Downloading package stopwords to /home/ubuntu/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


In [7]:
def sentiment_fnc(text):
    # Polarity of the sentiment, in the range [-1.0, 1.0]
    return TextBlob(text).sentiment.polarity


# Declare the return type so the column is a double without a string round-trip
sentiment_udf = udf(sentiment_fnc, DoubleType())
spark_session.udf.register("sentiment_udf", sentiment_udf)
rcb_df = rcb_df.withColumn('sentiment_score', sentiment_udf('body'))

rcb_df.show(20)


+-------------------+--------------------+-----+--------------------+
|          subreddit|                body|score|     sentiment_score|
+-------------------+--------------------+-----+--------------------+
|           politics|Good rant, stop l...|    5| 0.11613636363636362|
|            offbeat|    Sounds good me. |    2|                 0.7|
|             gaming|Ok people donate ...|    1| 0.04999999999999999|
|           gonewild|               red? |   -1|                 0.0|
|               IAmA|really want give ...|    2| 0.02938311688311688|
|                WTF|school, depends p...|    1| 0.13333333333333333|
|                WTF|they?  know recen...|    2| 0.16666666666666666|
|         MensRights|I'll add voice. b...|    2|                -0.5|
|               pics|        worry 2012. |    3|                 0.0|
|              funny|[George Carlin sa...|   69|              0.1875|
|              funny|No, like that. He...|   51|                 0.0|
|          AskReddit

                                                                                

### Kindest/most popular subreddits

In [8]:
avg_subreddit_score = data_frame.groupBy("subreddit").agg({'score': 'avg'})
avg_subreddit_score_sorted = avg_subreddit_score.orderBy('avg(score)',ascending = False)
print("The top 20 kindest subreddits based on average comment score are:")
avg_subreddit_score_sorted.show()

The top 20 kindest subreddits based on average comment score are:


22/03/19 07:48:48 ERROR TaskSchedulerImpl: Lost executor 1 on 192.168.2.197: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.

+-----------------+------------------+
|        subreddit|        avg(score)|
+-----------------+------------------+
|         DateRape| 81.50694444444444|
|       bestof2009|41.922751322751324|
|arboriculturalist|              27.0|
|        Youngluck|13.839869281045752|
|           Fittit|              13.0|
|            apath|              12.0|
|    announcements| 9.822988505747126|
|           USPE08| 9.411764705882353|
|           treees|              9.25|
|     aqua_aqua_bh|               8.6|
|           raerth| 8.431818181818182|
|         moonhoax|               8.0|
|             blog| 7.597214911231277|
|              gnu| 7.540731504571903|
|            gamin| 7.333333333333333|
|             lego| 6.587866108786611|
|        introvert| 6.339285714285714|
|            funny| 6.243865842009863|
|           popper|               6.0|
|       Starcraft3|               6.0|
+-----------------+------------------+
only showing top 20 rows
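
Several entries in the ranking above have round averages (27.0, 13.0, 12.0), which suggests they may rest on very few comments. One possible refinement is to require a minimum number of comments per subreddit before ranking. The sketch below illustrates the idea in plain Python on made-up rows; the function name, the `min_comments` threshold and the sample data are all assumptions, not values from the notebook.

```python
from collections import defaultdict


def rank_by_average_score(rows, min_comments=2):
    """Rank subreddits by mean score, skipping those with too few comments.

    rows: iterable of (subreddit, score) pairs.
    min_comments: hypothetical threshold; tune it to the data set.
    """
    totals = defaultdict(lambda: [0, 0])  # subreddit -> [score sum, comment count]
    for subreddit, score in rows:
        totals[subreddit][0] += score
        totals[subreddit][1] += 1

    averages = [
        (subreddit, score_sum / count)
        for subreddit, (score_sum, count) in totals.items()
        if count >= min_comments
    ]
    # Highest average first.
    return sorted(averages, key=lambda pair: pair[1], reverse=True)


# Made-up sample: "tiny" has a single high-scoring comment and is filtered out.
sample = [("funny", 6), ("funny", 8), ("tiny", 27), ("pics", 3), ("pics", 5)]
print(rank_by_average_score(sample))
```

In PySpark, a similar effect could be obtained by aggregating the average and the count together and filtering on the count before `orderBy`.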



                                                                                

### What are the most active subreddits?

In [9]:
df_subreddit_frequency = data_frame.groupby("subreddit").count()

In [10]:
# show() prints the table and returns None, so there is nothing to assign
df_subreddit_frequency.sort('count', ascending=False).show(20)



+-------------------+-------+
|          subreddit|  count|
+-------------------+-------+
|          AskReddit|5486305|
|         reddit.com|2318324|
|               pics|2247985|
|           politics|1283777|
|             gaming|1146424|
|               IAmA|1072234|
|                WTF|1000546|
|              funny| 797746|
|            atheism| 626184|
|          worldnews| 539036|
|            science| 517520|
|        programming| 457270|
|              trees| 352277|
|         technology| 349378|
|    DoesAnybodyElse| 260333|
|fffffffuuuuuuuuuuuu| 251778|
|    TwoXChromosomes| 233893|
|              Music| 233032|
|relationship_advice| 215600|
|             videos| 207923|
+-------------------+-------+
only showing top 20 rows



## Execution time

In [11]:
print(f"Execution time: {time.time() - start_time}")

Execution time: 252.03396654129028
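
The wall-clock figure above covers everything from reading the data to the last `show()`. To see how much each stage contributes, a small helper can time individual steps. This is a minimal sketch using only the standard library; the `timed` helper and the `timings` dictionary are hypothetical names, not part of the notebook.

```python
import time
from contextlib import contextmanager

timings = {}  # label -> elapsed seconds


@contextmanager
def timed(label):
    """Record the wall-clock duration of the enclosed block under `label`."""
    start = time.perf_counter()
    try:
        yield
    finally:
        timings[label] = time.perf_counter() - start


with timed("example step"):
    time.sleep(0.05)  # stand-in for a Spark action such as show()

print(f"example step: {timings['example step']:.2f} s")
```

Because Spark transformations are lazy, only blocks that contain an action (for example `show()` or `count()`) will register meaningful time.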


In [12]:
spark_session.stop()