## live session 2 - map reduce

step-by-step approach to map reduce framework.

we will start with examining a simple "word count" program implemented in python

### follow along here
https://zettadatanet.wordpress.com/2015/04/04/a-hands-on-introduction-to-mapreduce-in-python/

### wordcount - the completely manual way

In [1]:
import sys
import re


In [3]:
# wordcount implementation using python dictionary
sums = {}

filename = 'now.txt'
#filename = 'pg2701.txt'
f = open(filename, 'r')

for line in f:
    line = re.sub( r'^\W+|\W+$', '', line )
    words = re.split(r'\W+', line)

    for word in words:
        word = word.lower()
        sums[word] = sums.get( word, 0 ) + 1

print sums

{'party': 1, 'all': 1, 'good': 1, 'for': 1, 'of': 1, 'is': 1, 'men': 1, 'to': 2, 'time': 1, 'aid': 1, 'the': 3, 'now': 1, 'come': 1}


### wordcount - using map reduce

three steps
1. map
2. shuffle
3. reduce

#### 1. map step

for each word you encounter, "emit" a word and the number 1 for future counting

In [4]:
filename = 'now.txt'
#filename = 'pg2701.txt'
f = open(filename, 'r')

map_output = list()
for line in f:
    line = re.sub( r'^\W+|\W+$', '', line )
    words = re.split(r"\W+", line)
    
    for word in words:
        print( word.lower() + "\t1" )
        map_output.append( (word.lower(),1))

now	1
is	1
the	1
time	1
for	1
all	1
good	1
men	1
to	1
come	1
to	1
the	1
aid	1
of	1
the	1
party	1


In [5]:
map_output

[('now', 1),
 ('is', 1),
 ('the', 1),
 ('time', 1),
 ('for', 1),
 ('all', 1),
 ('good', 1),
 ('men', 1),
 ('to', 1),
 ('come', 1),
 ('to', 1),
 ('the', 1),
 ('aid', 1),
 ('of', 1),
 ('the', 1),
 ('party', 1)]

#### 2. shuffle step

sort so that the same item is next to each other

In [6]:
# shuffle step 

map_sorted = sorted(map_output)
map_sorted

[('aid', 1),
 ('all', 1),
 ('come', 1),
 ('for', 1),
 ('good', 1),
 ('is', 1),
 ('men', 1),
 ('now', 1),
 ('of', 1),
 ('party', 1),
 ('the', 1),
 ('the', 1),
 ('the', 1),
 ('time', 1),
 ('to', 1),
 ('to', 1)]

#### 3. reduce step

loop thru sorted list and count occurences

In [7]:
# reduce step

previous = None
sum = 0

for key, value in map_sorted:

    if key != previous:
        if previous is not None:
            print str( sum ) + '\t' + previous
        previous = key
        sum = 0
    
    sum = sum + value

print str( sum ) + '\t' + previous


1	aid
1	all
1	come
1	for
1	good
1	is
1	men
1	now
1	of
1	party
3	the
1	time
2	to


### what else can you do with map-reduce?

what if we want to get the distribution of number of letters in each word?
how would you change your mapper? reducer?

In [26]:
filename = 'now.txt'
#filename = 'pg2701.txt'
f = open(filename, 'r')

map_output = list()
for line in f:
    line = re.sub( r'^\W+|\W+$', '', line )
    words = re.split(r"\W+", line)
    
    for word in words:
        print( word.lower() + "\t" + word.lower()[0] + "\t1" )
        map_output.append( (word.lower()[0],1))

now	n	1
is	i	1
the	t	1
time	t	1
for	f	1
all	a	1
good	g	1
men	m	1
to	t	1
come	c	1
to	t	1
the	t	1
aid	a	1
of	o	1
the	t	1
party	p	1


In [27]:
map_output

[('n', 1),
 ('i', 1),
 ('t', 1),
 ('t', 1),
 ('f', 1),
 ('a', 1),
 ('g', 1),
 ('m', 1),
 ('t', 1),
 ('c', 1),
 ('t', 1),
 ('t', 1),
 ('a', 1),
 ('o', 1),
 ('t', 1),
 ('p', 1)]

In [28]:
# shuffle step 

map_sorted = sorted(map_output)
map_sorted

[('a', 1),
 ('a', 1),
 ('c', 1),
 ('f', 1),
 ('g', 1),
 ('i', 1),
 ('m', 1),
 ('n', 1),
 ('o', 1),
 ('p', 1),
 ('t', 1),
 ('t', 1),
 ('t', 1),
 ('t', 1),
 ('t', 1),
 ('t', 1)]

In [29]:
# reduce step

previous = None
sum = 0

for key, value in map_sorted:

    if key != previous:
        if previous is not None:
            print str( sum ), '\t' , previous
        previous = key
        sum = 0
    
    sum = sum + value

print str( sum ), '\t' , previous


2 	a
1 	c
1 	f
1 	g
1 	i
1 	m
1 	n
1 	o
1 	p
6 	t


### do it yourself 

modify the code to count the number of words of different lengths

your output should be something like >>> 
```
4 	2
8 	3
3 	4
1 	5
```
it means there are 4 2-letter words, and 8 3-letter words, etc.  for the now.txt file

In [5]:
filename = 'now.txt'
#filename = 'pg2701.txt'
f = open(filename, 'r')

map_output = list()
for line in f:
    line = re.sub( r'^\W+|\W+$', '', line )
    words = re.split(r"\W+", line)
    
    for word in words:
        print( word.lower() + "\t" + word.lower()[0] + "\t1" )
        map_output.append( (word.lower()[0],1))

{'now': 1, 'is': 1, 'the': 3, 'time': 1, 'for': 1, 'all': 1, 'good': 1, 'men': 1, 'to': 2, 'come': 1, 'aid': 1, 'of': 1, 'party': 1}


In [7]:
filename = 'now.txt'
#filename = 'pg2701.txt'
f = open(filename, 'r')

map_output = list()
for line in f:
    line = re.sub( r'^\W+|\W+$', '', line )
    words = re.split(r"\W+", line)
    
    for word in words:
        print( word.lower() + "\t1" )
        map_output.append( (word.lower(),1))

now	1
is	1
the	1
time	1
for	1
all	1
good	1
men	1
to	1
come	1
to	1
the	1
aid	1
of	1
the	1
party	1


In [8]:
# shuffle step 

map_sorted = sorted(map_output)
map_sorted

[('aid', 1),
 ('all', 1),
 ('come', 1),
 ('for', 1),
 ('good', 1),
 ('is', 1),
 ('men', 1),
 ('now', 1),
 ('of', 1),
 ('party', 1),
 ('the', 1),
 ('the', 1),
 ('the', 1),
 ('time', 1),
 ('to', 1),
 ('to', 1)]

In [None]:
# reduce step

previous = None
sum = 0

for key, value in map_sorted:

    if key != previous:
        if previous is not None:
            print str( sum ) + '\t' + previous
        previous = key
        sum = 0
    
    sum = sum + value

print str( sum ) + '\t' + previous


### using python map functions

http://www.bogotobogo.com/python/python_fncs_map_filter_reduce.php

In [12]:
# a for loop for a simple function

items = [1, 2, 3, 4, 5]
squared = []
for x in items:
    squared.append(x ** 2)
squared


[1, 4, 9, 16, 25]

In [14]:
# use the built-in map function
def sqr(x): 
    return x ** 2

list(map(sqr, items))

[1, 4, 9, 16, 25]

In [17]:
# try map from word count example
def word_map(x):
    return (x, 1)


In [19]:
map(word_map, "this is foo".split())

[('this', 1), ('is', 1), ('foo', 1)]

### ADVANCED write python map reduce from scratch

http://www.michael-noll.com/tutorials/writing-an-hadoop-mapreduce-program-in-python/

### mrjob: using a python map and reduce library


https://pythonhosted.org/mrjob/guides/quickstart.html


mrjob lets you write MapReduce jobs in Python 2.6+/3.3+ and run them on several platforms. You can:

* Write multi-step MapReduce jobs in pure Python
* Test on your local machine
* Run on a Hadoop cluster
* Run in the cloud using Amazon Elastic MapReduce (EMR)
* Run in the cloud using Google Cloud Dataproc (Dataproc)
* Easily run Spark jobs on EMR or your own Hadoop cluster

#### put the following in a text file

call it **mr_word_count.py**




In [None]:
from mrjob.job import MRJob


class MRWordFrequencyCount(MRJob):

    def mapper(self, _, line):
        yield "chars", len(line)
        yield "words", len(line.split())
        yield "lines", 1

    def reducer(self, key, values):
        yield key, sum(values)



MRWordFrequencyCount.run()

#### run the code by typing 

```
python mr_word_count.py now.txt
```