In [None]:
## MRJob Tests

In [1]:
%load_ext autoreload
%autoreload 2

### Secondary Sort
Secondary using the 3rd key in reverse order.

In [37]:
%%writefile test.data
4,10,3,Apple
2,2,4,Orange
6,-1,6,Lemon
0,9,18,Apple
6,8,7,Lemon
6,199,20,Lemon
6,-9,2,Lemon
6,-1,10,Lemon
6,-9223372036854775808,43,Orange

Overwriting test.data


- commented out
jobconf={
    "stream.num.map.output.key.fields":"3",
    "mapreduce.job.output.key.comparator.class":
        "org.apache.hadoop.mapred.lib.KeyFieldBasedComparator",
    "mapreduce.partition.keycomparator.options":"-k1,1n -k3,3nr",
      }

In [103]:
%%writefile test.py

from mrjob.job import MRJob, MRStep
import mrjob
import csv

import sys
def toStringKey(n):
    n = int(n)
    digits = len(str(sys.maxint))
    minInt = -sys.maxint - 1

    if n < 0:
        key = "-" + str(abs(minInt-n)).zfill(digits)
    else:
        key = str(n).zfill(digits)
        
    return key
    
class test(MRJob):
    SORT_VALUES = True
    
    def mapper(self, line_no, line):
        cell = line.strip().split(',')
        
        yield cell[0], [toStringKey(cell[1])] + cell[1:]

    def reducer(self, key, value):
        self.increment_counter("patng", "hello", amount=1)
        yield key, [v for v in value]
        
    def reducer_final(self):
        with open("result.txt", "w") as f:
            f.write("hello\n")

if __name__ == '__main__':
    test.run()


Overwriting test.py


In [8]:
from test import test
mr_job = test(args=['test.data', '-r', 'inline', '--strict-protocols'])
with mr_job.make_runner() as runner: 
    runner.run()
    print "Output:"
    for line in runner.stream_output():
        print line
        
    print "Counters:", runner.counters()



Output:
"0"	[["0000000000000000009", "9", "18", "Apple"]]

"2"	[["0000000000000000002", "2", "4", "Orange"]]

"4"	[["0000000000000000010", "10", "3", "Apple"]]

"6"	[["-0000000000000000000", "-9223372036854775808", "43", "Orange"], ["-9223372036854775799", "-9", "2", "Lemon"], ["-9223372036854775807", "-1", "10", "Lemon"], ["-9223372036854775807", "-1", "6", "Lemon"], ["0000000000000000008", "8", "7", "Lemon"], ["0000000000000000199", "199", "20", "Lemon"]]

Counters: [{'patng': {'hello': 4}}]


## Testing using Hadoop

In [29]:
!/usr/local/Cellar/hadoop/2.7.1/libexec/sbin/start-dfs.sh
!/usr/local/Cellar/hadoop/2.7.1/libexec/sbin/start-yarn.sh
!/usr/local/Cellar/hadoop/2.7.1/libexec/sbin/start-historyserver.sh

16/03/10 11:35:24 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Starting namenodes on [localhost]
localhost: namenode running as process 1489. Stop it first.
localhost: datanode running as process 1581. Stop it first.
Starting secondary namenodes [0.0.0.0]
0.0.0.0: secondarynamenode running as process 1696. Stop it first.
16/03/10 11:35:28 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
starting yarn daemons
resourcemanager running as process 1826. Stop it first.
localhost: nodemanager running as process 1927. Stop it first.
/usr/local/Cellar/hadoop/2.7.1/libexec/sbin/start-historyserver.sh: line 1: ./mr-jobhistory-daemon.sh: No such file or directory


In [22]:
%%writefile test1.data
4,10,3,Chair
2,2,4,Desk
6,-1,6,Lamp
0,9,18,Chair


Overwriting test1.data


In [23]:
%%writefile test2.data
12,1,1,Pencil
6,8,7,Ball
6,199,20,Ball
6,-9,2,Ball
6,-1,10,Ball

Overwriting test2.data


In [24]:
%%writefile badData.data
12,1,1,BAD
6,8,7,BAD


Overwriting badData.data


In [10]:
!hdfs dfs -rm -f learn_Testinput
!hdfs dfs -mkdir learn_Testinput


16/03/09 09:48:27 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/03/09 09:48:29 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [14]:
!hdfs dfs -put test?.data learn_Testinput/
!hdfs dfs -put badData.data learn_Testinput/

16/03/09 09:50:08 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
put: `learn_Testinput/test1.data': File exists
put: `learn_Testinput/test2.data': File exists
16/03/09 09:50:11 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [10]:
!hdfs dfs -ls hdfs://127.0.0.1/user/patrickng/learn_Testinput/

16/03/10 10:03:04 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Found 3 items
-rw-r--r--   1 patrickng supergroup         20 2016-03-09 09:50 hdfs://127.0.0.1/user/patrickng/learn_Testinput/badData.data
-rw-r--r--   1 patrickng supergroup         51 2016-03-09 09:49 hdfs://127.0.0.1/user/patrickng/learn_Testinput/test1.data
-rw-r--r--   1 patrickng supergroup         67 2016-03-09 09:49 hdfs://127.0.0.1/user/patrickng/learn_Testinput/test2.data


#### Run it on Hadoop with output streamed out to the driver
**Note:** 
+ You can use regular expression in the input.  Or you can specify the whole folder.
+ Custom counters **don't work** in Hadoop or EMR

In [11]:
from test import test
mr_job = test(args=['hdfs://127.0.0.1/user/patrickng/learn_Testinput/test?.data', 
                    '-r', 'hadoop', 
                    '--strict-protocols'])
with mr_job.make_runner() as runner: 
    runner.run()
    print "Output:"
    for line in runner.stream_output():
        print line
        
    print "Counters:", runner.counters()

The have been translated as follows
 mapred.text.key.comparator.options: mapreduce.partition.keycomparator.options
mapred.text.key.partitioner.options: mapreduce.partition.keypartitioner.options
mapred.output.key.comparator.class: mapreduce.job.output.key.comparator.class


Output:
"0"	[["0000000000000000009", "9", "18", "Apple"]]


ERROR:mrjob.fs.hadoop:STDERR: 16/03/10 10:03:50 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable




"12"	[["0000000000000000001", "1", "1", "Banana"]]

"2"	[["0000000000000000002", "2", "4", "Orange"]]

"4"	[["0000000000000000010", "10", "3", "Apple"]]

"6"	[["-9223372036854775799", "-9", "2", "Lemon"], ["-9223372036854775807", "-1", "10", "Lemon"], ["-9223372036854775807", "-1", "6", "Lemon"], ["0000000000000000008", "8", "7", "Lemon"], ["0000000000000000199", "199", "20", "Lemon"]]

Counters: [{}]


In [38]:
## Run the program with input and output in Hadoop
!python test.py \
--strict-protocols \
hdfs://127.0.0.1/user/patrickng/learn_Testinput/test?.data \
-r hadoop \
-q

"0"	[["0000000000000000009", "9", "18", "Apple"]]
"12"	[["0000000000000000001", "1", "1", "Banana"]]
"2"	[["0000000000000000002", "2", "4", "Orange"]]
"4"	[["0000000000000000010", "10", "3", "Apple"]]
"6"	[["-9223372036854775799", "-9", "2", "Lemon"], ["-9223372036854775807", "-1", "10", "Lemon"], ["-9223372036854775807", "-1", "6", "Lemon"], ["0000000000000000008", "8", "7", "Lemon"], ["0000000000000000199", "199", "20", "Lemon"]]


#### Run it on Hadoop with output saved to an HDFS folder, plus streaming down the output at the same time.
**Note:**
+ The output folder **cannot exists**, otherwise the job will fail..


In [107]:
!hdfs dfs -rm -r learn_Testoutput

16/03/09 12:54:06 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Deleted learn_Testoutput


In [108]:
## Run the program with input and output in Hadoop
!python test.py \
--strict-protocols \
hdfs://127.0.0.1/user/patrickng/learn_Testinput/test?.data \
-r hadoop \
--output-dir hdfs://127.0.0.1/user/patrickng/learn_Testoutput 
# Note:
# We're not using --no-output here.

no configs found; falling back on auto-configuration
no configs found; falling back on auto-configuration
creating tmp directory /var/folders/dm/nsw7wjf91f1c74hgl17ldw040000gn/T/test.patrickng.20160309.045408.971826
writing wrapper script to /var/folders/dm/nsw7wjf91f1c74hgl17ldw040000gn/T/test.patrickng.20160309.045408.971826/setup-wrapper.sh
Using Hadoop version 2.7.1
Copying local files into hdfs:///user/patrickng/tmp/mrjob/test.patrickng.20160309.045408.971826/files/
Detected hadoop configuration property names that do not match hadoop version 2.7.1:
The have been translated as follows
 mapred.text.key.comparator.options: mapreduce.partition.keycomparator.options
mapred.text.key.partitioner.options: mapreduce.partition.keypartitioner.options
mapred.output.key.comparator.class: mapreduce.job.output.key.comparator.class
HADOOP: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
HADOOP: packageJobJar: [/var/folders/dm/nsw7wjf91f1c74hg

In [109]:
!hdfs dfs -ls learn_Testoutput/

16/03/09 12:55:03 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Found 2 items
-rw-r--r--   1 patrickng supergroup          0 2016-03-09 12:54 learn_Testoutput/_SUCCESS
-rw-r--r--   1 patrickng supergroup        436 2016-03-09 12:54 learn_Testoutput/part-00000


In [102]:
!hdfs dfs -cat learn_Testoutput/part-00000

16/03/09 11:27:54 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
"0"	[["0000000000000000009", "9", "18", "Apple"]]
"12"	[["0000000000000000001", "1", "1", "Banana"]]
"2"	[["0000000000000000002", "2", "4", "Orange"]]
"4"	[["0000000000000000010", "10", "3", "Apple"]]
"6"	[["-9223372036854775799", "-9", "2", "Lemon"], ["-9223372036854775807", "-1", "10", "Lemon"], ["-9223372036854775807", "-1", "6", "Lemon"], ["0000000000000000008", "8", "7", "Lemon"], ["0000000000000000199", "199", "20", "Lemon"]]


## Testing using EMR

In [72]:
import boto3
import botocore

s3 = boto3.resource('s3')
name = "patng323-learn-mrjob"

bucket = s3.Bucket(name)
exists = True

# Check if the bucket if it exists
try:
    s3.meta.client.head_bucket(Bucket=name)
except botocore.exceptions.ClientError as e:
    error_code = int(e.response['Error']['Code'])
    if error_code == 404:
        exists = False
    else:
        raise

if not exists:
    s3.create_bucket(Bucket=name)
else:
    # Clear all items (including "folders") if it exists
    for key in bucket.objects.all():
        key.delete()

key= s3.ObjectSummary(bucket_name='patng323-learn-mrjob', key='input/badData.data')
key= s3.ObjectSummary(bucket_name='patng323-learn-mrjob', key='input/test1.data')
key= s3.ObjectSummary(bucket_name='patng323-learn-mrjob', key='input/test2.data')
key= s3.ObjectSummary(bucket_name='patng323-learn-mrjob', key='output/_SUCCESS')
key= s3.ObjectSummary(bucket_name='patng323-learn-mrjob', key='output/part-00000')


In [60]:
# upload files to S3
s3.Object(name, 'input/test1.data').put(Body=open('test1.data', 'rb'))
s3.Object(name, 'input/test2.data').put(Body=open('test2.data', 'rb'))
s3.Object(name, 'input/badData.data').put(Body=open('badData.data', 'rb'))

{u'ETag': '"234e16b14d93a16aa97b4621c51ebf16"',
 'ResponseMetadata': {'HTTPStatusCode': 200,
  'HostId': 'C9DmLd9jFF8LVEqyXsDCoqscJdbraD0cs+c/wKsGNCEu9FsNzLc7tYkj83fz20QV',
  'RequestId': '0452644BA8E9FE47'}}

In [2]:
!aws s3 ls patng323-learn-mrjob/input/

2016-03-09 10:22:12         20 badData.data
2016-03-09 10:22:11         48 test1.data
2016-03-09 10:22:11         63 test2.data


### Start an EMR job

In [3]:
# Don't forget to terminate it when it's not needed
!mrjob create-job-flow --num-ec2-instances=1 \
--ec2-instance-type=m1.medium \
--max-hours-idle=1

no configs found; falling back on auto-configuration
no configs found; falling back on auto-configuration
using existing scratch bucket mrjob-0c26425c25d7acc1
using s3://mrjob-0c26425c25d7acc1/tmp/ as our scratch dir on S3
Creating persistent job flow to run several jobs in...
creating tmp directory /var/folders/dm/nsw7wjf91f1c74hgl17ldw040000gn/T/no_script.patrickng.20160310.014237.369516
writing master bootstrap script to /var/folders/dm/nsw7wjf91f1c74hgl17ldw040000gn/T/no_script.patrickng.20160310.014237.369516/b.py
Copying non-input files into s3://mrjob-0c26425c25d7acc1/tmp/no_script.patrickng.20160310.014237.369516/files/
Waiting 5.0s for S3 eventual consistency
Creating Elastic MapReduce job flow
Can't access IAM API, trying default instance profile: EMR_EC2_DefaultRole
Can't access IAM API, trying default service role: EMR_DefaultRole
Job flow created with ID: j-1EI9DBDPZO66S
j-1EI9DBDPZO66S


In [27]:
# Note: the output directory MUST not exist
!aws s3 rm --recursive s3://patng323-learn-mrjob/output
!echo "Wait 5.0s sec for S3 eventual consistency"
!sleep 5

!python test.py \
s3://patng323-learn-mrjob/input/test?.data \
-r emr \
--emr-job-flow-id j-1EI9DBDPZO66S \
--no-output \
--output-dir s3://patng323-learn-mrjob/output/ \


delete: s3://patng323-learn-mrjob/output/_SUCCESS
delete: s3://patng323-learn-mrjob/output/part-00000
Wait 5.0s sec for S3 eventual consistency
Got unexpected keyword arguments: ssh_tunnel
using configs in /Users/patrickng/.mrjob.conf
using existing scratch bucket mrjob-0c26425c25d7acc1
using s3://mrjob-0c26425c25d7acc1/tmp/ as our scratch dir on S3
creating tmp directory /var/folders/dm/nsw7wjf91f1c74hgl17ldw040000gn/T/test.patrickng.20160310.030437.424045

PLEASE NOTE: Starting in mrjob v0.5.0, protocols will be strict by default. It's recommended you run your job with --strict-protocols or set up mrjob.conf as described at https://pythonhosted.org/mrjob/whats-new.html#ready-for-strict-protocols

Copying non-input files into s3://mrjob-0c26425c25d7acc1/tmp/test.patrickng.20160310.030437.424045/files/
Adding our job to existing job flow j-1EI9DBDPZO66S
Job launched 32.6s ago, status RUNNING: Running step (test.patrickng.20160310.030437.424045: Step 1 of 1)
Job launched 65.6s ago, stat

In [28]:
!aws s3 ls s3://patng323-learn-mrjob/output/

2016-03-10 11:06:07          0 _SUCCESS
2016-03-10 11:06:02        429 part-00000


In [70]:
!aws s3 cp s3://patng323-learn-mrjob/output/part-00000 .

download: s3://patng323-learn-mrjob/output/part-00000 to ./part-00000


In [71]:
!cat part-00000

"0"	[["0000000000000000009", "9", "18", "Chair"]]
"12"	[["0000000000000000001", "1", "1", "Pencil"]]
"2"	[["0000000000000000002", "2", "4", "Desk"]]
"4"	[["0000000000000000010", "10", "3", "Chair"]]
"6"	[["-9223372036854775799", "-9", "2", "Ball"], ["-9223372036854775807", "-1", "10", "Ball"], ["-9223372036854775807", "-1", "6", "Lamp"], ["0000000000000000008", "8", "7", "Ball"], ["0000000000000000199", "199", "20", "Ball"]]


#### Run this if you want to clear the output "folder" in s3

In [79]:
import boto3
import botocore

s3 = boto3.resource('s3')
name = "patng323-learn-mrjob"

bucket = s3.Bucket(name)
# Clear all items (including "folders") if it exists
for object in bucket.objects.all():
    if object.key.startswith('output/'):
        object.delete()

In [80]:
!aws s3 ls patng323-learn-mrjob/output/

#### Don't forget to terminate the cluster

In [83]:
!mrjob terminate-job-flow j-2X9F8BNAZZYIN

no configs found; falling back on auto-configuration
no configs found; falling back on auto-configuration
using existing scratch bucket mrjob-03ccebffb1b98a81
using s3://mrjob-03ccebffb1b98a81/tmp/ as our scratch dir on S3
Terminated job flow j-2X9F8BNAZZYIN


### Check if the input to mapper has terminating newline or not.
Answer: NO

### How about passing a list inside a tuple as values?

In [19]:
%%writefile test2.py

from mrjob.job import MRJob, MRStep
import mrjob
import csv
import sys

class test2(MRJob):
    def mapper1(self, line_no, line):
        fields = line.split(',')
        v = ["a","b","c"]
        yield fields[0], (v, len(fields))

    def reducer1(self, key, values):
        items = []
        for v in values:
            yield key, ("#".join(v[0]), v[1])
        
        
    def steps(self):
        return [
            MRStep(mapper=self.mapper1,
                  reducer=self.reducer1)
            ]

    
if __name__ == '__main__':
    test2.run()


Overwriting test2.py


In [20]:
from test2 import test2
mr_job = test2(args=['test.data', '-r', 'local', '--strict-protocols'])
with mr_job.make_runner() as runner: 
    runner.run()
    print "Output:"
    for line in runner.stream_output():
        print mr_job.parse_output_line(line)

ERROR:mrjob.local:STDERR: + __mrjob_PWD=/private/var/folders/dm/nsw7wjf91f1c74hgl17ldw040000gn/T/test2.patrickng.20160210.130421.417554/job_local_dir/0/mapper/0
ERROR:mrjob.local:STDERR: + exec
ERROR:mrjob.local:STDERR: + /usr/bin/python -c 'import fcntl; fcntl.flock(9, fcntl.LOCK_EX)'
ERROR:mrjob.local:STDERR: + export PYTHONPATH=/private/var/folders/dm/nsw7wjf91f1c74hgl17ldw040000gn/T/test2.patrickng.20160210.130421.417554/job_local_dir/0/mapper/0/mrjob.tar.gz:/Users/patrickng/Programs/spark-1.5.1-bin-hadoop2.6/python/pyspark:/Users/patrickng/Programs/spark-1.5.1-bin-hadoop2.6/python/lib/py4j-0.8.2.1-src.zip:/Users/patrickng/Programs/spark-1.5.1-bin-hadoop2.6/python:/Users/patrickng/Programs/spark-1.5.1-bin-hadoop2.6/python/build::/Library/Python/2.7/site-packages
ERROR:mrjob.local:STDERR: + PYTHONPATH=/private/var/folders/dm/nsw7wjf91f1c74hgl17ldw040000gn/T/test2.patrickng.20160210.130421.417554/job_local_dir/0/mapper/0/mrjob.tar.gz:/Users/patrickng/Programs/spark-1.5.1-bin-hadoop2.

Output:
('0', ['a#b#c', 4])
('2', ['a#b#c', 4])
('4', ['a#b#c', 4])
('6', ['a#b#c', 4])
('6', ['a#b#c', 4])
('6', ['a#b#c', 4])
('6', ['a#b#c', 4])
('6', ['a#b#c', 4])
