# MapReduce Using `MRJob`

## Job Posting Dataset

The sample dataset we will mainly use in this tutorial (`data/job-data/job-data-2018-09-*.txt`) contains job postings from a US job search website. Each line of the data files is a JSON document representing one job posting.

The example below shows a sample job posting from the data file, formatted with 4-space indentation for readability. In the actual file, each record is stored as a JSON document on a single line.

*Example: JSON document of a job posting record*

```
{
    "industry": "Information Technology", 
    "datePosted": "2018-09-07", 
    "salaryCurrency": "USD", 
    "validThrough": "2018-10-07", 
    "empId": 671932, 
    "jobLocation": {
        "geo": {
            "latitude": "37.7623", 
            "@type": "GeoCoordinates", 
            "longitude": "-122.4145"
        }, 
        "@type": "Place", 
        "address": {
            "postalCode": "94110-2042", 
            "addressLocality": "San Francisco", 
            "@type": "PostalAddress", 
            "addressRegion": "CA", 
            "addressCountry": {
                "@type": 
                "Country", 
                "name": "US"
            }
        }
    }, 
    "estimatedSalary": {
        "@type": "MonetaryAmount", 
        "currency": "USD", 
        "value": {
            "maxValue": "202000", 
            "@type": "QuantitativeValue", 
            "unitText": "YEAR", 
            "minValue": "146000"
        }
    }, 
    "description": "<div><em>Generate insights and impact from data</em><em>.</em></div>\n<br/>\n<div>\n<div>We're looking for data scientists to join the Analytics team who are excited about applying their analytical skills to understand our users and influence decision making. If you are naturally data curious, excited about deriving insights from data, and motivated by having impact on the business, we want to hear from you.</div><br/>\n\n<div><strong>You will:</strong></div><div>\n\n\n<ul>\n<li>Work closely with product and business teams to identify important questions and answer them with data.</li>\n</ul>\n\n</div><br/>\n\n<div>\n\n\n<ul>\n<li>Apply statistical and econometric models on large datasets to: i) measure results and outcomes, ii) identify causal impact and attribution, iii) predict future performance of users or products.</li>\n</ul>\n\n</div><br/>\n\n<div>\n\n\n<ul>\n<li>Design, analyze, and interpret the results of experiments.</li>\n</ul>\n\n</div><br/>\n\n<div>\n\n\n<ul>\n<li>Drive the collection of new data and the refinement of existing data sources.</li>\n</ul>\n\n</div><br/>\n\n<div>\n\n\n<ul>\n<li>Create analyses that tell a \"story\" focused on insights, not just data.</li>\n</ul>\n\n</div><br/>\n\n<div><strong>We're looking for someone with:</strong></div><div>\n\n\n<ul>\n<li>3+ years experience working with and analyzing large data sets to solve problems.</li>\n</ul>\n\n</div><br/>\n\n<div>\n\n\n<ul>\n<li>A PhD or MS in a quantitative field (e.g., Economics, Statistics, Eng, Natural Sciences, CS).</li>\n</ul>\n\n</div><br/>\n\n<div>\n\n\n<ul>\n<li>Expert knowledge of a scientific computing language (such as R or Python) and SQL.</li>\n</ul>\n\n</div><br/>\n\n<div>\n\n\n<ul>\n<li>Strong knowledge of statistics and experimental design.</li>\n</ul>\n\n</div><br/>\n\n<div>\n\n\n<ul>\n<li>Ability to communicate results clearly and a focus on driving impact.</li>\n</ul>\n\n</div><br/>\n\n<div><strong>Nice to 
haves:</strong></div><div>\n\n\n<ul>\n<li>Prior experience with data-distributed tools (Scalding, Hadoop, Pig, etc).</li>\n</ul>\n\n</div><br/>\n\n<div><strong>You should include these in your application:</strong></div><div>\n\n\n<ul>\n<li>Resume and LinkedIn profile.</li>\n</ul>\n\n</div><br/>\n\n<div>\n\n\n<ul>\n<li>Description of the most interesting data analysis you've done, key findings, and its impact.</li>\n</ul>\n\n</div><br/>\n\n<div>\n\n\n<ul>\n<li>Link to or attachment of code you've written related to data analysis.</li>\n</ul>\n\n</div>\n</div>\n<br/>", 
    "hiringOrganization": {
        "@type": "Organization", 
        "sameAs": "www.stripe.com", 
        "name": "Stripe"
    },
    "@type": "JobPosting", 
    "jobId": 2280174543, 
    "@context": "http://schema.org", 
    "employmentType": "FULL_TIME", 
    "occupationalCategory": [
        "15-1111.00", 
        "Computer and Information Research Scientists"
    ], 
    "title": "Data Scientist"
}
```

Copy input data to HDFS:

In [1]:
!hdfs dfs -mkdir job-data/

In [2]:
!hdfs dfs -put ../data/job-data/* job-data/

## 1. Protocols For Input & Output

`mrjob` assumes that all data is newline-delimited bytes. Each job has an *input protocol*, an *output protocol*, and an *internal protocol*. These protocols can be changed by overriding the class attributes `INPUT_PROTOCOL`, `OUTPUT_PROTOCOL`, and `INTERNAL_PROTOCOL`, respectively.

The default *input* protocol is `RawValueProtocol`, which just reads in a line as a `str`.
The default *output* and *internal* protocols are both `JSONProtocol`, which reads and writes JSON strings separated by a tab character.

`JSONValueProtocol` encodes only the value as JSON and discards the key (when reading, the key is `None`). To load the job posting dataset, we can set `INPUT_PROTOCOL = JSONValueProtocol`, which automatically parses each input line into a Python `dict`.
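To make the difference between these protocols concrete, here is a rough stdlib-only imitation of how one line is decoded and encoded. The helper names (`raw_value_read`, `json_value_read`, `json_protocol_write`, `json_value_write`) are hypothetical and are not mrjob's API; mrjob's real protocol classes may use a different JSON library, so the exact whitespace of real output can differ.

```python
import json

def raw_value_read(line: bytes):
    """RawValueProtocol-style read: key is None, value is the decoded line."""
    return None, line.decode('utf-8')

def json_value_read(line: bytes):
    """JSONValueProtocol-style read: the whole line is one JSON value; key is None."""
    return None, json.loads(line)

def json_protocol_write(key, value) -> bytes:
    """JSONProtocol-style write: JSON key and JSON value separated by a tab."""
    return (json.dumps(key) + '\t' + json.dumps(value)).encode('utf-8')

def json_value_write(key, value) -> bytes:
    """JSONValueProtocol-style write: the key is dropped, only the value is encoded."""
    return json.dumps(value).encode('utf-8')


line = b'{"jobId": 2280174543, "title": "Data Scientist"}'
key, record = json_value_read(line)
print(key, record['title'])                                   # None Data Scientist
print(json_protocol_write(record['jobId'], record['title']))  # b'2280174543\t"Data Scientist"'
print(json_value_write(record['jobId'], record['title']))     # b'"Data Scientist"'
```

The tab-separated form is what the default output protocol produces for a `<jobId, jobLocation>` pair; switching the output protocol to `JSONValueProtocol` drops the key column entirely.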

For more information, see [Protocols](https://pythonhosted.org/mrjob/guides/writing-mrjobs.html#job-protocols).

**Example**: Simple JSON Parser

The script below reads the data into `MRTest.mapper` with each record loaded as a Python dict, and emits key-value pairs where the keys are `jobId` and the values are `jobLocation`; these pairs are then written to the output files as tab-separated JSON. Since no `MRTest.reducer` is defined, this is a *map-only* job.

- *Data flow*:

  - Input:`record`
  - $\quad\downarrow$
  - mapper:`<_, record> -> <jobId, jobLocation>`
  - $\quad\downarrow$
  - Output:`jobId jobLocation`
  
- *Features and highlights*:
  
  `INPUT_PROTOCOL = JSONValueProtocol` lets MRJob parse each JSON document into a Python dict

In [3]:
%%file mr-jobs/1_protocols.py
from mrjob.job import MRJob
from mrjob.protocol import JSONValueProtocol


class MRTest(MRJob):
    
    INPUT_PROTOCOL = JSONValueProtocol

    def mapper(self, _, value):
        yield value.get('jobId', None), value.get('jobLocation', None)

        
if __name__ == '__main__':
    MRTest.run()

Overwriting mr-jobs/1_protocols.py


- Test locally:

In [4]:
!python3 mr-jobs/1_protocols.py ../data/job-data/* --output-dir mr-output

No configs found; falling back on auto-configuration
No configs specified for inline runner
Running step 1 of 1...
Creating temp directory /tmp/1_protocols.hadoop.20180918.152432.546356
job output is in mr-output
Removing temp directory /tmp/1_protocols.hadoop.20180918.152432.546356...


- Run on your Hadoop cluster:

In [5]:
!hdfs dfs -rm -r hdfs:///user/hadoop/mr-output

rm: `hdfs:///user/hadoop/mr-output': No such file or directory


In [6]:
!python3 mr-jobs/1_protocols.py -r hadoop \
hdfs:///user/hadoop/job-data/ \
--output-dir mr-output/

No configs found; falling back on auto-configuration
No configs specified for hadoop runner
Looking for hadoop binary in /usr/local/hadoop-2.8.4/bin...
Found hadoop binary: /usr/local/hadoop-2.8.4/bin/hadoop
Using Hadoop version 2.8.4
Looking for Hadoop streaming jar in /usr/local/hadoop-2.8.4...
Found Hadoop streaming jar: /usr/local/hadoop-2.8.4/share/hadoop/tools/lib/hadoop-streaming-2.8.4.jar
Creating temp directory /tmp/1_protocols.hadoop.20180918.152437.972664
Copying local files to hdfs:///user/hadoop/tmp/mrjob/1_protocols.hadoop.20180918.152437.972664/files/...
Running step 1 of 1...
  packageJobJar: [/tmp/hadoop-unjar8039006073959697496/] [] /tmp/streamjob291647840707618100.jar tmpDir=null
  Connecting to ResourceManager at /0.0.0.0:8032
  Connecting to ResourceManager at /0.0.0.0:8032
  Total input files to process : 2
  number of splits:2
  Submitting tokens for job: job_1537280577176_0001
  Submitted application application_1537280577176_0001
  The url to track the job: htt

## 2. Filtering

Keys:

- The filtering pattern extracts a subset of the data without changing the records themselves.
  - We can set `OUTPUT_PROTOCOL = JSONValueProtocol` to drop the key field and write each record as-is in the output.
- Filtering usually doesn't need a reducer when each record is evaluated individually and the decision does not depend on other records.
- Filtering also serves as a building block for other patterns, such as the sampling and splitting examples below.

Applications:

- Data cleaning
- Events tracking
- Records matching
- Random sampling
- Dataset splitting

### 2.1 Simple Filtering

Simple filtering is often used for data cleaning, event tracking, outlier removal, and similar tasks.

**Example**: Find all jobs with titles relevant to *Data Scientist*.


- *Data flow*:

  - Input:`record`
  - $\quad\downarrow$
  - mapper:`<_, record> [if keyword in title -> <None, record>]`
  - $\quad\downarrow$
  - Output:`record`
  
- *Features and highlights*:
  
  `OUTPUT_PROTOCOL = JSONValueProtocol` ignores the key field for each record in the output
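Because the filter looks at one record at a time, its logic can be checked in plain Python before submitting a job. The sketch below imitates the map-only flow (JSON value in, JSON value out) over a few in-memory lines; `is_data_scientist`, `filter_lines`, and the sample lines are hypothetical, and the predicate mirrors the case-insensitive substring test on the title.

```python
import json

def is_data_scientist(record):
    """Keep a record if its title mentions 'data scientist' (case-insensitive)."""
    title = (record.get('title') or '').lower()  # tolerate a missing or null title
    return 'data scientist' in title

def filter_lines(lines):
    """Map-only filter: decode each JSON line, keep matches, re-encode the value only."""
    for line in lines:
        record = json.loads(line)
        if is_data_scientist(record):
            yield json.dumps(record)

sample = [
    '{"jobId": 1, "title": "Senior Data Scientist"}',
    '{"jobId": 2, "title": "Data Engineer"}',
    '{"jobId": 3}',
]
print(list(filter_lines(sample)))
# ['{"jobId": 1, "title": "Senior Data Scientist"}']
```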

In [7]:
%%file mr-jobs/2.1_simple_filtering.py
from mrjob.job import MRJob
from mrjob.protocol import JSONValueProtocol


class MRSimpleFiltering(MRJob):
    
    INPUT_PROTOCOL = JSONValueProtocol
    OUTPUT_PROTOCOL = JSONValueProtocol
    
    def mapper(self, _, value):
        # keep the record if its title mentions "data scientist" (case-insensitive)
        title = (value.get('title') or '').lower()
        if 'data scientist' in title:
            yield None, value


if __name__ == '__main__':
    MRSimpleFiltering.run()

Overwriting mr-jobs/2.1_simple_filtering.py


- Test locally:

In [8]:
!python3 mr-jobs/2.1_simple_filtering.py ../data/job-data/* --output-dir mr-output

No configs found; falling back on auto-configuration
No configs specified for inline runner
Running step 1 of 1...
Creating temp directory /tmp/2.hadoop.20180918.152527.148705
job output is in mr-output
Removing temp directory /tmp/2.hadoop.20180918.152527.148705...


- Run on your Hadoop cluster:

In [9]:
!hdfs dfs -rm -r hdfs:///user/hadoop/mr-output

Deleted hdfs:///user/hadoop/mr-output


In [10]:
!python3 mr-jobs/2.1_simple_filtering.py \
-r hadoop hdfs:///user/hadoop/job-data/ \
--output-dir mr-output/

No configs found; falling back on auto-configuration
No configs specified for hadoop runner
Looking for hadoop binary in /usr/local/hadoop-2.8.4/bin...
Found hadoop binary: /usr/local/hadoop-2.8.4/bin/hadoop
Using Hadoop version 2.8.4
Looking for Hadoop streaming jar in /usr/local/hadoop-2.8.4...
Found Hadoop streaming jar: /usr/local/hadoop-2.8.4/share/hadoop/tools/lib/hadoop-streaming-2.8.4.jar
Creating temp directory /tmp/2.hadoop.20180918.152531.986639
Copying local files to hdfs:///user/hadoop/tmp/mrjob/2.hadoop.20180918.152531.986639/files/...
Running step 1 of 1...
  packageJobJar: [/tmp/hadoop-unjar1271355753105562194/] [] /tmp/streamjob663284053151428870.jar tmpDir=null
  Connecting to ResourceManager at /0.0.0.0:8032
  Connecting to ResourceManager at /0.0.0.0:8032
  Total input files to process : 2
  number of splits:2
  Submitting tokens for job: job_1537280577176_0002
  Submitted application application_1537280577176_0002
  The url to track the job: http://ebee9e947bed:808

### 2.2 Random Sampling

The random sampling pattern creates a (usually much smaller) subset of a larger dataset for quick exploration. For the subset to be representative, each record should have an equal probability of being selected.

If reproducibility is not required, a random number function such as Python's `random.uniform(a, b)` is enough to do the work.
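A quick way to see what this coin flip does is to run it on a range of fake records. The sketch below (the name `bernoulli_sample` is hypothetical) keeps each record independently with probability `fraction`, so the sample size is only approximately `fraction * N`. Seeding `random.Random` makes a local experiment repeatable; in a real Hadoop job each mapper would still draw its own random numbers.

```python
import random

def bernoulli_sample(records, fraction, rng=None):
    """Keep each record independently with probability `fraction`."""
    if not 0 <= fraction <= 1:
        raise ValueError('Invalid fraction value')
    rng = rng or random.Random()
    return [r for r in records if rng.random() < fraction]

sample = bernoulli_sample(range(100_000), 0.1, rng=random.Random(42))
print(len(sample))  # close to 10,000, but not exactly
```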

**Example**: Create a random subset with 10% of the full dataset.

We want to pass an argument `fraction` to our `MRJob` script. We can do this by using `MRJob.configure_args()` and `MRJob.add_passthru_arg()` together.

- *Data flow*:

  - Input:`record`
  - $\quad\downarrow$
  - mapper:`<_, record> [Prob=0.1 -> <None, record>]`
  - $\quad\downarrow$
  - Output:`record`
  
- *Features and highlights*:
  
  - `MRJob.configure_args()` lets us define command-line arguments for the script
  - `MRJob.add_passthru_arg('--fraction', **kwargs)` defines a command-line argument named `fraction`
  - To pass a value to `fraction` via the command line: `--fraction <value>`
  - To use `fraction` in script: `MRJob.options.fraction`
  - `MRJob.mapper_init()` validates the value of `fraction` before the `mapper` processes any input.
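As an alternative to checking `fraction` in `mapper_init()`, the value could be validated while the command line is parsed. `add_passthru_arg()` accepts argparse-style keyword arguments such as `type`, so a custom type function should work there as well; the sketch below demonstrates the idea with plain `argparse` only, and the helper name `fraction_type` is hypothetical.

```python
import argparse

def fraction_type(text):
    """argparse `type` callable: parse a float and require 0 <= value <= 1."""
    value = float(text)
    if not 0.0 <= value <= 1.0:
        raise argparse.ArgumentTypeError(f'fraction must be in [0, 1], got {value}')
    return value

parser = argparse.ArgumentParser()
parser.add_argument('--fraction', type=fraction_type, default=0.1)

print(parser.parse_args(['--fraction', '.1']).fraction)  # 0.1
# parser.parse_args(['--fraction', '1.5']) would print a usage error and exit
```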

In [11]:
%%file mr-jobs/2.2_random_sampling.py
from mrjob.job import MRJob
from mrjob.protocol import JSONValueProtocol

import random


class MRRandomSampling(MRJob):
    
    INPUT_PROTOCOL = JSONValueProtocol
    OUTPUT_PROTOCOL = JSONValueProtocol
    
    def configure_args(self):
        super().configure_args()
        self.add_passthru_arg('--fraction', type=float)
        
    def mapper_init(self):
        # fail fast if --fraction is missing or outside [0, 1]
        fraction = self.options.fraction
        if fraction is None or not 0 <= fraction <= 1:
            raise ValueError('Invalid fraction value')
        
    def mapper(self, _, value):
        # keep each record with probability `fraction`
        if random.uniform(0, 1) < self.options.fraction:
            yield None, value


if __name__ == '__main__':
    MRRandomSampling.run()

Overwriting mr-jobs/2.2_random_sampling.py


- Test locally:

In [12]:
!python3 mr-jobs/2.2_random_sampling.py ../data/job-data/* --output-dir mr-output/ --fraction .1

No configs found; falling back on auto-configuration
No configs specified for inline runner
Running step 1 of 1...
Creating temp directory /tmp/2.hadoop.20180918.154235.758049
job output is in mr-output/
Removing temp directory /tmp/2.hadoop.20180918.154235.758049...


- Run on your Hadoop cluster:

In [13]:
!hdfs dfs -rm -r hdfs:///user/hadoop/mr-output

Deleted hdfs:///user/hadoop/mr-output


In [14]:
!python3 mr-jobs/2.2_random_sampling.py \
-r hadoop hdfs:///user/hadoop/job-data/ \
--output-dir mr-output/ --fraction .1

No configs found; falling back on auto-configuration
No configs specified for hadoop runner
Looking for hadoop binary in /usr/local/hadoop-2.8.4/bin...
Found hadoop binary: /usr/local/hadoop-2.8.4/bin/hadoop
Using Hadoop version 2.8.4
Looking for Hadoop streaming jar in /usr/local/hadoop-2.8.4...
Found Hadoop streaming jar: /usr/local/hadoop-2.8.4/share/hadoop/tools/lib/hadoop-streaming-2.8.4.jar
Creating temp directory /tmp/2.hadoop.20180918.154240.231092
Copying local files to hdfs:///user/hadoop/tmp/mrjob/2.hadoop.20180918.154240.231092/files/...
Running step 1 of 1...
  packageJobJar: [/tmp/hadoop-unjar840998732318019041/] [] /tmp/streamjob4484908570565239764.jar tmpDir=null
  Connecting to ResourceManager at /0.0.0.0:8032
  Connecting to ResourceManager at /0.0.0.0:8032
  Total input files to process : 2
  number of splits:2
  Submitting tokens for job: job_1537280577176_0003
  Submitted application application_1537280577176_0003
  The url to track the job: http://ebee9e947bed:808

### 2.3 Data Splitting

For machine learning modeling, we usually divide the dataset into two non-overlapping subsets:
- training set: a subset used to train a model.
- test set: a subset used to evaluate the trained model.

To split the dataset into these two subsets, we need to make sure that:
- each record is assigned to exactly one of the two subsets
- the assignment is reproducible, so that separate runs (one per subset) make the same decision for each record

The `sample` function below returns `True` or `False` based on the hash value of the key and the fraction:
1. Split the fraction into a *numerator* and a *denominator*, e.g.: 0.125 $\rightarrow$ 125/1000.
2. Calculate the hash value of the key. Here we use MD5, a widely used hash function that produces a 128-bit hash value. Python's built-in `hash()` is not suitable here, because string hashing is randomized per process, so different runs could assign the same key differently.
3. Compute the hash value modulo the *denominator*; if the result is less than the *numerator*, return `True`, otherwise return `False`.

Note: if you just want to randomly sample the dataset, then a simple random number generator will work.

In [15]:
import decimal
import hashlib

def sample(key, fraction):
    if fraction > 1 or fraction < 0:
        raise ValueError('Invalid fraction value')
    # calculate numerator and denominator
    frac = decimal.Decimal(str(fraction)).as_tuple()
    numer = sum([v*10**i for i, v in enumerate(frac.digits[::-1])])
    denom = 10**(-frac.exponent)
    # calculate hash value using md5
    hash_val = hashlib.md5(str(key).encode()).hexdigest()
    return (int(hash_val, 16) % denom) < numer

In [16]:
# test the function with the code below
N = 1000
print(sum([sample(i, fraction=0.25) for i in range(N)]))

259


**Example**: Creating a reproducible train/test split.
 
- *Features and highlights*:
    
  - `MRJob.add_passthru_arg('--split')` defines a command-line argument named `split`, which takes the value "train" or "test".
    - If `split=train`, the job outputs the train subset; otherwise it outputs the test subset.
  - `MRJob.add_passthru_arg('--test_size')` defines a command-line argument named `test_size` (default 0.3).
    - The value should be between 0.0 and 1.0 and represents the proportion of the dataset to include in the test split.
  - To create a train/test split, we run the script twice: once with `--split train` and once with `--split test`.

In [17]:
%%file mr-jobs/2.3_train_test_splitting.py
from mrjob.job import MRJob
from mrjob.protocol import JSONValueProtocol

import decimal
import hashlib


class MRTrainTestSplit(MRJob):
    
    INPUT_PROTOCOL = JSONValueProtocol
    OUTPUT_PROTOCOL = JSONValueProtocol
    
    def configure_args(self):
        super().configure_args()
        self.add_passthru_arg('--split')
        self.add_passthru_arg('--test_size', type=float, default=0.3)
        
    def mapper_init(self):
        if self.options.split not in ('train', 'test'):
            raise ValueError('Invalid split value')
        if self.options.test_size > 1 or self.options.test_size < 0:
            raise ValueError('Invalid test size')
        
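    def mapper(self, _, value):
        key = value.get('jobId', 0)
        # True if the record's hash falls into the test fraction
        include = self._sample(key=key, fraction=self.options.test_size)
        # XOR: --split test keeps included records, --split train keeps the rest
        if include ^ (self.options.split == 'train'):
            yield None, value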
    
    def _sample(self, key, fraction=1):
        frac = decimal.Decimal(str(fraction)).as_tuple()
        numer = sum([v*10**i for i, v in enumerate(frac.digits[::-1])])
        denom = 10**(-frac.exponent)
        hash_val = hashlib.md5(str(key).encode()).hexdigest()
        return (int(hash_val, 16) % denom) < numer
    
        
if __name__ == '__main__':
    MRTrainTestSplit.run()

Overwriting mr-jobs/2.3_train_test_splitting.py
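Why do two separate runs produce a clean split? The hash decision for a given `jobId` is the same in both runs, and the XOR in the mapper inverts it for `train`. The sketch below checks this on integer keys; `in_test` and `select` are hypothetical names, and `in_test` uses the same decimal/MD5 arithmetic as `sample()` (with `''.join` in place of the digit sum).

```python
import decimal
import hashlib

def in_test(key, test_size):
    """Deterministic MD5-based decision: True if `key` falls into the test subset."""
    frac = decimal.Decimal(str(test_size)).as_tuple()
    numer = int(''.join(map(str, frac.digits)))  # e.g. 0.3 -> 3
    denom = 10 ** -frac.exponent                  # e.g. 0.3 -> 10
    hash_val = int(hashlib.md5(str(key).encode()).hexdigest(), 16)
    return hash_val % denom < numer

def select(keys, split, test_size=0.3):
    """Mimic the mapper: keep a key when in_test(...) XOR (split == 'train')."""
    return {k for k in keys if in_test(k, test_size) ^ (split == 'train')}

keys = range(10_000)
train = select(keys, 'train')
test = select(keys, 'test')
print(len(train & test))      # 0: no record lands in both subsets
print(len(train | test))      # 10000: every record lands in one subset
print(len(test) / len(keys))  # close to 0.3
```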


- Test locally:

In [18]:
!python3 mr-jobs/2.3_train_test_splitting.py ../data/job-data/* \
--output-dir mr-output/train \
--test_size 0.3 \
--split train \
&& python3 mr-jobs/2.3_train_test_splitting.py ../data/job-data/* \
--output-dir mr-output/test \
--test_size 0.3 \
--split test

No configs found; falling back on auto-configuration
No configs specified for inline runner
Running step 1 of 1...
Creating temp directory /tmp/2.hadoop.20180918.155157.415264
job output is in mr-output/train
Removing temp directory /tmp/2.hadoop.20180918.155157.415264...
No configs found; falling back on auto-configuration
No configs specified for inline runner
Running step 1 of 1...
Creating temp directory /tmp/2.hadoop.20180918.155158.250271
job output is in mr-output/test
Removing temp directory /tmp/2.hadoop.20180918.155158.250271...


- Run on your Hadoop cluster:

In [19]:
!hdfs dfs -rm -r hdfs:///user/hadoop/mr-output

Deleted hdfs:///user/hadoop/mr-output


In [20]:
!python3 mr-jobs/2.3_train_test_splitting.py \
-r hadoop hdfs:///user/hadoop/job-data/ \
    --output-dir mr-output/train \
    --test_size 0.3 \
    --split train \
&& python3 mr-jobs/2.3_train_test_splitting.py \
-r hadoop hdfs:///user/hadoop/job-data/ \
    --output-dir mr-output/test \
    --test_size 0.3 \
    --split test

No configs found; falling back on auto-configuration
No configs specified for hadoop runner
Looking for hadoop binary in /usr/local/hadoop-2.8.4/bin...
Found hadoop binary: /usr/local/hadoop-2.8.4/bin/hadoop
Using Hadoop version 2.8.4
Looking for Hadoop streaming jar in /usr/local/hadoop-2.8.4...
Found Hadoop streaming jar: /usr/local/hadoop-2.8.4/share/hadoop/tools/lib/hadoop-streaming-2.8.4.jar
Creating temp directory /tmp/2.hadoop.20180918.160512.741754
Copying local files to hdfs:///user/hadoop/tmp/mrjob/2.hadoop.20180918.160512.741754/files/...
Running step 1 of 1...
  packageJobJar: [/tmp/hadoop-unjar5739251114290174198/] [] /tmp/streamjob3525669514403577379.jar tmpDir=null
  Connecting to ResourceManager at /0.0.0.0:8032
  Connecting to ResourceManager at /0.0.0.0:8032
  Total input files to process : 2
  number of splits:2
  Submitting tokens for job: job_1537280577176_0004
  Submitted application application_1537280577176_0004
  The url to track the job: http://ebee9e947bed:80

## 3. Top N Pattern


Keys:

- The top N pattern retrieves a relatively small number of records from a large dataset according to a user-specified ranking, without sorting the entire dataset.
- The result must be small enough to fit in memory on a single node, so N should not be too large.

Applications:

- Anomaly detection
- Finding the top N records with the lowest/highest values

### 3.1 Top N Values

In our dataset, each job has two salary fields, `minValue` and `maxValue` (they may be missing when no salary estimate is available). We want to find the top N salaries across all jobs.

- To find the top N list across the entire dataset, we need to compare the values from all records, so the key carries no useful information. We therefore emit `None` as the key, which sends every value to a single reducer.

- To reduce the load on that single reducer and make better use of parallelism, we can use a combiner to compute a local top N list in each map task.

**Example**: Find the top N salaries from all jobs. 

To rank the salaries, we first compare `maxValue` and then break ties with `minValue`.


- *Data flow*:

  - Input:`record`
  - $\quad\downarrow$
  - mapper:`<_, record> -> <_, (maxValue, minValue)>`
  - $\quad\downarrow$
  - combiner:`<_, local_top_n[(maxValue, minValue)]>`
  - $\quad\downarrow$
  - reducer:`<_, global_top_n[(maxValue, minValue)]>`
  - $\quad\downarrow$
  - Output:`top_n[(maxValue, minValue)]`
  
- *Features and highlights*:
  
  - Emit `None` as the key in `mapper`, so that all values end up in a single reducer
  - `MRJob.reducer_init()` initializes an empty top N list
  - `MRJob.reducer()` inserts each value into the top N list, always keeping at most N items
  - `MRJob.reducer_final()` emits the values in the top N list
  - Add `combiner_init`/`combiner`/`combiner_final` steps through `MRJob.steps()` to shrink the data sent to the reducer
  - To maintain the top N list, we use a min-heap from Python's built-in `heapq` module, which costs O(log N) per insertion.
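The heap logic shared by the combiner and the reducer can be tried out without Hadoop. In the sketch below, `local_top_n` (a hypothetical name) keeps at most `n` items in a min-heap, so the smallest of the current top N sits at `heap[0]` and is evicted by `heappushpop`. Calling it once per chunk plays the role of the combiners, and calling it again on their combined output plays the role of the single reducer. The random salaries and the two-chunk split are made up for illustration; the result is compared with a full sort, which the real job avoids.

```python
import heapq
import random

def local_top_n(values, n):
    """Keep the n largest values in a min-heap of size <= n; return them descending."""
    heap = []
    for value in values:
        if len(heap) < n:
            heapq.heappush(heap, value)
        else:
            heapq.heappushpop(heap, value)  # push, then evict the current smallest
    return sorted(heap, reverse=True)

# (maxValue, minValue) tuples compare by maxValue first, then by minValue
rng = random.Random(0)
salaries = [(rng.randint(50, 250) * 1000.0, rng.randint(40, 200) * 1000.0)
            for _ in range(1_000)]
chunk_a, chunk_b = salaries[:500], salaries[500:]  # two hypothetical map tasks

# combiners: local top 10 per chunk; reducer: top 10 of the combined candidates
candidates = local_top_n(chunk_a, 10) + local_top_n(chunk_b, 10)
top_10 = local_top_n(candidates, 10)

print(top_10 == sorted(salaries, reverse=True)[:10])  # True
print(top_10 == heapq.nlargest(10, salaries))         # True
```

`heapq.nlargest(n, iterable)` performs the same bounded selection in one call; the explicit push/pop loop mirrors how values arrive one at a time inside `reducer()`.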

In [21]:
%%file mr-jobs/3.1_top_n_value.py
from mrjob.job import MRJob
from mrjob.step import MRStep
from mrjob.protocol import JSONValueProtocol

import heapq


class MRTopNValue(MRJob):
    
    INPUT_PROTOCOL = JSONValueProtocol
    OUTPUT_PROTOCOL = JSONValueProtocol
        
    def configure_args(self):
        super().configure_args()
        self.add_passthru_arg('--top_n', type=int)
        
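    def mapper(self, _, value):
        try:
            max_ = float(value['estimatedSalary']['value']['maxValue'])
            min_ = float(value['estimatedSalary']['value']['minValue'])
        except (KeyError, TypeError, ValueError):
            pass  # skip records without a usable salary estimate
        else:
            # a single None key sends every value to the same reducer
            yield None, (max_, min_)
    
    def reducer_init(self):
        # shared by the combiner and the reducer: validate N, start an empty min-heap
        if self.options.top_n is None or self.options.top_n < 1:
            raise ValueError('Invalid top_n value')
        self.top_n = []
        
    def reducer(self, _, values):
        for value in values:
            value = tuple(value)  # JSON round-trips turn tuples into lists
            if len(self.top_n) < self.options.top_n:
                heapq.heappush(self.top_n, value)
            else:
                # push the new value, then drop the smallest one in the heap
                heapq.heappushpop(self.top_n, value)
                
    def reducer_final(self):
        # the heap is not sorted; emit the top N in descending order
        for value in sorted(self.top_n, reverse=True):
            yield None, value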
            
    def steps(self):
        return [MRStep(mapper=self.mapper,
                       combiner_init=self.reducer_init,
                       combiner=self.reducer,
                       combiner_final=self.reducer_final,
                       reducer_init=self.reducer_init,
                       reducer=self.reducer,
                       reducer_final=self.reducer_final)]


if __name__ == '__main__':
    MRTopNValue.run()

Overwriting mr-jobs/3.1_top_n_value.py


- Test locally:

In [22]:
!python3 mr-jobs/3.1_top_n_value.py ../data/job-data/* --output-dir mr-output --top_n 10

No configs found; falling back on auto-configuration
No configs specified for inline runner
Running step 1 of 1...
Creating temp directory /tmp/3.hadoop.20180918.165434.220892
job output is in mr-output
Removing temp directory /tmp/3.hadoop.20180918.165434.220892...


- Run on your Hadoop cluster:

In [30]:
!hdfs dfs -rm -r hdfs:///user/hadoop/mr-output

Deleted hdfs:///user/hadoop/mr-output


In [31]:
!python3 mr-jobs/3.1_top_n_value.py \
-r hadoop hdfs:///user/hadoop/job-data/ \
    --output-dir mr-output/ --top_n 10

No configs found; falling back on auto-configuration
No configs specified for hadoop runner
Looking for hadoop binary in /usr/local/hadoop-2.8.4/bin...
Found hadoop binary: /usr/local/hadoop-2.8.4/bin/hadoop
Using Hadoop version 2.8.4
Looking for Hadoop streaming jar in /usr/local/hadoop-2.8.4...
Found Hadoop streaming jar: /usr/local/hadoop-2.8.4/share/hadoop/tools/lib/hadoop-streaming-2.8.4.jar
Creating temp directory /tmp/3.hadoop.20180918.171944.000093
Copying local files to hdfs:///user/hadoop/tmp/mrjob/3.hadoop.20180918.171944.000093/files/...
Running step 1 of 1...
  packageJobJar: [/tmp/hadoop-unjar369533625508089501/] [] /tmp/streamjob8345320719200866152.jar tmpDir=null
  Connecting to ResourceManager at /0.0.0.0:8032
  Connecting to ResourceManager at /0.0.0.0:8032
  Total input files to process : 2
  number of splits:2
  Submitting tokens for job: job_1537280577176_0007
  Submitted application application_1537280577176_0007
  The url to track the job: http://ebee9e947bed:808

### 3.2 Top N Records

With the top N salaries now stored in `mr-output/part-00000`, we want to find out which jobs offer those salaries.

Since the top N values are small enough to fit on each node, it is preferable to ship the file to the nodes where the mappers run, rather than pass the data through the job configuration. This mechanism is called the `Distributed Cache`, and we can use it via the `MRJob.add_file_arg()` method.

**Example**: Find all jobs that offer salaries from the top N salaries list. 

- *Data flow*:

  - mapper_init: fetch `top_n_list` to each mapper
  - $\quad\downarrow$
  - Input:`record`
  - $\quad\downarrow$
  - mapper:`<_, record>[if record contains salaries from top_n_list -> <_, record>]`
  - $\quad\downarrow$
  - Output:`record`
  
- *Features and highlights*:
  
  - `MRJob.add_file_arg('--cache')` defines a command-line option whose file is sent to Hadoop and made available to each task
  - To send a file via the command line: `--cache <file_path>`
  - Inside the job, the local path of the cached file is available as `MRJob.options.cache`
  - `MRJob.mapper_init()` fetches the top n list from the cached file to each mapper
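Inside a task, the cached file is read with ordinary file I/O. The sketch below imitates `mapper_init()` and `mapper()` with a temporary file standing in for `part-00000`; loading the pairs into a `set` gives constant-time membership tests per record. The function names `load_top_n` and `salary_key` and the sample values are hypothetical.

```python
import json
import os
import tempfile

def load_top_n(path):
    """Read one JSON list per line, e.g. [202000.0, 146000.0], into a set of tuples."""
    with open(path) as f:
        return {tuple(json.loads(line)) for line in f if line.strip()}

def salary_key(record):
    """Return (maxValue, minValue) as floats, or None if missing or invalid."""
    try:
        value = record['estimatedSalary']['value']
        return float(value['maxValue']), float(value['minValue'])
    except (KeyError, TypeError, ValueError):
        return None

# Stand-in for mr-output/part-00000 written by the top N job
with tempfile.NamedTemporaryFile('w', suffix='.json', delete=False) as f:
    f.write('[202000.0, 146000.0]\n[190000.0, 150000.0]\n')
    path = f.name

top_n = load_top_n(path)
record = {'estimatedSalary': {'value': {'maxValue': '202000', 'minValue': '146000'}}}
print(salary_key(record) in top_n)  # True
os.remove(path)
```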

In [32]:
%%file mr-jobs/3.2_top_n_job.py
from mrjob.job import MRJob
from mrjob.protocol import JSONValueProtocol

import json

class MRTopNJob(MRJob):
    
    INPUT_PROTOCOL = JSONValueProtocol
    OUTPUT_PROTOCOL = JSONValueProtocol
        
    def configure_args(self):
        super().configure_args()
        self.add_file_arg('--cache')
    
    def mapper_init(self):
        # load the top N (maxValue, minValue) pairs into a set for fast lookups
        self.cache = set()
        with open(self.options.cache, 'r') as f:
            for line in f:
                if line.strip():
                    self.cache.add(tuple(json.loads(line)))
        
    def mapper(self, _, value):
        try:
            max_ = float(value['estimatedSalary']['value']['maxValue'])
            min_ = float(value['estimatedSalary']['value']['minValue'])
        except (KeyError, TypeError, ValueError):
            pass  # skip records without a usable salary estimate
        else:
            if (max_, min_) in self.cache:
                yield None, value

if __name__ == '__main__':
    MRTopNJob.run()

Overwriting mr-jobs/3.2_top_n_job.py


- Test locally:

In [33]:
!python3 mr-jobs/3.2_top_n_job.py ../data/job-data/* --output-dir mr-output-jobs --cache mr-output/part-00000

No configs found; falling back on auto-configuration
No configs specified for inline runner
Running step 1 of 1...
Creating temp directory /tmp/3.hadoop.20180918.172132.991163
job output is in mr-output-jobs
Removing temp directory /tmp/3.hadoop.20180918.172132.991163...


- Run on your Hadoop cluster:

In [34]:
!hdfs dfs -rm -r hdfs:///user/hadoop/mr-output-jobs

rm: `hdfs:///user/hadoop/mr-output-jobs': No such file or directory


In [35]:
!python3 mr-jobs/3.2_top_n_job.py \
-r hadoop hdfs:///user/hadoop/job-data/ \
    --output-dir mr-output-jobs/ \
    --cache hdfs:///user/hadoop/mr-output/part-00000

No configs found; falling back on auto-configuration
No configs specified for hadoop runner
Looking for hadoop binary in /usr/local/hadoop-2.8.4/bin...
Found hadoop binary: /usr/local/hadoop-2.8.4/bin/hadoop
Using Hadoop version 2.8.4
Looking for Hadoop streaming jar in /usr/local/hadoop-2.8.4...
Found Hadoop streaming jar: /usr/local/hadoop-2.8.4/share/hadoop/tools/lib/hadoop-streaming-2.8.4.jar
Creating temp directory /tmp/3.hadoop.20180918.172138.854476
Copying local files to hdfs:///user/hadoop/tmp/mrjob/3.hadoop.20180918.172138.854476/files/...
Running step 1 of 1...
  packageJobJar: [/tmp/hadoop-unjar7958850948417445614/] [] /tmp/streamjob307580746471035229.jar tmpDir=null
  Connecting to ResourceManager at /0.0.0.0:8032
  Connecting to ResourceManager at /0.0.0.0:8032
  Total input files to process : 2
  number of splits:2
  Submitting tokens for job: job_1537280577176_0008
  Submitted application application_1537280577176_0008
  The url to track the job: http://ebee9e947bed:808

## 4. Inverted Index

- Keys:
  - Indexing is a technique used in almost every search engine. Without an index, a search engine would have to scan every document in the corpus for each query, which would require considerable time and computing power.
  - Inverted index is an index data structure storing a mapping from content, such as keywords, to its locations in a database file, or in a document or a set of documents. 
  - The purpose of an inverted index is to allow fast full text searches, at a cost of increased processing when a document is added to the database. 

**Example**: Generate an index from the dataset to allow faster searches on technical skills.

The skills we want to search for are listed below:

```
skillset = ['Java', 'JavaScript', 'C', 'C++', 'C#', 'Python', 'R', 'Bash',
            'MySQL', 'Postgresql', 'MongoDB', 'Html', 'Ruby', 'PHP', 
            'Swift', 'CSS', 'Julia', 'Golang', 'Github', 'Redis', 'Hadoop',
            'Spark', 'Hive', 'Pig', 'ElasticSearch', 'Kafka', 
            'Cassandra', 'AWS', 'GCP', 'Azure', 'Docker', 'kubernetes']
```

- *Data flow*:

  - mapper_init: define search regex pattern
  - Input:`record`
  - $\quad\downarrow$
  - mapper:`<_, record> -> find matching skills using regex -> <skill, jobId>`
  - $\quad\downarrow$
  - reducer:`<skill, [jobId]>`
  - $\quad\downarrow$
  - Output:`skill, [jobId]`
  
Note: The reducer only collects the job IDs for each skill and performs no aggregation that would shrink the data, so a combiner would not help in this example.
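Matching skills such as `C++` and `C#` needs care: `\b` does not work after `+` or `#`, and in a regex alternation the first alternative that matches wins, so `C` listed before `C++` claims every `C++`. The sketch below uses `(?<!\w)`/`(?!\w)`, which, unlike `(?<=\W)`/`(?=\W)`, also match at the very start or end of the text, tries longer skills first, and maps matches back to their canonical spelling. `build_skill_pattern`, `find_skills`, and the short skill list are hypothetical.

```python
import re

def build_skill_pattern(skills):
    """Case-insensitive pattern for whole skills, longest alternatives first."""
    ordered = sorted(set(skills), key=len, reverse=True)
    alternation = '|'.join(re.escape(s) for s in ordered)
    return re.compile(r'(?<!\w)(?:{})(?!\w)'.format(alternation), flags=re.IGNORECASE)

def find_skills(text, skills):
    """Return the distinct skills mentioned in text, using their canonical spelling."""
    canonical = {s.lower(): s for s in skills}
    pattern = build_skill_pattern(skills)
    return sorted({canonical[m.lower()] for m in pattern.findall(text)})

skills = ['Java', 'JavaScript', 'C', 'C++', 'C#', 'Python', 'SQL']
text = 'C++ or C# required; Java and JavaScript a plus. Python, SQL, python.'
print(find_skills(text, skills))
# ['C#', 'C++', 'Java', 'JavaScript', 'Python', 'SQL']
```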

In [146]:
%%file mr-jobs/4_inverted_index.py
from mrjob.job import MRJob
from mrjob.protocol import JSONValueProtocol

import re


class MRIndexingSkills(MRJob):
    
    INPUT_PROTOCOL = JSONValueProtocol
    skillset = ['Java', 'JavaScript', 'C', 'C++', 'C#', 'Python', 'R', 'Bash',
                'MySQL', 'Postgresql', 'MongoDB', 'Html', 'Ruby', 'PHP', 
                'Swift', 'CSS', 'Julia', 'Golang', 'Github', 'Redis', 'Hadoop',
                'Spark', 'Hive', 'Pig', 'ElasticSearch', 'Kafka', 
                'Cassandra', 'AWS', 'GCP', 'Azure', 'Docker', 'kubernetes']
    
    def mapper_init(self):
        # map any capitalization back to the spelling used in skillset
        self.canonical = {s.lower(): s for s in self.skillset}
        # try longer skills first so that 'C++' is not matched as 'C';
        # (?<!\w)/(?!\w) also match at the very start/end of the text
        skills = sorted(self.skillset, key=len, reverse=True)
        self.pattern = re.compile(
            r'(?<!\w)(?:{})(?!\w)'.format('|'.join(re.escape(s) for s in skills)),
            flags=re.IGNORECASE)
        
    def mapper(self, _, value):
        jobId = value.get('jobId')
        description = value.get('description') or ''
        if jobId is not None:
            # emit each skill at most once per job
            skills = {self.canonical[s.lower()] for s in self.pattern.findall(description)}
            for skill in skills:
                yield skill, jobId
    
    def reducer(self, key, values):
        yield key, list(values)
        

if __name__ == '__main__':
    MRIndexingSkills.run()

Overwriting mr-jobs/4_inverted_index.py


- Test locally:

In [147]:
!python3 mr-jobs/4_inverted_index.py ../data/job-data/* --output-dir mr-output

No configs found; falling back on auto-configuration
No configs specified for inline runner
Running step 1 of 1...
Creating temp directory /tmp/4_inverted_index.hadoop.20180918.190009.189437
job output is in mr-output
Removing temp directory /tmp/4_inverted_index.hadoop.20180918.190009.189437...


- Run on your Hadoop cluster:

In [144]:
!hdfs dfs -rm -r hdfs:///user/hadoop/mr-output

Deleted hdfs:///user/hadoop/mr-output


In [145]:
!python3 mr-jobs/4_inverted_index.py \
-r hadoop hdfs:///user/hadoop/job-data/ \
    --output-dir mr-output/

No configs found; falling back on auto-configuration
No configs specified for hadoop runner
Looking for hadoop binary in /usr/local/hadoop-2.8.4/bin...
Found hadoop binary: /usr/local/hadoop-2.8.4/bin/hadoop
Using Hadoop version 2.8.4
Looking for Hadoop streaming jar in /usr/local/hadoop-2.8.4...
Found Hadoop streaming jar: /usr/local/hadoop-2.8.4/share/hadoop/tools/lib/hadoop-streaming-2.8.4.jar
Creating temp directory /tmp/4_inverted_index.hadoop.20180918.184113.520202
Copying local files to hdfs:///user/hadoop/tmp/mrjob/4_inverted_index.hadoop.20180918.184113.520202/files/...
Running step 1 of 1...
  packageJobJar: [/tmp/hadoop-unjar7840362551436171095/] [] /tmp/streamjob3798702398271974150.jar tmpDir=null
  Connecting to ResourceManager at /0.0.0.0:8032
  Connecting to ResourceManager at /0.0.0.0:8032
  Total input files to process : 2
  number of splits:2
  Submitting tokens for job: job_1537280577176_0010
  Submitted application application_1537280577176_0010
  The url to track t