In [0]:
import pyspark.sql.functions as sf
from pyspark.sql.types import *

In [0]:
# Root directory of the Stack Overflow data dump converted to Parquet (one `<table>.parquet` per table).
# Change this single constant to point the notebook at a different copy of the dump.
SO_DATA_DIR = "/mnt/mirai/so"


def _read_so_table(table_name, schema):
  """Read one Stack Overflow table as a Spark DataFrame.

  Args:
    table_name: Base name of the table, e.g. "posts"; the path read is
      f"{SO_DATA_DIR}/{table_name}.parquet".
    schema: The StructType applied when reading (no schema inference).

  Returns:
    The DataFrame returned by spark.read for that Parquet path.

  NOTE: Spark reads Parquet columns back as nullable, so the `nullable=False`
  flags in the schemas below document the expected data; Spark does not enforce them.
  """
  return spark.read.format("parquet").load(f"{SO_DATA_DIR}/{table_name}.parquet", schema=schema)


tags_schema = StructType([
    StructField("Id", LongType(), False),
    StructField("TagName", StringType(), False),
    StructField("Count", IntegerType(), False),
    StructField("ExcerptPostId", LongType(), True),
    StructField("WikiPostId", LongType(), True)
])
tags = _read_so_table("tags", tags_schema)

posts_schema = StructType([
    StructField("Id", LongType(), False),
    StructField("PostTypeId", ShortType(), False),
    StructField("AcceptedAnswerId", LongType(), True),
    StructField("ParentId", LongType(), True),
    StructField("CreationDate", TimestampType(), False),
    StructField("Score", IntegerType(), False),
    StructField("ViewCount", IntegerType(), True),
    StructField("Body", StringType(), False),
    StructField("OwnerUserId", LongType(), True),
    StructField("OwnerDisplayName", StringType(), True),
    StructField("LastEditorUserId", LongType(), True),
    StructField("LastEditorDisplayName", StringType(), True),
    StructField("LastEditDate", TimestampType(), True),
    StructField("LastActivityDate", TimestampType(), False),
    StructField("Title", StringType(), True),
    StructField("Tags", StringType(), True),
    StructField("AnswerCount", IntegerType(), True),
    StructField("CommentCount", IntegerType(), True),
    StructField("ClosedDate", TimestampType(), True),
    StructField("CommunityOwnedDate", TimestampType(), True),
    StructField("ContentLicense", StringType(), False)
])
posts = _read_so_table("posts", posts_schema)

users_schema = StructType([
    StructField("Id", LongType(), False),
    StructField("Reputation", IntegerType(), False),
    StructField("CreationDate", TimestampType(), False),
    StructField("DisplayName", StringType(), False),
    StructField("LastAccessDate", TimestampType(), False),
    StructField("WebsiteUrl", StringType(), True),
    StructField("Location", StringType(), True),
    StructField("AboutMe", StringType(), True),
    StructField("Views", LongType(), False),
    StructField("UpVotes", LongType(), False),
    StructField("DownVotes", LongType(), False),
    StructField("AccountId", LongType(), True)
])
users = _read_so_table("users", users_schema)

comments_schema = StructType([
    StructField("Id", LongType(), False),
    StructField("PostId", LongType(), False),
    StructField("Score", IntegerType(), False),
    StructField("Text", StringType(), False),
    StructField("CreationDate", TimestampType(), False),
    StructField("UserId", LongType(), True),
    StructField("UserDisplayName", StringType(), True),
    StructField("ContentLicense", StringType(), False)
])
comments = _read_so_table("comments", comments_schema)

votes_schema = StructType([
    StructField("Id", LongType(), False),
    StructField("PostId", LongType(), False),
    StructField("VoteTypeId", ShortType(), False),
    StructField("UserId", LongType(), True),
    StructField("CreationDate", TimestampType(), False),
    StructField("BountyAmount", IntegerType(), True)
])
votes = _read_so_table("votes", votes_schema)

In [0]:
# Which post types are tag wiki entries? (per the data-dump schema: 4 = tag wiki excerpt, 5 = tag wiki)
is_tag_wiki = sf.col("PostTypeId").isin([4, 5])

# Which posts are related to Apache Spark? The body is lower-cased inside the predicate, so the
# match is case-insensitive on its own and does not depend on callers lower-casing `Body` first.
is_post_related_to_apache_spark = (
  sf.lower(sf.col("Body")).contains("apache spark") |
  sf.lower(sf.col("Body")).contains("spark.apache.org")
)

# Tags used fewer times than this are ignored.
MIN_SPARK_TAG_COUNT = 10
# Tags excluded even if their wiki mentions Spark (presumably too generic to be Spark-specific).
EXCLUDED_SPARK_TAGS = ["dataframe"]

# What are the IDs of the tag wiki posts which are related to Apache Spark?
spark_tag_wiki_post_ids = (
  posts
  .filter(is_tag_wiki & is_post_related_to_apache_spark)
  .select("Id")
)

# Which tags are related to Apache Spark according to their wiki entry or excerpt?
# A tag qualifies when either its excerpt post or its wiki post is among the Spark wiki posts;
# post Ids are unique, so each left join matches at most one row and cannot duplicate tags.
spark_tags = (
  tags.alias("t")
  .join(spark_tag_wiki_post_ids.alias("tw1"), sf.col("t.ExcerptPostId") == sf.col("tw1.Id"), how="left")
  .join(spark_tag_wiki_post_ids.alias("tw2"), sf.col("t.WikiPostId") == sf.col("tw2.Id"), how="left")
  .filter(
    (sf.col("tw1.Id").isNotNull() | sf.col("tw2.Id").isNotNull())
    & (sf.col("t.Count") >= MIN_SPARK_TAG_COUNT)
    & ~sf.col("t.TagName").isin(EXCLUDED_SPARK_TAGS)
  )
  .select("t.Id", "t.TagName", "t.Count")
  .persist()
)

In [0]:
# Show the Spark-related tags, most used first; TagName breaks ties so the order is stable across re-runs.
spark_tags.orderBy(sf.desc("Count"), "TagName").display()

In [0]:
# Questions (PostTypeId 1); `is_question` is reused later when counting posts per user.
is_question = sf.col("PostTypeId") == 1
questions = posts.filter(is_question)

# All Spark tag names as a single-row DataFrame with one array column, `SparkTags`
spark_tags_array = (
  spark_tags
  .groupBy()
  .agg(sf.collect_set("TagName").alias("SparkTags"))
)

# Determine the questions related to Apache Spark.
# Splitting `Tags` on '<' and '>' yields an array of tag names (plus empty strings, which never
# match a Spark tag); intersecting with `SparkTags` keeps only the Spark tags of each question.
spark_questions = (
  questions
  .withColumn("Tags", sf.split(sf.col("Tags"), "[<>]"))
  .crossJoin(sf.broadcast(spark_tags_array))
  .withColumn("Tags", sf.array_intersect("Tags", "SparkTags"))
  # `SparkTags` is the same array on every row; drop it so it is not cached with each question
  .drop("SparkTags")
  .filter(sf.size("Tags") > 0)  # at least one Spark tag needs to be present
  .persist()
)

In [0]:
# Print the physical plan; the question filter (PostTypeId == 1) is expected to appear as a
# filter pushed down to the Parquet data source.
spark_questions.explain(mode="simple")

In [0]:
# The five highest-scoring Spark questions
(
  spark_questions
  .select("Id", "Title", "Body", "Score", "ViewCount", "Tags")
  .orderBy(sf.col("Score").desc())
  .limit(5)
  .display()
)

In [0]:
# Sort key for bounty votes: the latest CreationDate wins, and on the same date VoteTypeId 9 (close)
# sorts after 8 (start). Named so that it does not shadow Python's built-in `ord`.
bounty_vote_order = sf.struct("CreationDate", "VoteTypeId")

# Open bounties: per post, take the latest bounty start/close vote and keep it only if it is a start.
# NOTE 1: The vote creation date is truncated to the day (no time) for user privacy reasons;
# as such, the following can produce wrong results if multiple bounties are started and closed on the same day.
# NOTE 2: Questions may only have one active bounty at any given time (see https://stackoverflow.com/help/bounty)
open_bounty_votes = (
  votes
  .filter(sf.col("VoteTypeId").isin([8, 9]))  # 8 = bounty start, 9 = bounty close
  .groupBy("PostId")
  .agg(
    sf.max_by("VoteTypeId", bounty_vote_order).alias("VoteTypeId"),
    sf.max_by("UserId", bounty_vote_order).alias("BountyUserId"),
    sf.max_by("BountyAmount", bounty_vote_order).alias("BountyAmount")
  )
  .filter(sf.col("VoteTypeId") == 8)
  .drop("VoteTypeId")
)

# The following is an alternative implementation of open_bounty_votes using window functions

# from pyspark.sql import Window
# window = Window.partitionBy("PostId").orderBy("CreationDate", "VoteTypeId")
# open_bounty_votes = (
#   votes
#   .filter(sf.col("VoteTypeId").isin([8, 9]))  # 8 = bounty start, 9 = bounty close
#   .withColumn("NextVoteTypeId", sf.lead("VoteTypeId").over(window))
#   .filter((sf.col("VoteTypeId") == 8) & sf.col("NextVoteTypeId").isNull())
#   .select("PostId", sf.col("UserId").alias("BountyUserId"), "BountyAmount")
# )

# Spark questions that are not closed and currently have an open bounty
open_spark_questions_with_bounty = (
  spark_questions
  .filter(sf.col("ClosedDate").isNull())
  .join(open_bounty_votes, spark_questions["Id"] == open_bounty_votes["PostId"])
  .select("Title", "Body", "Score", "ViewCount", "BountyAmount")
)

In [0]:
# Ten open Spark questions with the largest active bounties, ties broken by score
(
  open_spark_questions_with_bounty
  .orderBy(sf.desc("BountyAmount"), sf.desc("Score"))
  .limit(10)
  .display()
)

In [0]:
# Answers (PostTypeId 2); questions are matched with `is_question` from the question cell.
is_answer = sf.col("PostTypeId") == 2

# Number of questions, answers and accepted answers per post owner.
n_questions_answers = (
  posts.alias("p1")
  # Flag posts that some question references via AcceptedAnswerId. Null AcceptedAnswerIds can
  # never satisfy the join condition, so they are dropped before the join to shrink it.
  .join(
    posts.select("AcceptedAnswerId").filter(sf.col("AcceptedAnswerId").isNotNull()).alias("p2"),
    sf.col("p1.Id") == sf.col("p2.AcceptedAnswerId"),
    how="left"
  )
  .withColumn("IsAcceptedAnswer", sf.col("p2.AcceptedAnswerId").isNotNull())
  .groupBy(sf.col("OwnerUserId").alias("UserId"))
  .agg(
    sf.sum(is_question.cast("int")).alias("n_questions"),
    sf.sum(is_answer.cast("int")).alias("n_answers"),
    sf.sum(sf.col("IsAcceptedAnswer").cast("int")).alias("n_accepted_answers")
  )
)

# Number of comments per commenting user.
n_comments = (
  comments
  .groupBy("UserId")
  .agg(sf.count("*").alias("n_comments"))
)

# One row per user with reputation and activity counts. The left joins leave the counts null for
# users without any posts or comments; fill them with 0 so such users enter the correlations as
# zero activity instead of depending on how `DataFrame.corr` treats nulls.
reputation = (
  users
  .select(sf.col("Id").alias("UserId"), "Reputation")
  .join(n_questions_answers, on="UserId", how="left")
  .join(n_comments, on="UserId", how="left")
  .fillna(0, subset=["n_questions", "n_answers", "n_accepted_answers", "n_comments"])
  .persist()
)

In [0]:
# Pearson correlation between reputation and each activity count, labelled by column name
# (the dict is the cell's last expression, so the notebook displays it).
{
  column: reputation.corr("Reputation", column)
  for column in ["n_questions", "n_answers", "n_accepted_answers", "n_comments"]
}