# Apache Spark | Word Count
We will cover some Spark functions to build a word counter and test it on *La Divina Commedia* by Dante Alighieri.

In [235]:
# Import Spark
# NOTE: This may differ depending on your system!
import os
execfile(os.path.join(os.environ["SPARK_HOME"], 'python/pyspark/shell.py'))

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.1.1
      /_/

Using Python version 2.7.13 (default, Dec 18 2016 07:03:39)
SparkSession available as 'spark'.


## Part 1: Creating a base RDD and pair RDDs

### Create a base RDD
We'll start by generating a base RDD by using a Python list and the `sc.parallelize` method.  Then we'll print out the type of the base RDD.

In [236]:
wordsList = ['cat', 'elephant', 'rat', 'rat', 'cat']
wordsRDD = sc.parallelize(wordsList, 4)
# 4 is the number of partitions we ask Spark to split the dataset into

print type(wordsRDD)

<class 'pyspark.rdd.RDD'>


### Pluralize 

Let's use a `map()` transformation to add the letter 's' to each string in the base RDD we just created.

In [237]:
# One way of completing the function
def makePlural(word):
    return word + 's'

print makePlural('cat')

cats


### Apply `makePlural` to the base RDD

Now pass each item in the base RDD into a [map()](http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.map) transformation that applies the `makePlural()` function to each element. And then call the [collect()](http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.collect) action to see the transformed RDD.

In [238]:
# TODO: Replace <FILL IN> with appropriate code
pluralRDD = wordsRDD.map(makePlural)
print pluralRDD.collect()

['cats', 'elephants', 'rats', 'rats', 'cats']


### Pass a `lambda` function to `map`

Let's create the same RDD using a `lambda` function.

In [239]:
# TODO: Replace <FILL IN> with appropriate code
pluralLambdaRDD = wordsRDD.map(lambda x: x+'s')
print pluralLambdaRDD.collect()

['cats', 'elephants', 'rats', 'rats', 'cats']


### Length of each word

Now use `map()` and a `lambda` function to return the number of characters in each word.  We'll `collect` this result directly into a variable.

In [240]:
# TODO: Replace <FILL IN> with appropriate code
pluralLengths = (pluralRDD
                 .map(lambda x: len(x))
                 .collect())
print pluralLengths

[4, 9, 4, 4, 4]


### Pair RDDs

The next step in writing our word counting program is to create a new type of RDD, called a pair RDD. A pair RDD is an RDD where each element is a pair tuple `(k, v)` where `k` is the key and `v` is the value. In this example, we will create a pair consisting of `('<word>', 1)` for each word element in the RDD.
We can create the pair RDD using the `map()` transformation with a `lambda` function.

In [241]:
# TODO: Replace <FILL IN> with appropriate code
wordPairs = wordsRDD.map(lambda x: (x,1))
print wordPairs.collect()

[('cat', 1), ('elephant', 1), ('rat', 1), ('rat', 1), ('cat', 1)]


## Part 2: Counting with pair RDDs

Now, let's count the number of times a particular word appears in the RDD. There are multiple ways to perform the counting, but some are much less efficient than others.

### `groupByKey()` approach
An approach you might first consider (we'll see shortly that there are better ways) is based on using the [groupByKey()](http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.groupByKey) transformation. As the name implies, the `groupByKey()` transformation groups all the elements of the RDD with the same key into a single list in one of the partitions.

There are two problems with using `groupByKey()`:
  + The operation requires a lot of data movement to move all the values into the appropriate partitions.
  + The lists can be very large. Consider a word count of English Wikipedia: the lists for common words (e.g., the, a, etc.) would be huge and could exhaust the available memory in a worker.

Use `groupByKey()` to generate a pair RDD of type `('word', iterator)`.

In [242]:
# TODO: Replace <FILL IN> with appropriate code
# Note that groupByKey requires no parameters
wordsGrouped = wordPairs.groupByKey()
for key, value in wordsGrouped.collect():
    print '{0}: {1}'.format(key, list(value))

rat: [1, 1]
elephant: [1]
cat: [1, 1]
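To make the memory concern concrete, here is a plain-Python sketch of what grouping does conceptually. It does not use Spark: `group_by_key` is a hypothetical helper written only for this illustration, and it ignores partitioning and the shuffle entirely.

```python
from collections import defaultdict

def group_by_key(pairs):
    """Collect every value for a key into one list (illustrative helper, not a Spark API)."""
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return dict(groups)

word_pairs = [('cat', 1), ('elephant', 1), ('rat', 1), ('rat', 1), ('cat', 1)]
grouped = group_by_key(word_pairs)

# Each list holds one entry per occurrence, so a frequent word yields a long list.
for word in sorted(grouped):
    print('{0}: {1}'.format(word, grouped[word]))
```

The length of each list equals the number of occurrences of the word, which is why very common words can become a memory problem on a real corpus.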


### Use `groupByKey()` to obtain the counts

Using the `groupByKey()` transformation creates an RDD containing 3 elements, each of which is a pair of a word and a Python iterator.

Now sum the iterator using a `map()` transformation.  The result should be a pair RDD consisting of (word, count) pairs.

In [243]:
# TODO: Replace <FILL IN> with appropriate code
wordCountsGrouped = wordsGrouped.map(lambda x: (x[0],sum(x[1])))
print wordCountsGrouped.collect()

[('rat', 2), ('elephant', 1), ('cat', 2)]


### Counting using `reduceByKey`

A better approach is to start from the pair RDD and then use the [reduceByKey()](http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.reduceByKey) transformation to create a new pair RDD. The `reduceByKey()` transformation gathers together pairs that have the same key and applies the function provided to two values at a time, iteratively reducing all of the values to a single value. `reduceByKey()` operates by applying the function first within each partition on a per-key basis and then across the partitions, allowing it to scale efficiently to large datasets.

In [244]:
# TODO: Replace <FILL IN> with appropriate code
# Note that reduceByKey takes in a function that accepts two values and returns a single value
wordCounts = wordPairs.reduceByKey(lambda x,y: x+y)
print wordCounts.collect()

[('rat', 2), ('elephant', 1), ('cat', 2)]
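The two-phase behaviour described above (combine within each partition, then across partitions) can be sketched in plain Python. This is only an analogue under stated assumptions: `combine_within_partition` and `reduce_by_key` are hypothetical helpers, and the way the five pairs are laid out across four partitions is invented for the example, not what Spark necessarily does.

```python
def combine_within_partition(partition, func):
    """Reduce the values of each key inside a single partition."""
    combined = {}
    for key, value in partition:
        combined[key] = func(combined[key], value) if key in combined else value
    return combined

def reduce_by_key(partitions, func):
    """Two-phase reduce: local combine per partition, then merge the partial results."""
    partials = [combine_within_partition(p, func) for p in partitions]
    merged = {}
    for partial in partials:
        for key, value in partial.items():
            merged[key] = func(merged[key], value) if key in merged else value
    return merged

# Hypothetical layout of the five pairs across four partitions.
partitions = [
    [('cat', 1)],
    [('elephant', 1)],
    [('rat', 1)],
    [('rat', 1), ('cat', 1)],
]
counts = reduce_by_key(partitions, lambda x, y: x + y)
print(sorted(counts.items()))
```

Because each partition is reduced to at most one value per key before anything is merged, far less data has to move between partitions than when every individual `(word, 1)` pair is grouped first.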


### All together

The expert version of the code performs the `map()` to pair RDD, `reduceByKey()` transformation, and `collect` in one statement.

In [246]:
# TODO: Replace <FILL IN> with appropriate code
wordCountsCollected = (wordsRDD
                       .map(lambda x: (x,1)).reduceByKey(lambda x,y: x+y)
                       .collect())
print wordCountsCollected

[('rat', 2), ('elephant', 1), ('cat', 2)]


## Part 3: Finding unique words and a mean value

### Unique words

Calculate the number of unique words in `wordsRDD`.  You can use other RDDs that you have already created to make this easier.

In [247]:
# TODO: Replace <FILL IN> with appropriate code
uniqueWords = wordCounts.count()
print uniqueWords

3


### Mean using `reduce`

Find the mean number of occurrences per unique word in `wordCounts`.

Use a `reduce()` action to sum the counts in `wordCounts` and then divide by the number of unique words.  First `map()` the pair RDD `wordCounts`, which consists of (key, value) pairs, to an RDD of values.

In [39]:
# TODO: Replace <FILL IN> with appropriate code
from operator import add
totalCount = (wordCounts
              .map(lambda x: x[1])
              .reduce(add))  # add(x, y) is equivalent to lambda x, y: x + y
average = totalCount / float(uniqueWords)
print round(average, 2)

1.67


In [52]:
assert round(average, 2) == 1.67
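The same map-then-reduce arithmetic can be checked without Spark. The sketch below is a plain-Python analogue that uses the counts printed earlier as literal input; `functools.reduce` stands in for the RDD `reduce()` action.

```python
from functools import reduce  # a builtin on Python 2, imported here so it also runs on Python 3
from operator import add

word_counts = [('rat', 2), ('elephant', 1), ('cat', 2)]

values = [count for _, count in word_counts]  # the map() step: keep only the counts
total = reduce(add, values)                   # the reduce() step: 2 + 1 + 2
mean = total / float(len(word_counts))        # divide by the number of unique words
print(round(mean, 2))
```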

## Part 4: Apply word count to a file

In this section we will finish developing our word count application.  We'll have to build the `wordCount` function, deal with real-world problems like capitalization and punctuation, load in our data source, and compute the word count on the new data.

### `wordCount` function

First, define a function for word counting.  You should reuse the techniques that have been covered in earlier parts of this lab.  This function should take in an RDD that is a list of words like `wordsRDD` and return a pair RDD that has all of the words and their associated counts.

In [248]:
# TODO: Replace <FILL IN> with appropriate code
def wordCount(wordListRDD):
    """Creates a pair RDD with word counts from an RDD of words.

    Args:
        wordListRDD (RDD of str): An RDD consisting of words.

    Returns:
        RDD of (str, int): An RDD consisting of (word, count) tuples.
    """
    return wordListRDD.map(lambda x: (x,1)).reduceByKey(lambda x,y: x+y)
print wordCount(wordsRDD).collect()

[('rat', 2), ('elephant', 1), ('cat', 2)]


In [249]:
assert sorted(wordCount(wordsRDD).collect()) == [('cat', 2), ('elephant', 1), ('rat', 2)]

### Capitalization and punctuation

Real-world files are more complicated than the data we have been using in this lab. Some of the issues we have to address are:
  + Words should be counted independent of their capitalization (e.g., Spark and spark should be counted as the same word).
  + All punctuation should be removed.
  + Any leading or trailing spaces on a line should be removed.

Define the function `removePunctuation` that converts all text to lower case, removes any punctuation, and removes leading and trailing spaces.  Use the Python [re](https://docs.python.org/2/library/re.html) module to remove any text that is not a letter, number, or space. 

In [251]:
# TODO: Replace <FILL IN> with appropriate code
import re
import string
def removePunctuation(text):
    """Removes punctuation, changes to lower case, and strips leading and trailing spaces.

    Note:
        Only spaces, letters, and numbers should be retained.  Other characters should be
        eliminated (e.g. it's becomes its).  Leading and trailing spaces should be removed after
        punctuation is removed.

    Args:
        text (str): A string.

    Returns:
        str: The cleaned up string.
    """
    # re.escape keeps characters such as ']' and '\' literal inside the character class
    pattern = re.compile('[%s]' % re.escape(string.punctuation))
    t = re.sub(pattern, '', text.lower())
    return re.sub(' +',' ',t).strip()

print removePunctuation('Hi, you!')
print removePunctuation(' No under_score!')
print removePunctuation(' *      Remove punctuation then spaces  * ')

hi you
no underscore
remove punctuation then spaces


In [252]:
assert removePunctuation(" The Elephant's   4 cats.  ") == 'the elephants 4 cats'
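The choice of pattern matters for Italian text. The sketch below compares two plain-Python variants on a line with accented letters: one removes the characters in `string.punctuation`, the other keeps only ASCII letters, digits, and spaces. Both function names are hypothetical and exist only for this comparison.

```python
import re
import string

def remove_punctuation_blacklist(text):
    """Drop ASCII punctuation, lower-case, collapse repeated spaces, strip."""
    pattern = re.compile('[%s]' % re.escape(string.punctuation))
    return re.sub(' +', ' ', pattern.sub('', text.lower())).strip()

def remove_punctuation_ascii_whitelist(text):
    """Keep only a-z, 0-9 and spaces; accented letters are lost."""
    return re.sub(' +', ' ', re.sub(r'[^a-z0-9 ]', '', text.lower())).strip()

# Accented characters are written as escapes so the snippet stays ASCII-only.
line = u"Tant'\u00e8 amara che poco \u00e8 pi\u00f9 morte;"

blacklist_result = remove_punctuation_blacklist(line)
whitelist_result = remove_punctuation_ascii_whitelist(line)

print(blacklist_result == u"tant\u00e8 amara che poco \u00e8 pi\u00f9 morte")
print(whitelist_result)
```

The ASCII whitelist silently drops accented letters and with them whole words, so for a text like Dante's the punctuation-based pattern is the safer choice.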

### Load a text file

For the next part of this lab, we will use *La Divina Commedia* by Dante Alighieri. To convert a text file into an RDD, we use the `SparkContext.textFile()` method. We also apply the recently defined `removePunctuation()` function using a `map()` transformation to strip out the punctuation and change all text to lower case.  Since the file is large, we use `zipWithIndex()` to pair each line with its position and `take(15)` to print only the first 15 lines.

In [253]:
danteRDD = sc.textFile('input/divina_commedia.txt').map(removePunctuation)
danteRDD.zipWithIndex().take(15)

[(u'la divina commedia', 0),
 (u'di dante alighieri', 1),
 (u'inferno', 2),
 (u'', 3),
 (u'', 4),
 (u'', 5),
 (u'inferno canto i', 6),
 (u'', 7),
 (u'nel mezzo del cammin di nostra vita', 8),
 (u'mi ritrovai per una selva oscura', 9),
 (u'ch\xe9 la diritta via era smarrita', 10),
 (u'ahi quanto a dir qual era \xe8 cosa dura', 11),
 (u'esta selva selvaggia e aspra e forte', 12),
 (u'che nel pensier rinova la paura', 13),
 (u'tant\xe8 amara che poco \xe8 pi\xf9 morte', 14)]

### Words from lines

Before we can use the `wordCount()` function, we have to address two issues with the format of the RDD:
  + The first issue is that we need to split each line by its spaces.
  + The second issue is that we need to filter out empty lines.

Apply a transformation that will split each element of the RDD by its spaces. For each element of the RDD, you should apply Python's string [split()](https://docs.python.org/2/library/string.html#string.split) function. You might think that a `map()` transformation is the way to do this, but think about what the result of the `split()` function will be.

In [211]:
# TODO: Replace <FILL IN> with appropriate code
danteWordsRDD = danteRDD.flatMap(lambda x: x.split(' '))
danteWordCount = danteWordsRDD.count()
print danteWordCount

96972


In [214]:
assert danteWordCount == 96972
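The difference between mapping and flat-mapping a `split()` can be seen on a few lines in plain Python. This is an analogue only: a list comprehension stands in for `map()` and `itertools.chain.from_iterable` stands in for `flatMap()`; the three sample lines are taken from the output above.

```python
from itertools import chain

lines = ['nel mezzo del cammin', '', 'mi ritrovai']

# map-like: one result per input line, so we get a list of lists
mapped = [line.split(' ') for line in lines]

# flatMap-like: the per-line lists are concatenated into one flat list of words
flat_mapped = list(chain.from_iterable(line.split(' ') for line in lines))

print(mapped)
print(flat_mapped)
```

Note that the empty line still contributes one element, the empty string, because `''.split(' ')` returns `['']`. That is where the empty elements counted above come from.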

### Remove empty elements

The next step is to filter out the empty elements.  Remove all entries where the word is `''`.

In [215]:
# TODO: Replace <FILL IN> with appropriate code
danteWordsRDD2 = danteWordsRDD.filter(lambda x: len(x) > 0)
danteWordsCount2 = danteWordsRDD2.count()
print danteWordsCount2

96561


In [216]:
assert danteWordsCount2 == 96561

### Count the words

We now have an RDD that contains only words.  Next, let's apply the `wordCount()` function to produce a list of word counts. We can view the top 25 words by using the `takeOrdered()` action; however, since the elements of the RDD are pairs, we need a custom sort function that sorts using the value part of the pair.

You'll notice that many of the words are common Italian words such as articles, prepositions, and conjunctions. These are called stopwords. In a later lab, we will see how to eliminate them from the results.
Use the `wordCount()` function and `takeOrdered()` to obtain the 25 most common words and their counts.

In [234]:
# Reuse wordCount(); negating the count makes takeOrdered return the largest counts first
top25WordsAndCounts = wordCount(danteWordsRDD2).takeOrdered(25, lambda s: -s[1])
print u'\n'.join(u'{0}: {1}'.format(w, c) for w, c in top25WordsAndCounts)

e: 4038
che: 3716
la: 2361
di: 1970
a: 1899
non: 1456
per: 1384
in: 1103
si: 1060
l: 951
le: 801
sì: 796
li: 781
mi: 741
il: 670
più: 660
de: 659
come: 648
con: 645
da: 642
lo: 591
del: 569
se: 559
è: 542
ma: 508
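The negated-count trick used with `takeOrdered()` can be tried in plain Python. The sketch below is an analogue, assuming that `takeOrdered(n, key)` returns the `n` elements with the smallest key values in ascending key order; `heapq.nsmallest` has the same contract for an ordinary list. The sample pairs are a shuffled subset of the counts printed above.

```python
import heapq

word_counts = [('di', 1970), ('e', 4038), ('a', 1899), ('che', 3716), ('la', 2361)]

# Negating the count makes the "smallest" keys correspond to the largest counts.
top3 = heapq.nsmallest(3, word_counts, key=lambda s: -s[1])
print(top3)
```

Without the negation, the same call would return the three least frequent words instead.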
