In [2]:
%matplotlib inline
import matplotlib
import seaborn as sns
sns.set()
matplotlib.rcParams['figure.dpi'] = 144

In [3]:
import gzip, sys, os, re
from pyspark import SparkContext
from datetime import datetime, timedelta

import xml.etree.ElementTree as ET
import numpy as np

import matplotlib.pyplot as plt

In [4]:
import grader

# Spark Miniproject


StackOverflow is a collaboratively edited question-and-answer site originally focused on programming topics. Because it tracks many features, including several feedback metrics, it allows for some open-ended analysis of user behavior on the site.

StackExchange (the parent organization) provides an anonymized [data dump](https://archive.org/details/stackexchange), and we'll use Spark to perform data manipulation, analysis, and machine learning on this dataset. As a side note, there's also an online data explorer which allows you to query the data interactively.


## Workflow


You may complete this project using the Python or Scala APIs. Most questions can be done locally; however, in some cases you may want to use cloud services. See the appropriate lecture notebooks for information on how to use them.

Python example:

1. Edit source code in your main.py file and put classes in a separate classes.py (class definitions need to be written in a separate file and then included at runtime).
1. Run locally on a chunk using e.g. `$SPARK_HOME/bin/spark-submit --py-files src/classes.py src/main.py data/stats results/stats/`
1. Run on GCP once your testing and development are done.

Scala example:

1. Edit source code in Main.scala
1. Run the command `sbt package` from the root directory of the project
1. Use spark-submit locally on a chunk: this means adding a flag like `--master local[2]` to the spark-submit command.
1. Run on GCP once your testing and development are done.

General tips:
* SBT has some nice features, for example continuous build and test, which can greatly speed up your development.
* Try `cat output_dir/* | sort -n -t , -k 1.2 -o sorted_output` to concatenate your output files, which will also be in part-xxxxx format.
* You can access an interactive Spark/Scala REPL with `$SPARK_HOME/bin/spark-shell`.
* You can access an interactive PySpark shell with `$SPARK_HOME/bin/pyspark`.

## Accessing the data


The data is available on S3 (s3://dataincubator-course/spark-stack-data). There are three subfolders, allUsers, allPosts, and allVotes, which contain chunked and gzipped XML in the following format:

```
<row Body="&lt;p&gt;I always validate my web pages, and I recommend you do the same BUT many large company websites DO NOT and cannot validate because the importance of the website looking exactly the same on all systems requires rules to be broken. &lt;/p&gt;&#10;&#10;&lt;p&gt;In general, valid websites help your page look good even on odd configurations (like cell phones) so you should always at least try to make it validate.&lt;/p&gt;&#10;" CommentCount="0" CreationDate="2008-10-12T20:26:29.397" Id="195995" LastActivityDate="2008-10-12T20:26:29.397" OwnerDisplayName="Eric Wendelin" OwnerUserId="25066" ParentId="195973" PostTypeId="2" Score="0" />
```

Data from the much smaller stats.stackexchange.com is available in the same format on S3 (s3://dataincubator-course/spark-stats-data). This site, Cross-Validated, will be used below in some instances to avoid working with the full dataset for every question.

The full schema is available as a text file:

In [102]:
#!aws s3 cp s3://dataincubator-course/spark-stats-data/stack_exchange_schema.txt .

download: s3://dataincubator-course/spark-stats-data/stack_exchange_schema.txt to ./stack_exchange_schema.txt


You can either get the data by running the appropriate S3 commands in the terminal, or by running this block for the smaller stats data set:

In [None]:
#!mkdir -p spark-stats-data
#!aws s3 sync --exclude '*' --include 'all*' s3://dataincubator-course/spark-stats-data/ ./spark-stats-data
#!aws s3 sync --exclude '*' --include 'posts*zip' s3://dataincubator-course/spark-stats-data/ ./spark-stats-data

And to get the much larger full data set (be warned, this can take 20 or more minutes, so you may want to run it in the terminal to avoid locking up the notebook):

In [None]:
#!mkdir -p spark-stack-data
#!aws s3 sync --exclude '*' --include 'all*' s3://dataincubator-course/spark-stack-data/ ./spark-stack-data

## Data input and parsing


Some rows are split across multiple lines; these can be discarded. Malformatted XML can also be ignored. It is enough to simply skip problematic rows; the loss of data will not significantly impact our results on these large datasets.

You will need to handle XML parsing yourself, using the `\` selector in Scala or something like `lxml.etree` in Python. *Warning*: the built-in `xml.etree.ElementTree` behaves differently, and its results don't correspond perfectly with the Scala equivalent.

To make your code more flexible, it's also recommended to incorporate command-line arguments that specify the location of the input data and where output should be written.

The goal should be to have a parsing function that can be applied to the input data to access any XML element desired. It is suggested to use a class structure so that you can create RDDs of Posts, Votes, Users, etc.

``` scala
// Command line arguments in Scala

object Main {
  def main(args: Array[String]): Unit = {
    val inputDir = args(0)
    val outputDir = args(1)
    // ...
  }
}
```

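``` python

# Command line arguments using sys.argv or argparse in Python
import argparse

if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('input_dir')
    parser.add_argument('output_dir')
    ARGS = parser.parse_args()
    # main() is defined elsewhere in main.py
    main(ARGS.input_dir, ARGS.output_dir)
```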
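A minimal sketch of the class structure suggested above, in Python. The `Post` record and the `parse_post` function are hypothetical names, only a handful of the schema's attributes are kept, and returning `None` for rejected rows is just one possible convention. For a `spark-submit` job this code would live in `classes.py` and be shipped with `--py-files`.

```python
from collections import namedtuple
import xml.etree.ElementTree as ET

# Hypothetical record type: only a few of the schema's Post attributes are kept.
Post = namedtuple('Post', ['id', 'post_type_id', 'owner_user_id', 'score', 'creation_date'])

def parse_post(line):
    """Parse one single-line '<row .../>' element into a Post.
    Returns None for anything that is not a complete, well-formed row."""
    try:
        root = ET.fromstring(line.strip().encode('utf-8'))
    except ET.ParseError:
        return None
    if root.tag != 'row':
        return None
    attrs = root.attrib
    try:
        owner = attrs.get('OwnerUserId')  # absent for some posts
        return Post(
            id=int(attrs['Id']),
            post_type_id=int(attrs['PostTypeId']),
            owner_user_id=int(owner) if owner is not None else None,
            score=int(attrs.get('Score', 0)),
            creation_date=attrs['CreationDate'],  # kept as the raw string
        )
    except (KeyError, ValueError):
        return None
```

Parsed posts could then be built with something like `sc.textFile(path).map(parse_post).filter(lambda p: p is not None)`.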

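Dates are parsed by default using the Long data type and Unix (epoch) time. In Java/Scala, a given timestamp represents the number of milliseconds since 1970-01-01T00:00:00Z. Also be wary of integer overflow when dealing with Longs: in the first line below, the product is computed in 32-bit `Int` arithmetic and overflows before it is widened to `Long`, whereas the trailing `L` in the second line forces `Long` arithmetic. For example, these two are not equal: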

`val year: Long = 365 * 24 * 60 * 60 * 1000`

`val year: Long = 365 * 24 * 60 * 60 * 1000L`
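Python integers have arbitrary precision, so this particular pitfall cannot occur in PySpark driver code. The sketch below (helper names are hypothetical) converts the dump's `CreationDate` strings to epoch milliseconds, assuming the timestamps are UTC, and emulates 32-bit two's-complement wraparound to show what the first Scala expression evaluates to.

```python
from datetime import datetime

EPOCH = datetime(1970, 1, 1)

def to_epoch_millis(ts):
    """Convert a timestamp like '2008-10-12T20:26:29.397' to milliseconds since
    1970-01-01T00:00:00, assuming the dump's timestamps are in UTC."""
    delta = datetime.strptime(ts, '%Y-%m-%dT%H:%M:%S.%f') - EPOCH
    # Integer arithmetic on the timedelta parts avoids float rounding error.
    return (delta.days * 86400 + delta.seconds) * 1000 + delta.microseconds // 1000

def wrap_int32(n):
    """Emulate JVM 32-bit Int overflow (two's-complement wraparound)."""
    return (n + 2**31) % 2**32 - 2**31

year_ms = 365 * 24 * 60 * 60 * 1000  # exact in Python: 31536000000
print(wrap_int32(year_ms))           # the value the un-suffixed Int expression yields
print(to_epoch_millis('2008-10-12T20:26:29.397'))
```

Here `wrap_int32(year_ms)` is 1471228928 rather than 31536000000.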

## Questions

## bad_xml


In [5]:
from pyspark import SparkContext
sc = SparkContext("local[*]", "temp")
print sc.version

2.2.0


A simple question to test your parsing code. Create an RDD of Post objects where each Post is a valid row of XML from the Cross-Validated (stats.stackexchange.com) allPosts dataset.

We are going to take several shortcuts to speed up and simplify our computations. First, your parsing function should only attempt to parse rows that start with `  <row`, as these denote actual data entries. This should be done in Spark as the data is being read in from disk, without any pre-Spark processing.

Return the total number of XML rows that started with ` <row` that were subsequently **rejected** during your processing. Note that the text is Unicode and contains non-ASCII characters. You may need to re-encode it to UTF-8 (depending on your XML parser).

Note that this cleaned dataset will be used for all subsequent questions.

*Question*: Can you figure out what filters you need to put in place to avoid throwing parsing errors entirely?
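One way to approach the question, sketched in plain Python so it can be tried on a few lines without Spark: keep only candidate lines that begin with `<row` (after leading whitespace), then count the candidates that fail a real XML parse. The helper names are hypothetical, and this full-parse count is not guaranteed to equal the regex-based count computed in the cells below.

```python
import xml.etree.ElementTree as ET

def is_row_candidate(line):
    # Data entries are the lines whose first non-blank characters are '<row'.
    return line.lstrip().startswith('<row')

def is_well_formed(line):
    # Encoding to UTF-8 bytes avoids the unicode/ASCII errors Python 2's
    # ElementTree can raise on non-ASCII text; Python 3 accepts bytes too.
    try:
        ET.fromstring(line.strip().encode('utf-8'))
        return True
    except ET.ParseError:
        return False

def count_rejected(lines):
    """Number of candidate '<row' lines that fail to parse as XML."""
    return sum(1 for line in lines
               if is_row_candidate(line) and not is_well_formed(line))
```

On an RDD the same predicates compose as `sc.textFile(path).filter(is_row_candidate).filter(lambda l: not is_well_formed(l)).count()`.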

In [6]:
import os
def localpath(path):
    return 'file://' + str(os.path.abspath(os.path.curdir)) + '/' + path

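In [14]:
# Precompile the patterns once instead of on every call.
ROW_START = re.compile(u'<row ')
ROW_COMPLETE = re.compile(u'<row.*?>')

def Has_row(line):
    # True if the line contains the start of a '<row ' element.
    return ROW_START.search(line.strip()) is not None

def Parser(line):
    # True if a '<row' tag is opened and closed ('>') on the same line,
    # i.e. the row was not split across multiple lines.
    return ROW_COMPLETE.search(line.strip()) is not None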


In [15]:
my_input_dir = 'spark-stats-data/allPosts/'
# Lines containing the start of a '<row ' element
All_lines = sc.textFile(localpath(my_input_dir)).filter(Has_row).count()
# Lines whose '<row ...>' tag is complete on that line
Fine_lines = sc.textFile(localpath(my_input_dir)).filter(Parser).count()
Missed_ones = All_lines - Fine_lines

print("lines with posts:\t {0}".format(All_lines) )
print("readable lines:\t\t {0}".format(Fine_lines) )
print("unreadable posts:\t {0}".format(Missed_ones) )

lines with posts:	 109522
readable lines:		 108741
unreadable posts:	 781


In [84]:
def bad_xml():
    return Missed_ones

grader.score(question_name='spark__bad_xml', func=bad_xml)

Your score:  1


## upvote_percentage


Each post on StackExchange can be upvoted, downvoted, and favorited. One "sanity check" we can do is to look at the ratio of upvotes to downvotes (referred to as "UpMod" and "DownMod" in the schema) as a function of how many times the post has been favorited.

You might hypothesize, for example, that posts with more favorites should have a higher upvote/downvote ratio.

Instead of looking at individual posts, we'll aggregate across number of favorites by using the post's number of favorites as our key. Since we're computing ratios, bundling together all posts with the same number of favorites effectively averages over them.  Calculate the average percentage of upvotes *(upvotes / (upvotes + downvotes))* for the smallest 50 **keys**.

Do the analysis on the smaller Cross-Validated dataset.


#### Checkpoints

* Total upvotes: 313,819
* Total downvotes: 13,019
* Mean of first 50 keys (averaging the keys themselves): 24.76
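The key-switching aggregation (per post, then per favorite count) can be checked on toy data without Spark. This sketch uses a hypothetical function name, assumes the votes are already parsed into `(PostId, VoteTypeId)` pairs, and uses the same mapping as the notebook's `Vote` parser: 2 = UpMod, 3 = DownMod, 5 = Favorite. Favorite counts whose posts received no up- or downvotes are dropped to avoid dividing by zero, a choice the question does not specify.

```python
from collections import defaultdict

def upvote_ratio_by_favorites(votes, n_keys=50):
    """votes: iterable of (post_id, vote_type_id) pairs.
    Returns [(favorite_count, upvotes / (upvotes + downvotes))] for the
    n_keys smallest favorite counts."""
    per_post = defaultdict(lambda: [0, 0, 0])  # post_id -> [up, down, fav]
    for post_id, vote_type in votes:
        if vote_type == 2:      # UpMod
            per_post[post_id][0] += 1
        elif vote_type == 3:    # DownMod
            per_post[post_id][1] += 1
        elif vote_type == 5:    # Favorite
            per_post[post_id][2] += 1

    per_fav = defaultdict(lambda: [0, 0])      # favorite_count -> [up, down]
    for up, down, fav in per_post.values():
        per_fav[fav][0] += up
        per_fav[fav][1] += down

    # Skip favorite counts with no up/down votes to avoid dividing by zero.
    ratios = [(fav, up / float(up + down))
              for fav, (up, down) in sorted(per_fav.items()) if up + down > 0]
    return ratios[:n_keys]
```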

In [None]:
#u'(^<row(.*?)\/>)'
def Vote(line):
    try:
        root = ET.fromstring(re.compile(u'(<row(.*?)>)').findall(line.strip())[0][0].encode('utf-8').strip())
        PostId = int(root.get('PostId'))
        Up, Down, Fav = 0, 0, 0
        var = int(root.get('VoteTypeId'))
        if var == 2: Up += 1
        elif var == 3: Down += 1
        elif var == 5: Fav += 1
        return (PostId, Up, Down, Fav)
    except:
        return []

In [None]:
my_input_dir = 'spark-stats-data/allVotes/'
sc.textFile(localpath(my_input_dir)).map(Vote).take(5)

In [None]:
my_input_dir = 'spark-stats-data/allVotes/'

top_50 = sc.textFile(localpath(my_input_dir)).map(Vote).filter(lambda x: x!=[])\
        .map(lambda x: (x[0], (x[1], x[2], x[3])) )\
        .reduceByKey(lambda x,y: (x[0]+y[0], x[1]+y[1], x[2]+y[2]) )\
        .map(lambda x: (x[1][2], (x[1][0], x[1][1])) )\
        .reduceByKey(lambda x,y: (x[0]+y[0], x[1]+y[1]) )\
        .map(lambda x: (  x[0], x[1][0]/float(x[1][0]+x[1][1])  )  )\
        .sortByKey(ascending=True).take(50)

In [None]:
top_50[:5]

In [None]:
def upvote_percentage():
    return top_50 #[(20, 0.9952153110047847)] * 50

grader.score(question_name='spark__upvote_percentage', func=upvote_percentage)

## answer_percentage


Investigate the correlation between a user's reputation and the kind of posts they make. For the 99 users with the highest reputation, single out posts which are either questions or answers and look at the percentage of these posts that are answers: *(answers / (answers + questions))*. 

Return a tuple of their **user ID** and this fraction.

You should also return (-1, fraction) to represent the case where you average over all users (so you will return 100 entries total).

Again, you only need to run this on the Cross-Validated (stats.stackexchange.com) dataset.


#### Checkpoints

* Total questions: 52,060
* Total answers: 55,304
* Top 99 users' average reputation: 11893.464646464647
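The answer fraction and the extra `(-1, fraction)` entry can be sketched in plain Python. The function name and input shapes are assumptions: reputations come as a dict, posts as `(OwnerUserId, PostTypeId)` pairs with 1 = question and 2 = answer, and other post types are ignored. A top user with no questions or answers gets 0.0 here, which is an arbitrary choice; the overall entry pools all question and answer posts, as `avg_all_users` below does.

```python
def answer_fractions(reputation, posts, top_n=99):
    """reputation: dict user_id -> reputation.
    posts: iterable of (owner_user_id, post_type_id).
    Returns [(user_id, answers / (answers + questions))] for the top_n users
    by reputation, followed by (-1, pooled fraction over all posts)."""
    counts = {}  # user_id -> [questions, answers]
    for user_id, post_type in posts:
        if post_type in (1, 2):
            c = counts.setdefault(user_id, [0, 0])
            c[post_type - 1] += 1

    top = sorted(reputation, key=lambda u: reputation[u], reverse=True)[:top_n]
    result = []
    for u in top:
        q, a = counts.get(u, [0, 0])
        result.append((u, a / float(q + a) if q + a else 0.0))

    total_q = sum(c[0] for c in counts.values())
    total_a = sum(c[1] for c in counts.values())
    result.append((-1, total_a / float(total_q + total_a)))
    return result
```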

In [250]:
def User_Rep(line):
    try:
        root = ET.fromstring(re.compile(u'(<row(.*?)/>)').findall(line.strip())[0][0].encode('utf-8').strip())
        Id = int(root.get('Id'))
        Reputation = int(root.get('Reputation'))
        return (Id, Reputation)
    except:
        return []
    
def User_Posts(line):
    
    try:
        root = ET.fromstring(re.compile(u'(<row(.*?)/>)').findall(line.strip())[0][0].encode('utf-8').strip())
        UserId = int(root.attrib['OwnerUserId'])
            
        questions, answers = 0, 0
            
        var = int(root.get('PostTypeId')) 
        if var == 1: questions += 1
        elif var == 2: answers += 1
            
        return (UserId, (questions, answers))

    except:
        return []


In [251]:
my_input_Posts = 'spark-stats-data/allPosts/'
my_input_Users = 'spark-stats-data/allUsers/'

posts = sc.textFile(localpath(my_input_Posts)).map(User_Posts).filter(lambda x: x!=[])
users = sc.textFile(localpath(my_input_Users)).map(User_Rep).filter(lambda x: x!=[])

In [125]:
####CHECK POINT: Top 99 users' average reputation: 11893.464646464647 CHECK :)
top99users = users\
    .map(lambda x: (x[1],(1))) \
    .sortByKey(ascending=False) \
    .take(99)
sum([i[0] for i in top99users])/float(99)

In [200]:
####CHECK POINT: Total questions: 52,060
####CHECK POINT: Total answers: 55,304
# posts holds (UserId, (questions, answers)); count answer and question posts
totans = posts.filter(lambda x: x[1][1]!=0).count()
totques = posts.filter(lambda x: x[1][0]!=0).count()

In [201]:
print totans, totques

55304 52060


In [21]:
posts.take(5)

[(8, (1, 0)), (24, (1, 0)), (18, (1, 0)), (23, (1, 0)), (23, (0, 1))]

In [22]:
users.take(5)

[(-1, 1), (2, 101), (3, 101), (4, 101), (5, 6962)]

In [252]:
users.join(posts).take(4)

[(40963, (11, (1, 0))),
 (24583, (3, (1, 0))),
 (16393, (300, (1, 0))),
 (16393, (300, (1, 0)))]

In [37]:
posts.join(users).take(4)

[(40963, ((1, 0), 11)),
 (24583, ((1, 0), 3)),
 (16393, ((1, 0), 300)),
 (16393, ((1, 0), 300))]

In [253]:
# users.join(posts) gives (id, (rep, (q, a))); flatten to (id, (rep, q, a)),
# sum q and a per user, drop users with no questions or answers (avoids dividing by 0),
# then key by reputation, (rep, (id, a/(q+a))), and keep the 99 highest
table = users.join(posts)\
        .map(lambda x: (x[0], (x[1][0], x[1][1][0], x[1][1][1]))) \
        .reduceByKey(lambda x,y: (x[0], x[1]+y[1], x[2]+y[2]) ) \
        .filter(lambda x: x[1][1]+x[1][2]!=0 ) \
        .map(lambda x: (x[1][0], (x[0],x[1][2]/float(x[1][1]+x[1][2]))) ) \
        .sortByKey(ascending=False) \
        .take(99)

In [208]:
table[:5]

[(69634, (404, 0.7777777777777778)),
 (68914, (99, 0.2647058823529412)),
 (68514, (281, 0.4666666666666667)),
 (68133, (104, 0.2755102040816326)),
 (67799, (443, 0.9166666666666666))]

In [254]:
avg_all_users = posts.map(lambda x: (1, (x[1][0], x[1][1])) )\
        .reduceByKey(lambda x,y: (x[0]+y[0], x[1]+y[1]) )\
        .map(lambda x: (x[1][1]/float(x[1][0]+x[1][1])) )\
        .collect()

In [255]:
q3 = [i[1] for i in table]
q3.append((-1,avg_all_users[0]))

In [256]:
q3[:3]

[(919, 0.996694214876033),
 (805, 0.9959749552772809),
 (686, 0.9803049555273189)]

In [257]:
def answer_percentage():
    return q3 #[(7071, 0.9107142857142857)] * 100

grader.score(question_name='spark__answer_percentage', func=answer_percentage)

Your score:  1.0


## post_counts


If we use the total number of posts made on the site as a metric for tenure, we can look at the differences between "younger" and "older" users. You can imagine there might be many interesting features - for now just return the top 100 post counts among all users (of all types of posts) and the average reputation for every user who has that count.

In other words, aggregate the cases where multiple users have the same post count.


#### Checkpoints

* Mean of top 100 post counts: 281.51
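A plain-Python sketch of the two-step aggregation: count posts per user, then group users by that count and average their reputations. The function name and input shapes are hypothetical; users missing from the reputation table are skipped, mirroring an inner join.

```python
from collections import Counter, defaultdict

def avg_reputation_by_post_count(post_owner_ids, reputation, top_n=100):
    """post_owner_ids: one OwnerUserId per post (any post type).
    reputation: dict user_id -> reputation.
    Returns [(post_count, mean reputation of users with that count)]
    for the top_n largest post counts."""
    posts_per_user = Counter(post_owner_ids)
    reps_by_count = defaultdict(list)          # post_count -> [reputations]
    for user_id, n_posts in posts_per_user.items():
        if user_id in reputation:              # inner join with the users table
            reps_by_count[n_posts].append(reputation[user_id])
    top_counts = sorted(reps_by_count, reverse=True)[:top_n]
    return [(n, sum(reps_by_count[n]) / float(len(reps_by_count[n])))
            for n in top_counts]
```

Dividing by `float(...)` keeps the average from being floor-divided under Python 2.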

In [232]:
def Posts_number(line):    

    try:
        root = ET.fromstring(re.compile(u'(<row(.*?)>)').findall(line.strip())[0][0].encode('utf-8').strip())
        if 'OwnerUserId' in root.attrib:
            OwnerUserId = int(root.attrib['OwnerUserId'])
        else:
            OwnerUserId = -2
                
        return (OwnerUserId, 1)
        
    except:
        return []
   
    
def User_rep(line):
    
    try:
        root = ET.fromstring(re.compile(u'(<row(.*?)>)').findall(line.strip())[0][0].encode('utf-8').strip())
        Id, Reputation = int(root.attrib['Id']), int(root.attrib['Reputation'])
        return (Id, Reputation)
            
    except:
        return []


In [240]:
my_input_Posts = 'spark-stats-data/allPosts/'
my_input_Users = 'spark-stats-data/allUsers/'

posts_num = sc.textFile(localpath(my_input_Posts))\
        .map(Posts_number).filter(lambda x: x!=[])\
        .reduceByKey(lambda x,y: x+y)
#posts_num = (user id,number of posts for each user)
users = sc.textFile(localpath(my_input_Users)).map(User_Rep).filter(lambda x: x!=[])
# Users = (user id , rep)

In [71]:
users.join(posts_num).take(4)

[(40963, (11, 1)), (24583, (3, 1)), (16393, (300, 1)), (16393, (300, 1))]

In [241]:
posts_num.join(users).take(4)
# (userid, (post number,rep))

[(40963, (1, 11)), (24583, (1, 3)), (16393, (14, 300)), (13, (7, 947))]

In [None]:
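# (post count, (rep, 1)) -> sum reps and user counts per post count -> average rep
# float() avoids Python 2 integer (floor) division
table = posts_num.join(users)\
        .map(lambda x: (x[1][0], (x[1][1],1)))\
        .reduceByKey(lambda x,y: (x[0]+y[0],x[1]+y[1]))\
        .map(lambda x : (x[0],(x[1][0]/float(x[1][1]))))\
        .sortByKey(ascending=False)\
        .take(100)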


In [249]:
table[:5]

[(2325, 92624), (1663, 47334), (1287, 100976), (1018, 46907), (965, 23102)]

In [248]:
def post_counts():
    return table #[(118, 3736.5)] * 100

grader.score(question_name='spark__post_counts', func=post_counts)

Your score:  1.0


## quick_answers


How long do you have to wait to get your question answered? Look at the set of ACCEPTED answers that were posted less than three hours after the question was created. What is the average number of these "quick answers" as a function of the hour of day the question was asked? Normalize by the total number of accepted answers garnered by questions posted in the same hour, exactly as the quick accepted answers are counted, e.g. (quick accepted answers when the question hour is 15) / (total accepted answers when the question hour is 15).

Return a list whose ith element corresponds to the ith hour (e.g. 0 -> midnight, 1 -> 1:00, etc.).

*Note*: When using Scala's SimpleDateFormat class, it's important to account for your machine's local time zone. Our policy will be to use GMT: hourFormat.setTimeZone(TimeZone.getTimeZone("GMT"))

*Consider*: What biases are present in our result that we don't account for? How should we handle this?


#### Checkpoints

* Total quick accepted answers: 8,468
* Total accepted answers: 17,096
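The per-hour normalization can be sketched in plain Python, assuming each accepted answer has already been paired with its question (e.g. by joining on `AcceptedAnswerId`) and that timestamps use the dump's `%Y-%m-%dT%H:%M:%S.%f` format in GMT. The function name is hypothetical, and hours with no accepted answers are reported as 0.0, which is an assumption rather than part of the question.

```python
from datetime import datetime, timedelta

FMT = '%Y-%m-%dT%H:%M:%S.%f'

def quick_answer_fractions(accepted_pairs, window=timedelta(hours=3)):
    """accepted_pairs: iterable of (question_creation, accepted_answer_creation)
    timestamp strings. Returns 24 fractions; index h = hour the question was asked."""
    quick = [0] * 24
    total = [0] * 24
    for q_ts, a_ts in accepted_pairs:
        q = datetime.strptime(q_ts, FMT)
        a = datetime.strptime(a_ts, FMT)
        total[q.hour] += 1
        if a - q < window:          # answered less than three hours later
            quick[q.hour] += 1
    return [quick[h] / float(total[h]) if total[h] else 0.0 for h in range(24)]
```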

In [405]:
def Posts_Q(line): 
        
    try:
        root = ET.fromstring(re.compile(u'(<row(.*?)>)').findall(line.strip())[0][0].encode('utf-8').strip())
        Id = int(root.attrib['Id'])
        CreationDate = datetime.strptime(root.attrib['CreationDate'], '%Y-%m-%dT%H:%M:%S.%f').replace(microsecond=0)
        PostTypeId = int(root.attrib['PostTypeId'])

        if PostTypeId == 1:
            AcceptedAnswerId = int(root.attrib['AcceptedAnswerId'])
            return (Id, AcceptedAnswerId, CreationDate)

    except:
        return []
    
    
def Posts_A(line): 
        
    try:
        root = ET.fromstring(re.compile(u'(<row(.*?)>)').findall(line.strip())[0][0].encode('utf-8').strip())
        Id = int(root.attrib['Id'])
        CreationDate = datetime.strptime(root.attrib['CreationDate'], '%Y-%m-%dT%H:%M:%S.%f').replace(microsecond=0)
        PostTypeId = int(root.attrib['PostTypeId'])

        if PostTypeId == 2:
            return (Id, CreationDate)

    except:
        return []    


In [330]:
my_input_Posts = 'spark-stats-data/allPosts/'

Question = sc.textFile(localpath(my_input_Posts))\
    .map(Posts_Q)\
    .filter(lambda x: x!=[])\
    .filter(lambda x: x is not None)

#FORMAT QUESTIONS: (1, 15, datetime.datetime(2010, 7, 19, 19, 12, 12))
#(Id, AcceptedAnswerId, CreationDate)

Answer = sc.textFile(localpath(my_input_Posts))\
    .map(Posts_A)\
    .filter(lambda x: x!=[])\
    .filter(lambda x: x is not None)
#FORMAT ANSWER (Id, CreationDate)
    
Answered_Questions = Question.map(lambda x: (x[1], x[2]))\
    .join( Answer.map(lambda x: (x[0],x[1])))    
# Answered_Questions = (AcceptedAnswerId, (questions CreationDate,answers CreationDate))
    
Quick_answered = Answered_Questions.filter(lambda x: (x[1][1]-x[1][0]<timedelta(0, 10800)) )
# SAME FORMAT AS ANSWERED_QUESTIONS

Number_of_Answered_Questions = Answered_Questions.map(lambda x: (x[1][0].hour ,1) )\
    .reduceByKey(lambda x, y: x+y)\
    .sortByKey()
Number_of_Quick_answered = Quick_answered.map(lambda x: (x[1][0].hour ,1) )\
    .reduceByKey(lambda x, y: x+y)\
    .sortByKey()  
    
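# Join the two per-hour counts on hour, sort by hour so that element i is hour i,
# then divide quick / total (hours with no quick answers are dropped by the inner join)
wait_time = Number_of_Quick_answered\
    .join(Number_of_Answered_Questions)\
    .sortByKey()\
    .map(lambda x: (x[1][0]/float(x[1][1]))).collect()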
    


In [331]:
wait_time[:10]

[0.4504672897196262,
 0.44814814814814813,
 0.3605577689243028,
 0.3799126637554585,
 0.4028436018957346,
 0.4125,
 0.4597402597402597,
 0.4673684210526316,
 0.4616822429906542,
 0.49528301886792453]

In [332]:
def quick_answers():
    return wait_time #[0.] * 24

grader.score(question_name='spark__quick_answers', func=quick_answers)

Your score:  1.0


## quick_answers_full


Same as above, but on the full StackExchange dataset.

No pre-parsed data is available for this question.


#### Checkpoints

* Total quick accepted answers: 3,700,224
* Total accepted answers: 5,086,888

In [None]:
#### Same code as the previous part; only the input directory changes

my_input_Posts = 'spark-stack-data/allPosts/'

Question = sc.textFile(localpath(my_input_Posts))\
    .map(Posts_Q)\
    .filter(lambda x: x!=[])\
    .filter(lambda x: x is not None)

#FORMAT QUESTIONS: (1, 15, datetime.datetime(2010, 7, 19, 19, 12, 12))
#(Id, AcceptedAnswerId, CreationDate)

Answer = sc.textFile(localpath(my_input_Posts))\
    .map(Posts_A)\
    .filter(lambda x: x!=[])\
    .filter(lambda x: x is not None)
#FORMAT ANSWER (Id, CreationDate)
    
Answered_Questions = Question.map(lambda x: (x[1], x[2]))\
    .join( Answer.map(lambda x: (x[0],x[1])))    
# Answered_Questions = (AcceptedAnswerId, (questions CreationDate,answers CreationDate))
    
Quick_answered = Answered_Questions.filter(lambda x: (x[1][1]-x[1][0]<timedelta(0, 10800)) )
# SAME FORMAT AS ANSWERED_QUESTIONS

Number_of_Answered_Questions = Answered_Questions.map(lambda x: (x[1][0].hour ,1) )\
    .reduceByKey(lambda x, y: x+y)\
    .sortByKey()
Number_of_Quick_answered = Quick_answered.map(lambda x: (x[1][0].hour ,1) )\
    .reduceByKey(lambda x, y: x+y)\
    .sortByKey()  
    
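# Join the two per-hour counts on hour, sort by hour so that element i is hour i,
# then divide quick / total (hours with no quick answers are dropped by the inner join)
wait_time = Number_of_Quick_answered\
    .join(Number_of_Answered_Questions)\
    .sortByKey()\
    .map(lambda x: (x[1][0]/float(x[1][1]))).collect()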
    


In [329]:
wait_time[:10]

[0.690509573675678,
 0.6960107966498631,
 0.6996508810120343,
 0.7043822789835892,
 0.7101039989211401,
 0.7178884888038708,
 0.7242146432479951,
 0.7270583958385681,
 0.726148461144255,
 0.722902538582882]

In [324]:
def quick_answers_full():
    return wait_time #[0.] * 24

grader.score(question_name='spark__quick_answers_full', func=quick_answers_full)

Your score:  1.0


## identify_veterans


It can be interesting to think about what factors influence a user to remain active on the site over a long period of time. In order not to bias the results towards older users, we'll define a time window between 100 and 150 days after account creation. If the user has made a post in this time, we'll consider them active and well on their way to being veterans of the site; if not, they are inactive and were likely brief users.

*Consider*: What other parameterizations of "activity" could we use, and how would they differ in terms of splitting our user base?

*Consider*: What other biases are still not dealt with, after using the above approach?

Let's see if there are differences between the first ever question posts of "veterans" vs. "brief users". For each group separately, average the score, views, number of answers, and number of favorites of the users' **first question**.

*Consider*: What story could you tell from these numbers? How do the numbers support it?


#### Checkpoints

* Total brief users: 24,864
* Total veteran users: 2,027
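The activity rule and the first-question averages can be sketched in plain Python. The helper names are hypothetical; the window bounds are strict (100 < age < 150 days), mirroring the comparison used in the cells below, and question stats are assumed to be `(creation_datetime, score, views, answers, favorites)` tuples with missing values already replaced by 0.

```python
from datetime import datetime, timedelta

def is_veteran(account_created, post_dates,
               start=timedelta(days=100), end=timedelta(days=150)):
    """True if any post falls strictly between `start` and `end` after
    account creation."""
    return any(start < posted - account_created < end for posted in post_dates)

def first_question_stats(questions):
    """questions: list of (creation_datetime, score, views, answers, favorites).
    Returns the four stats of the earliest question."""
    return min(questions, key=lambda q: q[0])[1:]

def average_stats(rows):
    """Column-wise mean of equal-length stat tuples."""
    return tuple(sum(col) / float(len(rows)) for col in zip(*rows))
```

Applied per user, `is_veteran` splits users into the two groups, and `average_stats` over each group's `first_question_stats` gives the four requested averages.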

In [86]:
def User_Info(line):
    
        
    try:
        root = ET.fromstring(re.compile(u'(<row(.*?)>)')\
                             .findall(line.strip())[0][0].encode('utf-8').strip())          
        Id = int(root.attrib['Id']) 
        CreationDate = datetime.strptime(root.attrib['CreationDate'], '%Y-%m-%dT%H:%M:%S.%f').replace(microsecond=0)
        return (Id, CreationDate)            

    except:
        return []

    
def Post_Info(line):
    
    try:
        root = ET.fromstring(re.compile(u'(<row(.*?)>)').findall(line.strip())[0][0].encode('utf-8').strip())
        OwnerUserId = int(root.attrib['OwnerUserId'])
        PostTypeId = int(root.attrib['PostTypeId'])
        CreationDate = datetime.strptime(root.attrib['CreationDate'], '%Y-%m-%dT%H:%M:%S.%f').replace(microsecond=0)
        # Optional attributes default to 0 when absent
        Score = int(root.attrib.get('Score', 0))
        ViewCount = int(root.attrib.get('ViewCount', 0))
        AnswerCount = int(root.attrib.get('AnswerCount', 0))
        FavoriteCount = int(root.attrib.get('FavoriteCount', 0))
        return (OwnerUserId, PostTypeId, CreationDate, Score, ViewCount, AnswerCount, FavoriteCount)

    except:
        return []


In [45]:
my_input_Users = 'spark-stats-data/allUsers/'

Usr_Info = sc.textFile(localpath(my_input_Users))\
    .map(User_Info)\
    .filter(lambda x: x != [])

my_input_Posts = 'spark-stats-data/allPosts/'

Pst_Info = sc.textFile(localpath(my_input_Posts))\
    .map(Post_Info)\
    .filter(lambda x: x != [])

    
User_first_Q = sc.textFile(localpath(my_input_Posts))\
    .map(Post_Info)\
    .filter(lambda x: x !=[] and x[1]==1)\
    .map(lambda x: (x[0], (x[1],x[2],x[3],x[4],x[5],x[6]) ))\
    .reduceByKey(lambda x, y: x if x[1]<y[1] else y)

# (OwnerUserId, (PostTypeId, CreationDate, Score, ViewCount, AnswerCount, FavoriteCount)),
# keeping only each user's earliest question (smallest CreationDate)

# After the join we have (userid, (account creation date, post date)).
# Map each post to 1 if it was made 100-150 days after account creation, else 0,
# then sum per user: (userid, number of posts inside the window).
posts_in_window = Usr_Info.join(Pst_Info.map(lambda x: (x[0], x[2])))\
    .map(lambda x: (x[0], 1) if timedelta(100, 0)<x[1][1]-x[1][0]<timedelta(150, 0) else (x[0], 0) )\
    .reduceByKey(lambda x, y: x+y)

# Active ("veteran") users posted at least once inside the window: (userid, 1)
active_users = posts_in_window\
    .filter(lambda x: x[1]>0)\
    .map(lambda x: (x[0],1) )

# Inactive ("brief") users made no posts inside the window: (userid, 1)
inactive_users = posts_in_window\
    .filter(lambda x: x[1]==0)\
    .map(lambda x: (x[0],1) )

# Join active users with their first question, then emit
# (1, (Score, ViewCount, AnswerCount, FavoriteCount, 1)); the trailing 1 counts users.
# Summing and dividing by that count gives the average of each stat for active users.
active_info = active_users.join(User_first_Q)\
    .map(lambda x: (1,(x[1][1][2],x[1][1][3],x[1][1][4],x[1][1][5],1)))\
    .reduceByKey(lambda x, y: (x[0]+y[0], x[1]+y[1], x[2]+y[2], x[3]+y[3], x[4]+y[4]) )\
    .map(lambda x: (x[1][0]/float(x[1][4]),x[1][1]/float(x[1][4]),x[1][2]/float(x[1][4]),x[1][3]/float(x[1][4])))\
    .collect()

inactive_info = inactive_users.join(User_first_Q)\
    .map(lambda x: (1,(x[1][1][2],x[1][1][3],x[1][1][4],x[1][1][5],1)))\
    .reduceByKey(lambda x, y: (x[0]+y[0], x[1]+y[1], x[2]+y[2], x[3]+y[3], x[4]+y[4]) )\
    .map(lambda x: (x[1][0]/float(x[1][4]),x[1][1]/float(x[1][4]),x[1][2]/float(x[1][4]),x[1][3]/float(x[1][4])))\
    .collect()

In [46]:
print('active user: \t Score = {0:.2f}\t ViewCount = {1:.2f}\t AnswerCount = {2:.2f}\t FavoriteCount = {3:.2f}'\
      .format(active_info[0][0],active_info[0][1],active_info[0][2],active_info[0][3]) )
print('inactive user: \t Score = {0:.2f}\t ViewCount = {1:.2f}\t AnswerCount = {2:.2f}\t FavoriteCount = {3:.2f}'\
      .format(inactive_info[0][0],inactive_info[0][1],inactive_info[0][2],inactive_info[0][3]) )

active user: 	 Score = 3.54	 ViewCount = 926.40	 AnswerCount = 1.30	 FavoriteCount = 1.30
inactive user: 	 Score = 2.10	 ViewCount = 553.52	 AnswerCount = 0.97	 FavoriteCount = 0.58


In [48]:
def identify_veterans():
    return {"vet_score": active_info[0][0],
            "vet_views": active_info[0][1],
            "vet_answers": active_info[0][2],
            "vet_favorites": active_info[0][3],
            "brief_score": inactive_info[0][0],
            "brief_views": inactive_info[0][1],
            "brief_answers": inactive_info[0][2],
            "brief_favorites": inactive_info[0][3]
           }

grader.score(question_name='spark__identify_veterans', func=identify_veterans)

Your score:  1.0


## identify_veterans_full


Same as above, but on the full StackExchange dataset.

No pre-parsed data is available for this question.


#### Checkpoints

* Total brief users: 1,848,628
* Total veteran users: 288,285

In [None]:
my_input_Users = 'spark-stack-data/allUsers/'

Usr_Info = sc.textFile(localpath(my_input_Users))\
    .map(User_Info)\
    .filter(lambda x: x != [])

my_input_Posts = 'spark-stack-data/allPosts/'

Pst_Info = sc.textFile(localpath(my_input_Posts))\
    .map(Post_Info)\
    .filter(lambda x: x != [])

    
User_first_Q = sc.textFile(localpath(my_input_Posts))\
    .map(Post_Info)\
    .filter(lambda x: x !=[] and x[1]==1)\
    .map(lambda x: (x[0], (x[1],x[2],x[3],x[4],x[5],x[6]) ))\
    .reduceByKey(lambda x, y: x if x[1]<y[1] else y)

# (OwnerUserId, (PostTypeId, CreationDate, Score, ViewCount, AnswerCount, FavoriteCount)),
# keeping only each user's earliest question (smallest CreationDate)

# After the join we have (userid, (account creation date, post date)).
# Map each post to 1 if it was made 100-150 days after account creation, else 0,
# then sum per user: (userid, number of posts inside the window).
posts_in_window = Usr_Info.join(Pst_Info.map(lambda x: (x[0], x[2])))\
    .map(lambda x: (x[0], 1) if timedelta(100, 0)<x[1][1]-x[1][0]<timedelta(150, 0) else (x[0], 0) )\
    .reduceByKey(lambda x, y: x+y)

# Active ("veteran") users posted at least once inside the window: (userid, 1)
active_users = posts_in_window\
    .filter(lambda x: x[1]>0)\
    .map(lambda x: (x[0],1) )

# Inactive ("brief") users made no posts inside the window: (userid, 1)
inactive_users = posts_in_window\
    .filter(lambda x: x[1]==0)\
    .map(lambda x: (x[0],1) )

# Join active users with their first question, then emit
# (1, (Score, ViewCount, AnswerCount, FavoriteCount, 1)); the trailing 1 counts users.
# Summing and dividing by that count gives the average of each stat for active users.
active_info = active_users.join(User_first_Q)\
    .map(lambda x: (1,(x[1][1][2],x[1][1][3],x[1][1][4],x[1][1][5],1)))\
    .reduceByKey(lambda x, y: (x[0]+y[0], x[1]+y[1], x[2]+y[2], x[3]+y[3], x[4]+y[4]) )\
    .map(lambda x: (x[1][0]/float(x[1][4]),x[1][1]/float(x[1][4]),x[1][2]/float(x[1][4]),x[1][3]/float(x[1][4])))\
    .collect()

inactive_info = inactive_users.join(User_first_Q)\
    .map(lambda x: (1,(x[1][1][2],x[1][1][3],x[1][1][4],x[1][1][5],1)))\
    .reduceByKey(lambda x, y: (x[0]+y[0], x[1]+y[1], x[2]+y[2], x[3]+y[3], x[4]+y[4]) )\
    .map(lambda x: (x[1][0]/float(x[1][4]),x[1][1]/float(x[1][4]),x[1][2]/float(x[1][4]),x[1][3]/float(x[1][4])))\
    .collect()

In [None]:
def identify_veterans_full():
    return {"vet_score": active_info[0][0],
            "vet_views": active_info[0][1],
            "vet_answers": active_info[0][2],
            "vet_favorites": active_info[0][3],
            "brief_score": inactive_info[0][0],
            "brief_views": inactive_info[0][1],
            "brief_answers": inactive_info[0][2],
            "brief_favorites": inactive_info[0][3]
           }

grader.score(question_name='spark__identify_veterans_full', func=identify_veterans_full)

Your score:  1.0
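For readers without a cluster at hand, the active/inactive split and the per-group averages above can be mimicked in plain Python. This is a minimal sketch, assuming a hypothetical `user_created` dict (user id to account-creation `datetime`) and a hypothetical list of `(user_id, post_datetime)` pairs standing in for `Usr_Info` and `Pst_Info`. It uses the same strict 100 to 150 day bounds as the `timedelta` test, computes the per-user window count once instead of twice, and averages columns with the same (sums, count) accumulator idea as the `reduceByKey` step.

```python
from collections import defaultdict
from datetime import datetime, timedelta


def split_active_inactive(user_created, posts, lo_days=100, hi_days=150):
    """Return (active, inactive) sets of user ids.

    user_created: hypothetical dict user_id -> account creation datetime (stand-in for Usr_Info)
    posts: hypothetical iterable of (user_id, post_datetime) pairs (stand-in for Pst_Info)
    """
    lo, hi = timedelta(days=lo_days), timedelta(days=hi_days)
    in_window = defaultdict(int)
    for user_id, posted in posts:
        if user_id not in user_created:  # inner-join semantics: skip posts with an unknown owner
            continue
        in_window[user_id] += int(lo < posted - user_created[user_id] < hi)
    # as with the RDD join, users who never posted appear in neither set
    active = {u for u, n in in_window.items() if n > 0}
    inactive = {u for u, n in in_window.items() if n == 0}
    return active, inactive


def group_means(rows):
    """Column-wise mean of equal-length numeric tuples via a running (sums, count) accumulator."""
    sums, count = None, 0
    for row in rows:
        sums = list(row) if sums is None else [s + v for s, v in zip(sums, row)]
        count += 1
    if count == 0:
        raise ValueError("no rows to average")
    return tuple(s / float(count) for s in sums)


created = {1: datetime(2014, 1, 1), 2: datetime(2014, 1, 1)}
posts = [(1, datetime(2014, 1, 1) + timedelta(days=120)),  # inside the window
         (2, datetime(2014, 1, 2)),                         # one day after signup
         (3, datetime(2014, 5, 1))]                         # owner not in `created`
active, inactive = split_active_inactive(created, posts)
means = group_means([(2, 100, 1, 0), (4, 300, 3, 2)])
```

Here user 1 is active, user 2 is inactive, and user 3 is dropped by the join; `means` is the column-wise average `(3.0, 200.0, 2.0, 1.0)`.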


## word2vec


Word2Vec is an alternative approach to vectorizing text data. The learned word vectors are useful for predicting the other words that appear near a given word in a document, and they tend to encode semantic relationships, as in the famous example "vector('king') - vector('man') + vector('woman') ~= vector('queen')".

Let's see how good a Word2Vec model we can train using the tags of each StackExchange post as documents (this uses the full dataset). Use Spark ML's implementation of Word2Vec (this will require using DataFrames) to return a list of the top 25 closest synonyms to "ggplot2" and their similarity score in tuple format ("string", number).
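Treating each post's tag list as a document means the model learns from which tags co-occur in the same post. Spark ML's Word2Vec is documented as a skip-gram model with a default `windowSize` of 5. The sketch below uses a hypothetical helper (not Spark's code) to list the (center, context) pairs a fixed window would produce from one tag list; the real trainer samples and optimizes these pairs in its own way, so this only illustrates what "context" means for a tag document.

```python
def skipgram_pairs(tokens, window=5):
    """(center, context) pairs: each token paired with every neighbour up to `window` positions away."""
    pairs = []
    for i, center in enumerate(tokens):
        start = max(0, i - window)
        stop = min(len(tokens), i + window + 1)
        for j in range(start, stop):
            if j != i:  # a token is not its own context
                pairs.append((center, tokens[j]))
    return pairs


# a made-up tag document, with a window of 1 to keep the output short
pairs = skipgram_pairs(['r', 'ggplot2', 'plot'], window=1)
```

With `window=1`, `pairs` is `[('r', 'ggplot2'), ('ggplot2', 'r'), ('ggplot2', 'plot'), ('plot', 'ggplot2')]`. With a window as wide as the tag list, every token in a post becomes context for every other token in that post.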


#### Parameters


The dimensionality of the vector space should be 100. The random seed should be 42L.


#### Checkpoints

* Mean of the top 25 cosine similarities: 0.7785175901170094
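The checkpoint is stated in terms of cosine similarity. As a rough, non-Spark sketch of what a nearest-neighbour synonym query computes, the hypothetical `top_synonyms` function below ranks every other word by cosine similarity to the query vector, and `mean_similarity` averages the scores the way the checkpoint does. `word_vectors` is assumed to be a plain dict of vectors (for example, something one might collect from `getVectors()`), and the 2-D vectors in the example are made up purely to show the mechanics; Spark's own `findSynonyms` may differ in details such as tie-breaking.

```python
import numpy as np


def top_synonyms(word_vectors, query, n):
    """Return the n (word, cosine similarity) pairs closest to `query`, excluding the query itself."""
    q = np.asarray(word_vectors[query], dtype=float)
    q = q / np.linalg.norm(q)
    scored = []
    for word, vec in word_vectors.items():
        if word == query:
            continue
        v = np.asarray(vec, dtype=float)
        scored.append((word, float(np.dot(q, v) / np.linalg.norm(v))))
    scored.sort(key=lambda pair: pair[1], reverse=True)  # most similar first
    return scored[:n]


def mean_similarity(pairs):
    """Mean of the scores in a list of (word, similarity) tuples."""
    return sum(score for _, score in pairs) / float(len(pairs))


# made-up 2-D vectors, for illustration only
toy = {"ggplot2": [1.0, 0.0], "plot": [1.0, 1.0], "r": [2.0, 0.0], "java": [0.0, 1.0]}
best = top_synonyms(toy, "ggplot2", 2)
avg = mean_similarity(best)
```

Cosine similarity ignores vector length, so `"r"` (same direction as `"ggplot2"`) scores 1.0 and `"plot"` scores about 0.707.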

In [None]:
from pyspark.ml.feature import Word2Vec
from pyspark.sql import SQLContext

# A SQLContext must exist before RDD.toDF() can be used to convert RDDs into DataFrames
sqlContext = SQLContext(sc)

In [170]:
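ROW_PATTERN = re.compile(u'(<row(.*?)>)')  # the <row ... /> element on a line
TAG_PATTERN = re.compile(r'\w+')           # splits tags on any non-word character ('-', '.', '+', ...)

def Posts_tag(line):
    """Return the word tokens of a post's Tags attribute, or [] if there are none."""
    try:
        row = ROW_PATTERN.findall(line.strip())[0][0]
        root = ET.fromstring(row.encode('utf-8').strip())
        return TAG_PATTERN.findall(root.attrib['Tags'])
    except Exception:
        # answers have no Tags attribute; non-row lines and malformed XML also end up here
        return []
# Apparently r'\w[\w-]+' does not work here; note that it needs at least two characters,
# so it also drops one-letter tags such as 'r' and 'c'.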
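The tokenizing pattern decides which tokens reach Word2Vec. The snippet below is a small illustration on a made-up `<row>` line whose `Tags` value follows the data dump's `<tag1><tag2>` format (XML-escaped in the file and unescaped by the parser). It compares the notebook's `\w+` pattern, the `\w[\w-]+` pattern from the comment, and a bracket-based pattern that keeps each tag whole; the third pattern is offered only for comparison, not as what produced the checkpoint value.

```python
import re
import xml.etree.ElementTree as ET

# made-up example row in the data-dump format; &lt; and &gt; are unescaped by the XML parser
line = '<row Id="1" PostTypeId="1" Tags="&lt;r&gt;&lt;ggplot2&gt;&lt;data.frame&gt;&lt;time-series&gt;" />'
tags = ET.fromstring(line).attrib["Tags"]  # '<r><ggplot2><data.frame><time-series>'

split_words = re.findall(r"\w+", tags)        # splits on '.', '-', etc.
hyphen_words = re.findall(r"\w[\w-]+", tags)  # keeps hyphens but needs >= 2 chars, so 'r' is lost
whole_tags = re.findall(r"<([^>]+)>", tags)   # one token per tag, punctuation preserved

for name, tokens in [("\\w+", split_words), ("\\w[\\w-]+", hyphen_words), ("<...>", whole_tags)]:
    print(name, tokens)
```

The three patterns yield `['r', 'ggplot2', 'data', 'frame', 'time', 'series']`, `['ggplot2', 'data', 'frame', 'time-series']` and `['r', 'ggplot2', 'data.frame', 'time-series']` respectively.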

In [171]:
my_input_dir = 'spark-stack-data/allPosts/'

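# Parse each line into its tag tokens and drop posts without tags (e.g. answers).
# toDF() treats each element as a row, so the tag list is wrapped in a tuple with a
# dummy column 'var' to keep it as a single array column.
my_tags = sc.textFile(localpath(my_input_dir))\
    .map(Posts_tag)\
    .filter(lambda x: x != [])\
    .map(lambda x: (x, 1))\
    .toDF(['tags', 'var'])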

In [172]:
my_tags.take(10)

[Row(tags=[u'java', u'testing', u'selenium', u'selenium', u'webdriver', u'ui', u'automation'], var=1),
 Row(tags=[u'angularjs', u'angularjs', u'scope', u'angular', u'http', u'interceptors'], var=1),
 Row(tags=[u'embedded', u'windows', u'ce', u'bootable'], var=1),
 Row(tags=[u'iframe', u'primefaces'], var=1),
 Row(tags=[u'three', u'js'], var=1),
 Row(tags=[u'android', u'android', u'layout', u'android', u'linearlayout', u'android', u'textview'], var=1),
 Row(tags=[u'asp', u'net', u'memory', u'leaks', u'net', u'4', u'0', u'net', u'4', u'5', u'stateserver'], var=1),
 Row(tags=[u'php', u'arrays', u'associative', u'array', u'shuffle'], var=1),
 Row(tags=[u'python', u'django', u'django', u'crispy', u'forms'], var=1),
 Row(tags=[u'php', u'excel', u'import'], var=1)]

In [None]:
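# seed=42 is the same value as 42L (the long literal only exists in Python 2);
# minCount=1 keeps rare tags (the default minCount is 5)
w2v = Word2Vec(vectorSize=100, seed=42, inputCol='tags', outputCol='vectors', minCount=1)
my_model = w2v.fit(my_tags)

# (word, vector) pairs for every tag in the vocabulary
vectors = my_model.getVectors().rdd.map(lambda x: (x.word, x.vector))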

In [81]:
#vectors.collect()[:3]

In [None]:
n_synonyms = 25
# findSynonyms returns a DataFrame with 'word' and 'similarity' columns
my_synonyms = my_model.findSynonyms("ggplot2", n_synonyms).select("word", "similarity").take(n_synonyms)
similars = [(row[0], row[1]) for row in my_synonyms]
#similars

In [167]:
sc.stop()  # SparkContext has stop(), not close()

In [None]:
sum([i[1] for i in similars]) / float(len(similars))
# Using only non-hyphenated words I got 0.79;
# using both hyphenated and non-hyphenated words I got 0.72.

In [None]:
def word2vec():
    return similars

grader.score(question_name='spark__word2vec', func=word2vec)

*Copyright &copy; 2016 The Data Incubator.  All rights reserved.*