# Spark Project


### Stack Exchange provides an anonymized data dump (https://archive.org/details/stackexchange), and I'll use Spark to perform data manipulation, analysis, and machine learning on this data set. 

### Data input and parsing - bad XML


In [1]:
from pyspark import SparkContext
sc = SparkContext("local[*]", "temp")

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [4]:
import lxml
from lxml import etree

In [2]:
import xml.etree.ElementTree as ET


### First way - testing the logic

In [72]:
sc.textFile('spark-stats-data/allPosts/').filter(lambda x: '<row'in x).count()


109522

In [73]:
posts = sc.textFile('spark-stats-data/allPosts/').filter(lambda x: '<row'in x).collect()


In [74]:
ET.fromstring(posts[7]).attrib

{'Body': '<p>Briefly, my answer is "yes".</p>\n\n<p>The result of any LDA inference algorithm is $\\theta_{d,t}$ and $\\phi_{t,w}$, distribution of topics in each document and distribution of terms in each topics. Given these distributions, one can obtain estimate for $p(z|d,w)$, conditional probability of a topic $z$ for word $w$ in document $d$:\n$$\np(z|d,w)=\\frac{p(z,d,w)}{\\sum\\limits_{s=1}^K p(s,d,w)}=\\frac{\\theta_{d,z} \\phi_{z,w}}{\\sum\\limits_{s=1}^{K} \\theta_{d, s} \\phi_{s, w}}\n$$\nFurther utilizing of this information can be, for example, assigning the single most probable topic for a word. But it\'s not obligatory.</p>\n',
 'CommentCount': '0',
 'CreationDate': '2013-06-15T09:36:39.560',
 'Id': '61814',
 'LastActivityDate': '2013-06-15T09:36:39.560',
 'OwnerUserId': '26924',
 'ParentId': '61648',
 'PostTypeId': '2',
 'Score': '2'}

In [75]:
posts[65]

'  <row Body="Autocorrelation is the correlation of a series of data with itself at some lag. This is an important topic particularly in the analysis of time-series data." CommentCount="0" CreationDate="2013-06-16T20:13:54.233" Id="61881" LastActivityDate="2013-06-16T22:08:32.437" LastEditDate="2013-06-16T22:08:32.437" LastEditorUserId="7290" OwnerUserId="7290" PostTypeId="4" Score="0" />'

parser function for tests

In [20]:
def parser_list(x):
    # Return the row's attributes as a dict, or None if the line is not well-formed XML.
    try:
        return dict(ET.fromstring(x).attrib)
    except ET.ParseError:
        return None

In [104]:
x = list(map(parser_list, posts))  # assuming `a` was the collected `posts` list from above

### working on RDD

In [None]:
 sc.textFile('spark-stats-data/allPosts/').filter(lambda x: '<row'in x)\
    .map(parser_list).take(400)

In [173]:
## getting the errors

In [22]:
def parser_error(x):
    # Return 1 if the line fails to parse as XML, 0 otherwise.
    try:
        ET.fromstring(x)
        return 0
    except ET.ParseError:
        return 1

In [172]:
sc.textFile('spark-stats-data/allPosts/').filter(lambda x: '<row' in x)\
    .map(parser_error).reduce(lambda x, y: x + y)
# 781 bad XML rows

781
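
To sanity-check the counting logic without Spark, here is a minimal plain-Python sketch of the same idea. The sample lines and the helper name `parse_row` are made up for illustration; only `ET.fromstring` and `ET.ParseError` come from the cells above.

```python
import xml.etree.ElementTree as ET

def parse_row(line):
    """Return the row's attributes as a dict, or None if the XML is malformed."""
    try:
        return dict(ET.fromstring(line).attrib)
    except ET.ParseError:
        return None

# Hypothetical sample: one good row and two malformed ones.
sample_lines = [
    '  <row Id="1" Score="2" />',
    '  <row Id="2" Score="5"',        # truncated, never closed
    '  <row Id="3" Body="a < b" />',  # unescaped "<" inside an attribute
]

parsed = [parse_row(line) for line in sample_lines]
bad_count = sum(1 for row in parsed if row is None)
print(bad_count)
```

Catching `ET.ParseError` specifically, rather than using a bare `except`, keeps unrelated bugs from being silently counted as bad XML.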

### I'm going to check if there is a relationship between the number of times a post was favorited and the Score. I'll aggregate posts by the number of favorites, and find the average score for each number of favorites.  

In [3]:
from pyspark import SparkContext
sc = SparkContext("local[*]", "temp")

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [4]:
import xml.etree.ElementTree as ET

In [5]:
#import pandas as pd
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
from pyspark.sql import functions as F
from pyspark.sql.types import StringType, IntegerType, ArrayType
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType

In [6]:
def parser_list2(x):
    # Malformed rows become empty dicts, so the row count is preserved.
    try:
        return dict(ET.fromstring(x).attrib)
    except ET.ParseError:
        return {}

In [9]:
data = sc.textFile('spark-stats-data/allPosts/').filter(lambda x: '<row'in x).map(parser_list2).toDF()

In [8]:
data.count()

109522

In [20]:
x = data.selectExpr("INT(FavoriteCount)", "DOUBLE(Score)").groupBy('FavoriteCount').avg().fillna(0).orderBy('FavoriteCount')

In [33]:
favorite_score = x.select('FavoriteCount', 'avg(Score)').toPandas()

In [38]:
favorite_score = list(zip(favorite_score['FavoriteCount'], favorite_score['avg(Score)']))
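
The aggregation above can be sketched in plain Python on a few made-up rows. One assumption differs slightly from the Spark cell: here a missing `FavoriteCount` is treated as 0 before grouping, whereas the cell fills nulls after the aggregation. The rows and the function name are hypothetical.

```python
from collections import defaultdict

# Hypothetical parsed rows; values are strings, as they come out of the XML.
rows = [
    {"FavoriteCount": "1", "Score": "6"},
    {"Score": "7"},                       # no FavoriteCount attribute
    {"FavoriteCount": "1", "Score": "2"},
    {"FavoriteCount": "3", "Score": "10"},
]

def average_score_by_favorites(rows):
    """Group rows by favorite count and average the score in each group."""
    totals = defaultdict(lambda: [0.0, 0])  # favorites -> [score sum, row count]
    for row in rows:
        favorites = int(row.get("FavoriteCount", 0))
        totals[favorites][0] += float(row["Score"])
        totals[favorites][1] += 1
    return sorted((fav, total / n) for fav, (total, n) in totals.items())

favorite_score_sketch = average_score_by_favorites(rows)
print(favorite_score_sketch)
```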

### I will now investigate the correlation between a user's reputation and the kind of posts they make. 


#### User data for the reputation feature

In [None]:
users = sc.textFile('spark-stats-data/allUsers/').filter(lambda x: '<row'in x).map(parser_list2).toDF()

In [37]:
users_pd = users.toPandas() # FOR EASIER INSPECTION

In [19]:
users_pd[users_pd.Id=='21054']

| | AccountId | CreationDate | DisplayName | DownVotes | Id | LastAccessDate | ProfileImageUrl | Reputation | UpVotes | Views |
|---|---|---|---|---|---|---|---|---|---|---|
| 15613 | 2390441 | 2013-02-20T08:48:36.937 | COOLSerdash | 141 | 21054 | 2015-03-07T22:29:18.567 | | 6716 | 8454 | 1027 |


In [105]:
reputation = users.selectExpr('INT(Id)', 'DOUBLE(Reputation)').orderBy('Reputation', ascending=False)

In [106]:
reputation.count()

50458

In [107]:
reputation_99 = reputation.toPandas()[:99]; reputation_99

| | Id | Reputation |
|---|---|---|
| 0 | 919.0 | 100976.0 |
| 1 | 805.0 | 92624.0 |
| 2 | 686.0 | 47334.0 |
| 3 | 7290.0 | 46907.0 |
| 4 | 930.0 | 32283.0 |
| ... | ... | ... |
| 94 | 17908.0 | 3957.0 |
| 95 | 13138.0 | 3821.0 |
| 96 | 1108.0 | 3805.0 |
| 97 | 1679.0 | 3747.0 |


#### Posts data for question and answers

In [45]:
data = sc.textFile('spark-stats-data/allPosts/').filter(lambda x: '<row'in x).map(parser_list2).toDF()

In [46]:
data_pd = data.toPandas()

In [108]:
post_type = data.selectExpr('INT(OwnerUserId)', 'INT(PostTypeId)')

In [20]:
post_type.show(10)

+-----------+----------+
|OwnerUserId|PostTypeId|
+-----------+----------+
|      21054|         1|
|        805|         2|
|      22293|         1|
|        183|         1|
|        805|         2|
|      11849|         2|
|       6029|         2|
|      26924|         2|
|      23956|         1|
|       6029|         2|
+-----------+----------+
only showing top 10 rows



In [24]:
post_type.count()

109522

In [48]:
# to pandas

In [109]:
posttype_pd = post_type.toPandas()

In [19]:
def my_func(x):
    if x == 1:
        return 1
    else: 
        return 0    
    
def my_func2(x):
    if x == 2:
        return 1
    else: 
        return 0    

In [112]:
posttype_pd['questions'] = posttype_pd['PostTypeId'].apply(my_func)
posttype_pd['answers'] = posttype_pd['PostTypeId'].apply(my_func2)

In [113]:
post_pd = posttype_pd.drop('PostTypeId', axis=1)

In [114]:
post_pd = post_pd.rename(columns={"OwnerUserId":"Id"})

In [115]:
post_pd = post_pd.groupby(['Id']).sum().sort_values('answers', ascending=False)

In [None]:
# merging

In [116]:
final = reputation_99.join(post_pd, on='Id', how='left')

In [117]:
final['percent'] = final['answers'] / (final['answers'] + final['questions'])

In [118]:
final

| | Id | Reputation | questions | answers | percent |
|---|---|---|---|---|---|
| 0 | 919.0 | 100976.0 | 4 | 1206 | 0.996694 |
| 1 | 805.0 | 92624.0 | 9 | 2227 | 0.995975 |
| 2 | 686.0 | 47334.0 | 31 | 1543 | 0.980305 |
| 3 | 7290.0 | 46907.0 | 7 | 856 | 0.991889 |
| 4 | 930.0 | 32283.0 | 8 | 430 | 0.981735 |
| ... | ... | ... | ... | ... | ... |
| 94 | 17908.0 | 3957.0 | 0 | 167 | 1.000000 |
| 95 | 13138.0 | 3821.0 | 20 | 127 | 0.863946 |
| 96 | 1108.0 | 3805.0 | 1 | 35 | 0.972222 |
| 97 | 1679.0 | 3747.0 | 6 | 97 | 0.941748 |


In [120]:
final[final.Id==7071]

| | Id | Reputation | questions | answers | percent |
|---|---|---|---|---|---|
| 35 | 7071.0 | 10045.0 | 30 | 306 | 0.910714 |


In [121]:
answer_percentage = list(zip(final.Id, final.percent))

In [122]:
sum(final.answers) / (sum(final.answers) + sum(final.questions))

0.9438837303189342

In [123]:
mytup = [(-1, 0.9438837303189342)]

In [124]:
answer_percentage += mytup
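
As a small check on the `percent` column, here is a plain-Python sketch of the answer fraction for a single user. The function name is made up; the counts for user 919 (4 questions, 1206 answers) are taken from the table above, and `PostTypeId` 1 and 2 are treated as questions and answers, as in `my_func` and `my_func2`.

```python
from collections import Counter

def answer_fraction(post_type_ids):
    """Share of a user's questions and answers that are answers (None if no posts)."""
    counts = Counter(post_type_ids)
    questions, answers = counts[1], counts[2]
    total = questions + answers
    return answers / total if total else None

# User 919 in the table above: 4 questions and 1206 answers.
posts_919 = [1] * 4 + [2] * 1206
fraction_919 = answer_fraction(posts_919)
print(round(fraction_919, 6))
```

Returning `None` for users without any questions or answers avoids a division by zero; the pandas version yields `NaN` in that case instead.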

### We'd expect the first question a user asks to be indicative of their future behavior. I'll get into this, but first I would like to see the relationship between reputation and how long it took each person to ask their first question.


In [None]:
users = sc.textFile('spark-stats-data/allUsers/').filter(lambda x: '<row'in x).map(parser_list2).toDF()

In [164]:
reputation = users.selectExpr('INT(Id)', 'DOUBLE(Reputation)', 'CreationDate').\
withColumn('time', F.date_format('CreationDate', "yyyy-MM-dd HH:mm:ss")).\
withColumn('time2', F.to_timestamp('time', 'yyyy-MM-dd HH:mm:ss')).\
orderBy('Reputation', ascending=False)

In [165]:
reputation.show(10)

+-----+----------+--------------------+-------------------+-------------------+
|   Id|Reputation|        CreationDate|               time|              time2|
+-----+----------+--------------------+-------------------+-------------------+
|  919|  100976.0|2010-08-13T15:29:...|2010-08-13 15:29:47|2010-08-13 15:29:47|
|  805|   92624.0|2010-08-07T08:40:...|2010-08-07 08:40:07|2010-08-07 08:40:07|
|  686|   47334.0|2010-08-03T19:42:...|2010-08-03 19:42:40|2010-08-03 19:42:40|
| 7290|   46907.0|2011-11-09T04:43:...|2011-11-09 04:43:15|2011-11-09 04:43:15|
|  930|   32283.0|2010-08-13T20:50:...|2010-08-13 20:50:47|2010-08-13 20:50:47|
| 4505|   27599.0|2011-05-07T13:44:...|2011-05-07 13:44:25|2011-05-07 13:44:25|
| 4253|   25406.0|2011-04-20T12:59:...|2011-04-20 12:59:07|2011-04-20 12:59:07|
|  183|   23610.0|2010-07-20T02:56:...|2010-07-20 02:56:34|2010-07-20 02:56:34|
|11032|   23102.0|2012-05-02T14:04:...|2012-05-02 14:04:04|2012-05-02 14:04:04|
|28746|   22706.0|2013-08-02T14:24:...|2

In [166]:
reputation = reputation.toPandas(); reputation

| | Id | Reputation | CreationDate | time | time2 |
|---|---|---|---|---|---|
| 0 | 919.0 | 100976.0 | 2010-08-13T15:29:47.140 | 2010-08-13 15:29:47 | 2010-08-13 15:29:47 |
| 1 | 805.0 | 92624.0 | 2010-08-07T08:40:07.287 | 2010-08-07 08:40:07 | 2010-08-07 08:40:07 |
| 2 | 686.0 | 47334.0 | 2010-08-03T19:42:40.907 | 2010-08-03 19:42:40 | 2010-08-03 19:42:40 |
| 3 | 7290.0 | 46907.0 | 2011-11-09T04:43:15.613 | 2011-11-09 04:43:15 | 2011-11-09 04:43:15 |
| 4 | 930.0 | 32283.0 | 2010-08-13T20:50:47.397 | 2010-08-13 20:50:47 | 2010-08-13 20:50:47 |
| ... | ... | ... | ... | ... | ... |
| 50453 | | | | | NaT |
| 50454 | | | | | NaT |
| 50455 | | | | | NaT |
| 50456 | | | | | NaT |


In [13]:
## question - and creating date

In [144]:
data = sc.textFile('spark-stats-data/allPosts/').filter(lambda x: '<row'in x).map(parser_list2).toDF()

In [145]:
data.printSchema()

root
 |-- AcceptedAnswerId: string (nullable = true)
 |-- AnswerCount: string (nullable = true)
 |-- Body: string (nullable = true)
 |-- CommentCount: string (nullable = true)
 |-- CreationDate: string (nullable = true)
 |-- FavoriteCount: string (nullable = true)
 |-- Id: string (nullable = true)
 |-- LastActivityDate: string (nullable = true)
 |-- LastEditDate: string (nullable = true)
 |-- LastEditorUserId: string (nullable = true)
 |-- OwnerUserId: string (nullable = true)
 |-- PostTypeId: string (nullable = true)
 |-- Score: string (nullable = true)
 |-- Tags: string (nullable = true)
 |-- Title: string (nullable = true)
 |-- ViewCount: string (nullable = true)



In [146]:
post_type = data.selectExpr('INT(OwnerUserId)', 'INT(PostTypeId)', 'CreationDate').\
withColumn('time', F.date_format('CreationDate', "yyyy-MM-dd HH:mm:ss")).\
withColumn('time3', F.to_timestamp('time', 'yyyy-MM-dd HH:mm:ss'))

In [147]:
post_type.show(10)

+-----------+----------+--------------------+-------------------+-------------------+
|OwnerUserId|PostTypeId|        CreationDate|               time|              time3|
+-----------+----------+--------------------+-------------------+-------------------+
|      21054|         1|2013-06-15T06:18:...|2013-06-15 06:18:20|2013-06-15 06:18:20|
|        805|         2|2013-06-15T06:44:...|2013-06-15 06:44:59|2013-06-15 06:44:59|
|      22293|         1|2013-06-15T07:31:...|2013-06-15 07:31:03|2013-06-15 07:31:03|
|        183|         1|2013-06-15T07:42:...|2013-06-15 07:42:51|2013-06-15 07:42:51|
|        805|         2|2013-06-15T07:53:...|2013-06-15 07:53:13|2013-06-15 07:53:13|
|      11849|         2|2013-06-15T08:02:...|2013-06-15 08:02:29|2013-06-15 08:02:29|
|       6029|         2|2013-06-15T09:09:...|2013-06-15 09:09:56|2013-06-15 09:09:56|
|      26924|         2|2013-06-15T09:36:...|2013-06-15 09:36:39|2013-06-15 09:36:39|
|      23956|         1|2013-06-15T10:08:...|2013-06-1

In [148]:
post = post_type.toPandas()

In [149]:
post

| | OwnerUserId | PostTypeId | CreationDate | time | time3 |
|---|---|---|---|---|---|
| 0 | 21054.0 | 1.0 | 2013-06-15T06:18:20.483 | 2013-06-15 06:18:20 | 2013-06-15 06:18:20 |
| 1 | 805.0 | 2.0 | 2013-06-15T06:44:59.437 | 2013-06-15 06:44:59 | 2013-06-15 06:44:59 |
| 2 | 22293.0 | 1.0 | 2013-06-15T07:31:03.950 | 2013-06-15 07:31:03 | 2013-06-15 07:31:03 |
| 3 | 183.0 | 1.0 | 2013-06-15T07:42:51.720 | 2013-06-15 07:42:51 | 2013-06-15 07:42:51 |
| 4 | 805.0 | 2.0 | 2013-06-15T07:53:13.860 | 2013-06-15 07:53:13 | 2013-06-15 07:53:13 |
| ... | ... | ... | ... | ... | ... |
| 109517 | 183.0 | 2.0 | 2011-05-17T06:33:30.290 | 2011-05-17 06:33:30 | 2011-05-17 06:33:30 |
| 109518 | 2860.0 | 1.0 | 2011-05-17T08:42:53.303 | 2011-05-17 08:42:53 | 2011-05-17 08:42:53 |
| 109519 | 4496.0 | 1.0 | 2011-05-17T09:38:43.160 | 2011-05-17 09:38:43 | 2011-05-17 09:38:43 |
| 109520 | 4446.0 | 2.0 | 2011-05-17T14:09:26.937 | 2011-05-17 14:09:26 | 2011-05-17 14:09:26 |


In [150]:
def my_func(x):  
    if x == 1:
        return 1
    else: 
        return 0    

In [152]:
post['questions'] = post['PostTypeId'].apply(my_func) # get the question column
post = post.drop('PostTypeId', axis=1)
post = post.rename(columns={"OwnerUserId":"Id"})

post = post[post.questions==1] # subset of questions
post = post.drop('questions', axis=1) 

post = post.groupby('Id').min() # get the min date for each user


In [156]:
post = post.drop('time', axis=1)

In [158]:
post = post.drop('CreationDate', axis=1)

In [77]:
# Merging

In [167]:
final = reputation.join(post, on='Id')

In [168]:
final = final.dropna()

In [183]:
import re


In [180]:
final['days'] = final['time3'] - final['time2']

In [185]:
final['days'] = final['days'].astype(str)

In [176]:
final['time3'][0] - final['time2'][0]

Timedelta('3 days 21:40:42')

In [186]:
# Raw string so that \d is not treated as an (invalid) string escape.
final['days'] = final['days'].apply(lambda x: int(re.findall(r'^\d+', x)[0]))

In [187]:
final['Id'] = final['Id'].astype(int)

In [189]:
first_question = list(zip(final.Id, final.days))
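
The day count can also be read straight off the time difference instead of going through a string and a regex. Below is a minimal stdlib sketch. The creation timestamp is user 919's from the table above; the first-question timestamp is hypothetical, constructed so that the gap equals the `Timedelta('3 days 21:40:42')` shown earlier.

```python
from datetime import datetime, timedelta

# Account creation time of user 919, as it appears in the raw data.
created = datetime.strptime("2010-08-13T15:29:47.140", "%Y-%m-%dT%H:%M:%S.%f")

# Hypothetical first-question time, chosen to reproduce the gap shown above.
first_question = created + timedelta(days=3, hours=21, minutes=40, seconds=42)

waited = first_question - created
whole_days = waited.days  # integer number of whole days in the gap
print(whole_days)
```

In pandas the analogous accessor on a timedelta column is `Series.dt.days`, which would replace the `astype(str)` and `re.findall` steps.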

### It can be interesting to think about what factors influence a user to remain active on the site over a long period of time. In order not to bias the results towards older users, I'll define a time window between 100 and 150 days after account creation. If the user has made a post in this time, I'll consider them active and well on their way to being veterans of the site; if not, they are inactive and were likely brief users.

In [None]:
## getting the users and creation date

In [262]:
users = sc.textFile('spark-stats-data/allUsers/').filter(lambda x: '<row'in x).map(parser_list2).toDF()

In [263]:
users = users.selectExpr('INT(Id)', 'CreationDate').\
withColumn('time', F.date_format('CreationDate', "yyyy-MM-dd HH:mm:ss")).\
withColumn('time2', F.to_timestamp('time', 'yyyy-MM-dd HH:mm:ss')).select('Id', 'time2')

In [264]:
users.show(10)

+-----+-------------------+
|   Id|              time2|
+-----+-------------------+
|70185|2015-03-02 18:42:20|
|70186|2015-03-02 19:04:13|
|70187|2015-03-02 19:40:16|
|70188|2015-03-02 19:46:45|
|70189|2015-03-02 19:56:37|
|70190|2015-03-02 19:59:18|
|70191|2015-03-02 20:08:27|
|70192|2015-03-02 20:10:19|
|70193|2015-03-02 20:41:46|
|70194|2015-03-02 20:46:08|
+-----+-------------------+
only showing top 10 rows



In [28]:
## getting the other features

In [17]:
data = sc.textFile('spark-stats-data/allPosts/').filter(lambda x: '<row'in x).map(parser_list2).toDF()

In [18]:
data.printSchema()

root
 |-- AcceptedAnswerId: string (nullable = true)
 |-- AnswerCount: string (nullable = true)
 |-- Body: string (nullable = true)
 |-- CommentCount: string (nullable = true)
 |-- CreationDate: string (nullable = true)
 |-- FavoriteCount: string (nullable = true)
 |-- Id: string (nullable = true)
 |-- LastActivityDate: string (nullable = true)
 |-- LastEditDate: string (nullable = true)
 |-- LastEditorUserId: string (nullable = true)
 |-- OwnerUserId: string (nullable = true)
 |-- PostTypeId: string (nullable = true)
 |-- Score: string (nullable = true)
 |-- Tags: string (nullable = true)
 |-- Title: string (nullable = true)
 |-- ViewCount: string (nullable = true)



In [274]:
main = data.selectExpr('INT(OwnerUserId)', 'INT(PostTypeId)', 'CreationDate', 'INT(Score)', 'INT(AnswerCount)', 'INT(ViewCount)', 'INT(FavoriteCount)').\
withColumn('time', F.date_format('CreationDate', "yyyy-MM-dd HH:mm:ss")).\
withColumn('time3', F.to_timestamp('time', 'yyyy-MM-dd HH:mm:ss')).\
drop('CreationDate', 'time')

In [275]:
main.show(10)

+-----------+----------+-----+-----------+---------+-------------+-------------------+
|OwnerUserId|PostTypeId|Score|AnswerCount|ViewCount|FavoriteCount|              time3|
+-----------+----------+-----+-----------+---------+-------------+-------------------+
|      21054|         1|    6|          2|     1200|            1|2013-06-15 06:18:20|
|        805|         2|    7|       null|     null|         null|2013-06-15 06:44:59|
|      22293|         1|    2|          1|      325|         null|2013-06-15 07:31:03|
|        183|         1|   10|          2|      602|            3|2013-06-15 07:42:51|
|        805|         2|    6|       null|     null|         null|2013-06-15 07:53:13|
|      11849|         2|    4|       null|     null|         null|2013-06-15 08:02:29|
|       6029|         2|    2|       null|     null|         null|2013-06-15 09:09:56|
|      26924|         2|    2|       null|     null|         null|2013-06-15 09:36:39|
|      23956|         1|    3|          2| 

In [None]:
# Join

In [549]:
final = main.join(users, on=(main['OwnerUserId']==users['Id']), how='left').drop('OwnerUserId', 'time')

In [550]:
final.show(10)

+----------+-----+-----------+---------+-------------+-------------------+----+-------------------+
|PostTypeId|Score|AnswerCount|ViewCount|FavoriteCount|              time3|  Id|              time2|
+----------+-----+-----------+---------+-------------+-------------------+----+-------------------+
|         1|   27|          4|     2984|           13|2010-08-09 18:03:54| 148|2010-07-19 21:55:51|
|         1|    8|          7|     1016|            2|2010-07-27 13:49:41| 463|2010-07-27 13:39:31|
|         1|    5|          3|     2705|            1|2010-08-09 18:10:39| 833|2010-08-09 17:58:31|
|         1|    7|          3|     1241|            2|2010-08-31 18:56:09|1088|2010-08-25 20:48:41|
|         1|    1|          0|       44|         null|2012-12-02 23:08:32|1342|2010-09-17 16:31:43|
|         2|    1|       null|     null|         null|2010-09-17 16:31:46|1342|2010-09-17 16:31:43|
|         1|   13|          6|     3772|            2|2012-01-20 15:51:34|1645|2010-10-20 14:25:23|


In [551]:
final = final.withColumn('days', (final['time3'].cast('long') - final['time2'].cast('long')) /(24*3600))

In [370]:
final.show(10)

+----------+-----+-----------+---------+-------------+-------------------+----+-------------------+--------------------+
|PostTypeId|Score|AnswerCount|ViewCount|FavoriteCount|              time3|  Id|              time2|                days|
+----------+-----+-----------+---------+-------------+-------------------+----+-------------------+--------------------+
|         1|   27|          4|     2984|           13|2010-08-09 18:03:54| 148|2010-07-19 21:55:51|   20.83892361111111|
|         1|    8|          7|     1016|            2|2010-07-27 13:49:41| 463|2010-07-27 13:39:31|0.007060185185185185|
|         1|    5|          3|     2705|            1|2010-08-09 18:10:39| 833|2010-08-09 17:58:31|0.008425925925925925|
|         1|    7|          3|     1241|            2|2010-08-31 18:56:09|1088|2010-08-25 20:48:41|  5.9218518518518515|
|         1|    1|          0|       44|         null|2012-12-02 23:08:32|1342|2010-09-17 16:31:43|   807.2755671296296|
|         2|    1|       null|  

In [552]:
final = final.withColumn('window', F.when((F.col("days") > 100) & (F.col("days") < 150),1)\
.otherwise(0))

In [None]:
# (F.col("PostTypeId").isNotNull())

In [414]:
# getting the veterans

In [555]:
veterans = final.select('Id', 'PostTypeId', 'window').select('Id', 'window')

In [587]:
vet = veterans.groupby('Id').avg().drop('sum(Id)')

In [588]:
veteran_ids = vet.filter(vet['avg(window)']>0)

In [589]:
veteran_ids.orderBy('Id').show(10)

+---+-------+--------------------+
| Id|avg(Id)|         avg(window)|
+---+-------+--------------------+
|  5|    5.0|0.042735042735042736|
|  8|    8.0| 0.06722689075630252|
| 22|   22.0|               0.125|
| 25|   25.0| 0.01282051282051282|
| 29|   29.0| 0.16666666666666666|
| 30|   30.0| 0.07692307692307693|
| 52|   52.0| 0.38461538461538464|
| 53|   53.0|                 0.5|
| 56|   56.0| 0.10526315789473684|
| 69|   69.0|                 0.1|
+---+-------+--------------------+
only showing top 10 rows



In [590]:
veteran_ids.count()

2027
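
Since `window` is 0 or 1, `avg(window) > 0` is the same as "at least one post falls inside the window". Here is a plain-Python sketch of that rule. The function names are made up; the two users and timestamps are taken from the outputs in this notebook, but pairing them in one sample is only for illustration.

```python
from datetime import datetime

SECONDS_PER_DAY = 24 * 3600

def days_between(created, posted):
    """Fractional days from account creation to a post."""
    return (posted - created).total_seconds() / SECONDS_PER_DAY

def in_window(days, low=100, high=150):
    """Strictly between low and high, like the F.when condition above."""
    return low < days < high

def veteran_ids_sketch(posts, created_by_user):
    """Ids of users with at least one post inside the window."""
    veterans = set()
    for user_id, posted in posts:
        if in_window(days_between(created_by_user[user_id], posted)):
            veterans.add(user_id)
    return veterans

created_by_user = {
    148: datetime(2010, 7, 19, 21, 55, 51),
    11748: datetime(2012, 6, 4, 17, 6, 25),
}
posts = [
    (148, datetime(2010, 8, 9, 18, 3, 54)),      # about 20.8 days: outside
    (11748, datetime(2012, 9, 21, 17, 58, 26)),  # about 109 days: inside
]

vet_ids_sketch = veteran_ids_sketch(posts, created_by_user)
print(vet_ids_sketch)
```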

In [591]:
veteran_ids = veteran_ids.drop('avg(window)', 'avg(Id)')
vet_ids = veteran_ids.toPandas().squeeze()
vet_ids = list(vet_ids)

In [599]:
from pyspark.sql.functions import udf

def vets(x):
    if x in vet_ids:
        return 1
    else:
        return 0
    
vets_udf = udf(vets, IntegerType())

final = final.withColumn('Veterans', vets_udf(final['Id']))

In [603]:

def dummy(x):  
    if x == 0:
        return 1
    else: 
        return 0

In [604]:
dummy_udf = udf(dummy, IntegerType())
final = final.withColumn('Brief', dummy_udf(final['Veterans']))
final.select('Id', 'Brief','Veterans').show(100)

In [607]:
def myfunc(x):
    if x == 1:
        return 1
    else:
        return 0
    
myfunc = udf(myfunc, IntegerType())

In [609]:
final = final.withColumn('questions', myfunc(final['PostTypeId']))
questions = final.filter(final.questions==1)

In [611]:
questions.count()

52060

In [613]:
veterans = questions.filter(questions.Veterans==1)
brief_users = questions.filter(questions.Brief==1)

In [615]:
veterans.count()

13407

In [616]:
brief_users.count()

38653

In [160]:
veterans.show(10)

+----------+-----+-----------+---------+-------------+-------------------+-----+-------------------+----+------+-----+---------+
|PostTypeId|Score|AnswerCount|ViewCount|FavoriteCount|              time3|   Id|              time2|days|senior|brief|questions|
+----------+-----+-----------+---------+-------------+-------------------+-----+-------------------+----+------+-----+---------+
|         1|    2|          1|     1889|            2|2012-09-21 17:58:26|11748|2012-06-04 17:06:25| 109|     1|    0|        1|
|         1|    0|          1|       95|            0|2012-10-02 17:39:12|11748|2012-06-04 17:06:25| 120|     1|    0|        1|
|         1|    1|          2|      604|            0|2012-10-10 16:38:59|11748|2012-06-04 17:06:25| 127|     1|    0|        1|
|         1|    2|          0|      218|            0|2013-02-17 12:44:47|16574|2012-11-06 13:14:24| 102|     1|    0|        1|
|         1|    0|          1|       75|            0|2013-02-18 17:40:53|16574|2012-11-06 13:14:

In [617]:
veterans2 = veterans.select('Id', 'time3')
veterans2 = veterans2.groupBy('Id').agg(F.min('time3').alias('time3'))

In [620]:
brief_users2 = brief_users.select('Id', 'time3').groupBy('Id').agg(F.min('time3').alias('time3'))

In [None]:
# merging

In [622]:
vets = veterans2.join(veterans, on=('time3')).drop(veterans.Id)\
.select('Id', 'Score', 'AnswerCount', 'ViewCount', 'FavoriteCount')

In [623]:
vets.count()

1819
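
The "first question per user" step can be sketched without a join by keeping, for each user, the question with the smallest timestamp. This is a plain-Python illustration, not the Spark code above; the function name is made up, and the sample rows reuse values from the `veterans.show(10)` output. Keying on the user id avoids matching two different users who happen to share a timestamp, which a join on `time3` alone could do.

```python
def first_questions(questions):
    """Map each user id to that user's earliest question."""
    first = {}
    for question in questions:
        user_id = question["Id"]
        # "YYYY-MM-DD HH:MM:SS" strings sort chronologically.
        if user_id not in first or question["time3"] < first[user_id]["time3"]:
            first[user_id] = question
    return first

sample_questions = [
    {"Id": 11748, "time3": "2012-10-02 17:39:12", "Score": 0, "ViewCount": 95},
    {"Id": 11748, "time3": "2012-09-21 17:58:26", "Score": 2, "ViewCount": 1889},
    {"Id": 16574, "time3": "2013-02-17 12:44:47", "Score": 2, "ViewCount": 218},
    {"Id": 16574, "time3": "2013-02-18 17:40:53", "Score": 0, "ViewCount": 75},
]

first_by_user = first_questions(sample_questions)
print(first_by_user[11748]["ViewCount"])
```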

In [624]:
vets.show(10)

+-----+-----+-----------+---------+-------------+
|   Id|Score|AnswerCount|ViewCount|FavoriteCount|
+-----+-----+-----------+---------+-------------+
| 8078|    3|          3|     1252|         null|
|21240|    1|          1|      675|         null|
|27589|    4|          1|     1014|            1|
|28957|    1|          0|       30|         null|
|29612|    1|          1|       84|         null|
|40513|    0|          0|       37|         null|
|49793|    1|          0|       35|         null|
|54005|    2|          0|       66|         null|
| 9177|   -1|          0|      129|            0|
|14860|    3|          2|      530|         null|
+-----+-----+-----------+---------+-------------+
only showing top 10 rows



In [625]:
vets.groupby().sum().show()

+--------+----------+----------------+--------------+------------------+
| sum(Id)|sum(Score)|sum(AnswerCount)|sum(ViewCount)|sum(FavoriteCount)|
+--------+----------+----------------+--------------+------------------+
|46281460|      6444|            2361|       1684403|              2365|
+--------+----------+----------------+--------------+------------------+



In [626]:
vets_score = 6444/ 1819; vets_score

3.54260582737768

In [627]:
vets_views = 1684403/ 1819; vets_views

926.004947773502

In [628]:
vets_answers = 2361 / 1819; vets_answers

1.2979659153380978

In [629]:
vets_favorites = 2365 / 1819; vets_favorites

1.3001649257833974

In [630]:
brief = brief_users2.join(brief_users, on=('time3')).drop(brief_users.Id)\
.select('Id', 'Score', 'AnswerCount', 'ViewCount', 'FavoriteCount')

In [354]:
brief.show(10)

+----+-----+-----------+---------+-------------+
|  Id|Score|AnswerCount|ViewCount|FavoriteCount|
+----+-----+-----------+---------+-------------+
|1291|    7|          2|      958|            1|
|2385|    4|          3|     1070|            1|
|2105|    6|          1|      539|         null|
|3891|    7|          4|     2179|            1|
|3840|    6|          1|       83|            2|
|1503|    1|          0|      253|         null|
|6610|    1|          2|      447|         null|
|7223|    1|          1|       62|         null|
|7235|    3|          1|      857|            3|
|7341|    3|          0|      151|         null|
+----+-----+-----------+---------+-------------+
only showing top 10 rows



In [631]:
brief.count()

21286

In [632]:
brief.groupBy().sum().show()

+---------+----------+----------------+--------------+------------------+
|  sum(Id)|sum(Score)|sum(AnswerCount)|sum(ViewCount)|sum(FavoriteCount)|
+---------+----------+----------------+--------------+------------------+
|727971557|     44748|           20670|      11800242|             12263|
+---------+----------+----------------+--------------+------------------+



In [190]:
brief_score = 44748/ 21286; brief_score

2.1022268157474397

In [191]:
brief_views = 11800242/ 21286; brief_views

554.3663440759184

In [635]:
brief_answers = 20670 / 21286; brief_answers

0.9710607911303204

In [636]:
brief_favorites = 12263 / 21286; brief_favorites

0.5761063609884431
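
The eight divisions above can be collected in one small helper. The sums and counts below are copied from the `groupBy().sum()` and `count()` outputs; the helper name is made up. Note that Spark's `sum` skips nulls while the divisor is the full row count, so a missing `FavoriteCount` effectively counts as 0 here.

```python
def per_question_averages(column_sums, n_questions):
    """Divide each column sum by the number of first questions."""
    return {name: total / n_questions for name, total in column_sums.items()}

vets_sums = {"Score": 6444, "AnswerCount": 2361,
             "ViewCount": 1684403, "FavoriteCount": 2365}
brief_sums = {"Score": 44748, "AnswerCount": 20670,
              "ViewCount": 11800242, "FavoriteCount": 12263}

vets_avg = per_question_averages(vets_sums, 1819)
brief_avg = per_question_averages(brief_sums, 21286)

for name in vets_sums:
    print(name, round(vets_avg[name], 3), round(brief_avg[name], 3))
```

On this data set, the veterans' first questions score higher on all four measures than those of brief users.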

#### Same as above, but on the full Stack Exchange data set.


In [181]:
def parser_list2(x):
    try:
        x = dict(ET.fromstring(x).attrib)
        # Rows that lack these attributes (answers, for example) default to '0'.
        for key in ('AnswerCount', 'FavoriteCount', 'ViewCount'):
            x.setdefault(key, '0')
        return x
    except ET.ParseError:
        return {}

In [7]:
users = sc.textFile('spark-stack-data/allUsers/').filter(lambda x: '<row'in x).map(parser_list2).toDF()

In [9]:
users = users.selectExpr('INT(Id)', 'CreationDate').\
withColumn('time', F.date_format('CreationDate', "yyyy-MM-dd HH:mm:ss")).\
withColumn('time2', F.to_timestamp('time', 'yyyy-MM-dd HH:mm:ss')).select('Id', 'time2')

In [10]:
data = sc.textFile('spark-stack-data/allPosts/').filter(lambda x: '<row'in x).map(parser_list2).toDF()

In [11]:
data.select('AnswerCount', 'FavoriteCount', 'ViewCount', 'ViewCount').show(10)

+-----------+-------------+---------+---------+
|AnswerCount|FavoriteCount|ViewCount|ViewCount|
+-----------+-------------+---------+---------+
|          0|            0|        0|        0|
|          0|            0|        0|        0|
|          0|            0|        0|        0|
|          0|            0|        0|        0|
|          1|            0|     2429|     2429|
|          1|            0|     5797|     5797|
|          7|            0|      217|      217|
|          0|            0|        0|        0|
|          0|            0|        0|        0|
|          3|            0|     2353|     2353|
+-----------+-------------+---------+---------+
only showing top 10 rows



In [11]:
main = data.selectExpr('INT(OwnerUserId)', 'INT(PostTypeId)', 'CreationDate', 'INT(Score)', 'INT(AnswerCount)', 'INT(ViewCount)', 'INT(FavoriteCount)').\
withColumn('time', F.date_format('CreationDate', "yyyy-MM-dd HH:mm:ss")).\
withColumn('time3', F.to_timestamp('time', 'yyyy-MM-dd HH:mm:ss')).\
drop('CreationDate', 'time')

In [None]:
#join

In [12]:
final = main.join(users, on=(main['OwnerUserId']==users['Id']), how='left').drop('OwnerUserId', 'time')

In [13]:
final = final.withColumn('days', (final['time3'].cast('long') - final['time2'].cast('long')) /(24*3600))

In [14]:
final = final.withColumn('window', F.when((F.col("days") > 100) & (F.col("days") < 150),1)\
.otherwise(0)).cache()

In [16]:
veterans = final.select('Id', 'PostTypeId', 'window').select('Id', 'window')

In [17]:
vet = veterans.groupby('Id').avg().drop('sum(Id)')

In [18]:
veteran_ids = vet.filter(vet['avg(window)']>0)

In [19]:
veteran_ids = veteran_ids.drop('avg(window)', 'avg(Id)')

In [20]:
vet_ids = veteran_ids.toPandas().squeeze()
vet_ids = list(vet_ids)
len(vet_ids)

288285

In [21]:
from pyspark.sql.functions import udf

def vets(x):
    if x in vet_ids:
        return 1
    else:
        return 0
    
vets_udf = udf(vets, IntegerType())

final = final.withColumn('Veterans', vets_udf(final['Id']))

In [22]:
#dummy_udf = udf(dummy, IntegerType())
#final = final.withColumn('Brief', dummy_udf(final['Veterans']))
final = final.withColumn('Brief', F.when(F.col("Veterans") == 0 ,1).otherwise(0))

In [23]:
#final = final.withColumn('questions', myfunc(final['PostTypeId']))
final = final.withColumn('questions', F.when(F.col("PostTypeId") == 1 ,1).otherwise(0))
questions = final.filter(final.questions==1)

In [24]:
veterans = questions.filter(questions.Veterans==1)
brief_users = questions.filter(questions.Brief==1)

In [25]:
veterans2 = veterans.select('Id', 'time3')
veterans2 = veterans2.groupBy('Id').agg(F.min('time3').alias('time3'))

In [26]:
vets = veterans2.join(veterans, on=('time3')).drop(veterans.Id)\
.select('Id', 'Score', 'AnswerCount', 'ViewCount', 'FavoriteCount')

In [27]:
vets.count()

267192

In [40]:
vets = vets.groupby().sum().cache()

In [41]:
vets.show()

+------------+----------+----------------+--------------+------------------+
|     sum(Id)|sum(Score)|sum(AnswerCount)|sum(ViewCount)|sum(FavoriteCount)|
+------------+----------+----------------+--------------+------------------+
|480101954549|    597570|          491000|     488664403|            228490|
+------------+----------+----------------+--------------+------------------+



In [28]:
 597570 / 267192 # vets_score 

2.2364816311865625

In [30]:
 488664403 /  267192  # vets_view

1828.888600706608

In [31]:
491000 /  267192  # vets_answer

1.8376298691577593

In [32]:
228490 / 267192   # vets_favorite

0.8551528488876913

In [34]:
brief_users2 = brief_users.select('Id', 'time3').groupBy('Id').agg(F.min('time3').alias('time3'))

In [39]:
brief = brief_users2.join(brief_users, on=('time3')).drop(brief_users.Id)\
.select('Id', 'Score', 'AnswerCount', 'ViewCount', 'FavoriteCount').cache()

In [40]:
brief.show(10)

+-------+-----+-----------+---------+-------------+
|     Id|Score|AnswerCount|ViewCount|FavoriteCount|
+-------+-----+-----------+---------+-------------+
|2090742|   16|          4|     2406|            3|
|    889|  203|         22|    53711|           96|
|    263|   27|          4|     6440|           17|
|   4271|    5|          7|     1165|            1|
|   4118|    4|          8|     2934|            3|
|  13277|   15|          7|     6343|           10|
|  14322|    8|          6|    35012|            3|
|2133351|    1|          2|      763|            0|
|  18744|    8|          7|     4453|            1|
|  18950|    1|          3|     1525|            0|
+-------+-----+-----------+---------+-------------+
only showing top 10 rows



In [41]:
brief.groupBy().sum().show()

+-------------+----------+----------------+--------------+------------------+
|      sum(Id)|sum(Score)|sum(AnswerCount)|sum(ViewCount)|sum(FavoriteCount)|
+-------------+----------+----------------+--------------+------------------+
|3204413085258|   1630897|         2184116|    1576757606|            556090|
+-------------+----------+----------------+--------------+------------------+



In [42]:
brief.count()

1456311

In [43]:
1630897 / 1456311 # brief score

1.1198823602925474

In [44]:
1576757606 / 1456311 # brief view

1082.7066512578701

In [45]:
2184116 / 1456311 # brief answer

1.4997593233862823

In [46]:
556090 / 1456311 # brief favorite

0.38184838266002247
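
The eight divisions above are easier to compare side by side. The following is a minimal sketch in plain Python (no Spark needed) that recomputes the per-post averages from the totals and row counts printed above and adds a vets-to-brief ratio. The dictionary names and the `averages` helper are illustrative and not part of the original notebook; 267192 is taken to be the vets row count because it is the denominator used in the vets cells.

```python
# Totals copied from the two groupBy().sum() outputs above.
vets_totals = {"Score": 597570, "ViewCount": 488664403,
               "AnswerCount": 491000, "FavoriteCount": 228490}
brief_totals = {"Score": 1630897, "ViewCount": 1576757606,
                "AnswerCount": 2184116, "FavoriteCount": 556090}

# Row counts: the denominator used in the vets cells, and brief.count().
vets_n = 267192
brief_n = 1456311

def averages(totals, n):
    """Divide every column total by the number of rows."""
    return {col: total / n for col, total in totals.items()}

vets_avg = averages(vets_totals, vets_n)
brief_avg = averages(brief_totals, brief_n)

for col in vets_avg:
    ratio = vets_avg[col] / brief_avg[col]
    print(f"{col:>13}: vets={vets_avg[col]:.3f}  brief={brief_avg[col]:.3f}  ratio={ratio:.2f}")
```

In Spark itself, `brief.groupBy().avg()` should give the same averages without the manual division.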

### Word2Vec is an alternative approach to vectorizing text data. It learns a vector for each word in the vocabulary such that the vector is useful for predicting the other words in the document. Arithmetic on these vectors tends to capture relationships between words, hence the famous example "vector('king') - vector('man') + vector('woman') ~= vector('queen')".

### I'll see how good a Word2Vec model I can train using the tags of each Stack Exchange post as a document (this uses the full data set). I use 'ggplot2' as the example query word.
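
To make the vector-arithmetic idea concrete, here is a minimal sketch in plain Python. The two-dimensional vectors are made up for illustration (axis 0 loosely stands for "royalty", axis 1 for "gender"); a trained Word2Vec model learns its vectors from data and would have many more dimensions. The `cosine` helper and the `toy` dictionary are hypothetical names.

```python
import math

def cosine(u, v):
    """Cosine similarity between two equal-length vectors."""
    dot = sum(a * b for a, b in zip(u, v))
    norm_u = math.sqrt(sum(a * a for a in u))
    norm_v = math.sqrt(sum(b * b for b in v))
    return dot / (norm_u * norm_v)

# Hypothetical hand-made embeddings, NOT learned from any data.
toy = {
    "king":  [1.0, 1.0],
    "queen": [1.0, -1.0],
    "man":   [0.0, 1.0],
    "woman": [0.0, -1.0],
    "apple": [-1.0, 0.2],
}

# vector('king') - vector('man') + vector('woman')
target = [k - m + w for k, m, w in zip(toy["king"], toy["man"], toy["woman"])]

# Rank the remaining words by similarity to the target vector.
candidates = [w for w in toy if w not in ("king", "man", "woman")]
best = max(candidates, key=lambda w: cosine(toy[w], target))
print(best)
```

`findSynonyms` performs the same kind of ranking: it returns the vocabulary words whose vectors have the highest cosine similarity to the query.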


#### Parameters


In [124]:
def parser(x):
    """Return the Tags attribute of a '<row .../>' line, or 'Empty' if there is none."""
    if '  <row' not in x:
        return "Empty"
    try:
        root = ET.fromstring(x)
    except ET.ParseError:
        # Malformed XML: treat the line as having no tags.
        return "Empty"
    return root.attrib.get("Tags", "Empty")


In [141]:
full_posts = sc.textFile('spark-stack-data/allPosts/').map(parser).filter(lambda x: x!= 'Empty')


In [142]:
import re

# Split "<a><b>" into ['a', 'b']; the constant 1 is a placeholder second column.
final = full_posts.map(lambda x: ([i for i in re.split("<|>", x) if i], 1)).toDF(['words','num']).cache()
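
The tag-splitting step can be checked on a single string without Spark. This small sketch compares the `re.split` approach used above with the `re.findall` pattern used later in the notebook; the sample string matches the second row of the output shown for `final`.

```python
import re

raw = "<c#><.net><encoding>"

# Approach used above: split on either bracket, then drop the empty pieces.
by_split = [t for t in re.split("<|>", raw) if t]

# Alternative: capture whatever sits between each pair of brackets.
by_findall = re.findall(r"<(.*?)>", raw)

print(by_split)
print(by_findall)
```

Both produce the same list of tags for well-formed tag strings, so either can feed Word2Vec's `words` column.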


In [143]:
final.show(10)

+--------------------+---+
|               words|num|
+--------------------+---+
|[javascript, jque...|  1|
|[c#, .net, encoding]|  1|
|                [c#]|  1|
|[java, gwt, inter...|  1|
|[java, image, ima...|  1|
|[php, file-io, up...|  1|
|[jquery, asp.net-...|  1|
|[captcha, recaptcha]|  1|
|[eclipse, charts,...|  1|
|    [java, xml, jsp]|  1|
+--------------------+---+
only showing top 10 rows



In [110]:
from pyspark.ml.feature import Word2Vec

In [145]:
w2v = Word2Vec(inputCol="words", outputCol="vectors", vectorSize=100, seed=42)


In [146]:

model = w2v.fit(final)


In [147]:
result = model.transform(final)

In [None]:
x = model.findSynonyms("ggplot2", 25).toPandas();x

In [159]:
word2vec = list(zip(x.word, x.similarity))

### I'd like to see whether I can predict the tags of a question from its body text. Instead of predicting specific tags, I will try to predict whether a question carries one of the ten most common tags.

In [6]:
from pyspark.ml.feature import VectorAssembler # to create the features into one vector if needed
from pyspark.sql import functions as F


In [7]:
def parser_list2(x):
    """Parse one '<row .../>' line into a dict, defaulting missing counts to '0'."""
    try:
        x = dict(ET.fromstring(x).attrib)
    except ET.ParseError:
        # Malformed XML: return an empty record.
        return {}
    for key in ('AnswerCount', 'FavoriteCount', 'ViewCount'):
        x.setdefault(key, '0')
    return x
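
As a quick sanity check of this parsing logic outside Spark, the sketch below runs the same idea on one well-formed row and one deliberately broken row. Both sample lines are invented for illustration, and `parse_row` and `COUNT_DEFAULTS` are hypothetical names.

```python
import xml.etree.ElementTree as ET

# Count fields that answers and wiki posts may lack; default them to '0'.
COUNT_DEFAULTS = {"AnswerCount": "0", "FavoriteCount": "0", "ViewCount": "0"}

def parse_row(line):
    """Return the row's attributes as a dict, or {} if the XML is malformed."""
    try:
        attrs = dict(ET.fromstring(line).attrib)
    except ET.ParseError:
        return {}
    # Attributes found in the row override the defaults.
    return {**COUNT_DEFAULTS, **attrs}

good = '  <row Id="61881" PostTypeId="4" Score="0" />'
bad = '  <row Id="61882" Body="unterminated />'  # missing closing quote

print(parse_row(good))
print(parse_row(bad))
```

Catching only `ET.ParseError` keeps unrelated bugs visible instead of silently turning them into empty records.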

In [8]:
train = sc.textFile('spark-stats-data/train/').filter(lambda x: '<row'in x).map(parser_list2).toDF()

In [9]:
test = sc.textFile('spark-stats-data/test/').filter(lambda x: '<row'in x).map(parser_list2).toDF()

In [11]:
train.count()

90046

In [12]:
test.count()

9954

In [114]:
test.printSchema()

root
 |-- AcceptedAnswerId: string (nullable = true)
 |-- AnswerCount: string (nullable = true)
 |-- Body: string (nullable = true)
 |-- CommentCount: string (nullable = true)
 |-- CreationDate: string (nullable = true)
 |-- FavoriteCount: string (nullable = true)
 |-- Id: string (nullable = true)
 |-- LastActivityDate: string (nullable = true)
 |-- OwnerUserId: string (nullable = true)
 |-- PostTypeId: string (nullable = true)
 |-- Score: string (nullable = true)
 |-- Tags: string (nullable = true)
 |-- Title: string (nullable = true)
 |-- ViewCount: string (nullable = true)



In [360]:
train.printSchema()

root
 |-- AcceptedAnswerId: string (nullable = true)
 |-- AnswerCount: string (nullable = true)
 |-- Body: string (nullable = true)
 |-- CommentCount: string (nullable = true)
 |-- CreationDate: string (nullable = true)
 |-- FavoriteCount: string (nullable = true)
 |-- Id: string (nullable = true)
 |-- LastActivityDate: string (nullable = true)
 |-- LastEditDate: string (nullable = true)
 |-- LastEditorUserId: string (nullable = true)
 |-- OwnerUserId: string (nullable = true)
 |-- PostTypeId: string (nullable = true)
 |-- Score: string (nullable = true)
 |-- Tags: string (nullable = true)
 |-- Title: string (nullable = true)
 |-- ViewCount: string (nullable = true)



In [10]:
train = train.selectExpr('INT(Id)', 'Tags' , 'Body', 'INT(PostTypeId)').filter(train['PostTypeId']==1).\
drop('PostTypeId')

In [30]:
train.show(10)

+-----+--------------------+--------------------+
|   Id|                Tags|                Body|
+-----+--------------------+--------------------+
|48396| <probability><dice>|<p>I've just play...|
|48397|      <epidemiology>|<p>Suppose there ...|
|48405|  <gaussian-process>|<p>I am having a ...|
|48408|<regression><spat...|<p>From a <a href...|
|48411|<self-study><math...|<blockquote>
  <p...|
|48412|<hypothesis-testi...|<p>Suppose we do ...|
|48418|    <markov-process>|<blockquote>
  <p...|
|48419|<probability><pre...|<p>I'm quite an a...|
|48425|<clustering><k-me...|<p>I'm trying to ...|
|48427|<hypothesis-testi...|<p>I am trying to...|
+-----+--------------------+--------------------+
only showing top 10 rows



In [11]:
test = test.selectExpr('INT(Id)', 'Body', 'INT(PostTypeId)').filter(test['PostTypeId']==1).\
drop('PostTypeId')

### Getting the 10 most-used tags

In [11]:
import pandas as pd
x = train.toPandas()

In [275]:
import re
tags = x['Tags'].apply(lambda t: re.findall(r'<(.*?)>', t)); tags

In [299]:
res = []
for row in tags:
    res.append(row)

from itertools import chain
words = list(chain.from_iterable(res))        

In [305]:
words = pd.DataFrame(words)
# Count occurrences of each tag and keep the ten most frequent.
top_10 = words.groupby(0)[0].count().sort_values(ascending=False)[:10]
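
The same count can be sketched with the standard library alone. The example below uses a handful of made-up tag strings (not taken from the data set) and `collections.Counter` in place of the pandas `groupby`.

```python
import re
from collections import Counter
from itertools import chain

# Hypothetical sample of raw Tags values.
tag_strings = [
    "<probability><dice>",
    "<regression><r>",
    "<r><time-series>",
    "<probability><r>",
]

# "<a><b>" -> ['a', 'b'] for every question, then flatten and count.
tag_lists = [re.findall(r"<(.*?)>", s) for s in tag_strings]
counts = Counter(chain.from_iterable(tag_lists))

# most_common(n) returns (tag, count) pairs sorted by descending count.
top_2 = [tag for tag, _ in counts.most_common(2)]
print(top_2)
```

On the real data, `counts.most_common(10)` would play the role of `top_10`.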


In [12]:
top_10 = ['r', 'regression', 'time-series', 'machine-learning', 'probability',
         'hypothesis-testing', 'distributions', 'self-study', 'logistic', 'correlation']

In [63]:
top_10

['r',
 'regression',
 'time-series',
 'machine-learning',
 'probability',
 'hypothesis-testing',
 'distributions',
 'self-study',
 'logistic',
 'correlation']

### Creating the label of 1's and 0's


In [13]:
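from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, IntegerType, StringType

def label(x):
    # x is the list of tags for one question; 1 if any of them is a top-10 tag.
    return 1 if any(tag in top_10 for tag in x) else 0

label_udf = udf(label, IntegerType())

In [14]:
import re

In [15]:
def clean(x):
    # "<a><b>" -> ['a', 'b']
    return re.findall('<(.*?)>', x)

# re.findall returns a list, so the UDF declares an array return type.
clean_udf = udf(clean, ArrayType(StringType()))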

In [16]:
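from pyspark.sql.types import ArrayType, StringType

def split(x):
    # Break the HTML body into chunks at each opening <p> tag.
    return re.split('<p>', x)

# re.split returns a list, so the UDF declares an array return type.
split_udf = udf(split, ArrayType(StringType()))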

In [17]:
train = train.withColumn('cleaned_tags', clean_udf(train['Tags']))

In [18]:
train = train.withColumn('label', label_udf(train['cleaned_tags']))
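
To make the labeling rule concrete outside Spark, here is a minimal plain-Python sketch. It assumes the intended rule is "label 1 if any tag of the question is in `top_10`", which is how the section heading describes the task. `has_top_tag` is a hypothetical helper name, and the example tag strings are illustrative.

```python
import re

top_10 = ['r', 'regression', 'time-series', 'machine-learning', 'probability',
          'hypothesis-testing', 'distributions', 'self-study', 'logistic', 'correlation']
TOP = set(top_10)  # set membership is O(1) per lookup

def has_top_tag(tag_string):
    """Return 1 if any tag in '<a><b>...' is one of the top-10 tags, else 0."""
    tags = re.findall(r"<(.*?)>", tag_string)
    return int(any(tag in TOP for tag in tags))

examples = ["<probability><dice>", "<epidemiology>", "<clustering><r>"]
labels = [has_top_tag(s) for s in examples]
print(labels)
```

The third example is the case to watch: its top-10 tag is not in first position, so a rule that looks only at the first tag would label it 0.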

In [19]:
# train = train.withColumn('Text', split_udf(train['Body']))

In [19]:
train = train.drop('Tags')

In [57]:
## cleaning tests

In [22]:
test = test.withColumn('Text', split_udf(test['Body']))

### Tokenizing the Body texts

In [21]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer

In [22]:
toke = Tokenizer(inputCol='Body', outputCol='words')
words = toke.transform(train)

In [23]:
from pyspark.ml.feature import StopWordsRemover
sw = StopWordsRemover(inputCol=toke.getOutputCol(), outputCol='stopped')
stop_words = sw.transform(words)

In [24]:
hashing = HashingTF(inputCol= sw.getOutputCol(), outputCol='counts')
hashes = hashing.transform(stop_words)

In [25]:
hashes.select('counts', 'label').show()

+--------------------+-----+
|              counts|label|
+--------------------+-----+
|(262144,[1076,428...|    1|
|(262144,[4631,538...|    0|
|(262144,[2437,383...|    0|
|(262144,[3992,580...|    1|
|(262144,[528,3336...|    1|
|(262144,[3834,428...|    1|
|(262144,[850,901,...|    0|
|(262144,[2686,333...|    1|
|(262144,[12974,13...|    0|
|(262144,[5167,853...|    1|
|(262144,[10049,20...|    0|
|(262144,[5385,132...|    0|
|(262144,[2564,283...|    0|
|(262144,[1277,658...|    0|
|(262144,[161,521,...|    1|
|(262144,[10136,12...|    1|
|(262144,[2066,291...|    0|
|(262144,[16836,44...|    1|
|(262144,[3889,703...|    1|
|(262144,[8901,124...|    1|
+--------------------+-----+
only showing top 20 rows
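
The `(262144,[...` prefix in each row is a sparse vector of size 262144 (2^18, the default `numFeatures`): every token is hashed to an index in that range and the vector stores how many tokens landed on each index. The sketch below illustrates this hashing trick in plain Python. It uses `zlib.crc32` as a stand-in hash, so the indices will not match Spark's, which to my knowledge are based on MurmurHash3. `hashing_tf` is a hypothetical helper name.

```python
import zlib
from collections import Counter

def hashing_tf(tokens, num_features=262144):
    """Map tokens to {index: count} using a stable hash modulo num_features."""
    counts = Counter(
        zlib.crc32(token.encode("utf-8")) % num_features for token in tokens
    )
    # Sort by index to mimic the ordering of a sparse vector.
    return dict(sorted(counts.items()))

vec = hashing_tf(["the", "dice", "the"])
print(vec)
```

No vocabulary is stored, which keeps the transformer stateless. The cost is that different words can collide on one index, and collisions become likelier as `numFeatures` shrinks.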



In [26]:
va = VectorAssembler(inputCols=[hashing.getOutputCol()], outputCol='features')
vec = va.transform(hashes)


In [128]:
## for test-set

In [122]:
toke2 = Tokenizer(inputCol='Body', outputCol='words')
words2 = toke2.transform(test)
sw2 = StopWordsRemover(inputCol='words', outputCol='stopped')
stop_words2 = sw2.transform(words2)
hashing2 = HashingTF(inputCol= 'stopped', outputCol='counts')
hashes2 = hashing2.transform(stop_words2)
va2 = VectorAssembler(inputCols=['counts'], outputCol='features')
vec2 = va2.transform(hashes2)

In [123]:

vec2.show()

In [34]:
### Grid Search and Models

In [32]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [33]:
logreg = LogisticRegression(maxIter=10, labelCol="label", featuresCol="features")

In [34]:
from pyspark.ml import Pipeline
pipeline = Pipeline(stages=[toke, sw, hashing, va, logreg])

In [35]:
pipeline.getStages()

[Tokenizer_b70930419b87,
 StopWordsRemover_800aeec0db25,
 HashingTF_f1ca7bb49a5a,
 VectorAssembler_a08a7bc1aa52,
 LogisticRegression_59d313962c4c]

In [36]:
paramGrid = (ParamGridBuilder() 
    .addGrid(hashing.numFeatures, [10, 100, 500, 1000, 5000]) 
    .addGrid(logreg.regParam, [1.0, 0.5, 0.1, 0.005, 0.01]) 
    .addGrid(logreg.elasticNetParam, [0.0, 0.5, 1.0])
    .build())

In [37]:
crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=BinaryClassificationEvaluator(),
                          numFolds=3)
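
Before fitting, it helps to estimate how much work this grid implies. The sketch below enumerates the same parameter values with `itertools.product` and multiplies by the number of folds; the variable names are illustrative.

```python
from itertools import product

# Same values as in paramGrid above.
num_features = [10, 100, 500, 1000, 5000]
reg_param = [1.0, 0.5, 0.1, 0.005, 0.01]
elastic_net = [0.0, 0.5, 1.0]

# One pipeline configuration per combination of the three parameters.
grid = list(product(num_features, reg_param, elastic_net))

num_folds = 3
total_fits = len(grid) * num_folds  # each configuration is fit once per fold

print(len(grid), total_fits)
```

`CrossValidator` then refits the best configuration on the whole training set, so the real count is one higher than `total_fits`.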

In [38]:
cvModel = crossval.fit(train)

In [40]:
cvModel.transform(test).select('Id', 'prediction').show(20)

+-----+----------+
|   Id|prediction|
+-----+----------+
|48410|       1.0|
|48415|       1.0|
|48420|       1.0|
|48422|       1.0|
|48429|       0.0|
|48442|       1.0|
|48496|       1.0|
|48497|       0.0|
|48517|       1.0|
|48577|       0.0|
|48582|       1.0|
|48594|       1.0|
|48639|       1.0|
|48668|       1.0|
|48724|       0.0|
|48726|       0.0|
|48766|       1.0|
|48772|       0.0|
|48833|       1.0|
|48836|       0.0|
+-----+----------+
only showing top 20 rows



In [41]:
df = cvModel.transform(test).select('Id', 'prediction').toPandas()

In [51]:
model = logreg.fit(vec)

In [52]:
model.transform(vec).select('Label','prediction').show()

+-----+----------+
|Label|prediction|
+-----+----------+
|    1|       1.0|
|    0|       0.0|
|    0|       0.0|
|    1|       1.0|
|    1|       1.0|
|    1|       1.0|
|    0|       0.0|
|    1|       1.0|
|    0|       0.0|
|    1|       1.0|
|    0|       0.0|
|    0|       0.0|
|    0|       1.0|
|    0|       0.0|
|    1|       1.0|
|    1|       1.0|
|    0|       0.0|
|    1|       1.0|
|    1|       1.0|
|    1|       1.0|
+-----+----------+
only showing top 20 rows
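
The 20 rows shown above can be summarized with a small confusion-matrix count. The sketch below copies those label/prediction pairs by hand, so it covers only the displayed rows. They come from `vec`, the training data, so the figure is optimistic compared with what a held-out set would give.

```python
# Label and prediction columns copied from the 20 rows displayed above.
labels = [1, 0, 0, 1, 1, 1, 0, 1, 0, 1, 0, 0, 0, 0, 1, 1, 0, 1, 1, 1]
preds  = [1, 0, 0, 1, 1, 1, 0, 1, 0, 1, 0, 0, 1, 0, 1, 1, 0, 1, 1, 1]

pairs = list(zip(labels, preds))
tp = sum(1 for y, p in pairs if y == 1 and p == 1)  # true positives
fp = sum(1 for y, p in pairs if y == 0 and p == 1)  # false positives
tn = sum(1 for y, p in pairs if y == 0 and p == 0)  # true negatives
fn = sum(1 for y, p in pairs if y == 1 and p == 0)  # false negatives

accuracy = (tp + tn) / len(pairs)
print(tp, fp, tn, fn, accuracy)
```

For the full data, a held-out split scored with `BinaryClassificationEvaluator` would be the more trustworthy measurement.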



In [53]:
df = model.transform(vec2).select('Id', 'prediction').toPandas()

In [42]:
df = df.sort_values('Id')

In [43]:
classification = list(df.prediction)

In [44]:
classification

[0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 1.0,
 0.0,
 0.0,
 1.0,
 1.0,
 0.0,
 0.0,
 0.0,
 1.0,
 1.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 1.0,
 1.0,
 0.0,
 1.0,
 1.0,
 1.0,
 0.0,
 0.0,
 0.0,
 0.0,
 1.0,
 1.0,
 0.0,
 1.0,
 1.0,
 1.0,
 0.0,
 1.0,
 1.0,
 1.0,
 0.0,
 1.0,
 0.0,
 1.0,
 0.0,
 1.0,
 0.0,
 0.0,
 1.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 1.0,
 1.0,
 1.0,
 0.0,
 0.0,
 1.0,
 0.0,
 1.0,
 1.0,
 0.0,
 1.0,
 1.0,
 0.0,
 1.0,
 1.0,
 0.0,
 0.0,
 0.0,
 1.0,
 1.0,
 0.0,
 0.0,
 0.0,
 1.0,
 0.0,
 1.0,
 1.0,
 1.0,
 1.0,
 0.0,
 1.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 1.0,
 1.0,
 0.0,
 1.0,
 1.0,
 1.0,
 0.0,
 1.0,
 0.0,
 1.0,
 0.0,
 1.0,
 0.0,
 0.0,
 1.0,
 1.0,
 0.0,
 0.0,
 1.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 1.0,
 1.0,
 0.0,
 0.0,
 1.0,
 1.0,
 1.0,
 1.0,
 0.0,
 1.0,
 0.0,
 1.0,
 1.0,
 0.0,
 0.0,
 0.0,
 1.0,
 1.0,
 1.0,
 0.0,
 0.0,
 1.0,
 1.0,
 0.0,
 0.0,
 1.0,
 1.0,
 1.0,
 1.0,
 1.0,
 1.0,
 0.0,
 0.0,
 0.0,
 1.0,
 1.0,
 1.0,
 1.0,
 1.0,
 1.0,
 0.0,
 0.0,
 1.0,
 1.0,
 0.0,
 1.0,
 0.0,
 0.0,
 0.0