## This notebook converts [stackoverflow data dump](https://archive.org/details/stackexchange) from XML files to parquet format

In [None]:
from pyspark.sql.session import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

spark = (SparkSession.builder
    .appName('stackoverflow')
    .config('spark.local.dir', '/home/mario/tmp')
    .getOrCreate())

In [None]:
import re
import html

pattern = re.compile(' ([A-Za-z]+)="([^"]*)"')
parse_line = lambda line: {key:value for key,value in pattern.findall(line)}
unescape = udf(lambda escaped: html.unescape(escaped) if escaped else None)

def read_tags_raw(tags_string): # converts <tag1><tag2> to ['tag1', 'tag2']
    return html.unescape(tags_string).strip('>').strip('<').split('><') if tags_string else []
    
read_tags = udf(read_tags_raw, ArrayType(StringType()))

In [None]:
# Badges

spark.read.text('/home/stackoverflow/xml/Badges.xml').where(col('value').like('%<row Id%')) \
    .select(udf(parse_line, MapType(StringType(), StringType()))('value').alias('value')) \
    .select(
        col('value.Id').cast('integer'),
        col('value.UserId').cast('integer'),
        col('value.Name'),
        col('value.Date').cast('timestamp'),
        col('value.Class').cast('integer'),
        col('value.TagBased').cast('boolean')
    ).repartition(5).write.parquet('/home/stackoverflow/parquet/Badges')

In [None]:
# Comments

spark.read.text('/home/stackoverflow/xml/Comments.xml').where(col('value').like('%<row Id%')) \
    .select(udf(parse_line, MapType(StringType(), StringType()))('value').alias('value')) \
    .select(
        col('value.Id').cast('integer'),
        col('value.PostId').cast('integer'),
        col('value.Score').cast('integer'),
        unescape('value.Text').alias('Text'),
        col('value.CreationDate').cast('timestamp'),
        col('value.UserId').cast('integer'),
        col('value.UserDisplayName')
    ).write.parquet('/home/stackoverflow/parquet/Comments')

In [None]:
# PostHistory

spark.read.text('/home/stackoverflow/xml/PostHistory.xml').where(col('value').like('%<row Id%')) \
    .select(udf(parse_line, MapType(StringType(), StringType()))('value').alias('value')) \
    .select(
        col('value.Id').cast('integer'),
        col('value.PostHistoryTypeId').cast('integer'),
        col('value.PostId').cast('integer'),
        col('value.RevisionGUID'),
        col('value.CreationDate').cast('timestamp'),
        col('value.UserId').cast('integer'),
        unescape('value.Text').alias('Text'),
        unescape('value.Comment').alias('Comment'),
        col('value.UserDisplayName')
    ).write.parquet('/home/stackoverflow/parquet/PostHistory')

In [None]:
# PostLinks

spark.read.text('/home/stackoverflow/xml/PostLinks.xml').where(col('value').like('%<row Id%')) \
    .select(udf(parse_line, MapType(StringType(), StringType()))('value').alias('value')) \
    .select(
        col('value.Id').cast('integer'),
        col('value.CreationDate').cast('timestamp'),
        col('value.PostId').cast('integer'),
        col('value.RelatedPostId').cast('integer'),
        col('value.LinkTypeId').cast('integer')
    ).repartition(2).write.parquet('/home/stackoverflow/parquet/PostLinks')

In [None]:
# Posts

spark.read.text('/home/stackoverflow/xml/Posts.xml').where(col('value').like('%<row Id%')) \
    .select(udf(parse_line, MapType(StringType(), StringType()))('value').alias('value')) \
    .select(
        col('value.Id').cast('integer'),
        col('value.ParentId').cast('integer'),
        col('value.PostTypeId').cast('integer'),
        col('value.CreationDate').cast('timestamp'),
        col('value.Score').cast('integer'),
        col('value.ViewCount').cast('integer'),
        unescape('value.Body').alias('Body'),
        col('value.OwnerUserId').cast('integer'),
        col('value.LastActivityDate').cast('timestamp'),
        unescape('value.Title').alias('Title'),
        read_tags('value.Tags').alias('Tags'),
        col('value.CommentCount').cast('integer'),
        col('value.AnswerCount').cast('integer'),
        col('value.LastEditDate').cast('timestamp'),
        col('value.LastEditorUserId').cast('integer'),
        col('value.AcceptedAnswerId').cast('integer'),
        col('value.FavoriteCount').cast('integer'),
        col('value.OwnerDisplayName'),
        col('value.ClosedDate').cast('timestamp'),
        col('value.LastEditorDisplayName'),
        col('value.CommunityOwnedDate').cast('timestamp')
    ).write.parquet('/home/stackoverflow/parquet/Posts')

In [None]:
# Tags

spark.read.text('/home/stackoverflow/xml/Tags.xml').where(col('value').like('%<row Id%')) \
    .select(udf(parse_line, MapType(StringType(), StringType()))('value').alias('value')) \
    .select(
        col('value.Id').cast('integer'),
        col('value.TagName'),
        col('value.Count').cast('integer'),
        col('value.ExcerptPostId').cast('integer'),
        col('value.WikiPostId').cast('integer')
    ).repartition(1).write.parquet('/home/stackoverflow/parquet/Tags')

In [None]:
# Users

spark.read.text('/home/stackoverflow/xml/Users.xml').where(col('value').like('%<row Id%')) \
    .select(udf(parse_line, MapType(StringType(), StringType()))('value').alias('value')) \
    .select(
        col('value.Id').cast('integer'),
        col('value.Reputation').cast('integer'),
        col('value.CreationDate').cast('timestamp'),
        col('value.DisplayName'),
        col('value.LastAccessDate').cast('timestamp'),
        col('value.WebsiteUrl'),
        col('value.Location'),
        unescape('value.AboutMe').alias('AboutMe'),
        col('value.Views').cast('integer'),
        col('value.UpVotes').cast('integer'),
        col('value.DownVotes').cast('integer'),
        col('value.ProfileImageUrl'),
        col('value.Age').cast('integer'),
        col('value.AccountId').cast('integer')
    ).repartition(10).write.parquet('/home/stackoverflow/parquet/Users')

In [None]:
# Votes

spark.read.text('/home/stackoverflow/xml/Votes.xml').where(col('value').like('%<row Id%')) \
    .select(udf(parse_line, MapType(StringType(), StringType()))('value').alias('value')) \
    .select(
        col('value.Id').cast('integer'),
        col('value.PostId').cast('integer'),
        col('value.VoteTypeId').cast('integer'),
        col('value.UserId').cast('integer'),
        col('value.CreationDate').cast('timestamp'),
        col('value.BountyAmount').cast('integer')
    ).repartition(20).write.parquet('/home/stackoverflow/parquet/Votes')

In [None]:
# getting all keys used in file

# spark.read.text('/home/stackoverflow/new/Tags.xml').where(col('value').like('%<row Id%')) \
#     .select(udf(parse_line, MapType(StringType(), StringType()))('value').alias('value')) \
#     .select(expr('map_keys(value)')).select(explode('map_keys(value)')).groupBy('col').count() \
#     .orderBy(desc('count')).show()