Converts a Stack Overflow / Stack Exchange data dump from XML files to Parquet format.

In [1]:
import pyspark
from pyspark.sql.session import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

# Create (or reuse) the Spark session for this notebook. `spark.local.dir` is
# set to the relative directory 'spark_dir'.
spark = SparkSession.builder \
    .appName('stackoverflow') \
    .config('spark.local.dir', 'spark_dir') \
    .getOrCreate()

In [4]:
import re
import html

# Regex for a single XML attribute: one space, an alphabetic name, then a
# double-quoted value. The value is captured without its quotes and with any
# XML entities still encoded.
pattern = re.compile(' ([A-Za-z]+)="([^"]*)"')


def parse_line(line):
    """Return {attribute name: raw value} for every attribute found in `line`.

    If a name occurs more than once, the last occurrence wins.
    """
    return dict(pattern.findall(line))
def _unescape_or_none(escaped):
    """Decode HTML/XML entities in `escaped`; empty or missing input gives None."""
    if not escaped:
        return None
    return html.unescape(escaped)


# Spark UDF wrapper around the decoder above (no explicit return type given).
unescape = udf(_unescape_or_none)

def read_tags_raw(tags_string):
    """Split an (entity-escaped) tag string into a list of tag names.

    Example: '<tag1><tag2>' -> ['tag1', 'tag2'].
    A missing or empty `tags_string` gives an empty list.
    """
    if not tags_string:
        return []
    decoded = html.unescape(tags_string)
    # Trim the outer angle brackets, then cut on the '><' between adjacent tags.
    inner = decoded.strip('>').strip('<')
    return inner.split('><')
    
# Spark UDF form of read_tags_raw, declared to return an array of strings.
read_tags = udf(f=read_tags_raw, returnType=ArrayType(elementType=StringType()))

# Input: directory the XML dump files (Badges.xml, Posts.xml, ...) are read from.
link = "german.stackexchange.com"
# Output: parent directory; each table is written to its own sub-directory.
output_directory = "main/outputdata"

In [27]:
# Load Badges.xml as plain text, keep only the lines containing '<row Id', and
# turn each one into a map of attribute name -> raw string value.
data = (
    spark.read.text(link + '/Badges.xml')
    .filter(col('value').like('%<row Id%'))
    .select(udf(parse_line, MapType(StringType(), StringType()))('value').alias('value'))
)

In [29]:
# Badges: project the parsed attribute map (`data`) onto typed columns, then
# write the result as Parquet after repartitioning to 5 partitions.
badge_columns = [
    col('value.Id').cast('integer').alias("Id"),
    col('value.UserId').cast('integer').alias("UserId"),
    col('value.Name'),
    col('value.Date').cast('timestamp').alias("Date"),
    col('value.Class').cast('integer').alias("Class"),
    col('value.TagBased').cast('boolean').alias("TagBased"),
]
data.select(*badge_columns).repartition(5).write.parquet(output_directory + '/Badges')



                                                                                

In [35]:
 #Comments

# Read Comments.xml line by line and parse each '<row Id' line into an
# attribute map.
comment_rows = (
    spark.read.text(link + '/Comments.xml')
    .filter(col('value').like('%<row Id%'))
    .select(udf(parse_line, MapType(StringType(), StringType()))('value').alias('value'))
)
# Typed projection; the comment text is entity-decoded through `unescape`.
comment_columns = [
    col('value.Id').cast('integer').alias("Id"),
    col('value.PostId').cast('integer').alias("PostId"),  # foreign key
    col('value.Score').cast('integer').alias("Score"),
    unescape('value.Text').alias('Text'),
    col('value.CreationDate').cast('timestamp').alias("CreationDate"),
    col('value.UserId').cast('integer').alias("UserId"),
]
comment_rows.select(*comment_columns).write.parquet(output_directory + '/Comments')

22/03/12 14:37:04 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
                                                                                

In [37]:
# Posts
#
# Parse Posts.xml into typed columns and write them as Parquet.
# The regex-based parser returns attribute values with XML entities (such as
# `&amp;`) still encoded, so every free-text attribute goes through `unescape`.
# That now includes LastEditorDisplayName, which was previously written in its
# escaped form. Note that `unescape` maps empty strings to NULL.
# Tags are decoded and split into an array of tag names by `read_tags`.

spark.read.text(link + '/Posts.xml').where(col('value').like('%<row Id%')) \
    .select(udf(parse_line, MapType(StringType(), StringType()))('value').alias('value')) \
    .select(
    col('value.Id').cast('integer').alias("Id"),
    col('value.PostTypeId').cast('integer').alias("PostTypeId"),
    col('value.ParentId').cast('integer').alias("ParentId"),
    col('value.AcceptedAnswerId').cast('integer').alias("AcceptedAnswerId"),
    col('value.CreationDate').cast('timestamp').alias("CreationDate"),
    col('value.Score').cast('integer').alias("Score"),
    col('value.ViewCount').cast('integer').alias("ViewCount"),
    unescape('value.Body').alias('Body'),
    col('value.OwnerUserId').cast('integer').alias("OwnerUserId"),
    col('value.LastEditorUserId').cast('integer').alias("LastEditorUserId"),
    # Free text: decode entities, consistent with Title and Body.
    unescape('value.LastEditorDisplayName').alias("LastEditorDisplayName"),
    col('value.LastEditDate').cast('timestamp').alias("LastEditDate"),
    col('value.LastActivityDate').cast('timestamp').alias("LastActivityDate"),
    col('value.CommunityOwnedDate').cast('timestamp').alias("CommunityOwnedDate"),
    col('value.ClosedDate').cast('timestamp').alias("ClosedDate"),
    unescape('value.Title').alias('Title'),
    read_tags('value.Tags').alias('Tags'),
    col('value.AnswerCount').cast('integer').alias("AnswerCount"),
    col('value.CommentCount').cast('integer').alias("CommentCount"),
    col('value.FavoriteCount').cast('integer').alias("FavoriteCount"),
).write.parquet(output_directory + '/Posts')


22/03/12 16:01:53 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
                                                                                

In [38]:
# PostHistory
#
# Parse PostHistory.xml into typed columns and write them as Parquet.
# Attribute values come out of the regex parser with XML entities still
# encoded, so all free-text attributes are passed through `unescape`. That now
# includes UserDisplayName, which was previously written in its escaped form.
# Note that `unescape` maps empty strings to NULL.

spark.read.text(link + '/PostHistory.xml').where(col('value').like('%<row Id%')) \
    .select(udf(parse_line, MapType(StringType(), StringType()))('value').alias('value')) \
    .select(
        col('value.Id').cast('integer').alias("Id"),
        col('value.PostHistoryTypeId').cast('integer').alias("PostHistoryTypeId"),
        col('value.PostId').cast('integer').alias("PostId"),  # foreign key
        col('value.RevisionGUID'),
        col('value.CreationDate').cast('timestamp').alias("CreationDate"),
        col('value.UserId').cast('integer').alias("UserId"),
        # Free text: decode entities, consistent with Comment and Text.
        unescape('value.UserDisplayName').alias("UserDisplayName"),
        unescape('value.Comment').alias('Comment'),
        unescape('value.Text').alias('Text'),
        col('value.CloseReasonId').cast('integer').alias("CloseReasonId")
    ).write.parquet(output_directory + '/PostHistory')

22/03/12 16:08:12 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
                                                                                

In [39]:
# PostLinks
#
# Every attribute here is a plain typed value, so the projection is driven by
# an ordered (attribute, Spark type) table. The output column order follows it.
post_link_fields = [
    ('Id', 'integer'),
    ('CreationDate', 'timestamp'),
    ('PostId', 'integer'),
    ('RelatedPostId', 'integer'),
    ('LinkTypeId', 'integer'),
]
post_link_rows = (
    spark.read.text(link + '/PostLinks.xml')
    .filter(col('value').like('%<row Id%'))
    .select(udf(parse_line, MapType(StringType(), StringType()))('value').alias('value'))
)
(
    post_link_rows
    .select([col('value.' + name).cast(spark_type).alias(name)
             for name, spark_type in post_link_fields])
    .repartition(2)
    .write.parquet(output_directory + '/PostLinks')
)

In [40]:
# Users
#
# Parse Users.xml into typed columns and write them as Parquet after
# repartitioning to 10 partitions.
#
# Two fixes compared with the previous version of this cell:
#   * EmailHash is kept as a string. A hash digest is not a decimal number, so
#     the former cast to integer could only ever produce NULL.
#   * DisplayName, WebsiteUrl, Location and ProfileImageUrl are passed through
#     `unescape`, like AboutMe. The regex parser returns attribute values with
#     XML entities still encoded (for example `&amp;` inside URL query strings).
#     Note that `unescape` maps empty strings to NULL.

spark.read.text(link + '/Users.xml').where(col('value').like('%<row Id%')) \
    .select(udf(parse_line, MapType(StringType(), StringType()))('value').alias('value')) \
    .select(
    col('value.Id').cast('integer').alias("Id"),
    col('value.Reputation').cast('integer').alias("Reputation"),
    col('value.CreationDate').cast('timestamp').alias("CreationDate"),
    unescape('value.DisplayName').alias('DisplayName'),
    col('value.EmailHash').alias("EmailHash"),
    col('value.LastAccessDate').cast('timestamp').alias("LastAccessDate"),
    unescape('value.WebsiteUrl').alias('WebsiteUrl'),
    unescape('value.Location').alias('Location'),
    col('value.Age').cast('integer').alias("Age"),
    unescape('value.AboutMe').alias('AboutMe'),
    col('value.Views').cast('integer').alias('Views'),
    col('value.UpVotes').cast('integer').alias('UpVotes'),
    col('value.DownVotes').cast('integer').alias('DownVotes'),
    unescape('value.ProfileImageUrl').alias('ProfileImageUrl'),
    col('value.AccountId').cast('integer').alias('AccountId')
).repartition(10).write.parquet(output_directory + '/Users')

22/03/12 21:08:05 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
                                                                                

In [41]:
# Votes
#
# All Votes attributes are plain typed values, so the projection is built from
# an ordered (attribute, Spark type) table. The output column order follows it.
vote_fields = [
    ('Id', 'integer'),
    ('PostId', 'integer'),
    ('VoteTypeId', 'integer'),
    ('CreationDate', 'timestamp'),
    ('UserId', 'integer'),
    ('BountyAmount', 'integer'),
]
vote_rows = (
    spark.read.text(link + '/Votes.xml')
    .filter(col('value').like('%<row Id%'))
    .select(udf(parse_line, MapType(StringType(), StringType()))('value').alias('value'))
)
(
    vote_rows
    .select([col('value.' + name).cast(spark_type).alias(name)
             for name, spark_type in vote_fields])
    .repartition(20)
    .write.parquet(output_directory + '/Votes')
)

                                                                                

In [42]:
# Tags
#
# Parse Tags.xml into an attribute map per row, project typed columns, and
# write the table as Parquet from a single partition.
tag_rows = (
    spark.read.text(link + '/Tags.xml')
    .filter(col('value').like('%<row Id%'))
    .select(udf(parse_line, MapType(StringType(), StringType()))('value').alias('value'))
)
tag_columns = [
    col('value.Id').cast('integer').alias("Id"),
    col('value.TagName').alias("TagName"),
    col('value.Count').cast('integer').alias("Count"),
    col('value.ExcerptPostId').cast('integer').alias("ExcerptPostId"),
    col('value.WikiPostId').cast('integer').alias("WikiPostId"),
]
tag_rows.select(*tag_columns).repartition(1).write.parquet(output_directory + '/Tags')