## Lab 1 - Hello Spark
This lab shows you how to work with Apache Spark from Python (PySpark).

# Step 1 - Working with the Spark Context

Step 1 - Use the Spark context, sc, to check which version of Spark the driver application is running.

Type:<br>
sc.version

In [1]:
#Step 1.1 - Check spark version
sc.version

u'1.6.0'

# Step 2 - Working with Resilient Distributed Datasets

Step 2 - Create an RDD with the numbers 1 to 10,<br>
Extract its first element,<br>
Extract its first 5 elements,<br>
Create an RDD with the string "Hello Spark!",<br>
Extract its first element.<br>

Type: <br>
x = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]<br>
x_nbr_rdd = sc.parallelize(x)<br>

In [2]:
#Step 2.1 - Create RDD of Numbers 1-10
x = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
x_nbr_rdd = sc.parallelize(x)
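sc.parallelize() distributes a local Python list across the cluster as a set of partitions. The plain-Python sketch below only illustrates the idea of cutting a list into contiguous, near-equal chunks; the helper slice_into_partitions and the choice of 4 partitions are hypothetical. PySpark groups elements into serialized batches before splitting them, so its real partition boundaries can differ: x_nbr_rdd.glom().collect() shows the actual layout, and sc.parallelize(x, 4) requests a specific number of partitions.

```python
def slice_into_partitions(data, num_slices):
    """Split `data` into `num_slices` contiguous chunks of near-equal size.

    Hypothetical helper for illustration only: chunk i covers the positions
    i*n//k up to (but not including) (i+1)*n//k.
    """
    n = len(data)
    return [data[i * n // num_slices:(i + 1) * n // num_slices]
            for i in range(num_slices)]


x = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
partitions = slice_into_partitions(x, 4)
print(partitions)  # [[1, 2], [3, 4, 5], [6, 7], [8, 9, 10]]
```

Concatenating the partitions in order gives back the original list, which is the property collect() relies on later in this lab.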

Type: <br>
x_nbr_rdd.first()

In [3]:
#Step 2.2 - Extract the first element
x_nbr_rdd.first()

1

Type:<br>
x_nbr_rdd.take(5)

In [4]:
#Step 2.3 - Extract the first 5 elements
x_nbr_rdd.take(5)

[1, 2, 3, 4, 5]
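first() and take(n) are actions that return results to the driver without having to read the whole RDD. The sketch below models that behaviour in plain Python over a hypothetical list of partitions: it scans partitions in order and stops as soon as it has n elements. It is a simplified illustration, not PySpark's implementation, which first scans one partition and then estimates how many more partitions it needs.

```python
def take(partitions, n):
    """Return the first n elements, scanning partitions in order.

    Hypothetical helper: a simplified model of RDD.take() that stops
    reading as soon as it has collected n elements.
    """
    result = []
    for part in partitions:
        for element in part:
            if len(result) == n:
                return result
            result.append(element)
    return result


def first(partitions):
    """Simplified model of RDD.first(): take(1) and unwrap the element."""
    taken = take(partitions, 1)
    if not taken:
        raise ValueError("RDD is empty")
    return taken[0]


partitions = [[1, 2], [3, 4, 5], [6, 7], [8, 9, 10]]
print(first(partitions))    # 1
print(take(partitions, 5))  # [1, 2, 3, 4, 5]
```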

Perform your first map transformation: create a new RDD in which each element X of x_nbr_rdd is replaced by X+1.<br>
Type:<br>
x_nbr_rdd_2 = x_nbr_rdd.map(lambda x: x+1)

In [5]:
#Step 2.4 - Perform your first map transformation. Replace each element X in the RDD with X+1.
#Remember that RDDs are IMMUTABLE, so it is not possible to UPDATE an RDD. You need to create
#a NEW RDD
x_nbr_rdd_2 = x_nbr_rdd.map(lambda x: x+1)

Take a look at the elements of the new RDD.<br>
Type:<br>
x_nbr_rdd_2.collect()   

In [6]:
#Step 2.5 - Check out the elements of the new RDD. Warning: be careful with collect() on real data!
#It brings every element of the RDD (from all partitions) back to the driver, which can run out of memory.
x_nbr_rdd_2.collect()

[2, 3, 4, 5, 6, 7, 8, 9, 10, 11]
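Transformations such as map are lazy: they return a new RDD describing the computation, and nothing runs until an action such as collect() is called. The toy class below (ToyRDD, a hypothetical name; this is not Spark's API) models that idea in plain Python: map() only records the parent and the function, the original data is never modified, and collect() computes every partition and concatenates the results on the "driver".

```python
class ToyRDD:
    """Hypothetical, greatly simplified stand-in for an RDD (not Spark's API).

    map() does not touch the data: it returns a NEW ToyRDD that remembers
    its parent and the function to apply. Work happens only when an action
    such as collect() is called.
    """

    def __init__(self, partitions=None, parent=None, func=None):
        self._partitions = partitions  # only set on the source dataset
        self._parent = parent
        self._func = func

    def map(self, func):
        # Transformation: record the lineage, compute nothing yet.
        return ToyRDD(parent=self, func=func)

    def _compute(self):
        # Recompute partitions from the source, applying the recorded function.
        if self._parent is None:
            return [list(part) for part in self._partitions]
        return [[self._func(x) for x in part] for part in self._parent._compute()]

    def collect(self):
        # Action: compute all partitions and concatenate them, in order.
        return [x for part in self._compute() for x in part]


calls = []

def plus_one(x):
    calls.append(x)  # track when the function actually runs
    return x + 1

x_nbr = ToyRDD([[1, 2], [3, 4, 5], [6, 7], [8, 9, 10]])
x_nbr_2 = x_nbr.map(plus_one)
calls_before_collect = len(calls)  # 0: map() was lazy

print(x_nbr_2.collect())  # [2, 3, 4, 5, 6, 7, 8, 9, 10, 11]
print(x_nbr.collect())    # the original is unchanged: [1, 2, ..., 10]
```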

Let's now create a new RDD containing the single string "Hello Spark!" and take a look at it.<br>
Type:<br>
y = ["Hello Spark!"]<br>
y_str_rdd = sc.parallelize(y)<br>
y_str_rdd.first()<br>

In [7]:
#Step 2.6 - Create String RDD, Extract first line
y = ["Hello Spark!"]
y_str_rdd = sc.parallelize(y)
y_str_rdd.first()

'Hello Spark!'

Let's now create a third RDD with several strings.<br>
Type:<br>
z = ["First,Line", "Second,Line", "and,Third,Line"]<br>
z_str_rdd = sc.parallelize(z)<br>
z_str_rdd.first()

In [8]:
#Step 2.7 - Create String RDD with many lines / entries, Extract first line
z = ["First,Line", "Second,Line", "and,Third,Line"]
z_str_rdd = sc.parallelize(z)
z_str_rdd.first()

'First,Line'

Count the number of entries in this RDD.<br>
Type:<br>
z_str_rdd.count()

In [9]:
#Step 2.8 - Count the number of entries in the RDD
z_str_rdd.count()

3

Take a look at the elements of this RDD.<br>
Type:<br>
z_str_rdd.collect()

In [10]:
#Step 2.9 - Show all the entries in the RDD. Warning: be careful with collect() on real data!
#It brings every element of the RDD (from all partitions) back to the driver, which can run out of memory.
z_str_rdd.collect()

['First,Line', 'Second,Line', 'and,Third,Line']

In the next step, we will split each entry in the RDD on commas (",").<br>
Type: <br>
z_str_rdd_split = z_str_rdd.map(lambda line: line.split(","))<br>
z_str_rdd_split.collect()

In [11]:
#Step 2.10 - Perform a map transformation to split all entries in the RDD on the commas ",".
z_str_rdd_split = z_str_rdd.map(lambda line: line.split(","))

#Check out the entries in the new RDD
z_str_rdd_split.collect()

#Notice how each entry in the new RDD is now a LIST of elements, obtained by splitting the
#original string on the comma delimiter.

[['First', 'Line'], ['Second', 'Line'], ['and', 'Third', 'Line']]

In this step, we will learn a second transformation besides map: flatMap.<br>
Like map, flatMap applies a function to every entry of an RDD, but it then "flattens" the result: each element of the returned sequence becomes a separate entry in the new RDD.<br>
This is easiest to see with an example.<br>
Type:<br>
z_str_rdd_split_flatmap = z_str_rdd.flatMap(lambda line: line.split(","))<br>
z_str_rdd_split_flatmap.collect()

In [12]:
#Step 2.11 - Learn the difference between two transformations: map and flatMap.
#Go back to the RDD z_str_rdd_split defined above using a map transformation from z_str_rdd
#and this time use flatMap.
z_str_rdd_split_flatmap = z_str_rdd.flatMap(lambda line: line.split(","))
z_str_rdd_split_flatmap.collect()

#What do you notice ? How is z_str_rdd_split_flatmap different from z_str_rdd_split ?

['First', 'Line', 'Second', 'Line', 'and', 'Third', 'Line']
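The difference between map and flatMap can be reproduced with ordinary Python lists, assuming the same three input strings as above: map yields exactly one output entry per input entry, while flatMap flattens the sequence returned for each entry into individual entries. This is a plain-Python illustration, not Spark code.

```python
from itertools import chain

z = ["First,Line", "Second,Line", "and,Third,Line"]

# map: exactly one output entry per input entry (here, one list per line)
mapped = [line.split(",") for line in z]

# flatMap: apply the same function, then flatten the returned lists
flat_mapped = list(chain.from_iterable(line.split(",") for line in z))

print(mapped)       # [['First', 'Line'], ['Second', 'Line'], ['and', 'Third', 'Line']]
print(flat_mapped)  # ['First', 'Line', 'Second', 'Line', 'and', 'Third', 'Line']
```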

In this step, we will turn each entry in the previous RDD into a pair (tuple) by attaching the number 1 to it. The first element of the tuple is the word and the second element is the number 1.<br>
This is a common technique for counting elements with Spark.<br>
Type:<br>
countWords = z_str_rdd_split_flatmap.map(lambda word:(word,1))<br>
countWords.collect()

In [13]:
#Step 2.12 - Map each word to a (word, 1) tuple to create a pair RDD.

countWords = z_str_rdd_split_flatmap.map(lambda word:(word,1))
countWords.collect()

[('First', 1),
 ('Line', 1),
 ('Second', 1),
 ('Line', 1),
 ('and', 1),
 ('Third', 1),
 ('Line', 1)]

What we have above is known as a PAIR RDD: each entry in the RDD has a KEY and a VALUE.<br>
The KEY is the word (First, Line, etc.) and the VALUE is the number 1.<br>
We can now AGGREGATE this RDD by summing up all the values BY KEY.<br>
Type:<br>
from operator import add<br>
countWords2 = countWords.reduceByKey(add)<br>
countWords2.collect()<br>

In [14]:
#Step 2.13 - Check out the results of the aggregation
from operator import add
countWords2 = countWords.reduceByKey(add)
countWords2.collect()

#You just created an RDD countWords2 which contains the counts for each token...

[('and', 1), ('Line', 3), ('Second', 1), ('Third', 1), ('First', 1)]
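reduceByKey(add) merges all values that share a key. The sketch below is a simplified plain-Python model of that idea (reduce_by_key is a hypothetical helper, and the split into 2 partitions is an assumption): values are first combined per key inside each partition, then the partial results are merged. In Spark the partial results are shuffled across the cluster, with keys assigned to output partitions by hashing by default, which is why the order in the output above does not follow the input order. The function passed in should be associative and commutative, because the merge order is not guaranteed.

```python
from operator import add


def reduce_by_key(partitions, func):
    """Simplified model of reduceByKey (hypothetical helper, not Spark code).

    1. Combine values per key inside each partition (map-side combine).
    2. Merge the per-partition results key by key.
    """
    partials = []
    for part in partitions:
        local = {}
        for key, value in part:
            local[key] = func(local[key], value) if key in local else value
        partials.append(local)

    merged = {}
    for local in partials:
        for key, value in local.items():
            merged[key] = func(merged[key], value) if key in merged else value
    return merged


pairs = [("First", 1), ("Line", 1), ("Second", 1), ("Line", 1),
         ("and", 1), ("Third", 1), ("Line", 1)]
partitions = [pairs[:3], pairs[3:]]  # hypothetical split into 2 partitions
counts = reduce_by_key(partitions, add)
print(sorted(counts.items()))
# [('First', 1), ('Line', 3), ('Second', 1), ('Third', 1), ('and', 1)]
```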

# Step 3 - Count the number of lines containing "Spark"
Step 3 - Download a README.md file,<br>
Load the file into an RDD,<br>
Count the number of lines containing the word "Spark".<br>

Type:<br>
!rm README.md* -f<br>
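!wget https://github.com/carloapp2/SparkPOT/blob/master/README.md<br>
Note: this "blob" URL returns GitHub's HTML page for the file rather than the raw Markdown, so the downloaded README.md contains HTML markup, as the outputs below show. The raw Markdown would be served from https://raw.githubusercontent.com/carloapp2/SparkPOT/master/README.md; the line counts in this lab come from the HTML page.<br>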


In [15]:
#Step 3.1 - Pull data file into workbench
!rm README.md* -f
!wget https://github.com/carloapp2/SparkPOT/blob/master/README.md

--2016-07-14 13:55:04--  https://github.com/carloapp2/SparkPOT/blob/master/README.md
Resolving github.com (github.com)... 192.30.253.112
Connecting to github.com (github.com)|192.30.253.112|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: unspecified [text/html]
Saving to: 'README.md'

    [ <=>                                   ] 41,391      --.-K/s   in 0.06s   

2016-07-14 13:55:04 (642 KB/s) - 'README.md' saved [41391]



Now we will point Spark to the text file stored in the local filesystem and use the "textFile" method to create an RDD named "textfile_rdd", which will contain one entry for each line in the original text file.<br>
We will also count the number of entries in the RDD (which is also the number of lines in the text file).<br>
Type:<br>
textfile_rdd = sc.textFile("./README.md")<br>
textfile_rdd.count()<br>

In [16]:
#Step 3.2 - Create RDD from data file
textfile_rdd = sc.textFile("./README.md")
textfile_rdd.count()

628

Let us now filter the RDD and keep only the entries that contain the substring "Spark". This is achieved with the "filter" transformation, combined with Python's syntax for testing whether a substring is present within a larger string: substring in string.<br>
We will also take a look at the first line in the newly filtered RDD. <br>
Type:<br>
Spark_lines = textfile_rdd.filter(lambda line: "Spark" in line)<br>
Spark_lines.first()<br>

In [17]:
#Step 3.3 - Filter for only lines with word Spark
Spark_lines = textfile_rdd.filter(lambda line: "Spark" in line)
Spark_lines.first()

u'    <title>SparkPOT/README.md at master \xb7 carloapp2/SparkPOT \xb7 GitHub</title>'

We will now count the number of entries in this filtered RDD and present the result as a concatenated string.<br>
Type:<br>
print("The file README.md has " + str(Spark_lines.count()) +<br>
" of " + str(textfile_rdd.count()) +<br>
" Lines with the word Spark in it.")<br>

In [18]:
#Step 3.4 - count the number of lines
print("The file README.md has " + str(Spark_lines.count()) +
      " of " + str(textfile_rdd.count()) +
      " Lines with the word Spark in it.")

The file README.md has 52 of 628 Lines with the word Spark in it.
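The filter in Step 3.3 is a plain, case-sensitive substring test, so it also matches words such as SparkPOT and misses lines that only contain "spark" in lower case. The sketch below applies the same test in plain Python to a few hypothetical sample lines (invented for illustration, not taken from README.md) and compares it with a case-insensitive variant.

```python
# Hypothetical sample lines standing in for the contents of README.md
lines = [
    "# SparkPOT",
    "Apache Spark is a fast, general engine.",
    "apache spark written in lower case",
    "This line does not mention it.",
]

# Same test as textfile_rdd.filter(lambda line: "Spark" in line)
spark_lines = [line for line in lines if "Spark" in line]

# A case-insensitive variant, for comparison
spark_lines_ci = [line for line in lines if "spark" in line.lower()]

print("The file has " + str(len(spark_lines)) + " of " + str(len(lines)) +
      " lines with the word Spark in it.")  # 2 of 4
print(len(spark_lines_ci))                  # 3
```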


Using your knowledge from the previous exercises, you will now count how many times each token that starts with "Spark" appears in the filtered lines.<br>
Instructions:<br>
Looking back at previous exercises, you will need to:<br>
1- Execute a flatMap transformation on the filtered RDD Spark_lines and split each line on white space.<br>
2- Map each token to a pair, so as to obtain a PAIR RDD where the first element of the tuple is the token and the second element is the number 1.<br>
3- Execute a reduceByKey with addition (operator.add) to count the number of instances of each token.<br>
4- Filter the RDD from step 3 to keep only the entries whose token starts with "Spark". In Python, the syntax to test whether a string starts with a token is string.startswith("token").<br>
5- Display the resulting list of tokens that start with "Spark", together with their counts.

In [19]:
#Step 3.5 - count the number of instances of tokens starting with "Spark"
from operator import add
temp = Spark_lines.flatMap(lambda line: line.split(" ")).map(lambda x: (x, 1)).reduceByKey(add)
#Each entry of temp is a (token, count) tuple, so kv[0] is the token
temp.filter(lambda kv: kv[0].startswith("Spark")).collect()

[(u'Spark', 11),
 (u'SparkPOT:master"', 1),
 (u'Spark.</p>', 1),
 (u'Spark</h2>', 1),
 (u'SparkPi', 2),
 (u'Spark</a>.', 1),
 (u'Spark"</a>.</p>', 1),
 (u'Spark</h1>', 1)]
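The chain flatMap, map, reduceByKey(add) and filter can be traced on a few hypothetical sample lines (invented for illustration) with a collections.Counter, which here plays the role of the (token, 1) pairs plus reduceByKey(add). Two details explain the shape of the output above: punctuation and markup stay attached to tokens, so "Spark", "Spark." and "Spark</h2>" are counted separately, and split(" ") produces empty tokens wherever there are repeated spaces (line.split() with no argument would drop them). The sketch also checks that "contains" is a superset of "starts with".

```python
from collections import Counter

# Hypothetical sample lines standing in for the contents of Spark_lines
spark_lines = [
    "Apache Spark is a fast engine.",
    "  Run SparkPi to test Spark.",
    'Visit <a href="/SparkPOT">',
]

# flatMap(split(" ")) + map((token, 1)) + reduceByKey(add), done with a Counter
token_counts = Counter(tok for line in spark_lines for tok in line.split(" "))

starts_with = sorted((k, v) for k, v in token_counts.items() if k.startswith("Spark"))
contains = sorted((k, v) for k, v in token_counts.items() if "Spark" in k)

print(starts_with)        # [('Spark', 1), ('Spark.', 1), ('SparkPi', 1)]
print(contains)           # adds ('href="/SparkPOT">', 1)
print(token_counts[""])   # 2 empty tokens from the two leading spaces
```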

As a slight modification of the cell above, let us now filter and display the tokens that contain the substring "Spark" anywhere (instead of only those that START with it). Your result should be a superset of the previous result.<br>
The Python syntax to determine whether a string contains a particular "token" is: "token" in string<br>

In [20]:
#Step 3.6 - Display the tokens which contain the substring "Spark" in them.
temp.filter(lambda kv: "Spark" in kv[0]).collect()

[(u'href="/carloapp2/SparkPOT"', 2),
 (u'href="https://github.com/carloapp2/SparkPOT/commits/master.atom"', 1),
 (u'Spark', 11),
 (u'href="/carloapp2/SparkPOT/issues"', 1),
 (u'SparkPOT:master"', 1),
 (u'title="Spark', 1),
 (u'content="github.com/carloapp2/SparkPOT', 1),
 (u'Readme">Spark', 1),
 (u'content="https://github.com/carloapp2/SparkPOT"', 1),
 (u'href="/carloapp2/SparkPOT/pulse"', 1),
 (u'href="/carloapp2/SparkPOT/commits/master/README.md?author=carloapp"><img',
  1),
 (u'/carloapp2/SparkPOT"', 1),
 (u'href="/carloapp2/SparkPOT/blame/master/README.md"', 1),
 (u'/carloapp2/SparkPOT/graphs">', 1),
 (u'href="/login?return_to=%2Fcarloapp2%2FSparkPOT%2Fblob%2Fmaster%2FREADME.md"',
  1),
 (u'href="/carloapp2/SparkPOT"><span>SparkPOT</span></a></span></span><span',
  1),
 (u'href="https://github.com/carloapp2/SparkPOT/blob/master/README.md"', 1),
 (u'href="/carloapp2/SparkPOT/watchers">', 1),
 (u'href="/carloapp2/SparkPOT/commit/6cf4eb21f3c1469119bdb052fe1eae9d16c84112"',
  2),
 (u'S

# Step 4 - Perform analysis on a data file
We have a sample file with instructors and their scores. In this exercise, we want you to add up the scores for each instructor and report on the results by following these steps:<br>

1- The name of the file is "Scores.txt". Delete it from the local filesystem if it exists.<br>
2- Download the file from the provided location (see below).<br>
3- Load the text file into an RDD of instructor names and instructor scores.<br>
4- Execute a transformation that keeps each instructor's name but adds up the 4 numbers representing that instructor's scores, resulting in a new RDD.<br>
5- Display the instructor's name and the total score for each instructor.<br>
6- Execute a second transformation to compute the average score for each instructor and display the results.<br>
7- Who was the top performer?<br>

The Data File has the following format: Instructor Name,Score1,Score2,Score3,Score4<br>
Here is an example line from the text file: "Carlo,5,3,3,4"<br>
Data File Location: https://raw.githubusercontent.com/carloapp2/SparkPOT/master/Scores.txt
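Each line of Scores.txt has the form Name,Score1,Score2,Score3,Score4. A minimal plain-Python sketch of parsing one such line, using a hypothetical helper parse_scores_line that also checks the number of fields, so that a malformed line fails loudly instead of silently producing a wrong total:

```python
def parse_scores_line(line, expected_scores=4):
    """Split one 'Name,S1,S2,S3,S4' line into (name, [scores]).

    Hypothetical helper: raises ValueError if the line does not contain
    exactly `expected_scores` numeric scores after the name.
    """
    fields = [f.strip() for f in line.split(",")]
    name, raw_scores = fields[0], fields[1:]
    if len(raw_scores) != expected_scores:
        raise ValueError("expected %d scores, got %d in %r"
                         % (expected_scores, len(raw_scores), line))
    return name, [int(s) for s in raw_scores]


name, scores = parse_scores_line("Carlo,5,3,3,4")
print(name, scores, sum(scores))  # Carlo [5, 3, 3, 4] 15
```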

In [21]:
#Step 4.1 - Delete the file if it exists, download a new copy and load it into an RDD
!rm Scores.txt* -f
!wget https://raw.githubusercontent.com/carloapp2/SparkPOT/master/Scores.txt

Raw_Rdd = sc.textFile("./Scores.txt")
Raw_Rdd.take(10)

--2016-07-14 13:57:38--  https://raw.githubusercontent.com/carloapp2/SparkPOT/master/Scores.txt
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 151.101.48.133
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|151.101.48.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 75 [text/plain]
Saving to: 'Scores.txt'


2016-07-14 13:57:38 (19.5 MB/s) - 'Scores.txt' saved [75/75]



[u'Carlo,5,3,3,4',
 u'Mokhtar,2,5,5,3',
 u'Jacques,4,2,4,5',
 u'Braden,5,3,2,5',
 u'Chris,5,4,5,1']

In [22]:
#Step 4.2 - Execute the necessary transformation(s) to extract each instructor's name as well
# as the instructor's scores, then add up the scores per instructor and display the results
# in the form of a new RDD with the elements: ("Instructor Name", InstructorTotal)
SumScores = (Raw_Rdd.map(lambda l: l.split(","))
             .map(lambda v: (v[0], sum(int(s) for s in v[1:5]))))
SumScores.take(5)

[(u'Carlo', 15),
 (u'Mokhtar', 15),
 (u'Jacques', 15),
 (u'Braden', 15),
 (u'Chris', 15)]

In [23]:
#Step 4.3 - Execute additional transformation(s) to compute the average score per instructor.
# Display the resulting averages for all instructors.
# Divide by 4.0, not 4: in Python 2, 15/4 is integer division and returns 3 instead of 3.75.
Final = SumScores.map(lambda t: (t[0], t[1], t[1] / 4.0))
Final.take(5)

[(u'Carlo', 15, 3.75),
 (u'Mokhtar', 15, 3.75),
 (u'Jacques', 15, 3.75),
 (u'Braden', 15, 3.75),
 (u'Chris', 15, 3.75)]
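Step 7 asks who was the top performer. In the (name, total) pairs shown in the Step 4.2 output, every instructor totals 15, so there is a five-way tie. The plain-Python sketch below (top_performers is a hypothetical helper) computes the averages with float division and returns every name that shares the best average, so ties are not hidden; the same logic could be applied to the small list returned by Final.collect().

```python
# (name, total) pairs copied from the Step 4.2 output
sum_scores = [("Carlo", 15), ("Mokhtar", 15), ("Jacques", 15),
              ("Braden", 15), ("Chris", 15)]

# Average over the 4 scores; 4.0 forces float division under Python 2 as well
final = [(name, total, total / 4.0) for name, total in sum_scores]


def top_performers(rows):
    """Return every name whose average equals the highest average."""
    best = max(avg for _, _, avg in rows)
    return [name for name, _, avg in rows if avg == best]


print(final)                  # every average is 3.75
print(top_performers(final))  # all five instructors are tied
```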