# Spark with Scala

#### StackOverflow provides an anonymized [data dump](https://archive.org/details/stackexchange) on which I performed data analysis with Spark.

## Accessing the data

The data I pulled in included three folders: allUsers, allPosts, and allVotes which were chunked and gzipped xml with the following format:

```
<row Body="&lt;p&gt;I always validate my web pages, and I recommend you do the same BUT many large company websites DO NOT and cannot validate because the importance of the website looking exactly the same on all systems requires rules to be broken. &lt;/p&gt;&#10;&#10;&lt;p&gt;In general, valid websites help your page look good even on odd configurations (like cell phones) so you should always at least try to make it validate.&lt;/p&gt;&#10;" CommentCount="0" CreationDate="2008-10-12T20:26:29.397" Id="195995" LastActivityDate="2008-10-12T20:26:29.397" OwnerDisplayName="Eric Wendelin" OwnerUserId="25066" ParentId="195973" PostTypeId="2" Score="0" />
```

A full schema can be found [here](https://ia801500.us.archive.org/8/items/stackexchange/readme.txt)

## Data Preprocessing

First I eliminated malformatted XML rows by using a Scala XML parsing function. The goal was to create an RDD of Post objects where each Post is a valid row of XML from the allPosts dataset.

In [4]:
import gzip
with gzip.open('spark-stats-data/allPosts/part-00000.xml.gz', 'r') as file:
    i = 0
    for line in file:
        if i < 4:
            print line
            i += 1
        else:
            break

<?xml version="1.0" encoding="UTF-8"?>

<parent>

  <row AcceptedAnswerId="15" AnswerCount="5" Body="&lt;p&gt;How should I elicit prior distributions from experts when fitting a Bayesian model?&lt;/p&gt;&#10;" CommentCount="1" CreationDate="2010-07-19T19:12:12.510" FavoriteCount="17" Id="1" LastActivityDate="2010-09-15T21:08:26.077" OwnerUserId="8" PostTypeId="1" Score="26" Tags="&lt;bayesian&gt;&lt;prior&gt;&lt;elicitation&gt;" Title="Eliciting priors from experts" ViewCount="1457" />

  



In [1]:
import java.io._
def localpath(path: String): String = {
    "file://" + new java.io.File(".").getCanonicalPath + "/" + path
}

In [2]:
import org.apache.spark.SparkContext

val lines = sc.textFile(localpath("/spark-stats-data/allPosts/*.gz"))
val totalLines = lines.count()
println(s"total lines: $totalLines")

total lines: 212990


In [3]:
import scala.xml.{NodeSeq, MetaData}

In [4]:
import scala.io.{ BufferedSource, Source}

In [48]:
def starter(line : String) : Boolean = {
    line.contains("<row ")
}

#### The XML.loadString() function takes a XML string and throws an error if they are incorrectly formatted. 

In [56]:
import scala.xml._
def transformer(x: String): Int ={
    try{
        XML.loadString(x)
        0
       }
    catch{
        case e =>
        1
    }
}

In [58]:
val filtered = (sc.textFile(localpath("/spark-stats-data/allPosts/*.gz"))
                .filter(x => starter(x))
                .map(transformer(_))
                )

#### The "filtered" RDD is devoid of malformatted XMLs => Next step we examine this RDD by running a sanity check.

## Upvote Percentage

One "sanity check" we can do is to look at the ratio of upvotes to downvotes (referred to as "UpMod" and "DownMod" in the schema) as a function of how many times the post has been favorited. The hypothesis here is that posts with more favorites have higher upvote to downvote ratio.

To achieve this, we take the number of favorites as our key and aggregate over the number of upvotes and downvotes and eventually reducing to obtain the avearage upvotes/(upvotes+downvotes). 

In [2]:
# this section is solved with PySpark

In [8]:
from pyspark import SparkContext
import pyspark, os, re, operator
from lxml import etree
from datetime import datetime
import operator

sc = SparkContext("local[*]", "temp")

In [29]:
#small test files
def datapath(path):
    return 'file://' + str(os.path.abspath(os.path.curdir)) + '/spark-stats-data-small/' + path

smallvotes = sc.textFile(datapath('allVotes/'))
smallposts = sc.textFile(datapath('allPosts/')) 
smallusers = sc.textFile(datapath('allUsers/'))

In [16]:
class vote_Record(object):
    def __init__(self, Id, PostId, VoteTypeId, CreationDate):
        self.Id = Id
        self.PostId = PostId
        self.VoteTypeId = VoteTypeId
        self.CreationDate = CreationDate

In [17]:
def parse2(line):
    try:
        vote_sum = (0, 0, 0) 
        votes =etree.fromstring(line)
        PostId = votes.get('PostId')
        VoteTypeId = votes.get('VoteTypeId')
        if VoteTypeId == u'2': #if UpMod
            vote_sum = (1, 0, 0)
        if VoteTypeId == u'3': #if DownMod
            vote_sum = (0, 1, 0)
        if VoteTypeId == u'5': #if Favorite
            vote_sum = (0, 0, 1)
        return (PostId, vote_sum)
    except:
        return (0, (0, 0, 0))

In [18]:
smallvotes.map(parse2)\
    .reduceByKey(lambda x,y:tuple(map(operator.add,x,y)))\
    .map(lambda (x,y):(y[2], (y[0], y[1])))\
    .reduceByKey(lambda x,y:tuple(map(operator.add,x,y)))\
    .map(lambda (x,y): (x,float(y[0])/(float(y[0]+y[1]))))\
    .takeOrdered(50, lambda s:s[0])

[(0, 0.9515184306202837),
 (1, 0.971349277609991),
 (2, 0.9858878575201871),
 (3, 0.9899873257287706),
 (4, 0.990321980271729),
 (5, 0.9925945517058979),
 (6, 0.9948542024013722),
 (7, 0.9908026755852842),
 (8, 0.9944289693593314),
 (9, 0.9967931587386424),
 (10, 0.9916376306620209),
 (11, 0.9915174363807728),
 (12, 0.9958123953098827),
 (13, 0.9972789115646259),
 (14, 0.9939540507859734),
 (15, 0.9929245283018868),
 (16, 1.0),
 (17, 1.0),
 (18, 0.9985693848354793),
 (19, 0.997867803837953),
 (20, 0.9969512195121951),
 (21, 0.9944029850746269),
 (22, 0.9977973568281938),
 (23, 0.9952038369304557),
 (24, 1.0),
 (25, 1.0),
 (26, 0.9841772151898734),
 (27, 0.989010989010989),
 (28, 0.9951690821256038),
 (29, 0.9972826086956522),
 (30, 0.9954337899543378),
 (31, 0.9939577039274925),
 (32, 1.0),
 (33, 1.0),
 (34, 1.0),
 (35, 1.0),
 (36, 1.0),
 (37, 0.990990990990991),
 (38, 0.9937888198757764),
 (39, 0.9918032786885246),
 (40, 1.0),
 (41, 1.0),
 (42, 1.0),
 (44, 1.0),
 (45, 1.0),
 (47, 1.0)

#### This results makes sense since the higher favorite count is associated with higher upvote to downvote ratio. 

## Classification

We'd like to see if we can predict the tags of a question from its body text. Technically this is a multi-label classification problem, but to simplify things we'll use a one-vs-all approach where we choose the top k most common tags and train k binary classifiers where the labels indicate the presence or absence of that tag.

Use a logistic regression model as your classifer.

Since we can't reliably save and load models, return a list of 100 tuples ("string", [number, number, number,...]) where "string" is the tag and the numbers are your model's predicted probabilities for class 0 (eg. 0.2 means a prediction that the tag is present) across the test set.

In [3]:
import re
from pyspark.sql import Row
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.sql.types import DoubleType
from operator import add
from lxml import etree

In [4]:
from pyspark import SparkContext
import pyspark, os, re, operator
from lxml import etree
from datetime import datetime
import operator

sc = SparkContext("local[*]", "temp")

def datapath(path):
    return 'file://' + str(os.path.abspath(os.path.curdir)) + '/spark-stats-data-small/' + path

smallvotes = sc.textFile(datapath('allVotes/'))
smallposts = sc.textFile(datapath('allPosts/')) 
smallusers = sc.textFile(datapath('allUsers/'))

In [5]:
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

In [6]:
train = sc.textFile(datapath('posts-train/')).cache()
test = sc.textFile(datapath('posts-test/')).cache()

In [7]:
def check_xml_posts(xml_text):
    if ('Tags' in xml_text)  & ('Body' in xml_text):
        try:
            doc = etree.fromstring(xml_text)
            return True
        except:
            return False
    return False

def get_tags(line):
    root = etree.XML(line)
    t = root.attrib['Tags']
    return  re.sub(r'<|>', ' ', t).strip()    

def body_tags(line):
    root = etree.XML(line)
    t = [root.attrib['Body'], root.attrib['Tags']]
    return [re.sub(r'<p>|</p>', ' ', t[0]).lower().strip(), re.sub(r'<|>', ' ', t[1]).strip().split()]

In [8]:
pop_tags = train.filter(lambda x: check_xml_posts(x))\
    .flatMap(lambda x: get_tags(x).split())\
    .map(lambda word: (word, 1))\
    .reduceByKey(add)\
    .map(lambda (k,v): (v,k)).sortByKey(False).take(100)\

pop_tags = [i for x, i in pop_tags]
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol="words", outputCol="features")
lr = LogisticRegression()
traintag = train.filter(lambda x: check_xml_posts(x)) \
           .map(lambda x: body_tags(x))
    

In [9]:
pop_tags

['r',
 'regression',
 'time-series',
 'machine-learning',
 'probability',
 'hypothesis-testing',
 'distributions',
 'self-study',
 'logistic',
 'correlation',
 'statistical-significance',
 'classification',
 'bayesian',
 'anova',
 'normal-distribution',
 'clustering',
 'data-visualization',
 'confidence-interval',
 'mathematical-statistics',
 'multiple-regression',
 'estimation',
 'categorical-data',
 'mixed-model',
 'spss',
 'generalized-linear-model',
 'variance',
 'repeated-measures',
 'sampling',
 't-test',
 'pca',
 'svm',
 'forecasting',
 'data-mining',
 'multivariate-analysis',
 'cross-validation',
 'chi-squared',
 'modeling',
 'maximum-likelihood',
 'predictive-models',
 'matlab',
 'data-transformation',
 'neural-networks',
 'nonparametric',
 'interaction',
 'survival',
 'model-selection',
 'linear-model',
 'dataset',
 'p-value',
 'binomial',
 'poisson',
 'econometrics',
 'bootstrap',
 'standard-deviation',
 'feature-selection',
 'stata',
 'mean',
 'interpretation',
 'sample-siz

In [10]:
testingdata = sqlContext.createDataFrame(test.filter(lambda x: check_xml_posts(x)) \
                                         .map(lambda x: body_tags(x)) \
                                         .map(lambda x: (x[0],)),schema=['text'])

In [11]:
testingdata = hashingTF.transform(tokenizer.transform(testingdata))

In [17]:
predictions = []

for tag in pop_tags:
    temp = sqlContext.createDataFrame(traintag.map(lambda x: Row(1,x[0]) if tag in x[1] else Row(0,x[0])),schema=['label','text'])
    temp = hashingTF.transform(tokenizer.transform(temp))
    model = lr.fit(temp[['label','features']].withColumn("label", temp.label.cast(DoubleType())),params={'maxIter':10, 'regParam':0.01})
    pred = model.transform(testingdata) 
    predictions.append((tag,pred.rdd.map(lambda x:  x.probability).map(lambda x: x[1]).collect()))
    print(tag)  

r
regression
time-series
machine-learning
probability
hypothesis-testing
distributions
self-study
logistic
correlation
statistical-significance
classification
bayesian
anova
normal-distribution
clustering
data-visualization
confidence-interval
mathematical-statistics
multiple-regression
estimation
categorical-data
mixed-model
spss
generalized-linear-model
variance
repeated-measures
sampling
t-test
pca
svm
forecasting
data-mining
multivariate-analysis
cross-validation
chi-squared
modeling
maximum-likelihood
predictive-models
matlab
data-transformation
neural-networks
nonparametric
interaction
survival
model-selection
linear-model
dataset
p-value
binomial
poisson
econometrics
bootstrap
standard-deviation
feature-selection
stata
mean
interpretation
sample-size
references
multiple-comparisons
least-squares
optimization
python
random-forest
arima
experiment-design
conditional-probability
prediction
standard-error
factor-analysis
survey
missing-data
panel-data
sas
covariance
outliers
simulat

In [16]:
predictions[1]

('regression',
 [4.6248835871163648e-90,
  5.2823672922783619e-24,
  3.4150412416720816e-118,
  2.2408752946383374e-60,
  1.0,
  4.1766894242970242e-87,
  2.0999690893401638e-43,
  1.2538750846435401e-24,
  6.0235152289695343e-84,
  3.8913711840359991e-16,
  2.6921442074584195e-135,
  4.0743709691479957e-197,
  2.0719373348535893e-35,
  5.5856967560730421e-36,
  2.1445360004987881e-32,
  2.4949510055314499e-139,
  4.4914743025254398e-35,
  4.134194224294696e-22,
  4.5842081669558646e-93,
  0.99999999554013685,
  2.8172482155894497e-110,
  1.0978866154320436e-60,
  2.2660178842070194e-24,
  1.0588911991935341e-23,
  1.7415666769906636e-23,
  4.6924625431800832e-73,
  8.0956813248748107e-143,
  2.9772938933326019e-84,
  3.6391170165301576e-56,
  4.8965348106435992e-97,
  3.3048124394653643e-53,
  2.1543265177999459e-207,
  2.9980951086446478e-88,
  3.3066320762967301e-24,
  6.9152696152628849e-53,
  2.2608899084251544e-137,
  1.1013998012020178e-75,
  3.8288940330255924e-13,
  5.68382089

In [71]:
import pickle
pickle.dump(predictions, open('pred.txt', 'wb'))

*Copyright &copy; 2016 The Data Incubator.  All rights reserved.*