In [138]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import concat, col, lit, regexp_replace


# New API
spark_session = SparkSession\
        .builder\
        .master("spark://130.238.28.143:7077") \
        .appName("DE1 Project")\
        .config("spark.dynamicAllocation.enabled",False)\
        .config("spark.dynamicAllocation.executorIdleTimeout","99999s")\
        .config("spark.executor.cores",2)\
        .config("spark.driver.port",9998)\
        .config("spark.blockManager.port",10005)\
        .getOrCreate()

#        .config("spark.dynamicAllocation.shuffleTracking.enabled",False)\
#        .config("spark.shuffle.service.enabled",False)\

# Old API (RDD)
spark_context = spark_session.sparkContext

spark_context.setLogLevel("ERROR")

In [139]:
#set no. of workers (1, 2 or 3) based on "physical" cluster

#horizontal
#2 cores, 4 gb mem (duration_1_worker.txt)
#STRONG_1 => 4 cores, 4 gb mem (duration_strong_1_worker.txt)
#STRONG_2 => 8 cores, 8 gb mem (duration_strong_2_worker.txt)
#STRONG_3 => 16 cores, 16 gb mem (duration_strong_3_worker.txt)

WORKERS = "STRONG_3"

In [140]:
#Imports
from pyspark.sql.functions import lower, col, split
from datetime import datetime
import time

In [141]:
#Read Dirty words to DataFrame

df_dirty = spark_session.read\
    .option("header", "true")\
    .csv('hdfs://192.168.2.13:9000/user/ubuntu/DirtyWordsFolder/DirtyWords.csv')

                                                                                

In [142]:
#Convert Dirty words to List

list_dirty = df_dirty.select("word").collect()
list_dirty = [str(row["word"]) for row in list_dirty]

                                                                                

In [143]:
#Read Reddit comments do DataFrame

if WORKERS == 1 or "STRONG_1" or "STRONG_2" or "STRONG_3":
    df_reddit = spark_session.read.json('hdfs://192.168.2.13:9000/user/one_worker')
    df_reddit = df_reddit.select("body","subreddit")
elif WORKERS == 2:
    df_reddit = spark_session.read.json('hdfs://192.168.2.13:9000/user/two_workers')
    df_reddit = df_reddit.select("body","subreddit")
elif WORKERS == 3:
    df_reddit = spark_session.read.json('hdfs://192.168.2.13:9000/user/three_workers')
    df_reddit = df_reddit.select("body","subreddit")
    

                                                                                

In [144]:
#start timer

start_time = datetime.now()

In [145]:
#Create DataFrame only containing sentences with dirty words by subreddit

def checkForDirtyWord(row, list_dirty):
    for word in row[1:]:
        if word in list_dirty:
            return True
    return False
    
df_reddit_w_dirty_words = df_reddit.withColumn("cBody",regexp_replace("body", ",|\n|\!|\?|\.|\*|:|;", " "))\
                        .withColumn('cBody', lower(col('cBody')))\
                        .withColumn('cBody', split(col('cBody'), " "))\
                        .rdd\
                        .filter(lambda row: checkForDirtyWord(row["cBody"], list_dirty))\
                        .toDF()\
                        .select(["subreddit","cBody"])

                                                                                

In [146]:
#Count how many comments with dirty words there are per subreddit

df_reddit_w_dirty_words.registerTempTable("tmp_tbl_reddit_w_dirty_words")

df_subreddit_dirty_words_freq = spark_session.sql("""
    SELECT subreddit as subreddit_dirty, COUNT(*) as frequencies_dirty 
    FROM tmp_tbl_reddit_w_dirty_words
    GROUP BY subreddit_dirty 
    ORDER BY frequencies_dirty DESC
""")

#df_subreddit_dirty_words_freq.show(5)

In [147]:
#Count how many comments there are per subreddit (in total)

df_reddit.registerTempTable("tmp_tbl_reddit")

df_subreddit_freq = spark_session.sql("""
    SELECT subreddit, COUNT(*) as frequencies
    FROM tmp_tbl_reddit
    GROUP BY subreddit
    ORDER BY Frequencies DESC
""")

#df_subreddit_freq.show(5)

In [148]:
#Select the top 20 most profane subreddits with more than 500 comments

df_subreddit_freq.registerTempTable("tmp_tbl_subreddit_freq")
df_subreddit_dirty_words_freq.registerTempTable("tmp_tbl_df_subreddit_dirty_words_freq")

df_joined_freq = spark_session.sql("""
    SELECT subreddit, ratio FROM(
    (SELECT subreddit, frequencies, frequencies_dirty, 
    (frequencies_dirty * 1.0 / frequencies * 1.0) as sort_ratio,
    CONCAT(CAST(((frequencies_dirty * 1.00 / frequencies * 1.00) * 100) AS VARCHAR(5)) ,' %' ) as ratio
    FROM tmp_tbl_subreddit_freq 
    LEFT JOIN 
    tmp_tbl_df_subreddit_dirty_words_freq
    ON subreddit = subreddit_dirty
    WHERE frequencies > 500
    ORDER BY sort_ratio DESC
    LIMIT 20))
""")

In [149]:
#Materialize the data and show the dirty word ratios per subreddit

df_joined_freq.show()

                                                                                

+-------------------+---------------+
|          subreddit|          ratio|
+-------------------+---------------+
|             mexico|22.8373702422 %|
|                sex|22.4592220828 %|
|relationship_advice|20.1205010306 %|
|           Equality|19.4775132275 %|
|    TwoXChromosomes|18.6431989064 %|
|         needadvice|17.1220400729 %|
|              women|16.9527896996 %|
|         MensRights|16.2293274531 %|
|        LadyBashing|15.8977998581 %|
|                AMA|15.4205607477 %|
|   Bad_Cop_No_Donut|15.0097465887 %|
|              drunk|14.8148148148 %|
|     lostgeneration|14.5743145743 %|
|         ukpolitics|14.0225179120 %|
|       SuicideWatch|13.7457044674 %|
|               lgbt|13.7292711145 %|
|                MMA|13.6040184177 %|
|            Fitness|13.4345794393 %|
|            stoners|13.3074935401 %|
|           cannabis|13.1892368769 %|
+-------------------+---------------+



                                                                                

In [150]:
#End timer

end_time = datetime.now()
duration = end_time - start_time
print(f"Execution took {duration} (hh:mm:ss:SSSSS) on {WORKERS} workers")

Execution took 0:02:41.711947 (hh:mm:ss:SSSSS) on STRONG_3 workers


In [151]:
#Write durations to file

if WORKERS == 1:
    f = open('duration_1_worker.txt', 'a')
elif WORKERS == 2:
    f = open('duration_2_workers.txt', 'a')
elif WORKERS == 3:
    f = open('duration_3_worker.txt', 'a')
elif WORKERS == "STRONG_1":
    f = open('duration_strong_1_worker.txt', 'a')
elif WORKERS == "STRONG_2":
    f = open('duration_strong_2_worker.txt', 'a')
elif WORKERS == "STRONG_3":
    f = open('duration_strong_3_worker.txt', 'a')


f.write(str(duration) + " hh:mm:ss:SSSSS" + "\n")
f.close()

In [152]:
# release the cores for another application!
spark_context.stop()