In [46]:
import seaborn as sns
sns.set()

In [47]:
from static_grader import grader

# Spark Miniproject


Stack Overflow is a collaboratively edited question-and-answer site originally focused on programming topics. Because it tracks many features for each post, including several feedback metrics, it allows for some open-ended analysis of user behavior on the site.

Stack Exchange (the parent organization) provides an anonymized [data dump](https://archive.org/details/stackexchange), and we'll use Spark to perform data manipulation, analysis, and machine learning on this data set. As a side note, there's also an online data explorer which allows you to query the data interactively.

*Consider*: Do we need to use Spark to work with this data set? What are our alternatives?

## Workflow


**All questions in this miniproject can be done locally in this notebook (i.e. on your Jupyter pod).**  

You are free to try running on a cloud service, but note that we have no resources to pay for you to try out these services.  (New users often get a limited amount of free credit to try a service.)  Also, the grader library will not be available, so you would have to get your answers into this notebook to submit to the grader.   See the appropriate lecture notebooks for information on how to use cloud services if you want to try them out.

Python example workflow when **not** running in a Jupyter notebook:

1. Edit your source code in `main.py` and your classes in a separate `classes.py` (class definitions need to be written in a separate file and then included at runtime).
1. If you are using a cloud service, in order to make your code more flexible, it's recommended to incorporate command-line arguments that specify the location of the input data and where output should be written.
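``` python
# Command-line arguments using sys.argv or argparse in Python
import argparse
if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('input_dir')
    parser.add_argument('output_dir')
    ARGS = parser.parse_args()
    main(ARGS.input_dir, ARGS.output_dir)
```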
1. Run locally using the `spark-submit` program on a chunk of the data, e.g., `$SPARK_HOME/bin/spark-submit --py-files src/classes.py src/main.py data/stats results/stats/`. Note that long jobs using `spark-submit` may not finish before your server is automatically shut down (our server only checks for running Jupyter notebooks when deciding whether to stay up).
1. Run on Amazon Web Services (AWS) once your testing and development are done.  Note that you will also have to load all of the input data on an AWS bucket.  (Similar statements apply if you were to use Google Cloud Platform (GCP) or other services.)  

General tips when using `spark-submit` or working on a cloud service:
* Try `cat output_dir/* | sort -n -t , -k 1.2 -o sorted_output` to concatenate your output files, which will be in `part-xxxxx` format.
* You can alternatively access an interactive PySpark shell on your Jupyter pod with this command: `$SPARK_HOME/bin/pyspark`
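The `cat | sort` tip can also be done from Python. Below is a minimal sketch, assuming each output line is the `str()` of a tuple whose first element is numeric (which is what `saveAsTextFile` writes for an RDD of tuples). `combine_part_files` is a hypothetical helper name. It roughly mirrors the shell command, where `-t ,` splits on commas and `-k 1.2` starts the sort key at the second character of the first field, skipping the opening parenthesis.

```python
import glob
import os


def combine_part_files(output_dir):
    """Concatenate Spark part-* files and sort the lines numerically by their first field."""
    lines = []
    # Only part-* files hold data; marker files such as _SUCCESS are skipped by the pattern.
    for path in sorted(glob.glob(os.path.join(output_dir, 'part-*'))):
        with open(path, encoding='utf-8') as f:
            lines.extend(line.rstrip('\n') for line in f if line.strip())
    # Assumes lines like "(3, 6.35)": drop the leading "(" and sort on the number before the first comma.
    return sorted(lines, key=lambda line: float(line.split(',')[0].lstrip('(')))
```

For example, `combine_part_files('results/stats/')` returns the combined lines as a list. A line whose first field is not a number raises `ValueError`, which is the point where the tuple-format assumption would need revisiting.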

## Accessing the data


The data is available on S3 (`s3://dataincubator-course/spark-stack-data`). There are three sub-folders, `allUsers`, `allPosts`, and `allVotes`, which contain gzipped XML. The `allPosts` sub-folder contains data in the following format:

``` html
<row Body="&lt;p&gt;I always validate my web pages, and I recommend you do the same BUT many large company websites DO NOT and cannot validate because the importance of the website looking exactly the same on all systems requires rules to be broken. &lt;/p&gt;&#10;&#10;&lt;p&gt;In general, valid websites help your page look good even on odd configurations (like cell phones) so you should always at least try to make it validate.&lt;/p&gt;&#10;" CommentCount="0" CreationDate="2008-10-12T20:26:29.397" Id="195995" LastActivityDate="2008-10-12T20:26:29.397" OwnerDisplayName="Eric Wendelin" OwnerUserId="25066" ParentId="195973" PostTypeId="2" Score="0" />
```
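Attribute values in these rows are XML-escaped (`&lt;p&gt;`, `&#10;`), but an XML parser decodes them for you. The sketch below uses the standard-library `xml.etree.ElementTree` on a trimmed copy of the row above: the `Body` text is shortened and a few attributes are dropped. It is for illustration only.

```python
import xml.etree.ElementTree as ET

# Trimmed version of the sample row above (Body shortened, some attributes dropped).
row = ('<row Body="&lt;p&gt;I always validate my web pages.&lt;/p&gt;&#10;" '
       'CommentCount="0" CreationDate="2008-10-12T20:26:29.397" Id="195995" '
       'OwnerUserId="25066" ParentId="195973" PostTypeId="2" Score="0" />')

element = ET.fromstring(row)
# Entities in attribute values are decoded by the parser: &lt;p&gt; -> <p>, &#10; -> newline.
body = element.attrib['Body']
print(repr(body))                                             # '<p>I always validate my web pages.</p>\n'
# Attribute values always come back as strings; convert numeric fields yourself.
print(element.attrib['PostTypeId'], element.attrib['Score'])  # 2 0
```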

Data from the much smaller `stats.stackexchange.com` website (called "Cross Validated") is available in the same format on S3 (`s3://dataincubator-course/spark-stats-data`). This smaller data set will be used below in most questions to avoid working with the full data set for every question.

The full schema is available as a text file, which can be downloaded with the following command.

In [8]:
!aws s3 cp s3://dataincubator-course/spark-stats-data/stack_exchange_schema.txt .

download: s3://dataincubator-course/spark-stats-data/stack_exchange_schema.txt to ./stack_exchange_schema.txt


You can either get the data by running the appropriate S3 commands in the terminal, or by running this block for the smaller stats data set:

In [9]:
!mkdir -p spark-stats-data
!aws s3 sync --exclude '*' --include 'all*' s3://dataincubator-course/spark-stats-data/ ./spark-stats-data
!aws s3 sync --exclude '*' --include 'posts*zip' s3://dataincubator-course/spark-stats-data/ ./spark-stats-data

And to get the much larger full data set (be warned, this can take 20 or more minutes, so you may want to run it in the terminal to avoid locking up the notebook):

In [10]:
!mkdir -p spark-stack-data
!aws s3 sync --exclude '*' --include 'all*' s3://dataincubator-course/spark-stack-data/ ./spark-stack-data

## Data input and parsing


Some rows are split across multiple lines; these can be discarded. Incorrectly formatted XML can also be ignored. It is enough to simply skip problematic rows: the loss of data will not significantly affect our results on these large data sets.
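Complete rows are self-closing `<row ... />` elements on a single line, as in the sample above. Under that assumption, a cheap string test can drop most split rows before any parsing. This is a heuristic sketch, not a full validator: a line can pass it and still contain malformed XML, so the parser still needs a `try`/`except`. `looks_like_complete_row` is a hypothetical name, and the sample lines are made up for illustration.

```python
def looks_like_complete_row(line):
    """Heuristic pre-filter: True if the line opens a <row element and self-closes on the same line."""
    stripped = line.strip()
    return stripped.startswith('<row') and stripped.endswith('/>')


sample = [
    '<?xml version="1.0" encoding="UTF-8"?>',
    '  <row Id="73933" PostTypeId="5" Score="0" />',
    '  <row Body="&lt;p&gt;first half of a row that continues',  # split across lines
    '  ',
]
print([looks_like_complete_row(line) for line in sample])  # [False, True, False, False]
```

In Spark, the same function could be passed straight to an RDD filter, e.g. `lines.filter(looks_like_complete_row)`.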

**You will need to handle XML parsing yourself.  Our solution uses `lxml.etree` in Python, and we would recommend using this tool yourself to handle the XML parsing.**

The goal should be to have a parsing function that can be applied to the input data to access any desired XML elements. You might find it convenient to represent the posts, votes, users, etc. data using [`namedtuples`](https://docs.python.org/3/library/collections.html?highlight=namedtuple#collections.namedtuple).
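One way to structure such a parsing function is sketched below with the standard-library `xml.etree.ElementTree`. `lxml.etree.fromstring` returns elements with the same `.attrib` mapping, so switching is mostly a change of import. The `Post` fields and `parse_row` are hypothetical. The sketch assumes every requested field is an integer attribute and defaults missing attributes to 0, following the project's rule that missing fields count as zero.

```python
import xml.etree.ElementTree as ET
from collections import namedtuple

# Hypothetical record type: list whichever integer attributes a question needs.
Post = namedtuple('Post', ['Id', 'PostTypeId', 'OwnerUserId', 'FavoriteCount', 'Score'])


def parse_row(line, record_type=Post):
    """Parse one '<row ... />' line into record_type; return None if the row is rejected."""
    try:
        element = ET.fromstring(line.strip())
        # Missing attributes count as 0; every field is assumed to be an integer.
        return record_type(*(int(element.attrib.get(field, 0)) for field in record_type._fields))
    except (ET.ParseError, ValueError):
        return None


print(parse_row('  <row Id="5" PostTypeId="2" OwnerUserId="7" Score="3" />'))
# Post(Id=5, PostTypeId=2, OwnerUserId=7, FavoriteCount=0, Score=3)
print(parse_row('  <row Id="6" Body="a row that was split across lines'))  # None
```

In Spark this would be used as `lines.map(parse_row).filter(lambda p: p is not None)`. Non-integer fields such as dates would need their own conversion.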

## Structure

This miniproject is divided into two parts, called `spark_data` and `spark_ml`. The first part uses Spark for data analysis on both a small and a large data set, and consists of the first six questions in the notebook. The second part uses Spark ML for machine learning and consists of the last two questions. The parts are distinguished both by sections in the notebook and by the question names.

## Spark data section

## Question 1: Bad XML


This first question is a simple question to test your parsing code. Create an RDD of Post objects where each Post is a valid row of XML from the small "Cross Validated" (stats.stackexchange.com) `allPosts` data set.

We are going to take several shortcuts to speed up and simplify our computations.  First, your parsing function should only attempt to parse rows that start with `<row` as these denote actual data entries. This should be done in Spark as the data is being read in from disk, without any pre-Spark processing. 

Return the total number of XML rows that started with `<row` that were subsequently **rejected** during your XML processing.  Note that the text is Unicode, and contains non-ASCII characters.  You may need to re-encode to UTF-8 (depending on your XML parser).

**Note that this cleaned data set will be used for questions 1-5.**  (For questions 6-8, you want to similarly remove improperly formatted XML from that data before proceeding further.)  

*Question*: Can you figure out what filters you need to put in place to avoid throwing parsing errors entirely?
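To work out which filters would be needed, it helps to tally *why* rows are rejected. Below is a minimal local sketch with hypothetical helper names. It records the reason text from each `ParseError`. The same function could be applied to the RDD of `<row` lines with `.map(rejection_reason).filter(lambda r: r is not None).countByValue()`.

```python
import xml.etree.ElementTree as ET
from collections import Counter


def rejection_reason(line):
    """Return None if the line parses as XML, otherwise the parser's short error reason."""
    try:
        ET.fromstring(line.strip())
        return None
    except ET.ParseError as err:
        # Messages have the form "<reason>: line L, column C"; keep only the reason.
        return str(err).split(':')[0]


def summarize_rejections(lines):
    """Count rejection reasons among lines that start with '<row' (a local stand-in for countByValue)."""
    reasons = (rejection_reason(line) for line in lines if line.strip().startswith('<row'))
    return Counter(reason for reason in reasons if reason is not None)


print(summarize_rejections([
    '<parent>',
    '  <row Id="1" Score="0" />',
    '  <row Body="a row that was split across',
]))
```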

In [11]:
import pyspark

# create the connection to Spark cluster

sc = pyspark.SparkContext('local[*]')

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [12]:
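from pyspark.sql import SQLContext

# SQL entry point; creating it also enables RDD.toDF(), which is used below.
# (SQLContext is deprecated since Spark 3.0; SparkSession.builder.getOrCreate() is the modern equivalent.)
sqlContext = SQLContext(sc)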

In [13]:
# create an RDD named 'lines'
# an RDD (Resilient Distributed Dataset) is an immutable, distributed collection of objects that can be processed in parallel
# 'spark-stats-data/allPosts/' contains multiple (gzipped) text files; every line of every file becomes one element of the RDD

lines = sc.textFile('spark-stats-data/allPosts/')

In [14]:
# retrieve the first 5 strings from an RDD and return them in a list

lines.take(5)

['<?xml version="1.0" encoding="UTF-8"?>',
 '<parent>',
 '  <row Body="" CommentCount="0" CreationDate="2013-10-28T10:42:29.940" Id="73933" LastActivityDate="2013-10-28T10:42:29.940" LastEditDate="2013-10-28T10:42:29.940" LastEditorUserId="686" OwnerUserId="686" PostTypeId="5" Score="0" />',
 '  ',
 '  <row Body="See `continuous-data`" CommentCount="0" CreationDate="2013-10-28T10:42:29.940" Id="73934" LastActivityDate="2013-10-28T10:42:29.940" LastEditDate="2013-10-28T10:42:29.940" LastEditorUserId="686" OwnerUserId="686" PostTypeId="4" Score="0" />']

In [15]:
# number of strings in 'lines' RDD
# total count of strings = 212,990

lines.count()

212990

In [16]:
# filter the 'lines' RDD by choosing only the ones that start with '<row'
# strings that start with '<row' = 109,522

filtered = lines.filter(lambda line: line.strip().startswith('<row'))
print(filtered.count())
filtered.take(15)

109522


['  <row Body="" CommentCount="0" CreationDate="2013-10-28T10:42:29.940" Id="73933" LastActivityDate="2013-10-28T10:42:29.940" LastEditDate="2013-10-28T10:42:29.940" LastEditorUserId="686" OwnerUserId="686" PostTypeId="5" Score="0" />',
 '  <row Body="See `continuous-data`" CommentCount="0" CreationDate="2013-10-28T10:42:29.940" Id="73934" LastActivityDate="2013-10-28T10:42:29.940" LastEditDate="2013-10-28T10:42:29.940" LastEditorUserId="686" OwnerUserId="686" PostTypeId="4" Score="0" />',
 '  <row Body="&lt;p&gt;O.K., I think I found a way of doing it with the correct assumptions. Although it is only useful for my particular problem, maybe somebody can tell me if I am being too &quot;sloppy&quot;, correct me or maybe my approach might be useful to somebody in the future.&lt;/p&gt;&#10;&#10;&lt;p&gt;First of all, I had not realized that the &quot;triangles&quot; account for &quot;long  independent events&quot;. This means that $P(X_i = 0 | \\sum_{j\\neq i, |j-i| &amp;lt; D} X_j = 1) = 

In [17]:
import xml.etree.ElementTree as ET
from collections import namedtuple

record_tuple = namedtuple('record_tuple', ['FavoriteCount','Score'])

def parse(line):
    """Parse one XML row; return a record_tuple, or None if the row is rejected."""
    try:
        tree = ET.fromstring(line.encode('utf-8'))
        favoriteCount = int(tree.attrib.get('FavoriteCount', 0))   # a missing FavoriteCount counts as 0
        score = int(tree.attrib.get('Score'))
        return record_tuple(favoriteCount, score)
    except Exception:   # malformed XML, or a missing/non-integer Score
        return None

In [18]:
# apply (map) the 'parse' function to every element of the 'filtered' RDD
# the output is the 'xmls' RDD, which holds both valid and bad rows;
# bad rows are represented by None

xmls = filtered.map(parse)
xmls.count()

109522

In [19]:
type(xmls)

pyspark.rdd.PipelinedRDD

In [20]:
# filter xmls by choosing valid ones (the ones that pass parse function successfully)

valid_xmls = xmls.filter(lambda x: x is not None)
valid_xmls.count()

108741

In [21]:
# second way to do the valid counts in one step

valid = filtered.map(parse) \
       .filter(lambda x: x is not None)
valid.count()

108741

In [22]:
# filter xmls by choosing bad ones (the ones that don't pass parse function successfully)

bad_xmls = xmls.filter(lambda x: x is None)
bad_xmls.count()

781

In [23]:
bad_xml = bad_xmls.count()   # 781

grader.score('spark_data__bad_xml', bad_xml)

Your score: 1.0000


## Question 2: Favorites and scores

We're interested in looking for useful patterns in the data.  If we look at the Post data again (the smaller set, `stats.stackexchange.com`), we see that many things about each post are recorded.  We're going to start by looking to see if there is a relationship between the number of times a post was favorited (the `FavoriteCount`) and the `Score`.  The score is the number of times the post was upvoted minus the number of times it was downvoted, so it is a measure of how much a post was liked.  We'd expect posts with a higher number of favorites to have better scores, since they're both measurements of how good the post is.

Let's aggregate posts by the number of favorites, and find the average score for each number of favorites.  Do this for the lowest 50 numbers of favorites.

**If any field in the Posts or Users is missing, such as the `FavoriteCount`, you should assume it is zero. _Make this assumption for all questions going forward._**

_Note:_ Before submitting, take a look at the numbers.  Do they follow the trend you expect?

**Checkpoints**

- Total score across all posts: 299469
- Mean of first 50 favorite counts (averaging the keys themselves): 24.76
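An average is not associative, so the aggregation carries a `(sum, count)` pair per key and divides only at the end. Averaging partial averages from different partitions would weight them incorrectly. The plain-Python sketch below applies that logic to a small in-memory list (`Record` and `average_score_by_favorites` are hypothetical). It also shows how the key-mean checkpoint is computed.

```python
from collections import namedtuple

# Hypothetical stand-in for the parsed posts.
Record = namedtuple('Record', ['FavoriteCount', 'Score'])


def average_score_by_favorites(records, n=50):
    """Mean Score per FavoriteCount, for the n smallest FavoriteCount values."""
    totals = {}
    for r in records:
        # Accumulate the same (sum, count) pair that reduceByKey combines in Spark.
        score_sum, count = totals.get(r.FavoriteCount, (0, 0))
        totals[r.FavoriteCount] = (score_sum + r.Score, count + 1)
    # Divide only once per key, after all partial sums have been combined.
    return [(key, s / c) for key, (s, c) in sorted(totals.items())[:n]]


records = [Record(0, 1), Record(0, 3), Record(2, 10), Record(1, 4)]
result = average_score_by_favorites(records)
print(result)                                       # [(0, 2.0), (1, 4.0), (2, 10.0)]
print(sum(key for key, _ in result) / len(result))  # 1.0  (mean of the keys themselves)
```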

In [24]:
# inspect the first 10 tuples returned by the 'parse' function

valid_xmls.take(10)

[record_tuple(FavoriteCount=0, Score=0),
 record_tuple(FavoriteCount=0, Score=0),
 record_tuple(FavoriteCount=0, Score=0),
 record_tuple(FavoriteCount=0, Score=1),
 record_tuple(FavoriteCount=0, Score=0),
 record_tuple(FavoriteCount=0, Score=1),
 record_tuple(FavoriteCount=0, Score=1),
 record_tuple(FavoriteCount=0, Score=1),
 record_tuple(FavoriteCount=0, Score=-1),
 record_tuple(FavoriteCount=2, Score=4)]

In [25]:
# double check the number of valid_xmls

valid_xmls.count()

108741

In [26]:
# compute the total score across all valid posts

total_scores = valid_xmls.map(lambda x: x.Score) \
              .reduce(lambda x, y: x + y) 
               
total_scores

299469

In [27]:
# get the distinct FavoriteCount values, then randomly sample 10 of them

unique_favoritecounts = valid_xmls.map(lambda x: x.FavoriteCount).distinct()
                                      
unique_favoritecounts.takeSample(False, 10)

[19, 60, 41, 20, 58, 69, 61, 8, 83, 138]

In [28]:
# compute the sum of scores and the number of scores, as a tuple, for every distinct FavoriteCount

scores_sum_count = valid_xmls.map(lambda x: (x.FavoriteCount, (x.Score, 1))) \
                             .reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1]))

scores_sum_count.take(5)

[(0, (219956, 94003)),
 (11, (1034, 59)),
 (22, (422, 13)),
 (33, (221, 6)),
 (55, (68, 1))]

In [29]:
# find the average of scores for every distinct FavoriteCount by dividing sum of scores by scores count per FavoriteCount

avg_scores = valid_xmls.map(lambda x: (x.FavoriteCount, (x.Score, 1))) \
                  .reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1])) \
                  .map(lambda x: (x[0], x[1][0] / x[1][1])) 

avg_scores.take(10)

[(0, 2.3398827696988396),
 (11, 17.52542372881356),
 (22, 32.46153846153846),
 (33, 36.833333333333336),
 (55, 68.0),
 (66, 66.0),
 (88, 75.0),
 (275, 222.0),
 (44, 76.0),
 (1, 2.7334613999279624)]

In [30]:
# sort by FavoriteCount (key) the tuples in ascending order

sorted_avg_scores = valid_xmls.map(lambda x: (x.FavoriteCount, (x.Score, 1))) \
                   .reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1])) \
                   .map(lambda x: (x[0], x[1][0] / x[1][1])) \
                   .sortByKey()

sorted_avg_scores.take(10)

[(0, 2.3398827696988396),
 (1, 2.7334613999279624),
 (2, 4.481914893617021),
 (3, 6.350249584026622),
 (4, 7.656934306569343),
 (5, 8.941888619854721),
 (6, 11.263779527559056),
 (7, 12.916666666666666),
 (8, 13.345864661654135),
 (9, 15.754237288135593)]

In [31]:
favorite_score = sorted_avg_scores.take(50)

In [32]:
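# note: this averages the score values (not the keys) of an unsorted take(50), so it does not
# reproduce the checkpoint, which averages the first 50 FavoriteCount keys themselves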

first_50_avg_scores = avg_scores.take(50)
sum_of_averages = sum(value for _, value in first_50_avg_scores)
mean_of_averages = sum_of_averages / len(first_50_avg_scores)
mean_of_averages

57.31172468957298

In [33]:
#favorite_score = [(0, 2.3398827696988396)]*50

grader.score('spark_data__favorite_score', favorite_score)

Your score: 1.0000


## Question 3: Answer percentage


Investigate the correlation between a user's reputation and the kind of posts they make. For the 99 users with the highest reputation, single out posts which are either questions or answers and look at the percentage of these posts that are answers: *(answers / (answers + questions))*. 

Return a tuple of their **user ID** and this fraction.

You should also return (-1, fraction) to represent the case where you average over all users (so you will return 100 entries total).

Again, you only need to run this on the smaller Cross Validated (stats) data set.


**Checkpoints**

* Total questions: 52,060
* Total answers: 55,304
* Top 99 users' average reputation: 11893.464646464647
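The fraction itself is simple counting, sketched here in plain Python on `(OwnerUserId, PostTypeId)` pairs (`answer_fractions` is a hypothetical helper). The sketch computes the `(-1, ...)` entry as the fraction over all question and answer posts combined. That interpretation of "average over all users" matches the checkpoint totals: 55,304 / (55,304 + 52,060) ≈ 0.5151. Giving a user with no questions or answers 0.0 is also an assumption.

```python
from collections import Counter


def answer_fractions(posts, user_ids):
    """answers / (answers + questions) for each id in user_ids, followed by (-1, overall fraction).

    posts: iterable of (OwnerUserId, PostTypeId) pairs; PostTypeId 1 = question, 2 = answer.
    """
    questions, answers = Counter(), Counter()
    for owner, post_type in posts:
        if post_type == 1:
            questions[owner] += 1
        elif post_type == 2:
            answers[owner] += 1          # other post types are ignored

    result = []
    for uid in user_ids:
        total = questions[uid] + answers[uid]
        # Assumption: a user with no questions or answers gets 0.0.
        result.append((uid, answers[uid] / total if total else 0.0))

    # Overall entry: fraction over all question and answer posts combined.
    n_answers, n_questions = sum(answers.values()), sum(questions.values())
    grand_total = n_answers + n_questions
    result.append((-1, n_answers / grand_total if grand_total else 0.0))
    return result


posts = [(1, 1), (1, 2), (1, 2), (2, 2), (3, 5)]
print(answer_fractions(posts, [1, 2, 4]))
# [(1, 0.6666666666666666), (2, 1.0), (4, 0.0), (-1, 0.75)]
```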

In [34]:
users = sc.textFile('spark-stats-data/allUsers/')
users.take(5)

['<?xml version="1.0" encoding="UTF-8"?>',
 '<parent>',
 '  <row AccountId="5872878" CreationDate="2015-03-02T18:42:20.510" DisplayName="Lars Reeker" DownVotes="0" Id="70185" LastAccessDate="2015-03-02T18:42:20.510" ProfileImageUrl="https://lh3.googleusercontent.com/-Y7GNsydm-mc/AAAAAAAAAAI/AAAAAAAADq8/15o5t99O5IU/photo.jpg" Reputation="1" UpVotes="0" Views="0" />',
 '  ',
 '  <row AccountId="5872995" CreationDate="2015-03-02T19:04:13.380" DisplayName="Vra" DownVotes="0" Id="70186" LastAccessDate="2015-03-06T15:45:57.590" Reputation="6" UpVotes="0" Views="1" />']

In [35]:
users.count()

100425

In [36]:
filtered_users = users.filter(lambda line: line.strip().startswith('<row'))
print(filtered_users.count())
filtered_users.take(5)

50458


['  <row AccountId="5872878" CreationDate="2015-03-02T18:42:20.510" DisplayName="Lars Reeker" DownVotes="0" Id="70185" LastAccessDate="2015-03-02T18:42:20.510" ProfileImageUrl="https://lh3.googleusercontent.com/-Y7GNsydm-mc/AAAAAAAAAAI/AAAAAAAADq8/15o5t99O5IU/photo.jpg" Reputation="1" UpVotes="0" Views="0" />',
 '  <row AccountId="5872995" CreationDate="2015-03-02T19:04:13.380" DisplayName="Vra" DownVotes="0" Id="70186" LastAccessDate="2015-03-06T15:45:57.590" Reputation="6" UpVotes="0" Views="1" />',
 '  <row AboutMe="" AccountId="5873177" CreationDate="2015-03-02T19:40:16.420" DisplayName="Aroona" DownVotes="0" Id="70187" LastAccessDate="2015-03-02T19:40:16.420" ProfileImageUrl="https://www.gravatar.com/avatar/e0e90702da3203e069f0a7d957ee7ea6?s=128&amp;d=identicon&amp;r=PG&amp;f=1" Reputation="1" UpVotes="0" Views="0" WebsiteUrl="" />',
 '  <row AccountId="5873184" CreationDate="2015-03-02T19:46:45.400" DisplayName="Yazeed" DownVotes="0" Id="70188" LastAccessDate="2015-03-02T19:46:45

In [37]:
import xml.etree.ElementTree as ET
from collections import namedtuple

users_tuple = namedtuple('users_tuple', ['Id', 'Reputation'])

def parse_users(line):
    """Parse one user row; return a users_tuple, or None if the row is rejected."""
    try:
        tree = ET.fromstring(line.encode('utf-8'))
        user_id = int(tree.attrib.get('Id'))
        reputation = int(tree.attrib.get('Reputation'))
        return users_tuple(user_id, reputation)
    except Exception:   # malformed XML, or a missing/non-integer attribute
        return None

In [38]:
all_users = filtered_users.map(parse_users)
all_users.count()

50458

In [39]:
valid_users = all_users.filter(lambda x: x is not None)
print(valid_users.count())
valid_users.take(5)

50320


[users_tuple(Id=70185, Reputation=1),
 users_tuple(Id=70186, Reputation=6),
 users_tuple(Id=70187, Reputation=1),
 users_tuple(Id=70188, Reputation=1),
 users_tuple(Id=70189, Reputation=101)]

In [40]:
# do the sorting by reputation at the end

sorted_valid_users = valid_users.sortBy(lambda x: x.Reputation, ascending=False)
sorted_valid_users.take(5)

[users_tuple(Id=919, Reputation=100976),
 users_tuple(Id=805, Reputation=92624),
 users_tuple(Id=686, Reputation=47334),
 users_tuple(Id=7290, Reputation=46907),
 users_tuple(Id=930, Reputation=32283)]

In [41]:
# checkpoint #3 - average_reputation: 11,893.46464646

top_99_users = sorted_valid_users.take(99)

total_reputation_sum = sum(named_tuple.Reputation for named_tuple in top_99_users)
average_reputation = total_reputation_sum / len(top_99_users)
average_reputation

11893.464646464647

In [42]:
users_df = sorted_valid_users.toDF()

In [43]:
print(f"total users count: {users_df.count()}")
users_df.show()

total users count: 50320
+-----+----------+
|   Id|Reputation|
+-----+----------+
|  919|    100976|
|  805|     92624|
|  686|     47334|
| 7290|     46907|
|  930|     32283|
| 4505|     27599|
| 4253|     25406|
|  183|     23610|
|11032|     23102|
|28746|     22706|
|  887|     20315|
|  159|     20133|
| 2116|     19312|
| 4856|     18866|
|22047|     17719|
| 5739|     16854|
| 3277|     16131|
|   88|     14768|
| 2970|     14500|
|  601|     14100|
+-----+----------+
only showing top 20 rows



In [44]:
top_99_df = users_df.limit(99)
print(top_99_df.count())
top_99_df.show()

99
+-----+----------+
|   Id|Reputation|
+-----+----------+
|  919|    100976|
|  805|     92624|
|  686|     47334|
| 7290|     46907|
|  930|     32283|
| 4505|     27599|
| 4253|     25406|
|  183|     23610|
|11032|     23102|
|28746|     22706|
|  887|     20315|
|  159|     20133|
| 2116|     19312|
| 4856|     18866|
|22047|     17719|
| 5739|     16854|
| 3277|     16131|
|   88|     14768|
| 2970|     14500|
|  601|     14100|
+-----+----------+
only showing top 20 rows



In [45]:
import xml.etree.ElementTree as ET
from collections import namedtuple

posts_tuple = namedtuple('posts_tuple', ['OwnerUserId','PostTypeId'])

def parse_posts(line):
    """Parse one post row; return a posts_tuple, or None if the row is rejected."""
    try:
        tree = ET.fromstring(line.encode('utf-8'))
        user_id = int(tree.attrib.get('OwnerUserId', 0))    # some OwnerUserId values are missing; use 0
        post_type = int(tree.attrib.get('PostTypeId'))
        return posts_tuple(user_id, post_type)
    except Exception:   # malformed XML, or a missing/non-integer PostTypeId
        return None

In [46]:
# map the 'parse_posts' function over the 'filtered' posts

all_posts = filtered.map(parse_posts)
all_posts.count()

109522

In [47]:
# filter the valid posts from all posts

valid_posts = all_posts.filter(lambda x: x is not None)
valid_posts.count()

108741

In [48]:
# checkpoint#1 - number of total questions: 52,060

valid_posts.filter(lambda x: x.PostTypeId == 1).count()

52060

In [49]:
# checkpoint 2 - number of total answers: 55,304

valid_posts.filter(lambda x: x.PostTypeId == 2).count()

55304

In [50]:
# get the posts that are either questions (1) or answers (2)

filtered_valid_posts = valid_posts.filter(lambda x: x.PostTypeId in (1, 2))

print(filtered_valid_posts.count())
print(filtered_valid_posts.map(lambda x: x.OwnerUserId).distinct().count())
filtered_valid_posts.take(5)

107364
26888


[posts_tuple(OwnerUserId=28322, PostTypeId=2),
 posts_tuple(OwnerUserId=686, PostTypeId=2),
 posts_tuple(OwnerUserId=9047, PostTypeId=1),
 posts_tuple(OwnerUserId=31989, PostTypeId=1),
 posts_tuple(OwnerUserId=31990, PostTypeId=1)]

In [51]:
posts_df = filtered_valid_posts.toDF()
print(f"total posts count: {posts_df.count()}")
print(f"total unique users count: {posts_df.select('OwnerUserId').distinct().count()}")
posts_df.show()

total posts count: 107364
total unique users count: 26888
+-----------+----------+
|OwnerUserId|PostTypeId|
+-----------+----------+
|      28322|         2|
|        686|         2|
|       9047|         1|
|      31989|         1|
|      31990|         1|
|        686|         2|
|      31992|         1|
|      30834|         1|
|      25433|         2|
|      30395|         2|
|        226|         2|
|      12436|         2|
|      26206|         2|
|      29851|         1|
|          0|         2|
|      30951|         1|
|      32000|         1|
|      17475|         2|
|      19815|         2|
|      30557|         1|
+-----------+----------+
only showing top 20 rows



In [52]:
posts_df_renamed = posts_df.selectExpr("OwnerUserId as Id", 'PostTypeId')
posts_df_renamed.show()

+-----+----------+
|   Id|PostTypeId|
+-----+----------+
|28322|         2|
|  686|         2|
| 9047|         1|
|31989|         1|
|31990|         1|
|  686|         2|
|31992|         1|
|30834|         1|
|25433|         2|
|30395|         2|
|  226|         2|
|12436|         2|
|26206|         2|
|29851|         1|
|    0|         2|
|30951|         1|
|32000|         1|
|17475|         2|
|19815|         2|
|30557|         1|
+-----+----------+
only showing top 20 rows



In [53]:
postsByUser_df = top_99_df.join(posts_df_renamed, 'Id', 'left').select(top_99_df.Id, top_99_df.Reputation, posts_df_renamed.PostTypeId)

print(postsByUser_df.count())
print(postsByUser_df.select('Id').distinct().count())
postsByUser_df.show()

27247
99
+---+----------+----------+
| Id|Reputation|PostTypeId|
+---+----------+----------+
|442|      6588|         2|
|442|      6588|         2|
|442|      6588|         2|
|442|      6588|         2|
|442|      6588|         1|
|442|      6588|         2|
|442|      6588|         2|
|442|      6588|         2|
|442|      6588|         2|
|442|      6588|         2|
|442|      6588|         2|
|442|      6588|         2|
|442|      6588|         2|
|442|      6588|         2|
|442|      6588|         2|
|442|      6588|         2|
|442|      6588|         2|
|442|      6588|         2|
|442|      6588|         2|
|442|      6588|         2|
+---+----------+----------+
only showing top 20 rows



In [54]:
from pyspark.sql import functions as F

# group by 'Id' and calculate the counts of 1s and 2s
grouped_df = postsByUser_df.groupBy('Id').agg(
    F.sum(F.when(F.col('PostTypeId') == 1, 1).otherwise(0)).alias('count_1'),
    F.sum(F.when(F.col('PostTypeId') == 2, 1).otherwise(0)).alias('count_2')
)

# calculate the ratio of counts of 2 by the sum of counts of 1 and 2
result_df = grouped_df.withColumn('ratio', F.col('count_2') / (F.col('count_1') + F.col('count_2')))

print(result_df.select('Id').distinct().count())
result_df.show()

99
+-----+-------+-------+------------------+
|   Id|count_1|count_2|             ratio|
+-----+-------+-------+------------------+
|  442|     17|    115|0.8712121212121212|
|28666|     18|    162|               0.9|
| 3382|      0|    495|               1.0|
| 7555|      0|    234|               1.0|
|  196|     51|    142|0.7357512953367875|
| 2116|      6|    354|0.9833333333333333|
| 7224|      4|    161|0.9757575757575757|
|  264|     12|    116|           0.90625|
| 1108|      1|     35|0.9722222222222222|
| 9394|      7|    227|0.9700854700854701|
|   25|     13|    143|0.9166666666666666|
| 7828|      7|    461|0.9850427350427351|
|13047|      4|    146|0.9733333333333334|
|  686|     31|   1543|0.9803049555273189|
|25433|      3|    223|0.9867256637168141|
| 1934|      3|     91|0.9680851063829787|
|35989|      8|    148|0.9487179487179487|
|32036|      1|    248|0.9959839357429718|
| 1739|      1|    194|0.9948717948717949|
|  279|      0|    161|               1.0|
+-----+-

In [55]:
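# keep (Id, ratio) pairs for the top-99 users
rdd = result_df.rdd
user_ratios = rdd.map(lambda x: (x[0], x[-1]))

# overall fraction answers / (answers + questions) across all posts: 55,304 / 107,364 ≈ 0.5151
overall_ratio = valid_posts.filter(lambda x: x.PostTypeId == 2).count() / filtered_valid_posts.count()
last_tuple = (-1, overall_ratio)

top_100 = user_ratios.union(sc.parallelize([last_tuple]))

answer_percentage = top_100.take(100)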

In [56]:
#answer_percentage = [(7071, 0.9107142857142857)] * 100

grader.score('spark_data__answer_percentage', answer_percentage)

Your score: 1.0000


## Question 4: First question

We'd expect the first **question** a user asks to be indicative of their future behavior.  We'll dig more into that in the next problem, but for now let's see the relationship between reputation and how long it took each person to ask their first question.

For each user that asked a question, find the difference between when their account was created (`CreationDate` for the User) and when they asked their first question (`CreationDate` for their first question).  Return this time difference in days (round down, so 2.7 days counts as 2 days) for the 100 users with the highest reputation, in the form

`(UserId, Days)`

**Checkpoints**
- Users that asked a question: 23134
- Average number of days (round each user's days, then average): 30.1074258
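The per-user computation is small enough to sketch in plain Python. This sketch assumes the `YYYY-MM-DDTHH:MM:SS.fff` timestamp format seen in the data, and `days_to_first_question` is a hypothetical helper. `timedelta.days` already rounds down, so 2.7 days becomes 2; a negative gap would round toward minus infinity.

```python
from datetime import datetime

# Assumed timestamp format, e.g. '2015-03-02T18:42:20.510'.
STACK_DATE_FORMAT = '%Y-%m-%dT%H:%M:%S.%f'


def days_to_first_question(account_created, question_dates):
    """Whole days (rounded down) between account creation and the user's earliest question."""
    # Fixed-width ISO 8601 strings sort chronologically, so min() on the raw strings finds the first.
    first_question = min(question_dates)
    delta = (datetime.strptime(first_question, STACK_DATE_FORMAT)
             - datetime.strptime(account_created, STACK_DATE_FORMAT))
    # timedelta.days is the floor of the elapsed days, so 2.7 days -> 2.
    return delta.days


print(days_to_first_question('2015-03-02T18:42:20.510',
                             ['2015-03-10T00:00:00.000', '2015-03-05T10:00:00.000']))  # 2
```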

In [57]:
import xml.etree.ElementTree as ET
from collections import namedtuple

q4_tuple = namedtuple('q4_users_tuple', ['Id', 'CreationDate', 'Reputation'])

def q4_parser(line):
    """Parse one user row into (Id, CreationDate, Reputation); return None if the row is rejected."""
    try:
        tree = ET.fromstring(line.encode('utf-8'))
        user_id = int(tree.attrib.get('Id'))
        date = tree.attrib.get('CreationDate')
        reputation = int(tree.attrib.get('Reputation'))
        return q4_tuple(user_id, date, reputation)
    except Exception:   # malformed XML, or a missing/non-integer attribute
        return None

In [58]:
q4_users = filtered_users.map(q4_parser)
q4_users.count()

50458

In [59]:
q4_valid_users = q4_users.filter(lambda x: x is not None)
print(q4_valid_users.count())
q4_valid_users.take(5)

50320


[q4_users_tuple(Id=70185, CreationDate='2015-03-02T18:42:20.510', Reputation=1),
 q4_users_tuple(Id=70186, CreationDate='2015-03-02T19:04:13.380', Reputation=6),
 q4_users_tuple(Id=70187, CreationDate='2015-03-02T19:40:16.420', Reputation=1),
 q4_users_tuple(Id=70188, CreationDate='2015-03-02T19:46:45.400', Reputation=1),
 q4_users_tuple(Id=70189, CreationDate='2015-03-02T19:56:37.233', Reputation=101)]

In [60]:
# convert 'q4_valid_users' RDD to 'valid_users_df' dataframe, that has 50,320 unique users with their reputation

valid_users_df = q4_valid_users.toDF()

print(valid_users_df.select('Id').distinct().count())
valid_users_df.show()

50320
+-----+--------------------+----------+
|   Id|        CreationDate|Reputation|
+-----+--------------------+----------+
|70185|2015-03-02T18:42:...|         1|
|70186|2015-03-02T19:04:...|         6|
|70187|2015-03-02T19:40:...|         1|
|70188|2015-03-02T19:46:...|         1|
|70189|2015-03-02T19:56:...|       101|
|70190|2015-03-02T19:59:...|         1|
|70191|2015-03-02T20:08:...|         1|
|70192|2015-03-02T20:10:...|         1|
|70193|2015-03-02T20:41:...|         1|
|70194|2015-03-02T20:46:...|        11|
|70195|2015-03-02T20:52:...|         1|
|70196|2015-03-02T20:57:...|         6|
|70197|2015-03-02T21:08:...|        18|
|70198|2015-03-02T21:37:...|         1|
|70199|2015-03-02T21:38:...|        51|
|70200|2015-03-02T22:35:...|        11|
|70201|2015-03-02T22:39:...|         1|
|70202|2015-03-02T23:33:...|       101|
|70203|2015-03-02T23:41:...|       116|
|70204|2015-03-02T23:45:...|       103|
+-----+--------------------+----------+
only showing top 20 rows



In [61]:
import xml.etree.ElementTree as ET
from collections import namedtuple

q4_tuple2 = namedtuple('q4_posts_tuple', ['Id', 'PostDate', 'PostTypeId'])

def q4_parser2(line):
    """Parse one post row into (OwnerUserId, CreationDate, PostTypeId); return None if the row is rejected."""
    try:
        tree = ET.fromstring(line.encode('utf-8'))
        user_id = int(tree.attrib.get('OwnerUserId', 0))    # some OwnerUserId values are missing; use 0
        date = tree.attrib.get('CreationDate')
        post_type = int(tree.attrib.get('PostTypeId'))
        return q4_tuple2(user_id, date, post_type)
    except Exception:   # malformed XML, or a missing/non-integer PostTypeId
        return None

In [62]:
# map 'q4_parser2' function to 'filtered' posts

q4_posts = filtered.map(q4_parser2)
q4_posts.count()

109522

In [63]:
# filter the valid posts from q4_posts

q4_valid_posts = q4_posts.filter(lambda x: x is not None)
q4_valid_posts.count()

108741

In [64]:
# filter the posts by PostTypeId to get just the questions
# total number of valid questions: 52,060

valid_questions = q4_valid_posts.filter(lambda x: x.PostTypeId == 1)
valid_questions.count()

52060

In [65]:
valid_questions_df = valid_questions.toDF()
print(valid_questions_df.select('Id').distinct().count())         # Checkpoint 1 - Users that asked a question: 23134
valid_questions_df.show()

23134
+-----+--------------------+----------+
|   Id|            PostDate|PostTypeId|
+-----+--------------------+----------+
| 9047|2013-10-28T11:21:...|         1|
|31989|2013-10-28T11:29:...|         1|
|31990|2013-10-28T11:59:...|         1|
|31992|2013-10-28T12:27:...|         1|
|30834|2013-10-28T12:33:...|         1|
|29851|2013-10-28T14:10:...|         1|
|30951|2013-10-28T14:16:...|         1|
|32000|2013-10-28T14:55:...|         1|
|30557|2013-10-28T15:54:...|         1|
|31699|2013-10-28T15:58:...|         1|
|17812|2013-10-28T17:06:...|         1|
|17812|2013-10-28T17:12:...|         1|
|11944|2013-10-28T17:28:...|         1|
| 4705|2013-10-28T18:16:...|         1|
|32010|2013-10-28T18:36:...|         1|
|30523|2013-10-28T20:22:...|         1|
|26189|2013-10-28T20:23:...|         1|
|21988|2013-10-28T20:33:...|         1|
| 4552|2013-10-28T20:58:...|         1|
|20227|2013-10-28T21:05:...|         1|
+-----+--------------------+----------+
only showing top 20 rows



In [66]:
# join the users and questions DataFrames, keeping only the needed columns;
# group by user and keep the earliest PostDate as FirstQuestionDate (ISO-8601 strings sort chronologically, so min() works)

questionsByUser_df = valid_users_df.join(valid_questions_df, 'Id', 'inner') \
                    .select(valid_users_df.Id, valid_users_df.Reputation, valid_users_df.CreationDate, valid_questions_df.PostDate) \
                    .groupBy("Id", "Reputation", "CreationDate").agg(F.min("PostDate").alias("FirstQuestionDate"))

print(questionsByUser_df.select('Id').count())
questionsByUser_df.show()

23095
+-----+----------+--------------------+--------------------+
|   Id|Reputation|        CreationDate|   FirstQuestionDate|
+-----+----------+--------------------+--------------------+
|   26|      3220|2010-07-19T19:09:...|2011-02-18T02:40:...|
|   29|      2570|2010-07-19T19:09:...|2010-11-01T20:31:...|
|  474|        22|2010-07-27T15:08:...|2010-07-27T15:08:...|
| 1950|       430|2010-11-10T17:56:...|2010-11-10T18:13:...|
| 2040|       585|2010-11-17T23:45:...|2010-11-23T20:47:...|
| 3506|       123|2011-03-02T11:56:...|2011-03-02T12:03:...|
| 4823|         6|2011-05-31T20:48:...|2011-05-31T20:57:...|
| 5385|        14|2011-07-13T07:55:...|2011-07-13T07:55:...|
| 5556|       101|2011-07-26T17:44:...|2011-09-23T14:37:...|
| 7225|        13|2011-11-05T08:18:...|2011-11-05T09:10:...|
| 7279|        51|2011-11-08T18:30:...|2011-11-08T18:30:...|
| 8440|       250|2012-01-10T05:07:...|2013-03-19T22:29:...|
| 8484|       116|2012-01-12T07:22:...|2012-01-12T07:29:...|
| 9233|      1109|

In [67]:
from pyspark.sql import functions as F
from pyspark.sql.functions import desc, floor

# parse both date strings as timestamps, compute the gap in whole days (rounded down),
# then sort the table by descending 'Reputation'

updated_df = questionsByUser_df.withColumn("Date1", F.to_timestamp(F.col("CreationDate"))) \
                               .withColumn("Date2", F.to_timestamp(F.col("FirstQuestionDate"))) \
                               .withColumn("DateDiff", floor((F.col("Date2").cast("long") - F.col("Date1").cast("long")) / 86400)) \
                               .sort(desc("Reputation"))

print(questionsByUser_df.select('Id').count())       # double check the number of users in a dataframe
updated_df.show()

23095
+-----+----------+--------------------+--------------------+--------------------+--------------------+--------+
|   Id|Reputation|        CreationDate|   FirstQuestionDate|               Date1|               Date2|DateDiff|
+-----+----------+--------------------+--------------------+--------------------+--------------------+--------+
|  919|    100976|2010-08-13T15:29:...|2010-08-17T13:10:...|2010-08-13 15:29:...|2010-08-17 13:10:...|       3|
|  805|     92624|2010-08-07T08:40:...|2012-06-07T00:14:...|2010-08-07 08:40:...|2012-06-07 00:14:...|     669|
|  686|     47334|2010-08-03T19:42:...|2011-02-10T15:35:...|2010-08-03 19:42:...|2011-02-10 15:35:...|     190|
| 7290|     46907|2011-11-09T04:43:...|2011-11-09T05:51:...|2011-11-09 04:43:...|2011-11-09 05:51:...|       0|
|  930|     32283|2010-08-13T20:50:...|2010-08-18T20:36:...|2010-08-13 20:50:...|2010-08-18 20:36:...|       4|
| 4253|     25406|2011-04-20T12:59:...|2012-04-05T21:54:...|2011-04-20 12:59:...|2012-04-05 21:54:
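The `DateDiff` column subtracts two Unix timestamps (seconds), divides by 86,400 and rounds down, so a gap of 3 days and 21 hours counts as 3. A plain-Python sketch of the same rule follows; `days_between` is a hypothetical helper, and the timestamps are loosely modeled on rows above with the seconds set to zero.

```python
import math
from datetime import datetime

def days_between(start_iso, end_iso):
    """Whole days from start to end, rounded down, like the DateDiff column."""
    start = datetime.fromisoformat(start_iso)
    end = datetime.fromisoformat(end_iso)
    seconds = (end - start).total_seconds()
    return math.floor(seconds / 86400)

print(days_between('2010-08-13T15:29:00.000', '2010-08-17T13:10:00.000'))
print(days_between('2011-11-09T04:43:00.000', '2011-11-09T05:51:00.000'))
```

Like Spark's `floor`, `math.floor` rounds a negative gap toward minus infinity (a post 68 minutes *before* account creation gives -1), whereas `int()` would truncate it to 0.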

In [68]:
from pyspark.sql.functions import avg

# checkpoint 2 - Average number of days (round each user's days, then average): 30.1074258

updated_df.select(avg('DateDiff')).show()

+------------------+
|     avg(DateDiff)|
+------------------+
|30.086165836761204|
+------------------+



In [69]:
# convert the Spark DataFrame back to an RDD and build (Id, DateDiff) tuples
# for the 100 users with the highest reputation (updated_df is already sorted by Reputation)

q4_rdd = updated_df.rdd
first_question = q4_rdd.map(lambda x: (x.Id, x.DateDiff)).take(100)

In [70]:

grader.score('spark_data__first_question', first_question)

Your score: 1.0000


## Question 5: Identify veterans


It can be interesting to think about what factors influence a user to remain active on the site over a long period of time. In order not to bias the results towards older users, we'll define a time window between 100 and 150 days after account creation. If the user has made a post in this time, we'll consider them active and well on their way to being veterans of the site; if not, they are inactive and were likely brief users.
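Stated per user, the rule is: a user is a veteran if at least one of their posts falls inside the window. The sketch below assumes the half-open interval used in the notebook's code (at least 100 and fewer than 150 whole days after account creation); `classify_user`, the user ids and the day offsets are hypothetical.

```python
def classify_user(post_day_offsets, lo=100, hi=150):
    """Return 'veteran' if any post falls in [lo, hi) days after account creation, else 'brief'."""
    return 'veteran' if any(lo <= d < hi for d in post_day_offsets) else 'brief'

# Hypothetical users: whole days (rounded down) from account creation to each of their posts
users = {
    1: [0, 3, 120],   # one post inside the window
    2: [0, 99, 150],  # posts just outside both edges of the window
}
print({uid: classify_user(days) for uid, days in users.items()})
```

Only users with at least one post can be classified this way; users with no posts drop out when the users are inner-joined with the posts data.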

*Consider*: What other parameterizations of "activity" could we use, and how would they differ in terms of splitting our user base?

*Consider*: What other biases are still not dealt with, after using the above approach?

Let's see if there are differences between the first ever question posts of "veterans" vs. "brief users". For each group separately, average the score, views, number of answers, and number of favorites of the users' **first question**. Remember, if the score, views, answers, or favorites are missing, you should assume they are zero.
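The averaging step needs two things: each user's earliest question, and missing counts treated as zero. Below is a plain-Python sketch on hypothetical records of the form (user id, ISO creation date, dict of whichever counts were present). It assumes all dates share one ISO-8601 format, so string comparison orders them chronologically; `first_question_averages` is a hypothetical helper, not part of the notebook.

```python
from collections import defaultdict

FIELDS = ('Score', 'ViewCount', 'AnswerCount', 'FavoriteCount')

def first_question_averages(questions, group_of):
    """Average each field over every user's earliest question, per group.

    questions: iterable of (user_id, iso_date, counts_dict)
    group_of:  dict mapping user_id -> group label such as 'veteran' or 'brief'
    """
    earliest = {}
    for user_id, date, counts in questions:
        # ISO-8601 strings of one format sort chronologically
        if user_id not in earliest or date < earliest[user_id][0]:
            earliest[user_id] = (date, counts)

    sums = defaultdict(lambda: [0.0] * len(FIELDS))
    n = defaultdict(int)
    for user_id, (_, counts) in earliest.items():
        group = group_of.get(user_id)
        if group is None:
            continue  # unclassified user: skipped, as an inner join would
        n[group] += 1
        for i, field in enumerate(FIELDS):
            sums[group][i] += counts.get(field, 0)  # missing count -> 0
    return {g: {f: sums[g][i] / n[g] for i, f in enumerate(FIELDS)} for g in n}

# Hypothetical data
questions = [
    (1, '2014-01-02T10:00:00.000', {'Score': 4, 'ViewCount': 100, 'AnswerCount': 2}),
    (1, '2014-03-01T10:00:00.000', {'Score': 9, 'ViewCount': 900, 'AnswerCount': 5, 'FavoriteCount': 3}),
    (2, '2014-02-01T08:00:00.000', {'Score': 1, 'ViewCount': 50}),
    (3, '2014-02-05T08:00:00.000', {'Score': 3, 'ViewCount': 70, 'AnswerCount': 1, 'FavoriteCount': 1}),
]
groups = {1: 'veteran', 2: 'brief', 3: 'brief'}
print(first_question_averages(questions, groups))
```

User 1's later question is ignored, and user 2's missing `AnswerCount` and `FavoriteCount` count as zero.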

*Consider*: What story could you tell from these numbers? How do the numbers support it?


#### Checkpoints

* Total brief users: 24,864
* Total veteran users: 2,027

In [71]:
# inspect valid_users_df; the 'Reputation' column is not needed for this question and is dropped below

valid_users_df.show()

+-----+--------------------+----------+
|   Id|        CreationDate|Reputation|
+-----+--------------------+----------+
|70185|2015-03-02T18:42:...|         1|
|70186|2015-03-02T19:04:...|         6|
|70187|2015-03-02T19:40:...|         1|
|70188|2015-03-02T19:46:...|         1|
|70189|2015-03-02T19:56:...|       101|
|70190|2015-03-02T19:59:...|         1|
|70191|2015-03-02T20:08:...|         1|
|70192|2015-03-02T20:10:...|         1|
|70193|2015-03-02T20:41:...|         1|
|70194|2015-03-02T20:46:...|        11|
|70195|2015-03-02T20:52:...|         1|
|70196|2015-03-02T20:57:...|         6|
|70197|2015-03-02T21:08:...|        18|
|70198|2015-03-02T21:37:...|         1|
|70199|2015-03-02T21:38:...|        51|
|70200|2015-03-02T22:35:...|        11|
|70201|2015-03-02T22:39:...|         1|
|70202|2015-03-02T23:33:...|       101|
|70203|2015-03-02T23:41:...|       116|
|70204|2015-03-02T23:45:...|       103|
+-----+--------------------+----------+
only showing top 20 rows



In [72]:
# all users with their account creation dates

q5_users_df = valid_users_df.drop('Reputation')

print(q5_users_df.select('Id').distinct().count())

q5_users_df.show()

50320
+-----+--------------------+
|   Id|        CreationDate|
+-----+--------------------+
|70185|2015-03-02T18:42:...|
|70186|2015-03-02T19:04:...|
|70187|2015-03-02T19:40:...|
|70188|2015-03-02T19:46:...|
|70189|2015-03-02T19:56:...|
|70190|2015-03-02T19:59:...|
|70191|2015-03-02T20:08:...|
|70192|2015-03-02T20:10:...|
|70193|2015-03-02T20:41:...|
|70194|2015-03-02T20:46:...|
|70195|2015-03-02T20:52:...|
|70196|2015-03-02T20:57:...|
|70197|2015-03-02T21:08:...|
|70198|2015-03-02T21:37:...|
|70199|2015-03-02T21:38:...|
|70200|2015-03-02T22:35:...|
|70201|2015-03-02T22:39:...|
|70202|2015-03-02T23:33:...|
|70203|2015-03-02T23:41:...|
|70204|2015-03-02T23:45:...|
+-----+--------------------+
only showing top 20 rows



In [73]:
# all the posts made by users 

q5_posts_df = q4_valid_posts.toDF()

print(q5_posts_df.select('Id').count())

q5_posts_df.show()

108741
+-----+--------------------+----------+
|   Id|            PostDate|PostTypeId|
+-----+--------------------+----------+
|  686|2013-10-28T10:42:...|         5|
|  686|2013-10-28T10:42:...|         4|
|28322|2013-10-28T10:49:...|         2|
|  686|2013-10-28T10:53:...|         2|
| 9047|2013-10-28T11:21:...|         1|
|31989|2013-10-28T11:29:...|         1|
|31990|2013-10-28T11:59:...|         1|
|  686|2013-10-28T12:25:...|         2|
|31992|2013-10-28T12:27:...|         1|
|30834|2013-10-28T12:33:...|         1|
|25433|2013-10-28T12:57:...|         2|
|30395|2013-10-28T13:00:...|         2|
|  226|2013-10-28T13:08:...|         2|
|12436|2013-10-28T13:20:...|         2|
|26206|2013-10-28T13:22:...|         2|
|29851|2013-10-28T14:10:...|         1|
|    0|2013-10-28T14:12:...|         2|
|30951|2013-10-28T14:16:...|         1|
|32000|2013-10-28T14:55:...|         1|
|17475|2013-10-28T15:03:...|         2|
+-----+--------------------+----------+
only showing top 20 rows



In [74]:
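# join each post to its owner's account creation date (one row per post)
# expected users across both categories: 26,891 (veterans + brief); the count printed below is rows, not users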

q5_df = q5_users_df.join(q5_posts_df, 'Id', 'inner') \
                   .select(q5_users_df.Id, q5_users_df.CreationDate, q5_posts_df.PostDate) 

print(q5_df.select('Id').count())
q5_df.show()

107042
+---+--------------------+--------------------+
| Id|        CreationDate|            PostDate|
+---+--------------------+--------------------+
| 26|2010-07-19T19:09:...|2011-05-19T19:02:...|
| 26|2010-07-19T19:09:...|2011-05-22T04:08:...|
| 26|2010-07-19T19:09:...|2011-05-26T05:12:...|
| 26|2010-07-19T19:09:...|2011-05-26T20:15:...|
| 26|2010-07-19T19:09:...|2011-05-27T04:59:...|
| 26|2010-07-19T19:09:...|2011-05-27T23:07:...|
| 26|2010-07-19T19:09:...|2011-05-29T18:11:...|
| 26|2010-07-19T19:09:...|2011-06-06T15:03:...|
| 26|2010-07-19T19:09:...|2011-06-07T17:04:...|
| 26|2010-07-19T19:09:...|2011-06-08T16:06:...|
| 26|2010-07-19T19:09:...|2011-06-08T22:37:...|
| 26|2010-07-19T19:09:...|2011-06-09T03:39:...|
| 26|2010-07-19T19:09:...|2011-06-10T02:12:...|
| 26|2010-07-19T19:09:...|2011-06-10T16:21:...|
| 26|2010-07-19T19:09:...|2011-06-11T07:13:...|
| 26|2010-07-19T19:09:...|2011-06-11T17:50:...|
| 26|2010-07-19T19:09:...|2011-06-12T17:03:...|
| 26|2010-07-19T19:09:...|2011-06

In [75]:
from pyspark.sql import functions as F
from pyspark.sql.functions import floor


# parse the date strings as timestamps, then compute how many whole days (rounded down)
# after account creation each post was made

updated_q5_df = q5_df.withColumn("AccountCreationDate", F.to_timestamp(F.col("CreationDate"))) \
        .withColumn("PostDate", F.to_timestamp(F.col("PostDate"))) \
        .withColumn("DaysDifference", floor((F.col("PostDate").cast("long") - F.col("AccountCreationDate").cast("long")) / 86400))

print(updated_q5_df.select('Id').count())       # row count (one row per post); adding columns should not change it
updated_q5_df.show()

107042
+---+--------------------+--------------------+--------------------+--------------+
| Id|        CreationDate|            PostDate| AccountCreationDate|DaysDifference|
+---+--------------------+--------------------+--------------------+--------------+
| 26|2010-07-19T19:09:...|2011-05-19 19:02:...|2010-07-19 19:09:...|           303|
| 26|2010-07-19T19:09:...|2011-05-22 04:08:...|2010-07-19 19:09:...|           306|
| 26|2010-07-19T19:09:...|2011-05-26 05:12:...|2010-07-19 19:09:...|           310|
| 26|2010-07-19T19:09:...|2011-05-26 20:15:...|2010-07-19 19:09:...|           311|
| 26|2010-07-19T19:09:...|2011-05-27 04:59:...|2010-07-19 19:09:...|           311|
| 26|2010-07-19T19:09:...|2011-05-27 23:07:...|2010-07-19 19:09:...|           312|
| 26|2010-07-19T19:09:...|2011-05-29 18:11:...|2010-07-19 19:09:...|           313|
| 26|2010-07-19T19:09:...|2011-06-06 15:03:...|2010-07-19 19:09:...|           321|
| 26|2010-07-19T19:09:...|2011-06-07 17:04:...|2010-07-19 19:09:...| 

In [76]:
# flag each post with Status = 1 if it was made between 100 (inclusive) and 150 (exclusive) days after account creation

status_column = F.when((F.col("DaysDifference") >= 100) & (F.col("DaysDifference") < 150), 1).otherwise(0)

updated_q5_df2 = updated_q5_df.withColumn("Status", status_column) 
                             
updated_q5_df2.show()

+---+--------------------+--------------------+--------------------+--------------+------+
| Id|        CreationDate|            PostDate| AccountCreationDate|DaysDifference|Status|
+---+--------------------+--------------------+--------------------+--------------+------+
| 26|2010-07-19T19:09:...|2011-05-19 19:02:...|2010-07-19 19:09:...|           303|     0|
| 26|2010-07-19T19:09:...|2011-05-22 04:08:...|2010-07-19 19:09:...|           306|     0|
| 26|2010-07-19T19:09:...|2011-05-26 05:12:...|2010-07-19 19:09:...|           310|     0|
| 26|2010-07-19T19:09:...|2011-05-26 20:15:...|2010-07-19 19:09:...|           311|     0|
| 26|2010-07-19T19:09:...|2011-05-27 04:59:...|2010-07-19 19:09:...|           311|     0|
| 26|2010-07-19T19:09:...|2011-05-27 23:07:...|2010-07-19 19:09:...|           312|     0|
| 26|2010-07-19T19:09:...|2011-05-29 18:11:...|2010-07-19 19:09:...|           313|     0|
| 26|2010-07-19T19:09:...|2011-06-06 15:03:...|2010-07-19 19:09:...|           321|     0|

In [77]:
# classify every user as a 'veteran' (at least one post inside the window) or a 'brief' user

grouped_df = updated_q5_df2.groupBy("Id").agg(F.collect_list("Status").alias("StatusList"))
df_with_result = grouped_df.withColumn("Result", F.when(F.array_contains("StatusList", 1), 'veteran').otherwise('brief')).drop("StatusList")

print(df_with_result.select('Id').distinct().count())
df_with_result.show()

26847
+-----+-------+
|   Id| Result|
+-----+-------+
|   26|  brief|
|   29|veteran|
|  474|  brief|
| 1806|  brief|
| 1950|  brief|
| 2040|veteran|
| 3506|  brief|
| 4823|  brief|
| 5385|  brief|
| 5556|  brief|
| 6721|  brief|
| 7225|  brief|
| 7279|  brief|
| 8440|  brief|
| 8484|  brief|
| 9233|  brief|
| 9458|  brief|
| 9968|  brief|
|11434|  brief|
|11567|  brief|
+-----+-------+
only showing top 20 rows



In [78]:
# create a separate sub-dataset with veterans only

veterans = df_with_result.where(df_with_result.Result=='veteran')

print(veterans.select('Id').distinct().count())
veterans.show()

2027
+-----+-------+
|   Id| Result|
+-----+-------+
|   29|veteran|
| 2040|veteran|
|35148|veteran|
|39520|veteran|
|41988|veteran|
|52743|veteran|
| 7546|veteran|
| 8101|veteran|
|15921|veteran|
|23772|veteran|
|26140|veteran|
|27183|veteran|
|27276|veteran|
|53215|veteran|
|53990|veteran|
| 6489|veteran|
|17814|veteran|
|21138|veteran|
|21417|veteran|
|35243|veteran|
+-----+-------+
only showing top 20 rows



In [79]:
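# rename the first-question date to 'CreationDate' so it can later be joined against the questions' CreationDate
questionsByVeterans_df = veterans.join(questionsByUser_df, 'Id', 'inner') \
                                  .select(veterans.Id, veterans.Result, questionsByUser_df.FirstQuestionDate.alias("CreationDate")) 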

print(questionsByVeterans_df.select('Id').count())
questionsByVeterans_df.show()

1818
+-----+-------+--------------------+
|   Id| Result|        CreationDate|
+-----+-------+--------------------+
|   29|veteran|2010-11-01T20:31:...|
| 2040|veteran|2010-11-23T20:47:...|
|35148|veteran|2013-11-21T23:25:...|
|41988|veteran|2014-03-15T15:38:...|
|52743|veteran|2014-11-29T20:39:...|
| 7546|veteran|2011-11-22T18:29:...|
| 8101|veteran|2011-06-30T00:19:...|
|15921|veteran|2012-10-14T05:42:...|
|23772|veteran|2013-04-01T14:39:...|
|26140|veteran|2013-05-26T12:02:...|
|27276|veteran|2013-06-25T10:49:...|
|53215|veteran|2014-12-16T21:30:...|
|53990|veteran|2014-08-21T20:24:...|
| 6489|veteran|2011-09-24T15:37:...|
|17814|veteran|2013-04-10T13:57:...|
|21138|veteran|2013-02-22T05:19:...|
|21417|veteran|2013-03-21T10:35:...|
|35243|veteran|2013-11-24T13:11:...|
|41420|veteran|2014-07-08T08:58:...|
|42246|veteran|2014-03-20T11:09:...|
+-----+-------+--------------------+
only showing top 20 rows



In [80]:
briefs = df_with_result.where(df_with_result.Result=='brief')

print(briefs.select('Id').distinct().count())
briefs.show()

24820
+-----+------+
|   Id|Result|
+-----+------+
|   26| brief|
|  474| brief|
| 1806| brief|
| 1950| brief|
| 3506| brief|
| 4823| brief|
| 5385| brief|
| 5556| brief|
| 6721| brief|
| 7225| brief|
| 7279| brief|
| 8440| brief|
| 8484| brief|
| 9233| brief|
| 9458| brief|
| 9968| brief|
|11434| brief|
|11567| brief|
|11625| brief|
|11745| brief|
+-----+------+
only showing top 20 rows



In [81]:
questionsByBriefs_df = briefs.join(questionsByUser_df, 'Id', 'inner') \
                                  .select(briefs.Id, briefs.Result, questionsByUser_df.FirstQuestionDate.alias("CreationDate")) 

print(questionsByBriefs_df.select('Id').count())
questionsByBriefs_df.show()

21277
+-----+------+--------------------+
|   Id|Result|        CreationDate|
+-----+------+--------------------+
|   26| brief|2011-02-18T02:40:...|
|  474| brief|2010-07-27T15:08:...|
| 1950| brief|2010-11-10T18:13:...|
| 3506| brief|2011-03-02T12:03:...|
| 4823| brief|2011-05-31T20:57:...|
| 5385| brief|2011-07-13T07:55:...|
| 5556| brief|2011-09-23T14:37:...|
| 7225| brief|2011-11-05T09:10:...|
| 7279| brief|2011-11-08T18:30:...|
| 8440| brief|2013-03-19T22:29:...|
| 8484| brief|2012-01-12T07:29:...|
| 9233| brief|2012-02-22T12:48:...|
| 9458| brief|2012-02-27T10:49:...|
| 9968| brief|2012-03-19T21:40:...|
|11434| brief|2012-05-20T03:20:...|
|11567| brief|2012-05-26T21:50:...|
|11745| brief|2012-06-06T11:06:...|
|13248| brief|2012-08-13T03:35:...|
|14719| brief|2012-10-07T13:17:...|
|14846| brief|2012-10-11T01:31:...|
+-----+------+--------------------+
only showing top 20 rows



In [82]:
# Total brief users: 24,864
# Total veteran users: 2,027

print(f"brief users count: {df_with_result.select('Result').where(df_with_result.Result=='brief').count()}.")
print(f"veteran users count: {df_with_result.select('Result').where(df_with_result.Result=='veteran').count()}.")

brief users count: 24820.
veteran users count: 2027.


In [83]:
# parser for question 5 to get the features from 'posts' data

q5_tuple = namedtuple('q5_tuple', ['Id', 'Score', 'ViewCount', 'AnswerCount', 'FavoriteCount', 'CreationDate', 'PostTypeId'])

def q5_parser(line):
    
    """parse every string to check if it's a valid xml using etree"""
    
    try:
        tree = ET.fromstring(line.encode('utf-8'))      
        user_id = int(tree.attrib.get('OwnerUserId', 0))    # some of the ownerUserId's are missing
        score = int(tree.attrib.get('Score', 0))
        view = int(tree.attrib.get('ViewCount', 0))
        answer = int(tree.attrib.get('AnswerCount', 0))
        fav_count = int(tree.attrib.get('FavoriteCount', 0))
        date = tree.attrib.get('CreationDate')
        post_type = int(tree.attrib.get('PostTypeId'))
        return q5_tuple(user_id, score, view, answer, fav_count, date, post_type)
    except:
        return None

In [84]:
f = lines.filter(lambda line: line.strip().startswith('<row'))

In [85]:
q5_data = f.map(q5_parser) \
           .filter(lambda x: x is not None) 

q5_data.count()

108741

In [86]:
questions = q5_data.filter(lambda x: x.PostTypeId==1)
questions.count()

52060

In [87]:
q5_data_df = questions.toDF()


print(f"all posts: {q5_data_df.select('Id').count()}.")
print(f"unique posts: {q5_data_df.select('Id').distinct().count()}.")

q5_data_df.show()

all posts: 52060.
unique posts: 23134.
+-----+-----+---------+-----------+-------------+--------------------+----------+
|   Id|Score|ViewCount|AnswerCount|FavoriteCount|        CreationDate|PostTypeId|
+-----+-----+---------+-----------+-------------+--------------------+----------+
| 9047|    0|       88|          1|            0|2013-10-28T11:21:...|         1|
|31989|    1|      867|          3|            0|2013-10-28T11:29:...|         1|
|31990|    1|       39|          1|            0|2013-10-28T11:59:...|         1|
|31992|   -1|       96|          1|            0|2013-10-28T12:27:...|         1|
|30834|    4|      501|          1|            2|2013-10-28T12:33:...|         1|
|29851|    1|      217|          0|            0|2013-10-28T14:10:...|         1|
|30951|    3|       73|          0|            0|2013-10-28T14:16:...|         1|
|32000|    0|       61|          1|            0|2013-10-28T14:55:...|         1|
|30557|    1|      258|          1|            0|2013-10-28

In [88]:
Veterans_data_df = questionsByVeterans_df.join(q5_data_df, ['Id','CreationDate'], 'inner') \
      .select(questionsByVeterans_df.Id, q5_data_df.Score, q5_data_df.ViewCount, q5_data_df.AnswerCount, q5_data_df.FavoriteCount) 
               

print(Veterans_data_df.select('Id').distinct().count())
Veterans_data_df.show()

1818
+-----+-----+---------+-----------+-------------+
|   Id|Score|ViewCount|AnswerCount|FavoriteCount|
+-----+-----+---------+-----------+-------------+
| 1894|    3|     1216|          1|            0|
| 3762|    3|      242|          1|            1|
| 8397|    1|     2743|          1|            0|
| 8926|    3|      979|          2|            2|
| 9577|    5|    20332|          3|            7|
|10457|    5|       81|          2|            1|
|12885|    1|       88|          2|            0|
|17293|    5|      509|          1|            1|
|20011|    3|      607|          2|            0|
|21639|    3|      579|          1|            1|
|29325|    4|     1021|          1|            3|
|41737|    0|       76|          0|            0|
|   29|    8|      440|          1|            0|
| 1679|   12|      906|          1|            4|
| 3794|    7|      145|          1|            1|
| 8724|    7|      784|          1|            3|
| 9311|    4|      268|          2|          

In [89]:
from pyspark.sql.functions import avg

veterans_averages = Veterans_data_df.select([avg('Score'), avg('ViewCount'), avg('AnswerCount'), avg('FavoriteCount')])
veterans_averages.show()

+------------------+-----------------+------------------+------------------+
|        avg(Score)|   avg(ViewCount)|  avg(AnswerCount)|avg(FavoriteCount)|
+------------------+-----------------+------------------+------------------+
|3.5434543454345433|926.3982398239824|1.2981298129812981| 1.300880088008801|
+------------------+-----------------+------------------+------------------+



In [90]:
Briefs_data_df = questionsByBriefs_df.join(q5_data_df, ['Id','CreationDate'], 'inner') \
      .select(questionsByBriefs_df.Id, q5_data_df.Score, q5_data_df.ViewCount, q5_data_df.AnswerCount, q5_data_df.FavoriteCount) 
               

print(Briefs_data_df.select('Id').distinct().count())
Briefs_data_df.show()

21277
+-----+-----+---------+-----------+-------------+
|   Id|Score|ViewCount|AnswerCount|FavoriteCount|
+-----+-----+---------+-----------+-------------+
|   78|    4|      118|          2|            0|
|  862|    6|     1773|          3|            3|
| 1441|    6|       99|          1|            0|
| 2157|    1|      425|          1|            0|
| 2516|    3|      525|          1|            0|
| 3921|    2|     2563|          3|            2|
| 4375|    0|      294|          1|            1|
| 4551|    1|      879|          3|            0|
| 4591|    1|      163|          1|            1|
| 5081|    1|      127|          1|            0|
| 5172|    6|     1067|          2|            2|
| 5513|    4|      434|          0|            0|
| 5727|    4|      356|          1|            0|
| 5754|    9|     1688|          2|            3|
| 5808|    0|      208|          2|            0|
| 7225|    2|      153|          1|            0|
| 7359|    2|      369|          2|         

In [91]:
from pyspark.sql.functions import avg

briefs_averages = Briefs_data_df.select([avg('Score'), avg('ViewCount'), avg('AnswerCount'), avg('FavoriteCount')])
briefs_averages.show()

+------------------+-----------------+------------------+------------------+
|        avg(Score)|   avg(ViewCount)|  avg(AnswerCount)|avg(FavoriteCount)|
+------------------+-----------------+------------------+------------------+
|2.1009023404455305|553.4952533132813|0.9706739355202557|0.5758529937024156|
+------------------+-----------------+------------------+------------------+



In [92]:
identify_veterans = {
    "vet_score": 3.5434543454345433,
    "vet_views": 926.3982398239824,
    "vet_answers": 1.2981298129812981,
    "vet_favorites": 1.300880088008801,
    "brief_score": 2.1009023404455305,
    "brief_views": 553.4952533132813,
    "brief_answers": 0.9706739355202557,
    "brief_favorites": 0.5758529937024156
}

grader.score('spark_data__identify_veterans', identify_veterans)

Your score: 1.0000


## Question 6: Identify veterans&mdash;full


Same as above, but on the full Stack Exchange data set.

#### Checkpoints

* Total brief users: 1,848,628
* Total veteran users: 288,285

In [97]:
import pyspark
from pyspark.sql import SQLContext
import xml.etree.ElementTree as ET
from collections import namedtuple
from pyspark.sql import functions as F
from pyspark.sql.functions import col, desc, floor, avg


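# reuse the SparkContext `sc` created earlier in the notebook (uncomment the next line in a fresh session);
# creating the SQLContext also enables RDD.toDF()
# sc = pyspark.SparkContext('local[*]')
sqlContext = SQLContext(sc)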



# read the files from directory
posts = sc.textFile('spark-stack-data/allPosts/')
users = sc.textFile('spark-stack-data/allUsers/')



# filter tags
filtered_posts = posts.filter(lambda line: line.strip().startswith('<row'))
filtered_users = users.filter(lambda line: line.strip().startswith('<row'))



user_tuple = namedtuple('user_tuple', ['Id', 'Account_Creation_Date', 'Reputation'])

# parse one XML <row> line from the Users files; return None if it is malformed
def parse_user(user):
    try:
        tree = ET.fromstring(user.encode('utf-8'))
        user_id = int(tree.attrib.get('Id'))
        date = tree.attrib.get('CreationDate')
        reputation = int(tree.attrib.get('Reputation'))
        return user_tuple(user_id, date, reputation)
    except (ET.ParseError, TypeError, ValueError):
        return None

    
    
# map 'parse_user' function to filtered users and choose the users that were parsed
all_users = filtered_users.map(parse_user) 
valid_users = all_users.filter(lambda x: x is not None)



# convert 'valid_users' RDD to spark dataframe 'valid_users_df'
valid_users_df = valid_users.toDF()



# drop the 'Reputation' column to keep all the users with their 'id's and account 'CreationDates'
users_df = valid_users_df.drop('Reputation')



post_tuple = namedtuple('post_tuple', ['Id', 'Creation_Date', 'PostTypeId'])

# parse one XML <row> line from the Posts files; return None if it is malformed
def parse_post(post):
    try:
        tree = ET.fromstring(post.encode('utf-8'))
        user_id = int(tree.attrib.get('OwnerUserId', 0))    # some posts have no OwnerUserId; use 0
        date = tree.attrib.get('CreationDate')
        post_type = int(tree.attrib.get('PostTypeId'))
        return post_tuple(user_id, date, post_type)
    except (ET.ParseError, TypeError, ValueError):
        return None

    
    
# map 'parse_post' function to filtered posts and choose the posts that were parsed
all_posts = filtered_posts.map(parse_post) 
valid_posts = all_posts.filter(lambda x: x is not None)



# convert 'valid_posts' RDD to spark dataframe 'posts_df'
posts_df = valid_posts.toDF()



# join every post to its owner's account creation date (one row per post)
joined_df = users_df.join(posts_df, 'Id', 'inner') \
                    .select(users_df.Id, users_df.Account_Creation_Date, posts_df.Creation_Date) 



# convert date strings using timestamp, then find the date difference in days by rounding it down
updated_joined_df = joined_df.withColumn("AccountCreationDate", F.to_timestamp(F.col("Account_Creation_Date"))) \
                             .withColumn("CreationDate", F.to_timestamp(F.col("Creation_Date"))) \
                             .withColumn("DaysDifference", floor((F.col("CreationDate").cast("long") - F.col("AccountCreationDate").cast("long")) / 86400))



# Status = 1 if the post was made between 100 (inclusive) and 150 (exclusive) days after account creation
status_column = F.when((F.col("DaysDifference") >= 100) & (F.col("DaysDifference") < 150), 1).otherwise(0)
updated_joined_df2 = updated_joined_df.withColumn("Status", status_column) 



# classify every user as a 'veteran' or 'brief user' based on their post date
updated_joined_df3 = updated_joined_df2.groupBy("Id").agg(F.collect_list("Status").alias("StatusList"))
veterans_briefs_df = updated_joined_df3.withColumn("Result", F.when(F.array_contains("StatusList", 1), 'veteran').otherwise('brief')).drop("StatusList")



# create sub-datasets with 'veterans' and 'brief' users separately
veterans = veterans_briefs_df.where(veterans_briefs_df.Result=='veteran')
briefs = veterans_briefs_df.where(veterans_briefs_df.Result=='brief')



# same parser as in Question 5: extract the question features from the posts data (missing counts -> 0)

post_tuple2 = namedtuple('post_tuple2',['Id','Score','ViewCount','AnswerCount','FavoriteCount','CreationDate','PostTypeId'])

def parse_post2(post):
    try:
        tree = ET.fromstring(post.encode('utf-8'))
        user_id = int(tree.attrib.get('OwnerUserId', 0))    # some posts have no OwnerUserId; use 0
        score = int(tree.attrib.get('Score', 0))
        view = int(tree.attrib.get('ViewCount', 0))
        answer = int(tree.attrib.get('AnswerCount', 0))
        fav_count = int(tree.attrib.get('FavoriteCount', 0))
        date = tree.attrib.get('CreationDate')
        post_type = int(tree.attrib.get('PostTypeId'))
        return post_tuple2(user_id, score, view, answer, fav_count, date, post_type)
    except (ET.ParseError, TypeError, ValueError):
        return None

    

all_posts2 = filtered_posts.map(parse_post2) 
valid_posts2 = all_posts2.filter(lambda x: x is not None)    


# select only 'questions' among posts
questions = valid_posts2.filter(lambda x: x.PostTypeId==1)


# convert 'questions' RDD to spark dataframe 'questions_df'
questions_df = questions.toDF()



# each user's earliest question date (ISO-8601 strings sort chronologically, so min() works)
questionsByUser_df = users_df.join(questions_df, 'Id', 'inner') \
                    .select(users_df.Id, users_df.Account_Creation_Date, questions_df.CreationDate) \
                    .groupBy("Id", "Account_Creation_Date").agg(F.min("CreationDate").alias("CreationDate"))



# join two dataframes to get questions by 'veterans'
questionsByVeterans_df = veterans.join(questionsByUser_df, 'Id', 'inner') \
                                 .select(veterans.Id, veterans.Result, questionsByUser_df.CreationDate.alias("CreationDate")) 



# join two dataframes to get questions by 'brief' users
questionsByBriefs_df = briefs.join(questionsByUser_df, 'Id', 'inner') \
                             .select(briefs.Id, briefs.Result, questionsByUser_df.CreationDate.alias("CreationDate")) 



print(f"brief users count: {veterans_briefs_df.select('Result').where(veterans_briefs_df.Result=='brief').count()}.")
print(f"veteran users count: {veterans_briefs_df.select('Result').where(veterans_briefs_df.Result=='veteran').count()}.")



Veterans_data_df = questionsByVeterans_df.join(questions_df, ['Id','CreationDate'], 'inner') \
                                         .select(questionsByVeterans_df.Id, questions_df.Score, questions_df.ViewCount, questions_df.AnswerCount, questions_df.FavoriteCount) 

Briefs_data_df = questionsByBriefs_df.join(questions_df, ['Id','CreationDate'], 'inner') \
                                     .select(questionsByBriefs_df.Id, questions_df.Score, questions_df.ViewCount, questions_df.AnswerCount, questions_df.FavoriteCount) 



# calculate the averages of the features for both 'veterans' and 'brief' users
veterans_averages = Veterans_data_df.select([avg('Score'), avg('ViewCount'), avg('AnswerCount'), avg('FavoriteCount')])    
briefs_averages = Briefs_data_df.select([avg('Score'), avg('ViewCount'), avg('AnswerCount'), avg('FavoriteCount')])



veterans_averages.show()
briefs_averages.show()

brief users count: 1846117.
veteran users count: 288285.
+------------------+------------------+------------------+------------------+
|        avg(Score)|    avg(ViewCount)|  avg(AnswerCount)|avg(FavoriteCount)|
+------------------+------------------+------------------+------------------+
|2.2598437331442924|1844.0344896669696|1.8426197044183144|0.8673157237744455|
+------------------+------------------+------------------+------------------+

+------------------+------------------+------------------+------------------+
|        avg(Score)|    avg(ViewCount)|  avg(AnswerCount)|avg(FavoriteCount)|
+------------------+------------------+------------------+------------------+
|1.1307456144103445|1096.1519220732553|1.5038565525030159|0.3861764445851408|
+------------------+------------------+------------------+------------------+



In [2]:
identify_veterans_full = {
    "vet_score": 2.2598437331442924,
    "vet_views": 1844.0344896669696,
    "vet_answers": 1.8426197044183144,
    "vet_favorites": 0.8673157237744455,
    "brief_score": 1.1307456144103445,
    "brief_views": 1096.1519220732553,
    "brief_answers": 1.5038565525030159,
    "brief_favorites": 0.3861764445851408
}

grader.score('spark_data__identify_veterans_full', identify_veterans_full)

Your score: 1.0000


This ends the `spark_data` section.

<br><br><br>

## Spark ML questions

The questions from here forward are associated with the `spark_ml` prefix. They use Spark's ML library to perform NLP-based analysis on the data.

<br><br><br>

## Question 7: Word2vec


Word2Vec is one approach for vectorizing text data. It learns vector representations of the words in the vocabulary that are useful for predicting the other words around them in a document. These vectors also tend to capture semantic relationships, hence the famous example "vector('king') - vector('man') + vector('woman') ~= vector('queen')".
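`findSynonyms` ranks words by the cosine similarity between their vectors. The minimal pure-Python sketch below shows that metric and the analogy arithmetic; the 3-dimensional vectors are hand-picked, hypothetical values chosen for illustration, not output from a trained model.

```python
import math

def cosine_similarity(u, v):
    """Cosine of the angle between two equal-length vectors."""
    dot = sum(a * b for a, b in zip(u, v))
    norm_u = math.sqrt(sum(a * a for a in u))
    norm_v = math.sqrt(sum(b * b for b in v))
    return dot / (norm_u * norm_v)

# Hypothetical hand-made 3-d "embeddings", for illustration only (not from a trained model).
vectors = {
    "king":  [0.9, 0.8, 0.1],
    "man":   [0.5, 0.1, 0.1],
    "woman": [0.5, 0.1, 0.9],
    "queen": [0.9, 0.8, 0.9],
    "apple": [0.1, 0.9, 0.2],
}

def closest(target, exclude=()):
    """Vocabulary word whose vector has the highest cosine similarity to target."""
    candidates = [w for w in vectors if w not in exclude]
    return max(candidates, key=lambda w: cosine_similarity(vectors[w], target))

# king - man + woman, computed component by component
analogy = [k - m + w for k, m, w in zip(vectors["king"], vectors["man"], vectors["woman"])]
print(closest(analogy, exclude={"king", "man", "woman"}))  # queen
```

The query words themselves are excluded from the candidates, since the closest vector to the result is otherwise often one of the inputs.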

Let's see how good a Word2Vec model we can train using the **tags** of each Stack Exchange post as documents (this uses the full data set). Use the implementation of Word2Vec from Spark ML (this will require using DataFrames) to return a list of the top 25 closest synonyms to "ggplot2" and their similarity score in tuple format ("string", number).

The tags appear in the data as a single string, so you will need to split them into individual tags. There is no need to parse them further beyond separating them.
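A minimal sketch of the splitting step in plain Python, using the same `<([^>]+)>` pattern as the code below. It assumes the `Tags` value has already been decoded by an XML parser: in the raw dump it is entity-escaped (`&lt;r&gt;&lt;ggplot2&gt;`), and `xml.etree.ElementTree` turns it back into `<r><ggplot2>`. The helper name `split_tags` is hypothetical, and unlike `parse_tag` below it returns an empty list for posts without tags rather than discarding them.

```python
import re

# Same regex the notebook uses below: capture each run of text between '<' and '>'.
TAG_PATTERN = re.compile(r'<([^>]+)>')

def split_tags(tag_string):
    """Split a decoded Tags value such as '<r><ggplot2>' into ['r', 'ggplot2']."""
    if not tag_string:  # e.g. answers, which carry no Tags attribute
        return []
    return TAG_PATTERN.findall(tag_string)

print(split_tags("<r><ggplot2><data-visualization>"))  # ['r', 'ggplot2', 'data-visualization']
```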

#### Parameters


The dimensionality of the vector space should be 100. The random seed should be 42 in `PySpark`.


#### Checkpoints

* Mean of the top 25 cosine similarities: 0.8012362027168274
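Once the synonyms are in the required list of `(word, similarity)` tuples, the checkpoint can be checked without Spark. A minimal sketch; `mean_similarity` is a hypothetical helper name and the input values are made up:

```python
def mean_similarity(synonyms):
    """Average similarity score of a list of (word, similarity) tuples."""
    if not synonyms:
        raise ValueError("expected at least one (word, similarity) pair")
    return sum(score for _, score in synonyms) / len(synonyms)

# Hypothetical values, just to show the call
print(mean_similarity([("a", 0.5), ("b", 1.0)]))  # 0.75
```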

In [143]:
import pyspark

#sc = pyspark.SparkContext('local[*]')

In [144]:
from pyspark.sql import SQLContext
import xml.etree.ElementTree as ET
from collections import namedtuple
from pyspark.ml.feature import Word2Vec
import re



sqlContext = SQLContext(sc)
posts = sc.textFile('spark-stack-data/allPosts/')


pattern = '<([^>]+)>'                                 # regex capturing the text between '<' and '>' in the Tags string
tag_tuple = namedtuple('tag_tuple', ['Tags'])         # record holding only the parsed tag list of a post


def parse_tag(post):
    try:
        tree = ET.fromstring(post.encode('utf-8'))
        tag = tree.attrib.get('Tags')
        tag = re.findall(pattern, tag)
        return tag_tuple(tag)
    except Exception:                                 # malformed XML, or a post without Tags (e.g. an answer)
        return None

In [145]:
# keep only '<row' lines, extract each post's tags with 'parse_tag', drop posts that failed to parse
# (including answers, which have no Tags), and convert the RDD to a DataFrame

tags = posts.filter(lambda line: line.strip().startswith('<row'))\
            .map(parse_tag)\
            .filter(lambda x: x is not None)\
            .toDF(['tags'])

In [146]:
tags.show()

+--------------------+
|                tags|
+--------------------+
|[types, xsd, jaxb...|
|[javascript, jque...|
|[android, backgro...|
|[animation, monot...|
|            [jquery]|
|      [xml, ms-word]|
|[asp.net-mvc, whi...|
|[gis, openstreetm...|
|[iphone, cocos2d-...|
|[profiling, gprof...|
|       [php, arrays]|
|      [c#, winforms]|
|[ruby-on-rails, r...|
|[php, mysql, apache]|
|[blackberry, blac...|
| [c, multithreading]|
|[asp.net-mvc-3, n...|
|[soap, protocols,...|
|[c#, asp.net, .ne...|
|[c, oracle, prepr...|
+--------------------+
only showing top 20 rows



In [147]:
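# NOTE: the parameters above call for vectorSize=100 (and do not mention minCount);
# this run used vectorSize=30 and minCount=10, and the outputs below reflect those settings
w2v = Word2Vec(inputCol='tags', outputCol='vectors', vectorSize=30, minCount=10, seed=42)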
model = w2v.fit(tags)
result = model.transform(tags)

In [172]:
model.findSynonyms('ggplot2', 25).show()

+--------------------+------------------+
|                word|        similarity|
+--------------------+------------------+
|             lattice|0.9696043729782104|
|              r-grid|0.9443326592445374|
|           levelplot|0.9430872797966003|
|             boxplot|0.9414194226264954|
|                nlme|0.9279255867004395|
| confidence-interval|0.9261690378189087|
|               anova|0.9255669713020325|
|                  lm|  0.92326819896698|
|      standard-error| 0.922639787197113|
|             plotrix|0.9209862947463989|
|                 zoo|0.9206746220588684|
|            quantile|0.9187219738960266|
|                 rgl|0.9182008504867554|
|                ecdf|0.9143278002738953|
|performanceanalytics|0.9127399921417236|
|                 xts|0.9124394059181213|
|            quantmod| 0.911929190158844|
|             p-value|0.9117739796638489|
|   survival-analysis|0.9113281965255737|
|               loess|0.9109570980072021|
+--------------------+------------------+

In [173]:
from pyspark.sql.functions import avg

# checkpoint - mean of the top 25 cosine similarities: 0.8012362027168274
# .first()[0] extracts the scalar; printing the DataFrame itself only shows its schema
print(model.findSynonyms('ggplot2', 25).select(avg('similarity')).first()[0])


In [174]:
synonyms_rdd = model.findSynonyms('ggplot2', 25).rdd        # convert "model.findSynonyms('ggplot2', 25)" dataframe to rdd
final_rdd = synonyms_rdd.map(lambda x: (x[0], x[1]))        # get the result in the form of tuples of two columns
word2vec = final_rdd.take(25)                               # collect the 25 (word, similarity) tuples into a list
word2vec

[('lattice', 0.9696043729782104),
 ('r-grid', 0.9443326592445374),
 ('levelplot', 0.9430872797966003),
 ('boxplot', 0.9414194226264954),
 ('nlme', 0.9279255867004395),
 ('confidence-interval', 0.9261690378189087),
 ('anova', 0.9255669713020325),
 ('lm', 0.92326819896698),
 ('standard-error', 0.922639787197113),
 ('plotrix', 0.9209862947463989),
 ('zoo', 0.9206746220588684),
 ('quantile', 0.9187219738960266),
 ('rgl', 0.9182008504867554),
 ('ecdf', 0.9143278002738953),
 ('performanceanalytics', 0.9127399921417236),
 ('xts', 0.9124394059181213),
 ('quantmod', 0.911929190158844),
 ('p-value', 0.9117739796638489),
 ('survival-analysis', 0.9113281965255737),
 ('loess', 0.9109570980072021),
 ('lme4', 0.910408616065979),
 ('line-plot', 0.9095427989959717),
 ('plotmath', 0.908098042011261),
 ('categorical-data', 0.9057424068450928),
 ('regression', 0.9040448665618896)]

In [175]:
# word2vec = [("data.frame", 0.772650957107544)] * 25

grader.score('spark_ml__word2vec', word2vec)

Your score: 0.9600


## Question 8: Classification


We'd like to see if we can predict the tags of a **question** from its body text. Rather than predicting specific tags, we will try to predict whether a question contains one of the ten most common tags.

To this end, we have separated out a training set and a test set from the original data. The training and test sets were downloaded with the stats data at the beginning of the notebook. You can also get them from S3:
  * `s3://dataincubator-course/spark-stats-data/posts_train.zip`
  * `s3://dataincubator-course/spark-stats-data/posts_test.zip`

This will involve two steps. First, find the ten most common tags for questions in the training data set (the tags have been removed from the test set). Then train a learner to predict from the text of the question (the `Body` attribute) whether it should have one of those ten tags. You will need to process the question text with NLP techniques such as splitting the text into tokens.
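The first step, counting tags and keeping the ten most frequent, can be sketched in plain Python with `collections.Counter` on a toy list of per-question tag lists. This only illustrates the logic; the notebook below does the same count with Spark's `explode` and `groupBy`. The helper name `top_k_tags` is hypothetical, and `Counter` breaks ties by first appearance, which may not match Spark's ordering.

```python
from collections import Counter

def top_k_tags(tag_lists, k=10):
    """Return the k most frequent tags across a collection of per-question tag lists."""
    counts = Counter(tag for tags in tag_lists for tag in tags)
    return [tag for tag, _ in counts.most_common(k)]

# Hypothetical toy data: 'r' appears 3 times, 'regression' twice, 'probability' once
questions = [["r", "regression"], ["r"], ["probability", "r"], ["regression"]]
print(top_k_tags(questions, k=2))  # ['r', 'regression']
```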

Since we can't reliably pickle Spark models, instead return a list of your predictions, sorted by the question's `Id`.  This sorting is very important, as our grader expects the results to be submitted in a particular order. These predictions should be `0` if the question isn't expected to have a tag in the top ten, and `1` if it is.

As an example, if our top tags include `spark` and `python`, and we had the following questions:

```
<row Body="..." Id="1740" Tags="<machine-learning><spark><regression>" ... />
<row Body="..." Id="723" Tags="<statistics><neurons>" ... />
<row Body="..." Id="2740" Tags="<functional><python><spark><pyspark>" ... />
```

We would expect to return `[0, 1, 1]` (for the order `[723, 1740, 2740]`).  You may need to do some format manipulation in your DataFrame to get this.
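A minimal plain-Python sketch of the labelling and ordering that reproduces the example above. `label_questions` is a hypothetical helper. Note that the Ids are sorted as integers: sorted as strings, `"723"` would come after `"2740"`.

```python
import re

TAG_PATTERN = re.compile(r'<([^>]+)>')

def label_questions(questions, top_tags):
    """Return 0/1 labels ordered by numeric question Id.

    questions: iterable of (Id, Tags-string) pairs.
    A label is 1 when any of the question's tags is in top_tags.
    """
    top = set(top_tags)
    labelled = [
        (int(qid), int(any(tag in top for tag in TAG_PATTERN.findall(tags))))
        for qid, tags in questions
    ]
    return [label for _, label in sorted(labelled)]  # sort by integer Id

example = [
    ("1740", "<machine-learning><spark><regression>"),
    ("723", "<statistics><neurons>"),
    ("2740", "<functional><python><spark><pyspark>"),
]
print(label_questions(example, top_tags=["spark", "python"]))  # [0, 1, 1]
```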

#### Checkpoints

- Number of training posts with a tag in the top 10: `22525`
- Number without: `19540`

In [1]:
!unzip -u -d spark-stats-data/train spark-stats-data/posts_train.zip
!unzip -u -d spark-stats-data/test spark-stats-data/posts_test.zip

Archive:  spark-stats-data/posts_train.zip
Archive:  spark-stats-data/posts_test.zip


In [2]:
import pyspark

sc = pyspark.SparkContext('local[*]')

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [3]:
import xml.etree.ElementTree as ET
from collections import namedtuple
from pyspark.ml.feature import Word2Vec
import re

# read the train data
posts = sc.textFile('spark-stats-data/train/')

pattern = '<([^>]+)>'            # regex pattern to capture the words inside tags

# pull out the necessary information from every post
question_tag_tuple = namedtuple('question_tag_tuple', ['Id', 'Tags', 'Body', 'PostTypeId'])         

def parse_question_tag(post):   
    try:
        tree = ET.fromstring(post.encode('utf-8'))   
        post_id = int(tree.attrib.get('Id', 0))                   # the post's own Id (not 'OwnerUserId')
        tag = tree.attrib.get('Tags')
        tag = re.findall(pattern, tag)                            # capture only the text inside '<...>' in the Tags string
        body = tree.attrib.get('Body')
        body = re.sub(r'<p>|</p>|<i>|</i>|\n', '', body)          # drop <p> and <i> tags and newlines
        post_type = int(tree.attrib.get('PostTypeId'))
        return question_tag_tuple(post_id, tag, body, post_type)
    except Exception:                                             # malformed XML, or a post without Tags/Body
        return None
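The regex above removes only `<p>`, `<i>`, their closing tags, and newlines, so other markup such as `<a href=...>` and `<blockquote>` stays in the text, as the `Body` outputs further down show. One alternative, swapped in here and not what this notebook does, is to strip every tag with the standard library's `html.parser`. A minimal sketch; `html_to_text` is a hypothetical helper name:

```python
from html.parser import HTMLParser

class _TextExtractor(HTMLParser):
    """Collects only the text content, skipping every tag."""
    def __init__(self):
        super().__init__()  # convert_charrefs=True by default, so &amp; etc. become characters
        self.parts = []

    def handle_data(self, data):
        self.parts.append(data)

def html_to_text(body):
    """Strip all HTML tags from a post Body and collapse whitespace."""
    extractor = _TextExtractor()
    extractor.feed(body)
    extractor.close()
    # join pieces with spaces so words from adjacent blocks do not run together
    # (at the cost of splitting a word that an inline tag breaks in two)
    return " ".join(" ".join(extractor.parts).split())

print(html_to_text('<p>See <a href="http://example.com">this</a> post.</p>\n<p>Next paragraph</p>'))
# See this post. Next paragraph
```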

In [23]:
from pyspark.sql import SQLContext

# creating a SQLContext makes RDD.toDF() available for converting RDDs to Spark DataFrames
sqlContext = SQLContext(sc)

# keep only the '<row' lines, parse each post, drop posts that failed to parse,
# keep only questions (PostTypeId == 1), and convert the RDD to a DataFrame

question_tags = posts.filter(lambda line: line.strip().startswith('<row'))
parsed_question_tags = question_tags.map(parse_question_tag)
valid_question_tags = parsed_question_tags.filter(lambda x: x is not None)
q_tags = valid_question_tags.filter(lambda x: x.PostTypeId == 1)
question_tags_df = q_tags.toDF(['Id', 'Tags', 'Body', 'PostTypeId'])

question_tags_df.show()

+-----+--------------------+--------------------+----------+
|   Id|                Tags|                Body|PostTypeId|
+-----+--------------------+--------------------+----------+
|10893|[confidence-inter...|I'm developing an...|         1|
|10897|       [probability]|I would like to c...|         1|
|10900|[distributions, r...|I have to generat...|         1|
|10905|[r, mixed-model, ...|I have searched f...|         1|
|10907|[regression, basi...|I have data with ...|         1|
|10910|      [spss, scales]|I have a 37-quest...|         1|
|10918|                 [r]|I'm trying to do ...|         1|
|10920|          [variance]|I am looking for ...|         1|
|10926|[r, confidence-in...|As question, I ha...|         1|
|10928|[probability, hyp...|If i test two hyp...|         1|
|10943|[mixed-model, var...|I have a mixed ef...|         1|
|10945|[correlation, fac...|If an exploratory...|         1|
|10947|[r, sas, autocorr...|I am trying to fi...|         1|
|10949|[regression, prob

In [26]:
from pyspark.sql.functions import col, explode

# flatten the list of tags so each (question, tag) pair becomes its own row and individual occurrences can be counted
flattened_tags = question_tags_df.withColumn("Tag", explode(col("Tags"))).drop("Tags")

flattened_tags.show()

+-----+--------------------+----------+-------------------+
|   Id|                Body|PostTypeId|                Tag|
+-----+--------------------+----------+-------------------+
|10893|I'm developing an...|         1|confidence-interval|
|10897|I would like to c...|         1|        probability|
|10900|I have to generat...|         1|      distributions|
|10900|I have to generat...|         1|         randomness|
|10905|I have searched f...|         1|                  r|
|10905|I have searched f...|         1|        mixed-model|
|10905|I have searched f...|         1|   cross-validation|
|10905|I have searched f...|         1|      biostatistics|
|10907|I have data with ...|         1|         regression|
|10907|I have data with ...|         1|     basic-concepts|
|10907|I have data with ...|         1|              error|
|10907|I have data with ...|         1|      least-squares|
|10910|I have a 37-quest...|         1|               spss|
|10910|I have a 37-quest...|         1| 

In [27]:
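# checkpoint - total number of training questions
# counting distinct 'Body' values gives 42057, fewer than the 42065 questions (22525 + 19540),
# because some questions share identical body text; counting distinct 'Id' values avoids this

flattened_tags.select('body').distinct().count()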

42057

In [28]:
flattened_tags.select('body').count()

116001

In [29]:
# find the top 10 tags based on their counts

unique_word_count = flattened_tags.groupBy("Tag").count()
sorted_word_count = unique_word_count.orderBy('Count', ascending = False)
sorted_word_count.show(10)

+------------------+-----+
|               Tag|count|
+------------------+-----+
|                 r| 7121|
|        regression| 5408|
|       time-series| 2654|
|  machine-learning| 2524|
|       probability| 2055|
|hypothesis-testing| 1926|
|     distributions| 1807|
|        self-study| 1762|
|          logistic| 1627|
|       correlation| 1544|
+------------------+-----+
only showing top 10 rows



In [30]:
from pyspark.sql import functions as F
from pyspark.sql.functions import when

# the ten most frequent tags from the table above, to look up in the 'Tag' column of 'flattened_tags'
top10 = ['r', 'regression', 'time-series', 'machine-learning', 'probability', 'hypothesis-testing',\
         'distributions', 'self-study', 'logistic', 'correlation']

# add an 'In_top_10' column: 'yes' if the row's tag is one of the top 10, otherwise 'no'
df_with_top10 = flattened_tags.withColumn("In_top_10", when(col("Tag").isin(top10),'yes').otherwise('no'))


df_with_top10.show()

+-----+--------------------+----------+-------------------+---------+
|   Id|                Body|PostTypeId|                Tag|In_top_10|
+-----+--------------------+----------+-------------------+---------+
|10893|I'm developing an...|         1|confidence-interval|       no|
|10897|I would like to c...|         1|        probability|      yes|
|10900|I have to generat...|         1|      distributions|      yes|
|10900|I have to generat...|         1|         randomness|       no|
|10905|I have searched f...|         1|                  r|      yes|
|10905|I have searched f...|         1|        mixed-model|       no|
|10905|I have searched f...|         1|   cross-validation|       no|
|10905|I have searched f...|         1|      biostatistics|       no|
|10907|I have data with ...|         1|         regression|      yes|
|10907|I have data with ...|         1|     basic-concepts|       no|
|10907|I have data with ...|         1|              error|       no|
|10907|I have data w

In [31]:
grouped_df = df_with_top10.groupBy('Id','Body').agg(F.collect_list("In_top_10").alias("In_top_10_list"))

grouped_df.show()

+-----+--------------------+--------------------+
|   Id|                Body|      In_top_10_list|
+-----+--------------------+--------------------+
|   39|I'm looking for w...|    [no, no, no, no]|
|  269|What is the diffe...|    [no, no, no, no]|
| 1142|I am working with...|   [yes, no, no, no]|
| 4239|I am designing a ...|        [no, no, no]|
| 4354|I was wondering i...|               [yes]|
| 5359|<blockquote>  Dia...|                [no]|
| 5807|Colleagues of min...|      [yes, yes, no]|
| 6111|Here is the probl...|       [yes, no, no]|
| 6582|When deconstructi...|        [no, no, no]|
| 8559|Related to <a hre...|        [no, no, no]|
| 9715|I ran a multinomi...|[yes, yes, no, no...|
|10234|What is the diffe...|           [yes, no]|
|11609|My current unders...|                [no]|
|13038|Could someone exp...|           [yes, no]|
|13193|I have to simulat...|            [no, no]|
|15648|I have a dataset ...|       [yes, no, no]|
|16321|Can we say anythi...|           [yes, no]|


In [32]:
grouped_df.count()

42065

In [33]:
train_df = grouped_df.withColumn("label", F.when(F.array_contains("In_top_10_list", 'yes'), 1)\
                     .otherwise(0))\
                     .drop("In_top_10_list")

train_df.show()

+-----+--------------------+-----+
|   Id|                Body|label|
+-----+--------------------+-----+
|   39|I'm looking for w...|    0|
|  269|What is the diffe...|    0|
| 1142|I am working with...|    1|
| 4239|I am designing a ...|    0|
| 4354|I was wondering i...|    1|
| 5359|<blockquote>  Dia...|    0|
| 5807|Colleagues of min...|    1|
| 6111|Here is the probl...|    1|
| 6582|When deconstructi...|    0|
| 8559|Related to <a hre...|    0|
| 9715|I ran a multinomi...|    1|
|10234|What is the diffe...|    1|
|11609|My current unders...|    0|
|13038|Could someone exp...|    1|
|13193|I have to simulat...|    0|
|15648|I have a dataset ...|    1|
|16321|Can we say anythi...|    1|
|16480|I have a dataset ...|    1|
|16834|I'm using Friedma...|    0|
|17029|I wonder if <a hr...|    0|
+-----+--------------------+-----+
only showing top 20 rows



In [34]:
# Checkpoint - Number of training posts with a tag in the top 10: 22525
# Checkpoint - Number without: 19540

print(f"posts with top 10 tag: {train_df.select('body').where(train_df.label==1).count()}.")
print(f"posts without top 10 tag: {train_df.select('body').where(train_df.label==0).count()}.")

posts with top 10 tag: 22525.
posts without top 10 tag: 19540.


In [35]:
train = train_df
train = train.withColumn('int_label', col('label').cast('int')) 
train = train.withColumn('int_Id', col('Id').cast('int'))
train.show()

+-----+--------------------+-----+---------+------+
|   Id|                Body|label|int_label|int_Id|
+-----+--------------------+-----+---------+------+
|   39|I'm looking for w...|    0|        0|    39|
|  269|What is the diffe...|    0|        0|   269|
| 1142|I am working with...|    1|        1|  1142|
| 4239|I am designing a ...|    0|        0|  4239|
| 4354|I was wondering i...|    1|        1|  4354|
| 5359|<blockquote>  Dia...|    0|        0|  5359|
| 5807|Colleagues of min...|    1|        1|  5807|
| 6111|Here is the probl...|    1|        1|  6111|
| 6582|When deconstructi...|    0|        0|  6582|
| 8559|Related to <a hre...|    0|        0|  8559|
| 9715|I ran a multinomi...|    1|        1|  9715|
|10234|What is the diffe...|    1|        1| 10234|
|11609|My current unders...|    0|        0| 11609|
|13038|Could someone exp...|    1|        1| 13038|
|13193|I have to simulat...|    0|        0| 13193|
|15648|I have a dataset ...|    1|        1| 15648|
|16321|Can w

In [36]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer, CountVectorizer

In [37]:
tokenizer = Tokenizer(inputCol='Body', outputCol='words')
count_vectorizer = CountVectorizer(inputCol=tokenizer.getOutputCol(), outputCol='features')
logreg = LogisticRegression(maxIter=24, regParam=1.0).setLabelCol('int_label')

tokens = tokenizer.transform(train)
count_model = count_vectorizer.fit(tokens)
counts = count_model.transform(tokens)
model = logreg.fit(counts)
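Roughly, `Tokenizer` lowercases the text and splits it on whitespace, and `CountVectorizer` builds a vocabulary ordered by term frequency across the corpus and turns each document into a vector of term counts. A simplified pure-Python sketch of that bag-of-words idea on toy documents; it uses dense lists where Spark uses sparse vectors, ignores `CountVectorizer` options such as `vocabSize` and `minDF`, and breaks count ties alphabetically, which is an assumption rather than Spark's documented behavior. The helper names are hypothetical.

```python
from collections import Counter

def tokenize(text):
    """Lowercase and split on whitespace (an approximation of Spark ML's Tokenizer)."""
    return text.lower().split()

def fit_vocabulary(token_lists):
    """Map each term to an index, most frequent term across the corpus first."""
    counts = Counter(tok for toks in token_lists for tok in toks)
    # sort by descending count, then alphabetically so ties are deterministic
    ordered = sorted(counts, key=lambda t: (-counts[t], t))
    return {tok: i for i, tok in enumerate(ordered)}

def count_vector(tokens, vocab):
    """Dense term-count vector for one document; out-of-vocabulary tokens are ignored."""
    vec = [0] * len(vocab)
    for tok in tokens:
        if tok in vocab:
            vec[vocab[tok]] += 1
    return vec

docs = ["I ran a regression", "Regression in R", "I like R"]
vocab = fit_vocabulary([tokenize(d) for d in docs])
print(count_vector(tokenize("R regression R"), vocab))  # [0, 2, 1, 0, 0, 0, 0]
```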

In [38]:
from pyspark.sql import SQLContext
import xml.etree.ElementTree as ET
from collections import namedtuple
from pyspark.ml.feature import Word2Vec
import re

# creating a SQLContext makes RDD.toDF() available for converting RDDs to Spark DataFrames
sqlContext = SQLContext(sc)

# read the test data
test_data = sc.textFile('spark-stats-data/test/')

# record type for the fields kept from each test post (the test set has no tags)
question_tag_tuple = namedtuple('question_tag_tuple', ['Id', 'Body', 'PostTypeId'])

def parse_question_tag(post):   
    try:
        tree = ET.fromstring(post.encode('utf-8'))   
        post_id = int(tree.attrib.get('Id', 0))
        body = tree.attrib.get('Body')
        body = re.sub(r'<p>|</p>|<i>|</i>|\n', '', body)
        post_type = int(tree.attrib.get('PostTypeId'))
        return question_tag_tuple(post_id, body, post_type)
    except Exception:                # malformed XML or a missing Body/PostTypeId attribute
        return None

In [39]:
test_tags = test_data.filter(lambda line: line.strip().startswith('<row'))\
                     .map(parse_question_tag)\
                     .filter(lambda x: x is not None)\
                     .filter(lambda x: x.PostTypeId==1)\
                     .toDF(['Id', 'Body', 'PostTypeId'])

In [40]:
# drop 'PostTypeId' column to have only 'body' and 'id' 
test_df = test_tags.drop('PostTypeId')
test_df.show()

+-----+--------------------+
|   Id|                Body|
+-----+--------------------+
|10904|What are the pros...|
|11000|I'd like to regre...|
|11018|Let's aim for som...|
|11019|I am doing text c...|
|11079|I need an help be...|
|11087|I was just wonder...|
|11093|Apologies for wha...|
|11109|If you have a var...|
|11118|Given a data fram...|
|11200|Today I opened tw...|
|11219|I have been readi...|
|11232|I am trying to co...|
|11290|I have come acros...|
|11296|I have a table  w...|
|11315|So when I assume ...|
|11381|I've calculated t...|
|11435|Suppose $x_{1}, x...|
|11516|I am doing Cox re...|
|11531|I am looking at t...|
|11568|I have one data s...|
+-----+--------------------+
only showing top 20 rows



In [41]:
# make predictions on test documents
test_tokens = tokenizer.transform(test_df)
test_counts = count_model.transform(test_tokens)

predictions = model.transform(test_counts)
predictions = predictions.sort('Id', ascending=True)
selected = predictions.select('Id','Body', 'prediction')
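`LogisticRegression` produces the `prediction` column by comparing the model's estimated probability of label 1 with its `threshold` parameter (0.5 by default). A minimal sketch of that decision rule in plain Python, with hypothetical weights standing in for learned coefficients over a two-word vocabulary:

```python
import math

def sigmoid(z):
    """Numerically stable logistic function."""
    if z >= 0:
        return 1.0 / (1.0 + math.exp(-z))
    e = math.exp(z)
    return e / (1.0 + e)

def predict(weights, intercept, features, threshold=0.5):
    """Binary logistic regression: 1.0 if P(label=1 | features) > threshold, else 0.0."""
    margin = intercept + sum(w * x for w, x in zip(weights, features))
    return 1.0 if sigmoid(margin) > threshold else 0.0

# Hypothetical weights, e.g. for counts of 'regression' and 'compiler' in a question
weights, intercept = [1.5, -2.0], -0.5
print(predict(weights, intercept, [2, 0]))  # 1.0  (margin 2.5)
print(predict(weights, intercept, [0, 1]))  # 0.0  (margin -2.5)
```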

In [42]:
print(selected.count())
selected.show()

4649
+---+--------------------+----------+
| Id|                Body|prediction|
+---+--------------------+----------+
| 11|Is there a good, ...|       0.0|
| 40|What algorithms a...|       0.0|
| 47|I have a dataset ...|       0.0|
| 93|We're trying to u...|       0.0|
|183|I need to analyze...|       1.0|
|212|I have 2 ASR (Aut...|       0.0|
|216|What are some goo...|       1.0|
|223|I have a friend w...|       0.0|
|278|When a non-hierar...|       0.0|
|290|I know of Cameron...|       0.0|
|312|I'm a physics gra...|       0.0|
|328|I realize that th...|       1.0|
|354|Why do we seek to...|       1.0|
|362|What is the diffe...|       0.0|
|363|If you could go b...|       1.0|
|373|From Wikipedia :<...|       1.0|
|492|I am proposing to...|       0.0|
|498|Sometimes, I just...|       1.0|
|539|In answering <a h...|       0.0|
|624|In engineering, w...|       0.0|
+---+--------------------+----------+
only showing top 20 rows



In [43]:
# check the datatype of every column
selected.printSchema()

root
 |-- Id: long (nullable = true)
 |-- Body: string (nullable = true)
 |-- prediction: double (nullable = false)



In [44]:
# collect the third column ('prediction') in Id order; collect() preserves the sort order
result = selected.rdd.map(lambda x: x[2]).collect()

# predictions are doubles (0.0 / 1.0); convert them to ints for the grader
classification = [int(x) for x in result]

In [48]:
#classification = [0] * 4649

grader.score('spark_ml__classification', classification)

Your score: 0.8659


## K-means (ungraded)


From your trained Word2Vec model, pass the vectors into a K-means clustering algorithm. Create a plot of the clustering error, computed as the square root of the sum of the squared distances between each point and its assigned cluster center. For the independent variable, use either the number of clusters k or the dimension of the Word2Vec vectorization.
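A minimal pure-Python sketch of the error computation, using a plain Lloyd's-algorithm k-means on hypothetical 2-d points. In Spark itself, the word vectors could come from the Word2Vec model's `getVectors()` (columns `word` and `vector`) and be clustered with `pyspark.ml.clustering.KMeans` using `featuresCol='vector'`; the helpers below are illustrative names only. Plotting the resulting `errors` against k, for example with matplotlib, gives the requested curve.

```python
import math
import random

def squared_distance(p, q):
    return sum((a - b) ** 2 for a, b in zip(p, q))

def nearest(point, centers):
    """Index of the center closest to point."""
    return min(range(len(centers)), key=lambda c: squared_distance(point, centers[c]))

def kmeans(points, k, iterations=20, seed=42):
    """Plain Lloyd's algorithm; returns (centers, assignments)."""
    rng = random.Random(seed)
    centers = [list(p) for p in rng.sample(points, k)]  # k distinct input points as initial centers
    for _ in range(iterations):
        assignments = [nearest(p, centers) for p in points]
        for c in range(k):
            members = [p for p, a in zip(points, assignments) if a == c]
            if members:  # leave a center in place if its cluster is empty
                centers[c] = [sum(dim) / len(members) for dim in zip(*members)]
    # final assignment against the final centers
    assignments = [nearest(p, centers) for p in points]
    return centers, assignments

def clustering_error(points, centers, assignments):
    """Square root of the summed squared distances from each point to its assigned center."""
    return math.sqrt(sum(squared_distance(p, centers[a]) for p, a in zip(points, assignments)))

# Hypothetical 2-d points with two obvious groups; with Word2Vec these would be the word vectors.
points = [(0.0, 0.0), (0.0, 1.0), (10.0, 10.0), (10.0, 11.0)]
errors = {}
for k in (1, 2, 3):
    centers, assignments = kmeans(points, k)
    errors[k] = clustering_error(points, centers, assignments)
print(errors)  # the (k, error) pairs to plot
```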

*Copyright &copy; 2023 Pragmatic Institute. This content is licensed solely for personal use. Redistribution or publication of this material is strictly prohibited.*