# Big Data - Project

## Initial data pre-processing

### Dragan Postolovski
### Wojciech Taisner

This part of the report is dedicated to the data files themselves. The aim of the code provided in this section is to reduce the data volume, keep only the significant data and shape it into a proper structure. After pre-processing, each file is exported in CSV format so that it can easily be loaded into a dataframe.

Main objectives are to:
* get rid of unnecessary and insignificant data
* get rid of blocks of text (replace them with text-related values, e.g. word count)
* reduce the XML overhead
* define the required structure of the data and export it

The two most important technologies are:
* Dask bag and dataframe - to process the data in memory
* Beautiful Soup - XML parsing

NOTE: the dataset is compressed with tar.xz instead of 7z.

NOTE: the commented-out blocks of code were used to look for differences in structure between the rows of a single file. This code usually takes a long time to execute and is not required to build the output files, since most of the values it produced were hardcoded.

These computations are very time-consuming because Dask cannot split compressed files into blocks, so each file is read as a single partition and processed by a single thread. Parsing XML is also expensive in terms of computing power.
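To illustrate why a compressed file ends up being read sequentially, here is a minimal standard-library sketch (not part of the original pipeline) that streams an xz-compressed XML dump line by line and keeps only the row lines. It assumes a plain xz-compressed XML stream without the tar wrapper used for the real files; the helper `iter_xml_rows` and the sample rows are hypothetical.

```python
import io
import lzma


def iter_xml_rows(fileobj):
    """Stream an xz-compressed XML dump and yield only the <row .../> lines.

    lzma (like Dask's compressed-file reader) decompresses the stream
    sequentially from the beginning, so one reader handles the whole file.
    Assumption: plain .xz data, no tar archive around it.
    """
    with lzma.open(fileobj, mode="rt", encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if line.startswith("<row"):
                yield line


# Hypothetical miniature dump, compressed in memory for the example
sample = """<?xml version="1.0" encoding="utf-8"?>
<tags>
  <row Id="1" TagName="python" Count="10" />
  <row Id="2" TagName="dask" Count="3" />
</tags>
"""
compressed = io.BytesIO(lzma.compress(sample.encode("utf-8")))
rows = list(iter_xml_rows(compressed))
print(rows)
```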

In [1]:
!ls -la *.tar.xz

-rw-rw-r--. 1 wojtek wojtek   346423376 01-18 06:06 AnswersFrame-0.csv.tar.xz
-rw-rw-r--. 1 wojtek wojtek    15774436 01-17 11:17 BadgesAggregatedFrame-0.csv.tar.xz
-rw-rw-r--. 1 wojtek wojtek   127357784 01-16 12:22 BadgesFrame-0.csv.tar.xz
-rwxrwxrwx. 1 wojtek wojtek   204796672 01-09 21:50 Badges.tar.xz
-rw-rw-r--. 1 wojtek wojtek     2075408 01-15 21:50 BountyFrame-0.csv.tar.xz
-rwxrwxrwx. 1 wojtek wojtek  4093557860 01-10 13:47 Comments.tar.xz
-rw-rw-r--. 1 wojtek wojtek     8444828 01-15 19:56 DuplicateFrame-0.csv.tar.xz
-rwxrwxrwx. 1 wojtek wojtek    64935436 01-09 20:51 PostLinks.tar.xz
-rw-rw-r--. 1 wojtek wojtek   486344308 01-28 03:54 PostsFrame-0.csv.tar.xz
-rwxrwxrwx. 1 wojtek wojtek 12303248204 01-10 21:26 Posts.tar.xz
-rw-rw-r--. 1 wojtek wojtek      430164 01-28 12:29 TagsFrame-0.csv.tar.xz
-rwxrwxrwx. 1 wojtek wojtek      735732 01-09 21:32 Tags.tar.xz
-rw-rw-r--. 1 wojtek wojtek   179974748 01-15 21:25 UsersFrame-0.csv.tar.xz
-rwxrwxrwx. 1 wojtek wojtek  

In [2]:
# import dask bag
import dask.bag as db
# import dask dataframe
import dask.dataframe as dd
# import BeautifulSoup (its "xml" parser requires lxml to be installed)
from bs4 import BeautifulSoup
# import pandas, numpy and regular expressions
import pandas as pd
import numpy as np
import re

In [3]:
#files
base = "stackoverflow.com-"

badges = "Badges"
comments = "Comments"
posthistory = "PostHistory"
postlinks = "PostLinks"
posts = "Posts"
tags = "Tags"
users = "Users"
votes = "Votes"

#all_files = [badges, comments, postlinks, posts, tags, users, votes]
# posthistory is left out: it could not be processed (see the Posts History section)
#all_files

['Badges', 'Comments', 'PostLinks', 'Posts', 'Tags', 'Users', 'Votes']

# Functions
Helper functions required in the process of transforming the data.
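BeautifulSoup is convenient, but building a soup for each of millions of one-line rows is costly. As an alternative (our substitution, not what the notebook uses), the same per-row extraction can be sketched with the standard library's `xml.etree.ElementTree`, assuming each line is one self-contained `<row .../>` element. The helper `row_to_tuple` is hypothetical; like `xml_row_to_tuple_attrs`, it writes 0 for missing attributes, and it also converts negative integers such as `Score="-2"`.

```python
import xml.etree.ElementTree as ET


def row_to_tuple(row, attrs):
    """Return the values of the requested attributes of a single <row .../> line.

    Integer-looking values (including negative ones) become int;
    missing attributes become 0. XML entities such as &lt; are decoded.
    """
    element = ET.fromstring(row)
    values = []
    for name in attrs:
        v = element.get(name)
        if v is None:
            values.append(0)
        elif v.lstrip("-").isdigit():
            values.append(int(v))
        else:
            values.append(v)
    return tuple(values)


row = '<row Id="42" PostTypeId="2" ParentId="7" Score="-2" Body="&lt;p&gt;Hi&lt;/p&gt;" />'
print(row_to_tuple(row, ("Id", "Score", "OwnerUserId")))
print(row_to_tuple(row, ("Body",)))
```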

In [4]:
# simple parsing for files without free text (e.g. badges, tags)
# gets the data from a single XML row (attribute values)
# input: single line encoded as XML
# output: tuple of attribute values
def xml_row_to_tuple(row):
    soup = BeautifulSoup(row, "xml")
    ret = []
    for a, v in soup.row.attrs.items():
        if v.isdigit():
            v = int(v)  # integers take less memory than strings
        ret.append(v)
    return tuple(ret)

# gets the headers of a single XML row (attribute names)
# input: single line encoded as XML
# output: tuple of attribute names
def xml_headers_to_tuple(row):
    soup = BeautifulSoup(row, "xml")
    return tuple(soup.row.attrs.keys())

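# reads data only from the provided attributes
# input: single line encoded as XML and d, a tuple of required headers
# output: tuple of the values of the required headers
# text attributes (Body, Title) are replaced with derived counts
def xml_row_to_tuple_attrs(row, d=("Id",)):  # ("Id",) is a one-element tuple; tuple("Id") would be ('I', 'd')
    soup = BeautifulSoup(row, "xml")
    ret = []
    for e in d:
        v = soup.row.attrs.get(e)  # None when the attribute is missing

        if e == "Body":  ## special parser for posts and answers
            # (word count, link count, code block count); zeros if there is no body
            ret.extend(filter_body_noise(v) if v is not None else (0, 0, 0))
        elif e == "Title":  ## special parser for posts and answers
            ret.append(len(v.split()) if v is not None else 0)
        elif v is not None:
            if v.lstrip("-").isdigit():  # also converts negative numbers, e.g. Score="-2"
                v = int(v)
            ret.append(v)
        else:
            ret.append(0)
    return tuple(ret)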

## checks whether a candidate header (element) is valid:
## False if any of its fields is missing from any header in setof
def is_header_valid(setof, element):
    return all(e in k for k in setof for e in element)

# looks for the first header whose elements occur in all the other headers
def find_valid_header(headers):
    # sort the headers by length, ascending
    headers.sort(key=len)

    minlen = len(headers[0])

    # only headers of minimal length are candidates, longer ones cannot be valid anyway
    for k in [h for h in headers if len(h) == minlen]:
        if is_header_valid(headers, k):
            return k
    return tuple()

# returns the shortest valid header (its elements occur in all the other headers)
def shortest_valid_header(bag):
    # select distinct headers
    headers = bag.map(xml_headers_to_tuple).distinct().compute()
    return find_valid_header(headers)
    

# prints a bag summary: header + first row
# only for bags with a proper structure
def summary_bag(bag):
    th = bag.map(xml_headers_to_tuple).take(1)[0]
    for i, v in enumerate(th):
        print('{}: {}'.format(i, v))
    print("")
    tp = bag.map(xml_row_to_tuple).take(1)[0]
    for i, v in enumerate(tp):
        print('{}: {}'.format(i, v))
    print("")
    
# reads an xz-compressed XML file into a bag, keeping only the lines that contain rows
def make_xml_bag(filename):
    extension=".tar.xz"
    compression="xz"
    temp_bag = db.read_text(filename+extension, compression=compression).str.strip()
    temp_bag = temp_bag.filter(lambda x: x.find("row") >= 0)
    return temp_bag

# checks if a bag is well structured (the same number of attributes in each row)
# takes a while, of course
def check_bag(bag):
    # count attributes per row (len of the raw line would count characters instead)
    n_attrs = bag.map(xml_headers_to_tuple).map(len)
    ma = n_attrs.max().compute()
    mi = n_attrs.min().compute()
    if ma == mi:
        return True
    print(ma, mi)
    return False

# generates metadata: an iterable of (name, dtype) tuples
def meta_from_header(header):
    ret = []
    for k in header:
        if "Date" in k:
            ret.append((k, np.dtype('datetime64[ns]')))
        elif "Id" in k or "Score" in k or "Count" in k:
            ret.append((k, np.int_))
        else:
            ret.append((k, object))  # pandas stores strings with the object dtype
    return ret

# extracts text metrics from a post body (HTML); the text itself is dropped
def filter_body_noise(body):

    # count code blocks
    nr_code_blocks = len(re.findall(r"</code>", body))
    # count links in the body
    nr_links = len(re.findall(r"</a>", body))

    # replace newlines with spaces: "." in a regex does not match "\n" unless
    # re.DOTALL is used, so multi-line code blocks would otherwise not be removed
    body = body.replace('\n', ' ')
    body = body.replace("'", '')

    # remove code blocks from the body
    body = re.sub(r"<code>(.*?)</code>", "", body)

    # remove all other tags from the body
    body = re.sub(r"<[^>]*>", "", body)

    # remove hardcoded links
    body = re.sub(r'\w+:\/{2}[\d\w-]+(\.[\d\w-]+)*(?:(?:\/[^\s/]*))*', '', body)

    # collapse runs of whitespace left over by the substitutions above
    # (e.g. removing a code block leaves the spaces on both sides of it)
    body = re.sub(r"\s\s+", ' ', body)

    # count the remaining words (split() also ignores leading/trailing whitespace)
    nr_words = len(body.split())

    # returns a tuple in the format (number_of_words, number_of_links, number_of_code_blocks)
    return (nr_words, nr_links, nr_code_blocks)
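The newline replacement in `filter_body_noise` works around the fact that `.` does not match `\n` by default. A minimal sketch of the same metrics using the `re.DOTALL` flag instead is shown below. The name `body_metrics` is hypothetical, and the URL pattern is a simplified stand-in for the one used above, so counts may differ slightly on unusual links.

```python
import re

CODE_BLOCK = re.compile(r"<code>.*?</code>", flags=re.DOTALL)  # DOTALL: ".*?" may span newlines
TAG = re.compile(r"<[^>]*>")
URL = re.compile(r"\w+://\S+")  # simplified link pattern (assumption)


def body_metrics(body):
    """Return (number_of_words, number_of_links, number_of_code_blocks) for an HTML body."""
    nr_code_blocks = body.count("</code>")
    nr_links = body.count("</a>")
    text = CODE_BLOCK.sub(" ", body)  # drop code blocks, including multi-line ones
    text = TAG.sub(" ", text)         # drop the remaining HTML tags
    text = URL.sub(" ", text)         # drop bare links left in the text
    nr_words = len(text.split())      # split() ignores runs of whitespace
    return (nr_words, nr_links, nr_code_blocks)


body = ('<p>Use <code>len(x)</code> to count items, see '
        '<a href="https://example.com">the docs</a></p>\n'
        '<pre><code>for i in x:\n    print(i)\n</code></pre>')
print(body_metrics(body))
```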

# Transforming input data

In this part of the project, the aim is to clean the data, remove the XML overhead, extract some of the required features and, in general, reduce the size of the files as much as possible. The output is tabular data with a fixed schema, stored in CSV files with headers.

All data are first filtered so that only row elements are parsed. Afterwards, the selected attribute values are taken from each row, converted to a Dask dataframe and finally written to a CSV file.
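The filter-then-project step described above can be sketched in plain Python, without Dask or BeautifulSoup, for a handful of lines. The helper `rows_to_csv` and the sample tag rows are hypothetical; missing attributes are written as 0, mirroring `xml_row_to_tuple_attrs`.

```python
import csv
import io
import xml.etree.ElementTree as ET


def rows_to_csv(lines, header):
    """Keep only <row .../> lines, project them onto `header` and return CSV text."""
    out = io.StringIO()
    writer = csv.writer(out, lineterminator="\n")
    writer.writerow(header)
    for line in lines:
        line = line.strip()
        if not line.startswith("<row"):
            continue  # skip the XML declaration and the root element tags
        attrs = ET.fromstring(line).attrib
        writer.writerow([attrs.get(name, 0) for name in header])
    return out.getvalue()


lines = [
    '<?xml version="1.0" encoding="utf-8"?>',
    "<tags>",
    '  <row Id="1" TagName=".net" Count="300" ExcerptPostId="3" />',
    '  <row Id="2" TagName="html" Count="900" />',
    "</tags>",
]
print(rows_to_csv(lines, ("Id", "TagName", "Count")))
```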

## Tags

In [9]:
# read tags bag, select rows only
tags_bag = db.read_text(tags+".tar.xz", compression="xz").str.strip()
tags_bag = tags_bag.filter(lambda x: x.find("row") >= 0)

In [10]:
# no need to run this
#dt = tags_bag.map(xml_headers_to_tuple).distinct().compute()
#print(dt)

In [11]:
dt = [('Id', 'TagName', 'Count', 'ExcerptPostId', 'WikiPostId'), ('Id', 'TagName', 'Count')]
if len(dt) > 1:
    header = find_valid_header(dt)
    print(header)

('Id', 'TagName', 'Count')
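The header selection done by `find_valid_header` can also be expressed as a set intersection. The sketch below (hypothetical name `common_header`) returns the columns of the shortest header that appear in every header; on the headers used in this notebook it gives the same result as `find_valid_header`, and when no header is fully common it returns the common subset instead of an empty tuple.

```python
def common_header(headers):
    """Return the columns of the shortest header that are present in every header."""
    common = set.intersection(*(set(h) for h in headers))
    shortest = min(headers, key=len)
    return tuple(column for column in shortest if column in common)


dt = [('Id', 'TagName', 'Count', 'ExcerptPostId', 'WikiPostId'), ('Id', 'TagName', 'Count')]
print(common_header(dt))
```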


In [12]:
tags_bag2 = tags_bag.map(xml_row_to_tuple_attrs, d=header)
tags_df = tags_bag2.to_dataframe(meta=meta_from_header(header))
tags_df.to_csv("TagsFrame-*.csv.tar.xz",compression="xz",index=False)

['TagsFrame-0.csv.tar.xz']

## Badges

Badges are aggregated and counted for each user.
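The `foldby` call in the aggregation cell is a count keyed on `UserId`. On a plain list of `(Id, UserId, Class)` tuples, the same aggregation is a `collections.Counter`, as in this minimal sketch with hypothetical data; in Dask, `Bag.frequencies()` on a bag of user ids would be a shorter alternative to the custom `binop`/`combine` pair.

```python
from collections import Counter


def badges_per_user(badge_rows):
    """Count badges per user from (Id, UserId, Class) tuples."""
    return Counter(user_id for _id, user_id, _class in badge_rows)


rows = [(1, 10, 3), (2, 10, 2), (3, 11, 3), (4, 10, 1)]  # hypothetical badges
print(sorted(badges_per_user(rows).items()))
```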

In [9]:
badges_bag = make_xml_bag(badges)

In [14]:
## no need to run this, the header is provided in the next cell
#dba = badges_bag.map(xml_headers_to_tuple).distinct().compute()
#print(dba)

In [10]:
## only one distinct header, do not need to compute this
dba = [('Id', 'UserId', 'Name', 'Date', 'Class', 'TagBased')]
if len(dba) > 1:
    b_header = find_valid_header(dba)
    print(b_header)

In [11]:
hhh = ('Id', 'UserId', 'Class')  # required by the aggregation in the next cell
#temp_b = badges_bag.map(xml_row_to_tuple_attrs, d=hhh).to_dataframe(meta=meta_from_header(hhh))
#temp_b.to_csv("BadgesFrame-*.csv.tar.xz",compression="xz",index=False)

In [12]:
#generate aggregated badges by user id
def binop(total,x): #count
    return (total[0]+1,)

def combine(t1, t2): #combine count
    return(t1[0]+t2[0],)

hhh2 = ("UserId", "BadgesCount")
temp_b2 = badges_bag.map(xml_row_to_tuple_attrs, d=hhh).foldby(lambda x: x[1], binop, (0,), combine, (0,))
temp_b2 = temp_b2.map(lambda x: (x[0], x[1][0])).to_dataframe(meta=meta_from_header(hhh2))
#temp_b2.to_csv("BadgesAggregatedFrame-*.csv.tar.xz",compression="xz",index=False)
temp_b2.to_csv("BadgesAggregatedFrame-*.csv",index=False)

['BadgesAggregatedFrame-0.csv']

## Comments

This file was not used later

In [None]:
comments_bag = make_xml_bag(comments)

In [None]:
# no need to rerun this
#dcb = comments_bag.map(xml_headers_to_tuple).distinct().compute()
#print(dcb)

In [None]:
## only one distinct header, do not need to compute this
#if len(dcb) > 1:
#    c_header = find_valid_header(dcb)
#    print(c_header)
    
c_header = ('Id', 'PostId', 'Score', 'Text', 'CreationDate')

## Post Links

This data was used to identify posts marked as duplicates.
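In the Stack Exchange data dump, `LinkTypeId` 1 marks an ordinary link and 3 marks a duplicate, which is what the substring filter in the cells below relies on. A minimal sketch that checks the parsed attribute and keeps only the `(PostId, RelatedPostId)` pairs; the helper `duplicate_pairs` and the sample rows are hypothetical.

```python
import xml.etree.ElementTree as ET

DUPLICATE_LINK = "3"  # LinkTypeId of duplicate links in the Stack Exchange dump


def duplicate_pairs(lines):
    """Yield (PostId, RelatedPostId) for every PostLinks row marked as a duplicate."""
    for line in lines:
        line = line.strip()
        if not line.startswith("<row"):
            continue
        attrs = ET.fromstring(line).attrib
        if attrs.get("LinkTypeId") == DUPLICATE_LINK:
            yield int(attrs["PostId"]), int(attrs["RelatedPostId"])


lines = [
    '<row Id="19" CreationDate="2010-04-26T02:59:48.130" PostId="109" RelatedPostId="32412" LinkTypeId="1" />',
    '<row Id="37" CreationDate="2010-04-26T02:59:48.600" PostId="1970" RelatedPostId="617600" LinkTypeId="3" />',
]
print(list(duplicate_pairs(lines)))
```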

In [None]:
postlinks_bag = make_xml_bag(postlinks)

In [None]:
#no need to rerun this
#dplb = postlinks_bag.map(xml_headers_to_tuple).distinct().compute()
#print(dplb)
dplb = [('Id', 'CreationDate', 'PostId', 'RelatedPostId', 'LinkTypeId')]

In [None]:
## only one distinct header, do not need to compute this
if len(dplb) > 1:
    pl_header = find_valid_header(dplb)
    print(pl_header)

In [None]:
##('Id', 'CreationDate', 'PostId', 'RelatedPostId', 'LinkTypeId')
temp_pl = postlinks_bag.filter(lambda x: x.find("LinkTypeId=\"3\"") >= 0).map(xml_row_to_tuple_attrs, d=dplb[0])
temp_pl = temp_pl.to_dataframe(meta=meta_from_header(dplb[0]))
temp_pl.to_csv("DuplicateFrame-*.csv.tar.xz",compression="xz",index=False)

## Users

This file contains important information about users and their activity on Stack Overflow.

In [None]:
users_bag = make_xml_bag(users)

In [None]:
## do not run this, the header is hardcoded below
#dub = users_bag.map(xml_headers_to_tuple).distinct().compute()
#print(dub)

In [None]:
## only one distinct header, do not need to compute this
#if len(dub) > 1:
#    u_header = find_valid_header(dub)
#    print(u_header)

In [None]:
u_header = ('Id', 'Reputation', 'CreationDate', 'DisplayName', 'LastAccessDate', 'Views', 'UpVotes', 'DownVotes')
temp_u = users_bag.map(xml_row_to_tuple_attrs, d=u_header)
temp_u.to_dataframe(meta=meta_from_header(u_header)).to_csv("UsersFrame-*.csv.tar.xz",compression="xz",index=False)

## Votes

We use this section to determine which posts offered a bounty for an accepted answer.

NOTE: filtering significantly reduced the time required to parse this file (parsing the entire file takes more than 8 hours).
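The saving comes from running a cheap substring test before the expensive XML parse, so most rows are never parsed. A minimal sketch of that idea, counting how many parses actually happen; `VoteTypeId` 8 and 9 are bounty start and bounty close in the Stack Exchange dump. The helper `bounty_votes` and the sample rows are hypothetical, and reading `BountyAmount` here is for illustration only.

```python
import xml.etree.ElementTree as ET

BOUNTY_MARKERS = ('VoteTypeId="8"', 'VoteTypeId="9"')  # bounty start, bounty close


def bounty_votes(lines):
    """Parse only lines that pass a cheap substring test; return (rows, parse_calls)."""
    parsed, parse_calls = [], 0
    for line in lines:
        if not any(marker in line for marker in BOUNTY_MARKERS):
            continue  # most votes are skipped without being parsed
        parse_calls += 1
        attrs = ET.fromstring(line.strip()).attrib
        parsed.append((int(attrs["PostId"]), int(attrs["VoteTypeId"]),
                       int(attrs.get("BountyAmount", 0))))
    return parsed, parse_calls


lines = [
    '<row Id="1" PostId="1" VoteTypeId="2" CreationDate="2008-07-31T00:00:00.000" />',
    '<row Id="2" PostId="3" VoteTypeId="8" CreationDate="2008-08-01T00:00:00.000" BountyAmount="50" />',
    '<row Id="3" PostId="3" VoteTypeId="9" CreationDate="2008-08-08T00:00:00.000" BountyAmount="50" />',
    '<row Id="4" PostId="4" VoteTypeId="2" CreationDate="2008-08-02T00:00:00.000" />',
]
rows, calls = bounty_votes(lines)
print(rows, calls)
```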

In [None]:
votes_bag = make_xml_bag(votes)

In [None]:
### dont run this
### never ever
#CPU times: user 15 s, sys: 5.59 s, total: 20.6 s
#Wall time: 8h 16min 4sys
## result copied cell below

#dv = votes_bag.map(xml_headers_to_tuple).distinct().compute()
#print(dv)

In [None]:
dv = [('Id', 'PostId', 'VoteTypeId', 'UserId', 'CreationDate'), ('Id', 'PostId', 'VoteTypeId', 'CreationDate', 'BountyAmount'), ('Id', 'PostId', 'VoteTypeId', 'UserId', 'CreationDate', 'BountyAmount'), ('Id', 'PostId', 'VoteTypeId', 'CreationDate')]
## several distinct headers: select the shortest one whose columns occur in all of them
if len(dv) > 1:
    v_header = find_valid_header(dv)
    print(v_header)

In [None]:
# only bounty votes: VoteTypeId 8 (bounty start) and 9 (bounty close)
temp_v = votes_bag.filter(lambda x: x.find("VoteTypeId=\"8\"") >= 0 or x.find("VoteTypeId=\"9\"") >=0)
temp_v = temp_v.map(xml_row_to_tuple_attrs, d=v_header)
#temp_v.to_dataframe(meta=meta_from_header(v_header)).to_csv("BountyFrame-*.csv.tar.xz",compression="xz",index=False)
#export without compression
temp_v.to_dataframe(meta=meta_from_header(v_header)).to_csv("BountyFrame-*.csv",index=False)

## Posts History
The data appear to be too large and too complex to process, and the required effort did not seem worth the expected benefit.

## Posts and Answers 

The single Posts file, which contains both questions (PostTypeId 1) and answers (PostTypeId 2), was split into two separate files.
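Two independent filters scan the data twice and silently drop rows of any other post type, which is what the commented investigation cell below looks for. A pure-Python sketch of a single pass that groups rows by `PostTypeId`, so leftover types stay visible; the helper `split_by_post_type` and the sample rows are hypothetical, and this is not how the Dask cells below are written.

```python
import re
from collections import defaultdict

POST_TYPE = re.compile(r'PostTypeId="(\d+)"')


def split_by_post_type(lines):
    """Group <row .../> lines by their PostTypeId value (1 = question, 2 = answer)."""
    groups = defaultdict(list)
    for line in lines:
        match = POST_TYPE.search(line)
        groups[match.group(1) if match else None].append(line)
    return groups


lines = [
    '<row Id="1" PostTypeId="1" Title="Q" />',
    '<row Id="2" PostTypeId="2" ParentId="1" />',
    '<row Id="3" PostTypeId="4" />',
]
groups = split_by_post_type(lines)
print({k: len(v) for k, v in groups.items()})
```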

In [None]:
# read posts file
both_bag = db.read_text(posts+".tar.xz", compression="xz").str.strip()
both_bag = both_bag.filter(lambda x: x.find('row') >=0 )
# create answers and posts bag
answers_bag = both_bag.filter(lambda x: x.find("PostTypeId=\"2\"") >= 0)
posts_bag = both_bag.filter(lambda x: x.find("PostTypeId=\"1\"") >= 0)

In [None]:
# investigating rows missing from the posts and answers bags
#totc = both_bag.count().compute()
#totc # total count
#ansc = answers_bag.count().compute()
#ansc # answers count
#posc = posts_bag.count().compute()
#posc # posts count
#missing = totc - ansc - posc # how many are missing
#missing
# second attempt to count the missing rows (works)
#both_bag.filter(lambda x: x.find("PostTypeId=\"1\"") < 0 and x.find("PostTypeId=\"2\"") < 0).count().compute()
# check what is wrong with the data -> posts of other types (PostTypeId neither 1 nor 2)
#both_bag.filter(lambda x: x.find("PostTypeId=\"1\"") < 0 and x.find("PostTypeId=\"2\"") < 0).take(1)

In [None]:
## check data structure across answers
## don't run it, takes a while (result copied below)
#dab = answers_bag.map(xml_headers_to_tuple).distinct().compute()
#print(dab)
#if len(dab) > 1:
#    ans_header = find_valid_header(dab)
#    print(ans_header)
#('Id', 'PostTypeId', 'ParentId', 'CreationDate', 'Score', 'Body', 'LastActivityDate', 'CommentCount')

In [None]:
## store answers to csv
# required header
ans_header = ('Id', 'PostTypeId', 'ParentId', 'CreationDate', 'Score', 'Body', 'LastActivityDate', 'CommentCount')
# generating dataframe header
temp_ans_meta = meta_from_header(('Id', 'PostTypeId', 'ParentId', 'CreationDate', 'Score', 
                                  'WordCount', 'LinksCount', 'BlocksCount', 'LastActivityDate', 'CommentCount'))
# map each row to the values of these headers
temp_ans = answers_bag.map(xml_row_to_tuple_attrs, d=ans_header)
# create dataframe, store as csv
temp_ans.to_dataframe(meta=temp_ans_meta).to_csv("AnswersFrame-*.csv.tar.xz",compression="xz",index=False)

In [None]:
## check data structure across posts
## don't run it, takes a while (result copied below)
#dpb = posts_bag.map(xml_headers_to_tuple).distinct().compute()
#print(dpb)
#if len(dpb) > 1:
#    pos_header = find_valid_header(dpb)
#    print(pos_header)
#('Id', 'PostTypeId', 'CreationDate', 'Score', 'ViewCount', 'Body', 'LastActivityDate', 'Title', 'Tags', 'AnswerCount', 'CommentCount')

In [None]:
## store posts to csv
# required posts header
pos_head = ('Id', 'PostTypeId', 'CreationDate', 'Score', 'ViewCount', 'Body', 'LastActivityDate', 
            'Title', 'Tags', 'AnswerCount', 'CommentCount', 'OwnerUserId', 'AcceptedAnswerId')
# generating dataframe header
temp_pos_meta = meta_from_header(('Id', 'PostTypeId', 'CreationDate', 'Score', 'ViewCount', 
                                  'WordCount', 'LinksCount', 'BlocksCount', 'LastActivityDate', 
                                  'TitleWordsCount', 'Tags', 'AnswerCount', 
                                  'CommentCount', 'OwnerUserId', 'AcceptedAnswerId'))

# map each row to the values of these headers
temp_pos = posts_bag.map(xml_row_to_tuple_attrs, d=pos_head)
# create dataframe, store as csv
temp_pos.to_dataframe(meta=temp_pos_meta).to_csv("PostsFrame-*.csv.tar.xz",compression="xz",index=False)