<a href="https://cognitiveclass.ai"><img src = "https://ibm.box.com/shared/static/9gegpsmnsoo25ikkbl4qzlvlyjbgxs5x.png" width = 400> </a>

<h1 align = "center"> Spark Fundamentals I - Introduction to Spark </h1>
<h2 align = "center"> Scala - Working with RDD operations </h2>
<br align = "left">

**Related free online courses:**  

Related courses can be found in the following learning paths:

- [Spark Fundamentals path](http://cocl.us/Spark_Fundamentals_Path)
- [Big Data Fundamentals path](http://cocl.us/Big_Data_Fundamentals_Path)

<img src = "http://spark.apache.org/images/spark-logo.png" height = 100 align = 'left'>

### Starting with Spark using Scala

### Run the following lines of code to get the data

In [None]:
// import the package needed to run shell commands within the notebook
import sys.process._

In [None]:
// download the data from the IBM Server
// this may take ~30 seconds depending on your internet speed
"wget --quiet https://ibm.box.com/shared/static/j8skrriqeqw66f51iyz911zyqai64j2g.zip".!
println("Data Downloaded!")

Let's unzip the data that we just downloaded into a directory dedicated to this course. Let's choose the directory **/resources/jupyterlab/labs/BD0211EN/**.

In [None]:
// extract the archive quietly (-q), overwriting existing files (-o), into the course directory (-d)
"unzip -q -o -d /resources/jupyterlab/labs/BD0211EN/ j8skrriqeqw66f51iyz911zyqai64j2g.zip".!
println("Data Extracted!")

The data is in a folder called **LabData**. Let's list all the files in the data that we just downloaded and extracted.

In [None]:
// list the extracted files
"ls -1 /resources/jupyterlab/labs/BD0211EN/LabData".!

Now we are going to create an RDD from the README file, using the Spark context's textFile method just as in the previous lab. Like a transformation, textFile is lazy, so nothing is actually read yet. We're just telling Spark that we want to create a readme RDD.

Run the code in the following cell. It returns a reference to an RDD, which we have named readme.

In [None]:
val readme = sc.textFile("/resources/jupyterlab/labs/BD0211EN/LabData/README.md")

Let’s perform some RDD actions on this text file. Count the number of items in the RDD using this command:

In [None]:
readme.count()

Let’s run another action. Run this command to find the first item in the RDD:

In [None]:
readme.first()

Now let’s try a transformation. Use the filter transformation to return a new RDD with a subset of the items in the file. Type in this command:

In [None]:
val linesWithSpark = readme.filter(line => line.contains("Spark"))
linesWithSpark.count()

Again, filter returned a reference to a new RDD with the results of the transformation; count is the action that triggers the computation.

You can even chain together transformations and actions. To find out how many lines contain the word “Spark”, type in:

In [None]:
readme.filter(line => line.contains("Spark")).count()

### More on RDD Operations

This section builds upon the previous section. In this section, you will see that RDDs can be used for more complex computations. You will find the largest number of words on any single line of the readme file.

In [None]:
readme.map(line => line.split(" ").size).
                    reduce((a, b) => if (a > b) a else b)

There are two parts to this. The first maps each line to an integer value, the number of words in that line. In the second part, reduce is called to find the largest of those word counts. The arguments to map and reduce are Scala function literals (closures), but you can use any language feature or Scala/Java library.
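
The map-and-reduce cell above returns only the largest word count. To recover the line itself, one option is to carry each line alongside its count and reduce over the pairs. The sketch below runs on a small local Seq (the sample lines are made up for illustration); since RDDs expose map and reduce with the same shape, replacing sampleLines with readme should give the corresponding result on the real file.

```scala
// Hypothetical sample lines standing in for the README contents.
val sampleLines = Seq(
  "Apache Spark",
  "Spark is a fast and general cluster computing system",
  "See the docs"
)

// Pair every line with its word count, then keep the pair with the larger count.
val (longestLine, wordCount) = sampleLines
  .map(line => (line, line.split(" ").length))
  .reduce((a, b) => if (a._2 >= b._2) a else b)

println(s"$wordCount words: $longestLine")
```

On an RDD, lines that tie for the maximum may resolve to either line, because the order in which partial results are combined is not fixed.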

In the next step, you use the Math.max() function to show that you can indeed use a Java library instead.
Import the java.lang.Math class (Scala imports java.lang by default, so this import is optional):

In [None]:
import java.lang.Math

Now run with the max function:

In [None]:
readme.map(line => line.split(" ").size).
        reduce((a, b) => Math.max(a, b))
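
The same maximum can be written in several equivalent ways. The local sketch below (the sample word counts are made up) compares the explicit if expression, Math.max, and the max method that Scala provides on Int.

```scala
// Hypothetical per-line word counts, as produced by the map step.
val counts = Seq(2, 9, 3)

val viaIf      = counts.reduce((a, b) => if (a > b) a else b)
val viaJavaMax = counts.reduce((a, b) => Math.max(a, b))
val viaScala   = counts.reduce(_ max _)

// All three values hold the same maximum.
println(Seq(viaIf, viaJavaMax, viaScala))
```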

Spark supports the MapReduce data flow pattern. We can use this to do a word count on the readme file.

In [None]:
val wordCounts = readme.flatMap(line => line.split(" ")).
                        map(word => (word, 1)).
                        reduceByKey((a,b) => a + b)

Here we combined the flatMap, map, and the reduceByKey functions to do a word count of each word in the readme file.
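
To make the three stages concrete, here is a local-collections sketch on two made-up lines. Plain Scala collections have no reduceByKey, so groupBy followed by a sum stands in for it here; that substitution is an assumption for illustration, not how Spark executes the job. The sketch also shows one caveat of splitting on a single space: consecutive spaces produce empty-string "words", which splitting on runs of whitespace and dropping empty tokens avoids.

```scala
// Two hypothetical lines; note the double space in the first one.
val lines = Seq("to be  or not", "to be")

// flatMap: one flat list of tokens across all lines.
val naiveTokens = lines.flatMap(line => line.split(" "))
val cleanTokens = lines.flatMap(line => line.split("\\s+")).filter(_.nonEmpty)

// map + reduceByKey stand-in: pair each word with 1, group by word, sum the ones.
def countWords(tokens: Seq[String]): Map[String, Int] =
  tokens.map(word => (word, 1))
    .groupBy(_._1)
    .map { case (word, pairs) => (word, pairs.map(_._2).sum) }

val naiveCounts = countWords(naiveTokens)
val cleanCounts = countWords(cleanTokens)

println(naiveCounts.get("")) // the empty token is counted as a word
println(cleanCounts)
```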

To collect the word counts, use the collect action.

#### It should be noted that the collect function brings all of the data into the driver node. For a small dataset this is acceptable, but for a large dataset it can cause an out-of-memory error. It is recommended to use collect() for testing only. The safer approach is to use the take() function, e.g. take(n).foreach(println)

In [None]:
wordCounts.collect().foreach(println)

You can also do:

    println(wordCounts.collect().mkString("\n"))

    println(wordCounts.collect().deep)


### <span style="color: red">YOUR TURN:</span> 

#### In the cell below, determine the most frequent CHARACTER in the README and how many times it was used.

In [None]:
// WRITE YOUR CODE BELOW


Highlight text for answer:

<textarea rows="6" cols="80" style="color: white">
val mostFrequentChar = readme.flatMap(line => line.toList).
                        map(char => (char, 1)).
                        reduceByKey((a,b) => a + b).
                        reduce((a, b) => if (a._2 > b._2) a else b)

println(mostFrequentChar)
</textarea>

## Analysing a log file

First, let's analyze a log file in the LabData directory.

In [None]:
val logFile = sc.textFile("/resources/jupyterlab/labs/BD0211EN/LabData/notebook.log")

Keep only the lines that contain INFO (or ERROR, if the particular log has it):

In [None]:
val info = logFile.filter(line => line.contains("INFO"))

Count the lines:

In [None]:
info.count()

Count the lines that contain "spark" (lowercase) by combining a transformation and an action.

In [None]:
info.filter(line => line.contains("spark")).count()

Fetch those lines as an array of strings and print each one:

In [None]:
info.filter(line => line.contains("spark")).collect() foreach println
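
The contains method is case-sensitive, so filtering on "spark" skips lines that only mention "Spark". If both should match, one approach is to lower-case each line before testing it. The sketch below uses made-up log lines on a local Seq; the same predicate can be passed to filter on an RDD.

```scala
// Hypothetical log lines for illustration only.
val logLines = Seq(
  "INFO org.apache.spark.SparkContext: Running version",
  "INFO Starting Spark job",
  "INFO Kernel ready"
)

val exactMatches   = logLines.filter(line => line.contains("spark"))
val anyCaseMatches = logLines.filter(line => line.toLowerCase.contains("spark"))

println(exactMatches.size)   // lines containing lowercase "spark"
println(anyCaseMatches.size) // lines containing the word in any letter case
```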

Remember that we went over the DAG (directed acyclic graph). It is what provides the fault tolerance in Spark: each RDD records its lineage, the chain of transformations used to build it, so a lost partition can be recomputed from that lineage. You can view the lineage of an RDD using the toDebugString method.

In [None]:
println(info.toDebugString)

## Joining RDDs

Next, you are going to create RDDs for the README and the POM file in the LabData directory.

In [None]:
val readmeFile = sc.textFile("/resources/jupyterlab/labs/BD0211EN/LabData/README.md")
val pom = sc.textFile("/resources/jupyterlab/labs/BD0211EN/LabData/pom.xml")

How many lines in each file contain the word Spark?

In [None]:
println(readmeFile.filter(line => line.contains("Spark")).count())
println(pom.filter(line => line.contains("Spark")).count())

Now do a WordCount on each RDD so that the results are (K,V) pairs of (word,count)

In [None]:
val readmeCount = readmeFile.
                    flatMap(line => line.split(" ")).
                    map(word => (word, 1)).
                    reduceByKey(_ + _)

val pomCount = pom.
                flatMap(line => line.split(" ")).
                map(word => (word, 1)).
                reduceByKey(_ + _)

To see the array for either of them, just call the collect function on it.

In [None]:
println("Readme Count\n")
readmeCount.collect() foreach println

In [None]:
println("Pom Count\n")
pomCount.collect() foreach println

Now let's join these two RDDs together. The join function combines two datasets of (K,V) and (K,W) pairs into a dataset of (K, (V,W)) pairs, keeping only the keys that appear in both. Let's join these two counts together and then cache the result.

In [None]:
val joined = readmeCount.join(pomCount)
joined.cache()

Let's see what's in the joined RDD.

In [None]:
joined.collect.foreach(println)
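
The join used here is an inner join: a word appears in the result only if it is a key in both RDDs. The sketch below mimics that behaviour with two small local Maps (the counts are made up), pairing the two values for every shared key.

```scala
// Hypothetical word counts from two files.
val readmeCounts = Map("Spark" -> 3, "the" -> 5, "README" -> 1)
val pomCounts    = Map("Spark" -> 2, "the" -> 1, "xml" -> 4)

// Keep only keys found in both maps and pair their values: (K, (V, W)).
val joinedLocal: Map[String, (Int, Int)] =
  readmeCounts.collect {
    case (word, v) if pomCounts.contains(word) => (word, (v, pomCounts(word)))
  }

println(joinedLocal)
```

Words such as "README" and "xml" in this made-up data drop out because each occurs in only one of the two maps.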

Let's combine the values together to get the total count. The operations in this command tell Spark to combine the values from (K,V) and (K,W) to give us (K, V+W). The ._1 and ._2 accessors return the first and second elements of a tuple.

In [None]:
val joinedSum = joined.map(k => (k._1, (k._2)._1 + (k._2)._2))
joinedSum.collect() foreach println
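
The positional accessors work, but chains like (k._2)._1 are hard to read. A common alternative is to destructure the nested tuple with a case pattern. The sketch below applies both forms to a made-up local Seq with the same (K, (V, W)) shape; the pattern-matching function can be passed to map on an RDD in the same way.

```scala
// Hypothetical joined records in (word, (countA, countB)) form.
val joinedSample = Seq(("Spark", (3, 2)), ("the", (5, 1)))

// Positional accessors, as in the cell above.
val summedByIndex = joinedSample.map(k => (k._1, k._2._1 + k._2._2))

// Same computation with named parts.
val summedByPattern = joinedSample.map { case (word, (v, w)) => (word, v + w) }

println(summedByPattern)
```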

To check that it is correct, print the first five elements from the joined and the joinedSum RDDs.

In [None]:
println("Joined Individual\n")
joined.take(5).foreach(println)

println("\n\nJoined Sum\n")
joinedSum.take(5).foreach(println)

## Shared variables

Broadcast variables allow the programmer to keep a read-only variable cached on each worker node rather than shipping a copy of it with tasks. They can be used, for example, to give every node a copy of a large input dataset in an efficient manner. After the broadcast variable is created from a value v (with sc.broadcast(v)), it should be used instead of the value v in any functions run on the cluster so that v is not shipped to the nodes more than once. In addition, the object v should not be modified after it is broadcast in order to ensure that all nodes get the same value of the broadcast variable (e.g. if the variable is shipped to a new node later).

Read more here: [http://spark.apache.org/docs/latest/programming-guide.html#broadcast-variables](http://spark.apache.org/docs/latest/programming-guide.html#broadcast-variables)

Let's create a broadcast variable:

In [None]:
val broadcastVar = sc.broadcast(Array(1,2,3))

To get the value, type in:

In [None]:
broadcastVar.value

Accumulators are variables that can only be added to through an associative operation. They are used to implement counters and sums efficiently in parallel. Spark natively supports numeric accumulators and standard mutable collections, and programmers can extend these for new types. Only the driver can read the value of an accumulator; the workers can only add to it.

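Create the accumulator variable. (sc.accumulator is the Spark 1.x API; from Spark 2.0 onward it is deprecated in favor of sc.longAccumulator.) Type in: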

In [None]:
val accum = sc.accumulator(0)

Next parallelize an array of four integers and run it through a loop to add each integer value to the accumulator variable. Type in:

In [None]:
sc.parallelize(Array(1,2,3,4)).foreach(x => accum += x)

To get the current value of the accumulator variable, type in:

In [None]:
accum.value

You should get a value of 10.
This command can only be invoked on the driver side. The worker nodes can only increment the accumulator.
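
The operation must be associative because each task adds to its own partial result and Spark merges the partial results afterwards, in no guaranteed order. The local sketch below imitates that with plain collections (no Spark involved; the split into two "partitions" is an assumption for illustration): summing each chunk and then combining the partial sums gives the same total as a single pass.

```scala
val data = Array(1, 2, 3, 4)

// Pretend the data is split across two partitions.
val partitions = data.grouped(2).toList

// Each "task" produces a partial sum; the "driver" merges them.
val partialSums = partitions.map(_.sum)
val merged = partialSums.sum

// The merge order does not matter for an associative, commutative operation.
val mergedReversed = partialSums.reverse.sum

println(merged == data.sum)
```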

## Key-value pairs

You have already seen a bit about key-value pairs in the Joining RDDs section. Here is a brief example of how to create a key-value pair and access its values. Remember that certain operations, such as reduceByKey and join, only work on RDDs of key-value pairs.

Create a key-value pair of two characters. Type in:

In [None]:
val pair = ('a', 'b')

Access the first element with the *._1* method and the second with the *._2* method.

In [None]:
pair._1

In [None]:
pair._2
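
Tuples can also be taken apart by pattern matching, which gives each element a name, and swap exchanges the two elements of a pair (it is used later in this lab to order results by count).

```scala
val pair = ('a', 'b')

// Destructure into named values instead of using ._1 and ._2.
val (first, second) = pair

// swap returns a new pair with the elements exchanged.
val swapped = pair.swap

println(s"first=$first second=$second swapped=$swapped")
```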

## Sample Application

In this section, you will use a subset of taxi trip data to determine the top 10 medallion numbers based on the number of trips.

In [None]:
val taxi = sc.textFile("/resources/jupyterlab/labs/BD0211EN/LabData/nyctaxi.csv")

To view the first five rows of content, invoke the take function. Type in:

In [None]:
taxi.take(5).foreach(println)

Note that the first line is the header row. Normally, you would want to filter that out, but since it only adds one extra key with a count of 1, it will not affect our results and we can leave it in.
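
If the header did need to be excluded, one approach is to take the first line and filter out every line equal to it. The sketch below shows the idea on a made-up local Seq (the column names and values are hypothetical, not taken from the lab file); on the RDD, taxi.first() would supply the header in the same way.

```scala
// Hypothetical CSV content: a header row followed by data rows.
val csvLines = Seq(
  "id,medallion,trip_distance",
  "1,ABC123,2.5",
  "2,XYZ789,1.1"
)

val header = csvLines.head
val dataRows = csvLines.filter(line => line != header)

dataRows.foreach(println)
```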

To parse out the values, including the medallion numbers, you need to first create a new RDD by splitting the lines of the RDD using the comma as the delimiter. Type in:

In [None]:
val taxiParse = taxi.map(line=>line.split(","))

Now create the key-value pairs where the key is the medallion number and the value is 1. We later sum the values for each key to find the number of trips each taxi took and, in particular, to see which taxis took the most trips. Map each of the medallions to the value of one. Type in:

In [None]:
val taxiMedKey = taxiParse.map(vals=>(vals(6), 1))

vals(6) is the seventh field of each row (indices start at 0), which is the column where the medallion key is located.
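
Indexing with vals(6) assumes every row has at least seven fields; a shorter row would throw an ArrayIndexOutOfBoundsException. Note too that split(",") drops trailing empty fields. The sketch below, on made-up rows with a hypothetical fieldAt helper, shows a defensive variant that passes a negative limit to split to keep trailing fields and returns an Option instead of throwing.

```scala
// Hypothetical helper: return field `index` of a comma-separated row, if present.
def fieldAt(line: String, index: Int): Option[String] = {
  val vals = line.split(",", -1) // a negative limit keeps trailing empty fields
  if (index >= 0 && index < vals.length) Some(vals(index)) else None
}

// Made-up rows for illustration only.
val fullRow  = "a,b,c,d,e,f,MEDALLION1,h"
val shortRow = "a,b,c"

println(fieldAt(fullRow, 6))
println(fieldAt(shortRow, 6))
println("a,b,,".split(",").length)     // trailing empty fields dropped
println("a,b,,".split(",", -1).length) // trailing empty fields kept
```

On an RDD, a helper like this could be combined with flatMap so that rows without the field are skipped rather than failing the job.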

Next, use the reduceByKey function to count the number of occurrences of each key.

In [None]:
val taxiMedCounts = taxiMedKey.reduceByKey((v1,v2)=>v1+v2)

taxiMedCounts.take(5).foreach(println)

Finally, each pair is swapped to (count, medallion) so that top(10) orders the results by trip count, in descending order, and the results are then printed.

In [None]:
for (pair <-taxiMedCounts.map(_.swap).top(10)) println("Taxi Medallion %s had %s Trips".format(pair._2, pair._1))
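
Swapping matters because tuples are ordered by their first element and then by their second, and top(n) returns the n largest elements under that ordering. The sketch below reproduces the idea locally on made-up counts; plain collections have no top method, so a descending sort followed by take stands in for it here (an assumption for illustration).

```scala
// Hypothetical (medallion, tripCount) pairs.
val medallionCounts = Seq(("M1", 12), ("M2", 40), ("M3", 7), ("M4", 25))

// Swap to (tripCount, medallion) so the count drives the ordering,
// then sort in descending order and keep the first two.
val topTwo = medallionCounts
  .map(_.swap)
  .sorted(Ordering[(Int, String)].reverse)
  .take(2)

for ((trips, medallion) <- topTwo)
  println("Taxi Medallion %s had %s Trips".format(medallion, trips))
```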

While each step above was processed one line at a time, you can just as well process everything on one line:

In [None]:
val taxiMedCountsOneLine = taxi.map(line=>line.split(',')).map(vals=>(vals(6),1)).reduceByKey(_ + _)

Run the same line as above to print the taxiMedCountsOneLine RDD.

In [None]:
for (pair <-taxiMedCountsOneLine.map(_.swap).top(10)) println("Taxi Medallion %s had %s Trips".format(pair._2, pair._1))

Let's cache taxiMedCountsOneLine to see the difference caching makes. If you run it with the log level set to INFO, the log shows the time taken to execute each line. First, let's cache the RDD:

In [None]:
taxiMedCountsOneLine.cache()

Next, you have to invoke an action for Spark to actually cache the RDD. Note the time it takes here (either from the INFO log or by simply observing how long the cell takes to run):

In [None]:
taxiMedCountsOneLine.count()

Run it again to see the difference.

In [None]:
taxiMedCountsOneLine.count()

The bigger the dataset, the more noticeable the difference will be. In a sample file such as ours, the difference may be negligible.


### Summary
Having completed this exercise, you should now be able to describe Spark’s primary data abstraction, understand how to create parallelized collections and external datasets, work with Resilient Distributed Dataset (RDD) operations, and utilize shared variables and key-value pairs.

This notebook is part of the free course on **Cognitive Class** called *Spark Fundamentals I*. If you accessed this notebook outside the course, you can take this free self-paced course online by going to: http://cocl.us/Spark_Fundamentals_I

### About the Authors:  
Hi! It's [Alex Aklson](https://www.linkedin.com/in/aklson/), one of the authors of this notebook. I hope you found this lab educational! There is much more to learn about Spark but you are well on your way. Feel free to connect with me if you have any questions.
<hr>