In [ ]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import StringType
import re
import time
from operator import add
import pandas as pd
import datetime

############################################################
# Setup session and files
############################################################

# Spark properties for this run, applied to the builder in the order listed:
# dynamic allocation with shuffle tracking (external shuffle service off),
# fixed driver / block-manager ports, and a 30s executor idle timeout.
spark_conf = {
    "spark.dynamicAllocation.enabled": True,
    "spark.dynamicAllocation.shuffleTracking.enabled": True,
    "spark.shuffle.service.enabled": False,
    "spark.driver.port": 9999,
    "spark.blockManager.port": 10005,
    "spark.dynamicAllocation.executorIdleTimeout": "30s",
}

session_builder = (
    SparkSession.builder
    .master("spark://spark-master:7077")
    .appName("Primary Test")
)
for conf_key, conf_value in spark_conf.items():
    session_builder = session_builder.config(conf_key, conf_value)

# Reuse an existing session if one is active, otherwise create it.
spark_session = session_builder.getOrCreate()

# Handle on the underlying SparkContext (stopped at the end of the notebook).
sc = spark_session.sparkContext

In [ ]:
# Location of the JSON corpus on HDFS.
corpus_path = 'hdfs://hdfs:9000/user/ubuntu/corpus-webis-tldr-17.json'

# Load the corpus into a DataFrame; later cells read its "content" and
# "subreddit" columns.
df = spark_session.read.json(corpus_path)

In [ ]:
############################################################
# Process data
############################################################

# Start of the timed section; the matching end_time is taken after the
# aggregated result has been shown.
start_time = time.time()

# Words whose per-subreddit occurrence is counted.
word_array = ["idiot", "dumbass", "asshole", "fuck", "homo", "queer", "shit", "slut", "hell", "dick", "dyke", "fag", "cunt"]


def _normalize_content(field):
    """Lower-case *field* and strip everything except letters, digits and whitespace.

    Args:
        field: Raw text of one record, or None when the record has no content.

    Returns:
        The cleaned text. A missing (None) value becomes the empty string so
        that one null record cannot fail the whole job and downstream regex
        matching always receives a string.
    """
    if field is None:
        return ""
    return re.sub(r'[^a-zA-Z0-9\s\nåäöÅÄÖ]', '', field.lower())


process_body_udf = udf(_normalize_content, StringType())

# Replace the raw text with its normalized form in place.
df = df.withColumn("content", process_body_udf("content"))

def map_content(row):
    """Return one (subreddit, word) pair per tracked word found in the row.

    Each word from ``word_array`` is reported at most once per row, no matter
    how often it occurs. Matching is case-insensitive and on whole words only.

    Args:
        row: Mapping-like record with "subreddit" and "content" entries.

    Returns:
        A list of ``(subreddit, word)`` tuples in ``word_array`` order; empty
        when the content is missing or empty.
    """
    content = row["content"]
    # Null/empty content cannot match anything; bail out instead of letting
    # the regex engine raise TypeError on None.
    if not content:
        return []

    subreddit = row["subreddit"]
    # re.search stops at the first hit, which is all an existence test needs;
    # re.escape keeps the pattern literal if a word ever contains regex
    # metacharacters.
    return [
        (subreddit, word)
        for word in word_array
        if re.search(r'\b' + re.escape(word) + r'\b', content, re.IGNORECASE)
    ]

# One (subreddit, word) tuple per tracked word present in each record.
rdd = df.rdd.flatMap(map_content)

# Use the (subreddit, word) tuple itself as the key and count occurrences.
rdd_pair = rdd.map(lambda hit: (hit, 1))
rdd_counts = rdd_pair.reduceByKey(add)

# Flatten ((subreddit, word), count) into (subreddit, word, count) rows.
rdd_subreddit = rdd_counts.map(lambda kv: (kv[0][0], kv[0][1], kv[1]))

# An explicit schema avoids the eager sampling job that name-only toDF() runs
# to infer column types (and which raises on an empty RDD).
df = spark_session.createDataFrame(
    rdd_subreddit, "subreddit string, word string, count long"
)

# Passing the pivot values explicitly guarantees a column for every tracked
# word even when it never occurs, so the sum expression below always
# resolves. Sorted order matches the column order an implicit pivot produces.
pivoted_df = (
    df.groupBy("subreddit")
    .pivot("word", sorted(word_array))
    .sum("count")
    .na.fill(0)
)

# Row total across all word columns; backticks keep each name a column reference.
sum_expression = " + ".join(f"`{word}`" for word in word_array)
final_df = (
    pivoted_df.withColumn('sum', expr(sum_expression))
    .filter(col('sum') > 0)
    .orderBy(col('sum').desc())
)

final_df.show()
end_time = time.time()

# Collect straight to pandas: an extra repartition(1) would add a shuffle and
# may lose the descending 'sum' ordering established above.
# Naive UTC timestamp, so the file name keeps the "YYYY-MM-DD HH:MM:SS.ffffff" form.
results_stamp = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
final_df.toPandas().to_csv(f"final_results_{results_stamp}.csv", index=False)

In [ ]:
# Wall-clock duration of the timed section (start_time .. end_time).
time_taken = end_time - start_time

# Read the clock once, in UTC, so the report text and the file name carry the
# same timestamp. Dropping tzinfo keeps the plain "YYYY-MM-DD HH:MM:SS.ffffff"
# rendering; the "(UTC)" label in the message states the zone.
report_stamp = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)

performance = f"time taken to run : {time_taken} seconds \n {report_stamp} (UTC)"
print(performance)

# Persist the report next to the notebook; explicit encoding keeps the output
# independent of the platform default.
with open(f'performance_{report_stamp}', 'w', encoding='utf-8') as file:
    file.write(performance)

In [None]:
# Shut down the SparkContext behind the session now that all work is done.
spark_session.sparkContext.stop()