In [None]:
import os
import sys

# Get enviroment variables set from utils/setup.sh
home_dir = os.environ['HOME']
root_dir = os.environ['BD_GitRoot']

# Add utils to the python system path
sys.path.append(root_dir + '/utils')

# Read AWS credentials from 'EC2_VAULT'/Creds.pkl 
from read_mrjob_creds import *
(key_id, secret_key, s3_bucket, username) = read_credentials()

examples_dir = root_dir + '/notebooks/mrjob/'

!ls -l $examples_dir

In [None]:
# This notebook should be run AFTER the notebook "Simple use of mrjob".
# Pull the source of mr_word_freq_count.py into this cell for inspection.
# (examples_dir, set in the setup cell, already ends with a path separator.)
%load $examples_dir/mr_word_freq_count.py

## A demonstration of counters

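Counters let a job report custom tallies back to the runner. In the job below, every call to the mapper, the combiner, and the reducer increments its own counter in the group `MrJob Counters`. The job summary therefore shows how many times each step was invoked.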

In [None]:
%%writefile mr_word_freq_counters.py
#!/usr/bin/python
# Copyright 2009-2010 Yelp
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""The classic MapReduce job: count the frequency of words.
"""
from mrjob.job import MRJob
import re
from sys import stderr

# A word is a run of word characters and apostrophes, so "don't" stays whole.
WORD_RE = re.compile("[\\w']+")

# Destination for the optional debug traces in the job's methods.
# To capture them in a file instead, use e.g. open('log', 'w') (and close it).
logfile = stderr

class MRWordFreqCount(MRJob):
    """Count word frequencies and record how often each step is called.

    Every mapper, combiner and reducer call increments a counter named
    after the step, in the counter group 'MrJob Counters'.
    """

    def mapper(self, _, line):
        """Yield (lowercased word, 1) for every word token in `line`.

        The 'mapper' counter goes up once per input line, not once per word.
        """
        self.increment_counter('MrJob Counters', 'mapper', 1)
        tokens = WORD_RE.findall(line)
        for token in tokens:
            # Debug trace: logfile.write('mapper ' + token.lower() + '\n')
            yield token.lower(), 1

    def combiner(self, word, counts):
        """Yield (word, partial sum) for the counts this combiner received."""
        self.increment_counter('MrJob Counters', 'combiner', 1)
        # To trace this step, copy `counts` into a list first (it is an
        # iterator and can only be consumed once), write it to `logfile`,
        # then sum the list.
        partial_total = sum(counts)
        yield word, partial_total

    def reducer(self, word, counts):
        """Yield (word, total) after summing every count received for `word`."""
        self.increment_counter('MrJob Counters', 'reducer', 1)
        # Same tracing caveat as in combiner(): `counts` is a one-shot iterator.
        total = sum(counts)
        yield word, total

if __name__ == '__main__':
    # Parse the command line and run the job via mrjob's entry point.
    MRWordFreqCount.run()

In [None]:
!python mr_word_freq_counters.py --runner=local $root_dir/README.md > counts

## Runners and job flows

A runner determines where your job executes: **inline**, **local**, **Hadoop**, or **EMR**.

By default, when run with the option **-r emr**, mrjob creates a cluster on EMR for the job, runs the job, and then shuts the cluster down.

Because setting up a cluster can take a long time (often many minutes), it is usually better to keep the Hadoop cluster running and submit additional jobs to it. In EMR's terminology, such a cluster is called a **job flow**.

In [None]:
from mrjob.emr import EMRJobRunner

In [None]:
# EMR runner authenticated with the credentials read in the setup cell.
JobRunner = EMRJobRunner(
    aws_access_key_id=key_id,
    aws_secret_access_key=secret_key,
)

In [None]:
# Open a connection to EMR through the runner; used below to list job flows.
# NOTE(review): make_emr_conn()/describe_jobflows() look like older mrjob /
# boto 2 APIs -- confirm against the installed versions.
emr_conn = JobRunner.make_emr_conn()

In [None]:
# Fetch the job flows (EMR clusters); the next cells index and iterate this list.
job_flows=emr_conn.describe_jobflows()

In [None]:
# Show the type of the objects returned by describe_jobflows().
# Raises IndexError if job_flows is empty (no job flows returned).
type(job_flows[0])

In [None]:
# Print each job flow with its name, id and state.
# NOTE: uses the Python 2 `print` statement; this cell is not Python 3 syntax.
for flow in job_flows:
    print flow,flow.name,flow.jobflowid,flow.state

In [None]:
# Id of the last job flow listed above. Index the list explicitly rather than
# relying on the loop variable `flow` leaking out of the previous cell.
job_flows[-1].jobflowid