# Machine Learning At Scale

Data Analytics and Machine Learning at Scale 

---
__Name:__  *Dr. James G. Shanahan*   
__Email:__  *James.Shanahan  @ gmail.com*   
__Quiz:__  Debugging strategies in Spark

# Start Spark cluster 
First choose which Spark cluster backs this notebook; that choice determines how you obtain your `sc`/`sqlContext`.

* Back this notebook with Spark running on your local machine in a container
* [LATER] Back this notebook with Spark running on an EMR cluster (note that Spark jobs on EMR must read and write their data from/to S3)
* [LATER] Back this notebook with Spark running natively on your local machine

### Launch Spark locally
Run the next cell to launch a Spark cluster in a container on your local machine and back this notebook with that cluster.

When running pyspark code in a Jupyter notebook launched from the class environment container, you may encounter errors like these:

`NameError: name 'sc' is not defined`

or

`ValueError: Cannot run multiple SparkContexts at once;`

To avoid both of these and still use examples you find in the pyspark docs, where the Spark Context is always referred to as sc, add this line to your code:

`sc = SparkContext.getOrCreate(conf)`

Note that conf is a SparkConf instance and will already be available in the container. You can inspect it by running the following:


In [1]:
sc = SparkContext.getOrCreate(conf)

conf.getAll()

[(u'spark.master', u'local[*]'),
 (u'spark.submit.deployMode', u'client'),
 (u'spark.app.name', u'PySparkShell')]

In [1]:
# Run this cell once only to start Spark
# Rerunning will cause an error
#import pyspark 
#sc = pyspark.SparkContext('local[*]')

In [2]:
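# do something to prove it works
import random
random.seed(9001)
rdd = sc.parallelize(range(1000))
rdd.takeSample(False, 5)
# returns a list of 5 numbers sampled without replacement
# the values can differ between runs; pass a seed as the third argument
# to takeSample (e.g. rdd.takeSample(False, 5, 9001)) for a repeatable sample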

[184, 492, 666, 594, 980]

# Word Count example
Create some data by executing the following code cell

In [3]:
%%writefile wordcount.txt
hello hi hi hallo
bonjour hola hi ciao
nihao konnichiwa ola
hola nihao hello

Writing wordcount.txt


In [4]:
cat wordcount.txt

hello hi hi hallo
bonjour hola hi ciao
nihao konnichiwa ola
hola nihao hello

## Word count in Python

In [5]:
#!/usr/bin/python
# count how often each whitespace-separated word occurs in the file
wordcount = {}
with open("wordcount.txt", "r") as f:
    for word in f.read().split():
        if word not in wordcount:
            wordcount[word] = 1
        else:
            wordcount[word] += 1
# print the counts only; the loop variable `word` just holds the last word read
print(wordcount)

{'hallo': 1, 'konnichiwa': 1, 'ola': 1, 'ciao': 1, 'nihao': 2, 'hi': 3, 'bonjour': 1, 'hello': 2, 'hola': 2}


## NOTES on Inputs to Spark

http://spark.apache.org/docs/latest/programming-guide.html
All of Spark’s file-based input methods, including textFile, support running on directories, compressed files, and wildcards as well. For example, you can use textFile("/my/directory"), textFile("/my/directory/*.txt"), and textFile("/my/directory/*.gz").

The textFile method also takes an optional second argument for controlling the number of partitions of the file. By default, Spark creates one partition for each block of the file (blocks being 128MB by default in HDFS; 64MB in older Hadoop versions), but you can also ask for a higher number of partitions by passing a larger value. Note that you cannot have fewer partitions than blocks.
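
The partition rule above can be sketched as simple arithmetic. The helper below is hypothetical (it is not a Spark API) and deliberately simplified: it assumes one partition per block and treats the optional second argument of `textFile` as a lower bound. The real split computation has more cases, and the file and block sizes are example values only.

```python
import math

def estimate_partitions(file_bytes, block_bytes, min_partitions=None):
    """Rough estimate of how many partitions textFile would create.

    Hypothetical helper for illustration only; not part of Spark.
    """
    # One partition per block, rounding up for a final partial block.
    by_blocks = max(1, int(math.ceil(file_bytes / float(block_bytes))))
    if min_partitions is None:
        return by_blocks
    # Asking for more partitions is honored; asking for fewer is not.
    return max(by_blocks, min_partitions)

MB = 1024 * 1024
print(estimate_partitions(1024 * MB, 128 * MB))      # 8
print(estimate_partitions(1024 * MB, 128 * MB, 20))  # 20
print(estimate_partitions(1024 * MB, 128 * MB, 2))   # 8
```

On a running cluster, `rdd.getNumPartitions()` reports the actual value, which is the number to trust when the estimate and reality disagree.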

## QUIZ: How many "konnichiwa" in this word count example? 
Run the next cell to get the answer.

In [6]:
# complete word count
# count words in a file/directory
logFileNAME = 'wordcount.txt'
text_file = sc.textFile(logFileNAME)
counts = text_file.flatMap(lambda line: line.split(" ")) \
             .map(lambda word: (word, 1)) \
             .reduceByKey(lambda a, b: a + b)
wordCounts = counts.collect()   # collect once and reuse the local list
for v in wordCounts:
    print(v)


(u'ciao', 1)
(u'bonjour', 1)
(u'nihao', 2)
(u'hola', 2)
(u'konnichiwa', 1)
(u'hallo', 1)
(u'hi', 3)
(u'hello', 2)
(u'ola', 1)


In [7]:
rdd = sc.parallelize('wordcount.txt')  # distributes the characters of the string, not the file contents
rdd.first()
#rdd.count()

'w'

In [8]:
rdd = sc.textFile('wordcount.txt')  #create an RDD
rdd.count()

4

In [9]:
rdd.first()

'hello hi hi hallo'
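
The `'w'` result above follows from how Python iterates a string: `parallelize` receives an iterable, and a string is an iterable of characters, whereas `textFile` treats its argument as a path and yields one record per line. A Spark-free sketch of the same distinction follows; the temporary file and its two lines are made up for illustration.

```python
import os
import tempfile

path_string = "wordcount.txt"

# Iterating a string yields characters, which is what parallelize sees.
as_elements = list(path_string)
print(as_elements[0])   # w

# Reading the file that the string names yields lines, analogous to textFile.
demo_dir = tempfile.mkdtemp()
demo_path = os.path.join(demo_dir, path_string)
with open(demo_path, "w") as f:
    f.write("hello hi hi hallo\nbonjour hola hi ciao\n")

with open(demo_path) as f:
    as_lines = f.read().splitlines()
print(as_lines[0])      # hello hi hi hallo
print(len(as_lines))    # 2
```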

# Debugging in Spark  

* PART 1: Write mapper/reducer functions as standalone code and debug them on a test record (key-value pair)
* PART 2: In a multi-operation call, break it down and debug step by step on a small test data set


##  PART 1: debug each closure independently with small unit tests
A closure here is any function you pass to a Spark operation (e.g., a mapper, reducer, or filter function). Test it on its own first.

In [10]:
# This is an example of a mapper function (referred to as a closure in Spark, as this function and 
# its state will be serialized and shipped to each worker)

def mySplitFunction(string):
    return string.split()   # return the tokens; without `return` the function yields None
mySplitFunction("hello hi hi hallo")

['hello', 'hi', 'hi', 'hallo']

In [12]:
# debug this function to return the first token in a string record
# for some reason we get back the first character and not the first string
def mySplitFunction(string):
    toks = string.split()[0]
    return toks[0]

#fake out my mapper function and debug
print (mySplitFunction("hello hi hi hallo"))


h


In [13]:
## debug this function to return the first token in a string record
# for some reason we get back the first character and not the first string


# solution 
def mySplitFunction(string):
    toks = string.split()[0]
    return toks

#fake out my mapper function and debug
print (mySplitFunction("hello hi hi hallo"))


hello
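
One way to make Part 1 repeatable is to pin the expected behavior down with assertions before handing the function to Spark. The sketch below uses plain Python only. `first_token` is a hypothetical name for the corrected function, and the empty-record behavior is an assumption added here, not something the original cell specifies.

```python
def first_token(record):
    """Return the first whitespace-delimited token of a record, or None."""
    tokens = record.split()
    # Guard against blank lines, which would otherwise raise IndexError.
    return tokens[0] if tokens else None

def test_first_token():
    assert first_token("hello hi hi hallo") == "hello"
    # Leading whitespace must not produce an empty token.
    assert first_token("  bonjour hola") == "bonjour"
    # Assumed behavior for empty input (not specified in the original).
    assert first_token("") is None

test_first_token()
print("all closure tests passed")
```

Because the function has no Spark dependency, the same assertions run in a fraction of a second, and a failure points at the closure itself instead of at a stack trace from a worker.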


## PART 2:  In a multi-operation call, break it down and debug step by step on a small test data set
### Call one operation at a time, take a couple of results (e.g., with take(1)), and examine them


In [14]:
# output the tokens from each record (one to MANY transformation)

def mySplitFunction(string):
    return string.split()

logFileNAME = 'wordcount.txt'
text_file = sc.textFile(logFileNAME)

#debug flatmap
counts = text_file.flatMap(lambda line: line.split(" ")).take(20)
print(counts)

#              .map(lambda word: (word, 1)) \
#              .reduceByKey(lambda a, b: a + b)
# wordCounts = counts.collect()
# for v in wordCounts:
#     print(v)

[u'hello', u'hi', u'hi', u'hallo', u'bonjour', u'hola', u'hi', u'ciao', u'nihao', u'konnichiwa', u'ola', u'hola', u'nihao', u'hello']


In [9]:
# output the tokens and corresponding count from each record (one to one map function)

def mySplitFunction(string):
    return string.split()

logFileNAME = 'wordcount.txt'
text_file = sc.textFile(logFileNAME)

#debug flatmap followed by map
counts = text_file.flatMap(lambda line: line.split(" ")) \
                  .map(lambda word: (word, 1)) \
                  .take(3)
print(counts)

#              .reduceByKey(lambda a, b: a + b)
# wordCounts = counts.collect()
# for v in wordCounts:
#     print(v)

[(u'hello', 1), (u'hi', 1), (u'hi', 1)]


In [10]:
# complete word count
# count words in a file/directory
logFileNAME = 'wordcount.txt'
text_file = sc.textFile(logFileNAME)
counts = text_file.flatMap(lambda line: line.split(" ")) \
             .map(lambda word: (word, 1)) \
             .reduceByKey(lambda a, b: a + b)
wordCounts = counts.collect()   # collect once and reuse the local list
for v in wordCounts:
    print(v)

(u'ciao', 1)
(u'bonjour', 1)
(u'nihao', 2)
(u'hola', 2)
(u'konnichiwa', 1)
(u'hallo', 1)
(u'hi', 3)
(u'hello', 2)
(u'ola', 1)


In [17]:
print(wordCounts)

[(u'ciao', 1), (u'bonjour', 1), (u'nihao', 2), (u'hola', 2), (u'konnichiwa', 1), (u'hallo', 1), (u'hi', 3), (u'hello', 2), (u'ola', 1)]
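
When a stage's output looks wrong, it can help to compare it against a plain-Python version of the same stage run on the same small input. The sketch below mirrors flatMap, map, and reduceByKey with ordinary lists and a dict. It doesn't call Spark, and the variable names are illustrative.

```python
lines = [
    "hello hi hi hallo",
    "bonjour hola hi ciao",
    "nihao konnichiwa ola",
    "hola nihao hello",
]

# Stage 1 (flatMap stand-in): one record in, many tokens out.
tokens = [word for line in lines for word in line.split(" ")]
print(tokens[:4])   # ['hello', 'hi', 'hi', 'hallo']

# Stage 2 (map stand-in): one token in, one (token, 1) pair out.
pairs = [(word, 1) for word in tokens]
print(pairs[:3])    # [('hello', 1), ('hi', 1), ('hi', 1)]

# Stage 3 (reduceByKey stand-in): sum the values for each key.
totals = {}
for word, n in pairs:
    totals[word] = totals.get(word, 0) + n
print(totals["konnichiwa"], totals["hi"])   # 1 3
```

If the local result and the Spark result diverge at a given stage, that stage is the place to look.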


__sortByKey([ascending], [numTasks])__	

When called on a dataset of (K, V) pairs where K implements Ordered, returns a dataset of (K, V) pairs sorted by keys in ascending or descending order, as specified in the boolean ascending argument.
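
For a result this small, the ordering that `sortByKey` describes can be checked on the collected list with Python's built-in `sorted`. This is a local sketch, not a Spark call; the `word_counts` list is copied from the output above.

```python
word_counts = [
    ("ciao", 1), ("bonjour", 1), ("nihao", 2), ("hola", 2),
    ("konnichiwa", 1), ("hallo", 1), ("hi", 3), ("hello", 2), ("ola", 1),
]

# Ascending by key (the word), matching sortByKey's default order.
by_key = sorted(word_counts, key=lambda kv: kv[0])
print(by_key[:3])       # [('bonjour', 1), ('ciao', 1), ('hallo', 1)]

# Descending by key, the counterpart of ascending=False.
by_key_desc = sorted(word_counts, key=lambda kv: kv[0], reverse=True)
print(by_key_desc[0])   # ('ola', 1)

# Descending by count, with the word as a tie-breaker.
by_count = sorted(word_counts, key=lambda kv: (-kv[1], kv[0]))
print(by_count[:3])     # [('hi', 3), ('hello', 2), ('hola', 2)]
```

Sorting locally only makes sense once the data fits on the driver; for large results the sort belongs on the RDD before collecting.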

In [11]:
wordCounts

[(u'ciao', 1),
 (u'bonjour', 1),
 (u'nihao', 2),
 (u'hola', 2),
 (u'konnichiwa', 1),
 (u'hallo', 1),
 (u'hi', 3),
 (u'hello', 2),
 (u'ola', 1)]

In [12]:
#Last 1
wordCounts[8:]

[(u'ola', 1)]

In [13]:
#first  5
wordCounts[:5]

[(u'ciao', 1),
 (u'bonjour', 1),
 (u'nihao', 2),
 (u'hola', 2),
 (u'konnichiwa', 1)]