Skip to content
This repository has been archived by the owner on Mar 28, 2019. It is now read-only.

Commit

Permalink
Give mappers access to the configuration via context. Allow using an environment variable to save results to a different directory.
Browse files Browse the repository at this point in the history
  • Loading branch information
bsmedberg committed Feb 21, 2014
1 parent 8077126 commit 3b7b162
Showing 1 changed file with 22 additions and 6 deletions.
28 changes: 22 additions & 6 deletions org/mozilla/jydoop/HadoopDriver.java
Expand Up @@ -56,7 +56,7 @@ static private PythonWrapper getPythonWrapper(Configuration conf) throws IOExcep
public static class WritableIterWrapper extends PyIterator
{
private Iterator<PythonValue> iter;

public WritableIterWrapper(Iterator<PythonValue> i)
{
iter = i;
Expand Down Expand Up @@ -89,6 +89,11 @@ public Counter getCounter(String groupName, String counterName)
{
return cx.getCounter(groupName, counterName);
}

// Exposes the Hadoop job Configuration to callers (e.g. Python mapper
// scripts) by delegating to the wrapped task context `cx`. Read-only
// pass-through; no state is held in this wrapper itself.
public Configuration getConfiguration()
{
return cx.getConfiguration();
}
}

public static class HBaseMapper extends TableMapper<PythonKey, PythonValue> {
Expand Down Expand Up @@ -291,7 +296,17 @@ public int run(String[] args) throws Exception {
String scriptFile = args[0];
String outPath = args[1];

Path outdir = new Path(outPath);
Path hadoopDir = new Path(outPath);

// If we're not using an absolute path, see if JYDOOP_OUTPUT_DIR is set
// in the environment.
if (!hadoopDir.isAbsolute()) {
String localOutputDir = System.getenv("JYDOOP_OUTPUT_DIR");
if (localOutputDir != null) {
outPath = new Path(new Path(localOutputDir), hadoopDir).toString();
}
}

final Configuration conf = getConf();
conf.set("mapred.compress.map.output", "true");
conf.set("mapred.map.output.compression.codec", "org.apache.hadoop.io.compress.SnappyCodec");
Expand All @@ -302,10 +317,10 @@ public int run(String[] args) throws Exception {
Job job = new Job(conf, jobname);
job.setJarByClass(HadoopDriver.class); // class that contains mapper
try {
fs.delete(outdir, true);
fs.delete(hadoopDir, true);
} catch(Exception e) {
}
FileOutputFormat.setOutputPath(job, outdir); // adjust directories as required
FileOutputFormat.setOutputPath(job, hadoopDir); // adjust directories as required

job.setMapOutputKeyClass(PythonKey.class);
job.setMapOutputValueClass(PythonValue.class);
Expand Down Expand Up @@ -371,7 +386,7 @@ public int run(String[] args) throws Exception {

// Now read the hadoop files and call the output function

final FileStatus[] files = fs.listStatus(outdir);
final FileStatus[] files = fs.listStatus(hadoopDir);

class KeyValueIterator extends PyIterator
{
Expand Down Expand Up @@ -414,10 +429,11 @@ public PyObject __iternext__()
outputfunc = org.python.core.imp.load("jydoop").__getattr__("outputWithKey");
}
}

outputfunc.__call__(Py.newString(outPath), new KeyValueIterator());

// If we got here, the temporary files are irrelevant. Delete them.
fs.delete(outdir, true);
fs.delete(hadoopDir, true);

return 0;
}
Expand Down

0 comments on commit 3b7b162

Please sign in to comment.