In [3]:
# Strong scaling test 1 with 4 files (Also used as 'Weak scaling test 1')
# 1 node
# ~36 GB of data


from pyspark.sql import SparkSession, SQLContext 
import json
import timeit
import pyspark.sql.functions as f 

# Connect to the standalone Spark master and create (or reuse) the session for this run.
# Dynamic allocation is switched on together with the shuffle service, executors that
# stay idle for 30 s are released, and the application is capped at a single core in total.
spark_session = (
    SparkSession.builder
    .master("spark://192.168.2.225:7077")
    .appName("Strong_scaling_1_4files")
    .config("spark.dynamicAllocation.enabled", True)
    .config("spark.shuffle.service.enabled", True)
    .config("spark.dynamicAllocation.executorIdleTimeout", "30s")
    .config("spark.cores.max", 1)
    .getOrCreate()
)

In [4]:
# Load the Reddit data from HDFS as JSON into the DataFrame 'df' and print the wall-clock
# time of the read, measured with timeit.default_timer().
# NOTE(review): no schema is passed to the reader, so Spark presumably scans the input to
# infer one - that would explain why this "read" takes so long. Confirm before comparing runs.

start_time_0 = timeit.default_timer()

df = spark_session.read.format('json').load('hdfs://192.168.2.225:9000/reddit')

elapsed_0 = timeit.default_timer() - start_time_0

print("Time elapsed: ", elapsed_0, "s")

Time elapsed:  3273.134294032003 s


In [5]:
### PRE-PROCESSING ###
# Drop the columns that the later cells never read and remove the comments whose body is
# the '[deleted]' placeholder.
#
# drop() and filter() are lazy transformations: the time printed by this cell only covers
# building the query plan. The real work happens when a later cell calls an action
# (show() / collect()).

initial_start_time = timeit.default_timer()

# 'author_cakeday' may be missing from the schema. DataFrame.drop() ignores column names
# that are not present, so it can be listed unconditionally. The previous version stored
# the result of dropping it in an unused variable (current_df), so the column was never
# actually removed from df.
df = df.drop(
    'author_cakeday',
    'author_flair_css_class',
    'author_flair_text',
    'can_gild',
    'distinguished',
    'edited',
    'id',
    'is_submitter',
    'link_id',
    'parent_id',
    'permalink',
    'retrieved_on',
    'stickied',
    'subreddit_id',
)

df = df.filter(df.body != '[deleted]')

elapsed = timeit.default_timer() - initial_start_time

print("Time elapsed: ", elapsed, "s")

Time elapsed:  1.094910240964964 s


In [6]:
### MAIN PROCESSING - CELL 1 ###

# Count the comments per subreddit and display the ten subreddits with the most comments,
# highest count first. show() is the action that runs the Spark job, so this timing
# covers the actual computation.

start_time_1 = timeit.default_timer()

(
    df.groupBy('subreddit')
    .count()
    .orderBy(f.desc("count"))
    .show(10)
)

elapsed_1 = timeit.default_timer() - start_time_1

print("Time elapsed: ", elapsed_1,"s")

+-------------------+-------+
|          subreddit|  count|
+-------------------+-------+
|          AskReddit|7724254|
|              funny|3149078|
|               pics|2500644|
|             gaming|2028916|
|            atheism|1884228|
|           politics|1646043|
|                WTF|1564693|
|fffffffuuuuuuuuuuuu|1299961|
|              trees|1231457|
|               IAmA|1173054|
+-------------------+-------+
only showing top 10 rows

Time elapsed:  1031.5783011569874 s


In [7]:
### MAIN PROCESSING - CELL 2 ###
# Add the column 'wordCount' with the number of pieces obtained by splitting each comment
# body on a single space, then display the ten most frequent word counts.
# NOTE(review): splitting on ' ' presumably counts empty pieces produced by consecutive
# spaces as words - confirm whether that is intended.

start_time_2 = timeit.default_timer()

word_count_expr = f.size(f.split(f.col('body'), ' '))
df = df.withColumn('wordCount', word_count_expr)

(
    df.groupBy('wordCount')
    .count()
    .orderBy(f.desc("count"))
    .show(10)
)

elapsed_2 = timeit.default_timer() - start_time_2

print("Time elapsed: ", elapsed_2,"s")

+---------+-------+
|wordCount|  count|
+---------+-------+
|        5|2353726|
|        6|2344218|
|        4|2270790|
|        7|2256362|
|        8|2144356|
|        1|2082495|
|        3|2074165|
|        9|2017343|
|       10|1892965|
|        2|1854274|
+---------+-------+
only showing top 10 rows

Time elapsed:  1404.3413064159686 s


In [8]:
### MAIN PROCESSING - CELL 3 ###
# Compute the mean of the 'wordCount' column (added by the previous processing cell) and
# print it as the average number of words per comment. collect() is the action that runs
# the job; the aggregate yields a single row, which is read by its 'mean' alias.

start_time_3 = timeit.default_timer()

df_stats = df.agg(f.mean('wordCount').alias('mean')).collect()
mean = df_stats[0]['mean']

print("Average wordcount in comment: ", mean, " words.")

elapsed_3 = timeit.default_timer() - start_time_3
print("Time elapsed: ", elapsed_3,"s")

Average wordcount in comment:  32.177884605623206  words.
Time elapsed:  1366.3709375848994 s


In [9]:
### TOTAL TIME ELAPSED (PROCESSING) ###
# Print the wall-clock time that has passed since the pre-processing cell started its timer
# (initial_start_time). This is a single start-to-now difference, so any idle time between
# running the cells is included; it is not the sum of the per-cell timings.

total_elapsed = timeit.default_timer() - initial_start_time

print("Total time elapsed: ", total_elapsed, "s")

Total time elapsed:  3803.456739578978 s


In [10]:
# Stop the Spark session now that all measurements are done.
spark_session.stop()