<img style="float: left" src="images/spark.png">
<img style="float: right" src="images/surfsara.png">
<hr style="clear: both">

## 03 - The 20 newsgroups data

Below are a number of exercises in Python and PySpark. Press Shift-Enter to execute the code in a cell. You can use code completion by pressing Tab.

In this notebook we will introduce the dataset we will use today and apply some Spark transformations. Finally, we will store preprocessed data that can be used in later notebooks.

 1. The data
 2. 'Munging' the data and counting words (again...)
 3. DataFrames to the rescue
 4. Storing preprocessed data for later use

During the exercises you may want to refer to [the PySpark documentation](https://spark.apache.org/docs/latest/api/python/pyspark.html) for more information on the available transformations and actions.

First initialize Spark:

In [1]:
# initialize Spark
from pyspark import SparkContext, SparkConf
if 'sc' not in globals():
    conf = SparkConf().setMaster('local[*]')
    sc = SparkContext(conf=conf)

### The data: 20 newsgroups

For the exercises today we will use the 20 newsgroups data. This dataset contains around 20,000 newsgroup postings from 20 different newsgroups. A description of the original dataset can be found [here](http://kdd.ics.uci.edu/databases/20newsgroups/20newsgroups.data.html).

We have already converted the raw dataset into a more manageable format: each newsgroup posting has been converted to a JSON object, and these objects have been stored together, one per line, in one large gzipped file (only some common headers were preserved in this process). This file is available in your notebook environment and can easily be loaded using the SparkContext sc. Load the data as a [text](https://spark.apache.org/docs/latest/api/python/pyspark.html?highlight=textFile#pyspark.SparkContext) file and print the first element of the RDD (use [first](https://spark.apache.org/docs/latest/api/python/pyspark.html?highlight=map#pyspark.RDD.first)). You do not need to decompress the file first: Spark can read straight from the gzipped file.
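
sc.textFile() decompresses a .gz file transparently and yields one string per line, which is why each element of jsonPostsRDD is a complete JSON document. As a rough local analogue (plain Python, no Spark), the sketch below writes two made-up posts to a gzipped JSON-lines file and reads them back line by line; the file name and its contents are hypothetical.

```python
import gzip
import json
import os
import tempfile

# Two made-up, flat posts (hypothetical data, not taken from the dataset)
posts = [
    {"label": "sci.space", "subject": "Shuttle launch", "content": "Launch at noon."},
    {"label": "alt.atheism", "subject": "FAQ", "content": "See the archive."},
]

path = os.path.join(tempfile.mkdtemp(), "sample.json.gz")

# Write one JSON document per line, gzip-compressed (binary mode, UTF-8 bytes)
with gzip.open(path, "wb") as f:
    for post in posts:
        f.write((json.dumps(post) + "\n").encode("utf-8"))

# Read it back line by line, similar to how textFile() yields one string per line
with gzip.open(path, "rb") as f:
    lines = [line.decode("utf-8").rstrip("\n") for line in f]

print(lines[0])
```

One practical consequence: gzip is not a splittable format, so Spark reads a single .gz file into a single partition. If later stages are slow, calling repartition(n) on the RDD spreads the work over more tasks.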

In [2]:
# Load the gzipped file as an RDD of strings, one JSON document per line
jsonPostsRDD = sc.textFile("file:///home/jovyan/work/data/20newsgroups.labelled.json.gz")

# Print the first element of the RDD
print jsonPostsRDD.first()

{"date":"Mon, 29 Mar 1993 11:57:19 GMT","summary":"Books, addresses, music -- anything related to atheism","in-reply-to":"","followup-to":"alt.atheism","xref":"cantaloupe.srv.cs.cmu.edu alt.atheism:49960 alt.atheism.moderated:713 news.answers:7054 alt.answers:126","newsgroups":"alt.atheism,alt.atheism.moderated,news.answers,alt.answers","references":"","keywords":"FAQ, atheism, books, music, fiction, addresses, contacts","nntp-posting-host":"","subject":"Alt.Atheism FAQ: Atheist Resources","article-id":"","originator":"","label":"alt.atheism","distribution":"world","x-newsreader":"","content":"\nArchive-name: atheism/resources\nAlt-atheism-archive-name: resources\nLast-modified: 11 December 1992\nVersion: 1.0\n\n                              Atheist Resources\n\n                      Addresses of Atheist Organizations\n\n                                     USA\n\nFREEDOM FROM RELIGION FOUNDATION\n\nDarwin fish bumper stickers and assorted other atheist paraphernalia are\navailable fro

If all went well, you should have seen that the first element is a JSON document. In the next few exercises we will use RDDs and transformations to first convert the JSON data to a more manageable format. Then we will select one newsgroup, count its words and find the top ten senders.

### Data conversion

Next, we are going to parse each element, a JSON string, into a Python dictionary where each key is a property of the JSON document. The newsgroup posts are 'flat' JSON objects, so there are no embedded JSON objects.
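
To make the conversion concrete, here is json.loads applied to a single hypothetical line shaped like the posts in this dataset (the field values are made up). The result is an ordinary Python dict, and because the object is flat, none of its values are nested dicts or lists.

```python
import json

# A hypothetical line, shaped like the flat posts in the dataset
line = '{"label": "sci.space", "subject": "Re: Moon base", "content": "How much would it cost?"}'

post = json.loads(line)  # a plain dict: one key per JSON property

# Flat object: no value is itself a JSON object (dict) or array (list)
is_flat = not any(isinstance(v, (dict, list)) for v in post.values())

print(post["label"])
print(sorted(post.keys()))
```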

In the next cell the conversion takes place and the first post is shown. 

Just execute the cell; there's nothing to fill in.

In [None]:
import json
import re

parsedJsonPostsRDD = jsonPostsRDD.map(lambda x: json.loads(x))
parsedPosts = parsedJsonPostsRDD.take(1)
print json.dumps(parsedPosts, indent=2)

### Access to fields in the posts

We have made some selections for you to show how to access fields in the posts.

This is pure Python: each element of the RDD is a dictionary, so the lambda simply looks up four fields by key and returns them as a list.

In [None]:
postSelectionRDD = parsedJsonPostsRDD.map(lambda d: [d["label"], d["sender"], d["date"], d["subject"]])
print postSelectionRDD.take(5)

When using Spark it is important to keep track of which code is executed on the workers and which code on the driver. Moving data between the driver and the workers is expensive, and with a large enough dataset it might not even be possible to move the contents of an RDD to the driver.

RDDs are distributed over the workers, and transformations define a sequence of RDDs. Never try to use an RDD inside a transformation of another RDD (Spark does not support nested RDDs), and be aware of which code is executed by the driver.

Let's make a quick list of all properties in a post. We'll do it the wrong way first, by doing a map on the RDD.

In [None]:
print parsedJsonPostsRDD.map(lambda x: x.keys()).take(1) 

The previous code is inefficient: the map defines a transformation over every element in the RDD, so conceptually we build an RDD holding the keys of every post just to look at one of them. It is better to take a single post and then compute its keys outside the RDD. Note that the keys are then computed by the driver.

Try to do this in a single statement.
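
In plain Python (no Spark) the difference between the two approaches can simply be counted. The sketch below uses 1000 made-up posts and an eager list comprehension for the 'map first' approach. Spark itself is lazy and take(1) can stop after the first partition, so read the counts as the amount of work each approach asks for, not as a Spark benchmark.

```python
calls = {"n": 0}

def keys_of(post):
    # Stands in for the per-post work; counts how often it runs
    calls["n"] += 1
    return sorted(post.keys())

# 1000 made-up posts standing in for the RDD's elements
posts = [{"label": "sci.space", "subject": "post %d" % i, "content": "..."} for i in range(1000)]

# Approach 1: map over every post, then look at the first result
all_keys = [keys_of(p) for p in posts]
first_via_map = all_keys[0]
calls_map = calls["n"]

# Approach 2: take a single post first, then compute its keys
calls["n"] = 0
first_post = posts[0]
first_via_take = keys_of(first_post)
calls_take = calls["n"]

print("map: %d calls, take: %d call" % (calls_map, calls_take))
```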

In [None]:
# TODO: Replace <FILL IN> with appropriate code
print parsedJsonPostsRDD<FILL IN>

### Selecting properties, filtering on those properties, counting words and senders/posters

In the next few exercises we will select a few properties, filter on a specific newsgroup, and count the words and the top ten posters (senders).

Start by creating an RDD with the following headers or properties: {label, from, subject, content}. Print the first element of the resulting RDD to verify your code.

In [None]:
# TODO: Replace <FILL IN> with appropriate code
postsRDD = parsedJsonPostsRDD.map(<FILL IN>)
print postsRDD.first()

### Filtering an RDD

Besides map and reduce, Spark has many convenient transformations on RDDs available out of the box. In the next exercise, use [filter](https://spark.apache.org/docs/latest/api/python/pyspark.html?highlight=map#pyspark.RDD.filter) to create an RDD that only consists of postings to the 'sci.space' newsgroup (do consider the datatype in the RDD you created in the previous exercise: a list per post). Print the number of postings in the newsgroup using [count](https://spark.apache.org/docs/latest/api/python/pyspark.html?highlight=map#pyspark.RDD.count) and print the first post in the RDD.
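
The two steps above (select a few properties, then filter on the label) can be mimicked on a plain Python list of made-up posts, where list comprehensions play the roles of map and filter. Note that after the selection each post is a list, so the label is found by position (index 0), not by key.

```python
# Made-up posts as dicts, as json.loads would produce them
posts = [
    {"label": "sci.space", "from": "a@example.com", "subject": "Orbits", "content": "Kepler ...", "date": "1993"},
    {"label": "rec.autos", "from": "b@example.com", "subject": "Brakes", "content": "ABS ...", "date": "1993"},
    {"label": "sci.space", "from": "c@example.com", "subject": "Mars", "content": "Rovers ...", "date": "1993"},
]

fields = ["label", "from", "subject", "content"]

# Like map(): keep only the chosen properties, as a list per post
selected = [[p[f] for f in fields] for p in posts]

# Like filter(): the label sits at index 0 of each list
sci_space = [p for p in selected if p[0] == "sci.space"]

print(len(sci_space))
print(sci_space[0])
```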

In [None]:
# TODO: Replace <FILL IN> with appropriate code
scispacePostsRDD = postsRDD.filter(<FILL IN>)
print scispacePostsRDD.count()
print scispacePostsRDD.first()

### Counting words

To do a wordcount on the newsgroup we first need to do a bit of cleaning up.

Use a series of map operations to transform the content part of the RDD from the previous exercise. Instead of the content as one long string, we want to end up with the content as a list of tokens (words). Before creating that list, encode the data as UTF-8 and replace all non-word characters with a single space.
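
The cleaning rule 'replace all non-word characters with a single space' maps directly onto a regular expression. This sketch applies it to one made-up post body with Python's re module; \W matches any character that is not a letter, digit or underscore. It leaves out the UTF-8 encode step the exercise asks for.

```python
import re

content = "Re: the Shuttle's launch -- delayed again!\n\nSee NASA's site."

# Replace every run of non-word characters with a single space
cleaned = re.sub(r"\W+", " ", content)

# split() without an argument splits on whitespace and drops empty strings
tokens = cleaned.split()

print(tokens)
```

Calling split() without an argument also discards the empty string that the trailing space would otherwise produce with split(" ").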

In [None]:
# TODO: Replace <FILL IN> with appropriate code
tokenizedPostsRDD = (scispacePostsRDD<FILL IN>)
print tokenizedPostsRDD.first()

Now we will count the words in the newsgroup by again applying several transformations on the tokenizedPostsRDD:

1. Use a flatMap to create an RDD that consists of only the words from the posts
2. Use a filter to remove all words shorter than 2 characters
3. Lowercase each word and create (word, 1) tuples
4. Use reduceByKey to sum the counts for each word

We will print the top ten words by making use of the [takeOrdered](https://spark.apache.org/docs/latest/api/python/pyspark.html?highlight=takeordered#pyspark.RDD.takeOrdered) action.
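
The four steps plus takeOrdered can be traced on a tiny in-memory example. In this pure-Python sketch (made-up data, no Spark), list comprehensions stand in for flatMap, filter and map, a defaultdict accumulates the counts the way reduceByKey does, and heapq.nsmallest with a negated count mimics takeOrdered(10, lambda x: -x[1]).

```python
import heapq
from collections import defaultdict

# Made-up tokenized posts: (label, from, subject, tokens)
tokenized_posts = [
    ("sci.space", "a@example.com", "Orbits", ["The", "orbit", "of", "the", "Moon"]),
    ("sci.space", "b@example.com", "Mars", ["Mars", "orbit", "a", "the", "ORBIT"]),
]

# 1. flatMap: one flat sequence of words
words = [w for post in tokenized_posts for w in post[3]]

# 2. filter: drop words shorter than 2 characters
words = [w for w in words if len(w) >= 2]

# 3. map: lowercase and pair each word with a count of 1
pairs = [(w.lower(), 1) for w in words]

# 4. reduceByKey: sum the counts per word
counts = defaultdict(int)
for word, one in pairs:
    counts[word] += one

# takeOrdered(10, key=lambda x: -x[1]): the (at most) ten highest counts
top = heapq.nsmallest(10, counts.items(), key=lambda x: -x[1])
print(top)
```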

In [None]:
# TODO: Replace <FILL IN> with appropriate code
wcRDD = (tokenizedPostsRDD<FILL IN>)
print wcRDD.takeOrdered(10, lambda x : -x[1])

### Counting senders/posters

Using operations similar to those in the word count exercise, produce a top-ten list of posters based on the 'from' property.

In [None]:
# TODO: Replace <FILL IN> with appropriate code
fromCountRDD = (scispacePostsRDD<FILL IN>)
print fromCountRDD.takeOrdered(10, lambda x : -x[1])

### DataFrames to the rescue

As you can see from the previous exercises, transforming and converting data can be done quite easily using regular RDDs and transformations. There are, however, even more intuitive ways to deal with your data, provided it is in a structured format (i.e. it has a schema).

Since Spark 1.0.0, SparkSQL has been part of the Spark distribution. The SparkSQL module offers functions for loading and reading structured data, lets you run SQL queries on the loaded data, and (since Spark 1.3) organizes that data in [DataFrames](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame).

In the following steps we will load the JSON newsgroup data the SparkSQL way and explore the capabilities of this API.

First, create an [SQLContext](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.SQLContext) from the SparkContext.

In [None]:
from pyspark.sql import SQLContext
sqlCtx = SQLContext(sc)

The [SQLContext](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.SQLContext) provides access to a [DataFrameReader](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrameReader), which can directly read from several structured formats or connections such as JSON, [Parquet](https://parquet.apache.org), JDBC and Hive.

Load the JSON into a DataFrame using the SQLContext and print its schema.

In [None]:
dataFrame = sqlCtx.read.json("file:///home/jovyan/work/data/20newsgroups.labelled.json.gz")
dataFrame.printSchema()

The DataFrame API provides a domain-specific language, alongside SQL, for manipulating structured data. Among other things, it lets you filter, select and group data.

Grouping by the sender column and counting the posts per sender is straightforward:

In [None]:
dfGroups = dataFrame.groupBy("sender").count()
dfGroups.show()

Filtering and selecting columns is equally simple:

In [None]:
dfFilter = dataFrame.filter(dataFrame.label == "sci.space")
dfSelect = dfFilter.select("label", "from", "subject")
dfSelect.show()

If you are more comfortable with SQL, the SparkSQL module lets you use that as well.

To query a DataFrame with SQL, you first need to register it under a table name that SQL statements can refer to:

In [None]:
# In Spark 2.0+ registerTempTable is deprecated in favour of createOrReplaceTempView('newsgroups')
dataFrame.registerTempTable('newsgroups')

We can then use SQL statements to query the data:

In [None]:
docs = sqlCtx.sql("SELECT label, content FROM newsgroups")
labelsDistinct = sqlCtx.sql("SELECT DISTINCT(label) FROM newsgroups")
print sqlCtx.sql("SELECT COUNT(DISTINCT(label)) as labelCount FROM newsgroups").collect()
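
In Spark, the first two sql() calls above only define DataFrames; nothing is computed until an action such as collect() runs, so of these three lines only the one calling collect() actually executes a query. To see what the queries themselves compute, the sketch below runs the same SELECT DISTINCT and COUNT(DISTINCT ...) statements against Python's built-in sqlite3 on three made-up rows. SQLite is a swapped-in engine, not Spark SQL, but these two constructs mean the same thing in both.

```python
import sqlite3

# sqlite3 stands in for Spark SQL; the table name mirrors 'newsgroups'
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE newsgroups (label TEXT, content TEXT)")
conn.executemany(
    "INSERT INTO newsgroups VALUES (?, ?)",
    [("sci.space", "orbit"), ("sci.space", "moon"), ("rec.autos", "brakes")],
)

# DISTINCT gives no ordering guarantee, so sort the result explicitly
distinct_labels = sorted(r[0] for r in conn.execute("SELECT DISTINCT(label) FROM newsgroups"))
label_count = conn.execute("SELECT COUNT(DISTINCT(label)) AS labelCount FROM newsgroups").fetchone()[0]
conn.close()

print(distinct_labels)
print(label_count)
```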

Let's make a list of the different newsgroups in the dataset, and broadcast this list to all executors so that it can be used in expressions.

In [None]:
# Collect the distinct labels to the driver as a plain Python list
labelList = labelsDistinct.rdd.map(lambda r: r.label.encode('utf-8')).collect()
# Keep a handle on the broadcast variable; executors read it via labelListBC.value
labelListBC = sc.broadcast(labelList)
from IPython.display import display, HTML
th = "<th>Label</th>"
td = ["<tr><td>" + d + "</td></tr>" for d in labelList]
display(HTML("<table><thead><tr>" + th + "</tr></thead><tbody>" + "".join(td) + "</tbody></table>"))

There are indeed 20 newsgroups. For machine learning it is convenient to have a numerical ID associated with each group. Let's create this ID and show a table of the mapping and the number of posts per newsgroup.
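
The next cell derives each ID from the label's position in labelList. Since SELECT DISTINCT does not guarantee any particular order, those positions are not guaranteed to be the same between runs. One alternative, sketched here in plain Python on made-up labels, is to sort the distinct labels before numbering them and to keep the mapping in a dict, which also avoids a linear list.index lookup per post. Counter plays the role of the map-to-1 plus reduceByKey step.

```python
from collections import Counter

# Made-up labels, one per post
post_labels = ["sci.space", "rec.autos", "sci.space", "alt.atheism", "sci.space"]

# Sorting the distinct labels makes the IDs reproducible across runs
label_list = sorted(set(post_labels))
label_to_id = {label: i for i, label in enumerate(label_list)}

# Messages per label, like map(lambda l: (l, 1)) followed by reduceByKey(add)
label_counts = Counter(post_labels)

for label in label_list:
    print("%d %s %d" % (label_to_id[label], label, label_counts[label]))
```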

In [None]:
labels = sqlCtx.sql("SELECT label FROM newsgroups")
# Count messages per label: emit (label, 1) pairs and sum them per key
labelCounts = labels.rdd.map(lambda r: (str(r.label), 1)).reduceByKey(lambda v1, v2: v1 + v2).collect()

from IPython.display import display, HTML
th = "<th>ID</th><th>Label</th><th>Messages</th>"
# The ID is the position of the label in labelList
td = ["<tr><td>" + str(labelList.index(l)) + "</td><td>" + l + "</td><td>" + str(m) + "</td></tr>" for (l, m) in labelCounts]
display(HTML("<table><thead><tr>" + th + "</tr></thead><tbody>" + "".join(td) + "</tbody></table>"))

### Storing data for later use

In the final steps of this notebook we will make a subset of the data that is ready to use in subsequent notebooks:

1. Select only the following groups: {2,3,5,6,8,11,12,13,18,19}
2. Select only the following columns: {id, label, content}
3. Clean and tokenize the content column
4. Save the resulting DataFrame as a Parquet file

First select the relevant groups and columns.

In [None]:
selectedGroups = sqlCtx.sql("SELECT label, content FROM newsgroups WHERE label in ('sci.med', 'rec.autos', 'comp.windows.x', 'rec.sport.baseball', 'misc.forsale', 'talk.politics.misc', 'comp.graphics', 'alt.atheism', 'sci.space', 'talk.religion.misc')")
selectedGroups.show(5)
print selectedGroups.count()

A DataFrame can be accessed as an RDD of [Rows](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.Row) through its rdd attribute, although a DataFrame is not simply a wrapper around an RDD. Functions are applied to DataFrames differently than plain lambdas are applied to RDDs. This becomes evident with functions such as withColumn, which expects a Column expression (an existing column, a built-in function from pyspark.sql.functions, or a UDF) rather than an arbitrary Python lambda.

Here we use this pattern to create a new RDD with an added column, the id: the numerical id is taken from the label's position in labelList. After adding the id, we chain a few extra maps to clean and tokenize the content field.

In [None]:
import re

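# withColumn adds a placeholder "id" column (a copy of label); the numerical id is computed in the first map
selectedGroupsWithID = (selectedGroups.withColumn("id", selectedGroups.label)
                        .rdd
                        # (label, whitespace-normalized content, numerical id from labelList)
                        .map(lambda r: (r.label.encode("utf-8"), re.sub(r'\s+', ' ', r.content).strip(), labelList.index(r.label)))
                        # encode the content as UTF-8 and lowercase it
                        .map(lambda (l, c, i): (l, c.encode('utf-8').lower(), i))
                        # split into tokens; safe on single spaces after the whitespace collapse
                        .map(lambda (l, c, i): (l, c.split(" "), i))
                        )
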
print selectedGroupsWithID.first()

selectionDF = sqlCtx.createDataFrame(selectedGroupsWithID, ["label", "content", "id"])
print "\n"
selectionDF.printSchema()
print "\n"
selectionDF.show()
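
Per row, the chain of maps in the previous cell does three things: collapse whitespace, lowercase and split the content, and look up the numerical id. The sketch below replays those steps in plain Python on one made-up (label, content) pair and a hypothetical three-label list.

```python
import re

# Hypothetical label list and a made-up (label, content) row
label_list = ["alt.atheism", "rec.autos", "sci.space"]
row = ("sci.space", "  The SHUTTLE\n\tlaunch   is  delayed.  ")

label, content = row

# Collapse every whitespace run to one space and trim the ends
content = re.sub(r"\s+", " ", content).strip()

# Lowercase, then split on single spaces (safe after the collapse above)
tokens = content.lower().split(" ")

# Numerical id from the label's position in the label list
record = (label, tokens, label_list.index(label))
print(record)
```

Unlike the earlier word count exercise, this step only normalizes whitespace, so punctuation stays attached to the tokens, as 'delayed.' shows.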

Finally, we will store the DataFrame in an efficient format. [Parquet](https://parquet.apache.org), a columnar storage format, is a good match for tabular data like ours. Fortunately, the DataFrame API comes with [batteries included](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrameWriter.parquet) and storing our data is a one-liner.

In [None]:
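# mode("overwrite") lets the cell be re-run; the default mode raises an error if the path already exists
selectionDF.write.mode("overwrite").parquet("file:///home/jovyan/work/data/20newsgroups.selected.parquet")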