Allow a Job to be a DAG instead of a chain? #18

hydropyrum opened this Issue Aug 24, 2010 · 5 comments


It would be nice if it were possible, when calling Job.additer, to specify that the input for an iteration should be the output of one or more previous iterations of the Job. Something like...

job = dumbo.Job()
job.input # is an id for job's input (i.e., specified by -input on the command line)
o0 = job.additer(mapper, reducer) # returns an id for the iteration's output
o1 = job.additer(mapper, reducer, input=job.input) # take input from the job input instead of iteration 0
o2 = job.additer(mapper, reducer, input=[o0,o1]) # take input from both iteration 0 and 1

The job's output would be the output of the last iteration as always.

It seems to me this would be a fairly easy modification that would add a lot of flexibility.
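To make the proposal concrete, here is a minimal sketch of the bookkeeping it implies. `ToyJob` is a hypothetical stand-in written for illustration, not dumbo's `Job` class; it assumes iteration ids are simply positions in a list and that `-1` is reserved for the job's own input. It only records the graph and runs nothing.

```python
class ToyJob:
    """Hypothetical stand-in for dumbo.Job: records the DAG, runs nothing."""

    def __init__(self):
        self.input = -1   # assumed id for the job's own input (-input)
        self.iters = []   # one (mapper, reducer, input_ids) tuple per iteration

    def additer(self, mapper, reducer, input=None):
        # `input` shadows the builtin, but matches the keyword used in the proposal.
        if input is None:
            # Default chain behaviour: read the previous iteration's output,
            # or the job input if this is the first iteration.
            input = len(self.iters) - 1 if self.iters else self.input
        ids = list(input) if isinstance(input, (list, tuple)) else [input]
        new_id = len(self.iters)
        for i in ids:
            # Only the job input or an earlier iteration may be referenced,
            # which keeps the graph acyclic by construction.
            if i != self.input and not 0 <= i < new_id:
                raise ValueError("unknown input id: %r" % (i,))
        self.iters.append((mapper, reducer, ids))
        return new_id


job = ToyJob()
o0 = job.additer("mapper", "reducer")                    # reads the job input
o1 = job.additer("mapper", "reducer", input=job.input)   # also reads the job input
o2 = job.additer("mapper", "reducer", input=[o0, o1])    # reads iterations 0 and 1
```

Because omitting `input` falls back to the previous iteration, existing chain-style scripts would behave as before under this sketch.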


I like this implementation. It seems backward compatible.

This seems similar to the MultiMapper class (examples/, except that it allows separate reducers for the two maps. In the example above, the reducers (and mappers?) are the same, so both maps could be done with MultiMapper and fed into the same reducer, and the third map/reduce could then process that (single) output. But if we wanted two distinct map/reduce pipelines feeding into a third, I don't think MultiMapper could do it.


Agreed, this is more general than MultiMapper and would be a good addition to dumbo.


This is now in my master branch. I made some minor stylistic changes, but apart from those it went in unchanged.

Updated example:

job = dumbo.Job()
job.root # id for the job's root input (i.e., specified by -input on the command line)
o0 = job.additer(mapper1, reducer1) # returns an id for the iteration's output
o1 = job.additer(mapper2, reducer2, input=job.root) # consume root input, not the output of iteration 0
o2 = job.additer(mapper3, reducer3, input=[o0,o1]) # take input from both iteration 0 and 1
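As an illustration of what this graph computes, here is a small in-memory simulation of the same three iterations. Everything in it (`run_iteration`, `run_dag`, the `ROOT` marker, and the sample mappers and reducers) is hypothetical and written for this example; real dumbo runs each iteration as a Hadoop streaming job over files, which this sketch does not attempt to model.

```python
from collections import defaultdict

ROOT = "root"  # assumed marker for the job's root input


def run_iteration(mapper, reducer, records):
    """Run one map/reduce pass over a list of (key, value) records."""
    groups = defaultdict(list)
    for key, value in records:
        for k, v in mapper(key, value):
            groups[k].append(v)
    out = []
    for k in sorted(groups):  # stand-in for the shuffle/sort phase
        out.extend(reducer(k, groups[k]))
    return out


def run_dag(root, iters):
    """iters: list of (mapper, reducer, input_ids); returns the last output."""
    outputs = {ROOT: root}
    for i, (mapper, reducer, ids) in enumerate(iters):
        # Concatenate the outputs of every iteration listed as an input.
        records = [rec for j in ids for rec in outputs[j]]
        outputs[i] = run_iteration(mapper, reducer, records)
    return outputs[len(iters) - 1]


def mapper1(key, value):      # word occurrences
    for word in value.split():
        yield word, 1

def reducer1(key, values):
    yield key, sum(values)

def mapper2(key, value):      # lines containing each word
    for word in set(value.split()):
        yield word, key

def reducer2(key, values):
    yield key, len(values)

def mapper3(key, value):      # identity, so both inputs meet in the reducer
    yield key, value

def reducer3(key, values):
    yield key, tuple(values)


root = [(0, "a b a"), (1, "b c")]
result = run_dag(root, [
    (mapper1, reducer1, [ROOT]),   # o0
    (mapper2, reducer2, [ROOT]),   # o1: consumes root, not o0
    (mapper3, reducer3, [0, 1]),   # o2: consumes both o0 and o1
])
print(result)  # [('a', (2, 1)), ('b', (2, 2)), ('c', (1, 1))]
```

Each result pairs a word's total count (from iteration 0) with the number of lines containing it (from iteration 1), which is the kind of join a plain chain cannot express.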

I don't see how I can mix this nice feature with MultiMapper/JoinReducer.
Did I miss something?

edit: I may have found a way:

kwargs['output'] = job_output + "_pre" + str(iter + 1)


multimapper = MultiMapper()
multimapper.add("_pre1", primary(mapper))
multimapper.add("_pre2", secondary(mapper))

should do the trick.

This issue was closed.