Skip to content

Real Multiple Outputs in Hadoop

Paul Houle edited this page Sep 11, 2013 · 3 revisions
Clone this wiki locally

Why?

I think many people find MultiplesOutputs in Hadoop to be unsatisfying. For one thing, the implementation is completely different.

In the v2 API, at least, there is the problem that multiple outputs go into files like

/project/part-m-00000.gz
/project/alpha-m-00000.gz
/project/beta-m-00000.gz

instead of being able to send them to separate directories.

/mainOutput/
/alpha/
/beta/

Cascading and Pig can split up data and send it out into different paths, so it's got to be possible. This became Issue #25.

How was it done?

By copying the code from Hadoop's MultipleOutputs and modifying it. The implementation can be seen here:

com.java.ontology2.bakemono.mapred source

There are three classes involved:

  • RealMultipleOutputs -- replaces MultipleOutputs; the really interesting trick behind MultipleOutputs is that it clones the Configuration of the Hadoop job and passes this context into the place where data is written into alternate streams. Shunting files to different directories or filesystems is mainly a matter of generating a different path name.
  • RealMultipleOutputsCommitter -- Well, it's a little more complex than that because with all the files stuffed in one directory, the old MultipleOutputs could rely on the default commit mechanism but now with the files spread out in different directories, we need to commit all of the separate files. This class is initialized with a list of OutputCommitter(s) and simply applies commit operations to the union of all registered Committers.
  • RealMultipleOutputsMainOutputWrapper -- This is a very simple class with a long name. This is a shim used to get the RealMultipleOutputsCommitter into place. It is technically a FileOutputFormat but really it is a wrapper for whatever output format you want for the primary output stream.

How do you use it?

It is straightforward to add multiple outputs to a job IF you initialize the RealMultipleOutputs AFTER you initialize everything else. If you try it any other way it's possible that the RealMultipleOutputs system won't know enough to initialize all of the things that it needs to initialize itself.

The following code snippet copied from PSE3Tool.java illustrates how it is done:

            String input=arg0[0];
            String output=arg0[1];

            Path acceptedPath=new Path(output,"accepted");
            Path rejectedPath=new Path(output,"rejected");

            conf.set("mapred.compress.map.output", "true");
            conf.set("mapred.output.compression.type", "BLOCK"); 
            conf.set("mapred.map.output.compression.codec", "org.apache.hadoop.io.compress.GzipCodec");

            Job job=new Job(conf,"pse3");
            job.setJarByClass(PSE3Tool.class);
            job.setMapperClass(PSE3Mapper.class);
            job.setReducerClass(Uniq.class);
            job.setNumReduceTasks(4);

            job.setMapOutputKeyClass(WritableTriple.class);
            job.setMapOutputValueClass(LongWritable.class);


            job.setOutputKeyClass(Triple.class);
            job.setOutputValueClass(LongWritable.class);

            FileInputFormat.addInputPath(job, new Path(input));
            FileOutputFormat.setOutputPath(job, acceptedPath);
            FileOutputFormat.setCompressOutput(job, true);
            FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
            RealMultipleOutputs.addNamedOutput(job, "rejected", rejectedPath,TextOutputFormat.class, Text.class, Text.class);

            // Gotcha -- this has to run before the definitions above associated with the output format because
            // this is going to be configured against the job as it stands a moment from now

            job.setOutputFormatClass(RealMultipleOutputsMainOutputWrapper.class);
            RealMultipleOutputsMainOutputWrapper.setRootOutputFormat(job, SPOTripleOutputFormat.class);


            return job.waitForCompletion(true) ? 0 :1;

Real and Imagined Limitations

In developing this I got close enough to Hadoop to smell its sweat, so I feel it is likely to be reliable.

There are a few cases where I've inlined private methods on classes from Hadoop. The usual pattern is that this is a static 'setter' or 'getter' that sets or gets the value on a Job or Configuration object that is passed in. If Hadoop or some Hadoop knockoff decides to change the way configuration works this could break.

The system also assumes that all of the output formats are FileOutputFormats. If this isn't the case, something will break. Using this together with MultipleOutputs or trying to stuff a RealMultipleOutputsMainOutputWrapper into itself is a recipe for disaster.

If any other modules you are using need to be initialized last, you may have problems.

Something went wrong with that request. Please try again.