In [41]:
import numpy as np
import pandas as pd
import pyspark
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql.types import *
from pyspark.sql import functions as F

# Spark standalone master this notebook submits to.
SPARK_MASTER_URL = "spark://192.168.2.244:7077"

# Session-level Spark settings, applied in this order.
SPARK_CONFIG = {
    # Dynamic allocation needs either the external shuffle service or
    # shuffle tracking; this setup disables the service and uses tracking.
    "spark.dynamicAllocation.enabled": True,
    "spark.dynamicAllocation.shuffleTracking.enabled": True,
    "spark.shuffle.service.enabled": False,
    # Executors idle longer than this are released back to the cluster.
    "spark.dynamicAllocation.executorIdleTimeout": "300s",
    "spark.executor.cores": 2,
    # Fixed (non-random) driver / block-manager ports -- presumably so they
    # can be opened on the cluster network; confirm before changing.
    "spark.driver.port": 9999,
    "spark.blockManager.port": 10005,
}

session_builder = SparkSession.builder.master(SPARK_MASTER_URL).appName("Group10_DE")
for option_key, option_value in SPARK_CONFIG.items():
    session_builder = session_builder.config(option_key, option_value)
spark_session = session_builder.getOrCreate()

sc = spark_session.sparkContext

# Only surface ERROR-level Spark log messages in cell output.
sc.setLogLevel("ERROR")


In [42]:
# Raw Webis-TLDR-17 corpus on the cluster's HDFS, read with Spark's JSON source
# (schema is inferred from the records).
CORPUS_PATH = "hdfs://host-192-168-2-244-de1:9000/user/ubuntu/corpus-webis-tldr-17.json"
df = spark_session.read.format("json").load(CORPUS_PATH)


                                                                                

In [3]:
# Show the schema Spark inferred from the JSON records (column names and types).
df.printSchema()

root
 |-- author: string (nullable = true)
 |-- body: string (nullable = true)
 |-- content: string (nullable = true)
 |-- content_len: long (nullable = true)
 |-- id: string (nullable = true)
 |-- normalizedBody: string (nullable = true)
 |-- subreddit: string (nullable = true)
 |-- subreddit_id: string (nullable = true)
 |-- summary: string (nullable = true)
 |-- summary_len: long (nullable = true)
 |-- title: string (nullable = true)



In [4]:
# Total number of records (posts and comments) in the corpus.
df.count()

                                                                                

3848330

In [21]:
# Sanity check: count rows whose content_len is present. If this equals
# df.count(), no content_len values are missing.
filter1 = df.where(F.col('content_len').isNotNull())
filter1.count()

                                                                                

3848330

In [22]:
# Sanity check: count rows whose body is present. If this equals
# df.count(), no body values are missing.
# NOTE: this cell previously called filter1.count() by mistake, so any
# recorded output is the content_len check -- re-run to get the body count.
filter2 = df.filter(df['body'].isNotNull())
filter2.count()

                                                                                

3848330

In [5]:
# Keep only posts: comments have a null title, so dropping null-title rows
# removes them.
pp_df = df.where(F.col('title').isNotNull())
pp_df.count()

                                                                                

1763029

In [43]:
# Per-subreddit record count (posts and comments) and mean content_len,
# sorted by count descending; display the top 10.
res_whole = (
    df.groupBy('subreddit_id', 'subreddit')
      .agg(
          F.count('subreddit_id').alias('subreddit_count'),
          F.avg('content_len').alias('average_content_len'),
      )
      .orderBy(F.col('subreddit_count').desc())
)

res_whole.show(10)



+------------+-------------------+---------------+-------------------+
|subreddit_id|          subreddit|subreddit_count|average_content_len|
+------------+-------------------+---------------+-------------------+
|    t5_2qh1i|          AskReddit|         589947| 228.36230373236918|
|    t5_2qjvn|      relationships|         352049|  459.7823598419538|
|    t5_2rfxx|    leagueoflegends|         109307|  212.9319165286761|
|    t5_2to41|               tifu|          52219|  345.2679867481185|
|    t5_2r0cn|relationship_advice|          50416|  452.3981275785465|
|    t5_2r9vp|              trees|          47286| 229.52611766696273|
|    t5_2qh03|             gaming|          43851| 189.46831315135344|
|    t5_2qh2p|            atheism|          43268|  243.4542386983452|
|    t5_2s7tt|      AdviceAnimals|          40783| 174.43800112792096|
|    t5_2qh33|              funny|          40171| 156.70000746807398|
+------------+-------------------+---------------+-------------------+
only showing top 10 rows

                                                                                

In [7]:
# Per-subreddit post count (comments excluded via pp_df) and mean content_len,
# sorted by count descending; display the top 10.
res = (
    pp_df.groupBy('subreddit_id', 'subreddit')
         .agg(
             F.count('subreddit_id').alias('subreddit_count'),
             F.avg('content_len').alias('average_content_len'),
         )
         .orderBy(F.col('subreddit_count').desc())
)

res.show(10)

[Stage 12:>                                                         (0 + 2) / 2]

+------------+-------------------+---------------+-------------------+
|subreddit_id|          subreddit|subreddit_count|average_content_len|
+------------+-------------------+---------------+-------------------+
|    t5_2qjvn|      relationships|         348493|  462.2536521537018|
|    t5_2qh1i|          AskReddit|         100498| 269.38906246890485|
|    t5_2rfxx|    leagueoflegends|          61912|    246.47417301977|
|    t5_2to41|               tifu|          50243|  352.5128674641244|
|    t5_2r0cn|relationship_advice|          46339| 472.32674421113967|
|    t5_2r9vp|              trees|          31334| 261.78971724005874|
|    t5_2qh3p|                sex|          18862|  328.2478528257873|
|    t5_2qh2p|            atheism|          15167|  331.3584097052812|
|    t5_2ranw|         offmychest|          15124|  609.6938640571277|
|    t5_2vq0w|     DestinyTheGame|          14313|  314.8230978830434|
+------------+-------------------+---------------+-------------------+
only showing top 10 rows

                                                                                

In [8]:
# Mean of the per-subreddit average content_len over posts and comments.
# Each subreddit is weighted equally regardless of its record count, so this
# is NOT the overall mean content_len across all records.
res_whole.select(F.avg('average_content_len')).show()

                                                                                

+------------------------+
|avg(average_content_len)|
+------------------------+
|      273.13985232754436|
+------------------------+



In [9]:
# Mean of the per-subreddit average content_len over posts only.
# Each subreddit is weighted equally regardless of its post count, so this
# is NOT the overall mean content_len across all posts.
res.select(F.avg('average_content_len')).show()

                                                                                

+------------------------+
|avg(average_content_len)|
+------------------------+
|      307.49041955542054|
+------------------------+



In [40]:
# Shut down the Spark application. SparkSession.stop() stops the underlying
# SparkContext (sc) and also clears the cached active/default session, so a
# later getOrCreate() in this kernel starts a fresh session.
spark_session.stop()