This works correctly now, and illustrates both map and reduce.

The mrjob conf file now has bootstrap actions that pull in our build of python27 and also install xz on each Hadoop node.
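For reference, the equivalent of `python subcorpus_counter.py -r emr --conf-path subcorpus_counter.conf <input>` can also be driven from Python. A minimal sketch, assuming a hypothetical input file public_chunk_urls.txt with one public chunk URL per line:

    # sketch only: launch the job on EMR using the conf from this commit;
    # 'public_chunk_urls.txt' is a hypothetical input listing
    from subcorpus_counter import SubcorpusCounter

    job = SubcorpusCounter(args=[
        '-r', 'emr',
        '--conf-path', 'subcorpus_counter.conf',
        'public_chunk_urls.txt',
    ])
    with job.make_runner() as runner:
        runner.run()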
commit c7a22f1694ba31b305a0940dfd3085654e758088 (1 parent: b327ed6)
John R. Frank authored
Showing with 32 additions and 20 deletions.
  1. +12 −4 subcorpus_counter.conf
  2. +20 −16 subcorpus_counter.py
subcorpus_counter.conf
@@ -6,18 +6,25 @@ runners:
     ## turn on logging and tracker
     ssh_tunnel_to_job_tracker: true
     s3_log_uri: s3://trec-kba-emr/example/logs
-    bootstrap_mrjob: true
+    ## do not bootstrap mrjob, because we install it using our private
+    ## python27 install below.
+    bootstrap_mrjob: false
+
     ## must include this tarball created by running `make`
     python_archives:
     - kba_corpus.tar.gz
+
     ## these bootstrap actions happen during cluster setup
     bootstrap_actions:
-    - s3://elasticmapreduce/bootstrap-actions/configure-hadoop -m mapred.tasktracker.map.tasks.maximum=20 -m mapred.tasktracker.reduce.tasks.maximum=3
+    - s3://elasticmapreduce/bootstrap-actions/configure-hadoop -m mapred.tasktracker.map.tasks.maximum=5 -m mapred.tasktracker.reduce.tasks.maximum=5
+    ## install our own python27 built on the m1.large instance class;
+    ## this script also installs xz-utils and bootstraps mrjob
+    - s3://trec-kba-emr/emr-setup/install_python27.m1.large.sh
+
     ## could enable skipping -- see also jobconf for more skipping related params
     #- s3://elasticmapreduce/bootstrap-actions/configure-hadoop -m mapred.skip.map.auto.incr.proc.count=false -m mapred.skip.mode.enabled=true
     #- s3://elasticmapreduce/bootstrap-actions/configure-hadoop -m io.map.index.skip=1
-    ## could install our own python27 built on the m1.large instance class
-    #- s3://trec-kba-emr/emr-setup/install_python27.m1.large.sh
+
     ## could use spot instances by uncommenting the bid prices
     #ec2_instance_type: m1.large
     #num_ec2_instances: 1
@@ -29,6 +36,7 @@ runners:
     #ec2_task_instance_type: m2.xlarge
     #ec2_task_instance_bid_price: '0.154'
     #num_ec2_task_instances: 1
+
     enable_emr_debugging: true
     jobconf:
       # enable skip mode
subcorpus_counter.py
@@ -19,6 +19,7 @@
 import re
 import os
 import sys
+import json
 import time
 import urllib
 import syslog
@@ -38,6 +39,9 @@
 class SubcorpusCounter(MRJob):
     INPUT_PROTOCOL = mrjob.protocol.RawValueProtocol
+    ## redundant: JSONProtocol is already mrjob's default internal protocol
+    INTERNAL_PROTOCOL = mrjob.protocol.JSONProtocol
+
     def mapper(self, empty, public_url):
         '''
         Takes as input a public URL to a TREC KBA 2012 chunk file,
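An aside on the INTERNAL_PROTOCOL line added above: JSONProtocol serializes each intermediate record as a JSON key, a tab, and a JSON value, so tuples emitted by the mapper arrive in the reducer as lists. A minimal round-trip, with made-up values:

    import mrjob.protocol

    p = mrjob.protocol.JSONProtocol()
    line = p.write('news', (1234, 87))  # -> '"news"\t[1234, 87]'
    key, value = p.read(line)
    assert key == 'news'
    assert list(value) == [1234, 87]    # the tuple comes back as a list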
@@ -90,31 +94,31 @@ def mapper(self, empty, public_url):
             key = 'FAILED-%s' % re.sub('\s+', '-', str(exc))
             ## could emit this, but that would pollute the output
             # yield key, public_url
-            self.increment_counter('Errors', key, 1)
+            self.increment_counter('SubcorpusCounter', key, 1)
         else:
             ## it must have all worked, so emit data
+            self.increment_counter('SubcorpusCounter','Success',1)
             yield subcorpus_name, (num_ner_tokens, num_ner_sentences)
         finally:
             ## help hadoop keep track
             self.increment_counter('SkippingTaskCounters','MapProcessedRecords',1)
-    ## This reducer works intermittently, perhaps a memory problem and
-    ## just need a newer version of mrjob? This is not required for
-    ## illustrating how to begin interacting with the corpus, so
-    ## commenting out.
-    #def reducer(self, source, counts):
-    #    '''
-    #    Sums up all the counts for a given source
-    #    '''
-    #    num_ner_tokens = 0
-    #    num_ner_sentences = 0
-    #    for this_num_ner_tokens, this_num_ner_sentences in counts:
-    #        num_ner_tokens += this_num_ner_tokens
-    #        num_ner_sentences += this_num_ner_sentences
-    #    yield source, (num_ner_tokens, num_ner_sentences)
-    #    self.increment_counter('SkippingTaskCounters','ReduceProcessedRecords',1)
+    def reducer(self, source, counts):
+        '''
+        Sums up all the counts for a given source
+        '''
+        num_ner_tokens = 0
+        num_ner_sentences = 0
+        kba_corpus.log('reading counts for %r' % source)
+        self.increment_counter('SubcorpusCounter','ReducerLaunched',1)
+        for count_pair in counts:
+            num_ner_tokens += count_pair[0]
+            num_ner_sentences += count_pair[1]
+            self.increment_counter('SubcorpusCounter','CountPairRead',1)
+        yield source, (num_ner_tokens, num_ner_sentences)
+        self.increment_counter('SkippingTaskCounters','ReduceProcessedRecords',1)
 if __name__ == '__main__':
     SubcorpusCounter.run()
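Since mrjob's default OUTPUT_PROTOCOL is also JSONProtocol, each final output line is a JSON key, a tab, and a JSON value. A sketch of loading the results back into Python, assuming the job output was copied to a local output/ directory (hypothetical path):

    import glob
    import json

    totals = {}
    for path in glob.glob('output/part-*'):  # hypothetical local copy of the job output
        for line in open(path):
            source, counts = line.rstrip('\n').split('\t', 1)
            num_ner_tokens, num_ner_sentences = json.loads(counts)
            totals[json.loads(source)] = (num_ner_tokens, num_ner_sentences)
    print(totals)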