In [19]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from file_parsers import parse_badges, parse_posts, parse_comments, parse_users,\
    parse_posthistory, parse_postlinks, parse_votes, parse_tags

In [20]:
spark = SparkSession.builder \
    .appName('StackExchange') \
    .master('local[*]') \
    .getOrCreate()
sc = spark.sparkContext

# Data Loading

In [3]:
stack = '/home/piotr/big_data/archive.org/download/stackexchange/'
#subject = 'meta.stackoverflow.com/'
subject = 'gardening.stackexchange.com/'
path = stack + subject

In [4]:
badges = parse_badges(sc, path + 'Badges.xml')
posts = parse_posts(sc, path + 'Posts.xml')
comments = parse_comments(sc, path + 'Comments.xml')
users = parse_users(sc, path + 'Users.xml')
posthistory = parse_posthistory(sc, path + 'PostHistory.xml')
postlinks = parse_postlinks(sc, path + 'PostLinks.xml')
votes = parse_votes(sc, path + 'Votes.xml')
tags = parse_tags(sc, path + 'Tags.xml')

# Posts

In [23]:
posts.printSchema()

root
 |-- Id: float (nullable = true)
 |-- PostTypeId: float (nullable = true)
 |-- ParentId: float (nullable = true)
 |-- AcceptedAnswerId: float (nullable = true)
 |-- CreationDate: string (nullable = true)
 |-- Score: float (nullable = true)
 |-- ViewCount: float (nullable = true)
 |-- Body: string (nullable = true)
 |-- OwnerUserId: float (nullable = true)
 |-- LastEditorUserId: float (nullable = true)
 |-- LastEditorDisplayName: string (nullable = true)
 |-- LastEditDate: string (nullable = true)
 |-- LastActivityDate: string (nullable = true)
 |-- CommunityOwnedDate: string (nullable = true)
 |-- ClosedDate: string (nullable = true)
 |-- Title: string (nullable = true)
 |-- Tags: string (nullable = true)
 |-- AnswerCount: float (nullable = true)
 |-- CommentCount: float (nullable = true)
 |-- FavoriteCount: float (nullable = true)



In [27]:
posts_filtered = posts.select(col('Id').cast('integer'), \
                             col('PostTypeId').cast('integer'), \
                             col('ParentId').cast('integer'), \
                             col('CreationDate').cast('timestamp'),
                             col('AnswerCount').cast('integer'))

In [28]:
posts_filtered.head(3)

[Row(Id=1, PostTypeId=1, ParentId=None, CreationDate=datetime.datetime(2011, 6, 8, 18, 35, 50, 450000), AnswerCount=5),
 Row(Id=2, PostTypeId=1, ParentId=None, CreationDate=datetime.datetime(2011, 6, 8, 18, 37, 12, 493000), AnswerCount=2),
 Row(Id=3, PostTypeId=1, ParentId=None, CreationDate=datetime.datetime(2011, 6, 8, 18, 37, 45, 593000), AnswerCount=3)]

In [26]:
posts_filtered.groupBy('PostTypeId').count().show()

+----------+-----+
|PostTypeId|count|
+----------+-----+
|         1|11008|
|         5|  335|
|         4|  335|
|         7|    2|
|         2|17451|
+----------+-----+



## ile pytań bez odpowiedzi

In [37]:
posts_filtered.filter(col('AnswerCount')==0).count()

889

## rozkład czasu pierwszej odpowiedzi

In [39]:
questions = posts_filtered.filter(col('PostTypeId')==1)
answers = posts_filtered.filter(col('PostTypeId')==2)

In [40]:
questions.show(5)

+---+----------+--------+--------------------+-----------+
| Id|PostTypeId|ParentId|        CreationDate|AnswerCount|
+---+----------+--------+--------------------+-----------+
|  1|         1|    null|2011-06-08 18:35:...|          5|
|  2|         1|    null|2011-06-08 18:37:...|          2|
|  3|         1|    null|2011-06-08 18:37:...|          3|
|  4|         1|    null|2011-06-08 18:37:...|          3|
|  6|         1|    null|2011-06-08 18:39:...|          5|
+---+----------+--------+--------------------+-----------+
only showing top 5 rows



In [41]:
answers.show(5)

+---+----------+--------+--------------------+-----------+
| Id|PostTypeId|ParentId|        CreationDate|AnswerCount|
+---+----------+--------+--------------------+-----------+
|  5|         2|       1|2011-06-08 18:38:...|       null|
|  9|         2|       1|2011-06-08 18:39:...|       null|
| 11|         2|       4|2011-06-08 18:40:...|       null|
| 12|         2|       1|2011-06-08 18:40:...|       null|
| 14|         2|       2|2011-06-08 18:41:...|       null|
+---+----------+--------+--------------------+-----------+
only showing top 5 rows



In [71]:
from pyspark.sql import functions as F
first_answers = answers.groupBy('ParentId').agg(F.min(answers.CreationDate))
first_answers.printSchema()

root
 |-- ParentId: integer (nullable = true)
 |-- min(CreationDate): timestamp (nullable = true)



In [65]:
df_as1 = questions.alias("questions")
df_as2 = first_answers.alias("first_answers")
joined = df_as1.join(df_as2, col("df_as1.Id") == col("df_as2.ParentId"), 'left')

AnalysisException: "cannot resolve '`df_as1.Id`' given input columns: [first_answers.min(CreationDate), questions.CreationDate, questions.PostTypeId, questions.AnswerCount, questions.ParentId, questions.Id, first_answers.ParentId];;\n'Join LeftOuter, ('df_as1.Id = 'df_as2.ParentId)\n:- SubqueryAlias `questions`\n:  +- Filter (PostTypeId#1807 = 1)\n:     +- Project [cast(Id#6 as int) AS Id#1806, cast(PostTypeId#7 as int) AS PostTypeId#1807, cast(ParentId#8 as int) AS ParentId#1808, cast(CreationDate#10 as timestamp) AS CreationDate#1810, cast(AnswerCount#23 as int) AS AnswerCount#1809]\n:        +- LogicalRDD [Id#6, PostTypeId#7, ParentId#8, AcceptedAnswerId#9, CreationDate#10, Score#11, ViewCount#12, Body#13, OwnerUserId#14, LastEditorUserId#15, LastEditorDisplayName#16, LastEditDate#17, LastActivityDate#18, CommunityOwnedDate#19, ClosedDate#20, Title#21, Tags#22, AnswerCount#23, CommentCount#24, FavoriteCount#25], false\n+- SubqueryAlias `first_answers`\n   +- Aggregate [ParentId#2061], [ParentId#2061, min(CreationDate#2062) AS min(CreationDate)#2043]\n      +- Filter (PostTypeId#2060 = 2)\n         +- Project [cast(Id#6 as int) AS Id#2059, cast(PostTypeId#7 as int) AS PostTypeId#2060, cast(ParentId#8 as int) AS ParentId#2061, cast(CreationDate#10 as timestamp) AS CreationDate#2062, cast(AnswerCount#23 as int) AS AnswerCount#2063]\n            +- LogicalRDD [Id#6, PostTypeId#7, ParentId#8, AcceptedAnswerId#9, CreationDate#10, Score#11, ViewCount#12, Body#13, OwnerUserId#14, LastEditorUserId#15, LastEditorDisplayName#16, LastEditDate#17, LastActivityDate#18, CommunityOwnedDate#19, ClosedDate#20, Title#21, Tags#22, AnswerCount#23, CommentCount#24, FavoriteCount#25], false\n"

# Data Exploration

## Badges

In [None]:
tags.take(2)

In [None]:
tags.sort('Count', ascending=False).show()

In [None]:
import matplotlib
%matplotlib inline

In [None]:
posts_time = posts.select([col('CreationDate').cast('date'), col('AnswerCount'), col('CommentCount')])

In [None]:
posts_time.take(2)

In [None]:
from pyspark.sql.functions import month, year
bitcoin_popularity = posts_time.groupBy(year('CreationDate')).count().toPandas()

In [None]:
bitcoin_popularity.cumsum().plot()