In [84]:
from pyspark.sql import SparkSession
from operator import add
from pyspark.sql.functions import split, explode, lower, col, rank, desc, regexp_replace
from pyspark.sql import Window

#################### Setup spark session ####################

# Stop running SparkSession if it exists
if 'spark_session' in locals():
    spark_session.stop()
    
spark_session = SparkSession\
        .builder\
        .master("spark://192.168.2.224:7077") \
        .appName("Experiment")\
        .config("spark.dynamicAllocation.enabled", True)\
        .config("spark.dynamicAllocation.shuffleTracking.enabled",True)\
        .config("spark.shuffle.service.enabled", False)\
        .config("spark.dynamicAllocation.executorIdleTimeout","30s")\
        .config("spark.driver.port",9999)\
        .config("spark.executor.cores",12)\
        .config("spark.blockManager.port",10005)\
        .getOrCreate()

# RDD API
spark_context = spark_session.sparkContext

# Load reddit data
reddit_df = spark_session.read.json("hdfs://192.168.2.224:9000/user/ubuntu/reddit/corpus-webis-tldr-17.json")
reddit_df.printSchema()



root
 |-- author: string (nullable = true)
 |-- body: string (nullable = true)
 |-- content: string (nullable = true)
 |-- content_len: long (nullable = true)
 |-- id: string (nullable = true)
 |-- normalizedBody: string (nullable = true)
 |-- subreddit: string (nullable = true)
 |-- subreddit_id: string (nullable = true)
 |-- summary: string (nullable = true)
 |-- summary_len: long (nullable = true)
 |-- title: string (nullable = true)



                                                                                

In [85]:
# Most popular subreddits, from experiment.ipynb
subreddit_list = ['AskReddit', 'relationships', 'leagueoflegends', 'tifu', 
                  'relationship_advice', 'trees', 'gaming', 'atheism', 
                  'AdviceAnimals', 'funny']


In [86]:
# Get data for only these subreddits
filtered_df = reddit_df.filter(col('subreddit').isin(subreddit_list)) # only look at the most popular reddits gotten for 

# Loading the list of common words, taken from github: https://gist.github.com/deekayen/4148741
filtered_words = spark_session.read.text("hdfs://192.168.2.224:9000/user/ubuntu/most_common_words.txt").rdd.map(lambda x: x[0]).collect()

# The list of filtered words for workers
filtered_words_broadcast = spark_session.sparkContext.broadcast(filtered_words)

# Extract the words from the specific subreddits, filter out blank space and words with ', make lower case, 
apostrophe_pattern = r"\b\w*'\w*\b"
tokenized_df = filtered_df.withColumn("words", split(lower(filtered_df.body), " ")) \
                          .withColumn("words", explode("words")) \
                          .withColumn("words", regexp_replace("words", apostrophe_pattern, ""))

exploded_df = tokenized_df.select("subreddit", explode(split("words", " ")).alias("word"))
exploded_df = exploded_df.filter(exploded_df["word"] != "")

# Filter out all the words which matches the list of common words
filtered_df = exploded_df.filter(~col("word").isin(filtered_words_broadcast.value))

# Get word count
word_count_df = filtered_df.groupBy("subreddit", "word").count()

# Rank the words
window_spec = Window.partitionBy("subreddit").orderBy(desc("count"))
ranked_df = word_count_df.withColumn("rank", rank().over(window_spec))

# Top 5 most popular words for each subreddit
top_five_words_df = ranked_df.filter(col("rank") <= 5)

# Results
top_five_words_df.show(truncate=False)



+-------------+-------+------+----+
|subreddit    |word   |count |rank|
+-------------+-------+------+----+
|AdviceAnimals|being  |14502 |1   |
|AdviceAnimals|think  |13109 |2   |
|AdviceAnimals|got    |12401 |3   |
|AdviceAnimals|really |12397 |4   |
|AdviceAnimals|into   |11693 |5   |
|AskReddit    |got    |337683|1   |
|AskReddit    |back   |299326|2   |
|AskReddit    |into   |292977|3   |
|AskReddit    |after  |264159|4   |
|AskReddit    |really |253384|5   |
|atheism      |god    |29140 |1   |
|atheism      |think  |25333 |2   |
|atheism      |believe|23333 |3   |
|atheism      |being  |22745 |4   |
|atheism      |any    |22083 |5   |
|funny        |into   |11801 |1   |
|funny        |being  |11218 |2   |
|funny        |think  |10913 |3   |
|funny        |got    |10241 |4   |
|funny        |-      |9866  |5   |
+-------------+-------+------+----+
only showing top 20 rows



                                                                                

In [82]:
# Save data
top_five_words_df.toPandas().to_csv('subreddit_words.csv')

                                                                                

In [83]:
spark_context.stop()