<p align="center"><img src="logo/spark.png" alt="Spark Logo" width="250"/></p>
# **Lab 1 - Part 2 - Spark**
#### The following steps demonstrate how to develop a simple word count application in Spark.


### **Part 1: Creating a base RDD and pair RDDs**

We'll start by generating a base RDD by using a list and the `sc.parallelize` method.  Then we'll print out the type of the base RDD.


In [1]:
val wordsList = List("cat", "elephant", "rat", "rat", "cat")
val wordsRDD = sc.parallelize(wordsList)

// Print out the type of wordsRDD
println(wordsRDD.getClass)

class org.apache.spark.rdd.ParallelCollectionRDD




Let's use a `map()` transformation to add the letter 's' to each string in the base RDD we just created. We'll define a function that returns the word with an 's' at the end of the word.

In [2]:
// TODO: Replace <FILL IN> with appropriate code
/** Adds an 's' to `word`.
  *
  * This simple function only appends an 's'; it makes no attempt to follow
  * proper pluralization rules.
  *
  * @param word a string
  * @return `word` with an 's' appended
  */
def makePlural(word: String): String = {
    word.concat("s")
}

println(makePlural("cat"))

cats




Now pass each item in the base RDD into a `map()` transformation that applies the `makePlural()` function to each element, and then call the `collect()` action to see the transformed RDD.

In [3]:
// TODO: Replace <FILL IN> with appropriate code
val pluralRDD = wordsRDD.map(makePlural)
pluralRDD.collect().foreach(println)

cats
elephants
rats
rats
cats




Let's create the same RDD using a *lambda* function.

In [5]:
// TODO: Replace <FILL IN> with appropriate code
val pluralLambdaRDD = wordsRDD.map(word => word.concat("s"))
pluralLambdaRDD.collect().foreach(println)

cats
elephants
rats
rats
cats




Now use `map()` and a *lambda* function to return the number of characters in each word. We'll `collect` this result directly into a variable.

In [7]:
// TODO: Replace <FILL IN> with appropriate code

val pluralLengths = pluralRDD.map(word => word.length()).collect()
pluralLengths.foreach(println)

4
9
4
4
4




The next step in writing our word counting program is to create a new type of RDD, called a pair RDD. A pair RDD is an RDD in which each element is a tuple `(k, v)`, where `k` is the key and `v` is the value. In this example, we will create a pair of the form `("<word>", 1)` for each word in the RDD. We can create the pair RDD by applying the `map()` transformation with a *lambda* function.

In [8]:
// TODO: Replace <FILL IN> with appropriate code
val wordPairs = wordsRDD.map(word => (word, 1))
wordPairs.collect().foreach(println)

(cat,1)
(elephant,1)
(rat,1)
(rat,1)
(cat,1)




### **Part 2: Counting with pair RDDs**

Now, let's count the number of times a particular word appears in the RDD. There are multiple ways to perform the counting, but some are much less efficient than others. A naive approach would be to `collect()` all of the elements and count them in the driver program. While this approach could work for small datasets, we want an approach that will work for any size dataset including terabyte- or petabyte-sized datasets. In addition, performing all of the work in the driver program is slower than performing it in parallel in the workers. For these reasons, we will use data parallel operations.

Another approach uses the `groupByKey()` transformation, which groups all the elements of the RDD that share a key into a single list in one of the partitions. There are two problems with using `groupByKey()`:
   + the operation requires a lot of data movement to move all the values into the appropriate partitions.
   + the lists can be very large and could exhaust the available memory in a worker.
  
Use `groupByKey()` to generate a pair RDD of type `(word, Iterable[Int])`.

In [9]:
// TODO: Replace <FILL IN> with appropriate code
// Note that groupByKey requires no parameters
val wordsGrouped = wordPairs.groupByKey()
wordsGrouped.collect().foreach(println)

(cat,CompactBuffer(1, 1))
(rat,CompactBuffer(1, 1))
(elephant,CompactBuffer(1))




The `groupByKey()` transformation produces an RDD containing 3 elements, each of which is a pair of a word and an iterable of its values. Now sum each iterable using a `map()` transformation. The result should be a pair RDD consisting of `(word, count)` pairs.

In [25]:
// TODO: Replace <FILL IN> with appropriate code
def sumReducer(tuple: (String, Iterable[Int])): (String, Int) = {
    val counts:Iterable[Int] = tuple._2
    val sum = counts.foldLeft(0)((x:Int, y:Int) => x+y)
    (tuple._1, sum)
}
val wordCountsGrouped = wordsGrouped.map(sumReducer)
wordCountsGrouped.collect().foreach(println)

(cat,2)
(rat,2)
(elephant,1)




A better approach is to start from the pair RDD and then use the `reduceByKey()` transformation to create a new pair RDD. The `reduceByKey()` transformation gathers together pairs that have the same key and applies the function provided to two values at a time, iteratively reducing all of the values to a single value. `reduceByKey()` operates by applying the function first within each partition on a per-key basis and then across the partitions, allowing it to scale efficiently to large datasets.

In [26]:
// TODO: Replace <FILL IN> with appropriate code
// Note that reduceByKey takes in a function that accepts two values and returns a single value
val wordCounts = wordPairs.reduceByKey((x,y) => x + y)
wordCounts.collect().foreach(println)

(cat,2)
(rat,2)
(elephant,1)
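
The two-step behaviour described above (combine within each partition, then across partitions) can be sketched with plain Scala collections. This is only an illustration, not Spark's implementation: the partition layout and the helper name `combineLocally` are assumptions made for the example.

```scala
// Plain Scala lists standing in for an RDD split across two partitions.
// The layout below is hypothetical; Spark decides the real partitioning.
val partitions: List[List[(String, Int)]] = List(
  List(("cat", 1), ("elephant", 1), ("rat", 1)),
  List(("rat", 1), ("cat", 1))
)

// Hypothetical helper: merge pairs that share a key using the supplied function.
def combineLocally(pairs: List[(String, Int)], f: (Int, Int) => Int): Map[String, Int] =
  pairs.foldLeft(Map.empty[String, Int]) { case (acc, (word, n)) =>
    acc.updated(word, acc.get(word).map(f(_, n)).getOrElse(n))
  }

val add = (x: Int, y: Int) => x + y

// Step 1: combine within each partition, so at most one pair per key leaves it.
val perPartition: List[Map[String, Int]] = partitions.map(p => combineLocally(p, add))

// Step 2: combine the partial results across partitions.
val merged: Map[String, Int] = combineLocally(perPartition.flatMap(_.toList), add)

println(merged.toList.sortBy(_._1))
```

Because each partition emits at most one pair per key, far less data has to move between workers than with `groupByKey()`, which ships every individual `(word, 1)` pair.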




The expert version of the code performs the `map()` to pair RDD, `reduceByKey()` transformation, and `collect()` in one statement.

In [31]:
// TODO: Replace <FILL IN> with appropriate code

val wordCountsCollected = wordsRDD.map(word => (word, 1)).reduceByKey((x,y) => x + y).collect()
wordCountsCollected.foreach(println)

(cat,2)
(rat,2)
(elephant,1)




### **Part 3: Finding unique words and a mean value**

#### Calculate the number of unique words in `wordsRDD`.  You can use other RDDs that you have already created to make this easier.

In [34]:
// TODO: Replace <FILL IN> with appropriate code

val uniqueWords = wordsRDD.distinct().count
println(uniqueWords)

3




Find the mean number of occurrences per unique word in `wordCounts`. First `map()` the pair RDD `wordCounts`, which consists of (key, value) pairs, to an RDD of values. Then use a `reduce()` action to sum the counts and divide the sum by the number of unique words.

In [38]:
// TODO: Replace <FILL IN> with appropriate code
val totalCount = wordCounts.map(tuple => tuple._2).reduce(_+_)
val average = totalCount / uniqueWords.toFloat

println(totalCount)
println(average)

5
1.6666666
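
The `.toFloat` conversion in the cell above matters because `count` returns a `Long`, and dividing an `Int` by a `Long` is integer division. A minimal sketch with literal values standing in for the RDD results (the names `total`, `unique`, `truncatedMean` and `floatMean` are made up for this example):

```scala
val total: Int = 5     // stands in for the summed counts
val unique: Long = 3L  // stands in for the Long returned by count

// Integer division: the fractional part is discarded.
val truncatedMean = total / unique

// Converting one operand to Float first gives floating-point division.
val floatMean = total / unique.toFloat

println(truncatedMean) // prints 1
println(floatMean)
```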




### **Part 4: Apply word count to a file**

#### In this section we will finish developing our word count application.  First, define a function for word counting.  This function should take in an RDD that is a list of words like `wordsRDD` and return a pair RDD that has all of the words and their associated counts.

In [41]:
// TODO: Replace <FILL IN> with appropriate code
import org.apache.spark.rdd.RDD

def wordCount(wordListRDD: RDD[String]): RDD[(String, Int)] = {
    wordListRDD.map(word => (word, 1)).reduceByKey((x,y) => x + y)
}

wordCount(wordsRDD).collect().foreach(println)

(cat,2)
(rat,2)
(elephant,1)




Real world files are more complicated than the data we have been using in this lab. Some of the issues we have to address are:
  + Words should be counted independent of their capitalization (e.g., "Spark" and "spark" should be counted as the same word).
  + All punctuation should be removed.
  + Any leading or trailing spaces on a line should be removed.
 
Define the function `removePunctuation` that converts all text to lower case, removes any punctuation, and removes leading and trailing spaces.  Use a regular expression with `String.replaceAll` to strip punctuation characters (`\p{Punct}`) and any whitespace at the start or end of the text.

In [42]:
// Just run this code

def removePunctuation(text: String): String = {
    text.replaceAll("""\p{Punct}|^\s+|\s+$""", "").toLowerCase
}  

println(removePunctuation("Hi, you!"))
println(removePunctuation(" No under_score!"))

hi you
no underscore
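
An alternative is to whitelist the characters to keep rather than list the ones to remove. The sketch below is a variant, not the lab's function: the name `keepLettersDigitsSpaces` is hypothetical, and unlike `removePunctuation` it also drops non-ASCII letters and tab characters.

```scala
// Hypothetical variant: keep only ASCII lowercase letters, digits and spaces.
def keepLettersDigitsSpaces(text: String): String =
  text.toLowerCase              // normalise case first so one character class suffices
    .replaceAll("[^a-z0-9 ]", "") // delete everything outside the whitelist
    .trim                        // drop leading and trailing spaces

println(keepLettersDigitsSpaces("Hi, you!"))
println(keepLettersDigitsSpaces(" No under_score!"))
```

On the two sample strings it should agree with `removePunctuation`; the two approaches only diverge on input containing characters outside ASCII letters, digits, spaces and punctuation.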




For the next part of this lab, we will use the "Complete Works of William Shakespeare", located at `data/story/shakespeare.txt`. To convert a text file into an RDD, we use the `SparkContext.textFile()` method. We also apply the recently defined `removePunctuation()` function using a `map()` transformation to strip out the punctuation and change all text to lowercase.  Since the file is large we use `take(15)`, so that we only print 15 lines.

In [43]:
// Just run this code

val fileName="data/story/shakespeare.txt"

val shakespeareRDD = sc.textFile(fileName, 8).map(removePunctuation)
shakespeareRDD.zipWithIndex().take(15).map(x => (x._2 + 1) + ": " + x._1).foreach(println)

1: 1609
2: 
3: the sonnets
4: 
5: by william shakespeare
6: 
7: 
8: 
9: 1
10: from fairest creatures we desire increase
11: that thereby beautys rose might never die
12: but as the riper should by time decease
13: his tender heir might bear his memory
14: but thou contracted to thine own bright eyes
15: feedst thy lights flame with selfsubstantial fuel




Before we can use the `wordCount()` function, we have to address two issues with the format of the RDD:
  + Split each line by its spaces.
  + Filter out empty lines.
 
Apply a transformation that splits each element of the RDD by its spaces. For each element of the RDD, apply the `split()` method. You might think that a `map()` transformation is the way to do this, but think about what the result of `split()` will be.

In [44]:
// TODO: Replace <FILL IN> with appropriate code
val shakespeareWordsRDD = shakespeareRDD.flatMap(line => line.split(" "))
val shakespeareWordCount = shakespeareWordsRDD.count()

shakespeareWordsRDD.top(5).foreach(println)
println(shakespeareWordCount)

zwaggerd
zounds
zounds
zounds
zounds
927633
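
The difference between `map()` and `flatMap()` here can be seen on a small local list, which behaves the same way as an RDD for this purpose. The sample lines below are assumptions chosen to resemble the file's first lines.

```scala
// Three sample lines, one of them empty.
val lines = List("the sonnets", "", "by william shakespeare")

// map keeps one result per line, so each element is a whole array of words.
val mapped: List[Array[String]] = lines.map(line => line.split(" "))

// flatMap concatenates the arrays, so each element is a single word.
val flattened: List[String] = lines.flatMap(line => line.split(" "))

println(mapped.length)
println(flattened)
```

Note that splitting an empty line yields a single empty string, which is why the flattened result still contains `""` entries that have to be filtered out.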




The next step is to filter out the empty elements.  Remove all entries where the word is the empty string `""`.

In [49]:
// TODO: Replace <FILL IN> with appropriate code
val shakeWordsRDD = shakespeareWordsRDD.filter(word => word.length() != 0)
val shakeWordCount = shakeWordsRDD.count()

println(shakeWordCount)

882996




We now have an RDD that contains only words. Next, let's apply the `wordCount()` function to produce a list of word counts. Since the elements of the resulting RDD are `(word, count)` pairs, we swap each pair to `(count, word)` so that `top(15)` orders the results by count.

In [64]:
// TODO: Replace <FILL IN> with appropriate code
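// wordCount() already reduces by key, so no further reduceByKey is needed
val top15WordsAndCounts = wordCount(shakeWordsRDD)
    .map(pair => (pair._2, pair._1))  // swap to (count, word) so top() orders by count
    .top(15)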


top15WordsAndCounts.map(x => x._1 + ": " + x._2).foreach(println)

27361: the
26028: and
20681: i
19150: to
17463: of
14593: a
13615: you
12481: my
10956: in
10890: that
9134: is
8497: not
7771: with
7769: me
7678: it
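
Instead of swapping the pairs, the ordering can be supplied explicitly. The sketch below shows the idea on a local list, reusing four counts from the output above; the names `sampleCounts`, `byCount` and `top2` are made up for the example.

```scala
// A few (word, count) pairs taken from the output above.
val sampleCounts = List(("i", 20681), ("the", 27361), ("to", 19150), ("and", 26028))

// Order pairs by their count (the second component) only.
val byCount: Ordering[(String, Int)] = Ordering.by[(String, Int), Int](_._2)

// Sort descending by count and keep the two largest.
val top2 = sampleCounts.sorted(byCount.reverse).take(2)

top2.foreach { case (word, count) => println(s"$count: $word") }
```

`RDD.top` accepts an implicit `Ordering`, so passing an ordering like `byCount` as its second argument list should give the same ranking without the swap. That variant has not been run against the Shakespeare data here.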


