# Ex 2.1 Hadoop MapReduce with Python
There are two prominent *Python* APIs for interfacing with *Hadoop MapReduce* clusters:

## *Snakebite* for *HDFS* access
The [Snakebite Lib](https://github.com/spotify/snakebite) allows easy access to *HDFS* file systems:  
```
>>> from snakebite.client import Client
>>> client = Client("localhost", 8020, use_trash=False)
>>> for x in client.ls(['/']):
...     print(x)
```

See [documentation](https://snakebite.readthedocs.io/en/latest/) for details.


## *MRJOB* for *MapReduce* job execution
The ``mrjob`` library ([see docs](https://mrjob.readthedocs.io/en/latest/index.html)) is a powerful *MapReduce* client for *Python*. Some of its key features are:

* local emulation (single- and multi-core) of a *Hadoop* cluster for development and debugging
* simple access, authentication and file transfer to *Hadoop* clusters
* support for cloud services such as AWS EMR and Google Cloud Dataproc

### Preparing our environment

In [29]:
#install mrjob lib and boto3 for AWS S3 access
!conda install -c conda-forge -y mrjob boto3

#or !pip install mrjob boto3


## A *MRJOB* Example: WordCount (again)
Since *Hadoop* works only with file inputs and outputs, there is no usual function-based API. Instead, we pass our code (the implementation of *Map* and *Reduce*) to the cluster as an executable *Python* script:

* use *Jupyter's* ``%%file`` magic command to write the cell to a file
* create an executable script with an ``if __name__ == '__main__':`` block
* inherit from the ``MRJob`` class
* implement the ``mapper()`` and ``reducer()`` methods
* call ``run()`` in the ``__main__`` block
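What mrjob does with these two methods can be sketched in plain Python. The following single-process emulation of the map, shuffle/sort and reduce phases needs no mrjob installation; ``run_local`` is a hypothetical helper for illustration only. On a real cluster each phase is distributed across nodes and the sorting is done by the framework.

```python
from itertools import groupby
from operator import itemgetter


def mapper(_, line):
    # same logic as MRWordCount.mapper: emit (word, 1) per whitespace token
    for word in line.split():
        yield word, 1


def reducer(word, counts):
    # same logic as MRWordCount.reducer: sum all counts for one key
    yield word, sum(counts)


def run_local(lines):
    """Emulate map -> shuffle/sort -> reduce in a single process."""
    # map phase: the input key is unused (None), the value is one line
    pairs = [kv for line in lines for kv in mapper(None, line)]
    # shuffle/sort phase: bring equal keys next to each other
    pairs.sort(key=itemgetter(0))
    results = []
    # reduce phase: one reducer call per distinct key
    for word, group in groupby(pairs, key=itemgetter(0)):
        counts = (count for _, count in group)
        results.extend(reducer(word, counts))
    return results


if __name__ == "__main__":
    for word, total in run_local(["a b a", "b a c"]):
        print(word, total)
```

The grouping step is why ``reducer`` receives an iterator of counts for a single word rather than the full list of pairs.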

In [1]:
%%file wordcount.py 
# the %%file magic saves this cell as wordcount.py

from mrjob.job import MRJob

class MRWordCount(MRJob):
    def mapper(self, _, line):
        # emit (word, 1) for every whitespace-separated token
        for word in line.split():
            yield word, 1

    def reducer(self, word, counts):
        # sum all counts collected for one word
        yield word, sum(counts)

if __name__ == '__main__':
    MRWordCount.run()

Writing wordcount.py


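### Execute the script from the command line
* ``-r local`` emulates a *Hadoop* cluster locally using multiple processes (the default runner, ``-r inline``, runs everything in a single process)
* Input files are passed as command-line arguments
* Write the output to a directory with ``--output-dir`` (see docs) or redirect the stream: ``> out.txt``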
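When the output is redirected with ``> out.txt``, each line holds a JSON-encoded key, a tab and a JSON-encoded value (mrjob's default ``JSONProtocol``). A minimal sketch for loading such a file back into *Python*; ``parse_mrjob_output`` is a hypothetical helper, not part of mrjob (when jobs are launched from *Python*, mrjob's ``parse_output()`` covers the same need).

```python
import json


def parse_mrjob_output(lines):
    """Parse 'JSON key <TAB> JSON value' lines into a dict."""
    result = {}
    for line in lines:
        line = line.rstrip("\n")
        if not line:
            continue  # ignore blank lines
        # split only on the first tab: the JSON key itself never contains a raw tab
        raw_key, raw_value = line.split("\t", 1)
        result[json.loads(raw_key)] = json.loads(raw_value)
    return result


if __name__ == "__main__":
    # in-memory lines in the same format as the streamed job output
    sample = ['"-"\t3\n', '"Jack!\\""\t3\n', '"I"\t44\n']
    print(parse_mrjob_output(sample))
```

Because the function accepts any iterable of lines, an open file works too, e.g. ``with open("out.txt") as f: counts = parse_mrjob_output(f)``.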

In [5]:
! python wordcount.py -r local text1.rst text2.rst text3.rst

No configs found; falling back on auto-configuration
No configs specified for local runner
Creating temp directory /tmp/wordcount.keuper.20200612.040613.470702
Running step 1 of 1...
job output is in /tmp/wordcount.keuper.20200612.040613.470702/output
Streaming final output from /tmp/wordcount.keuper.20200612.040613.470702/output...
"-"	3
"--"	6
"A"	16
"Adjusting"	3
"Alex"	3
"All"	3
"Almighty,"	3
"Alphabet"	3
"Amazingly"	3
"And"	3
"Bawds"	3
"Baz,"	3
"Big"	6
"Blind"	9
"Blowzy"	3
"Bookmarksgrove"	4
"Bookmarksgrove,"	3
"Brawny"	3
"Brick"	3
"Bright"	3
"But"	3
"Commas,"	3
"Consonantia,"	4
"Copy"	3
"Cozy"	3
"Crazy"	3
"DJs"	4
"Duden"	4
"Even"	3
"Far"	2
"Few"	3
"Five"	3
"Flummoxed"	3
"Fox"	3
"Foxy"	3
"Fredericka"	3
"God!"	3
"Grammar."	3
"Have"	3
"How"	3
"I"	44
"Ipsum"	3
"Iraq."	3
"It"	4
"Italic"	3
"Jack!\""	3
"Jack"	3
"Japan"	3
"Jim."	6
"Joaquin"	3
"July"	3
"Junk"	3
"Lane."	3
"Line"	3
"Little"	9
"Longe"	3
"Lorem"	3
"MTV"	9
"Marks"	3
"Mountains,"	3
"My"	6
"O"	3
"Oh,"	3
"On"	3
"One"	3
"Oxmox"	3
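The output shows that ``line.split()`` keeps punctuation and case, so ``"Bookmarksgrove"`` and ``"Bookmarksgrove,"`` are counted as different words. A sketch of a normalizing mapper, assuming a regex-based notion of "word" (the word-count example in the mrjob docs uses a similar ``WORD_RE``); ``count_words`` is a hypothetical single-process check, not part of mrjob.

```python
import re
from collections import Counter

# assumption: a word is a run of letters, digits, underscores or apostrophes
WORD_RE = re.compile(r"[\w']+")


def mapper(_, line):
    # lower-case and drop punctuation so "Bookmarksgrove," and "Bookmarksgrove" match
    for word in WORD_RE.findall(line):
        yield word.lower(), 1


def count_words(lines):
    """Run the mapper over all lines and sum the counts per word."""
    totals = Counter()
    for line in lines:
        for word, n in mapper(None, line):
            totals[word] += n
    return totals


if __name__ == "__main__":
    print(count_words(["Far far away, behind the word mountains,", "Far away."]))
```

Inside ``MRWordCount`` the same body would become the ``mapper`` method (with ``self`` as first parameter); the reducer stays unchanged.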


## Execution on AWS EMR
AWS EMR (Elastic MapReduce) is a managed cloud service that lets you create *Hadoop*, *Spark* and other data analytics clusters with a few clicks.

**NOTE**: we are not endorsing AWS specifically; other cloud service providers have similar offerings.



### Case 1: create cluster on the fly 
We create a cluster just for a single job:
* a simple solution for large jobs that run only once (or only at sparse points in time)
* this approach causes a lot of overhead (the cluster has to be provisioned and bootstrapped for every job), so it is not suitable for small and frequent jobs

First, we need a config file for the connection to EMR:
**fill in YOUR AWS credentials**

In [51]:
%%file mrjob.conf
runners:
  emr:
    aws_access_key_id: YOUR_ID
    aws_secret_access_key: YOUR_KEY
    instance_type: m5.xlarge
    num_core_instances: 2
    region: eu-west-1

Overwriting mrjob.conf
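Pasting keys into a notebook makes them easy to leak. A hedged alternative, assuming mrjob falls back on boto3's standard credential lookup when the keys are left out of the config (the mrjob EMR quickstart mentions the ``$AWS_ACCESS_KEY_ID`` and ``$AWS_SECRET_ACCESS_KEY`` environment variables): keep only the runner settings in ``mrjob.conf`` and check the variables before launching. ``missing_aws_vars`` and ``write_conf`` are hypothetical helpers.

```python
import os

# same runner settings as above, but without the credentials (assumption:
# mrjob/boto3 then read them from the environment)
CONF_WITHOUT_KEYS = """runners:
  emr:
    instance_type: m5.xlarge
    num_core_instances: 2
    region: eu-west-1
"""

REQUIRED_VARS = ("AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY")


def missing_aws_vars(environ=os.environ):
    """Return the names of required AWS variables that are unset or empty."""
    return [name for name in REQUIRED_VARS if not environ.get(name)]


def write_conf(path="mrjob.conf"):
    """Write the credential-free config file and return its path."""
    with open(path, "w", encoding="utf-8") as f:
        f.write(CONF_WITHOUT_KEYS)
    return path


if __name__ == "__main__":
    missing = missing_aws_vars()
    print("missing variables:", ", ".join(missing) if missing else "none")
```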


In [52]:
! python wordcount.py -r emr --bootstrap-mrjob text1.rst text2.rst -c mrjob.conf 


Using s3://mrjob-42e7145df80ebe94/tmp/ as our temp dir on S3
Creating temp directory /tmp/wordcount.keuper.20200611.214401.921119
writing master bootstrap script to /tmp/wordcount.keuper.20200611.214401.921119/b.sh
uploading working dir files to s3://mrjob-42e7145df80ebe94/tmp/wordcount.keuper.20200611.214401.921119/files/wd...
Copying other local files to s3://mrjob-42e7145df80ebe94/tmp/wordcount.keuper.20200611.214401.921119/files/
Can't access IAM API, trying default instance profile: EMR_EC2_DefaultRole
Can't access IAM API, trying default service role: EMR_DefaultRole
Created new cluster j-APC543JKKD6H
Added EMR tags to cluster j-APC543JKKD6H: __mrjob_label=wordcount, __mrjob_owner=keuper, __mrjob_version=0.7.3
Waiting for Step 1 of 1 (s-358KPGWRE3U5X) to complete...
  PENDING (cluster is STARTING)
  PENDING (cluster is STARTING)
  PENDING (cluster is STARTING)
  PENDING (cluster is STARTING: Configuring cluster software)
  PENDING (cluster is STARTING: Configuring cluster softwar

### Case 3: connect to existing cluster

In [44]:
%%file mrjob_cluster.conf
runners:
  emr:
    aws_access_key_id: YOUR_ID
    aws_secret_access_key: YOUR_KEY
    region: eu-west-1

Overwriting mrjob_cluster.conf


We need the **ID** of the cluster we want to connect to (it starts with ``j-`` and is listed in the EMR console).
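Besides the console, the ID can be looked up with boto3 (installed above). A minimal sketch, assuming configured AWS credentials; ``find_cluster_ids`` is a hypothetical helper around the ``list_clusters`` call of boto3's EMR client, following its ``Marker`` pagination.

```python
def find_cluster_ids(emr_client, states=("WAITING", "RUNNING")):
    """Return (id, name) pairs for EMR clusters in the given states."""
    clusters = []
    kwargs = {"ClusterStates": list(states)}
    while True:
        response = emr_client.list_clusters(**kwargs)
        for cluster in response.get("Clusters", []):
            clusters.append((cluster["Id"], cluster["Name"]))
        marker = response.get("Marker")
        if not marker:
            return clusters
        # request the next page with the marker returned by this one
        kwargs["Marker"] = marker
```

For example, ``find_cluster_ids(boto3.client("emr", region_name="eu-west-1"))`` lists candidate clusters; one in the ``WAITING`` state is idle and can accept the job.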

In [46]:
! python wordcount.py -r emr --cluster-id=j-CLUSTERID text1.rst text2.rst -c mrjob_cluster.conf  

Using s3://mrjob-42e7145df80ebe94/tmp/ as our temp dir on S3
Creating temp directory /tmp/wordcount.keuper.20200611.211946.917705
uploading working dir files to s3://mrjob-42e7145df80ebe94/tmp/wordcount.keuper.20200611.211946.917705/files/wd...
Copying other local files to s3://mrjob-42e7145df80ebe94/tmp/wordcount.keuper.20200611.211946.917705/files/
Adding our job to existing cluster j-13JDFN62TGS41
  master node is ec2-54-75-7-6.eu-west-1.compute.amazonaws.com
Waiting for Step 1 of 1 (s-FM2DU2UTBWWQ) to complete...
  PENDING (cluster is RUNNING: Running step)
  RUNNING for 0:00:45
  RUNNING for 0:01:15
  FAILED
Cluster j-13JDFN62TGS41 is WAITING: Cluster ready after last step failed.
Attempting to fetch counters from logs...
Waiting 10 minutes for logs to transfer to S3... (ctrl-c to skip)

To fetch logs immediately next time, set up SSH. See:
https://pythonhosted.org/mrjob/guides/emr-quickstart.html#configuring-ssh-credentials

Looking for step log in s3://aws-logs-846657657993-eu-w

## Exercise
Use *mrjob* to compute the employees' **top annual salaries** and **gross pay** in the *CSV* table ``Baltimore_City_employee_Salaries_FY2014.csv``.

* use the ``csv`` module (``import csv``) to parse the data ([API docs](https://docs.python.org/3/library/csv.html))
* use ``yield`` to emit key-value pairs from the *map* and *reduce* functions (which makes them generators)
* return the top entries in both categories
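As a starting point (not a full solution), a sketch of the map and reduce logic in plain *Python*. It assumes, without having checked the file, that the name is in column 0, the annual salary in column 5 and the gross pay in column 6, and that amounts look like ``$11310.00``; adjust ``name_col``, ``salary_col``, ``gross_col`` and the hypothetical ``parse_amount`` helper to the real header. Parsing each line with ``csv.reader([line])`` keeps quoted fields that contain commas in one piece.

```python
import csv
import heapq


def parse_amount(text):
    """Turn a money string such as '$1,234.50' into a float (format assumed)."""
    return float(text.strip().lstrip("$").replace(",", "") or 0)


def mapper(_, line, name_col=0, salary_col=5, gross_col=6):
    # column indices are assumptions; csv.reader handles quoted commas
    for row in csv.reader([line]):
        try:
            salary = parse_amount(row[salary_col])
            gross = parse_amount(row[gross_col])
        except (IndexError, ValueError):
            return  # skip the header and malformed rows
        yield "salary", (salary, row[name_col])
        yield "gross", (gross, row[name_col])


def reducer(category, values, n=10):
    # keep only the n largest (amount, name) pairs for this category
    yield category, heapq.nlargest(n, values)
```

Inside an ``MRJob`` subclass, ``mapper`` and ``reducer`` become methods (with ``self``). Since all values of a category arrive in one reducer call, ``heapq.nlargest`` keeps the top ``n`` without sorting the whole list.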