#![Spark Logo](http://spark-mooc.github.io/web-assets/images/ta_Spark-logo-small.png) + ![Python Logo](http://spark-mooc.github.io/web-assets/images/python-logo-master-v3-TM-flattened_small.png)
# **Word Count Lab: Building a word count application**
#### This lab will build on the techniques covered in the Spark tutorial to develop a simple word count application.  The volume of unstructured text in existence is growing dramatically, and Spark is an excellent tool for analyzing this type of data.  In this lab, we will write code that calculates the most common words in the [Complete Works of William Shakespeare](http://www.gutenberg.org/ebooks/100) retrieved from [Project Gutenberg](http://www.gutenberg.org/wiki/Main_Page).  This could also be scaled to find the most common words on the Internet.
#### ** During this lab we will cover: **
#### *Part 1:* Creating a base RDD and pair RDDs
#### *Part 2:* Counting with pair RDDs
#### *Part 3:* Finding unique words and a mean value
#### *Part 4:* Apply word count to a file
#### For reference, the relevant methods are documented in [Spark's Python API](https://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD).

### ** Part 1: Creating a base RDD and pair RDDs **

#### In this part of the lab, we will explore creating a base RDD with `parallelize` and using pair RDDs to count words.

#### ** (1a) Create a base RDD **

In [1]:
wordsList = ['cat', 'elephant', 'rat', 'rat', 'cat']
wordsRDD = sc.parallelize(wordsList, 4)

#### ** (1b) Pluralize and test **

In [2]:
def makePlural(word):
    return word+'s'

print makePlural('cat')

cats


#### ** (1c) Apply `makePlural` to the base RDD **
#### Now pass each item in the base RDD into a [map()](http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.map) transformation that applies the `makePlural()` function to each element. Then call the [collect()](http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.collect) action to see the transformed RDD.

In [3]:
pluralRDD = wordsRDD.map(makePlural)
print pluralRDD.collect()

['cats', 'elephants', 'rats', 'rats', 'cats']


#### ** (1d) Pass a `lambda` function to `map` **
#### Let's create the same RDD using a `lambda` function.

In [4]:
pluralLambdaRDD = wordsRDD.map(lambda x: x+'s')
print pluralLambdaRDD.collect()

['cats', 'elephants', 'rats', 'rats', 'cats']


#### ** (1e) Length of each word **
#### Now use `map()` and a `lambda` function to return the number of characters in each word.  We'll `collect` this result directly into a variable.

In [5]:
pluralLengths = (pluralRDD
                 .map(lambda x: len(x))
                 .collect())
print pluralLengths

[4, 9, 4, 4, 4]


#### ** (1f) Pair RDDs **
#### The next step in writing our word counting program is to create a new type of RDD, called a pair RDD. A pair RDD is an RDD where each element is a pair tuple `(k, v)` where `k` is the key and `v` is the value. In this example, we will create a pair consisting of `('<word>', 1)` for each word element in the RDD.
#### We can create the pair RDD using the `map()` transformation with a `lambda` function that turns each word into a `(word, 1)` pair.

In [6]:
wordPairs = wordsRDD.map(lambda x: (x,1))
print wordPairs.collect()

[('cat', 1), ('elephant', 1), ('rat', 1), ('rat', 1), ('cat', 1)]


### ** Part 2: Counting with pair RDDs **

#### Now, let's count the number of times a particular word appears in the RDD. There are multiple ways to perform the counting, but some are much less efficient than others.

#### ** (2a) Counting using `reduceByKey` **
#### An efficient approach is to start from the pair RDD and then use the [reduceByKey()](http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.reduceByKey) transformation to create a new pair RDD. The `reduceByKey()` transformation gathers together pairs that have the same key and applies the function provided to two values at a time, iteratively reducing all of the values to a single value. `reduceByKey()` operates by applying the function first within each partition on a per-key basis and then across the partitions, allowing it to scale efficiently to large datasets.

In [7]:
# Note that reduceByKey takes in a function that accepts two values and returns a single value
wordCounts = wordPairs.reduceByKey(lambda a,b:a+b)
print wordCounts.collect()
print type(wordCounts)

[('rat', 2), ('elephant', 1), ('cat', 2)]
<class 'pyspark.rdd.PipelinedRDD'>
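
#### The two-stage behaviour described above (combine within each partition, then merge across partitions) can be sketched in plain Python without Spark. This is only an illustration in Python 3 syntax; the partition layout and the helper name `reduce_by_key_sketch` are assumptions made for the example, not Spark internals.

```python
def reduce_by_key_sketch(partitions, func):
    """Combine values per key inside each partition, then merge the partial results."""
    # Stage 1: reduce within each partition, keeping one running value per key.
    partials = []
    for partition in partitions:
        local = {}
        for key, value in partition:
            local[key] = func(local[key], value) if key in local else value
        partials.append(local)

    # Stage 2: merge the per-partition results using the same function.
    merged = {}
    for local in partials:
        for key, value in local.items():
            merged[key] = func(merged[key], value) if key in merged else value
    return merged


# Hypothetical split of the five (word, 1) pairs across four partitions.
partitions = [
    [('cat', 1)],
    [('elephant', 1)],
    [('rat', 1)],
    [('rat', 1), ('cat', 1)],
]
counts = reduce_by_key_sketch(partitions, lambda a, b: a + b)
print(sorted(counts.items()))
```

#### Because the function is applied to two values at a time in both stages, it should be associative and commutative (addition is), so the result does not depend on how the data happens to be partitioned.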


#### ** (2b) All together **
#### The expert version of the code performs the `map()` to pair RDD, `reduceByKey()` transformation, and `collect` in one statement.

In [8]:
wordCountsCollected = (wordsRDD
                       .map(lambda x:(x,1))
                       .reduceByKey(lambda a,b:a+b)
                       .collect())
print wordCountsCollected

[('rat', 2), ('elephant', 1), ('cat', 2)]
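
#### For a list small enough to fit in memory on one machine, the result can be cross-checked locally with `collections.Counter`. This is a sanity check in Python 3 syntax, not a substitute for the distributed version; `words` and `local_counts` are names chosen for this illustration.

```python
from collections import Counter

# The same five words used to build wordsRDD.
words = ['cat', 'elephant', 'rat', 'rat', 'cat']

# Counter tallies occurrences of each distinct element.
local_counts = Counter(words)
print(sorted(local_counts.items()))
```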


### ** Part 3: Finding unique words and a mean value **

#### ** (3a) Unique words **
#### Calculate the number of unique words in `wordsRDD`.  You can use other RDDs that you have already created to make this easier.

In [9]:
# Each element of wordCountsCollected is one distinct word, so its length is the answer
uniqueWords = len(wordCountsCollected)
print uniqueWords
print type(wordsRDD)
print type(wordCountsCollected)

3
<class 'pyspark.rdd.RDD'>
<type 'list'>


#### ** (3b) Mean using `reduce` **
#### Find the mean number of occurrences per unique word in `wordCounts`.
#### Use a `reduce()` action to sum the counts in `wordCounts` and then divide by the number of unique words.  First `map()` the pair RDD `wordCounts`, which consists of (key, value) pairs, to an RDD of values.

In [12]:
# Keep only the counts, sum them with reduce(), then divide by the number of unique words
from operator import add
totalCount = (wordCounts
              .map(lambda (k,v):v)
              .reduce(add))
average = totalCount / float(uniqueWords)
print totalCount
print round(average, 2)

5
1.67
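
#### The same arithmetic can be traced locally with `functools.reduce`. This is a plain Python 3 sketch that uses the collected `(word, count)` pairs shown earlier as a literal list; `word_counts`, `values`, `total`, and `mean` are illustrative names.

```python
from functools import reduce
from operator import add

# The collected (word, count) pairs from the lab output.
word_counts = [('rat', 2), ('elephant', 1), ('cat', 2)]

# Step 1: keep only the counts (the "map" step).
values = [count for _, count in word_counts]

# Step 2: sum them two at a time (the "reduce" step): (2 + 1) + 2.
total = reduce(add, values)

# Step 3: divide by the number of unique words.
mean = total / float(len(word_counts))
print(total, round(mean, 2))
```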


In [11]:
wordCounts.collect()

[('rat', 2), ('elephant', 1), ('cat', 2)]

### ** Part 4: Apply word count to a file **

#### In this section we will finish developing our word count application.  We'll have to build the `wordCount` function, deal with real world problems like capitalization and punctuation, load in our data source, and compute the word count on the new data.

#### ** (4a) `wordCount` function **
#### First, define a function for word counting.  You should reuse the techniques that have been covered in earlier parts of this lab.  This function should take in an RDD that is a list of words like `wordsRDD` and return a pair RDD that has all of the words and their associated counts.

In [14]:
# Pair each word with 1, then sum the 1s per word
from operator import add
def wordCount(wordListRDD):
    return wordListRDD.map(lambda x: (x,1)).reduceByKey(add)

print wordCount(wordsRDD).collect()

[('rat', 2), ('elephant', 1), ('cat', 2)]


#### ** (4b) Capitalization and punctuation **
#### Real-world files are more complicated than the data we have been using in this lab. Some of the issues we have to address are:
  + #### Words should be counted independent of their capitalization (e.g., Spark and spark should be counted as the same word).
  + #### All punctuation should be removed.
  + #### Any leading or trailing spaces on a line should be removed.
 
#### Define the function `removePunctuation` that converts all text to lower case, removes any punctuation, and removes leading and trailing spaces.  Use the Python [re](https://docs.python.org/2/library/re.html) module to remove any text that is not a letter, number, or space. Reading `help(re.sub)` might be useful.

In [13]:
# Strip everything except letters, digits, and whitespace
import re
def removePunctuation(text):
    """Removes punctuation, changes to lower case, and strips leading and trailing spaces.

    Note:
        Only spaces, letters, and numbers should be retained.  Other characters should be
        eliminated (e.g. it's becomes its).  Leading and trailing spaces should be removed after
        punctuation is removed.

    Args:
        text (str): A string.

    Returns:
        str: The cleaned up string.
    """
    cleaned = re.sub(r'[^\sa-zA-Z\d]', '', text)
    return cleaned.strip().lower()

print removePunctuation('Hi, you!')
print removePunctuation(' No under_score!')

hi you
no underscore
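
#### To see what the character class `[^\sa-zA-Z\d]` removes, the following standalone sketch applies the same pattern to a few strings. It is written in Python 3 syntax, and `remove_punctuation` and `samples` are illustrative names rather than part of the lab.

```python
import re


def remove_punctuation(text):
    """Drop every character that is not whitespace, an ASCII letter, or a digit."""
    cleaned = re.sub(r'[^\sa-zA-Z\d]', '', text)
    return cleaned.strip().lower()


samples = ['Hi, you!', ' No under_score!', "it's 1609."]
results = [remove_punctuation(s) for s in samples]
for original, cleaned in zip(samples, results):
    print(repr(original), '->', repr(cleaned))
```

#### Note that the underscore and the apostrophe are deleted rather than replaced by a space, so `under_score` becomes `underscore` and `it's` becomes `its`.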


#### ** (4c) Load a text file **
#### For the next part of this lab, we will use the [Complete Works of William Shakespeare](http://www.gutenberg.org/ebooks/100) from [Project Gutenberg](http://www.gutenberg.org/wiki/Main_Page). To convert a text file into an RDD, we use the `SparkContext.textFile()` method. We also apply the recently defined `removePunctuation()` function using a `map()` transformation to strip out the punctuation and change all text to lowercase.  Since the file is large we use `take(15)`, so that we only print 15 lines.

In [15]:
# Just run this code
fileName = "/home/vagrant/data/cs100/lab1/shakespeare.txt"
shakespeareRDD = (sc
                  .textFile(fileName, 8)
                  .map(removePunctuation))
print '\n'.join(shakespeareRDD
                .zipWithIndex()  # to (line, lineNum)
                .map(lambda (l, num): '{0}: {1}'.format(num, l))  # to 'lineNum: line'
                .take(15))

0: 1609
1: 
2: the sonnets
3: 
4: by william shakespeare
5: 
6: 
7: 
8: 1
9: from fairest creatures we desire increase
10: that thereby beautys rose might never die
11: but as the riper should by time decease
12: his tender heir might bear his memory
13: but thou contracted to thine own bright eyes
14: feedst thy lights flame with selfsubstantial fuel
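
#### The numbering step above relies on `zipWithIndex()`, which pairs each element with its position as `(line, lineNum)`. A rough local analogue in plain Python 3 uses `enumerate`, which yields the pair in the opposite order; the list `lines` below is a hand-copied excerpt of the output, used only for illustration.

```python
# First few cleaned lines, copied from the output above.
lines = ['1609', '', 'the sonnets', '', 'by william shakespeare']

# enumerate yields (index, line); swap the order to mimic (line, index) pairs.
indexed = [(line, num) for num, line in enumerate(lines)]

# Format each pair as 'lineNum: line'.
formatted = ['{0}: {1}'.format(num, line) for line, num in indexed]
print('\n'.join(formatted[:3]))
```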


#### ** (4d) Words from lines **
#### Before we can use the `wordCount()` function, we have to address two issues with the format of the RDD:
  + #### The first issue is that we need to split each line by its spaces.
  + #### The second issue is that we need to filter out empty lines.
 
#### Apply a transformation that will split each element of the RDD by its spaces. For each element of the RDD, you should apply Python's string [split()](https://docs.python.org/2/library/string.html#string.split) function. You might think that a `map()` transformation is the way to do this, but think about what the result of the `split()` function will be.

In [17]:
# flatMap flattens the per-line word lists into a single RDD of words
shakespeareWordsRDD = shakespeareRDD.flatMap(lambda x:x.split(" "))
shakespeareWordCount = shakespeareWordsRDD.count()
print type(shakespeareWordsRDD)
print shakespeareRDD.top(5)
print shakespeareWordsRDD.top(10)
print shakespeareWordCount

<class 'pyspark.rdd.PipelinedRDD'>
[u'zounds i will speak of him and let my soul', u'zounds i was never so bethumpd with words', u'zounds i lie for they pray continually to their saint the', u'zeal and obedience he still bore your grace', u'youths a stuff will not endure']
[u'zwaggerd', u'zounds', u'zounds', u'zounds', u'zounds', u'zounds', u'zounds', u'zounds', u'zounds', u'zounds']
927631
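
#### The difference between `map()` and `flatMap()` with `split()` can be seen on a small local list. This is a plain Python 3 sketch, not Spark code; `lines`, `mapped`, and `flat` are illustrative names, and the sample lines are made up to include an empty line and a double space.

```python
from itertools import chain

lines = ['the sonnets', '', 'by  william shakespeare']

# map-like: one output element per line, so the result is a list of lists.
mapped = [line.split(' ') for line in lines]

# flatMap-like: the per-line lists are flattened into a single list of words.
flat = list(chain.from_iterable(line.split(' ') for line in lines))

print(mapped)
print(flat)
```

#### The empty line and the double space both produce `''` entries in the flattened result, which is why the word count at this stage is higher than the number of actual words.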


#### ** (4e) Remove empty elements **
#### The next step is to filter out the empty elements.  Remove all entries where the word is `''`.

In [18]:
# Keep only the non-empty strings
shakeWordsRDD = shakespeareWordsRDD.filter(lambda x:x!='')
shakeWordCount = shakeWordsRDD.count()
print shakeWordCount
print shakeWordsRDD.top(10)

882996
[u'zwaggerd', u'zounds', u'zounds', u'zounds', u'zounds', u'zounds', u'zounds', u'zounds', u'zounds', u'zounds']
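
#### The filtering step can be mirrored locally with a list comprehension. The sketch below (plain Python 3, illustrative names) also shows a possible alternative that the lab does not use: calling `split()` with no argument splits on runs of whitespace and never returns empty strings.

```python
# A flattened word list that still contains empty strings.
words = ['the', 'sonnets', '', 'by', '', 'william', 'shakespeare']

# Equivalent of filter(lambda x: x != ''): keep only non-empty entries.
non_empty = [w for w in words if w != '']
print(len(words), len(non_empty))

# Alternative (an assumption, not the lab's approach): split() without an
# argument collapses repeated whitespace, so no '' entries are produced.
no_arg_split = 'by  william shakespeare'.split()
print(no_arg_split)
```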


#### ** (4f) Count the words **
#### We now have an RDD that is only words.  Next, let's apply the `wordCount()` function to produce a list of word counts. We can view the top 15 words by using the `takeOrdered()` action; however, since the elements of the RDD are pairs, we need a custom sort function that sorts using the value part of the pair.
#### You'll notice that many of the words are common English words. These are called stopwords. In a later lab, we will see how to eliminate them from the results.
#### Use the `wordCount()` function and `takeOrdered()` to obtain the fifteen most common words and their counts.

In [19]:
# takeOrdered() returns the smallest keys first, so negate the count to get the largest counts
top15WordsAndCounts = wordCount(shakeWordsRDD).takeOrdered(15, key=lambda (w, c): -c)
print '\n'.join(map(lambda (w, c): '{0}: {1}'.format(w, c), top15WordsAndCounts))

the: 27361
and: 26028
i: 20681
to: 19150
of: 17463
a: 14593
you: 13615
my: 12481
in: 10956
that: 10890
is: 9134
not: 8497
with: 7771
me: 7769
it: 7678
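
#### `takeOrdered()` returns the elements with the smallest keys, so negating the count makes the largest counts come first. The same idea can be sketched locally with `heapq.nsmallest`. This is plain Python 3 for illustration only; `word_counts` is a shuffled subset of the counts printed above, and `top3` is an illustrative name.

```python
import heapq

# A shuffled subset of the (word, count) pairs from the output above.
word_counts = [('of', 17463), ('the', 27361), ('a', 14593),
               ('and', 26028), ('i', 20681), ('to', 19150)]

# Smallest negated count first, which means largest count first.
top3 = heapq.nsmallest(3, word_counts, key=lambda wc: -wc[1])
print(top3)
```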
