In [1]:
import time
from pyspark.sql import SparkSession
from operator import add
import pyspark.sql
from pyspark.sql.functions import explode, split, col

In [2]:
# Dictionary to store execution times
# execution_times = {}

def measure_execution_time(job_name, job_function, job_parameter=None):
    start_time = time.time()
    
    # Execute the Spark job function
    if job_parameter:
        job_function(job_parameter)
    else:
        job_function()
    
    end_time = time.time()
    execution_time = end_time - start_time
    
    # Store the execution time using the job name as the key
    # execution_times[job_name] = execution_time
    print(f"Execution time for {job_name}: {execution_time} seconds")

In [3]:
# What are the most common words or topics in the content and summaries of the top N subreddits?

def job_common_words_top_subreddits(df):
    # First, identify the top N subreddits by post count
    top_subreddits = df.groupBy("subreddit").count().orderBy(col("count").desc()).limit(100)
    
    # Filter posts from these top subreddits
    top_subreddit_posts = df.join(top_subreddits, "subreddit")
    
    # Split words in content and explode the resulting arrays to analyze word frequency
    words_df = top_subreddit_posts.select(explode(split(col("content"), " ")).alias("word"))
    word_counts = words_df.groupBy("word").count().orderBy(col("count").desc())
    
    word_counts.show()

In [4]:
def strong_scaling_experiment(n_executor_instances: int):
    spark_session = SparkSession.builder\
        .master("spark://192.168.2.240:7077") \
        .appName("scaling_experiments")\
        .config("spark.dynamicAllocation.enabled", True)\
        .config("spark.dynamicAllocation.shuffleTracking.enabled",True)\
        .config("spark.shuffle.service.enabled", False)\
        .config("spark.dynamicAllocation.executorIdleTimeout","30s")\
        .config("spark.executor.cores", 4)\
        .config("spark.dynamicAllocation.mminExecutors", n_executor_instances)\
        .config("spark.dynamicAllocation.maxExecutors", n_executor_instances)\
        .config("spark.driver.port",9999)\
        .config("spark.blockManager.port",10005)\
        .getOrCreate()

    # RDD API
    spark_context = spark_session.sparkContext
    
    spark_context.setLogLevel("WARN")
    df = spark_session.read.json("hdfs://192.168.2.240:9000/corpus-webis-tldr-17.json")
    measure_execution_time("Common Words in Top Subreddits", job_common_words_top_subreddits, df)
    spark_context.stop()

In [5]:
for i in [1, 2, 3, 4]:
    print(i)
    strong_scaling_experiment(i)

1


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/03/15 14:56:39 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
                                                                                

+----+--------+
|word|   count|
+----+--------+
|   I|21772577|
| the|21371259|
|  to|20370477|
| and|19591881|
|   a|15566019|
|  of|11112888|
|  \n| 9323606|
|that| 8404942|
|  in| 7778927|
|  my| 7162255|
| was| 6725528|
|  is| 6225561|
| for| 5952877|
|  it| 5789218|
|    | 5659071|
|with| 5334123|
|  me| 4367853|
| but| 4345200|
|  on| 4274986|
|have| 4207031|
+----+--------+
only showing top 20 rows

Execution time for Common Words in Top Subreddits: 1060.2244713306427 seconds
2


                                                                                

+----+--------+
|word|   count|
+----+--------+
|   I|21772577|
| the|21371259|
|  to|20370477|
| and|19591881|
|   a|15566019|
|  of|11112888|
|  \n| 9323606|
|that| 8404942|
|  in| 7778927|
|  my| 7162255|
| was| 6725528|
|  is| 6225561|
| for| 5952877|
|  it| 5789218|
|    | 5659071|
|with| 5334123|
|  me| 4367853|
| but| 4345200|
|  on| 4274986|
|have| 4207031|
+----+--------+
only showing top 20 rows

Execution time for Common Words in Top Subreddits: 425.0892810821533 seconds
3


                                                                                

+----+--------+
|word|   count|
+----+--------+
|   I|21772577|
| the|21371259|
|  to|20370477|
| and|19591881|
|   a|15566019|
|  of|11112888|
|  \n| 9323606|
|that| 8404942|
|  in| 7778927|
|  my| 7162255|
| was| 6725528|
|  is| 6225561|
| for| 5952877|
|  it| 5789218|
|    | 5659071|
|with| 5334123|
|  me| 4367853|
| but| 4345200|
|  on| 4274986|
|have| 4207031|
+----+--------+
only showing top 20 rows

Execution time for Common Words in Top Subreddits: 399.14378809928894 seconds
4


                                                                                

+----+--------+
|word|   count|
+----+--------+
|   I|21772577|
| the|21371259|
|  to|20370477|
| and|19591881|
|   a|15566019|
|  of|11112888|
|  \n| 9323606|
|that| 8404942|
|  in| 7778927|
|  my| 7162255|
| was| 6725528|
|  is| 6225561|
| for| 5952877|
|  it| 5789218|
|    | 5659071|
|with| 5334123|
|  me| 4367853|
| but| 4345200|
|  on| 4274986|
|have| 4207031|
+----+--------+
only showing top 20 rows

Execution time for Common Words in Top Subreddits: 244.2339735031128 seconds


In [14]:
def weak_scaling_experiment(n_executor_instances: int):
    spark_session = SparkSession.builder\
        .master("spark://192.168.2.240:7077") \
        .appName("scaling_experiments")\
        .config("spark.dynamicAllocation.enabled", True)\
        .config("spark.dynamicAllocation.shuffleTracking.enabled",True)\
        .config("spark.shuffle.service.enabled", False)\
        .config("spark.dynamicAllocation.executorIdleTimeout","30s")\
        .config("spark.executor.cores", 4)\
        .config("spark.dynamicAllocation.mminExecutors", n_executor_instances)\
        .config("spark.dynamicAllocation.maxExecutors", n_executor_instances)\
        .config("spark.driver.port",9999)\
        .config("spark.blockManager.port",10005)\
        .getOrCreate()
    # RDD API
    spark_context = spark_session.sparkContext
    
    spark_context.setLogLevel("WARN")
    df = spark_session.read.json("hdfs://192.168.2.240:9000/corpus-webis-tldr-17.json")
    if n_executor_instances < 4:
        test_partition_size = 0.25*n_executor_instances
        partitions = df.randomSplit([test_partition_size, 1-test_partition_size], seed=42)  # Split into four equal parts
        test_partition = partitions[0]
    else:
        test_partition = df
    measure_execution_time("Common Words in Top Subreddits", job_common_words_top_subreddits, test_partition)
    spark_context.stop()


In [None]:
for i in [1, 2, 3, 4]:
    print(i)
    weak_scaling_experiment(i)

1


                                                                                

+----+-------+
|word|  count|
+----+-------+
|   I|5445867|
| the|5342233|
|  to|5100828|
| and|4900189|
|   a|3893890|
|  of|2780404|
|  \n|2330324|
|that|2102802|
|  in|1946803|
|  my|1791453|
| was|1681171|
|  is|1558996|
| for|1490589|
|  it|1451590|
|    |1438665|
|with|1334852|
|  me|1091820|
| but|1086165|
|  on|1070809|
|have|1054204|
+----+-------+
only showing top 20 rows

Execution time for Common Words in Top Subreddits: 602.5473868846893 seconds
2


                                                                                

+----+--------+
|word|   count|
+----+--------+
|   I|10884028|
| the|10676129|
|  to|10180626|
| and| 9794095|
|   a| 7783599|
|  of| 5555264|
|  \n| 4657986|
|that| 4199831|
|  in| 3887963|
|  my| 3581214|
| was| 3358200|
|  is| 3114524|
| for| 2976653|
|  it| 2897434|
|    | 2847490|
|with| 2665830|
|  me| 2183117|
| but| 2169807|
|  on| 2138446|
|have| 2106767|
+----+--------+
only showing top 20 rows

Execution time for Common Words in Top Subreddits: 336.02891635894775 seconds
3


[Stage 0:>                                                       (0 + 12) / 147]