<img style="float: right" src="images/surfsara.png">
<br/>
<hr style="clear: both" />

# Introduction to RDDs - Apache Spark

This notebook provides an introduction to the Apache Spark RDD API using PySpark. Press Shift-Enter to execute a code cell, and press Tab for code completion.

During the exercises you may want to refer to [the PySpark documentation](https://spark.apache.org/docs/latest/api/python/pyspark.html) for more information on possible transformations and actions. We will provide links to the documentation when we introduce methods on RDDs.

## The SparkContext

The SparkContext contains all the information about the way Spark is set up. When running on a cluster, the SparkContext contains the address of the cluster and will make sure operations on RDDs will be executed there. In the cell below, we create a [`SparkContext`](https://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.SparkContext) using `local mode`. This means that Spark will run locally, not on a cluster. It will offer some form of parallelism by making use of the various cores it has available.

Note that Spark is best used in `cluster mode`, where it runs on many machines simultaneously. `Local mode` is only meant for training or testing purposes. However, Spark works quite well in local mode and can be quite powerful. In order to run locally developed code on a cluster, the only things that need to be changed are the `SparkContext` and the paths to input and output files.

Even when working in `local mode`, it is important to think of an RDD as a data structure that is distributed over many machines in a cluster and is not available locally. The machine that holds the `SparkContext` is called the *driver*. The `SparkContext` communicates with the cluster manager to make sure that operations on RDDs are executed by *workers* on the cluster. It is important to realize that the driver is a separate entity from the worker nodes. You can consider the notebook to be the driver.

In [4]:
# Initialize Spark
from pyspark import SparkContext, SparkConf
import nltk
nltk.download('punkt')
nltk.download('averaged_perceptron_tagger')

if 'sc' not in globals(): # This 'trick' makes sure the SparkContext sc is initialized exactly once
    conf = SparkConf().setMaster('local[*]')  # Spark will use all cores (*) available
    sc = SparkContext(conf=conf)

[nltk_data] Downloading package punkt to
[nltk_data]     /mnt/home/sdemo053/nltk_data...
[nltk_data]   Package punkt is already up-to-date!
[nltk_data] Downloading package averaged_perceptron_tagger to
[nltk_data]     /mnt/home/sdemo053/nltk_data...
[nltk_data]   Package averaged_perceptron_tagger is already up-to-
[nltk_data]       date!


## Creating an RDD

There are three ways to create an RDD: by transforming an existing one, by reading in data, or by creating an RDD based on a local data structure. We show this last option below.

A Python list containing some words is used to create an RDD by calling [`parallelize`](https://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.SparkContext.parallelize), a method of `SparkContext`. This list is very small and will not benefit from the parallelism of Spark. 

We then print the number of records in the RDD, by calling the `count()` method.

In [5]:
words_list = ['Dog', 'Cat', 'Rabbit', 'Hare', 'Deer', 'Gull', 'Woodpecker', 'Mole']
words_rdd = sc.parallelize(words_list)
print(words_rdd.count())

8


## Map transformation 
There are two kinds of operations on RDDs: transformations and actions. Transformations take an RDD as input and produce another RDD as output (RDDs are immutable, so an existing RDD cannot be changed). Computation of transformations is deferred until an *action* is executed. An action does not return an RDD, but instead returns data to the driver (for example in the form of a Python list), or writes data to disk or a database.

This *laziness* in executing transformations allows Spark to optimize computations. The framework starts computing only when the user requests actual output.
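Python's built-in `map` is lazy in a similar spirit, which gives a rough analogy. The sketch below is plain Python, not Spark, and the names `traced_plural` and `calls` are made up for illustration: nothing runs until something consumes the result.

```python
# Plain-Python analogy for lazy evaluation; this is not Spark code.
calls = []  # records every word the function is actually applied to

def traced_plural(word):
    calls.append(word)
    return word + 's'

words = ['Dog', 'Cat', 'Rabbit']

# Like a transformation: this only describes the work, nothing is computed yet.
lazy_plurals = map(traced_plural, words)
calls_before = len(calls)

# Like an action: consuming the iterator forces the computation.
plurals = list(lazy_plurals)
calls_after = len(calls)

print(calls_before, calls_after)
print(plurals)
```

The function is not called at all until `list()` asks for the results, just as Spark does no work until an action is executed.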

One of the most used transformations is [`map`](https://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.map). This is very similar to the `Map` in MapReduce. The Spark version of `Map` is a method called `map` defined on an RDD, and takes as input a single function. This function will be applied to each element in the RDD, and Spark will put the result of the application in the output RDD.

First, we present a simple Python function that takes a single word as argument and returns the word with an 's' added to it. In the next step we will use this function in a map transformation of the `words_rdd`.

Take a look at the function definition below and execute it.

In [6]:
def make_plural(word):
    return word + 's'

# Let's see if it works

print(make_plural('cat'))

cats


Next, we want to use the `make_plural` function as input for the `map` transformation on `words_rdd`.
The action [collect()](https://spark.apache.org/docs/latest/api/python/pyspark.html?highlight=collect#pyspark.RDD.collect) transfers the content of the RDD to the driver. The result of `collect()` will then be available to our local environment in Python. It is not an RDD but a Python list!

Note that a large RDD may be scattered over many machines. In such a case calling `collect()` may not be a good idea, since it can take quite some time to retrieve all parts of the RDD, and the combined result has to fit in the memory of the driver.

## Exercise 1
In the cell below enter the name of the function that map should apply to each element of the RDD in order to end up with an RDD of words in plural form.

In [7]:
plural_rdd = words_rdd.map(make_plural)

print(plural_rdd.collect())

['Dogs', 'Cats', 'Rabbits', 'Hares', 'Deers', 'Gulls', 'Woodpeckers', 'Moles']


## Using lambda functions
We can achieve the same functionality by using lambda functions. In this case we define `make_plural` not using `def` as we did above, but as an anonymous function that we define inside `map` directly. This is the main benefit of using lambda functions: all our processing logic is directly visible in the transformations we're applying.

## Exercise 2
Provide a lambda function in the cell below, that will pluralize all elements in the RDD.

In [9]:
lambda_plural_rdd = words_rdd.map(lambda x: x+'s')

print(lambda_plural_rdd.collect())

['Dogs', 'Cats', 'Rabbits', 'Hares', 'Deers', 'Gulls', 'Woodpeckers', 'Moles']


## Exercise 3
Another transformation is [filter()](https://spark.apache.org/docs/latest/api/python/pyspark.html?highlight=filter#pyspark.RDD.filter). It takes as argument a predicate function (a function that returns true or false) and applies the predicate to all elements of the RDD. Only elements for which the predicate returns true are passed on to the output RDD.

Use the [filter()](https://spark.apache.org/docs/latest/api/python/pyspark.html?highlight=filter#pyspark.RDD.filter) method of RDD to keep only words with a length larger than three. Use a lambda function to write a predicate that does this. Next, [count()](https://spark.apache.org/docs/latest/api/python/pyspark.html?highlight=count#pyspark.RDD.count) the number of words. 

Like `collect`, [count()](https://spark.apache.org/docs/latest/api/python/pyspark.html?highlight=count#pyspark.RDD.count) is an action. Remember that actions trigger Spark's computations. Transformations are evaluated lazily and their computation is deferred until an action is called.

There should be 6 words that pass the filter. 

In [10]:
filtered_rdd = words_rdd.filter(lambda x: len(x) > 3)
filtered_rdd.collect()

['Rabbit', 'Hare', 'Deer', 'Gull', 'Woodpecker', 'Mole']

## Exercise 4

Let's do another `map` transformation on `words_rdd`. For each word in `words_rdd` determine its length, again using a lambda function.

In [11]:
word_lengths = words_rdd.map(lambda x: len(x)).collect()
print(word_lengths)

[3, 3, 6, 4, 4, 4, 10, 4]


## FlatMap transformation
Sometimes, the result of a `map` operation is a list of elements rather than a single element. Consider the following example, where we have a list of sentences, and we split each sentence:

In [12]:
sentences = sc.parallelize([
    'this is a sentence',
    'and this is another one'
])
sentences_rdd = sentences.map(str.split)
sentences_rdd.collect()

[['this', 'is', 'a', 'sentence'], ['and', 'this', 'is', 'another', 'one']]

Each element in the RDD returned by `map` is a list of words. Consequently, the result of `collect` is a list of lists, each inner list containing the words of one sentence. Hence, a `count` of this RDD will return two:

In [13]:
sentences_rdd.count()

2

If we want to count the number of words instead, or work directly with the words, we will need to _flatten_ the list of lists into a single list. To do so, we will substitute `flatMap` for `map`. Like `map`, `flatMap` will apply the supplied function to each element in the RDD. Unlike `map`, though, it will also _flatten_ the result of the operation such that a list of lists becomes a list:

In [14]:
sentences.flatMap(str.split).collect()

['this', 'is', 'a', 'sentence', 'and', 'this', 'is', 'another', 'one']
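The difference between the two can be mimicked in plain Python. This is only a sketch of the semantics, not of how Spark executes the operations, and `sentence_list` is a local list introduced for the example.

```python
from itertools import chain

sentence_list = ['this is a sentence', 'and this is another one']

# What map does: one output element per input element (here, a list of lists).
mapped = [s.split() for s in sentence_list]

# What flatMap does: the per-element results are concatenated into one flat list.
flat_mapped = list(chain.from_iterable(s.split() for s in sentence_list))

print(len(mapped))       # number of sentences
print(len(flat_mapped))  # number of words
```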

## Pair RDDs
Pair RDDs are very important within the Spark RDD API. Each element of a pair RDD is a pair (or tuple) `(x, y)` where `x` is interpreted as the key and `y` as the value. Spark offers quite a number of `...ByKey` and `...Values` methods (for example `reduceByKey` and `mapValues`) that operate on pair RDDs. As we will see, these methods can be used to define functions per key, very similar to Hadoop's MapReduce.

Keys can be of any *hashable* type, which means all primitive types (numbers, strings, etc.) and tuples of hashable items, **but not lists or dictionaries**. Values can be of any type.

Below we define a Python string variable called `sonnet`. It is assigned Shakespeare's first sonnet in the form of a single line of text. A backslash (`\`) at the end of a line tells Python to ignore the newline, so the string continues on the next line.
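Python dictionaries impose the same hashability requirement on their keys, so a dictionary offers a quick, rough check of whether a value could serve as a key. The sketch below is plain Python; the `counts` dictionary and the example keys are made up for illustration.

```python
counts = {}

# Strings and tuples (of hashable items) are hashable, so they can serve as keys.
counts['dog'] = 1
counts[('dog', 'noun')] = 1

# A list is mutable and therefore unhashable: using it as a key raises TypeError.
try:
    counts[['dog', 'noun']] = 1
    list_key_ok = True
except TypeError:
    list_key_ok = False

print(list_key_ok)
```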

Execute the cell, otherwise the variable is not declared and assigned a value.

In [15]:
sonnet = "From fairest creatures we desire increase, \
That thereby beauty\'s rose might never die, \
But as the riper should by time decease, \
His tender heir might bear his memory: \
But thou contracted to thine own bright eyes, \
Feed'st thy light's flame with self-substantial fuel, \
Making a famine where abundance lies, \
Thy self thy foe, to thy sweet self too cruel: \
Thou that art now the world's fresh ornament, \
And only herald to the gaudy spring, \
Within thine own bud buriest thy content, \
And, tender churl, mak'st waste in niggarding: \
Pity the world, or else this glutton be, \
To eat the world\'s due, by the grave and thee."

## Python magic

From this text we first remove punctuation. The next cell is just Python. You may want to skip this if your focus is just on Spark, but don't forget to execute the cell.

`str.maketrans()` is a Python string method that builds a translation table; `translate()` then uses that table to make character substitutions very efficiently. Below we use the pair to remove all punctuation characters. The curly braces indicate a dictionary, and the expression within them is called a comprehension. The result, `table`, maps each punctuation character to `None`. When `translate` applies this table, every character that maps to `None` is removed from the string.

In [16]:
import string

# The following line creates a translation table
table = str.maketrans({key: None for key in string.punctuation})

# Do a sample translation
s = "string. With. Punctuation?"
print(s.translate(table))

string With Punctuation
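To see what such a table looks like, the sketch below builds a tiny one and prints it. It also shows the three-argument form of `str.maketrans`, which is an alternative way to build the same kind of table; the names `small_table`, `table_alt`, `sample` and `cleaned` are introduced only for this example.

```python
import string

# Dictionary form, as used above: single-character keys are stored as code points.
small_table = str.maketrans({'.': None, '?': None})
print(small_table)

# Three-argument form: every character in the third argument is mapped to None.
table_alt = str.maketrans('', '', string.punctuation)

sample = "string. With. Punctuation?"
cleaned = sample.translate(table_alt)
print(cleaned)
```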


## Parallelizing the text

In the next cell a lot happens in one line. The text above is first translated, which in this case means that every punctuation character is removed. The `lower()` method (a Python string method) is then applied to the result, putting the string in lowercase. This result is then `split()` (also a Python string method), which splits the text into individual words. The outcome is a list of lowercase words without punctuation. This list is the input to the `parallelize()` method, which turns it into an RDD.

*Calling consecutive methods using dot notation is called chaining. It is of course possible to execute these steps individually, but chaining can be very convenient, especially in Spark. Consider the individual steps: first parallelize the text, then map the resulting RDD to remove the punctuation, then map that RDD to lowercase the text, and then map the result of that step to split the data... Chaining the methods instead saves a lot of typing.*
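The string part of that chain can be unrolled in plain Python to compare both styles. The sketch uses only the first line of the sonnet, and the intermediate variable names are made up for the example.

```python
import string

line = "From fairest creatures we desire increase,"  # first line of the sonnet
table = str.maketrans({key: None for key in string.punctuation})

# Step by step, with an intermediate variable for each result
no_punctuation = line.translate(table)
lowercased = no_punctuation.lower()
words = lowercased.split()

# The same steps chained into a single expression
chained_words = line.translate(table).lower().split()

print(words)
print(words == chained_words)
```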

To show just the 5 first elements, we use Spark's [`take()`](https://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.take) action. This limits the amount of data that is sent to the driver.

In [17]:
text_rdd = sc.parallelize(sonnet.translate(table).lower().split())

text_rdd.take(5)

['from', 'fairest', 'creatures', 'we', 'desire']

## Exercise 5

What would happen if we didn't split the text but transformed it directly into an RDD? Try this in the next cell (omit `translate` and `lower` as well).

Try to predict what will happen. Remember that a string in Python is very similar to a list. 

(For a list called `mylist` the first element is given by `mylist[0]`. Similarly `mystring[0]` will return the first character of the string `mystring`.)

In [19]:
another_rdd = sc.parallelize(sonnet)
another_rdd.collect()

['F',
 'r',
 'o',
 'm',
 ' ',
 'f',
 'a',
 'i',
 'r',
 'e',
 's',
 't',
 ' ',
 'c',
 'r',
 'e',
 'a',
 't',
 'u',
 'r',
 'e',
 's',
 ' ',
 'w',
 'e',
 ' ',
 'd',
 'e',
 's',
 'i',
 'r',
 'e',
 ' ',
 'i',
 'n',
 'c',
 'r',
 'e',
 'a',
 's',
 'e',
 ',',
 ' ',
 'T',
 'h',
 'a',
 't',
 ' ',
 't',
 'h',
 'e',
 'r',
 'e',
 'b',
 'y',
 ' ',
 'b',
 'e',
 'a',
 'u',
 't',
 'y',
 "'",
 's',
 ' ',
 'r',
 'o',
 's',
 'e',
 ' ',
 'm',
 'i',
 'g',
 'h',
 't',
 ' ',
 'n',
 'e',
 'v',
 'e',
 'r',
 ' ',
 'd',
 'i',
 'e',
 ',',
 ' ',
 'B',
 'u',
 't',
 ' ',
 'a',
 's',
 ' ',
 't',
 'h',
 'e',
 ' ',
 'r',
 'i',
 'p',
 'e',
 'r',
 ' ',
 's',
 'h',
 'o',
 'u',
 'l',
 'd',
 ' ',
 'b',
 'y',
 ' ',
 't',
 'i',
 'm',
 'e',
 ' ',
 'd',
 'e',
 'c',
 'e',
 'a',
 's',
 'e',
 ',',
 ' ',
 'H',
 'i',
 's',
 ' ',
 't',
 'e',
 'n',
 'd',
 'e',
 'r',
 ' ',
 'h',
 'e',
 'i',
 'r',
 ' ',
 'm',
 'i',
 'g',
 'h',
 't',
 ' ',
 'b',
 'e',
 'a',
 'r',
 ' ',
 'h',
 'i',
 's',
 ' ',
 'm',
 'e',
 'm',
 'o',
 'r',
 'y',
 ':',
 ' '

## Exercise 6
We are going to count the words in `text_rdd`. As a first step, transform every word in `text_rdd` into a tuple `(<word>, 1)`. Use a lambda function.

In [24]:
pair_rdd = text_rdd.map(lambda x: (x,1))
pair_rdd.take(5)

[('from', 1), ('fairest', 1), ('creatures', 1), ('we', 1), ('desire', 1)]

## Exercise 7
There is an *action* called [countByKey](https://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.countByKey) that performs the counting and returns it as a Python dictionary.
Use it below to see the counts.

In [37]:
word_counts = pair_rdd.countByKey()

# Below is some Python code that will nicely print the word_counts dictionary

sorted_word_counts = sorted(word_counts.items(), key=lambda x: -x[1])

for word, count in sorted_word_counts[:10]:
    print(word, count)

the 6
thy 5
to 4
and 3
worlds 2
might 2
self 2
thou 2
that 2
but 2


## reduceByKey
`countByKey` is an _action_, returning the word counts as a dictionary instead of an RDD. When using `countByKey` with a large number of counts, the dictionary that is sent back to the driver may not fit in memory.

If we want to count words and keep the result in an RDD, we have to use the [reduceByKey](https://spark.apache.org/docs/latest/api/python/pyspark.html?highlight=reducebykey#pyspark.RDD.reduceByKey) *transformation*.

This transformation works almost exactly like Reduce in Hadoop's MapReduce. It expects the RDD to consist of key-value pairs and it will perform a reduce operation *per key*.

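As input, [reduceByKey](https://spark.apache.org/docs/latest/api/python/pyspark.html?highlight=reducebykey#pyspark.RDD.reduceByKey) takes a *two-argument function* that is applied to the values that share a key, combining them two at a time until a single value per key remains. Because Spark may combine values in any order, the function should be associative and commutative.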
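The per-key reduction can be mimicked in plain Python. The sketch below reproduces only the result, not the way Spark distributes the work (Spark can already combine values within each partition before moving data between machines); the sample `pairs` and the helper names are made up for illustration.

```python
from collections import defaultdict
from functools import reduce

pairs = [('thy', 1), ('self', 1), ('thy', 1), ('foe', 1), ('thy', 1), ('self', 1)]

# Group the values by key
grouped = defaultdict(list)
for key, value in pairs:
    grouped[key].append(value)

# Reduce each key's values with a two-argument function
def add(x, y):
    return x + y

reduced = {key: reduce(add, values) for key, values in grouped.items()}
print(reduced)
```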

## Exercise 8
Create a lambda function that does the counting and forms the input for `reduceByKey`.

In [47]:
# Note that reduceByKey takes in a function that accepts two values and returns a single value
# The function that is input to reduceByKey only works on the values. Spark will execute this function per key

word_counts = pair_rdd.reduceByKey(lambda x,y: x+y)
print(word_counts.collect())

[('self', 2), ('by', 2), ('else', 1), ('we', 1), ('thine', 2), ('beautys', 1), ('this', 1), ('buriest', 1), ('herald', 1), ('churl', 1), ('rose', 1), ('waste', 1), ('should', 1), ('to', 4), ('pity', 1), ('thereby', 1), ('fuel', 1), ('where', 1), ('glutton', 1), ('contracted', 1), ('lies', 1), ('a', 1), ('die', 1), ('bear', 1), ('thy', 5), ('eat', 1), ('cruel', 1), ('within', 1), ('lights', 1), ('gaudy', 1), ('in', 1), ('be', 1), ('worlds', 2), ('world', 1), ('flame', 1), ('riper', 1), ('grave', 1), ('but', 2), ('selfsubstantial', 1), ('that', 2), ('might', 2), ('as', 1), ('thou', 2), ('bright', 1), ('sweet', 1), ('bud', 1), ('feedst', 1), ('too', 1), ('making', 1), ('eyes', 1), ('art', 1), ('and', 3), ('abundance', 1), ('or', 1), ('now', 1), ('desire', 1), ('creatures', 1), ('famine', 1), ('heir', 1), ('only', 1), ('ornament', 1), ('from', 1), ('makst', 1), ('never', 1), ('fairest', 1), ('tender', 2), ('fresh', 1), ('niggarding', 1), ('content', 1), ('thee', 1), ('spring', 1), ('own', 

Instead of using `collect` we can use [takeOrdered](https://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.takeOrdered) to see the most frequent words first.

Below we show 10 elements from the RDD. The elements are pairs, and we sort them by the second element (denoted by `x[1]` in the lambda function). `takeOrdered` returns elements in ascending order of the key function, so negating the count yields descending order.

In [48]:
word_counts.takeOrdered(10, lambda x: -x[1])

[('the', 6),
 ('thy', 5),
 ('to', 4),
 ('and', 3),
 ('self', 2),
 ('by', 2),
 ('thine', 2),
 ('worlds', 2),
 ('but', 2),
 ('that', 2)]
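The effect of negating the sort key can be checked with Python's built-in `sorted` on an ordinary list. This is a plain-Python sketch with made-up sample pairs, not a description of how `takeOrdered` is implemented.

```python
pairs = [('the', 6), ('self', 2), ('thy', 5), ('to', 4), ('and', 3)]

# Ascending order of the key: the smallest counts come first
ascending = sorted(pairs, key=lambda x: x[1])[:3]

# Negating the count reverses the ranking: the largest counts come first
descending = sorted(pairs, key=lambda x: -x[1])[:3]

print(ascending)
print(descending)
```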

## Analysing tweets
So far we have created our RDDs with our own strings and lists. Typically, though, you will read data from a file or a database.

In the remainder of the notebook, we will analyse Dutch tweets that we load from file using [sc.textFile](https://spark.apache.org/docs/latest/api/python/pyspark.html?highlight=textfile#pyspark.SparkContext.textFile). Each tweet is on a single line in the file, formatted as a JSON dictionary. `sc.textFile` will read each line as text, and we will need to parse this text as JSON later. We'll do this in Exercise 10. The result of `sc.textFile` will be an RDD of strings, each string containing a single line (a tweet) of the file.

The file we load the tweets from is a local file in our case. Often, when using Spark, files reside on a distributed file system like HDFS. When creating the RDD, Spark may distribute the data over many machines.

First, let's look at the first line of the data file we are going to use. We use a simple Unix command here (no Python) to view the first line of a file that resides on local disk. Notice that this is a single tweet in JSON.

In [49]:
# Unix bash command called head.
# The ! announces a Unix command is coming to Jupyter

!head -1 ../data/tweets.json

{"created_at":"Wed Apr 29 13:26:48 +0000 2015","id":593406077439516672,"id_str":"593406077439516672","text":"@OdekedeJong Omdat ik het zelf ook ervaar en mijn omgeving ook wel. Er hangt soms een bepaalde sfeer waardoor je niet alles kan\/durft.","source":"\u003ca href=\"http:\/\/twitter.com\" rel=\"nofollow\"\u003eTwitter Web Client\u003c\/a\u003e","truncated":false,"in_reply_to_status_id":593403994481020928,"in_reply_to_status_id_str":"593403994481020928","in_reply_to_user_id":48305190,"in_reply_to_user_id_str":"48305190","in_reply_to_screen_name":"OdekedeJong","user":{"id":119697699,"id_str":"119697699","name":"Claudia","screen_name":"Claudia_NL","location":"","url":null,"description":"* Receptor Agent * Bachelor of  Journalism * Nieuwsjunk * Film- en seriefan * Post- en zonliefhebber *","protected":false,"verified":false,"followers_count":138,"friends_count":222,"listed_count":2,"favourites_count":50,"statuses_count":3993,"created_at":"Thu Mar 04 11:16:36 +0000 2010","utc_offset":10

## Exercise 9
Below the call to `sc.textFile` is made. There are also empty lines in the text file (i.e. their length is equal to 0). Provide a lambda function to the subsequent `filter` call to remove these empty lines.

Lastly, print out the first tweet in the RDD by making use of the [take](https://spark.apache.org/docs/latest/api/python/pyspark.html?highlight=take#pyspark.RDD.take) action.

In [53]:
# A trailing backslash lets a Python statement continue on the next line.
# Here we use it to put each chained method call on its own line.
tweets_rdd = sc \
    .textFile('../data/tweets.json') \
    .filter(lambda x: len(x)>0)
     
print(tweets_rdd.take(1))

['{"created_at":"Wed Apr 29 13:26:48 +0000 2015","id":593406077439516672,"id_str":"593406077439516672","text":"@OdekedeJong Omdat ik het zelf ook ervaar en mijn omgeving ook wel. Er hangt soms een bepaalde sfeer waardoor je niet alles kan\\/durft.","source":"\\u003ca href=\\"http:\\/\\/twitter.com\\" rel=\\"nofollow\\"\\u003eTwitter Web Client\\u003c\\/a\\u003e","truncated":false,"in_reply_to_status_id":593403994481020928,"in_reply_to_status_id_str":"593403994481020928","in_reply_to_user_id":48305190,"in_reply_to_user_id_str":"48305190","in_reply_to_screen_name":"OdekedeJong","user":{"id":119697699,"id_str":"119697699","name":"Claudia","screen_name":"Claudia_NL","location":"","url":null,"description":"* Receptor Agent * Bachelor of  Journalism * Nieuwsjunk * Film- en seriefan * Post- en zonliefhebber *","protected":false,"verified":false,"followers_count":138,"friends_count":222,"listed_count":2,"favourites_count":50,"statuses_count":3993,"created_at":"Thu Mar 04 11:16:36 +0000 2010","

## Conversion to JSON
Next, we are going to convert the tweets into dictionaries. For this purpose we import the Python `json` library. In Python a string `s` is converted to a dictionary by calling `json.loads(s)`.

After conversion, each tweet will be a dictionary where each key-value pair is an attribute of the tweet. Some attributes have sub-attributes, such as the ones contained under the `user` key.
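A minimal sketch of this conversion on a single line is shown below. The JSON string is a hypothetical, heavily shortened tweet written for this example; real tweets carry many more fields.

```python
import json

# A hypothetical, heavily shortened tweet; not taken from the data file
line = '{"text": "Hallo wereld", "lang": "nl", "user": {"screen_name": "example_user", "followers_count": 138}}'

tweet = json.loads(line)

print(type(tweet).__name__)          # the parsed tweet is a Python dictionary
print(tweet['lang'])                 # a top-level attribute
print(tweet['user']['screen_name'])  # a sub-attribute under the 'user' key
```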

## Exercise 10
Transform each tweet in the `tweets_rdd` to a dictionary, and print the first tweet.

In [54]:
import json

json_tweets_rdd = tweets_rdd.map(lambda x: json.loads(x))

# Print out the first element (tweet) of the resulting RDD. The last line will format the tweet for you.

parsed_tweet = json_tweets_rdd.take(1)[0]
print(json.dumps(parsed_tweet, indent=2, sort_keys=True))

{
  "contributors": null,
  "coordinates": null,
  "created_at": "Wed Apr 29 13:26:48 +0000 2015",
  "entities": {
    "hashtags": [],
    "symbols": [],
    "trends": [],
    "urls": [],
    "user_mentions": [
      {
        "id": 48305190,
        "id_str": "48305190",
        "indices": [
          0,
          12
        ],
        "name": "Odeke de Jong",
        "screen_name": "OdekedeJong"
      }
    ]
  },
  "favorite_count": 0,
  "favorited": false,
  "filter_level": "low",
  "geo": null,
  "id": 593406077439516672,
  "id_str": "593406077439516672",
  "in_reply_to_screen_name": "OdekedeJong",
  "in_reply_to_status_id": 593403994481020928,
  "in_reply_to_status_id_str": "593403994481020928",
  "in_reply_to_user_id": 48305190,
  "in_reply_to_user_id_str": "48305190",
  "lang": "nl",
  "place": null,
  "possibly_sensitive": false,
  "retweet_count": 0,
  "retweeted": false,
  "source": "<a href=\"http://twitter.com\" rel=\"nofollow\">Twitter Web Client</a>",
  "text": "@Odekede

## Accessing fields in tweets
In the cell below some fields from the tweets are selected. Notice that the input `x` for the lambda function is a dictionary containing the tweet. The result of the lambda function (defined after `:`) is a list with values of the selected fields from the tweet.

You should be able to figure out how to select information from a tweet, after looking at this example.

In [55]:
json_tweets_rdd_text = json_tweets_rdd.map(
    lambda x: [
        x['lang'],
        x['entities']['hashtags'],
        x['user']['name'],
        x['user']['screen_name'],
        x['user']['followers_count'],
        x['user']['description']
    ]
)
json_tweets_rdd_text.take(1)

[['nl',
  [],
  'Claudia',
  'Claudia_NL',
  138,
  '* Receptor Agent * Bachelor of  Journalism * Nieuwsjunk * Film- en seriefan * Post- en zonliefhebber *']]

## Selecting Text
We will work with the text of the tweets in the next few cells.

## Exercise 11
From the `json_tweets_rdd` select **only** the tweet text. (Do not put the text in a list, like we did above with the fields we selected there).

In [58]:
# Select only the 'text' field of each tweet

tweet_text_rdd = json_tweets_rdd.map(lambda x: x['text'])
tweet_text_rdd.take(1)

['@OdekedeJong Omdat ik het zelf ook ervaar en mijn omgeving ook wel. Er hangt soms een bepaalde sfeer waardoor je niet alles kan/durft.']

## TweetTokenizer
The advantage of the RDD API is that it works well with unstructured data like text. We can use any function to transform RDDs. For example, splitting text into words or tokens (tokenization) is often more difficult than just calling `split()` on a string. A complicating factor in tweets is that they contain special characters (`#`, `@`) that have a specific meaning. Hence, tokenizing a tweet will be different from, for example, tokenizing a newspaper article.

We can make use of the Python [NLTK](http://www.nltk.org/api/nltk.tokenize.html) library for tokenization of tweets. The NLTK (Natural Language Toolkit) contains a `TweetTokenizer` that is specially built to tokenize tweets.

In the cell below we show a short example in Python code (no RDDs yet!). A `TweetTokenizer` is created and used to tokenize an example tweet text in the variable `s`. The result is a list of tokens. Try it.

In [59]:
from nltk.tokenize import TweetTokenizer

tweet_tokenizer = TweetTokenizer()
s = "This is a cooool #dummysmiley: :-) :-P <3 and some arrows < > -> <--"
tweet_tokenizer.tokenize(s)

['This',
 'is',
 'a',
 'cooool',
 '#dummysmiley',
 ':',
 ':-)',
 ':-P',
 '<3',
 'and',
 'some',
 'arrows',
 '<',
 '>',
 '->',
 '<--']

## Exercise 12

Use `tweet_tokenizer` to tokenize all tweets in the RDD. Print the first 10 tokens, using `take`.

**Note**: we want `take` to return the first 10 tokens, not the first 10 *lists* of tokens. When you get lists you have to flatten them...

In [61]:
tokens_rdd = tweet_text_rdd.flatMap(lambda x: tweet_tokenizer.tokenize(x))
tokens_rdd.take(10)

['@OdekedeJong',
 'Omdat',
 'ik',
 'het',
 'zelf',
 'ook',
 'ervaar',
 'en',
 'mijn',
 'omgeving']

## Exercise 13
Count the tokens in the tweets, but only those with a length larger than 2. Show the top 15 in descending frequency.

In [64]:
# Note: each token is paired with its length here, so the ranking below is by token length
pair_tokens_rdd = tokens_rdd.filter(lambda x: len(x) > 2).map(lambda x: (x, len(x)))

pair_tokens_rdd.takeOrdered(10, lambda x: -x[1])

[('#inmiddelsal3opdr8geverskwiijt', 30),
 ('#genietervanzolanghetnogkan', 27),
 ('#WaterschapsbedrijfLimburg', 26),
 ('vrijdagavondprogrammering', 25),
 ('https://t.co/FCQFcesgT8', 23),
 ('https://t.co/kcFA5yu6U2', 23),
 ('https://t.co/GkFqarTRfC', 23),
 ('https://t.co/QHs4GqNqjc', 23),
 ('https://t.co/2cyN0kD6lA', 23),
 ('https://t.co/7076tJaJGY', 23)]
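For the frequency ranking that the exercise asks for, each token would be paired with `1` rather than with its length, the pairs reduced per key with `reduceByKey`, and the top 15 taken with `takeOrdered`. The counting logic itself can be sketched in plain Python; the token list below is hypothetical and merely stands in for the contents of `tokens_rdd`.

```python
from collections import Counter

# Hypothetical tokens standing in for the contents of tokens_rdd
tokens = ['ik', 'het', 'ook', 'niet', 'ook', '#spark', 'niet', 'ook', 'en']

# Keep tokens longer than 2 characters, then count the occurrences of each
token_counts = Counter(t for t in tokens if len(t) > 2)

# Most frequent tokens first
top_tokens = token_counts.most_common(2)
print(top_tokens)
```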