# WordCount Step-by-Step using Spark

Spark is a "functional programming" tool. What this means for us is that each line of code operates on the entire data set and transforms it in some way, returning the entire data set when finished. To illustrate the transformations that happen, we will read a large text file (the works of Shakespeare appended to itself several times) and show how we manipulate the data to arrive at a count of words in the file.


Even though Jupyter notebooks can be set up with a Spark "kernel" that lets us create Spark notebooks, configuring the notebook server to find and use Spark is... difficult.

Enter the **findspark** library. If we tell it where our Spark directory is (or let it look for one on its own), it will locate Spark and make the `pyspark` package importable. From there we can create the starting object we work from -- the "spark context," usually abbreviated "sc."

In [1]:
import findspark
#findspark.init('/usr/hdp/3.0.1.0-187/spark2')
findspark.init()

import pyspark
sc = pyspark.SparkContext(appName="WordCount")

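Now that we have the spark context, let's use it to load our text data file. The commented-out line reads the file from HDFS; the active line reads a local file through a `file://` URI instead. Either way, the data is stored in a data structure called a **Resilient Distributed Dataset (RDD).**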

In [6]:
#text_file = sc.textFile("hdfs:///user/vagrant/data/input/shake_large.txt")
text_file = sc.textFile("file:///Users/jerem/OneDrive/Documents/School/_REGIS/2022-05_Summer/MSDS610/Week78-project/202201-MCSC600-Intro-DS.txt")

# Check that text_file is an RDD
from pyspark.rdd import RDD
isinstance(text_file, RDD)

True

### Always, _Always_, **ALWAYS!**  look at your data!

Spark doesn't *actually* load the entire data set when you evaluate the cell above. It waits until it absolutely has to. In order to look at the first few lines of the file you need to use the **take()** command:

In [7]:
text_file.take(5)

['Intro to Data Science',
 'Notes',
 '20220116',
 '\tWeek 1 Presentation',
 '\t\tPeople / Directory of Names']

## Strategy ##

Recall that in traditional MapReduce WordCount, the map() function will emit single words paired with the number 1. The reduce() then collects all of those words and sums them up, outputting the sum for each word into the output file(s). 

Right now, however, we do not have single words; we have lines of multiple words. So the first step of our strategy is to break the lines into individual words. Then we can pair each word with a 1 and finally sum the counts for each word.

Our goal going into the reduce() is to have something like this:

`[('A', 1), ('MIDSUMMER-NIGHT'S', 1), ('DREAM', 1), ('Now',1), ... ]`

### 1) Break the lines into individual words
For this step we use the map() function of RDDs. Recall that the map() function takes 2 arguments:
* The function to apply to the data set
* The data itself

In this case, the RDD text_file is the data set, so we just need to provide a function. Since our data is strings, we can do several "clean up" steps at once:
* .lower() to make everything lower case
* .replace(',',' ') to replace commas with spaces
* .split() to break the line into a list of words

It is **very** common to use a **"lambda function."** Lambda functions are just little one-liners like below:

In [8]:
# x is an element of the data set - in this case a line of text
word_list = text_file.map(lambda x: x.lower().replace(',',' ').split()) 
# BTW, there's nothing "magic" about x. it is just a variable. you can name it anything
word_list.take(5)

[['intro', 'to', 'data', 'science'],
 ['notes'],
 ['20220116'],
 ['week', '1', 'presentation'],
 ['people', '/', 'directory', 'of', 'names']]
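
To see what the lambda is doing without Spark in the picture, here is a rough plain-Python sketch. It uses Python's built-in `map()`, which really does take the function and the data as two arguments. The sample lines are copied from the `take(5)` output above; the helper name `clean_and_split` is made up for illustration.

```python
# Sample lines copied from the text_file.take(5) output earlier in the notebook
lines = [
    'Intro to Data Science',
    'Notes',
    '20220116',
    '\tWeek 1 Presentation',
    '\t\tPeople / Directory of Names',
]

def clean_and_split(line):
    """Lower-case a line, turn commas into spaces, then split on whitespace."""
    return line.lower().replace(',', ' ').split()

# Built-in map() takes the function first, then the data.
# It returns a lazy iterator, so list() is used to see the results.
word_lists = list(map(clean_and_split, lines))
print(word_lists)
```

Each line becomes its own list of words, which is the same nested shape Spark returned.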

### A list of... lists?
The output of the last map command is a little surprising. **What happened?**

Recall that map() applies the function to each "piece" of data. In this case, each piece of data is a *line of text* and the function, split(), takes a line of text and **returns a list of individual words.** 

Our first transformation gave us individual words but they are wrapped in lists that we don't need. Fortunately, we have a way to deal with it. 

A **list of lists** is called **nested lists.** If you convert nested lists into a single list, you **flatten** it. 
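
As a small plain-Python illustration of flattening (not Spark code, just a sketch using the standard library's `itertools.chain`), with the first few word lists from the output above:

```python
from itertools import chain

# Nested lists: one inner list per line of text
nested = [['intro', 'to', 'data', 'science'], ['notes'], ['20220116']]

# chain.from_iterable walks each inner list in turn and yields its items,
# which removes exactly one level of nesting
flat = list(chain.from_iterable(nested))
print(flat)  # ['intro', 'to', 'data', 'science', 'notes', '20220116']
```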

Spark has a function that will act like map() and also flatten nested lists. It is called **flatMap()** (note the capital M).

Let's see if we can use flatMap() to output our (word, 1) pairs. BTW, the parentheses on the **(**word,1**)** pair mean we are grouping each word with the number 1 (technically, this is called a "tuple").

In [9]:
word_tuple = word_list.flatMap(lambda wordlist: [(word, 1) for word in wordlist])
word_tuple.take(5)

[('intro', 1), ('to', 1), ('data', 1), ('science', 1), ('notes', 1)]

### OK, that is... complicated

Let's look at that last line in detail.

* `flatMap()` helps to flatten out the nested lists, and it takes a function just like `map()`. That's where the lambda comes in.
* On this lambda I called the incoming data **wordlist** just to help distinguish it.

"OK," I hear you saying, "but what the heck is with the brackets and `for` thingy?"

I'm glad you asked! The brackets make a **list comprehension**. This is a fancy, one-line version of a for loop.

Observe, in regular Python:
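
A minimal sketch of that comparison, using a made-up `wordlist` (the sample words are an assumption for illustration only). The for loop and the list comprehension build the same list of (word, 1) pairs:

```python
wordlist = ['a', 'midsummer', 'dream']  # hypothetical sample data

# The long way: a regular for loop that appends to a list
pairs_loop = []
for word in wordlist:
    pairs_loop.append((word, 1))

# The short way: a list comprehension that does the same thing in one line
pairs_comp = [(word, 1) for word in wordlist]

print(pairs_loop)
print(pairs_comp)
```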

Notice the list comprehension gave us output in a list. The `flatMap()` function kept us from nesting yet more lists.

## Next up: reduce()

"Reduce" in the MapReduce sense of the word means **to gather** or **to sum up.** 

You might remember that MapReduce has an *intermediate* shuffle/sort step that helps group all the occurrences of a word before going into reduce. It effectively turns this:

`('the', 1), ('the', 1), ('the', 1), ('the', 1), ('the', 1)`

into this:

`('the', 1,1,1,1,1)`

An interesting way of thinking of each tuple (thing with parentheses) is as a (key, value). So, above, each 'the' would be a key and each 1 is a value. Spark has a cool function called `reduceByKey()` that helps us:

In [10]:
word_counts = word_tuple.reduceByKey(lambda total, count: total + count)
word_counts.take(5)

[('science', 14), ('notes', 1), ('20220116', 1), ('week', 21), ('1', 10)]
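
Conceptually, `reduceByKey()` does the grouping and the summing in one call. The plain-Python sketch below imitates those two steps on a handful of made-up pairs. It is only a mental model: real Spark spreads this work across partitions and may combine values before shuffling them.

```python
from collections import defaultdict
from functools import reduce

# Hypothetical (word, 1) pairs, like the ones flatMap() produced
pairs = [('the', 1), ('science', 1), ('the', 1), ('week', 1), ('the', 1)]

# "Shuffle/sort": gather all the values that share a key
grouped = defaultdict(list)
for key, value in pairs:
    grouped[key].append(value)

# "Reduce": fold each key's values together with the same two-argument
# function we handed to reduceByKey()
counts = {key: reduce(lambda total, count: total + count, values)
          for key, values in grouped.items()}

print(dict(grouped))  # {'the': [1, 1, 1], 'science': [1], 'week': [1]}
print(counts)         # {'the': 3, 'science': 1, 'week': 1}
```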

## Results

That operation seems to have done exactly what we want. **BUT** if that cell took a long time to run, that's just because of the .take(). Spark transformations such as `map()`, `flatMap()`, and `reduceByKey()` are lazy: they are not applied until an action such as `take()`, `count()`, or `reduce()` asks for a result. A call to `collect()` is also an action; it runs the pending transformations and returns all of the results to the notebook *(this can be used when you don't need a reduce but need to finalize before, for example, writing to an output file)*. Let's check how many elements are in word_counts:

In [11]:
word_counts.count()

1556
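
Python's own lazy iterators offer a rough analogy for this "wait until asked" behavior. This is an analogy only (Spark's execution model is quite different under the hood), and the function name `tag` is made up for the sketch:

```python
calls = []  # records each time the function actually runs

def tag(word):
    """Pair a word with 1, and log that we were called."""
    calls.append(word)
    return (word, 1)

words = ['intro', 'to', 'data', 'science']

# Built-in map() returns a lazy iterator: nothing has been computed yet
lazy_pairs = map(tag, words)
calls_before = len(calls)

# Asking for two items (a bit like take(2)) computes only those two
first_two = [next(lazy_pairs), next(lazy_pairs)]
calls_after = len(calls)

print(calls_before, first_two, calls_after)  # 0 [('intro', 1), ('to', 1)] 2
```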

## Saving

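RDDs can be saved back to disk very easily. Just use the `saveAsTextFile()` function. Note that the path you give it becomes a *directory* of part files, and the call fails if that directory already exists. First, let's look at the full results with `collect()`: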

In [12]:
word_counts.collect()

[('science', 14),
 ('notes', 1),
 ('20220116', 1),
 ('week', 21),
 ('1', 10),
 ('presentation', 3),
 ('directory', 1),
 ('of', 83),
 ('names', 1),
 ('is', 42),
 ('just', 4),
 ('cleaning', 4),
 ('volume(size)', 1),
 ('process', 3),
 ('ask', 1),
 ('model', 21),
 ('communicate', 1),
 ('python', 20),
 ('r', 3),
 ('steps', 5),
 ('(“first', 1),
 ('steps”', 1),
 ('“alternatives', 1),
 ('strengthen', 1),
 ('foundations”', 1),
 ('shy', 1),
 ('take', 1),
 ('ch.', 3),
 ('algebra', 1),
 ('statistical', 1),
 ('graph', 5),
 ('machine', 11),
 ('learning', 23),
 ('business', 3),
 ('storage', 1),
 ('retrieval.', 1),
 ('awesome', 1),
 ('basically', 1),
 ('rest', 1),
 ('was', 1),
 ('installing', 1),
 ('everything', 1),
 ('getting', 1),
 ('jupyter', 4),
 ('pfdsfd', 1),
 ('ch', 3),
 ('capture', 1),
 ('friday', 4),
 ('even', 1),
 ('sets', 5),
 ('are', 21),
 ('different', 14),
 ('extraction', 1),
 ('scientists', 1),
 ('preparation', 4),
 ('outlier', 1),
 ('boundary', 3),
 ('=', 20),
 ('lower', 2),
 ('q1', 1)

In [15]:
word_counts.saveAsTextFile('spark_wordcount-jab-example.txt')

Py4JJavaError: An error occurred while calling o122.saveAsTextFile.
: org.apache.hadoop.mapred.FileAlreadyExistsException: Output directory file:/C:/Users/jerem/OneDrive/Documents/School/_REGIS/2022-05_Summer/MSDS610/Week78-project/spark_wordcount-jab-example.txt already exists
	at org.apache.hadoop.mapred.FileOutputFormat.checkOutputSpecs(FileOutputFormat.java:131)
	at org.apache.spark.internal.io.HadoopMapRedWriteConfigUtil.assertConf(SparkHadoopWriter.scala:299)
	at org.apache.spark.internal.io.SparkHadoopWriter$.write(SparkHadoopWriter.scala:71)
	at org.apache.spark.rdd.PairRDDFunctions.$anonfun$saveAsHadoopDataset$1(PairRDDFunctions.scala:1091)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:406)
	at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:1089)
	at org.apache.spark.rdd.PairRDDFunctions.$anonfun$saveAsHadoopFile$4(PairRDDFunctions.scala:1062)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:406)
	at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:1027)
	at org.apache.spark.rdd.PairRDDFunctions.$anonfun$saveAsHadoopFile$3(PairRDDFunctions.scala:1009)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:406)
	at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:1008)
	at org.apache.spark.rdd.PairRDDFunctions.$anonfun$saveAsHadoopFile$2(PairRDDFunctions.scala:965)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:406)
	at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:963)
	at org.apache.spark.rdd.RDD.$anonfun$saveAsTextFile$2(RDD.scala:1599)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:406)
	at org.apache.spark.rdd.RDD.saveAsTextFile(RDD.scala:1599)
	at org.apache.spark.rdd.RDD.$anonfun$saveAsTextFile$1(RDD.scala:1585)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:406)
	at org.apache.spark.rdd.RDD.saveAsTextFile(RDD.scala:1585)
	at org.apache.spark.api.java.JavaRDDLike.saveAsTextFile(JavaRDDLike.scala:564)
	at org.apache.spark.api.java.JavaRDDLike.saveAsTextFile$(JavaRDDLike.scala:563)
	at org.apache.spark.api.java.AbstractJavaRDDLike.saveAsTextFile(JavaRDDLike.scala:45)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
	at java.lang.reflect.Method.invoke(Unknown Source)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Unknown Source)
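
The error above says the output directory already exists: `saveAsTextFile()` refuses to overwrite a previous run. One possible workaround, sketched here for a *local* path only (HDFS output would need a different tool), is to delete the old directory before saving. The helper name `clear_output_dir` and the temporary folder are assumptions for the demonstration.

```python
import os
import shutil
import tempfile

def clear_output_dir(path):
    """Delete a previous local output directory, if there is one.

    Returns True if something was removed, False otherwise.
    """
    if os.path.isdir(path):
        shutil.rmtree(path)
        return True
    return False

# Demonstration in a throwaway temp folder (no Spark needed)
workdir = tempfile.mkdtemp()
out_path = os.path.join(workdir, 'spark_wordcount-example.txt')
os.makedirs(out_path)                  # simulate the leftover from an earlier run

removed = clear_output_dir(out_path)
print(removed, os.path.exists(out_path))  # True False

shutil.rmtree(workdir)                 # tidy up the temp folder
```

In the notebook, you would call the helper on your real output path and then run `word_counts.saveAsTextFile(...)` again. Be careful: this deletes the whole directory, so double-check the path first.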


# But Wait! There's More!

Spark commands are meant to be **"chained"** together -- in other words, the output from one function call is immediately used as input for the next function call.

Let's see how chaining would work on our WordCount example. Remember, the variable **text_file** holds the contents of our source data file.

Here is the code. We put a "." after each call to "chain" the results, and the trailing `\` is a line continuation (it must be the last character on its line, so the explanation lives up here rather than in a comment after it):

`text_file.map(lambda x: x.split()). \
            flatMap(lambda wordlist: [(word, 1) for word in wordlist]). \
            reduceByKey(lambda total, count: total + count). \
            take(5) `

In [None]:
text_file.map(lambda x: x.split()).      \
            flatMap(lambda wordlist: [(word, 1) for word in wordlist]). \
            reduceByKey(lambda total, count: total + count). \
            take(5) 
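
For a small input, a quick way to sanity-check the chained result is to count the same way in plain Python. This sketch uses `collections.Counter` on a few made-up lines (the sample data is an assumption) and, like the chained version above, splits on whitespace without lower-casing:

```python
from collections import Counter

# Hypothetical sample lines standing in for the contents of text_file
lines = ['Intro to Data Science', 'Notes', 'Data Science Notes']

# Split every line into words and count them in one pass
counts = Counter(word for line in lines for word in line.split())

print(counts['Data'], counts['Notes'], counts['Intro'])  # 2 2 1
```

This only works when the data fits in memory on one machine, which is exactly the limit Spark is designed to get past.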