## Map Reduce

[Python](#Python)<br/>
[Hadoop (MR Job)](#Hadoop)<br/>
[Spark](#Spark)<br/>

In [None]:
from __future__ import division

In [16]:
text = ['Map step: Each worker node applies the map() function to the local data, and writes the output to a temporary storage.',
        'Shuffle step: Worker nodes redistribute data based on the output keys (produced by the map() function), such that all data belonging to one key is located on the same worker node.',
        'Reduce step: Worker nodes now process each group of output data, per key, in parallel.']

<a id='Python'></a>
### Python

Python's `map` and `reduce` are not distributed at all, but they follow the same basic idea.

This is how they can be used:

In [24]:
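# count of characters using map and reduce
from functools import reduce  # builtin on Python 2, moved to functools in Python 3
mapped = list(map(len, text))  # list() because map is lazy in Python 3
print(mapped)
reduced = reduce(lambda x, y: x + y, mapped)
print(reduced)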

[118, 179, 86]
383


Python's `reduce` is essentially a left fold, like Haskell's `foldl`:

In [29]:
# count of characters using just reduce
reduce(lambda x, y: x + len(y), text, 0) # start with 0 as the first x

383
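
To see that `reduce` really folds from the left, it helps to use an operation that is not associative. A small sketch: `wrap` and `foldr` are hypothetical helpers written for illustration, and `foldr` builds a right fold out of `reduce` by walking the list backwards.

```python
from functools import reduce  # builtin on Python 2, in functools on Python 3


def wrap(acc, x):
    # wrap the accumulator and the next element in parentheses
    return '(' + acc + ' ' + x + ')'


def foldr(f, z, xs):
    # right fold built from reduce: walk the list backwards, element first
    return reduce(lambda acc, x: f(x, acc), reversed(xs), z)


print(reduce(wrap, ['a', 'b', 'c'], 'z'))         # (((z a) b) c)
print(reduce(lambda x, y: x - y, [1, 2, 3], 10))  # ((10 - 1) - 2) - 3 = 4
print(foldr(lambda x, y: x - y, 10, [1, 2, 3]))   # 1 - (2 - (3 - 10)) = -8
```

For addition the direction does not matter, which is why the character count above gives the same answer either way.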

<a id='Hadoop'></a>
### Hadoop (MR Job)

[MapReduce - Wikipedia](https://en.wikipedia.org/wiki/MapReduce)<br/>
[MR Job Documentation](https://pythonhosted.org/mrjob/job.html)

**HDFS** (Hadoop Distributed File System) - file system for storing data that does not fit on a single disk.<br/>
**Hadoop MapReduce** - operations on distributed data stored in HDFS. HDFS has a NameNode (a single point of failure) and DataNodes; data is stored in blocks (usually 128 MB), and each block is stored 3 times by default (the replication factor is configurable).
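
As a rough illustration of the block and replication numbers, here is a sketch that computes how many blocks a file occupies and how much raw disk its replicas use. It assumes the 128 MB block size and replication factor 3 from the text; `hdfs_footprint` and the file sizes are hypothetical. HDFS does not pad the last block, so raw storage is the file size times the replication factor rather than blocks times 128 MB.

```python
import math

BLOCK_MB = 128   # typical HDFS block size, as in the text
REPLICATION = 3  # default replication factor


def hdfs_footprint(file_mb, block_mb=BLOCK_MB, replication=REPLICATION):
    """Return (number of blocks, block copies stored, raw MB on disk)."""
    n_blocks = int(math.ceil(file_mb / float(block_mb)))
    # the last block only holds the remaining bytes, so disk use is not n_blocks * block_mb
    raw_mb = file_mb * replication
    return n_blocks, n_blocks * replication, raw_mb


print(hdfs_footprint(1024))  # (8, 24, 3072)
print(hdfs_footprint(1000))  # (8, 24, 3000)
```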

MapReduce steps:
- **Map** - Each worker node applies the `map()` function to its local data and writes the output to temporary storage.
- **Shuffle** - Worker nodes redistribute data based on the output keys (produced by the `map()` function), so that all data belonging to one key ends up on the same worker node.
- **Reduce** - Worker nodes process each group of output data, per key, in parallel.
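
The three steps can be imitated on a single machine in plain Python. This sketch counts words in a shortened stand-in for the `text` list (named `docs` so it does not overwrite `text`); sorting plus `itertools.groupby` plays the role of the shuffle that Hadoop performs across nodes. The function names are illustrative, not part of any library.

```python
from itertools import groupby
from operator import itemgetter

# a shortened stand-in for the `text` list defined earlier
docs = ['Map step: Each worker node applies the map() function',
        'Reduce step: Worker nodes now process each group']


def map_step(line):
    # Map: emit a (word, 1) pair for every word in the line
    for word in line.lower().split():
        yield word, 1


def shuffle_step(pairs):
    # Shuffle: sort by key so equal keys are adjacent, then group them
    # (on a cluster this moves all pairs of one key to a single worker)
    for key, group in groupby(sorted(pairs, key=itemgetter(0)), key=itemgetter(0)):
        yield key, [value for _, value in group]


def reduce_step(key, values):
    # Reduce: combine all values for one key
    return key, sum(values)


mapped = [pair for line in docs for pair in map_step(line)]
counts = dict(reduce_step(key, values) for key, values in shuffle_step(mapped))
print(counts['step:'], counts['worker'], counts['each'])  # 2 2 2
```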

<img src="./img/mapreduce_hadoop.png" width="400">

MRJob lets you write Hadoop-style MapReduce jobs in Python.

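Useful things to remember:

- Everything is a key-value pair throughout the whole process
- A mapper, combiner or reducer can yield more than one pair
- Do not evaluate the `values` generators into lists! On real data they can be too large to fit in memory.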
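
A sketch of these points in the same generator style MRJob uses: one mapper call yields several key-value pairs, and the reducer consumes `values` once as a stream with `sum()` instead of building a list. These are plain functions driven by hand, not an actual MRJob class, and the keys `'words'` and `'chars'` are made up for illustration.

```python
def mapper(_, line):
    # one input line can produce many (key, value) pairs
    for word in line.split():
        yield 'words', 1
        yield 'chars', len(word)


def reducer(key, values):
    # values is consumed once as a stream; no list() needed
    yield key, sum(values)


# the hand-written "driver" below collects pairs only because the input is tiny
pairs = list(mapper(None, 'map shuffle reduce'))
words = reducer('words', (v for k, v in pairs if k == 'words'))
chars = reducer('chars', (v for k, v in pairs if k == 'chars'))
print(next(words), next(chars))  # ('words', 3) ('chars', 16)
```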

In [2]:
from mrjob.job import MRJob

In [99]:
%%file wordlen.py
from __future__ import division
import os

from mrjob.compat import jobconf_from_env
from mrjob.job import MRJob
from mrjob.step import MRStep


# avg len of words in a newsgroup topic
class AvgWordLen(MRJob):

    def mapper(self, _, line):
        # path of the current input file, e.g. data/mini_20_newsgroups/sci.med/<file>;
        # jobconf_from_env looks the variable up under its Hadoop 1 and Hadoop 2 names
        input_path = jobconf_from_env('mapreduce.map.input.file')
        topic = os.path.basename(os.path.dirname(input_path))  # parent directory = topic
        for word in line.split():
            yield topic, (len(word), 1)  # (total length, word count)

    # one combiner per mapper -> the same key can be in multiple combiners!
    # combiners reduce the amount of data transferred from mapper to reducer.
    # Hadoop may run a combiner zero or more times, so it must emit the same
    # (total length, word count) shape that it receives
    def combiner(self, key, values):
        word_len = 0
        word_cnt = 0
        for length, count in values:
            word_len += length
            word_cnt += count
        yield key, (word_len, word_cnt)

    def reducer(self, key, values):
        word_len = 0
        word_cnt = 0
        for length, count in values:
            word_len += length
            word_cnt += count
        yield key, word_len / word_cnt  # true division thanks to the __future__ import

    # steps() is needed for multi-step jobs or for methods with non-default names;
    # dropping combiner= here gives the same result, just with more data shuffled
    def steps(self):
        return [MRStep(mapper=self.mapper, combiner=self.combiner, reducer=self.reducer)]


if __name__ == '__main__':
    AvgWordLen.run()

Overwriting wordlen.py
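
Why the combiner emits `(total length, word count)` pairs instead of a partial average: an average of averages is wrong when the chunks have different sizes, while sums and counts can be added in any grouping. A plain-Python sketch with two hypothetical mapper outputs for one topic (the numbers are made up):

```python
# word lengths seen by two different mappers for the same topic (made-up numbers)
mapper_a = [3, 5, 4]
mapper_b = [10]


def mean(xs):
    return float(sum(xs)) / len(xs)


def combine(lengths):
    # what the combiner emits: (total length, word count)
    return sum(lengths), len(lengths)


def reduce_pairs(pairs):
    # what the reducer does: add up lengths and counts, divide once at the end
    total_len = sum(p[0] for p in pairs)
    total_cnt = sum(p[1] for p in pairs)
    return float(total_len) / total_cnt


true_avg = mean(mapper_a + mapper_b)                                 # 22 / 4 = 5.5
avg_of_avgs = mean([mean(mapper_a), mean(mapper_b)])                 # (4 + 10) / 2 = 7.0
combined_avg = reduce_pairs([combine(mapper_a), combine(mapper_b)])  # (12 + 10) / (3 + 1) = 5.5
print(true_avg, avg_of_avgs, combined_avg)  # 5.5 7.0 5.5
```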


Running MRJob inside an IPython notebook like this is a bit of a hack. The usual way is to write the job to `wordlen.py` (with `AvgWordLen.run()` under `if __name__ == '__main__':`) and then run it from the command line:

```
python wordlen.py input_file > outputs.txt
```

Running the MRJob from IPython and printing the results:

In [33]:
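from importlib import reload  # Python 3; on Python 2 reload is a builtin
import wordlen
reload(wordlen)  # pick up changes after re-running the %%file cell

mr_job = wordlen.AvgWordLen(args=['data/mini_20_newsgroups'])
with mr_job.make_runner() as runner:
    runner.run()
    # mrjob >= 0.6 API; older versions used runner.stream_output() with mr_job.parse_output_line(line)
    for key, value in mr_job.parse_output(runner.cat_output()):
        print(key, value)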

comp.windows.x 5.8665388883
rec.motorcycles 5.9326556544
sci.med 6.2812920592


<a id='Spark'></a>
### Apache Spark

[Spark Documentation](http://spark.apache.org/docs/latest/api/python/)

Not MapReduce in the Hadoop sense: Spark keeps data in memory between steps where it can, instead of writing intermediate results to disk.


RDD = resilient distributed dataset: an immutable, partitioned collection of elements that can be processed in parallel.

Useful things to remember:
- Does not need key-value pairs
- Saves a lot of disk I/O compared to Hadoop, so it is usually much faster (especially for iterative jobs)
- RDDs are evaluated lazily: transformations such as `map()` and `filter()` only run when an action such as `first()` or `collect()` is requested
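
Lazy evaluation can be pictured with a toy class that only records transformations and runs them when an action is called. `ToyRDD` is hypothetical and single-machine; it only mimics the names `map`, `filter` and `collect` and is not how PySpark is implemented.

```python
class ToyRDD(object):
    """Records transformations; runs nothing until collect() is called."""

    def __init__(self, data, ops=()):
        self._data = data
        self._ops = tuple(ops)

    def map(self, f):
        # transformation: return a new ToyRDD with one more pending step
        return ToyRDD(self._data, self._ops + (('map', f),))

    def filter(self, pred):
        return ToyRDD(self._data, self._ops + (('filter', pred),))

    def collect(self):
        # action: only now is the pipeline actually executed
        out = []
        for x in self._data:
            keep = True
            for kind, f in self._ops:
                if kind == 'map':
                    x = f(x)
                elif not f(x):
                    keep = False
                    break
            if keep:
                out.append(x)
        return out


calls = []


def noisy_len(s):
    calls.append(s)  # record every time the function really runs
    return len(s)


rdd = ToyRDD(['a', 'bbb', 'cc']).map(noisy_len).filter(lambda n: n > 1)
print(len(calls))     # 0: nothing has run yet
print(rdd.collect())  # [3, 2]
print(len(calls))     # 3
```

Like a real RDD, each `collect()` recomputes the whole pipeline; Spark's `cache()` exists to avoid exactly that.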

In [38]:
import pyspark as ps
import json
import multiprocessing
from operator import add

In [40]:
n_cores = multiprocessing.cpu_count()  # figure out how many cores I have
print(n_cores)
sc = ps.SparkContext('local[{}]'.format(n_cores))  # one worker thread per core

4


In [95]:
# create RDD from a Python list
my_rdd = sc.parallelize(text)

# create RDD from text file
# sc.textFile('data/whatever.txt')

In [68]:
my_rdd.collect()

['Map step: Each worker node applies the map() function to the local data, and writes the output to a temporary storage.',
 'Shuffle step: Worker nodes redistribute data based on the output keys (produced by the map() function), such that all data belonging to one key is located on the same worker node.',
 'Reduce step: Worker nodes now process each group of output data, per key, in parallel.']

In [64]:
# keep only elements longer than 100 characters
my_rdd_filtered = my_rdd.filter(lambda x: len(x) > 100)
my_rdd_filtered.collect()

['Map step: Each worker node applies the map() function to the local data, and writes the output to a temporary storage.',
 'Shuffle step: Worker nodes redistribute data based on the output keys (produced by the map() function), such that all data belonging to one key is located on the same worker node.']

In [98]:
# count of characters, two ways
my_rdd_count = (
    my_rdd
        .map(len)
        .sum()
)
print(my_rdd_count)

my_rdd_count = (
    my_rdd
        .map(len)
        .reduce(add)  # operator.add, same as lambda x, y: x + y
)
print(my_rdd_count)

383
383


In [93]:
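# working with numbers
my_rdd = sc.parallelize([1, 2, 3, 4, 5])

# fold's zero value is used once per partition and once more when the partition
# results are combined, so it should be the identity (0 for addition)
print(my_rdd.fold(1000, lambda x, y: x + y))
print(my_rdd.reduce(lambda x, y: x + y))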

5015
15
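
The 5015 comes from how `fold` treats its zero value: PySpark folds each partition starting from `zeroValue`, then folds the per-partition results on the driver starting from `zeroValue` again, so with 4 partitions 1000 is added 5 times. A plain-Python sketch of that behaviour; `split` and `simulate_fold` are hypothetical helpers, and the even split is an assumption rather than Spark's exact partitioning (the result does not depend on it).

```python
from functools import reduce
from operator import add


def split(data, n):
    # roughly even contiguous slices (Spark's exact split may differ)
    return [data[i * len(data) // n:(i + 1) * len(data) // n] for i in range(n)]


def simulate_fold(data, zero, op, num_partitions):
    # fold inside each partition, starting from zero
    partials = [reduce(op, part, zero) for part in split(data, num_partitions)]
    # fold the partial results, starting from zero once more
    return reduce(op, partials, zero)


print(simulate_fold([1, 2, 3, 4, 5], 1000, add, 4))  # 15 + 5 * 1000 = 5015
print(simulate_fold([1, 2, 3, 4, 5], 0, add, 4))     # 15: the identity as zero value
```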
