# SI 330: Data Manipulation 
## 20 - Big Data II: Introduction to Elastic MapReduce and Spark

### Dr. Chris Teplovs, School of Information, University of Michigan
<small><a rel="license" href="http://creativecommons.org/licenses/by-nc-sa/4.0/"><img alt="Creative Commons License" style="border-width:0" src="https://i.creativecommons.org/l/by-nc-sa/4.0/88x31.png" /></a>This work is licensed under a <a rel="license" href="http://creativecommons.org/licenses/by-nc-sa/4.0/">Creative Commons Attribution-NonCommercial-ShareAlike 4.0 International License</a>.

## Overview of today

* start up EMR cluster
* review notebook from last class
* introduction to Spark
* running a Jupyter notebook on Spark

## Starting up an Elastic MapReduce cluster

* log into your AWS Educate account
* go to EMR
* select "Notebooks"
* select "Create notebook"
  * give your notebook a name
  * you can leave description blank
  * select "Create a cluster"
  * change the instance type to a single m4.large
  * click "Create"
* NOTE: provisioning your cluster will take a while, so we're going to go over the slides now while that happens

## [Introduction to Spark](assets/distcomp2.pdf) <-- link to slides

### <FONT color="red">NOTE: The following cells *cannot* be run on your local machine.  You must copy the contents (not the blocks) to your EMR notebook on AWS by hand using copy and paste.</FONT>

First, let's import our regular expressions library (yup, again):

```python
import re
```

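Now let's load a data file that I've put in one of my S3 buckets into an RDD. `sc` is the SparkContext that the EMR notebook's PySpark kernel creates for you. That is why this line fails with `NameError: name 'sc' is not defined` if you run it on your local machine: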

```python
input_file = sc.textFile('s3://umsi-data-science/data/totc.txt')
```


We're going to apply the same regex to every line of the book, so it's best to "compile" it once up front. The compiled pattern object can then be reused without Python having to process the pattern string again:

```python
WORD_RE = re.compile(r"\b[\w']+\b")
```

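
To see what the compiled pattern produces for a single line, here is a quick check you can run in plain Python (no Spark needed). The sample sentence is the novel's opening line, chosen only for illustration. `findall` returns a list of the matching words: punctuation is dropped and case is left unchanged.

```python
import re

# Same pattern as above: runs of word characters or apostrophes,
# bounded by word boundaries.
WORD_RE = re.compile(r"\b[\w']+\b")

sample = "It was the best of times, it was the worst of times."
tokens = WORD_RE.findall(sample)
print(tokens)

# An apostrophe inside a word is kept, because ' is in the character class.
print(WORD_RE.findall("the Doctor's house"))
```

The first `print` shows twelve words with the commas and final period stripped. The second keeps `Doctor's` as a single token.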

```python
input_file.take(2)
```


```
[u'The Project Gutenberg EBook of A Tale of Two Cities, by Charles Dickens', u'']
```
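
`take(2)` is an action: it brings the first two elements of the RDD back to the driver as an ordinary Python list. The `u''` prefixes just mean the kernel was running Python 2, where these are unicode strings. The sketch below mimics that shape locally with a plain list of lines. It is not Spark. The in-memory `text` is hypothetical, and only its first two lines match the real file.

```python
# Hypothetical in-memory text; only the first two lines match the real file.
text = (
    "The Project Gutenberg EBook of A Tale of Two Cities, by Charles Dickens\n"
    "\n"
    "placeholder line 3\n"
)

# sc.textFile() yields one element per line, without the trailing newline;
# str.splitlines() gives the same shape for a local string.
lines = text.splitlines()

# take(n) returns the first n elements as an ordinary Python list.
first_two = lines[:2]
print(first_two)
```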

The next block does three things, which we'll talk about in class:

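```python
# 1. flatMap: split every line into words and flatten them into one RDD of words
word_count1 = input_file.flatMap(lambda line: WORD_RE.findall(line))
# 2. map: pair each word with a count of 1
word_count2 = word_count1.map(lambda word: (word, 1))
# 3. reduceByKey: add up the 1s for each distinct word
word_count3 = word_count2.reduceByKey(lambda a, b: a + b)
```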


### <font color="magenta">Q1: Explain what each of the above statements does.</font>

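The first statement calls `flatMap` on the RDD of lines. `WORD_RE.findall` turns each line into a list of words, and `flatMap` flattens all of those per-line lists into a single RDD of words (a plain `map` would instead give an RDD of lists). The second statement uses `map` to turn each word into a key-value tuple `(word, 1)`. The third statement uses `reduceByKey` to group the tuples by word and add up their values with `lambda a, b: a + b`. The result is an RDD of `(word, count)` tuples.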
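
The same three steps can be sketched in plain Python on a tiny, hypothetical two-line input. This makes it easier to see what each stage hands to the next. It is an eager, single-machine imitation, not how Spark actually executes the job, and the helper `reduce_by_key` is made up for illustration.

```python
import re

WORD_RE = re.compile(r"\b[\w']+\b")

# Hypothetical stand-in for the RDD of lines (not the real file).
lines = [
    "It was the best of times,",
    "it was the worst of times,",
]

# flatMap: apply a function that returns a list to every line,
# then flatten all of those lists into one sequence of words.
words = [word for line in lines for word in WORD_RE.findall(line)]

# map: turn each word into a (key, value) pair.
pairs = [(word, 1) for word in words]


def reduce_by_key(func, kv_pairs):
    """Hypothetical helper: combine values that share a key using func."""
    acc = {}
    for key, value in kv_pairs:
        acc[key] = func(acc[key], value) if key in acc else value
    # Spark makes no ordering promise here; this dict keeps first-seen order.
    return list(acc.items())


counts = reduce_by_key(lambda a, b: a + b, pairs)
print(counts)
```

Notice that `It` and `it` are counted separately. Matching is case-sensitive, which is also why `the` and `The` appear as different entries in the results below.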

Now let's sort the results by count (the value in each `(word, count)` tuple) in descending order, and put the top 100 into another RDD:

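```python
# Sort the (word, count) tuples by count, largest first
word_counts_sorted = word_count3.sortBy(lambda x: x[1], ascending=False)
# take(100) returns a Python list to the driver; parallelize turns it back into an RDD
top100_sorted = sc.parallelize(word_counts_sorted.take(100))
```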


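
Locally, the equivalent of `sortBy(lambda x: x[1], ascending=False)` followed by `take(n)` is Python's `sorted()` with `reverse=True` and a slice. The pairs below are hypothetical. One difference is worth keeping in mind: Python's sort is stable, so ties keep their input order, whereas Spark makes no such promise across partitions. PySpark also offers `takeOrdered(n, key=...)`, which can return the top `n` elements directly as a list.

```python
# Hypothetical (word, count) pairs standing in for word_count3.
word_counts = [("It", 1), ("was", 2), ("the", 2), ("best", 1), ("of", 2)]

# Like sortBy(lambda x: x[1], ascending=False): order pairs by count, largest first.
by_count_desc = sorted(word_counts, key=lambda x: x[1], reverse=True)

# Like take(3): keep the first three pairs.
top3 = by_count_desc[:3]
print(top3)
```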

```python
top_100 = top100_sorted.collect()
for word, count in top_100:
    # Print each word and its count separated by a tab
    print('{}\t{}'.format(word, count))
```


```
the 	7577
and 	4921
of 	4102
to 	3601
a 	2864
in 	2540
I 	1971
his 	1939
that 	1840
was 	1764
it 	1747
he 	1458
with 	1317
had 	1296
you 	1229
as 	1058
her 	1005
at 	999
him 	970
for 	922
on 	899
not 	823
is 	793
be 	773
have 	729
s 	669
said 	661
were 	657
The 	647
Mr 	622
by 	579
my 	577
so 	540
this 	535
all 	530
me 	526
from 	510
but 	485
they 	459
no 	453
there 	449
out 	443
been 	440
or 	430
which 	417
He 	400
them 	399
one 	396
Lorry 	369
when 	365
who 	356
if 	355
an 	340
would 	337
she 	337
It 	334
are 	319
up 	318
your 	317
into 	316
their 	311
Defarge 	302
man 	296
upon 	284
could 	279
will 	275
do 	271
little 	260
time 	260
more 	260
any 	251
hand 	249
You 	246
what 	246
down 	236
know 	231
t 	224
before 	222
himself 	222
its 	220
Doctor 	220
am 	219
than 	218
again 	218
night 	216
Miss 	212
some 	207
like 	207
very 	205
day 	203
then 	202
now 	199
two 	195
looked 	194
other 	192
father 	189
way 	187
face 	187
made 	183
long 	182
```

```python
# The same pipeline, written as a single chain of transformations
counts = input_file.flatMap(lambda line: WORD_RE.findall(line)) \
                   .map(lambda word: (word, 1)) \
                   .reduceByKey(lambda a, b: a + b) \
                   .sortBy(lambda a: a[1], ascending=False)
```

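
Chaining works because each transformation returns a new RDD, so the next method can be called on it directly. The hypothetical `ToyRDD` class below is a deliberately tiny, eager, single-machine imitation of that interface. Its method names are copied from Spark for readability, hence the camelCase. Real RDDs are distributed and lazy, so nothing is computed until an action such as `take()` or `collect()` runs.

```python
import re

WORD_RE = re.compile(r"\b[\w']+\b")


class ToyRDD:
    """Hypothetical, eager, in-memory stand-in for a Spark RDD.

    Each transformation returns a new ToyRDD, which is what makes chaining
    possible. Unlike a real RDD, it is neither distributed nor lazy.
    """

    def __init__(self, items):
        self.items = list(items)

    def flatMap(self, func):
        return ToyRDD(x for item in self.items for x in func(item))

    def map(self, func):
        return ToyRDD(func(item) for item in self.items)

    def reduceByKey(self, func):
        acc = {}
        for key, value in self.items:
            acc[key] = func(acc[key], value) if key in acc else value
        return ToyRDD(acc.items())

    def sortBy(self, keyfunc, ascending=True):
        return ToyRDD(sorted(self.items, key=keyfunc, reverse=not ascending))

    def take(self, n):
        return self.items[:n]


# Hypothetical two-line input standing in for input_file.
lines = ToyRDD(["It was the best of times,", "it was the worst of times,"])

counts = lines.flatMap(lambda line: WORD_RE.findall(line)) \
              .map(lambda word: (word, 1)) \
              .reduceByKey(lambda a, b: a + b) \
              .sortBy(lambda a: a[1], ascending=False)

print(counts.take(3))
```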

```python
counts.take(10)
```


```
[(u'the', 7514), (u'and', 4745), (u'', 4378), (u'of', 4065), (u'to', 3458), (u'a', 2825), (u'in', 2447), (u'his', 1911), (u'was', 1673), (u'that', 1663)]
```

```python
words = input_file.flatMap(lambda line: WORD_RE.findall(line))
```


```python
# Give every word the same key (None), so reduceByKey adds up the total number of words
counts = words.map(lambda word: (None, 1)).reduceByKey(lambda a, b: a + b)
```


```python
counts.take(1)
```


```
[(None, 141608)]
```
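
Mapping every word to the same key (`None`) leaves `reduceByKey` with a single group. The result is therefore the total number of words rather than per-word counts. The plain-Python sketch below shows the same collapse on a hypothetical word list. On a real RDD, `words.count()` returns that total directly.

```python
# Hypothetical list of words standing in for the `words` RDD.
words = ["It", "was", "the", "best", "of", "times", "it", "was"]

# Like map(lambda word: (None, 1)): every pair shares the key None.
pairs = [(None, 1) for _ in words]

# Like reduceByKey(lambda a, b: a + b): with one key, everything sums together.
totals = {}
for key, value in pairs:
    totals[key] = totals[key] + value if key in totals else value

print(list(totals.items()))
```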

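```python
# Avoid naming the result `sorted`, which would shadow Python's built-in sorted()
sorted_counts = counts.sortBy(lambda a: a[1])
sorted_counts.collect()
```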


```
[(None, 141608)]
```

```python
# countByValue() is an action: it returns a dictionary of word -> count to the driver
word_counts = input_file.flatMap(lambda line: WORD_RE.findall(line)).countByValue()
type(word_counts)
```


```
<type 'collections.defaultdict'>
```
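
`countByValue()` is an action that returns a dictionary of value to count on the driver. It is therefore only practical when the distinct words fit in the driver's memory. Python's `collections.Counter` builds the same kind of mapping locally. The sketch below uses it on a hypothetical two-line input. The final `sorted(...)` line works on any dict, including the `defaultdict` returned above.

```python
import re
from collections import Counter

WORD_RE = re.compile(r"\b[\w']+\b")

# Hypothetical two-line input standing in for input_file.
lines = ["It was the best of times,", "it was the worst of times,"]

# Like countByValue(): map each distinct word to how many times it occurs.
word_counts = Counter(word for line in lines for word in WORD_RE.findall(line))
print(word_counts["was"])

# Top words by count; this works on any dict, Counter or defaultdict alike.
top = sorted(word_counts.items(), key=lambda kv: kv[1], reverse=True)[:2]
print(top)
```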

```python
word_counts
```