<a href="https://cognitiveclass.ai"><img src = "https://s3-api.us-geo.objectstorage.softlayer.net/cf-courses-data/CognitiveClass/Logos/organization_logo/organization_logo.png" width = 400> </a>

<h1 align = "center"> Spark Fundamentals I - Introduction to Spark </h1>
<h2 align = "center"> Python - Working with RDD operations </h2>
<br align = "left">

**Related free online courses:**  

Related courses can be found in the following learning paths:

- [Spark Fundamentals path](http://cocl.us/Spark_Fundamentals_Path)
- [Big Data Fundamentals path](http://cocl.us/Big_Data_Fundamentals_Path)

<img src="http://spark.apache.org/images/spark-logo.png" height=100>

## Analyzing a log file

First, let's install the tools we need to use Spark in SN Labs.

In [None]:
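!pip install findspark
!pip install pyspark
# findspark.init() locates Spark and adds PySpark to sys.path, so call it before importing pyspark
import findspark
findspark.init()
import pyspark
# Reuse the running SparkContext if one exists, otherwise create a new one
sc = pyspark.SparkContext.getOrCreate()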

If you completed the **Getting Started** lab, then you should have the data downloaded and unzipped in the */resources/jupyterlab/labs/BD0211EN/LabData/* directory. Otherwise, please uncomment **the last two lines of code** in each of the following cells to download and unzip the data.

In [None]:
## download the data from the IBM server
## this may take ~30 seconds depending on your internet speed

#!wget --quiet https://cocl.us/BD0211EN_Data
#print("Data Downloaded!")

In [None]:
## unzip the folder's content into the "resources" directory
## this may take ~30 seconds depending on your internet speed

#!unzip -q -o -d /resources/jupyterlab/labs/BD0211EN/ BD0211EN_Data
#print("Data Extracted!")

In [None]:
# list the extracted files
!ls -1 /resources/jupyterlab/labs/BD0211EN/LabData/

Now, let's create an RDD by loading the log file that we analyzed in the Scala version of this lab.

In [None]:
logFile = sc.textFile("/resources/jupyterlab/labs/BD0211EN/LabData/notebook.log")

### <span style="color: red">YOUR TURN:</span> 

#### In the cell below, keep only the lines that contain INFO

In [None]:
# WRITE YOUR CODE BELOW




Double-click __here__ for the solution.
<!-- The correct answer is:
info = logFile.filter(lambda line: "INFO" in line)
-->

#### Count the lines:

In [None]:
# WRITE YOUR CODE BELOW




Double-click __here__ for the solution.
<!-- The correct answer is:
info.count()
-->

#### Count the lines that contain "spark" by combining a transformation and an action.

In [None]:
# WRITE YOUR CODE BELOW




Double-click __here__ for the solution.
<!-- The correct answer is:
info.filter(lambda line: "spark" in line).count()
-->
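Transformations such as filter are lazy: Spark only records them, and the actual work happens when an action such as count or collect runs. The plain-Python sketch below (no Spark involved; `sample_lines` and the `seen` log are made up for illustration) mimics this with generator expressions, which likewise do nothing until something consumes them.

```python
# Hypothetical sample lines standing in for notebook.log
sample_lines = [
    "INFO spark started",
    "WARN disk almost full",
    "INFO spark job finished",
    "INFO shutting down",
]

seen = []  # records every line the INFO filter actually inspects

def is_info(line):
    seen.append(line)
    return "INFO" in line

# "Transformations": building the generator pipeline inspects no lines yet
info_lines = (line for line in sample_lines if is_info(line))
spark_info_lines = (line for line in info_lines if "spark" in line)
inspected_before_action = len(seen)

# "Action": consuming the pipeline runs both filters, one line at a time
result = list(spark_info_lines)

print(inspected_before_action)  # 0
print(result)                   # ['INFO spark started', 'INFO spark job finished']
print(len(result), len(seen))   # 2 4
```

One difference worth keeping in mind: a generator can be consumed only once, whereas Spark recomputes an RDD from its lineage for each action unless you cache it.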

#### Fetch those lines as a list of strings

In [None]:
# WRITE YOUR CODE BELOW




Double-click __here__ for the solution.
<!-- The correct answer is:
info.filter(lambda line: "spark" in line).collect()
-->

View the lineage graph of an RDD using this command:

In [None]:
# In PySpark, toDebugString() returns bytes, so decode it to print readable lines
print(info.toDebugString().decode("utf-8"))
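The debug string shows the RDD's lineage: the chain of parent RDDs Spark would replay to recompute it. As a rough mental model only, the hypothetical `ToyRDD` class below (not how PySpark is implemented) treats each transformation as a node that remembers its parent. The real output typically lists Spark's internal RDD classes and partition counts instead of these labels, and PySpark may fuse consecutive Python transformations into a single step.

```python
class ToyRDD:
    """Hypothetical stand-in for an RDD: it stores no data, only its parent and a label."""

    def __init__(self, label, parent=None):
        self.label = label
        self.parent = parent

    def filter(self, description):
        # A transformation only creates a new node that points back at its parent
        return ToyRDD(f"filter({description})", parent=self)

    def debug_string(self):
        # Walk back to the source, indenting each ancestor one level deeper
        lines, node, depth = [], self, 0
        while node is not None:
            lines.append("  " * depth + node.label)
            node, depth = node.parent, depth + 1
        return "\n".join(lines)


toy_log = ToyRDD("textFile(notebook.log)")
toy_spark_info = toy_log.filter("INFO").filter("spark")
print(toy_spark_info.debug_string())
# filter(spark)
#   filter(INFO)
#     textFile(notebook.log)
```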

## Joining RDDs

Next, you are going to create RDDs for the same README and POM files that we used in the Scala version.

In [None]:
readmeFile = sc.textFile("/resources/jupyterlab/labs/BD0211EN/LabData/README.md")
pomFile = sc.textFile("/resources/jupyterlab/labs/BD0211EN/LabData/pom.xml")

How many lines in each file contain the word "Spark"?

In [None]:
print(readmeFile.filter(lambda line: "Spark" in line).count())
print(pomFile.filter(lambda line: "Spark" in line).count())

Now do a word count on each RDD so that the results are (K,V) pairs of (word, count).

In [None]:
# Split each line on whitespace, emit a (word, 1) pair per word, then sum the counts per word
readmeCount = (readmeFile
               .flatMap(lambda line: line.split())
               .map(lambda word: (word, 1))
               .reduceByKey(lambda a, b: a + b))

pomCount = (pomFile
            .flatMap(lambda line: line.split())
            .map(lambda word: (word, 1))
            .reduceByKey(lambda a, b: a + b))
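To see what each step produces, here is the same word-count pipeline written in plain Python on two made-up lines (no Spark needed; `sample_lines` is hypothetical). flatMap flattens the per-line word lists into one stream of words, map turns each word into a `(word, 1)` pair, and reduceByKey folds the values of each key together with the given function.

```python
from functools import reduce
from itertools import chain

sample_lines = ["to be or", "not to be"]

# flatMap: split every line and flatten the results into a single list of words
words = list(chain.from_iterable(line.split() for line in sample_lines))

# map: pair each word with a count of 1
pairs = [(word, 1) for word in words]

# reduceByKey: group the values by key, then combine each group with a + b
grouped = {}
for word, one in pairs:
    grouped.setdefault(word, []).append(one)
word_counts = {word: reduce(lambda a, b: a + b, ones) for word, ones in grouped.items()}

print(word_counts)  # {'to': 2, 'be': 2, 'or': 1, 'not': 1}
```

Spark does not materialize whole groups the way this sketch does: reduceByKey combines values within each partition before the shuffle, which is why the function you pass it should be associative and commutative.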

To see the resulting list for either RDD, call the collect function on it.

In [None]:
print("Readme Count\n")
print(readmeCount.collect())

In [None]:
print("Pom Count\n")
print(pomCount.collect())

The join function combines two pair datasets, (K,V) and (K,W), into (K,(V,W)) for every key that appears in both. Let's join these two counts together.

In [None]:
joined = readmeCount.join(pomCount)
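join behaves like an inner join on the key: a key present in only one RDD is dropped, and a key that appears several times on either side yields one output pair per combination. The plain-Python sketch below mimics that on two small, made-up lists of `(word, count)` pairs (the `inner_join` helper is hypothetical), then adds the two counts per key as the joinedSum step later in this section does.

```python
readme_pairs = [("Spark", 3), ("the", 5), ("Python", 1)]
pom_pairs = [("Spark", 2), ("the", 4), ("xml", 7)]

def inner_join(left, right):
    """Return (k, (v, w)) for every pair of matching keys, like RDD.join."""
    return [(k, (v, w)) for k, v in left for k2, w in right if k == k2]

joined_pairs = inner_join(readme_pairs, pom_pairs)
print(joined_pairs)  # [('Spark', (3, 2)), ('the', (5, 4))]

# Add the two counts for each key
summed = [(k, v + w) for k, (v, w) in joined_pairs]
print(summed)  # [('Spark', 5), ('the', 9)]
```

"Python" and "xml" disappear because each occurs on only one side. In Spark the order of the joined pairs is not guaranteed.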

Display the joined result:

In [None]:
joined.collect()

Let's add the two counts together to get the total count for each word.

In [None]:
# Each value is a (readme count, pom count) tuple; add its two parts and keep the key
joinedSum = joined.mapValues(lambda counts: counts[0] + counts[1])

To check the result, print the first five elements of the joined and joinedSum RDDs.

In [None]:
print("Joined Individual\n")
print(joined.take(5))

print("\n\nJoined Sum\n")
print(joinedSum.take(5))

## Shared variables

Normally, when a function passed to a Spark operation (such as map or reduce) is executed on a remote cluster node, it works on separate copies of all the variables used in the function. These variables are copied to each machine, and no updates to the variables on the remote machine are propagated back to the driver program. Supporting general, read-write shared variables across tasks would be inefficient. However, Spark does provide two limited types of shared variables for two common usage patterns: broadcast variables and accumulators.
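The sketch below imitates this in plain Python (no Spark): each simulated task receives its own pickled copy of the driver's state, much as Spark serializes a closure and ships it to executors, so increments made by the tasks never reach the driver's variable. The `run_task` helper, the `state` dictionary, and the partition layout are hypothetical.

```python
import pickle

state = {"counter": 0}  # lives in the "driver"

def run_task(shipped_state, partition):
    # The task works on a deserialized copy, as a remote executor would
    local_state = pickle.loads(shipped_state)
    for x in partition:
        local_state["counter"] += x
    return local_state["counter"]

partitions = [[1, 2], [3, 4]]
task_results = [run_task(pickle.dumps(state), p) for p in partitions]

print(task_results)      # [3, 7]: each task updated only its own copy
print(state["counter"])  # 0: the driver's variable never changed
```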

### Broadcast variables

Broadcast variables are useful when you have a large dataset that you want to use across all the worker nodes. Instead of shipping a copy of the data with every task, a broadcast variable is a read-only value cached once on each machine. Spark actions are executed through a set of stages, separated by distributed “shuffle” operations, and Spark automatically broadcasts the common data needed by tasks within each stage.
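A back-of-the-envelope model of why this helps: without broadcasting, a lookup table referenced in a closure travels with every task; with a broadcast variable, each executor fetches it once and all of its tasks reuse the cached copy. The `copies_shipped` function and all numbers below are hypothetical, and the model ignores details of how Spark actually distributes broadcast data.

```python
def copies_shipped(num_tasks, num_executors, broadcast):
    """Hypothetical model: how many copies of a lookup table cross the network."""
    # Without broadcast, the table is serialized into every task's closure;
    # with broadcast, each executor fetches it once and its tasks share that copy
    return num_executors if broadcast else num_tasks

num_tasks, num_executors, table_mb = 200, 4, 50  # illustrative numbers only
without_broadcast = copies_shipped(num_tasks, num_executors, broadcast=False) * table_mb
with_broadcast = copies_shipped(num_tasks, num_executors, broadcast=True) * table_mb
print(without_broadcast, with_broadcast)  # 10000 200 (MB)
```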


Read more here: [http://spark.apache.org/docs/latest/programming-guide.html#broadcast-variables](http://spark.apache.org/docs/latest/programming-guide.html#broadcast-variables)

Create a broadcast variable. Type in:

In [None]:
broadcastVar = sc.broadcast([1,2,3])

To get the value, type in:

In [None]:
broadcastVar.value

### Accumulators

Accumulators are variables that can only be added to through an associative and commutative operation. They are used to implement counters and sums efficiently in parallel. Spark natively supports accumulators of numeric types, and programmers can add support for new types. Only the driver program can read an accumulator's value; tasks running on the worker nodes can only add to it.
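Each task adds into its own local copy, and the driver merges those partial results in whatever order the tasks finish, so the update operation must give the same answer regardless of grouping and order. A plain-Python illustration (no Spark; the partition layout is hypothetical):

```python
from functools import reduce
from itertools import permutations

# The notebook's values [1, 2, 3, 4], split into three hypothetical partitions
parts = [[1], [2, 3], [4]]

# Each task adds into its own local copy, starting from zero
partials = [sum(p) for p in parts]  # [1, 5, 4]

# The driver may receive and merge the partial results in any order
sums = {reduce(lambda a, b: a + b, order) for order in permutations(partials)}
print(sums)  # {10}

# Subtraction is neither associative nor commutative, so the merge order changes the answer
diffs = {reduce(lambda a, b: a - b, order) for order in permutations(partials)}
print(sorted(diffs))  # [-8, -2, 0]
```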

Create the accumulator variable. Type in:

In [None]:
accum = sc.accumulator(0)

Next, parallelize a list of four integers and define a function that adds each integer to the accumulator variable. Type in:

In [None]:
rdd = sc.parallelize([1,2,3,4])
def f(x):
    # "accum += x" assigns to the name, so "global" makes it refer to the module-level accumulator
    global accum
    accum += x

Next, apply the function f to each element of the RDD using the foreach action:

In [None]:
rdd.foreach(f)

To get the current value of the accumulator variable, type in:

In [None]:
accum.value

You should get a value of 10.

The value can only be read on the driver side; the worker nodes can only add to the accumulator.


## Key-value pairs

You have already seen a bit about key-value pairs in the Joining RDDs section.

Create a key-value pair of two characters. Type in:

In [None]:
pair = ('a', 'b')

Use index [0] to access the first element (the key) and [1] to access the second element (the value).

In [None]:
print(pair[0])
print(pair[1])
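Any two-element tuple can act as a key-value pair, and tuple unpacking is often clearer than indexing. This is plain Python; the `pairs` list is illustrative. On a pair RDD, the keys() and values() methods return the same two projections as new RDDs.

```python
pair = ('a', 'b')
key, value = pair  # unpack instead of using pair[0] and pair[1]
print(key, value)  # a b

# Pair RDD operations such as reduceByKey and join expect elements shaped like this
pairs = [('a', 1), ('b', 2), ('a', 3)]
keys = [k for k, _ in pairs]
values = [v for _, v in pairs]
print(keys)    # ['a', 'b', 'a']
print(values)  # [1, 2, 3]
```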


### Summary
Having completed this exercise, you should now be able to describe Spark’s primary data abstraction, work with Resilient Distributed Dataset (RDD) operations, and utilize shared variables and key-value pairs.

This notebook is part of the free course on **cognitiveclass.ai** called *Spark Fundamentals I*. If you accessed this notebook outside the course, you can take this free, self-paced course online by going to: http://cocl.us/Spark_Fundamentals_I

### About the Authors:  
Hi! It's Alex Aklson, one of the authors of this notebook. I hope you found this lab educational! There is much more to learn about Spark but you are well on your way. Feel free to connect with me if you have any questions.
<hr>