# Hadoop MapReduce with Python
There are two prominent *Python* APIs for interfacing with *Hadoop MapReduce* clusters:


## *MRJOB* for *MapReduce* job execution
The ``mrjob`` library ([see docs](https://mrjob.readthedocs.io/en/latest/index.html)) is a powerful *MapReduce* client for *Python*. Some of its key features are:

* local emulation (single- and multi-core) of a *Hadoop* cluster for development and debugging
* simple access, authentication and file transfer to *Hadoop* clusters
* built-in support for cloud services, such as Amazon EMR (AWS) and Google Cloud Dataproc
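To make the phases that the local emulation runs through more concrete, here is a minimal pure-Python sketch of one MapReduce step (map, shuffle/sort, reduce) in a single process. It does not use *mrjob*: the function `run_mapreduce` and the word-count mapper and reducer are hypothetical, the mappers take only the line (mrjob mappers receive `self, key, value`), and partitioning, combiners and multiple processes are left out.

```python
from itertools import groupby
from operator import itemgetter


def run_mapreduce(lines, mapper, reducer):
    """Hypothetical single-process emulation of one MapReduce step."""
    # Map phase: each input line may yield any number of (key, value) pairs
    pairs = [kv for line in lines for kv in mapper(line)]
    # Shuffle/sort phase: order by key so equal keys become adjacent
    pairs.sort(key=itemgetter(0))
    # Reduce phase: call the reducer once per key with an iterator over its values
    results = []
    for key, group in groupby(pairs, key=itemgetter(0)):
        results.extend(reducer(key, (value for _, value in group)))
    return results


def word_mapper(line):
    # Emit (word, 1) for every word in the line
    for word in line.split():
        yield word.lower(), 1


def sum_reducer(key, values):
    # Add up all counts collected for one word
    yield key, sum(values)


print(run_mapreduce(["the cat", "The dog"], word_mapper, sum_reducer))
# [('cat', 1), ('dog', 1), ('the', 2)]
```

The sort before grouping plays the role of Hadoop's shuffle: the reducer only ever sees values that share one key.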

In [18]:
!pip install mrjob boto3



In [19]:
# In Colab, we need to clone the data from the repo first
!git clone https://github.com/keuperj/DATA.git

fatal: destination path 'DATA' already exists and is not an empty directory.


## Exercise
Use *mrjob* to compute the employees' **top annual salaries** and **top gross pay** in the *CSV* table ``Baltimore_City_Employee_Salaries_FY2014.csv``.

* use  ``import csv`` to read the data -> [API docs](https://docs.python.org/3/library/csv.html)
* use ``yield`` to emit key-value pairs from the *map* and *reduce* functions (this turns them into generators)
* return the top entries in both categories
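The hint to use ``csv`` matters because some names contain a comma inside quotes. The sketch below takes one record from the job output further down and shows that a plain ``split(',')`` yields 8 fields while ``csv.reader`` yields the expected 7. It also shows a generator mapper in plain Python; it is a simplified, hypothetical variant of the exercise's mapper that emits only the amounts, not the raw line.

```python
import csv

# Column names, as defined in salaries.py
cols = 'Name,JobTitle,AgencyID,Agency,HireDate,AnnualSalary,GrossPay'.split(',')

# One record copied from the job output; the Name field contains a comma
line = ('"Chow,Rudolph S",DIRECTOR PUBLIC WORKS,A41101,'
        'DPW-Administration ,02/01/2011,163200.00,145513.79')

# A naive split cuts the quoted name in two: 8 fields instead of 7
naive = line.split(',')

# csv.reader respects the quotes; it expects an iterable of lines, hence [line]
fields = [f.strip() for f in next(csv.reader([line]))]
record = dict(zip(cols, fields))


def mapper(line):
    """Generator in the spirit of salarymax.mapper, simplified to plain floats."""
    row = dict(zip(cols, (f.strip() for f in next(csv.reader([line])))))
    for key, col in (('salary', 'AnnualSalary'), ('gross', 'GrossPay')):
        try:
            yield key, float(row[col])
        except (KeyError, ValueError):
            pass  # header row, empty or missing field: emit nothing for this key


print(len(naive), len(fields))        # 8 7
print(record['Name'])                 # Chow,Rudolph S
print(list(mapper(line)))             # [('salary', 163200.0), ('gross', 145513.79)]
print(list(mapper(','.join(cols))))   # []
```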

In [20]:
%%file salaries.py 
from mrjob.job import MRJob
from mrjob.step import MRStep
import csv

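# Column names of the CSV file (in file order)
cols = 'Name,JobTitle,AgencyID,Agency,HireDate,AnnualSalary,GrossPay'.split(',')

class salarymax(MRJob):

    def mapper(self, _, line):
        # Parse the line with csv so that quoted fields such as "Chow,Rudolph S"
        # stay intact, then map the stripped fields to their column names
        row = dict(zip(cols, [a.strip() for a in next(csv.reader([line]))]))

        # Emit the annual salary together with the raw record.
        # The header row, empty fields and short rows end up in the counter.
        try:
            yield 'salary', (float(row['AnnualSalary']), line)
        except (ValueError, KeyError):
            self.increment_counter('warn', 'missing salary', 1)

        # Emit the gross pay together with the raw record
        try:
            yield 'gross', (float(row['GrossPay']), line)
        except (ValueError, KeyError):
            self.increment_counter('warn', 'missing gross', 1)

    def reducer(self, key, values):
        # Keep the 10 largest (amount, record) pairs per key ('salary' or 'gross');
        # the list stays sorted, so the output is in ascending order
        topten = []
        for p in values:
            topten.append(p)
            topten.sort()
            topten = topten[-10:]

        for p in topten:
            yield key, p

    # The top 10 of partial top-10 lists equals the overall top 10,
    # so the reducer can also run as a combiner on each mapper's output
    combiner = reducer

if __name__ == '__main__':
    salarymax.run()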

Overwriting salaries.py
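Reusing the reducer as a combiner is only safe because "top n" can be computed in stages: the top n of the whole data equals the top n of the per-partition top-n lists. The pure-Python sketch below checks this on hypothetical (salary, name) pairs split across two imaginary mapper outputs, and compares the append-and-sort loop from ``salarymax.reducer`` with ``heapq.nlargest``, which avoids re-sorting after every value.

```python
import heapq


def top_n_sorted(values, n=10):
    """The approach used in salarymax.reducer: append, sort, keep the last n."""
    top = []
    for v in values:
        top.append(v)
        top.sort()
        top = top[-n:]
    return top


def top_n_heap(values, n=10):
    """Same result via heapq.nlargest, returned in ascending order."""
    return sorted(heapq.nlargest(n, values))


# Hypothetical (salary, name) pairs as produced by two different mappers
part_a = [(50000.0, 'a'), (163200.0, 'b'), (72000.0, 'c')]
part_b = [(165000.0, 'd'), (40000.0, 'e'), (99000.0, 'f')]

n = 3
# Without a combiner: the reducer sees every value
direct = top_n_heap(part_a + part_b, n)
# With a combiner: each partition is reduced first, then the partial lists are merged
combined = top_n_heap(top_n_heap(part_a, n) + top_n_heap(part_b, n), n)

print(direct)
# [(99000.0, 'f'), (163200.0, 'b'), (165000.0, 'd')]
print(direct == combined == top_n_sorted(part_a + part_b, n))
# True
```

An aggregation like "average" would not survive this treatment unchanged, which is why a combiner cannot blindly reuse every reducer.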


In [21]:
! python salaries.py -r local DATA/Baltimore_City_Employee_Salaries_FY2014.csv

No configs found; falling back on auto-configuration
No configs specified for local runner
Creating temp directory /tmp/salaries.root.20220429.082011.159464
Running step 1 of 1...

Counters: 2
	warn
		missing gross=3224
		missing salary=1

job output is in /tmp/salaries.root.20220429.082011.159464/output
Streaming final output from /tmp/salaries.root.20220429.082011.159464/output...
"salary"	[163200.0, "\"Chow,Rudolph S\",DIRECTOR PUBLIC WORKS,A41101,DPW-Administration ,02/01/2011,163200.00,145513.79"]
"salary"	[163200.0, "\"Nilson,George A\",CITY SOLICITOR,A30001,Law Department ,01/16/2007,163200.00,164332.32"]
"salary"	[163365.0, "\"Rawlings-Blake,Stephanie C\",MAYOR,A01001,Mayors Office ,12/07/1995,163365.00,161219.24"]
"salary"	[165000.0, "\"Ford,Niles R\",EXECUTIVE LEVEL III,A64006,Fire Department ,01/15/2014,165000.00,69807.64"]
"salary"	[172000.0, "\"Parthemos,Kaliope\",EXECUTIVE LEVEL III,A01020,Mayor's Office ,12/26/2006,172000.00,154654.39"]
"salary"	[187200.0, "\"Swift,Micha
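As the output above shows, each result line is a JSON-encoded key, a tab and a JSON-encoded value; the ``(amount, line)`` tuple arrives as a JSON list. A minimal sketch for turning such a line back into Python values, assuming this format; ``parse_output_line`` is a hypothetical helper, not part of *mrjob*.

```python
import csv
import json

# One line of the job output: JSON key, a tab, then the JSON value
out_line = ('"salary"\t[163200.0, "\\"Chow,Rudolph S\\",DIRECTOR PUBLIC WORKS,'
            'A41101,DPW-Administration ,02/01/2011,163200.00,145513.79"]')


def parse_output_line(text):
    """Hypothetical helper: split an output line into key, amount and employee name."""
    # JSON escapes tabs inside strings, so the first raw tab is the separator
    key_json, value_json = text.split('\t', 1)
    key = json.loads(key_json)
    amount, record = json.loads(value_json)   # the tuple arrives as a JSON list
    name = next(csv.reader([record]))[0]      # first CSV field is the Name column
    return key, amount, name


print(parse_output_line(out_line))
# ('salary', 163200.0, 'Chow,Rudolph S')
```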

### Now on AWS (Elastic MapReduce)

In [22]:
%%file mrjob_cluster.conf
runners:
  emr:
    aws_access_key_id: XXXXX
    aws_secret_access_key: XXXXX
    region: eu-west-1

Overwriting mrjob_cluster.conf
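Pasting keys into a notebook cell makes it easy to leak them when the notebook is shared. One option is to generate the same config file from the standard AWS environment variables. The helper ``write_mrjob_conf`` below is a hypothetical sketch that writes exactly the keys used above; note that the resulting file still contains the secrets, so it restricts the file permissions (effective on POSIX systems only).

```python
import os


def write_mrjob_conf(path='mrjob_cluster.conf', region='eu-west-1', env=None):
    """Hypothetical helper: write the EMR runner config shown above, taking the
    credentials from the standard AWS environment variables instead of typing
    them into the notebook."""
    env = os.environ if env is None else env
    # Raises KeyError if a variable is not set, rather than writing an empty key
    key_id = env['AWS_ACCESS_KEY_ID']
    secret = env['AWS_SECRET_ACCESS_KEY']
    conf = (
        'runners:\n'
        '  emr:\n'
        f'    aws_access_key_id: {key_id}\n'
        f'    aws_secret_access_key: {secret}\n'
        f'    region: {region}\n'
    )
    with open(path, 'w') as fh:
        fh.write(conf)
    # The file still holds secrets: make it readable by the owner only (POSIX)
    os.chmod(path, 0o600)
    return path
```

Calling ``write_mrjob_conf()`` in a cell before the run produces a ``mrjob_cluster.conf`` that can be passed with ``-c`` as before.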


In [None]:
! python salaries.py -r emr -c mrjob_cluster.conf --cluster-id=XXXXX DATA/Baltimore_City_Employee_Salaries_FY2014.csv

Using s3://mrjob-42e7145df80ebe94/tmp/ as our temp dir on S3
Creating temp directory /tmp/salaries.root.20220429.084339.077783
uploading working dir files to s3://mrjob-42e7145df80ebe94/tmp/salaries.root.20220429.084339.077783/files/wd...
Copying other local files to s3://mrjob-42e7145df80ebe94/tmp/salaries.root.20220429.084339.077783/files/
Adding our job to existing cluster j-3J2UJRSKGFVYF
  master node is ec2-54-155-223-96.eu-west-1.compute.amazonaws.com
Waiting for Step 1 of 1 (s-235ZYUJEG7N4X) to complete...
  PENDING (cluster is RUNNING: Running step)
  RUNNING for 0:00:46
  RUNNING for 0:01:16
  COMPLETED
Attempting to fetch counters from logs...
Waiting 10 minutes for logs to transfer to S3... (ctrl-c to skip)

To fetch logs immediately next time, set up SSH. See:
https://pythonhosted.org/mrjob/guides/emr-quickstart.html#configuring-ssh-credentials

