# Hadoop MapReduce with Python
There are two prominent *Python* APIs for interfacing with *Hadoop MapReduce* clusters:


## *MRJOB* for *MapReduce* job execution
The ``mrjob`` library ([documentation](https://mrjob.readthedocs.io/en/latest/index.html)) is a powerful *MapReduce* client for *Python*. Some of its key features are:

* local emulation (single- and multi-core) of a *Hadoop* cluster for development and debugging
* simple access, authentication and file transfer to *Hadoop* clusters
* built-in support for common cloud services, such as AWS EMR or Google Cloud Dataproc

## Exercise
Use *mrjob* to find the employees with the **top annual salaries** and the **top gross pay** in the *CSV* table ``Baltimore_City_Employee_Salaries_FY2014.csv``.

* use ``import csv`` to parse the data ([API docs](https://docs.python.org/3/library/csv.html))
* use ``yield`` to emit key/value pairs from the *map* and *reduce* functions, which makes them generators
* return the top entries in both categories
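
The hints above can be tried without a cluster or even without *mrjob*. The following is a minimal sketch using only the standard library: it parses single CSV lines with ``csv.reader`` and uses ``yield`` in generator-style *map* and *reduce* functions. The sample rows, the helper ``parse_line`` and the toy driver ``run_local`` are hypothetical and only imitate the map, group-by-key, reduce flow; they are not part of the *mrjob* API.

```python
import csv
from collections import defaultdict

# Hypothetical sample lines in the same column layout as the exercise table
SAMPLE_LINES = [
    'Name,JobTitle,AgencyID,Agency,HireDate,AnnualSalary,GrossPay',
    '"Doe,Jane",CLERK,A01,Example Agency,01/02/2003,50000.00,52000.50',
    '"Roe,Richard",ANALYST,A02,Example Agency,04/05/2006,61000.00,',
]


def parse_line(line):
    """Parse one CSV line; quoted fields may contain commas."""
    return [field.strip() for field in next(csv.reader([line]))]


def mapper(line):
    """Yield ('salary', value) for every line with a numeric AnnualSalary."""
    fields = parse_line(line)
    try:
        yield 'salary', float(fields[5])
    except (ValueError, IndexError):
        pass  # header row or missing value: emit nothing


def reducer(key, values):
    """Yield the largest value seen for a key."""
    yield key, max(values)


def run_local(lines):
    """Toy driver: map every line, group values by key, then reduce."""
    groups = defaultdict(list)
    for line in lines:
        for key, value in mapper(line):
            groups[key].append(value)
    results = []
    for key in sorted(groups):
        results.extend(reducer(key, groups[key]))
    return results


if __name__ == '__main__':
    print(run_local(SAMPLE_LINES))
```

Because the mapper is a generator, a line may produce zero, one or several key/value pairs; here the header row simply produces none.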

In [1]:
%%file salaries.py 
from mrjob.job import MRJob
from mrjob.step import MRStep
import csv

#column names
cols = 'Name,JobTitle,AgencyID,Agency,HireDate,AnnualSalary,GrossPay'.split(',')

class salarymax(MRJob):

    def mapper(self, _, line):
        # Parse the CSV line (csv.reader handles quoted commas) and
        # map the column names to the stripped field values
        row = dict(zip(cols, [a.strip() for a in next(csv.reader([line]))]))

        # Yield the salary
        try:
            yield 'salary', (float(row['AnnualSalary']), line)
        except ValueError:
            self.increment_counter('warn', 'missing salary', 1)
        
        # Yield the gross pay
        try:
            yield 'gross', (float(row['GrossPay']), line)
        except ValueError:
            self.increment_counter('warn', 'missing gross', 1)

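    def reducer(self, key, values):
        # Keep only the ten largest (amount, line) pairs seen so far;
        # the list never grows beyond eleven entries, so memory stays bounded
        topten = []

        # Runs once per key, i.e. once for 'salary' and once for 'gross'
        for p in values:
            topten.append(p)
            topten.sort()
            topten = topten[-10:]

        # Emit the survivors in ascending order of amount
        for p in topten:
            yield key, p

    # The top ten of partial top tens equals the overall top ten,
    # so the reducer can double as the combiner
    combiner = reducer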

if __name__ == '__main__':
    salarymax.run()

Writing salaries.py
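
The reducer above keeps its top ten by appending, sorting and truncating on every value. As an alternative sketch (not what ``salaries.py`` does), the standard library's ``heapq.nlargest`` selects the same entries in one call. The helper name ``top_n`` and the sample pairs are hypothetical. The last check illustrates why a top-N reducer can be reused as a combiner: the top N of partial top-N lists equals the top N of all values.

```python
import heapq


def top_n(pairs, n=10):
    """Return the n largest (amount, record) pairs in ascending order."""
    return sorted(heapq.nlargest(n, pairs))


if __name__ == '__main__':
    # Hypothetical (amount, name) pairs, not taken from the data set
    part_a = [(3.0, 'c'), (1.0, 'a'), (5.0, 'e')]
    part_b = [(4.0, 'd'), (2.0, 'b'), (6.0, 'f')]

    # Top 2 over everything at once
    direct = top_n(part_a + part_b, n=2)

    # Top 2 of the per-partition top 2 lists (combiner, then reducer)
    combined = top_n(top_n(part_a, n=2) + top_n(part_b, n=2), n=2)

    print(direct)
    print(direct == combined)
```

``heapq.nlargest`` accepts any iterable, so it would also work on the lazy ``values`` iterator that a reducer receives.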


In [2]:
! python salaries.py -r local Baltimore_City_Employee_Salaries_FY2014.csv

No configs found; falling back on auto-configuration
No configs specified for local runner
Creating temp directory /tmp/salaries.keuper.20210604.132701.986009
Running step 1 of 1...

Counters: 2
	warn
		missing gross=3224
		missing salary=1

job output is in /tmp/salaries.keuper.20210604.132701.986009/output
Streaming final output from /tmp/salaries.keuper.20210604.132701.986009/output...
"gross"	[165108.5,"\"Cheelsman III,Charles H\",Battalion Fire Chief EMS EMT-P,A64460,Fire Department ,12\/08\/1980,107307.00,165108.50"]
"gross"	[165270.01,"\"Makanjuola,Rafiu T\",POLICE OFFICER (EID),A99061,Police Department ,07\/30\/1997,67535.00,165270.01"]
"gross"	[165892.21,"\"Harris Jr,William\",POLICE SERGEANT,A99309,Police Department ,10\/24\/2000,80612.00,165892.21"]
"gross"	[166442.42,"\"Stokes,Charline B\",Battalion Fire Chief EMS EMT-P,A64460,Fire Department ,01\/18\/1988,107307.00,166442.42"]
"gross"	[173876.84,"\"Marcus Sr,Albert M\",POLICE OFFICER (EID),A99322,Police Department ,02\/03\