In [2]:
import sys
from pyspark.sql import SparkSession, functions, types

# Create (or reuse) the Spark session shared by every cell in this notebook.
# NOTE(review): the app name looks like a leftover from a different exercise;
# it is kept unchanged because it only affects how the job is labelled.
spark = SparkSession.builder.appName('reddit averages').getOrCreate()
spark.sparkContext.setLogLevel('WARN')

assert sys.version_info >= (3, 8) # make sure we have Python 3.8+
# Compare the version numerically: as plain strings, '3.10' sorts before '3.2'
# and a newer Spark release would wrongly fail this check.
assert tuple(int(part) for part in spark.version.split('.')[:2]) >= (3, 2) # make sure we have Spark 3.2+


# Explicit schema for the space-separated pagecounts files: one
# (column name, Spark type) pair per field, in file order.
pages_schema = types.StructType([
    types.StructField(column_name, column_type)
    for column_name, column_type in (
        ('language', types.StringType()),
        ('title', types.StringType()),
        ('requests', types.LongType()),
        ('bytes', types.LongType()),
    )
])

def filter_path(path):
    """Extract the ``YYYYMMDD-HH`` hour label from a pagecounts file path.

    The file name is expected to contain a ``YYYYMMDD-HHMMSS`` timestamp,
    e.g. ``.../pagecounts-20160801-210000.gz`` gives ``'20160801-21'``.

    Args:
        path: File path or URI as a string, or ``None``.

    Returns:
        The ``YYYYMMDD-HH`` string. If no timestamp is found in the file name,
        falls back to fixed-width slicing of the path (the 15 characters
        before an optional ``.gz`` suffix, minus the trailing ``MMSS``).
        ``None`` is passed through unchanged so null rows do not crash the UDF.
    """
    # Local import keeps the function self-contained when it is shipped to
    # Spark workers as a UDF.
    import re

    if path is None:
        return None

    # str.removesuffix needs Python 3.9+, but this notebook only requires 3.8+.
    if path.endswith('.gz'):
        path = path[:-len('.gz')]

    # Prefer locating the timestamp by pattern so that other extensions
    # (or none at all) do not shift the slice.
    filename = path.rsplit('/', 1)[-1]
    match = re.search(r'(\d{8}-\d{2})\d{4}', filename)
    if match:
        return match.group(1)

    # Fallback: fixed-width slice, 'YYYYMMDD-HHMMSS' -> 'YYYYMMDD-HH'.
    return path[-15:][:-4]
    


def main(in_directory, out_directory):
    """Find the most-requested English Wikipedia page(s) for each hour.

    Runs the same steps as the notebook cells: read the space-separated
    pagecounts files, keep English pages other than ``Main_Page`` and
    ``Special:`` pages, derive the hour from each input file name, and keep
    the page(s) whose request count equals the hourly maximum (ties are
    all kept).

    Args:
        in_directory: Directory (or path pattern) of pagecounts files.
        out_directory: Destination directory for the CSV output; any
            existing output there is overwritten.

    Returns:
        None. Writes rows of ``hour, title, requests`` sorted by hour and
        then title.
    """
    # The previous body read JSON with an undefined `comments_schema`
    # (left over from another exercise) and raised NameError when called.
    path_to_hour = functions.udf(filter_path, returnType=types.StringType())

    pages = (
        spark.read.csv(in_directory, sep=' ', schema=pages_schema)
        .withColumn('hour', path_to_hour(functions.input_file_name()))
    )
    pages = pages.filter(
        (pages['language'] == 'en')
        & (pages['title'] != 'Main_Page')
        & ~pages['title'].contains('Special:')
    )

    # Alias the aggregate so the joined frame has no awkward 'max(requests)' name.
    max_requests = pages.groupBy('hour').agg(
        functions.max('requests').alias('max_requests')
    )

    busiest = (
        pages.join(max_requests, 'hour')
        .filter(functions.col('requests') == functions.col('max_requests'))
        .select('hour', 'title', 'requests')
        .sort('hour', 'title')
    )
    busiest.write.csv(out_directory, mode='overwrite')


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/07/21 22:52:04 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [10]:
# Load the raw pagecounts rows, remember which file each row came from, and
# keep only English pages that are neither the main page nor a Special: page.
pages = (
    spark.read.csv('pagecounts-1', sep=' ', schema=pages_schema)
    .withColumn('filename', functions.input_file_name())
    .where(functions.col('language') == 'en')
    .where(functions.col('title') != 'Main_Page')
    .where(~functions.col('title').contains('Special:'))
)

In [7]:
# Preview the filtered rows.
# NOTE(review): the saved output below already has an `hour` column, which is
# only added by a later cell, so it appears to come from an out-of-order run.
pages.show()


+--------+--------------------+--------+-------+--------------------+-----------+
|language|               title|requests|  bytes|            filename|       hour|
+--------+--------------------+--------+-------+--------------------+-----------+
|      en|Simon_%22Ghost%22...|       2|  39816|file:///Users/yvo...|20160801-21|
|      en| Simon_%26_Garfunkel|      91|4543932|file:///Users/yvo...|20160801-21|
|      en|Simon_%26_Garfunk...|      19| 748974|file:///Users/yvo...|20160801-21|
|      en|  Simon_%26_Schuster|      18| 883010|file:///Users/yvo...|20160801-21|
|      en|Simon_%26_Schuste...|       1|   6651|file:///Users/yvo...|20160801-21|
|      en|Simon_%26_Schuste...|       1|  28288|file:///Users/yvo...|20160801-21|
|      en|Simon_%26_Schuste...|       1|  28333|file:///Users/yvo...|20160801-21|
|      en|Simon_%26_Schuste...|       1|  28338|file:///Users/yvo...|20160801-21|
|      en|Simon_%26_Schuste...|       1|  28316|file:///Users/yvo...|20160801-21|
|      en|Simon_

In [11]:
# Wrap the plain-Python helper as a string-returning Spark UDF, then derive
# each row's hour label from the file it was read from.
path_to_hour = functions.udf(filter_path, types.StringType())
pages = pages.withColumn('hour', path_to_hour(pages['filename']))
pages.show()

+--------+--------------------+--------+-------+--------------------+-----------+
|language|               title|requests|  bytes|            filename|       hour|
+--------+--------------------+--------+-------+--------------------+-----------+
|      en|Simon_%22Ghost%22...|       2|  39816|file:///Users/yvo...|20160801-21|
|      en| Simon_%26_Garfunkel|      91|4543932|file:///Users/yvo...|20160801-21|
|      en|Simon_%26_Garfunk...|      19| 748974|file:///Users/yvo...|20160801-21|
|      en|  Simon_%26_Schuster|      18| 883010|file:///Users/yvo...|20160801-21|
|      en|Simon_%26_Schuste...|       1|   6651|file:///Users/yvo...|20160801-21|
|      en|Simon_%26_Schuste...|       1|  28288|file:///Users/yvo...|20160801-21|
|      en|Simon_%26_Schuste...|       1|  28333|file:///Users/yvo...|20160801-21|
|      en|Simon_%26_Schuste...|       1|  28338|file:///Users/yvo...|20160801-21|
|      en|Simon_%26_Schuste...|       1|  28316|file:///Users/yvo...|20160801-21|
|      en|Simon_

In [12]:

# Highest request count seen in each hour.
max_requests = pages.groupBy(pages['hour']).max('requests')

# Keep only the page(s) whose count equals that hourly maximum (ties are kept).
# NOTE(review): `pages` is overwritten with the joined frame, so re-running
# this cell joins against an already-joined frame; restart from the load cell.
pages = pages.join(max_requests, 'hour')
pages = pages.filter(pages['requests'] == pages['max(requests)'])
# Sort by hour, then title. A scalar `ascending` applies to every sort column;
# the previous one-element list was paired with the columns one-to-one, which
# left `title` out of the ordering.
pages = pages.sort(["hour", "title"], ascending=True)

max_requests.show()
pages.show()

+-----------+-------------+
|       hour|max(requests)|
+-----------+-------------+
|20160801-15|          173|
|20160802-14|          198|
|20160801-21|          178|
|20160801-14|          176|
|20160801-13|          127|
|20160801-16|          180|
|20160802-12|          148|
|20160802-13|          140|
|20160801-12|          131|
|20160802-15|          156|
|20160802-16|          179|
|20160801-20|          177|
|20160801-11|          154|
|20160801-18|          146|
|20160801-17|          149|
|20160802-21|          142|
|20160802-19|          155|
|20160802-17|          141|
|20160801-19|          164|
|20160802-18|          163|
+-----------+-------------+
only showing top 20 rows

+-----------+--------+-------------+--------+-------+--------------------+-------------+
|       hour|language|        title|requests|  bytes|            filename|max(requests)|
+-----------+--------+-------------+--------+-------+--------------------+-------------+
|20160801-00|      en|   Simon_Pegg

In [142]:
# Order by hour, breaking ties on title (ascending is the default).
pages = pages.orderBy('hour', 'title')

# Keep just the columns that go into the report, in output order.
output_columns = ('hour', 'title', 'max(requests)')
result = pages.select(*[pages[column_name] for column_name in output_columns])

# Write uncompressed CSV, replacing any previous run's output.
result.write.csv('output_wiki', compression=None, mode='overwrite')

In [143]:
# Preview the per-hour winners after ordering by hour and title.
pages.show()



+-----------+--------+-------------+--------+-------+--------------------+-------------+
|       hour|language|        title|requests|  bytes|            filename|max(requests)|
+-----------+--------+-------------+--------+-------+--------------------+-------------+
|20160801-00|      en|   Simon_Pegg|     146|4536966|file:///Users/Jef...|          146|
|20160801-01|      en|   Simon_Pegg|     170|5352225|file:///Users/Jef...|          170|
|20160801-02|      en|   Simon_Pegg|     139|4552633|file:///Users/Jef...|          139|
|20160801-03|      en|   Simon_Pegg|     175|5149558|file:///Users/Jef...|          175|
|20160801-04|      en|   Simon_Pegg|     135|4114505|file:///Users/Jef...|          135|
|20160801-05|      en|   Simon_Pegg|     109|3225857|file:///Users/Jef...|          109|
|20160801-06|      en| Simon_Cowell|      96|5334172|file:///Users/Jef...|           96|
|20160801-06|      en|   Simon_Pegg|      96|3101797|file:///Users/Jef...|           96|
|20160801-07|      en

In [141]:
# Preview the three-column result that is written to CSV.
# NOTE(review): this cell's execution count (141) precedes the cell that
# defines `result` (142), and the saved output below is grouped by title
# rather than hour, so it appears to come from an earlier version of the sort.
result.show()



+-----------+-------------+-------------+
|       hour|        title|max(requests)|
+-----------+-------------+-------------+
|20160801-06| Simon_Cowell|           96|
|20160801-09| Simon_Cowell|          127|
|20160801-10| Simon_Cowell|          148|
|20160801-11| Simon_Cowell|          154|
|20160801-12| Simon_Cowell|          131|
|20160802-06| Simon_Cowell|           95|
|20160802-07| Simon_Cowell|           82|
|20160802-09| Simon_Cowell|           81|
|20160802-10| Simon_Cowell|          125|
|20160802-11| Simon_Cowell|          149|
|20160802-12| Simon_Cowell|          148|
|20160802-15| Simon_Cowell|          156|
|20160802-18| Simon_Cowell|          163|
|20160801-14|Simon_Helberg|          176|
|20160802-05|Simon_Helberg|          303|
|20160802-08|Simon_Helberg|           91|
|20160802-14|Simon_Helberg|          198|
|20160802-16|Simon_Helberg|          179|
|20160801-00|   Simon_Pegg|          146|
|20160801-01|   Simon_Pegg|          170|
+-----------+-------------+-------

+--------+--------------------+--------+-------+--------------------+-----------+
|language|               title|requests|  bytes|            filename|       hour|
+--------+--------------------+--------+-------+--------------------+-----------+
|      en|Simon_%22Ghost%22...|       2|  39816|file:///Users/Jef...|20160801-21|
|      en| Simon_%26_Garfunkel|      91|4543932|file:///Users/Jef...|20160801-21|
|      en|Simon_%26_Garfunk...|      19| 748974|file:///Users/Jef...|20160801-21|
|      en|  Simon_%26_Schuster|      18| 883010|file:///Users/Jef...|20160801-21|
|      en|Simon_%26_Schuste...|       1|   6651|file:///Users/Jef...|20160801-21|
|      en|Simon_%26_Schuste...|       1|  28288|file:///Users/Jef...|20160801-21|
|      en|Simon_%26_Schuste...|       1|  28333|file:///Users/Jef...|20160801-21|
|      en|Simon_%26_Schuste...|       1|  28338|file:///Users/Jef...|20160801-21|
|      en|Simon_%26_Schuste...|       1|  28316|file:///Users/Jef...|20160801-21|
|      en|Simon_

Traceback (most recent call last):
  File "/Users/JeffWang/opt/anaconda3/lib/python3.9/site-packages/pyspark/python/lib/pyspark.zip/pyspark/daemon.py", line 187, in manager
  File "/Users/JeffWang/opt/anaconda3/lib/python3.9/site-packages/pyspark/python/lib/pyspark.zip/pyspark/daemon.py", line 74, in worker
  File "/Users/JeffWang/opt/anaconda3/lib/python3.9/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 730, in main
    if read_int(infile) == SpecialLengths.END_OF_STREAM:
  File "/Users/JeffWang/opt/anaconda3/lib/python3.9/site-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 595, in read_int
    raise EOFError
EOFError
