In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

# Create (or reuse) a Hive-enabled Spark session for this notebook.
spark = (
    SparkSession.builder
    # Spark setting for INSERT OVERWRITE on partitioned tables; 'dynamic' is
    # documented to replace only the partitions that receive data.
    # NOTE(review): no write is visible in this part of the notebook --
    # presumably a later cell relies on it; confirm.
    .config('spark.sql.sources.partitionOverwriteMode', 'dynamic')
    # Hive setting: 'nonstrict' lets every partition column be dynamic.
    .config('hive.exec.dynamic.partition.mode', 'nonstrict')
    .enableHiveSupport()
    .getOrCreate()
)

In [2]:
# ! hdfs dfs -ls /data/stackoverflow/parquet

In [3]:
# spark.catalog.createTable('stackoverflow.tags', path='/data/stackoverflow/parquet/Tags')

In [4]:
# Calendar day (yyyy-MM-dd) being analysed: later cells compare it with
# CreationDate cast to a date and stamp it into the 'dt' column.
DAY = '2019-09-01'

In [5]:
# Handle on the stackoverflow.posts table; the cells below read its
# PostTypeId, CreationDate, Id, ParentId and Tags columns.
posts = spark.table('stackoverflow.posts')

In [6]:
# posts.groupBy('PostTypeId').count().show()

In [7]:
# PostTypeId 1 marks a question, according to
# https://ia800107.us.archive.org/27/items/stackexchange/readme.txt
is_question = col('PostTypeId') == 1

# Number of questions created on DAY, per tag: keep that day's questions,
# emit one row per element of Tags, then count the rows for each tag.
tag_popularity_in_questions = (
    posts
    .where(is_question & (col('CreationDate').cast('date') == DAY))
    .select('Id', explode('Tags').alias('tag'))
    .groupBy('tag')
    .count()
)

In [8]:
# tag_popularity_in_questions.orderBy(desc('count')).show()

In [9]:
# Row predicate selecting answers (PostTypeId 2).
# NOTE(review): presumably from the same Stack Exchange readme that defines
# PostTypeId 1 as a question -- confirm.
is_answer = col('PostTypeId') == 2

In [10]:
# Every question, aliased 'q', so an answer can be matched to its parent.
all_questions = posts.where(is_question).alias('q')
# Answers created on DAY, aliased 'a'.
answers_today = posts.where(is_answer & (col('CreationDate').cast('date') == DAY)).alias('a')

# Number of answers written on DAY, per tag. The tags are taken from the
# parent question (a.ParentId == q.Id); the inner join drops answers whose
# parent question is not found.
tag_popularity_in_answers = (
    answers_today
    .join(all_questions, col('a.ParentId') == col('q.Id'))
    .select('q.Tags')
    .select(explode('Tags').alias('tag'))
    .groupBy('tag')
    .count()
)

In [11]:
votes = spark.table('stackoverflow.votes')

# PostId of every vote cast on DAY whose VoteTypeId is 2 or 3.
# NOTE(review): 2/3 are presumably up-vote/down-vote in the Stack Exchange
# dump schema -- confirm against the readme.
votes_today = (
    votes
    .where(col('VoteTypeId').isin(2, 3))
    .where(col('CreationDate').cast('date') == DAY)
    .select('PostId')
)

In [12]:
# spark.conf.set('spark.sql.autoBroadcastJoinThreshold', '1000000')
# https://jaceklaskowski.gitbooks.io/mastering-spark-sql/spark-sql-joins-broadcast.html

In [14]:
# All posts, aliased 'p'.
all_posts = posts.alias('p')
# Questions only, aliased 'q', without the PostTypeId column.
# Note: this rebinds the all_questions name assigned in an earlier cell;
# DataFrames already built from the earlier value keep referring to it.
all_questions = posts.where(is_question).drop('PostTypeId').alias('q')

In [15]:
# Number of votes cast on DAY, per tag.
tag_popularity_in_votes = (
    all_posts
    # One row per vote: attach each of today's votes to the post it targets.
    # votes_today is marked for broadcast to the join.
    .join(broadcast(votes_today), votes_today['PostId'] == col('p.Id'))
    # Left join to the parent question; posts with no matching parent keep
    # null q.* columns.
    .join(all_questions, col('q.Id') == col('p.ParentId'), how='left')
    # Prefer the parent question's tags, fall back to the post's own tags.
    .select(coalesce('q.Tags', 'p.Tags').alias('valid_tags'))
    .select(explode('valid_tags').alias('tag'))
    .groupBy('tag')
    .count()
)

In [16]:
# Give each per-tag count column a distinct name so the three frames can be
# joined on 'tag' without clashing 'count' columns.
questions_points, answers_points, votes_points = (
    tag_counts.withColumnRenamed('count', metric_name)
    for tag_counts, metric_name in (
        (tag_popularity_in_questions, 'questions'),
        (tag_popularity_in_answers, 'answers'),
        (tag_popularity_in_votes, 'votes'),
    )
)

In [17]:
# Print the physical plan of the combined per-tag table for DAY.
(
    questions_points
    # Full outer joins keep a tag even if it appears in only one of the inputs.
    .join(answers_points, 'tag', how='full')
    .join(votes_points, 'tag', how='full')
    # A metric missing for a tag becomes 0 rather than null.
    .fillna(0, ['questions', 'answers', 'votes'])
    # Stamp every row with the analysed day.
    .withColumn('dt', lit(DAY))
    .explain()
)

== Physical Plan ==
*(21) Project [coalesce(tag#453, tag#437) AS tag#457, coalesce(questions#444L, 0) AS questions#466L, coalesce(answers#447L, 0) AS answers#467L, coalesce(votes#450L, 0) AS votes#468L, 2019-09-01 AS dt#473]
+- SortMergeJoin [tag#453], [tag#437], FullOuter
   :- *(12) Sort [tag#453 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(tag#453, 200)
   :     +- *(11) Project [coalesce(tag#41, tag#192) AS tag#453, questions#444L, answers#447L]
   :        +- SortMergeJoin [tag#41], [tag#192], FullOuter
   :           :- *(3) Sort [tag#41 ASC NULLS FIRST], false, 0
   :           :  +- *(3) HashAggregate(keys=[tag#41], functions=[count(1)])
   :           :     +- Exchange hashpartitioning(tag#41, 200)
   :           :        +- *(2) HashAggregate(keys=[tag#41], functions=[partial_count(1)])
   :           :           +- Generate explode(Tags#9), false, [tag#41]
   :           :              +- *(1) Project [Tags#9]
   :           :                 +- *(1) Filter 