First commit.

commit 75ee86e75922411957dc5df7066a76e3170e2d71 (0 parents), committed by Ned Rockson on May 31, 2011
Showing with 221 additions and 0 deletions.
  1. 0 README
  2. +107 −0 mrjerb.py
  3. +114 −0 mrjerb_test.py
0 README
No changes.
107 mrjerb.py
@@ -0,0 +1,107 @@
+import logging
+import subprocess
+from subprocess import CalledProcessError
+
+class MRJobStreamingLauncher(object):
+ def RunMRJobStreamingJob(self, input_path, hdfs_output_path, mrjob_file,
+ archive_file, job_name, python_path,
+ num_reduce_tasks_override=None,
+ jar_paths=None, partitioner_class=None,
+ output_protocol_override=None,
+ cleanup_override=None,
+ delete_output_path=False, compress_output='false',
+ hadoop_binary='hadoop'):
+ """Runs a hadoop streaming job using mrjob.
+ Args:
+      input_path - The data input path. This can be local or on HDFS; HDFS
+                   paths must be prefixed with hdfs://. This may also be a
+                   list of input paths.
+ hdfs_output_path - The data output path on hdfs. This does not need
+ hdfs protocol prepended.
+ mrjob_file - The location of the mrjob python file
+ archive_file - Python archive file containing the python code to be
+ deployed for the streaming job.
+      job_name - The name of the MapReduce job.
+ python_path - The python path to send to the job. This should be in the
+ form 'code/[your pythonpath]'. If there are multiple dirs
+ to put on the pythonpath, separate with colons.
+ num_reduce_tasks_override - Use this to override the number of reduce
+ tasks. Otherwise the default will be used.
+ jar_paths - List of paths to any libjars to use. These will be passed with
+ the -libjars command.
+ partitioner_class - Override the default HashPartitioner. This needs to
+ be a java class.
+ output_protocol_override - Override to the output protocol. Valid values
+ are json, json_value, pickle, pickle_value,
+ raw_value, repr, repr_value. Any of the
+ protocols with value in the name means only
+ the value will be output (key is lost).
+ cleanup_override - Override when this job should cleanup its files on
+ HDFS. This should really only be used for debugging.
+ Valid values are NONE, IF_SUCCESSFUL, SCRATCH, ALL.
+ delete_output_path - If set, output path will be deleted prior to launch.
+ compress_output - Set to 'true' if output should be compressed. Note that
+ this parameter is a string and not a boolean.
+      hadoop_binary - Path to the hadoop binary. Only used when
+                      delete_output_path is set.
+ """
+ if isinstance(input_path, str):
+ input_paths = input_path
+ else:
+ input_paths = ' '.join(input_path)
+ command = ['python', mrjob_file, '-r hadoop', '-o ', hdfs_output_path,
+ '--cmdenv \"PYTHONPATH=%s\"' % python_path]
+
+ command_additions = []
+
+ if output_protocol_override is not None:
+ command_additions.append('--output-protocol=%s' %
+ output_protocol_override)
+ if cleanup_override is None:
+ command_additions.append('--cleanup=ALL')
+ else:
+ command_additions.append('--cleanup=%s' % cleanup_override)
+
+    # Append '#code' to the archive file so its contents can be referenced
+    # under the code/ directory (e.g. via the PYTHONPATH set above).
+ archive_file = '%s#code' % archive_file
+ files_arg = ['--archive=%s' % (archive_file)]
+
+ jobconf_arr = ['--jobconf="mapred.output.compress=%s"' % compress_output,
+ '--jobconf="mapred.job.name=%s"' % job_name]
+ if num_reduce_tasks_override:
+ jobconf_arr.extend(['--jobconf="mapred.reduce.tasks=%s"' % \
+ str(num_reduce_tasks_override)])
+
+ hadoop_extra_args = []
+    if jar_paths is not None:
+      files_arg.extend(['--archive=%s' % jar_path for jar_path in jar_paths])
+      hadoop_extra_args.append(
+          '--hadoop_extra_arg=-libjars %s' % ','.join(jar_paths))
+    # Only pass -partitioner when a class was supplied; otherwise the literal
+    # string 'None' would be handed to hadoop.
+    if partitioner_class is not None:
+      hadoop_extra_args.append(
+          '--hadoop_extra_arg=-partitioner %s' % partitioner_class)
+
+ command.extend(files_arg)
+ command.extend(jobconf_arr)
+ command.extend(hadoop_extra_args)
+ command.extend(command_additions)
+ command.append(input_paths)
+
+ if delete_output_path:
+ try:
+ rm_cmd = [hadoop_binary, 'fs', '-rmr', hdfs_output_path]
+ subprocess.check_call(' '.join(rm_cmd), shell=True)
+ except CalledProcessError as e:
+ logging.error('Caught an exception deleting hdfs path %s. '
+ 'Exception: %s' % (hdfs_output_path, e))
+ return False
+
+
+ command_str = ' '.join(command)
+
+ logging.info(command_str)
+ try:
+ subprocess.check_call(command_str, shell=True)
+ except CalledProcessError as exc:
+ logging.error("Cannot run MRJob job using %s; "\
+ "skipping (%s)" % (command_str, str(exc)))
+ return False
+ return True
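
For context, here is a minimal usage sketch of the launcher above. The paths, archive, job name, and PYTHONPATH are made-up placeholders; it only assumes mrjerb.py is importable and that the mrjob package and a hadoop binary are available on the machine running it.

    import logging
    import mrjerb

    logging.basicConfig(level=logging.INFO)

    launcher = mrjerb.MRJobStreamingLauncher()
    # All argument values below are hypothetical, for illustration only.
    ok = launcher.RunMRJobStreamingJob(
        input_path='hdfs://namenode/data/events',   # may also be a list of paths
        hdfs_output_path='/user/jobs/events_out',
        mrjob_file='my_mrjob.py',
        archive_file='/tmp/code.tar.gz',
        job_name='events-rollup',
        python_path='code/mypkg/py',
        num_reduce_tasks_override=16,
        delete_output_path=True)
    if not ok:
        raise SystemExit('Streaming job failed; the full command is in the log.')

With those arguments the logged command string comes out roughly as: python my_mrjob.py -r hadoop -o /user/jobs/events_out --cmdenv "PYTHONPATH=code/mypkg/py" --archive=/tmp/code.tar.gz#code --jobconf="mapred.output.compress=false" --jobconf="mapred.job.name=events-rollup" --jobconf="mapred.reduce.tasks=16" --cleanup=ALL hdfs://namenode/data/events, which is then run through the shell.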
114 mrjerb_test.py
@@ -0,0 +1,114 @@
+import copy
+import subprocess
+import unittest
+
+import mox
+
+import mrjerb
+
+class MRJobStreamingLauncherTestCase(mox.MoxTestBase):
+ """Test case for mrjerb.MRJobStreamingLauncher
+ """
+ def testRunMRJobStreamingJob(self):
+ """Test mrjerb.MRJobStreamingLauncher.RunMRJobStreamingJob
+ """
+ self.mox.StubOutWithMock(subprocess, 'check_call')
+
+ launcher = mrjerb.MRJobStreamingLauncher()
+ input_path = '/a/b/c/d'
+ input_path2 = 'w/x/y/z'
+ output_path = 'e/f/g/h'
+ mrjob_file = 'mrjob_file.py'
+ reporting_jar = 'streaming_jar'
+ archive_file = '/tmp/code.tar.gz'
+ job_name = 'DNA'
+ partitioner_class = 'com.foo.bar.baz'
+
+ python_path = 'code/tellapart/gen-py:code/tellapart/py'
+
+ base_arr = \
+ ['python', mrjob_file, '-r hadoop', '-o ', output_path,
+                '--cmdenv \"PYTHONPATH=%s\"' % python_path,
+ '--archive=%s#code' % archive_file]
+
+ reporting_arr = ['--archive=%s' % reporting_jar]
+
+ jobconf_arr = ['--jobconf="mapred.output.compress=false"',
+ '--jobconf="mapred.job.name=%s"' % job_name]
+
+ num_reduce_tasks_arr = ['--jobconf="mapred.reduce.tasks=37"']
+
+ hadoop_extra_args = \
+ ['--hadoop_extra_arg=-libjars %s' % reporting_jar,
+ '--hadoop_extra_arg=-partitioner %s' % partitioner_class]
+
+ normal_command_additions = ['--cleanup=ALL']
+
+ expected_hadoop_rm_cmd_arr = ['hadoop', 'fs', '-rmr', output_path]
+
+ expected_cmd_arr = copy.deepcopy(base_arr)
+ expected_cmd_arr.extend(jobconf_arr)
+ expected_cmd_arr.extend(normal_command_additions)
+ expected_cmd_arr.append(input_path)
+ expected_string = ' '.join(expected_cmd_arr)
+ subprocess.check_call(expected_string, shell=True).AndReturn(None)
+
+ expected_cmd_arr = copy.deepcopy(base_arr)
+ expected_cmd_arr.extend(jobconf_arr)
+ expected_cmd_arr.extend(normal_command_additions)
+ expected_cmd_arr.append(input_path)
+ expected_cmd_arr.append(input_path2)
+ expected_string = ' '.join(expected_cmd_arr)
+ subprocess.check_call(expected_string, shell=True).AndReturn(None)
+
+ expected_cmd_arr = copy.deepcopy(base_arr)
+ expected_cmd_arr.extend(reporting_arr)
+ expected_cmd_arr.extend(jobconf_arr)
+ expected_cmd_arr.extend(num_reduce_tasks_arr)
+ expected_cmd_arr.extend(hadoop_extra_args)
+ expected_cmd_arr.extend(normal_command_additions)
+ expected_cmd_arr.append(input_path)
+ expected_string = ' '.join(expected_cmd_arr)
+ subprocess.check_call(expected_string, shell=True).AndReturn(None)
+
+ expected_cmd_arr = copy.deepcopy(base_arr)
+ expected_cmd_arr.extend(reporting_arr)
+ expected_cmd_arr.extend(jobconf_arr)
+ expected_cmd_arr.extend(num_reduce_tasks_arr)
+ expected_cmd_arr.extend(hadoop_extra_args)
+ expected_cmd_arr.append('--output-protocol=raw_value')
+ expected_cmd_arr.append('--cleanup=NONE')
+ expected_cmd_arr.append(input_path)
+ subprocess.check_call(' '.join(expected_hadoop_rm_cmd_arr),
+ shell=True).AndReturn(None)
+ expected_string = ' '.join(expected_cmd_arr)
+ subprocess.check_call(expected_string, shell=True).AndReturn(None)
+
+ self.mox.ReplayAll()
+
+ launcher.RunMRJobStreamingJob(input_path, output_path, mrjob_file,
+ archive_file, job_name, python_path)
+
+ launcher.RunMRJobStreamingJob([input_path, input_path2], output_path,
+ mrjob_file, archive_file, job_name,
+ python_path)
+
+ launcher.RunMRJobStreamingJob(input_path, output_path,
+ mrjob_file, archive_file, job_name,
+ python_path,
+ num_reduce_tasks_override=37,
+ jar_paths=[reporting_jar],
+ partitioner_class=partitioner_class)
+
+ launcher.RunMRJobStreamingJob(input_path, output_path,
+ mrjob_file, archive_file, job_name,
+ python_path,
+ num_reduce_tasks_override=37,
+ jar_paths=[reporting_jar],
+ partitioner_class=partitioner_class,
+ output_protocol_override='raw_value',
+ cleanup_override='NONE',
+ delete_output_path=True)
+
+if __name__ == '__main__':
+ unittest.main()
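
The test above drives the launcher through mox: subprocess.check_call is stubbed out, the expected calls are recorded, ReplayAll switches to replay mode, and MoxTestBase verifies the expectations when the test finishes. Where mox is not available, roughly the same check can be written against the built command string with the standard library's unittest.mock; the sketch below is an assumption-laden alternative (Python 3 environment, class and test names invented here), not part of the original test suite.

    import unittest
    from unittest import mock

    import mrjerb

    class MRJobStreamingLauncherMockTest(unittest.TestCase):
      def test_builds_expected_command(self):
        # Stub out check_call and inspect the command string the launcher built.
        with mock.patch('mrjerb.subprocess.check_call', return_value=0) as check_call:
          launcher = mrjerb.MRJobStreamingLauncher()
          self.assertTrue(launcher.RunMRJobStreamingJob(
              '/a/b/c/d', 'e/f/g/h', 'mrjob_file.py', '/tmp/code.tar.gz',
              'DNA', 'code/tellapart/gen-py:code/tellapart/py'))
          (cmd,), kwargs = check_call.call_args
          self.assertIn('--jobconf="mapred.job.name=DNA"', cmd)
          self.assertIn('--cleanup=ALL', cmd)
          self.assertTrue(kwargs.get('shell'))

    if __name__ == '__main__':
      unittest.main()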
