# Ex 2.1 Hadoop MapReduce with Python
There are two prominent *Python* APIs for interfacing *Hadoop MapReduce* clusters:

## *Snakebite* for *HDFS* access
The [Snakebite Lib](https://github.com/spotify/snakebite) allows easy access to *HDFS* file systems:  
```
>>> from snakebite.client import Client
>>> client = Client("localhost", 8020, use_trash=False)
>>> for x in client.ls(['/']):
...     print x
```

See [documentation](https://snakebite.readthedocs.io/en/latest/) for details.


## *MRJOB* for *MapReduce* job execution
The ``mrjob`` lib -> [see docu](https://mrjob.readthedocs.io/en/latest/index.html) is a power full *MapReduce* client for *Python*. Some of the key features are:

* local emulation (single and multi-core) a *Hadoop* cluster for development and debugging
* simple access, authentication and file transfer to *Hadoop* clusters
* powerful API for common cloud services, such as AWS or Azure   

### Preparing our environment

In [6]:
#install mrjob lib and boto3 for AWS S3 access
!conda install -c conda-forge -y mrjob boto3

#or !pip install mrjob boto3

zsh:1: command not found: conda


## A *MRJOB* Example: WordCount (again)
Since *Hadoop* works only on file in- and outputs, we do not have usual function based API. We need to pass our code (implementation of *Map* and *Reduce*) as executable *Python* scripts:

* use *Jupyter's* ``%%file`` magic command to write the cell to file
* create a executable script with ``__main__`` method
* inherit from the ``MRJob`` class
* implement ``mapper()`` and ``reducer()`` methods
* call ``run()`` at start

In [2]:
%%file wordcount.py 
#this will save this cell as file

from mrjob.job import MRJob

class MRWordCount(MRJob):
    def mapper(self, _, line):
        for word in line.split():
            yield(word, 1)
 
    def reducer(self, word, counts):
        yield(word, sum(counts))
        
if __name__ == '__main__':
    MRWordCount.run()
            

Overwriting wordcount.py


### execute script from cmd
* ``-r local`` causes local multi-core emulation a *Hadoop* cluster.
* Input files are cmd arguments
* define ouput-file (see docs) or use streams: `` > out.txt``

In [4]:
! python wordcount.py -r local *.rst > out.txt

Traceback (most recent call last):
  File "wordcount.py", line 3, in <module>
    from mrjob.job import MRJob
ImportError: No module named mrjob.job


 -> results in **out.txt** 

## Execution on AWS EMR
AWS EMR is a clound formation service which allows you to create *Hadoop*, *Spark* and other data analytics clusters with a few clicks.

**NOTE**: we are not endorsing AWS specifically, other cloud service providers have similar offers



### Connect to existing cluster

In [None]:
%%file mrjob_cluster.conf
runners:
  emr:
    aws_access_key_id: YOUR_KEY_ID
    aws_secret_access_key: YOUR_KEY_SECRET
    region: eu-west-1

We need the **ID** of the cluster we want to connect to - here pre-set to our Cluster today

In [None]:
! python wordcount.py -r emr --cluster-id=j-L1BO0NYZIYY0 text1.rst text2.rst -c mrjob_cluster.conf  

## Exercise
Use  *mrjob*  to  compute  employee  **top  annual  salaries** and  **gross pay** in the *CSV* table ``Baltimore_City_employee_Salaries_FY2014.csv``.

* use  ``import csv`` to read the data -> [API docs](https://docs.python.org/3/library/csv.html)
* use ``yield`` to return *producers* from *map* and *reduce* functions
* return top entries in both categories 