### Classes to parse XML

In [8]:
import scala.xml.{ NodeSeq, MetaData }
import java.io.File
import scala.io.{ BufferedSource, Source }

abstract class StackTable[T] {


  def getDate(n: scala.xml.NodeSeq): Long = n.text match {
    case "" => 0
    case s => dateFormat.parse(s).getTime
  }

  def dateFormat = {
    import java.text.SimpleDateFormat
    import java.util.TimeZone
    val f = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS")
    f.setTimeZone(TimeZone.getTimeZone("GMT"))
    f
  }

  def getInt(n: scala.xml.NodeSeq): Int = n.text match {
    case "" => 0
    case x => x.toInt
  }

  def parseXml(x: scala.xml.Elem): T

  def parse(s: String): Option[T] =
    try{
        if (s.startsWith("  <row ")) Some(parseXml(scala.xml.XML.loadString(s)))
        else None
    }catch{
        //org.xml.sax.SAXParseException
        case _ => None // matches any error, and return None
    }
}

object Post extends StackTable[Post] {

//   val file = new File("data/Posts.xml")
//   assert(file.exists)

  override def parseXml(x: scala.xml.Elem): Post = Post(
    getInt(x \ "@Id"),
    getInt(x \ "@PostTypeId"),
    getInt(x \ "@AcceptedAnswerId"),
    getDate(x \ "@CreationDate"),
    getInt(x \ "@Score"),
    getInt(x \ "@ViewCount"),
    (x \ "@Body").text,
    getInt(x \ "@OwnerUserId"),
    getDate(x \ "@LastActivityDate"),
    (x \ "@Title").text,
    getTags(x \ "@Tags"),
    getInt(x \ "@AnswerCount"),
    getInt(x \ "@CommentCount"),
    getInt(x \ "@FavoriteCount"),
    getDate(x \ "@CommunityOwnedDate"))

  def getTags(x: scala.xml.NodeSeq): Array[String] = x.text match {
    case "" => Array()
    case s => s.drop(1).dropRight(1).split("><")
  }
}

// <row Id="1" PostTypeId="1" AcceptedAnswerId="15" CreationDate="2010-07-19T19:12:12.510" Score="19" ViewCount="1033" Body="&lt;p&gt;How should I elicit prior distributions from experts when fitting a Bayesian model?&lt;/p&gt;&#xA;" OwnerUserId="8" LastActivityDate="2010-09-15T21:08:26.077" Title="Eliciting priors from experts" Tags="&lt;bayesian&gt;&lt;prior&gt;&lt;elicitation&gt;" AnswerCount="5" CommentCount="1" FavoriteCount="11" />
case class Post(
  id: Int,
  postTypeId: Int,
  acceptedAnswerId: Int,
  creationDate: Long,
  score: Int,
  viewCount: Int,
  body: String,
  ownerUserId: Int,
  lastActivityDate: Long,
  title: String,
  tags: Array[String],
  answerCount: Int,
  commentCount: Int,
  favoriteCount: Int,
  communityOwnedDate: Long) 

object User extends StackTable[User] {

//   val file = new File("data/Users.xml")
//   assert(file.exists)

  override def parseXml(x: scala.xml.Elem): User = User(
    getInt(x \ "@Id"),
    getInt(x \ "@Reputation"),
    getDate(x \ "@CreationDate"),
    (x \ "@DisplayName").text,
    getDate(x \ "@LastAccessDate"),
    (x \ "@WebsiteUrl").text,
    (x \ "@Location").text,
    (x \ "@AboutMe").text,
    getInt(x \ "@Views"),
    getInt(x \ "@UpVotes"),
    getInt(x \ "@DownVotes"),
    (x \ "@EmailHash").text,
    getInt(x \ "@Age"))
}

// <row Id="1" Reputation="5828" CreationDate="2009-04-30T07:08:27.067" DisplayName="Jeff Atwood" LastAccessDate="2013-08-15T00:13:25.560" WebsiteUrl="http://www.codinghorror.com/blog/" Location="El Cerrito, CA" AboutMe="&lt;p&gt;&lt;a href=&quot;http://www.codinghorror.com/blog/archives/001169.html&quot; rel=&quot;nofollow&quot;&gt;Stack Overflow Valued Associate #00001&lt;/a&gt;&lt;/p&gt;&#xA;&#xA;&lt;p&gt;Wondering how our software development process works? &lt;a href=&quot;http://www.youtube.com/watch?v=08xQLGWTSag&quot; rel=&quot;nofollow&quot;&gt;Take a look!&lt;/a&gt;&lt;/p&gt;&#xA;&#xA;&lt;p&gt;Find me &lt;a href=&quot;http://twitter.com/codinghorror&quot; rel=&quot;nofollow&quot;&gt;on twitter&lt;/a&gt;, or &lt;a href=&quot;http://www.codinghorror.com/blog&quot; rel=&quot;nofollow&quot;&gt;read my blog&lt;/a&gt;. Don't say I didn't warn you &lt;em&gt;because I totally did&lt;/em&gt;.&lt;/p&gt;&#xA;&#xA;&lt;p&gt;However, &lt;a href=&quot;http://www.codinghorror.com/blog/2012/02/farewell-stack-exchange.html&quot; rel=&quot;nofollow&quot;&gt;I no longer work at Stack Exchange, Inc&lt;/a&gt;. I'll miss you all. Well, &lt;em&gt;some&lt;/em&gt; of you, anyway. :)&lt;/p&gt;&#xA;" 
// Views="6020" UpVotes="2130" DownVotes="32" EmailHash="51d623f33f8b83095db84ff35e15dbe8" Age="43" />
case class User(
  id: Int,
  reputation: Int,
  creationDate: Long,
  displayName: String,
  lastAccessDate: Long,
  websiteUrl: String,
  location: String,
  aboutMe: String,
  views: Int,
  upVotes: Int,
  downVotes: Int,
  emailHash: String,
  age: Int) 

object Vote extends StackTable[Vote] {

  override def parseXml(x: scala.xml.Elem): Vote = Vote(
    getInt(x \ "@Id"),
    getInt(x \ "@PostId"),
    getInt(x \ "@VoteTypeId"),
    getInt(x \ "@UserId"),
    getDate(x \ "@CreationDate"))
}

// <row Id="1264793" PostId="486481" VoteTypeId="5" UserId="175880" CreationDate="2013-05-30T00:00:00.000" />
case class Vote(
  id: Int,
  postId: Int,
  voteTypeId: Int,
  userId: Int,
  creationDate: Long) 





### Read data into Spark RDD

In [2]:
val allPosts = spark.textFile("/home/vagrant/miniprojects/spark/data/spark-stats-data/allPosts/part-00*.xml.gz")





/home/vagrant/miniprojects/spark/data/spark-stats-data/allPosts/part-00*.xml.gz MapPartitionsRDD[1] at textFile at <console>:15

In [3]:
val allUsers = spark.textFile("/home/vagrant/miniprojects/spark/data/spark-stats-data/allUsers/part-00*.xml.gz")





/home/vagrant/miniprojects/spark/data/spark-stats-data/allUsers/part-00*.xml.gz MapPartitionsRDD[3] at textFile at <console>:15

In [4]:
val allVoters = spark.textFile("/home/vagrant/miniprojects/spark/data/spark-stats-data/allVotes/part-00*.xml.gz")





/home/vagrant/miniprojects/spark/data/spark-stats-data/allVotes/part-00*.xml.gz MapPartitionsRDD[5] at textFile at <console>:15

### First question: upvote_percentage_by_favorites

Each post on StackExchange can be upvoted, downvoted, and favorited. One
"sanity check" we can do is to look at the ratio of upvotes to downvotes as a
function of how many times the post has been favorited.  Using post favorite
counts as the keys for your mapper, calculate the average percentage of upvotes
(upvotes / (upvotes + downvotes)) for the first 50 keys (starting from the
least favorited posts).
  
Do the analysis on the stats.stackexchange.com dataset.

**Checkpoint**
Total upvotes: 313,819
Total downvotes: 13,019
Mean of top 50 favorite counts: 24.76

In [9]:
val top50 = allVoters.flatMap(Vote.parse).map({
    vote => (vote.postId, 
             (if (vote.voteTypeId == 5) 1 else 0, 
              if (vote.voteTypeId == 2) 1 else 0,
              if (vote.voteTypeId == 3) 1 else 0 ))
    // sample record: (1, (1, 0, 1))
}).reduceByKey(
    (x,y)=>(x._1 + y._1, x._2 + y._2, x._3 + y._3)
    // sample record: (1, (10, 7, 5))
).map({
    x => (x._2._1, (x._2._2, x._2._3))
    // sample record: (10, (7, 5))
}).reduceByKey(
    (x, y) => (x._1 + y._1, x._2 + y._2)
    // sample records:
    // x (7,5) favoriate = 10
    // y (6,7) favoriate = 10
    // (10, (7+6, 5+7))
).sortByKey(true).take(50)




Array((0,(232141,11828)), (1,(23800,702)), (2,(13064,187)), (3,(7811,79)), (4,(5321,52)), (5,(3753,28)), (6,(2900,15)), (7,(2370,22)), (8,(1785,10)), (9,(1865,6)), (10,(1423,12)), (11,(1052,9)), (12,(1189,5)), (13,(733,2)), (14,(822,5)), (15,(842,6)), (16,(688,0)), (17,(474,0)), (18,(698,1)), (19,(468,1)), (20,(327,1)), (21,(533,3)), (22,(453,1)), (23,(415,2)), (24,(204,0)), (25,(347,0)), (26,(311,5)), (27,(360,4)), (28,(206,1)), (29,(367,1)), (30,(218,1)), (31,(329,2)), (32,(66,0)), (33,(221,0)), (34,(164,0)), (35,(82,0)), (36,(213,0)), (37,(110,1)), (38,(320,2)), (39,(121,1)), (40,(119,0)), (41,(51,0)), (42,(52,0)), (44,(76,0)), (45,(64,0)), (47,(181,0)), (48,(103,0)), (49,(148,0)), (50,(53,0)), (52,(114,0)))



In [21]:
// check point
top50.map({x => x._1}).sum / 50.0





24.76

In [22]:
// final answer
top50.map({
    x => (x._1, x._2._1 / (x._2._1 + x._2._2).toDouble)
})



Array((0,0.9515184306202837), (1,0.971349277609991), (2,0.9858878575201871), (3,0.9899873257287706), (4,0.990321980271729), (5,0.9925945517058979), (6,0.9948542024013722), (7,0.9908026755852842), (8,0.9944289693593314), (9,0.9967931587386424), (10,0.9916376306620209), (11,0.9915174363807728), (12,0.9958123953098827), (13,0.9972789115646259), (14,0.9939540507859734), (15,0.9929245283018868), (16,1.0), (17,1.0), (18,0.9985693848354793), (19,0.997867803837953), (20,0.9969512195121951), (21,0.9944029850746269), (22,0.9977973568281938), (23,0.9952038369304557), (24,1.0), (25,1.0), (26,0.9841772151898734), (27,0.989010989010989), (28,0.9951690821256038), (29,0.9972826086956522), (30,0.9954337899543378), (31,0.9939577039274925), (32,1.0), (33,1.0), (34,1.0), (35,1.0), (36,1.0), (37,0.990990990990991), (38,0.9937888198757764), (39,0.9918032786885246), (40,1.0), (41,1.0), (42,1.0), (44,1.0), (45,1.0), (47,1.0), (48,1.0), (49,1.0), (50,1.0), (52,1.0))



### Second question: user_answer_percentage_by_reputation

Investigate the correlation between a user's reputation and the kind of posts they make. For the 99 users with the highest reputation, single out posts which
are either questions or answers and look at the percentage of their posts that
are answers (answers / (answers + questions)).

You should also return (-1, fraction) to represent the case where you average
over all users.

Again, you only need to run this on the statistics overflow set.

**Checkpoint**
Total questions: 52,060
Total answers: 55,304
Top 99 users' average reputation: 11893.464646464647

In [23]:
// map-side join
// first get top 99 users by favorate counts
val top99Users = spark.parallelize(
    allUsers.flatMap(User.parse)
.map({
    x => (x.id, x.reputation)
})
.reduceByKey(_+_)
.map({
    x => (x._2, x._1)
})
.sortByKey(false)
.map({
    x => (x._2, x._1)
}).take(99)
)





ParallelCollectionRDD[22] at parallelize at <console>:29

In [24]:
// check data
top99Users.collect()





Array((919,100976), (805,92624), (686,47334), (7290,46907), (930,32283), (4505,27599), (4253,25406), (183,23610), (11032,23102), (28746,22706), (887,20315), (159,20133), (2116,19312), (4856,18866), (22047,17719), (5739,16854), (3277,16131), (88,14768), (2970,14500), (601,14100), (17230,13557), (449,13078), (2392,12491), (1390,12098), (5836,11989), (7555,11865), (603,11830), (7972,11795), (6633,11662), (2958,11083), (9394,10750), (7828,10728), (2817,10552), (7224,10394), (4598,10383), (7071,10045), (1739,9619), (1036,9530), (3382,9294), (8013,9047), (3019,8794), (4376,8629), (251,8221), (28666,8013), (1764,7971), (23853,7765), (32036,7729), (10849,7725), (26338,7608), (1352,7552), (401,7116), (5,6962), (8,6948), (7250,6888), (1909,6814), (21054,6716), (4257,6694), (196,6682), (442,6588), (279,6367), (2669,6352), (8402,6208), (36041,6149), (2126,6145), (44269,6127), (6029,6040), (11981,5970), (1934,5967), (795,5849), (25433,5775), (253,5762), (364,5739), (25,5661), (22311,5500), (334,544

In [26]:
// check point
top99Users
.map({x => x._2})
.mean()

11893.464646464647





In [27]:
// second, join with post
val postq2 = allPosts.flatMap(Post.parse)
.map({
    post => (post.ownerUserId, (
        if (post.postTypeId == 1) 1 else 0, // question
        if (post.postTypeId == 2) 1 else 0 // answer
    ))
})
.reduceByKey(
    (x, y) => (x._1 + y._1, x._2 + y._2)
)
postq2.count()






26891

In [28]:
postq2.take(10)





Array((7942,(1,0)), (24134,(2,0)), (16566,(0,1)), (31328,(1,0)), (30602,(4,0)), (53482,(1,0)), (42218,(2,0)), (41382,(1,0)), (16137,(4,16)), (56892,(1,0)))

In [29]:
// then join
val userPost = top99Users.join(postq2)





MapPartitionsRDD[31] at join at <console>:36

In [30]:
userPost
.map({
    x => (x._1, x._2._2._2 / (x._2._2._2 + x._2._2._1).toDouble)
})
.take(10)
// (userID, (favoriate, (question, answer)))





Array((4598,0.9857142857142858), (9394,0.9700854700854701), (28666,0.9), (88,0.9660493827160493), (4257,0.9757575757575757), (2860,0.8903225806451613), (4862,0.8974358974358975), (17908,1.0), (253,0.3695652173913043), (6633,0.9912280701754386))

### word2vec

[Word2Vec](https://code.google.com/p/word2vec/) is an alternative approach for
vectorizing text data. The vectorized representations of words in the
vocabulary tend to be useful for predicting other words in the document,
hence the famous example "vector('king') - vector('man') + vector('woman')
~= vector('queen')".

Let's see how good a Word2Vec model we can train using the tags of each
StackOverflow post as documents. Use Spark ML's implementation of Word2Vec to
return a list of the top 25 closest synonyms to "ggplot2" and their similarity
score in tuple format ("string", number).

In [31]:
val input = allPosts.flatMap(Post.parse)
.map({ 
x => x.tags.toSeq
})
//.take(100)



MapPartitionsRDD[34] at map at <console>:28



In [32]:
input.take(100)





Array(WrappedArray(bayesian, prior, elicitation), WrappedArray(distributions, normality), WrappedArray(software, open-source), WrappedArray(distributions, statistical-significance), WrappedArray(), WrappedArray(machine-learning), WrappedArray(dataset, sample, population, teaching), WrappedArray(humor), WrappedArray(), WrappedArray(scales, measurement, ordinal, interval, likert), WrappedArray(multivariable, interpolation), WrappedArray(), WrappedArray(), WrappedArray(), WrappedArray(), WrappedArray(), WrappedArray(anova, chi-squared, generalized-linear-model), WrappedArray(), WrappedArray(), WrappedArray(), WrappedArray(forecasting, population, census), WrappedArray(bayesian, frequentist), WrappedArray(distributions, pdf, cdf), WrappedArray(), WrappedArray(modeling, time-series, finance, software), WrappedArray(standard-deviation, basic-concepts), WrappedArray(), WrappedArray(), WrappedArray(), WrappedArray(algorithms, hypothesis-testing, random-variable, random-generation), WrappedArra

### import Spark's MLLib

In [35]:
import org.apache.spark.mllib.feature.{Word2Vec, Word2VecModel}





#### Set hyper-parameters for word2vec 

In [36]:
val word2vec = new Word2Vec()
word2vec.setSeed(3145) // random seed
word2vec.setLearningRate(0.01)
word2vec.setMinCount(5)
word2vec.setNumIterations(10)
word2vec.setNumPartitions(1)
word2vec.setVectorSize(100) // number of dimension for each word

org.apache.spark.mllib.feature.Word2Vec@4efebe13





In [37]:
val model = word2vec.fit(input)



org.apache.spark.mllib.feature.Word2VecModel@7c3295ed



In [38]:
val synonyms = model.findSynonyms("ggplot2", 20)





Array((scatterplot,0.8327534794807434), (barplot,0.7566014528274536), (boxplot,0.7452748417854309), (pie-chart,0.6875287294387817), (interactive-visualization,0.6625925898551941), (binning,0.6234050393104553), (dataframe,0.5904657244682312), (qq-plot,0.5145055651664734), (matplotlib,0.5047450065612793), (histogram,0.5023707747459412), (excel,0.4991022050380707), (gam,0.49830037355422974), (churn,0.495770663022995), (control-chart,0.49488967657089233), (gnuplot,0.4840632975101471), (smoothing,0.4832567572593689), (prediction-limit,0.48242422938346863), (sql,0.4821176528930664), (functional-data-analysis,0.47709786891937256), (confounding,0.47603529691696167))