In [1]:
%matplotlib inline
import matplotlib
import seaborn as sns
matplotlib.rcParams['savefig.dpi'] = 144
import os

In [2]:
from pyspark import SparkContext
sc = SparkContext("local[*]", "temp")
print sc.version

2.0.1


In [3]:
import xml.etree.ElementTree as ET
from lxml import etree
import time
from pyspark.ml.feature import Word2Vec

# Spark for StackOverFlow analysis

StackOverflow is a collaboratively edited question-and-answer site originally focused on programming topics. Because of the variety of features tracked, including a variety of feedback metrics, it allows for some open-ended analysis of user behavior on the site.

StackExchange (the parent organization) provides an anonymized [data dump](https://archive.org/details/stackexchange), and we'll use Spark to perform data manipulation, analysis, and machine learning on this dataset. 

## Accessing the data

The data is available on S3 (s3://dataincubator-course/spark-stack-data). There are three subfolders: allUsers, allPosts, and allVotes which contain chunked and gzipped xml with the following format:

```
<row Body="&lt;p&gt;I always validate my web pages, and I recommend you do the same BUT many large company websites DO NOT and cannot validate because the importance of the website looking exactly the same on all systems requires rules to be broken. &lt;/p&gt;&#10;&#10;&lt;p&gt;In general, valid websites help your page look good even on odd configurations (like cell phones) so you should always at least try to make it validate.&lt;/p&gt;&#10;" CommentCount="0" CreationDate="2008-10-12T20:26:29.397" Id="195995" LastActivityDate="2008-10-12T20:26:29.397" OwnerDisplayName="Eric Wendelin" OwnerUserId="25066" ParentId="195973" PostTypeId="2" Score="0" />
```

A full schema can be found [here](https://ia801500.us.archive.org/8/items/stackexchange/readme.txt).

Data from the much smaller stats.stackexchange.com is available in the same format on S3 (s3://dataincubator-course/spark-stats-data). This site, Cross-Validated, will be used below in some instances to avoid working with the full dataset for every analysis.

In [4]:
# !mkdir -p spark-stats-data
# !aws s3 sync --exclude '*' --include 'all*' s3://dataincubator-course/spark-stats-data/ ./spark-stats-data

In [5]:
# !mkdir -p spark-stack-data
# !aws s3 sync --exclude '*' --include 'all*' s3://dataincubator-course/spark-stack-data/ ./spark-stack-data

## Data input and parsing

Some rows are split across multiple lines; these can be discarded. Malformatted XML can also be ignored. It is enough to simply skip problematic rows, the loss of data will not significantly impact our results on this large data sets.

## bad_xml

To test our parser function, we can first confirm that the number of bad XMLs (XML rows that started with ` <row` that were subsequently **rejected** during your processing) is 781.

In [6]:
def localpath(path):
    return  'file://' + str(os.path.abspath(os.path.curdir)) + '/' + path

posts = sc.textFile(localpath('spark-stats-data/allPosts/part-*.xml.gz'))
totalLines = posts.count()

def isInvalid(string):
    try:
        etree.fromstring(string)
        return 0
    except:
        if '<row' in string:
            return 1
        else:
            return 0
            
count = posts.map(isInvalid) \
    .reduce(lambda x, y: x+ y)
    
print 'number of bad xmls=', count   

number of bad xmls= 781


## upvote_percentage

Each post on StackExchange can be upvoted, downvoted, and favorited. One "sanity check" we can do is to look at the ratio of upvotes to downvotes (referred to as "UpMod" and "DownMod" in the schema) as a function of how many times the post has been favorited.

We can hypothesize, for example, that posts with more favorites should have a higher upvote/downvote ratio.

Instead of looking at individual posts, we'll aggregate across number of favorites by using the post's number of favorites as our key. Since we're computing ratios, bundling together all posts with the same number of favorites effectively averages over them.  Following is the average percentage of upvotes *(upvotes / (upvotes + downvotes))* for the first 50 ***keys***.

In [7]:
posts = sc.textFile(localpath('spark-stats-data/allVotes/part-*.xml.gz'))
totalLines = posts.count()

def getXmlInfo(string):
    try:
        row = etree.fromstring(string)
        pid = row.attrib['PostId']
        vid = row.attrib['VoteTypeId']
        return (pid, vid)
    except:
        return None
        
def favupdown(vote):
    postid, vid = vote
    n_fav = 0
    n_up = 0
    n_down = 0
    for v in vid : 
        if v == '5':
            n_fav += 1
        if v == '2':
            n_up += 1
        if v == '3' :
            n_down += 1
    return (n_fav, (n_up, n_down))

def percentage(vote) :
    per = float(vote[1][0])/(vote[1][1] + vote[1][0])
    return (vote[0], per)
        
count = posts.map(getXmlInfo) \
        .filter(lambda x : x)\
        .groupByKey() \
        .map(favupdown) \
        .reduceByKey(lambda x, y :(x[0] + y[0], x[1] + y[1])) \
        .sortByKey() \
        .map(percentage) \
        .collect() 
q1 = count[:50]

In [8]:
print q1

[(0, 0.9515184306202837), (1, 0.971349277609991), (2, 0.9858878575201871), (3, 0.9899873257287706), (4, 0.990321980271729), (5, 0.9925945517058979), (6, 0.9948542024013722), (7, 0.9908026755852842), (8, 0.9944289693593314), (9, 0.9967931587386424), (10, 0.9916376306620209), (11, 0.9915174363807728), (12, 0.9958123953098827), (13, 0.9972789115646259), (14, 0.9939540507859734), (15, 0.9929245283018868), (16, 1.0), (17, 1.0), (18, 0.9985693848354793), (19, 0.997867803837953), (20, 0.9969512195121951), (21, 0.9944029850746269), (22, 0.9977973568281938), (23, 0.9952038369304557), (24, 1.0), (25, 1.0), (26, 0.9841772151898734), (27, 0.989010989010989), (28, 0.9951690821256038), (29, 0.9972826086956522), (30, 0.9954337899543378), (31, 0.9939577039274925), (32, 1.0), (33, 1.0), (34, 1.0), (35, 1.0), (36, 1.0), (37, 0.990990990990991), (38, 0.9937888198757764), (39, 0.9918032786885246), (40, 1.0), (41, 1.0), (42, 1.0), (44, 1.0), (45, 1.0), (47, 1.0), (48, 1.0), (49, 1.0), (50, 1.0), (52, 1.0)]

## answer_percentage

In this part we try to investigate the correlation between a user's reputation and the kind of posts they make.

For the 99 users with the highest reputation, we can look at the percentage of these posts that are answers: *(answers / (answers + questions))*. 

In [9]:
class user_class(object):
    def __init__(self, Id, Rep):
        self.Id = Id
        self.Rep = Rep

class post_class(object):
    def __init__(self, Id, qa):
        self.Id = Id
        self.qa = qa
        

def construct_user(string):
    try:
        row = etree.fromstring(string)
        user_id = row.attrib['Id']
        user_rep = row.attrib['Reputation']
        return user_class(user_id, user_rep)
    except:
        return None
    
def construct_post(string):
    try:
        row = etree.fromstring(string)
        user_id = row.attrib['OwnerUserId']
        qa = row.attrib['PostTypeId']
        return post_class(user_id, qa)
    except:
        return None
    
def numqa(tup):
    user_id, list1  = tup
    n_q = 0
    n_a = 0
    rep = 0
    for list11 in list1 : 
        if list11[1]  == '1':
            n_q += 1
            rep = list11[0]
        elif list11[1] == '2' :
            n_a += 1
            rep = list11[0]
    return (user_id, rep, (n_a, n_q))


users = sc.textFile(localpath('spark-stats-data/allUsers/part-*.xml.gz')).map(construct_user)
posts = sc.textFile(localpath('spark-stats-data/allPosts/part-*.xml.gz')).map(construct_post) 

q3 = users.filter(lambda x: x is not None).map(lambda u : (u.Id, u.Rep)) \
.join(posts.filter(lambda x: x is not None).map(lambda p : (p.Id, p.qa))) \
.groupByKey() \
.map(numqa) \
.filter(lambda x: x[2][1] != 0) \
.map(lambda t: (int(t[1]), (t[0], float(t[2][0]) / (t[2][0] + t[2][1]))))\
.sortByKey(False)\
.map(lambda t: (int(t[1][0]), t[1][1])) \
.collect()

q3_final = q3[:99]
q3_final.append((-1,0.19991701720554747))


In [10]:
print q3_final

[(919, 0.996694214876033), (805, 0.9959749552772809), (686, 0.9803049555273189), (7290, 0.9918887601390498), (930, 0.9817351598173516), (4253, 0.9909747292418772), (183, 0.847870182555781), (11032, 0.9875647668393782), (28746, 0.968421052631579), (887, 0.9794871794871794), (159, 0.9728813559322034), (2116, 0.9833333333333333), (4856, 0.9543147208121827), (5739, 0.9872773536895675), (3277, 0.956081081081081), (88, 0.9660493827160493), (601, 0.9772151898734177), (17230, 0.9970059880239521), (2392, 0.9724137931034482), (1390, 0.9411764705882353), (5836, 0.846441947565543), (603, 0.8158844765342961), (7972, 0.9823008849557522), (6633, 0.9912280701754386), (2958, 0.9930313588850174), (9394, 0.9700854700854701), (7828, 0.9850427350427351), (2817, 0.8206896551724138), (7224, 0.9757575757575757), (4598, 0.9857142857142858), (7071, 0.9107142857142857), (1739, 0.9948717948717949), (1036, 0.9545454545454546), (8013, 0.9040697674418605), (3019, 0.8571428571428571), (4376, 0.963302752293578), (251,

## post_counts

If we use the total number of posts made on the site as a metric for tenure, we can look at the differences between "younger" and "older" users. In this part we return the top 100 post counts among all users (of all types of posts) and the average reputation for every user who has that count.

In other words, we are aggregating the cases where multiple users have the same post count.

In [11]:

class user_class(object):
    def __init__(self, Id, Rep):
        self.Id = Id
        self.Rep = Rep

class post_class(object):
    def __init__(self, Id, qa):
        self.Id = Id
        self.qa = qa

def construct_user(string):
    try:
        row = etree.fromstring(string)
        user_id = row.attrib['Id']
        user_rep = row.attrib['Reputation']
        return user_class(user_id, user_rep)
    except:
        return None
    
def construct_post(string):
    try:
        row = etree.fromstring(string)
        user_id = row.attrib['OwnerUserId']
        qa = row.attrib['PostTypeId']
        return post_class(user_id, qa)
    except:
        return None
    
def num_post(tup):
    user_id, list1  = tup
    num_post = 0
    for list11 in list1 : 
        num_post += 1
        rep = list11[0]
    return (num_post, int(rep))

def rep_avg(tup):
    num_post , rep = tup
    cnt = 0
    total_rep = 0
    for el in rep:
        cnt += 1
        total_rep += el
    return (num_post, total_rep/float(cnt))
        
    

users = sc.textFile(localpath('spark-stats-data/allUsers/part-*.xml.gz')).map(construct_user)
posts = sc.textFile(localpath('spark-stats-data/allPosts/part-*.xml.gz')).map(construct_post) 

q4 = users.filter(lambda x: x is not None).map(lambda u : (u.Id, u.Rep)) \
    .join(posts.filter(lambda x: x is not None).map(lambda p : (p.Id, p.qa))) \
    .groupByKey() \
    .map(num_post) \
    .groupByKey() \
    .map(rep_avg) \
    .sortByKey(False) \
    .collect()
    
print q4[:100]


[(2325, 92624.0), (1663, 47334.0), (1287, 100976.0), (1018, 46907.0), (965, 23102.0), (695, 27599.0), (570, 22706.0), (558, 25406.0), (495, 9294.0), (494, 23610.0), (469, 10728.0), (452, 32283.0), (424, 16854.0), (419, 17719.0), (395, 14100.0), (390, 20315.0), (369, 19312.0), (363, 6149.0), (350, 9047.0), (345, 14768.0), (343, 13557.0), (339, 11795.0), (338, 10045.0), (304, 16131.0), (301, 6352.0), (297, 20133.0), (292, 10552.0), (290, 8285.5), (287, 11083.0), (282, 10383.0), (277, 11830.0), (269, 7729.0), (268, 11989.0), (267, 7971.0), (265, 7765.0), (257, 13078.0), (248, 7608.0), (247, 12496.5), (239, 1.0), (234, 11307.5), (228, 11662.0), (226, 5775.0), (218, 5849.0), (211, 7552.0), (208, 6208.0), (202, 9530.0), (195, 9619.0), (193, 6682.0), (188, 12098.0), (187, 8013.0), (185, 4149.0), (184, 5762.0), (177, 5042.0), (173, 10394.0), (168, 7725.0), (167, 3957.0), (165, 6694.0), (164, 1544.0), (163, 6888.0), (161, 6367.0), (159, 7116.0), (157, 6040.0), (156, 4086.6666666666665), (155, 4

## quick_answers

How long do we have to wait to get your question answered? In this part, we look at the set of ACCEPTED answers which are posted less than three hours after question creation. we are trying to answer some of the following questions:

What is the average number of these "quick answers" as a function of the hour of day the question was asked? We should normalize by how many total accepted answers are garnered by questions posted in a given hour, just like we're counting how many quick accepted answers are garnered by questions posted in a given hour, eg. (quick accepted answers when question hour is 15 / total accepted answers when question hour is 15).


In [12]:
import os
import time
from dateutil.parser import parse

def localpath(path):
    return  'file://' + str(os.path.abspath(os.path.curdir)) + '/' + path

posts = sc.textFile(localpath('spark-stats-data/allPosts/part-*.xml.gz'))

class qa_class(object):
    def __init__(self, acc_id, cre_date):
        self.id = acc_id
        self.date = cre_date

def construct_qa(string):
    try:
        row = etree.fromstring(string)
        cre_date = parse(row.attrib['CreationDate'])
        acc_id = row.attrib['AcceptedAnswerId']
        return qa_class(acc_id, cre_date)
        #return acc_id, cre_Date
    except:
        return None
    
class ans_class(object):
    def __init__(self, ans_id, cre_date):
        self.id = ans_id
        self.date = cre_date

def construct_ans(string):
    try:
        row = etree.fromstring(string)
        cre_date = parse(row.attrib['CreationDate'])
        ans_id = row.attrib['Id']
        return ans_class(ans_id, cre_date)
        #return ans_id, cre_date
    except:
        return None
    
def avg_def_hour(tup):
    h, diff = tup
    n = 0
    n_valid = 0
    for d in diff :
        n += 1
        if int(d) < 10800 :
            n_valid += 1
    return (h, float(n_valid)/n)
    
q5 = posts.map(construct_qa).filter(lambda x: x is not None).map(lambda u : (u.id, u.date))\
    .join(posts.map(construct_ans).filter(lambda x: x is not None).map(lambda u: (u.id, u.date))) \
    .map(lambda t: (t[1][0].hour , (t[1][1] - t[1][0]).total_seconds())) \
    .groupByKey()\
    .map(avg_def_hour)\
    .sortByKey()\
    .map(lambda t: t[1])\
    .collect()

In [13]:
print q5

[0.4504672897196262, 0.44814814814814813, 0.3605577689243028, 0.3799126637554585, 0.4028436018957346, 0.4125, 0.4597402597402597, 0.4673684210526316, 0.4616822429906542, 0.49528301886792453, 0.5157593123209169, 0.5445682451253482, 0.5347313237221494, 0.5310796074154853, 0.5238095238095238, 0.5368007850834151, 0.5475728155339806, 0.47995991983967934, 0.5202185792349727, 0.5462012320328542, 0.5196408529741863, 0.5156794425087108, 0.46153846153846156, 0.4700460829493088]


## identify_veterans

It can be interesting to think about what factors influence a user to remain active on the site over a long period of time. In order not to bias the results towards older users, we'll define a time window between 100 and 150 days after account creation. If the user has made a post in this time, we'll consider them active and well on their way to being veterans of the site; if not, they are inactive and were likely brief users.


Let's see if there are differences between the first ever question posts of "veterans" vs. "brief users". For each group separately, we find the average of the score, views, number of answers, and number of favorites of the users' **first question**.

In [20]:
def localpath(path):
    return  'file://' + str(os.path.abspath(os.path.curdir)) + '/' + path

posts = sc.textFile(localpath('spark-stats-data/allPosts/part-*.xml.gz'))
users = sc.textFile(localpath('spark-stats-data/allUsers/part-*.xml.gz'))

class user_class(object):
    def __init__(self, user_id, cre_date):
        self.id = user_id
        self.cre_date = cre_date

def construct_user(string):
    try:
        row = etree.fromstring(string)
        cre_date = parse(row.attrib['CreationDate'])
        user_id = row.attrib['Id']
        return user_class(user_id, cre_date)
        #return acc_id, cre_Date
    except:
        return None
       
class post_class(object):
    def __init__(self, user_id, cre_date, view, score, fav_cnt, ans_cnt, post_type_id):
        self.id = user_id
        self.date = cre_date
        self.vw = view
        self.scr = score
        self.fav= fav_cnt
        self.ans = ans_cnt
        self.post = post_type_id

def construct_post(string):
    try:
        row = etree.fromstring(string)
        user_id = row.attrib['OwnerUserId']
        cre_date = parse(row.attrib['CreationDate'])
        try:
            view = row.attrib['ViewCount']
        except:
            view = 0
        try:
            score = row.attrib['Score']
        except:
            score = 0
        try:
            fav_cnt = row.attrib['FavoriteCount']
        except:
            fav_cnt = 0
        try:
            ans_cnt = row.attrib['AnswerCount']
        except:
            ans_cnt = 0
        post_type_id = row.attrib['PostTypeId']           
        return post_class(user_id, cre_date, view, score, fav_cnt, ans_cnt, post_type_id)
        #return user_id, cre_date, view, score, fav_cnt, ans_cnt, post_type_id
    except:
        return None

def is_veteran1(tup):
    user, list1 = tup
    is_veteran = False   
    for l in list1:
        is_veteran = (is_veteran) or ((l[0] > 100) and (l[0] < 150))
    return (user, is_veteran)

def first_q(tup):
    user, his_posts = tup
    first_post = min(his_posts, key = lambda t: t[1][0])
    return user, first_post

def avg_vals(tup):
    vet, list1 = tup
    n = 0
    view_total = 0
    score_total = 0
    fav_cnt = 0
    ans_cnt = 0
    for list11 in list1:
        n = n + 1
        view_total += int(list11[0])
        score_total += int(list11[1])
        fav_cnt += int(list11[2])
        ans_cnt += int(list11[3])
    return (vet, float(view_total)/n, float(score_total)/n, float(fav_cnt)/n, float(ans_cnt)/n)
               
q7 = posts.map(construct_post).filter(lambda x: x is not None) \
.map(lambda u : (u.id, (u.date, u.vw, u.scr, u.fav, u.ans, u.post))) \
.join(users.map(construct_user).filter(lambda x: x is not None).map(lambda t: (t.id, (t.cre_date))))\
.map(lambda t: (t[0], ((t[1][0][0]-t[1][1]).days, t[1][0][1], t[1][0][2], t[1][0][3], t[1][0][4], t[1][0][5])))\
.groupByKey().map(is_veteran1).join(posts.map(construct_post).filter(lambda x: x is not None)\
.map(lambda u : (u.id, (u.date, u.vw, u.scr, u.fav, u.ans, u.post)))).filter(lambda x: x[1][1][5] == '1')\
.groupByKey().map(first_q).map(lambda t: (t[1][0], (t[1][1][1], t[1][1][2], t[1][1][3], t[1][1][4])))\
.groupByKey().map(avg_vals).collect()



In [21]:
def identify_veterans():
    return {"vet_score": q7[1][1],
            "vet_views": q7[1][0],
            "vet_answers": q7[1][2],
            "vet_favorites": q7[1][3],
            "brief_score": q7[0][1] ,
            "brief_views": q7[0][0],
            "brief_answers": q7[0][2],
            "brief_favorites": q7[0][3]
           }

identify_veterans()



{'brief_answers': 2.1031764650682683,
 'brief_favorites': 0.5773002392905738,
 'brief_score': 553.5367616009008,
 'brief_views': False,
 'vet_answers': 3.5443322109988777,
 'vet_favorites': 1.2985409652076318,
 'vet_score': 933.7210998877665,
 'vet_views': True}

## word2vec

Word2Vec is an alternative approach for vectorizing text data. The vectorized representations of words in the vocabulary tend to be useful for predicting other words in the document, hence the famous example "vector('king') - vector('man') + vector('woman') ~= vector('queen')".

Let's see how good a Word2Vec model we can train using the tags of each StackExchange post as documents (this uses the full dataset). We will use Spark ML's implementation of Word2Vec (this will require using DataFrames) to return a list of the top 25 closest synonyms to "ggplot2" and their similarity score in tuple format ("string", number).

In [22]:
from pyspark.mllib.feature import Word2Vec
import re

def localpath(path):
    return  'file://' + str(os.path.abspath(os.path.curdir)) + '/' + path

def valid_xml(string):
    if '<row' in string  and '/>' in string :
        return 1
    else:
        return 0

def extracttags(string):
    try:
        tree = etree.fromstring(string)
        tags = tree.get("Tags")
        tags = tags[1:-1].split('><')
        return tags
    except: 
        return None

posts_complete = sc.textFile(localpath('spark-stack-data/allPosts/part-*.xml.gz'))
q8 = posts_complete.filter(valid_xml)\
    .map(extracttags)\
    .filter(lambda x:x is not None)
    
word2vec = Word2Vec()
model = word2vec.fit(q8)

synonyms = model.findSynonyms('ggplot2', 25)
print(synonyms)

[(u'lattice', 0.92138660954231466), (u'r-grid', 0.85115660268066085), (u'plotrix', 0.84942440168473787), (u'boxplot', 0.8468789078740947), (u'plotmath', 0.84554861178583363), (u'ecdf', 0.83877705555707738), (u'levelplot', 0.83423389393602954), (u'line-plot', 0.83005077357219181), (u'loess', 0.82561341374355501), (u'density-plot', 0.82245318334253437), (u'gridextra', 0.82094705140050261), (u'quantile', 0.81995369147794173), (u'melt', 0.8163310299767258), (u'standard-error', 0.81067610439807858), (u'categorical-data', 0.80948906895083605), (u'tapply', 0.80768296238242443), (u'r-factor', 0.80725529343464109), (u'performanceanalytics', 0.80423577122124168), (u'rgl', 0.80418713447199341), (u'ggvis', 0.80172868913410533), (u'mgcv', 0.80078921808747217), (u'survival-analysis', 0.80028066752686966), (u'kernel-density', 0.79897911880171124), (u'anova', 0.79872824324163849), (u'geom-text', 0.79759711080469742)]


*Copyright &copy; 2016 The Data Incubator.  All rights reserved.*