# Spark Distributed Calculation

## Find the bad XML


In [3]:
from pyspark import SparkContext
sc = SparkContext("local[*]","temp")

In [9]:
!pwd

/home/jovyan/datacourse/spark/miniprojects


In [3]:
import os, time
def localpath(path):
    return 'file://' + os.path.join(os.path.abspath(os.path.curdir), path)

In [8]:
!ls spark-stats-data/allPosts

part-00000.xml.gz  part-00003.xml.gz  part-00006.xml.gz  part-00009.xml.gz
part-00001.xml.gz  part-00004.xml.gz  part-00007.xml.gz  part-00010.xml.gz
part-00002.xml.gz  part-00005.xml.gz  part-00008.xml.gz


In [5]:
raw = sc.textFile(localpath('spark-stats-data/allPosts'))

In [6]:
raw.take(5)

['<?xml version="1.0" encoding="UTF-8"?>',
 '<parent>',
 '  <row Body="" CommentCount="0" CreationDate="2013-10-28T10:42:29.940" Id="73933" LastActivityDate="2013-10-28T10:42:29.940" LastEditDate="2013-10-28T10:42:29.940" LastEditorUserId="686" OwnerUserId="686" PostTypeId="5" Score="0" />',
 '  ',
 '  <row Body="See `continuous-data`" CommentCount="0" CreationDate="2013-10-28T10:42:29.940" Id="73934" LastActivityDate="2013-10-28T10:42:29.940" LastEditDate="2013-10-28T10:42:29.940" LastEditorUserId="686" OwnerUserId="686" PostTypeId="4" Score="0" />']

In [7]:
xmls=raw.filter(lambda x: '<row' in x)

In [64]:
xmls.count()

109522

In [25]:
from lxml import etree

In [904]:
a=etree.fromstring('''<row Body="" CommentCount="0" CreationDate="2013-10-28T10:42:29.940" Id="73933" LastActivityDate="2013-10-28T10:42:29.940" LastEditDate="2013-10-28T10:42:29.940" LastEditorUserId="686" OwnerUserId="686" PostTypeId="5" Score="0" />''')

In [110]:
a.getchildren()
'Score' in a.attrib

True

In [26]:
#XMLsyntaxError
import xml.etree.ElementTree as ET      

In [102]:
def parsePost(line):
    try:
        root = ET.fromstring(line)
        return line
    except:
        return 0

In [794]:
posts = xmls.map(parsePost).filter(lambda x: x!= 0)

In [104]:
posts.count()

108741

In [105]:
posts.take(5)

['  <row Body="" CommentCount="0" CreationDate="2013-10-28T10:42:29.940" Id="73933" LastActivityDate="2013-10-28T10:42:29.940" LastEditDate="2013-10-28T10:42:29.940" LastEditorUserId="686" OwnerUserId="686" PostTypeId="5" Score="0" />',
 '  <row Body="See `continuous-data`" CommentCount="0" CreationDate="2013-10-28T10:42:29.940" Id="73934" LastActivityDate="2013-10-28T10:42:29.940" LastEditDate="2013-10-28T10:42:29.940" LastEditorUserId="686" OwnerUserId="686" PostTypeId="4" Score="0" />',
 '  <row Body="&lt;p&gt;O.K., I think I found a way of doing it with the correct assumptions. Although it is only useful for my particular problem, maybe somebody can tell me if I am being too &quot;sloppy&quot;, correct me or maybe my approach might be useful to somebody in the future.&lt;/p&gt;&#10;&#10;&lt;p&gt;First of all, I had not realized that the &quot;triangles&quot; account for &quot;long  independent events&quot;. This means that $P(X_i = 0 | \\sum_{j\\neq i, |j-i| &amp;lt; D} X_j = 1) = 

In [1]:
bad_xml = 781

## Favorites and scores

We're interested in looking for useful patterns in the data.  If we look at the Post data again (the smaller set, `stats.stackexchange.com`), we see that many things about each post are recorded.  We're going to start by looking to see if there is a relationship between the number of times a post was favorited (the `FavoriteCount`) and the `Score`.  The score is the number of times the post was upvoted minus the number of times it was downvoted, so it is a measure of how much a post was liked.  We'd expect posts with a higher number of favorites to have better scores, since they're both measurements of how good the post is.

Let's aggregate posts by the number of favorites, and find the average score for each number of favorites.  Do this for the lowest 50 numbers of favorites.

In [123]:
def fav(line):
    try:
        root = ET.fromstring(line)
        if "Score" and "FavoriteCount" in root.attrib:
            score = root.attrib['Score']
            fav = root.attrib['FavoriteCount']
            return (int(fav),int(score))
        else:
            return ('Empty')
    except:
        return ('Empty')

In [128]:
favs=posts.map(fav).filter(lambda x: x != 'Empty')

In [157]:
favs.sortByKey()\
    .reduceByKey(lambda x,y: x+y).take(5)

[(0, 2051), (11, 1034), (22, 422), (33, 221), (44, 76)]

In [145]:
favs.sortByKey(lambda x: x[0])\
    .reduceByKey(lambda x,y: x+y).count()

83

In [147]:
favs.map(lambda x: x[0]).map(lambda x: (x,1))\
    .sortByKey(lambda x: x[0])\
    .reduceByKey(lambda x,y: x+y).take(5)

[(0, 1045), (11, 59), (22, 13), (33, 6), (44, 1)]

In [146]:
favs.map(lambda x: x[0]).map(lambda x: (x,1))\
    .reduceByKey(lambda x,y: x+y).count()

83

In [148]:
total = favs.sortByKey(lambda x: x[0])\
            .reduceByKey(lambda x,y: x+y)

count = favs.map(lambda x: x[0]).map(lambda x: (x,1))\
            .sortByKey(lambda x: x[0])\
            .reduceByKey(lambda x,y: x+y)

In [158]:
total.join(count).map(lambda x: (x[0],x[1][0]/x[1][1])).collect()

[(0, 1.9626794258373206),
 (11, 17.52542372881356),
 (22, 32.46153846153846),
 (33, 36.833333333333336),
 (44, 76.0),
 (55, 68.0),
 (66, 66.0),
 (88, 75.0),
 (275, 222.0),
 (1, 2.7334613999279624),
 (12, 18.793650793650794),
 (23, 31.76923076923077),
 (34, 41.0),
 (45, 64.0),
 (67, 69.0),
 (100, 118.0),
 (155, 166.0),
 (2, 4.481914893617021),
 (13, 20.083333333333332),
 (24, 29.142857142857142),
 (35, 41.0),
 (79, 102.5),
 (3, 6.350249584026622),
 (14, 23.58823529411765),
 (25, 38.55555555555556),
 (36, 53.25),
 (47, 90.5),
 (58, 45.5),
 (69, 93.0),
 (91, 97.0),
 (102, 110.0),
 (113, 121.0),
 (4, 7.656934306569343),
 (15, 22.594594594594593),
 (26, 38.25),
 (37, 54.5),
 (48, 51.5),
 (59, 69.0),
 (70, 72.0),
 (103, 75.0),
 (158, 170.0),
 (5, 8.941888619854721),
 (16, 25.48148148148148),
 (27, 39.55555555555556),
 (38, 63.6),
 (49, 49.333333333333336),
 (60, 63.0),
 (6, 11.263779527559056),
 (17, 26.333333333333332),
 (28, 34.166666666666664),
 (39, 40.0),
 (50, 53.0),
 (61, 57.0),
 (72,

In [160]:
#favorite_score = [(0, 2.3398827696988396)]*50
favorite = list(total.join(count).map(lambda x: (x[0],x[1][0]/x[1][1])).collect())

In [162]:
favorite_score = sorted(favorite, key=lambda x: x[0])[:50]
favorite_score

[(0, 1.9626794258373206),
 (1, 2.7334613999279624),
 (2, 4.481914893617021),
 (3, 6.350249584026622),
 (4, 7.656934306569343),
 (5, 8.941888619854721),
 (6, 11.263779527559056),
 (7, 12.916666666666666),
 (8, 13.345864661654135),
 (9, 15.754237288135593),
 (10, 17.0),
 (11, 17.52542372881356),
 (12, 18.793650793650794),
 (13, 20.083333333333332),
 (14, 23.58823529411765),
 (15, 22.594594594594593),
 (16, 25.48148148148148),
 (17, 26.333333333333332),
 (18, 25.814814814814813),
 (19, 25.944444444444443),
 (20, 29.636363636363637),
 (21, 35.333333333333336),
 (22, 32.46153846153846),
 (23, 31.76923076923077),
 (24, 29.142857142857142),
 (25, 38.55555555555556),
 (26, 38.25),
 (27, 39.55555555555556),
 (28, 34.166666666666664),
 (29, 45.75),
 (30, 43.4),
 (31, 40.875),
 (32, 33.0),
 (33, 36.833333333333336),
 (34, 41.0),
 (35, 41.0),
 (36, 53.25),
 (37, 54.5),
 (38, 63.6),
 (39, 40.0),
 (40, 39.666666666666664),
 (41, 51.0),
 (42, 52.0),
 (44, 76.0),
 (45, 64.0),
 (47, 90.5),
 (48, 51.5),

## Answer percentage


Investigate the correlation between a user's reputation and the kind of posts they make. For the 99 users with the highest reputation, single out posts which are either questions or answers and look at the percentage of these posts that are answers: *(answers / (answers + questions))*. 

Return a tuple of their **user ID** and this fraction.

You should also return (-1, fraction) to represent the case where you average over all users (so you will return 100 entries total).

Again, you only need to run this on the statistics overflow set.


In [171]:
users = sc.textFile(localpath('spark-stats-data/allUsers'))

In [172]:
users.take(5)

['<?xml version="1.0" encoding="UTF-8"?>',
 '<parent>',
 '  <row AboutMe="&lt;p&gt;Hi, I\'m not really a person.&lt;/p&gt;&#10;&#10;&lt;p&gt;I\'m a background process that helps keep this site clean!&lt;/p&gt;&#10;&#10;&lt;p&gt;I do things like&lt;/p&gt;&#10;&#10;&lt;ul&gt;&#10;&lt;li&gt;Randomly poke old unanswered questions every hour so they get some attention&lt;/li&gt;&#10;&lt;li&gt;Own community questions and answers so nobody gets unnecessary reputation from them&lt;/li&gt;&#10;&lt;li&gt;Own downvotes on spam/evil posts that get permanently deleted&lt;/li&gt;&#10;&lt;li&gt;Own suggested edits from anonymous users&lt;/li&gt;&#10;&lt;li&gt;&lt;a href=&quot;http://meta.stackexchange.com/a/92006&quot;&gt;Remove abandoned questions&lt;/a&gt;&lt;/li&gt;&#10;&lt;/ul&gt;&#10;" AccountId="-1" CreationDate="2010-07-19T06:55:26.860" DisplayName="Community" DownVotes="2330" Id="-1" LastAccessDate="2010-07-19T06:55:26.860" Location="on the server farm" Reputation="1" UpVotes="5831" Views="0"

In [None]:
##get reputation score
def rep(line):
    if '<row' in line:
        try:
            root = ET.fromstring(line)
            if "Id" and "Reputation" in root.attrib:
                uid = root.attrib['Id']
                rep = root.attrib['Reputation']
                return (uid,int(rep))
            else:
                return ('Empty')
        except:
            return ('Empty')
    else:
        return ('Empty')
    
all_users=users.map(rep).filter(lambda x: x != 'Empty').sortBy(lambda x: -x[1])

##get post with questions and answers
def post(line):
    try:
        root = ET.fromstring(line)
        if "PostTypeId" and "OwnerUserId" in root.attrib:
            uid = root.attrib['OwnerUserId']
            ptype = int(root.attrib['PostTypeId'])
            if ptype == 1 or ptype == 2:
                return (uid,ptype)
            else:
                return ('Empty')
        else:
            return ('Empty')
    except:
        return ('Empty')
    
all_p = posts.map(post).filter(lambda x: x != 'Empty')

questions = all_p.filter(lambda x: x[1] == 1).reduceByKey(lambda x,y: x+y)

answers = all_p.filter(lambda x: x[1] == 2).reduceByKey(lambda x,y: x+y).map(lambda x: (x[0],int(x[1]/2)))

##merge 
answer_percentage_long=all_users.join(questions).join(answers)\
                                .map(lambda x: (int(x[0]),x[1][0][0],x[1][0][1],x[1][1]))\
                                .map(lambda x: (x[0],x[1],x[3]/(x[2]+x[3])))\
                                .takeOrdered(99, key=lambda x: -x[1])

answer_percentage=[(x[0],x[2]) for x in answer_percentage_long]

##calculate average
total, count = all_users.join(questions).join(answers).map(lambda x: (int(x[0]),x[1][0][0],x[1][0][1],x[1][1]))\
                        .map(lambda x: (x[0],x[3]/(x[2]+x[3]),1))\
                        .map(lambda x: (x[1],x[2]))\
                        .reduce(lambda x, y: (x[0]+y[0], x[1]+y[1]))

avg = (-1,total/count)
answer_percentage.append(avg)

In [372]:
answer_percentage

[(919, 0.996694214876033),
 (805, 0.9959749552772809),
 (686, 0.9803049555273189),
 (7290, 0.9918887601390498),
 (930, 0.9817351598173516),
 (4505, 1.0),
 (4253, 0.9909747292418772),
 (183, 0.847870182555781),
 (11032, 0.9875647668393782),
 (28746, 0.968421052631579),
 (887, 0.9794871794871794),
 (159, 0.9728813559322034),
 (2116, 0.9833333333333333),
 (4856, 0.9543147208121827),
 (22047, 1.0),
 (5739, 0.9872773536895675),
 (3277, 0.956081081081081),
 (88, 0.9660493827160493),
 (2970, 1.0),
 (601, 0.9772151898734177),
 (17230, 0.9970059880239521),
 (449, 1.0),
 (2392, 0.9724137931034482),
 (1390, 0.9411764705882353),
 (5836, 0.846441947565543),
 (7555, 1.0),
 (603, 0.8158844765342961),
 (7972, 0.9823008849557522),
 (6633, 0.9912280701754386),
 (2958, 0.9930313588850174),
 (9394, 0.9700854700854701),
 (7828, 0.9850427350427351),
 (2817, 0.8206896551724138),
 (7224, 0.9757575757575757),
 (4598, 0.9857142857142858),
 (7071, 0.9107142857142857),
 (1739, 0.9948717948717949),
 (1036, 0.95454

## First question

We'd expect the first **question** a user asks to be indicative of their future behavior.  We'll dig more into that in the next problem, but for now let's see the relationship between reputation and how long it took each person to ask their first question.

For each user that asked a question, find the difference between when their account was created (`CreationDate` for the User) and when they asked their first question (`CreationDate` for their first question).  Return this time difference in days (round down, so 2.7 days counts as 2 days) for the 100 users with the highest reputation, in the form

`(UserId, Days)`

In [422]:
def first_question(line):
    if '<row' in line:
        try:
            root = ET.fromstring(line)
            if "Id" and "Reputation" and 'CreationDate' in root.attrib:
                uid = root.attrib['Id']
                rep = root.attrib['Reputation']
                cdate = root.attrib['CreationDate']
                return (int(uid),int(rep),cdate)
            else:
                return ('Empty')
        except:
            return ('Empty')
    else:
        return ('Empty')

In [429]:
from datetime import timedelta, datetime

In [525]:
def parsetime(x):
    if x is None:
        return None
    else:
        return datetime.strptime(x,'%Y-%m-%dT%H:%M:%S.%f')

In [431]:
parsetime('2010-07-19T06:55:26.860')

datetime.datetime(2010, 7, 19, 6, 55, 26, 860000)

In [442]:
parsetime('2010-07-19T06:55:26.860')<parsetime('2010-07-20T06:55:26.860')

True

In [477]:
(parsetime('2010-07-19T06:55:26.860')-parsetime('2010-07-20T06:55:26.860')).days

-1

In [463]:
user_info = users.map(first_question).filter(lambda x: x != 'Empty')\
                 .map(lambda x: (x[0], (x[1], parsetime(x[2]))))

In [464]:
def post_first_question(line):
    try:
        root = ET.fromstring(line)
        if "PostTypeId" and "OwnerUserId" and 'CreationDate' in root.attrib:
            uid = root.attrib['OwnerUserId']
            ptype = int(root.attrib['PostTypeId'])
            cdate = root.attrib['CreationDate']
            if ptype == 1:
                return (int(uid),cdate)
            else:
                return ('Empty')
        else:
            return ('Empty')
    except:
        return ('Empty')

In [465]:
post_info = posts.map(post_first_question).filter(lambda x: x != 'Empty')\
                 .map(lambda x: (x[0], parsetime(x[1])))\
                 .reduceByKey(lambda x, y: min(x, y))

In [466]:
user_info.take(5)

[(-1, (1, datetime.datetime(2010, 7, 19, 6, 55, 26, 860000))),
 (2, (101, datetime.datetime(2010, 7, 19, 14, 1, 36, 697000))),
 (3, (101, datetime.datetime(2010, 7, 19, 15, 34, 50, 507000))),
 (4, (101, datetime.datetime(2010, 7, 19, 19, 3, 27, 400000))),
 (5, (6962, datetime.datetime(2010, 7, 19, 19, 3, 57, 227000)))]

In [467]:
post_info.take(5)

[(32010, datetime.datetime(2013, 10, 28, 18, 36, 54, 563000)),
 (32021, datetime.datetime(2013, 10, 29, 0, 6, 34, 723000)),
 (26356, datetime.datetime(2013, 5, 31, 21, 17, 53, 117000)),
 (5676, datetime.datetime(2011, 8, 4, 3, 0, 9, 277000)),
 (32087, datetime.datetime(2013, 10, 30, 9, 32, 23, 277000))]

In [526]:
def time_delta(x,y):
    if x is None or y is None:
        return None
    else:
        return (x-y).days

In [527]:
time_delta(parsetime('2010-07-19T06:55:26.860'),parsetime('2010-07-20T06:55:26.860'))

-1

In [492]:
user_info.join(post_info).map(lambda x: (x[0],x[1][0][0],x[1][0][1],x[1][1])).take(5)

[(13,
  947,
  datetime.datetime(2010, 7, 19, 19, 6, 49, 527000),
  datetime.datetime(2010, 7, 19, 19, 28, 44, 903000)),
 (26,
  3220,
  datetime.datetime(2010, 7, 19, 19, 9, 39, 723000),
  datetime.datetime(2011, 2, 18, 2, 40, 12, 390000)),
 (39,
  1741,
  datetime.datetime(2010, 7, 19, 19, 11, 59, 377000),
  datetime.datetime(2010, 7, 19, 20, 54, 23, 200000)),
 (52,
  976,
  datetime.datetime(2010, 7, 19, 19, 15, 5, 810000),
  datetime.datetime(2010, 12, 5, 23, 43, 10, 43000)),
 (78,
  123,
  datetime.datetime(2010, 7, 19, 19, 26, 51, 850000),
  datetime.datetime(2010, 8, 18, 22, 58, 5, 27000))]

In [498]:
first_questions = user_info.join(post_info).map(lambda x: (x[0],x[1][0][0],x[1][0][1],x[1][1]))\
.map(lambda x:(x[0],x[1],time_delta(x[3],x[2]))).takeOrdered(100, key=lambda x: -x[1])

In [499]:
first_question=[(x[0],x[2]) for x in first_questions]

In [500]:
#first_question = [(805, 669)] * 100

grader.score('spark__first_question', first_question)

Your score:  0.9800000000000006


## Identify veterans


It can be interesting to think about what factors influence a user to remain active on the site over a long period of time. In order not to bias the results towards older users, we'll define a time window between 100 and 150 days after account creation. If the user has made a post in this time, we'll consider them active and well on their way to being veterans of the site; if not, they are inactive and were likely brief users.

*Consider*: What other parameterizations of "activity" could we use, and how would they differ in terms of splitting our user base?

*Consider*: What other biases are still not dealt with, after using the above approach?

Let's see if there are differences between the first ever question posts of "veterans" vs. "brief users". For each group separately, average the score, views, number of answers, and number of favorites of the users' **first question**.

*Consider*: What story could you tell from these numbers? How do the numbers support it?


In [None]:
##find files
def localpath(path):
    return 'file://' + os.path.join(os.path.abspath(os.path.curdir), path)

posts = sc.textFile(localpath('spark-stats-data/allPosts'))
users = sc.textFile(localpath('spark-stats-data/allUsers'))

#define functions
def parsetime(x):
    if x is None:
        return None
    else:
        return datetime.strptime(x,'%Y-%m-%dT%H:%M:%S.%f')
    
def time_delta(x,y):
    if x is None or y is None:
        return None
    else:
        return (x-y).days
    
def user_parse(line):
    if '<row' in line:
        try:
            root = ET.fromstring(line)
            if "Id" in root.attrib:
                uid = root.attrib['Id']
                cdate = root.attrib.get('CreationDate',None)
                return (int(uid),cdate)
            else:
                return ('Empty')
        except:
            return ('Empty')
    else:
        return ('Empty')
    
def date_post(line):
    if '<row' in line:
        try:
            root = ET.fromstring(line)
            if "OwnerUserId" in root.attrib:
                uid = root.attrib['OwnerUserId']
                cdate = root.attrib.get('CreationDate',None)
                return (int(uid),cdate)
            else:
                return ('Empty')
        except:
            return ('Empty')
    else:
        return ('Empty')
    
def vet(x):
    if x is None:
        return 0
    if int(x) >= 100 and int(x) <= 150:
        return 1
    else:
        return 0

##get rdds
valid_users = users.map(user_parse).filter(lambda x: x != 'Empty').map(lambda x: (x[0],parsetime(x[1])))    

valid_posts = posts.map(date_post).filter(lambda x: x != 'Empty')\
                        .map(lambda x: (x[0],parsetime(x[1])))

all_user_info = valid_users.join(valid_posts)\
                           .map(lambda x: (x[0],time_delta(x[1][1],x[1][0])))         

all_i = all_user_info.map(lambda x: (x[0],vet(x[1]))).reduceByKey(lambda x,y: max(x,y))

veteran = all_i.filter(lambda x: x[1] == 1)
brief_views = all_i.filter(lambda x: x[1] == 0)

def post_first_question_more(line):
    try:
        root = ET.fromstring(line)
        if "PostTypeId" and "OwnerUserId" and 'CreationDate' in root.attrib:
            uid = root.attrib['OwnerUserId']
            ptype = int(root.attrib['PostTypeId'])
            cdate = root.attrib['CreationDate']
            #a.attrib.get('Score',1)
            answer = root.attrib.get("AnswerCount",0)           
            view = root.attrib.get("ViewCount",0)           
            fav = root.attrib.get("FavoriteCount",0)            
            s = root.attrib.get("Score",0) 
            #filter out questions
            if ptype == 1:
                return (int(uid),cdate,int(answer),int(view),int(fav),int(s))
            else:
                return ('Empty')
        else:
            return ('Empty')
    except:
        return ('Empty')

#find out the first questions date
first_q= posts_long.map(post_first_question_more).filter(lambda x: x != 'Empty')\
              .map(lambda x: (x[0], parsetime(x[1])))\
              .reduceByKey(lambda x, y: min(x, y))\
              .map(lambda x: ((x[0],x[1]),1))

#find out all questions' performance
all_q = posts_long.map(post_first_question_more).filter(lambda x: x != 'Empty')\
              .map(lambda x: ((x[0],parsetime(x[1])),(x[2],x[3],x[4],x[5])))

#merge (acount, first_question_date) with performances
stats = first_q.join(all_q).map(lambda x: (x[0][0],(x[1][1][0],x[1][1][1],x[1][1][2],x[1][1][3])))

##calculate average for veterian
a,v,f,s,t=veteran.join(stats).map(lambda x: (x[0],(x[1][1][0],x[1][1][1],x[1][1][2],x[1][1][3])))\
                         .map(lambda x: (x[1][0],x[1][1],x[1][2],x[1][3],1))\
                         .reduce(lambda x,y: (x[0]+y[0],x[1]+y[1],x[2]+y[2],x[3]+y[3],x[4]+y[4]))       

###calculate average for brief views
a1,v1,f1,s1,t1=brief_views.join(stats).map(lambda x: (x[0],(x[1][1][0],x[1][1][1],x[1][1][2],x[1][1][3])))\
                         .map(lambda x: (x[1][0],x[1][1],x[1][2],x[1][3],1))\
                         .reduce(lambda x,y: (x[0]+y[0],x[1]+y[1],x[2]+y[2],x[3]+y[3],x[4]+y[4]))       

#pass to lists
lis=[s,v,a,f]
lis1=[l/t for l in lis]

lis=[s1,v1,a1,f1]
lis2=[l/t1 for l in lis]

lis1.extend(lis2)

names=["vet_score","vet_views","vet_answers","vet_favorites","brief_score","brief_views","brief_answers","brief_favorites"]
identify_veterans_full = {n:l for n,l in zip(names,lis1)}

## Identify veterans&mdash;full


Same as above, but on the full Stack Exchange data set.

No pre-parsed data is available for this question.


In [917]:
def localpath(path):
    return 'file://' + os.path.join(os.path.abspath(os.path.curdir), path)

posts_long = sc.textFile(localpath('spark-stack-data/allPosts'))
users_long = sc.textFile(localpath('spark-stack-data/allUsers'))

In [918]:
def parsetime(x):
    if x is None:
        return None
    else:
        return datetime.strptime(x,'%Y-%m-%dT%H:%M:%S.%f')
    
def time_delta(x,y):
    if x is None or y is None:
        return None
    else:
        return (x-y).days
    
def user_parse(line):
    if '<row' in line:
        try:
            root = ET.fromstring(line)
            if "Id" in root.attrib:
                uid = root.attrib['Id']
                cdate = root.attrib.get('CreationDate',None)
                return (int(uid),cdate)
            else:
                return ('Empty')
        except:
            return ('Empty')
    else:
        return ('Empty')
    
def date_post(line):
    if '<row' in line:
        try:
            root = ET.fromstring(line)
            if "OwnerUserId" in root.attrib:
                uid = root.attrib['OwnerUserId']
                cdate = root.attrib.get('CreationDate',None)
                return (int(uid),cdate)
            else:
                return ('Empty')
        except:
            return ('Empty')
    else:
        return ('Empty')
    
def vet(x):
    if x is None:
        return 0
    if int(x) >= 100 and int(x) <= 150:
        return 1
    else:
        return 0
    
valid_users = users_long.map(user_parse).filter(lambda x: x != 'Empty').map(lambda x: (x[0],parsetime(x[1])))    

valid_posts = posts_long.map(date_post).filter(lambda x: x != 'Empty')\
                        .map(lambda x: (x[0],parsetime(x[1])))

all_user_info = valid_users.join(valid_posts)\
                           .map(lambda x: (x[0],time_delta(x[1][1],x[1][0])))         

all_i = all_user_info.map(lambda x: (x[0],vet(x[1]))).reduceByKey(lambda x,y: max(x,y))

veteran = all_i.filter(lambda x: x[1] == 1)
brief_views = all_i.filter(lambda x: x[1] == 0)

In [925]:
veteran.count()

291323

In [926]:
brief_views.count()

1843079

In [927]:
def post_first_question_more(line):
    try:
        root = ET.fromstring(line)
        if "PostTypeId" and "OwnerUserId" and 'CreationDate' in root.attrib:
            uid = root.attrib['OwnerUserId']
            ptype = int(root.attrib['PostTypeId'])
            cdate = root.attrib['CreationDate']
            #a.attrib.get('Score',1)
            answer = root.attrib.get("AnswerCount",0)           
            view = root.attrib.get("ViewCount",0)           
            fav = root.attrib.get("FavoriteCount",0)            
            s = root.attrib.get("Score",0) 
            #filter out questions
            if ptype == 1:
                return (int(uid),cdate,int(answer),int(view),int(fav),int(s))
            else:
                return ('Empty')
        else:
            return ('Empty')
    except:
        return ('Empty')

#find out the first questions date
first_q= posts_long.map(post_first_question_more).filter(lambda x: x != 'Empty')\
              .map(lambda x: (x[0], parsetime(x[1])))\
              .reduceByKey(lambda x, y: min(x, y))\
              .map(lambda x: ((x[0],x[1]),1))

#find out all questions' performance
all_q = posts_long.map(post_first_question_more).filter(lambda x: x != 'Empty')\
              .map(lambda x: ((x[0],parsetime(x[1])),(x[2],x[3],x[4],x[5])))

#merge (acount, first_question_date) with performances
stats = first_q.join(all_q).map(lambda x: (x[0][0],(x[1][1][0],x[1][1][1],x[1][1][2],x[1][1][3])))

##calculate average for veterian
a,v,f,s,t=veteran.join(stats).map(lambda x: (x[0],(x[1][1][0],x[1][1][1],x[1][1][2],x[1][1][3])))\
                         .map(lambda x: (x[1][0],x[1][1],x[1][2],x[1][3],1))\
                         .reduce(lambda x,y: (x[0]+y[0],x[1]+y[1],x[2]+y[2],x[3]+y[3],x[4]+y[4]))       

###calculate average for brief views
a1,v1,f1,s1,t1=brief_views.join(stats).map(lambda x: (x[0],(x[1][1][0],x[1][1][1],x[1][1][2],x[1][1][3])))\
                         .map(lambda x: (x[1][0],x[1][1],x[1][2],x[1][3],1))\
                         .reduce(lambda x,y: (x[0]+y[0],x[1]+y[1],x[2]+y[2],x[3]+y[3],x[4]+y[4]))       

#pass to lists
lis=[s,v,a,f]
lis1=[l/t for l in lis]

lis=[s1,v1,a1,f1]
lis2=[l/t1 for l in lis]

lis1.extend(lis2)

In [928]:
lis1

[2.2561266751584585,
 1841.8189951718825,
 1.841804785404288,
 0.8661474978452715,
 1.129298016272707,
 1095.1468872554271,
 1.5033659787554736,
 0.3854817164205759]

In [929]:
identify_veterans_full = {n:l for n,l in zip(names,lis1)}

## Word2vec


Word2Vec is an alternative approach for vectorizing text data. The vectorized representations of words in the vocabulary tend to be useful for predicting other words in the document, hence the famous example "vector('king') - vector('man') + vector('woman') ~= vector('queen')".

Let's see how good a Word2Vec model we can train using the tags of each Stack Exchange post as documents (this uses the full data set). Use the implementation of Word2Vec from Spark ML (this will require using DataFrames) to return a list of the top 25 closest synonyms to "ggplot2" and their similarity score in tuple format ("string", number).


#### Parameters


The dimensionality of the vector space should be 100. The random seed should be 42 in `PySpark`.


#### Checkpoints

* Mean of the top 25 cosine similarities: 0.8012362027168274

In [127]:
sc.stop()

In [4]:
from pyspark.ml.feature import Word2Vec
from pyspark import SparkContext, SparkConf
import re

sc = SparkContext("local[*]", "temp")

from pyspark.sql import SQLContext

sqlContext = SQLContext(sc)

In [5]:
from lxml import etree
from lxml.etree import XMLSyntaxError

In [6]:
posts = sc.textFile(localpath('spark-stack-data/allPosts'))

In [9]:
def tag_post(line):
    if '<row' in line:
        try:
            root = etree.fromstring(line)
            if "Tags" in root.attrib:
                tag = root.attrib['Tags']
                return tag
            else:
                return ('Empty')
        except XMLSyntaxError:
            return ('Empty')
    else:
        return ('Empty')

In [9]:
tag_posts = posts.map(tag_post).filter(lambda x: x!= 'Empty')

In [10]:
tag_posts.take(5)

['<android><android-tabhost><progressdialog>',
 '<security><encryption><hash><passwords><password-protection>',
 '<java><proxy><hostname><resolve>',
 '<sql><jpa>',
 '<jquery><class>']

In [146]:
sc.parallelize(range(10)).sum()

45

In [21]:
df=tag_posts.map(lambda line: ([s for s in re.split("<|>", line) if s != ''], 1))\
            .toDF(['text', 'score'])

w2v = Word2Vec(inputCol="text", outputCol="vectors", vectorSize=100,minCount=10,seed=17)
model = w2v.fit(df)
result = model.transform(df)

print(model.findSynonyms('ggplot2', 25).rdd.take(25))

[Row(word='lattice', similarity=0.894500195980072), Row(word='r-grid', similarity=0.8603865504264832), Row(word='boxplot', similarity=0.8382558822631836), Row(word='plotrix', similarity=0.8369892239570618), Row(word='density-plot', similarity=0.8145373463630676), Row(word='ecdf', similarity=0.8129977583885193), Row(word='ggvis', similarity=0.805046796798706), Row(word='gridextra', similarity=0.8037905693054199), Row(word='levelplot', similarity=0.7975961565971375), Row(word='tapply', similarity=0.7965813875198364), Row(word='rgl', similarity=0.7900989055633545), Row(word='r-raster', similarity=0.7896594405174255), Row(word='quantile', similarity=0.7871367335319519), Row(word='r-factor', similarity=0.7825618386268616), Row(word='plot', similarity=0.7801932096481323), Row(word='gam', similarity=0.7788233757019043), Row(word='anova', similarity=0.7784751057624817), Row(word='confidence-interval', similarity=0.7770544290542603), Row(word='plotmath', similarity=0.7767807841300964), Row(word

In [15]:
model.findSynonyms('ggplot2', 25).rdd.take(25)[0]['word'],model.findSynonyms('ggplot2', 25).rdd.take(25)[0]['similarity']

('lm', 0.8915739059448242)

In [23]:
lis = model.findSynonyms('ggplot2', 25).rdd.take(25)
word2vec = [(l['word'],l['similarity']) for l in lis]
word2vec

[('lattice', 0.894500195980072),
 ('r-grid', 0.8603865504264832),
 ('boxplot', 0.8382558822631836),
 ('plotrix', 0.8369892239570618),
 ('density-plot', 0.8145373463630676),
 ('ecdf', 0.8129977583885193),
 ('ggvis', 0.805046796798706),
 ('gridextra', 0.8037905693054199),
 ('levelplot', 0.7975961565971375),
 ('tapply', 0.7965813875198364),
 ('rgl', 0.7900989055633545),
 ('r-raster', 0.7896594405174255),
 ('quantile', 0.7871367335319519),
 ('r-factor', 0.7825618386268616),
 ('plot', 0.7801932096481323),
 ('gam', 0.7788233757019043),
 ('anova', 0.7784751057624817),
 ('confidence-interval', 0.7770544290542603),
 ('plotmath', 0.7767807841300964),
 ('line-plot', 0.7740892767906189),
 ('do.call', 0.7739959955215454),
 ('standard-error', 0.7729793190956116),
 ('data.table', 0.7726537585258484),
 ('performanceanalytics', 0.7720458507537842),
 ('kernel-density', 0.770494818687439)]

In [136]:
w2v = Word2Vec(inputCol="text", outputCol="vectors",minCount=10, vectorSize=100,seed=17)
model = w2v.fit(df)
result = model.transform(df)

In [152]:
result.show(5)

+--------------------+-----+--------------------+
|                text|score|             vectors|
+--------------------+-----+--------------------+
|[ranking, rank-co...|    1|[0.00461881561204...|
|[data-visualization]|    1|[-0.1391233950853...|
|[probability, mix...|    1|[-0.0312058134004...|
| [r, bioinformatics]|    1|[-0.0321438718237...|
|[machine-learning...|    1|[0.11158403381705...|
+--------------------+-----+--------------------+
only showing top 5 rows



In [154]:
model.findSynonyms('ggplot2', 25).show(25)

+--------------------+------------------+
|                word|        similarity|
+--------------------+------------------+
|                  lm|0.8915739059448242|
|     beta-regression|0.8643296360969543|
|                 plm| 0.857779324054718|
|robust-standard-e...|0.8324904441833496|
|           dataframe|0.8204379677772522|
|              mlogit|0.8197407126426697|
|                 gam|0.8076692819595337|
|                 nls|  0.80257648229599|
| stepwise-regression|0.7994891405105591|
|       error-message|0.7964061498641968|
|             splines|0.7962846755981445|
|       ordered-logit|0.7921024560928345|
|               loess|0.7862126231193542|
|            traminer|0.7547711730003357|
|                nlme|0.7518925666809082|
|                 gis|0.7484728097915649|
|            survival|0.7422318458557129|
|           intercept|0.7362070083618164|
|                lmer|0.7275500893592834|
|             hausman|0.7262603044509888|
|        longitudinal| 0.721697211

## Classification


We'd like to see if we can predict the tags of a question from its body text. Instead of predicting specific tags, we will instead try to predict if a question contains one of the top ten most common tags.  

In [6]:
train = sc.textFile(localpath('spark-stats-data/train'))

In [7]:
def token_tag(string):
    tokens= [s for s in re.split("<|>", string) if s != '']
    token_count=[(t,1) for t in tokens]
    return token_count

In [41]:
string = '<bayesian><prior><elicitation>'

In [42]:
token_tag(string)

[('bayesian', 1), ('prior', 1), ('elicitation', 1)]

In [10]:
common_tags = train.map(tag_post).filter(lambda x: x != 'Empty')\
                   .flatMap(token_tag).reduceByKey(lambda x,y: x+y)\
                   .takeOrdered(10, key=lambda x: -x[1])

In [11]:
tags = [t[0] for t in common_tags]
tags

['r',
 'regression',
 'time-series',
 'machine-learning',
 'probability',
 'hypothesis-testing',
 'distributions',
 'self-study',
 'logistic',
 'correlation']

In [12]:
def token_check(string):
    tokens= [s for s in re.split("<|>", string) if s != '']
    for t in tags:
        if t in tokens:
            return 1
    else:
        return 0

In [18]:
string ='<regression><tag>'
token_check(string)

1

In [180]:
p = re.compile('<p>(.+)</p>')

In [184]:
p = re.compile('>(.+)<')

In [197]:
string = '<p>Last year, I read a blog post from <a href="http://anyall.org/">Brendan O\'Connor</a> entitled <a href="http://anyall.org/blog/2008/12/statistics-vs-machine-learning-fight/">"Statistics vs. Machine Learning, fight!"</a> that discussed some of the differences between the two fields.  <a href="http://andrewgelman.com/2008/12/machine_learnin/">Andrew Gelman responded favorably to this</a>:</p>\n\n<p>Simon Blomberg: </p>\n\n<blockquote>\n  <p>From R\'s fortunes\n  package: To paraphrase provocatively,\n  \'machine learning is statistics minus\n  any checking of models and\n  assumptions\'.\n  -- Brian D. Ripley (about the difference between machine learning\n  and statistics) useR! 2004, Vienna\n  (May 2004) :-) Season\'s Greetings!</p>\n</blockquote>\n\n<p>Andrew Gelman:</p>\n\n<blockquote>\n  <p>In that case, maybe we should get rid\n  of checking of models and assumptions\n  more often. Then maybe we\'d be able to\n  solve some of the problems that the\n  machine learning people can solve but\n  we can\'t!</p>\n</blockquote>\n\n<p>There was also the <a href="http://projecteuclid.org/euclid.ss/1009213726"><strong>"Statistical Modeling: The Two Cultures"</strong> paper</a> by Leo Breiman in 2001 which argued that statisticians rely too heavily on data modeling, and that machine learning techniques are making progress by instead relying on the <em>predictive accuracy</em> of models.</p>\n\n<p>Has the statistics field changed over the last decade in response to these critiques?  Do the <em>two cultures</em> still exist or has statistics grown to embrace machine learning techniques such as neural networks and support vector machines?</p>\n'
''.join(p.findall(string))

'Last year, I read a blog post from <a href="http://anyall.org/">Brendan O\'Connor</a> entitled <a href="http://anyall.org/blog/2008/12/statistics-vs-machine-learning-fight/">"Statistics vs. Machine Learning, fight!"</a> that discussed some of the differences between the two fields.  <a href="http://andrewgelman.com/2008/12/machine_learnin/">Andrew Gelman responded favorably to this</a>:Simon Blomberg: Andrew Gelman:There was also the <a href="http://projecteuclid.org/euclid.ss/1009213726"><strong>"Statistical Modeling: The Two Cultures"</strong> paper</a> by Leo Breiman in 2001 which argued that statisticians rely too heavily on data modeling, and that machine learning techniques are making progress by instead relying on the <em>predictive accuracy</em> of models.Has the statistics field changed over the last decade in response to these critiques?  Do the <em>two cultures</em> still exist or has statistics grown to embrace machine learning techniques such as neural networks and suppor

In [198]:
def format_body(string):
    p = re.compile('<p>(.+)</p>')
    lis = p.findall(string)
    return ''.join(p.findall(string))

In [223]:
def tag_post(line):
    if '<row' in line:
        try:
            root = etree.fromstring(line)
            if 'Body' and "Tags" in root.attrib:
                body = root.attrib['Body']
                tag = root.attrib['Tags']
                return (body,token_check(tag))
            else:
                return ('Empty')
        except XMLSyntaxError:
            return ('Empty')
    else:
        return ('Empty')

In [224]:
train.map(tag_post).filter(lambda x: x != 'Empty').take(5)

[('<p>How should I elicit prior distributions from experts when fitting a Bayesian model?</p>\n',
  0),
 ('<p>In many different statistical methods there is an "assumption of normality".  What is "normality" and how do I know if there is normality?</p>\n',
  1),
 ('<p>What are some valuable Statistical Analysis open source projects available right now?</p>\n\n<p>Edit: as pointed out by Sharpie, valuable could mean helping you get things done faster or more cheaply.</p>\n',
  0),
 ("<p>I have two groups of data.  Each with a different distribution of multiple variables.  I'm trying to determine if these two groups' distributions are different in a statistically significant way.  I have the data in both raw form and binned up in easier to deal with discrete categories with frequency counts in each.  </p>\n\n<p>What tests/procedures/methods should I use to determine whether or not these two groups are significantly different and how do I do that in SAS or R (or Orange)?</p>\n",
  1),
 ('<

In [225]:
train_set = train.map(tag_post).filter(lambda x: x != 'Empty').collect()

In [226]:
train.map(tag_post).filter(lambda x: x != 'Empty').filter(lambda x: x[1]==0).count()

19540

In [227]:
from pyspark.ml.classification import LogisticRegression

from pyspark.ml.classification import RandomForestClassifier

from pyspark.ml.feature import HashingTF, Tokenizer,RegexTokenizer
##save the computation from the traing process by cache()
training = sqlContext.createDataFrame(train_set, ["title", "label"]).cache()

In [228]:
training.show()

+--------------------+-----+
|               title|label|
+--------------------+-----+
|<p>How should I e...|    0|
|<p>In many differ...|    1|
|<p>What are some ...|    0|
|<p>I have two gro...|    1|
|<p>Last year, I r...|    1|
|<p>I've been work...|    0|
|<p>Sorry, but the...|    0|
|<p>Many studies i...|    0|
|<p>I have four co...|    0|
|<p>What are some ...|    0|
|<p>How would you ...|    0|
|<p>How can I find...|    1|
|<p>What modern to...|    1|
|<p>What is a stan...|    0|
|<p>Which methods ...|    1|
|<p>After taking a...|    1|
|<p>What R package...|    1|
|<p>I have a data ...|    1|
|<p>There is an ol...|    1|
|<p>I'm looking fo...|    0|
+--------------------+-----+
only showing top 20 rows



In [229]:
from pyspark.ml.feature import StopWordsRemover

tokenizer = RegexTokenizer(inputCol="title", outputCol="words", pattern="\\w")
tokenizer = Tokenizer(inputCol="title", outputCol="words")
remover = StopWordsRemover(inputCol=tokenizer.getOutputCol(), outputCol="filtered")

hashingTF = HashingTF(inputCol=remover.getOutputCol(), outputCol="features")
logreg = LogisticRegression(maxIter=1000000, regParam=0.8)

pipeline = Pipeline(stages=[tokenizer,remover, hashingTF, logreg])

from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import BinaryClassificationEvaluator


paramGrid = (ParamGridBuilder() 
    .addGrid(hashingTF.numFeatures, [2000])
    .addGrid(logreg.regParam, [10, 1, 0.1]) 
    .build())

crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=BinaryClassificationEvaluator(),
                          numFolds=5)

In [230]:
cvModel = crossval.fit(training)

In [231]:
best_model = cvModel.bestModel

In [26]:
test = sc.textFile(localpath('spark-stats-data/test'))

In [27]:
test.count()

19414

In [233]:
def tag_post_test(line):
    if '<row' in line:
        try:
            root = etree.fromstring(line)
            if 'Body' and 'Id' and "PostTypeId" in root.attrib:
                if int(root.attrib['PostTypeId']) == 1:
                    pid = root.attrib['Id']
                    body = root.attrib['Body']
                    return (body, int(pid))
                else:
                    return ('Empty') 
            else:
                return ('Empty')
        except XMLSyntaxError:
            return ('Empty')
    else:
        return ('Empty')

In [234]:
test.map(tag_post_test).filter(lambda x: x != 'Empty').count()

4649

In [235]:
test_set = test.map(tag_post_test).filter(lambda x: x != 'Empty')

In [236]:
test_df = sqlContext.createDataFrame(test_set, ["title", "id"])

In [237]:
test_df.show()

+--------------------+---+
|               title| id|
+--------------------+---+
|<p>Is there a goo...| 11|
|<p>What algorithm...| 40|
|<p>I have a datas...| 47|
|<p>We're trying t...| 93|
|<p>I need to anal...|183|
|<p>I have 2 ASR (...|212|
|<p>What are some ...|216|
|<p>I have a frien...|223|
|<p>When a non-hie...|278|
|<p>I know of Came...|290|
|<p>I'm a physics ...|312|
|<p>I realize that...|328|
|<p>Why do we seek...|354|
|<p>What is the di...|362|
|<p>If you could g...|363|
|<p>From Wikipedia...|373|
|<p>I am proposing...|492|
|<p>Sometimes, I j...|498|
|<p>In answering <...|539|
|<p>In engineering...|624|
+--------------------+---+
only showing top 20 rows



In [238]:
better_prediction = cvModel.transform(test_df)

In [239]:
better_prediction.show()

+--------------------+---+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|               title| id|               words|            filtered|            features|       rawPrediction|         probability|prediction|
+--------------------+---+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|<p>Is there a goo...| 11|[<p>is, there, a,...|[<p>is, good,, mo...|(2000,[139,255,27...|[0.77586480998409...|[0.68478820225451...|       0.0|
|<p>What algorithm...| 40|[<p>what, algorit...|[<p>what, algorit...|(2000,[722,738,85...|[-0.0618889691333...|[0.48453269436504...|       1.0|
|<p>I have a datas...| 47|[<p>i, have, a, d...|[<p>i, dataset, 1...|(2000,[3,11,43,80...|[-0.0133490083470...|[0.49666279746943...|       1.0|
|<p>We're trying t...| 93|[<p>we're, trying...|[<p>we're, trying...|(2000,[77,101,138...|[0.47198871542085...|[0.61585435051996...|       0.0|

In [240]:
selected = better_prediction.select("id", "prediction")

In [241]:
for row in selected.collect()[5]:
    print(row)

212
0.0


In [242]:
sort_selected = selected.sort("id").collect()
sort_selected

[Row(id=11, prediction=0.0),
 Row(id=40, prediction=1.0),
 Row(id=47, prediction=1.0),
 Row(id=93, prediction=0.0),
 Row(id=183, prediction=0.0),
 Row(id=212, prediction=0.0),
 Row(id=216, prediction=0.0),
 Row(id=223, prediction=1.0),
 Row(id=278, prediction=0.0),
 Row(id=290, prediction=1.0),
 Row(id=312, prediction=0.0),
 Row(id=328, prediction=0.0),
 Row(id=354, prediction=0.0),
 Row(id=362, prediction=0.0),
 Row(id=363, prediction=0.0),
 Row(id=373, prediction=1.0),
 Row(id=492, prediction=0.0),
 Row(id=498, prediction=1.0),
 Row(id=539, prediction=0.0),
 Row(id=624, prediction=0.0),
 Row(id=841, prediction=0.0),
 Row(id=886, prediction=1.0),
 Row(id=897, prediction=1.0),
 Row(id=928, prediction=0.0),
 Row(id=929, prediction=0.0),
 Row(id=944, prediction=1.0),
 Row(id=946, prediction=1.0),
 Row(id=977, prediction=0.0),
 Row(id=1063, prediction=0.0),
 Row(id=1082, prediction=0.0),
 Row(id=1228, prediction=0.0),
 Row(id=1266, prediction=1.0),
 Row(id=1308, prediction=1.0),
 Row(id=1

In [243]:
predictions = [int(x['prediction']) for x in sort_selected]
len(predictions)

4649

In [244]:
predictions

[0,
 1,
 1,
 0,
 0,
 0,
 0,
 1,
 0,
 1,
 0,
 0,
 0,
 0,
 0,
 1,
 0,
 1,
 0,
 0,
 0,
 1,
 1,
 0,
 0,
 1,
 1,
 0,
 0,
 0,
 0,
 1,
 1,
 0,
 1,
 1,
 1,
 1,
 1,
 1,
 1,
 0,
 0,
 1,
 1,
 0,
 1,
 1,
 0,
 1,
 0,
 0,
 0,
 0,
 1,
 0,
 1,
 1,
 1,
 0,
 0,
 1,
 0,
 0,
 0,
 0,
 1,
 0,
 0,
 1,
 1,
 0,
 0,
 1,
 1,
 1,
 0,
 1,
 0,
 1,
 0,
 1,
 1,
 1,
 0,
 0,
 1,
 0,
 0,
 0,
 0,
 0,
 1,
 0,
 1,
 0,
 0,
 1,
 1,
 1,
 1,
 0,
 1,
 0,
 1,
 0,
 0,
 1,
 1,
 1,
 0,
 1,
 0,
 0,
 0,
 0,
 1,
 0,
 1,
 1,
 0,
 0,
 0,
 1,
 0,
 1,
 0,
 1,
 0,
 1,
 0,
 0,
 0,
 0,
 0,
 1,
 1,
 1,
 0,
 1,
 1,
 0,
 0,
 1,
 1,
 1,
 1,
 1,
 0,
 0,
 0,
 1,
 1,
 1,
 1,
 1,
 1,
 1,
 0,
 1,
 1,
 1,
 0,
 0,
 0,
 0,
 1,
 1,
 0,
 0,
 0,
 1,
 1,
 0,
 1,
 0,
 0,
 1,
 1,
 1,
 1,
 0,
 1,
 0,
 1,
 0,
 1,
 1,
 1,
 0,
 0,
 0,
 1,
 1,
 0,
 1,
 1,
 1,
 0,
 1,
 0,
 1,
 1,
 1,
 0,
 1,
 0,
 0,
 1,
 0,
 0,
 1,
 0,
 0,
 0,
 1,
 0,
 1,
 1,
 0,
 1,
 1,
 1,
 0,
 0,
 0,
 1,
 1,
 0,
 1,
 0,
 1,
 0,
 1,
 0,
 0,
 1,
 0,
 1,
 0,
 1,
 0,
 0,
 0,
 1,
 1,
 0,
 1,
 0,
 0,
