<h1 align = "center"> Spark Fundamentals 1 - Introduction to Spark </h1>
<h2 align = "center"> Python - Working with RDD operations </h2>
<h4 align = "center"> January 11, 2016 </h4>
<br align = "left">

**Related free online courses:**  
- [Spark Fundamentals II](http://bigdatauniversity.com/bdu-wp/bdu-course/spark-fundamentals-ii/)  
- [Data Analysis using R](https://bigdatauniversity.com/bdu-wp/bdu-course/introduction-to-data-analysis-using-r/)  
- [Big Data Fundamentals](http://bigdatauniversity.com/bdu-wp/bdu-course/big-data-fundamentals/)  

<img src="http://spark.apache.org/images/spark-logo.png" height="100" align="left">
<img src="https://upload.wikimedia.org/wikipedia/commons/f/f8/Python_logo_and_wordmark.svg" height="95" align="left">


## Analyzing a log file

First, create an RDD by loading in a log file:

In [1]:
logFile = sc.textFile("/resources/LabData/notebook.log")

### <span style="color: red">YOUR TURN:</span> 

#### In the cell below, keep only the lines that contain INFO

In [3]:
# Keep a handle on the filtered RDD; the answer cells below refer to it as `info`.
info = logFile.filter(lambda line: "INFO" in line)
info.count()


13438

#### Count the lines:

In [None]:
#YOUR CODE HERE #


Highlight text for answer:

<textarea rows="3" cols="80" style="color: white">
info.count()
</textarea>

#### Count the lines that contain "spark" by combining a transformation and an action.

In [4]:
logFile.filter(lambda line: "spark" in line).count()

2238

Highlight text for answer:

<textarea rows="3" cols="80" style="color: white">
info.filter(lambda line: "spark" in line).count()
</textarea>
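The cell above chains a transformation (`filter`) with an action (`count`). As a rough plain-Python analogy (not Spark itself), a generator behaves like a lazy transformation: nothing is scanned until something consumes it. The sample lines below are invented for illustration, and the match is case-sensitive, just like the `in` test in the lambda.

```python
# Plain-Python analogy only; no Spark is involved here.
# The sample lines are invented for illustration.
sample_lines = [
    "15/10/14 14:29:23 INFO Utils: Successfully started service 'sparkDriver'",
    "15/10/14 14:29:24 WARN Something unrelated",
    "Using Spark's default log4j profile",
]

# "Transformation": building the generator does no work yet.
matching = (line for line in sample_lines if "spark" in line)

# "Action": consuming the generator triggers the scan.
spark_line_count = sum(1 for _ in matching)
print(spark_line_count)
```

Only the first sample line is counted: the third contains "Spark" with a capital S, which the lowercase test does not match.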

#### Fetch those lines as a list of strings

In [5]:
logFile.filter(lambda line: "spark" in line).collect()


[u"Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties",
 u'15/10/14 14:29:23 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@172.17.0.22:53333]',
 u"15/10/14 14:29:23 INFO Utils: Successfully started service 'sparkDriver' on port 53333.",
 u'15/10/14 14:29:23 INFO DiskBlockManager: Created local directory at /tmp/spark-fe150378-7bad-42b6-876b-d14e2c193eb6/blockmgr-c142f2f1-ebb6-4612-945b-0a67d156230a',
 u'15/10/14 14:29:23 INFO HttpFileServer: HTTP File server directory is /tmp/spark-fe150378-7bad-42b6-876b-d14e2c193eb6/httpd-ed3f4ab0-7218-48bc-9d8a-3981b1cfe574',
 u"15/10/14 14:29:24 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 35726.",
 u"Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties",
 u'15/10/15 15:33:42 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@172.17.0.22:47412]',
 u"15/10/15 15:33:42 

Highlight text for answer:

<textarea rows="3" cols="80" style="color: white">
info.filter(lambda line: "spark" in line).collect()
</textarea>

View the lineage graph of an RDD using this command:

In [7]:
print(logFile.filter(lambda line: "spark" in line).toDebugString())

(2) PythonRDD[5] at RDD at PythonRDD.scala:43 []
 |  MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:-2 []
 |  /resources/LabData/notebook.log HadoopRDD[0] at textFile at NativeMethodAccessorImpl.java:-2 []


## Joining RDDs

Next, you are going to create RDDs for the README file and the POM file.

In [8]:
readmeFile = sc.textFile("/resources/LabData/README.md")
pomFile = sc.textFile("/resources/LabData/pom.xml")

How many lines in each file contain the keyword "Spark"?

In [9]:
print(readmeFile.filter(lambda line: "Spark" in line).count())
print(pomFile.filter(lambda line: "Spark" in line).count())

18
2


Now run a word count on each RDD so that the results are (K, V) pairs of (word, count):

In [10]:
# Split each line into words, pair every word with 1, then sum the 1s per word.
readmeCount = (readmeFile
    .flatMap(lambda line: line.split(" "))
    .map(lambda word: (word, 1))
    .reduceByKey(lambda a, b: a + b))

pomCount = (pomFile
    .flatMap(lambda line: line.split(" "))
    .map(lambda word: (word, 1))
    .reduceByKey(lambda a, b: a + b))
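The three steps of the word count can be sketched in plain Python to see what each stage produces. This is an analogy under stated assumptions, not Spark code: `word_count` is a hypothetical helper and the sample lines are invented. It also suggests why an empty-string key is likely to show up in the results: `split(" ")` yields `''` for empty lines and for consecutive spaces.

```python
from collections import defaultdict

def word_count(lines):
    """Plain-Python stand-in for flatMap -> map -> reduceByKey."""
    # flatMap: split every line and flatten into one list of words
    words = [word for line in lines for word in line.split(" ")]
    # map: pair every word with 1
    pairs = [(word, 1) for word in words]
    # reduceByKey: add up the 1s for each distinct word
    counts = defaultdict(int)
    for word, one in pairs:
        counts[word] += one
    return dict(counts)

# Invented sample; note the double space and the empty line.
sample = ["Apache  Spark", "", "Spark is fast"]
counts = word_count(sample)
print(counts["Spark"])
print(counts[""])
```

Both lookups print 2: "Spark" appears twice, and the empty string is produced once by the double space and once by the empty line.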

To see the contents of either RDD, call the collect function on it.

In [11]:
print("Readme Count\n")
print(readmeCount.collect())

Readme Count

[(u'', 67), (u'when', 1), (u'R,', 1), (u'including', 3), (u'computation', 1), (u'using:', 1), (u'guidance', 3), (u'Scala,', 1), (u'environment', 1), (u'only', 1), (u'rich', 1), (u'Apache', 1), (u'sc.parallelize(range(1000)).count()', 1), (u'Building', 1), (u'guide,', 1), (u'return', 2), (u'Please', 3), (u'Try', 1), (u'not', 1), (u'Spark', 14), (u'scala>', 1), (u'Note', 1), (u'cluster.', 1), (u'./bin/pyspark', 1), (u'have', 1), (u'params', 1), (u'through', 1), (u'GraphX', 1), (u'[run', 1), (u'abbreviated', 1), (u'[project', 2), (u'##', 8), (u'library', 1), (u'see', 1), (u'"local"', 1), (u'[Apache', 1), (u'will', 1), (u'#', 1), (u'processing,', 1), (u'for', 12), (u'[building', 1), (u'provides', 1), (u'print', 1), (u'supports', 2), (u'built,', 1), (u'[params]`.', 1), (u'available', 1), (u'run', 7), (u'tests](https://cwiki.apache.org/confluence/display/SPARK/Useful+Developer+Tools).', 1), (u'This', 2), (u'Hadoop,', 2), (u'Tests', 1), (u'example:', 1), (u'-DskipTests', 1), (u'

In [12]:
print("Pom Count\n")
print(pomCount.collect())

Pom Count

[(u'', 2931), (u'agreed', 1), (u'<artifactId>hadoop-mapreduce-client-jobclient</artifactId>', 1), (u'hbase', 1), (u'provided.', 1), (u'copyright', 1), (u'<parent>', 1), (u'distributed', 3), (u'<artifactId>hadoop-core</artifactId>', 1), (u'<artifactId>hadoop-hdfs</artifactId>', 1), (u'</profile>', 6), (u'Apache', 2), (u'CONDITIONS', 1), (u'WARRANTIES', 1), (u'them', 1), (u'<version>1.2.6</version>', 1), (u'<artifactId>spark-hive_${scala.binary.version}</artifactId>', 1), (u'<version>3.2.0</version>', 1), (u'Project', 1), (u'not', 1), (u'writing,', 1), (u'you', 1), (u'Examples</name>', 1), (u'regarding', 1), (u'<version>1.6.0-SNAPSHOT</version>', 1), (u'<artifactId>spark-examples_2.10</artifactId>', 1), (u'...-->', 1), (u'specific', 1), (u'SPARK-4455', 4), (u'<profiles>', 1), (u'</filters>', 1), (u'http://maven.apache.org/xsd/maven-4.0.0.xsd">', 1), (u'dependencies.', 1), (u'force', 1), (u'<packaging>jar</packaging>', 1), (u'licenses', 1), (u'License', 3), (u'for', 2), (u'<gro

The join function combines two datasets of (K, V) and (K, W) pairs into (K, (V, W)) pairs, keeping only the keys that appear in both. Let's join these two counts together.

In [13]:
joined = readmeCount.join(pomCount)

Print the joined pairs to the console:

In [14]:
joined.collect()

[(u'', (67, 2931)),
 (u'Apache', (1, 2)),
 (u'Spark', (14, 1)),
 (u'this', (1, 3)),
 (u'for', (12, 2)),
 (u'use', (3, 1)),
 (u'uses', (1, 1)),
 (u'and', (10, 1)),
 (u'a', (10, 1)),
 (u'with', (4, 2)),
 (u'See', (1, 2)),
 (u'following', (2, 1)),
 (u'file', (1, 3)),
 (u'is', (6, 2)),
 (u'not', (1, 1)),
 (u'which', (2, 1)),
 (u'you', (4, 1)),
 (u'The', (1, 2)),
 (u'the', (21, 10)),
 (u'You', (3, 2)),
 (u'one', (2, 1)),
 (u'to', (14, 5)),
 (u'an', (3, 1)),
 (u'on', (6, 1)),
 (u'of', (5, 2)),
 (u'are', (1, 1)),
 (u'be', (2, 1)),
 (u'that', (3, 1)),
 (u'in', (5, 3)),
 (u'or', (3, 3)),
 (u'at', (2, 1))]
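The joined output contains only words that occur in both files. A minimal plain-Python sketch of that inner-join behaviour follows; `inner_join` is a hypothetical helper, dictionaries stand in for the pair RDDs (so, unlike an RDD, each key can appear only once), and the sample counts are copied from the outputs shown above.

```python
def inner_join(left, right):
    """Join two {key: value} dicts into {key: (left_value, right_value)}."""
    # Keep only the keys present on both sides, as an inner join does.
    return {k: (left[k], right[k]) for k in left if k in right}

# A few counts taken from the README and POM outputs above.
readme_counts = {"Spark": 14, "Apache": 1, "Scala,": 1}
pom_counts = {"Spark": 1, "Apache": 2, "<parent>": 1}

joined_sample = inner_join(readme_counts, pom_counts)
print(sorted(joined_sample.items()))
```

`Scala,` and `<parent>` are dropped because each appears on one side only, which matches what the notebook output shows.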

Let's combine the values together to get the total count

In [15]:
joinedSum = joined.map(lambda k: (k[0], (k[1][0]+k[1][1])))

To check the result, print the first five elements of the joined and joinedSum RDDs:

In [16]:
print("Joined Individual\n")
print(joined.take(5))

print("\n\nJoined Sum\n")
print(joinedSum.take(5))

Joined Individual

[(u'', (67, 2931)), (u'Apache', (1, 2)), (u'Spark', (14, 1)), (u'this', (1, 3)), (u'for', (12, 2))]


Joined Sum

[(u'', 2998), (u'Apache', 3), (u'Spark', 15), (u'this', 4), (u'for', 14)]


## Shared variables

Normally, when a function passed to a Spark operation (such as map or reduce) is executed on a remote cluster node, it works on separate copies of all the variables used in the function. These variables are copied to each machine, and no updates to the variables on the remote machine are propagated back to the driver program. Supporting general, read-write shared variables across tasks would be inefficient. However, Spark does provide two limited types of shared variables for two common usage patterns: broadcast variables and accumulators.
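The "separate copies" behaviour can be imitated in plain Python. This is only a rough sketch, not how Spark ships closures internally: `run_on_worker` is a hypothetical helper that uses `copy.deepcopy` to stand in for sending a task's variables to a remote node.

```python
import copy

def run_on_worker(task, captured_state):
    """Hypothetical helper: mimic shipping a task's variables to a remote node."""
    worker_state = copy.deepcopy(captured_state)  # the worker gets its own copy
    for x in [1, 2, 3, 4]:
        task(worker_state, x)
    return worker_state

def add_to_total(state, x):
    state["total"] += x

driver_state = {"total": 0}
worker_view = run_on_worker(add_to_total, driver_state)

print(worker_view["total"])   # the copy on the "worker" was updated
print(driver_state["total"])  # the driver's variable is unchanged
```

The worker's copy ends at 10 while the driver still sees 0, which is the situation shared variables are meant to address.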

### Broadcast variables

Broadcast variables are useful when you have a large dataset that you want to use across all the worker nodes. Instead of shipping a copy of the data with every task, Spark caches a read-only copy on each machine. Spark actions are executed through a set of stages, separated by distributed “shuffle” operations, and Spark automatically broadcasts the common data needed by tasks within each stage.


Read more here: [http://spark.apache.org/docs/latest/programming-guide.html#broadcast-variables](http://spark.apache.org/docs/latest/programming-guide.html#broadcast-variables)

Create a broadcast variable. Type in:

In [17]:
broadcastVar = sc.broadcast([1,2,3])

To get the value, type in:

In [18]:
broadcastVar.value

[1, 2, 3]
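A common use of a broadcast variable is a shared, read-only lookup that every task consults through `.value`. The sketch below shows that access pattern in plain Python under stated assumptions: `BroadcastStandIn` and `keep_known` are hypothetical names, and the class only imitates the `.value` attribute rather than anything Spark does to distribute the data.

```python
class BroadcastStandIn(object):
    """Hypothetical stand-in for the object returned by sc.broadcast()."""

    def __init__(self, value):
        self._value = value

    @property
    def value(self):
        # Read-only access: there is no setter.
        return self._value

def keep_known(numbers, allowed):
    # Tasks read the shared data through .value and never modify it.
    return [n for n in numbers if n in allowed.value]

allowed_ids = BroadcastStandIn([1, 2, 3])
kept = keep_known([0, 1, 2, 5, 3], allowed_ids)
print(kept)
```

In the notebook, the same idea would read `broadcastVar.value` inside a function passed to an RDD operation such as `filter`.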

### Accumulators

Accumulators are variables that can only be added to through an associative operation. They are used to implement counters and sums efficiently in parallel. Spark natively supports accumulators of numeric types and standard mutable collections, and programmers can extend these for new types. Only the driver can read an accumulator's value; the workers can only add to it.

Create the accumulator variable. Type in:

In [19]:
accum = sc.accumulator(0)

Next, parallelize a list of four integers and define a function that adds an integer to the accumulator variable. Type in:

In [20]:
rdd = sc.parallelize([1,2,3,4])
def f(x):
    global accum
    accum += x

Next, iterate through each element of the rdd and apply the function f on it:

In [21]:
rdd.foreach(f)

To get the current value of the accumulator variable, type in:

In [22]:
accum.value

10

You should get a value of 10.

Reading `accum.value` only works on the driver side. The worker nodes can only increment the accumulator.
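The requirement that the operation be associative is what lets each worker add up its own share independently. A minimal plain-Python sketch of the idea follows; `partial_sums` is a hypothetical helper, and the way it splits the list is invented for illustration and is not how Spark assigns elements to partitions.

```python
def partial_sums(values, num_partitions):
    """Split values into partitions and sum each one independently."""
    partitions = [values[i::num_partitions] for i in range(num_partitions)]
    return [sum(p) for p in partitions]

# Two "workers" each sum their own share of [1, 2, 3, 4].
partials = partial_sums([1, 2, 3, 4], 2)

# The "driver" merges the partial results; the order does not matter.
total_forward = sum(partials)
total_reversed = sum(reversed(partials))
print(total_forward)
print(total_reversed)
```

Both merge orders give 10, the same value the accumulator reports in the notebook.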


## Key-value pairs

You have already seen a bit about key-value pairs in the Joining RDDs section.

Create a key-value pair of two characters. Type in:

In [23]:
pair = ('a', 'b')

Use index [0] to access the first element of the pair and [1] to access the second.

In [24]:
print(pair[0])

print(pair[1])

a
b
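Because a pair is an ordinary Python tuple, it can also be unpacked into named variables instead of being indexed. The short sketch below shows both a single pair and a list of (word, count) pairs; the two counts are taken from the Joined Sum output earlier in this lab, and the variable names are illustrative.

```python
pair = ('a', 'b')

# Tuple unpacking: an alternative to pair[0] and pair[1].
key, value = pair

# The same unpacking works when looping over a list of pairs.
word_counts = [('Spark', 15), ('Apache', 3)]
labels = ["%s=%d" % (word, count) for word, count in word_counts]
print(labels)
```

The same pattern applies inside lambdas passed to RDD operations, where `k[0]` and `k[1]` were used in the joinedSum cell.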


The next lab will show you how to create Spark applications using Spark SQL, GraphX, MLlib, etc. The lab is only available in Scala.

<h1 align="center" style="font-family: Monaco;">Continue on "[Spark Fundamentals 1 - ScalaLibs.ipynb](/api/v1/resources/Spark%20Fundamentals%201%20-%20ScalaLibs.ipynb)"</h1>
