Skip/Limit Support in MongoInputSplit cursor #56

wants to merge 2 commits into from

6 participants


If limit or skip is supplied to MongoInputSplit, add it to the query cursor rather than just ignoring it.


Can you provide a valid use case for skip/limit? We have internally been discussing deprecating it.


We're internally trying a custom Splitter that cuts down on job execution time when the number of records returned by the query is relatively small compared to the total data size.

From what we can see, the adapter creates splits for every chunk/vector in the database, and the query is essentially run on each, correct? We have some jobs that can sometimes return few or no records - e.g. "give me the documents that have changed in the last 20 minutes". On our unsharded database, with > 1M records, this created ~6k splits, taking ~1hr for 0 actual input records.
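For context, a "documents changed in the last 20 minutes" job like the one described above would typically be expressed through the connector's mongo.input.query option. This is a sketch only; the field name (lastModified) and the date are illustrative, not from the original job:

    <property><!-- Hypothetical example query; lastModified is an illustrative field name --><name>mongo.input.query</name><value>{"lastModified": {"$gte": {"$date": "2015-06-01T00:00:00Z"}}}</value></property>

With the default splitters, this query is re-run against every split, even though almost no split contains matching documents.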

The current solution we have is to count the number of hits on the query up front. If it's a relatively low percentage of the total database, we create splits using the query, with limit and skip calculated from the number of hit records and the average document size. I got bitten passing in the limit and skip with no effect, so I patched it so that they are honored.

I know it's far from optimal, and it can (and does) perform worse when crunching a large portion of the data, but processing a small subset of the data brings our total job execution time down drastically because fewer splits are created.
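The split arithmetic described above can be sketched in a few lines. This is a minimal, self-contained illustration of the idea (count the hits, then carve them into skip/limit windows), not the actual patched connector code; the class and method names are made up:

```java
import java.util.ArrayList;
import java.util.List;

public class SkipLimitSplits {
    // Returns {skip, limit} pairs covering `hitCount` matching documents
    // in windows of at most `docsPerSplit` documents each.
    static List<long[]> computeSplits(long hitCount, long docsPerSplit) {
        List<long[]> splits = new ArrayList<>();
        for (long skip = 0; skip < hitCount; skip += docsPerSplit) {
            long limit = Math.min(docsPerSplit, hitCount - skip);
            splits.add(new long[]{skip, limit});
        }
        return splits;
    }

    public static void main(String[] args) {
        // e.g. 100k matching docs at ~12500 docs per split
        List<long[]> splits = computeSplits(100_000, 12_500);
        System.out.println(splits.size());    // prints 8
        System.out.println(splits.get(7)[0]); // prints 87500 (last window's skip)
    }
}
```

Each pair would then be handed to one MongoInputSplit, which applies the skip and limit to its cursor (which is exactly what this PR's diff enables).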

If you have any ideas, I would love to discuss it with you if you have the time.


Hey, I just saw this. We ran into the exact problem described by bpfoster above a while back. In case it's useful to anyone else who comes upon this discussion, we developed a custom splitter:

Just dump those files (they're Apache-licensed) into a library and use them instead of the default ones.


    hj.setInputFormatClass((Class<? extends InputFormat>) Class.forName ("com.ikanow.infinit.e.data_model.custom.InfiniteMongoInputFormat", true, child));
    hj.setOutputFormatClass((Class<? extends OutputFormat>) Class.forName ("com.mongodb.hadoop.MongoOutputFormat", true, child));


    <property><!-- InputFormat Class --><name>mongo.job.input.format</name><value>com.ikanow.infinit.e.data_model.custom.InfiniteMongoInputFormat</value></property>
    <property><!-- OutputFormat Class --><name>mongo.job.output.format</name><value>com.mongodb.hadoop.MongoOutputFormat</value></property>

The only additional configuration needed is two XML properties specifying some limits:

    <property><!-- Maximum number of splits [optional] --><name>max.splits</name><value>8</value></property>
    <property><!-- Maximum number of docs per split [optional] --><name></name><value>12500</value></property>
  • max.splits: the maximum number of splits (groups of data) that will be grabbed at once
  • the second property: the maximum number of documents that will be grabbed for each split.

Note that if the maximums would be exceeded, the splitter "backs out" to the default MongoInputFormat.
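That "back out" decision is simple to express. The sketch below is an illustration of the logic described above, not the actual splitter code; the class and method names are hypothetical:

```java
public class SplitterFallback {
    // Use the custom skip/limit splitting only when the matching documents
    // fit within maxSplits splits of at most maxDocsPerSplit docs each;
    // otherwise fall back to the default chunk-based MongoInputFormat.
    static boolean useSkipLimitSplitting(long matchingDocs,
                                         int maxSplits,
                                         long maxDocsPerSplit) {
        return matchingDocs <= (long) maxSplits * maxDocsPerSplit;
    }

    public static void main(String[] args) {
        // With the example config above: 8 splits * 12500 docs = 100000 cap
        System.out.println(useSkipLimitSplitting(90_000, 8, 12_500));  // prints true
        System.out.println(useSkipLimitSplitting(200_000, 8, 12_500)); // prints false
    }
}
```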

@llvtt llvtt added the core label

We're running into this issue as well - I'll describe our use case, and maybe someone from the Mongo team can chime in with any thoughts?

Basically, our full set of documents is many times larger than the data we want to analyze at any given time. For instance, we might have a collection with 32 million items, split across 13000 shard chunks, but we're only analyzing about 100k documents.

We've tried almost every permutation of the mongo config settings in the connector, and as near as I can tell, your only real options are: 1) one split per shard, or 2) one split per shard chunk. Is that a fair assessment? Is there a way (besides using the splitter referenced above; thanks for sharing, BTW) to create a more reasonable number of splits?

Happy to provide more information.




In case it's any help -

We ended up with 2 solutions

  • We picked a compound shard key (time and data type) that mapped onto our typical hadoop queries (and then also rewrote the chunk/shard code to identify and discard empty shards based on comparing the relevant query terms to the chunk limits)
  • Used other indexes to generate a stream of _ids (i.e., making the query "covered," hence very efficient since you're only hitting the index) and then created mappers out of the _id stream (1000 at a time in our case; we have large-ish objects, though)

And then also:

  • One thing we didn't do, but other MongoDB users I have talked to have done: if your data changes infrequently, you can periodically "export" the BSON data based on that query and then generate mappers over that data (this also has the advantage of not hitting the DB at all during the task)
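The covered-query approach in the second bullet boils down to grouping an _id stream into per-mapper batches. A minimal sketch of that grouping step, under the assumption that the real code would turn each batch into an $in query for one mapper (the class name is illustrative):

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class IdStreamBatcher {
    // Groups a stream of _id values into batches of at most `batchSize`,
    // one batch per mapper (1000 at a time in the setup described above).
    static List<List<String>> batch(Iterator<String> ids, int batchSize) {
        List<List<String>> batches = new ArrayList<>();
        List<String> current = new ArrayList<>(batchSize);
        while (ids.hasNext()) {
            current.add(ids.next());
            if (current.size() == batchSize) {
                batches.add(current);
                current = new ArrayList<>(batchSize);
            }
        }
        if (!current.isEmpty()) batches.add(current); // trailing partial batch
        return batches;
    }

    public static void main(String[] args) {
        List<String> ids = java.util.Arrays.asList("a", "b", "c", "d", "e");
        System.out.println(batch(ids.iterator(), 2)); // prints [[a, b], [c, d], [e]]
    }
}
```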

Hey Alex,
Thanks so much for the comment - I only just realized this discussion is 2 years old, so thanks for chiming in! I'll look over the ideas and see if they'll work for us - sounds like the common theme is rewriting the mongo splitter.



No worries - I just wish I'd been less lazy and made the components more re-usable for other people!

I also haven't checked the repo in the last year, so it's possible other people have already contributed some of these types of solutions back.


@bwmcadams or @mpobrien or @erh Could someone from the maintainers chime in? I'm happy to build this and submit a PR, but the last time I made a PR against mongo-hadoop it took 3 months to review and was completely broken by the time you guys looked at it. Do you have any thoughts about how to handle this situation? Why was there even an internal debate about whether or not to support it?

Thanks guys!

EDIT: I should say, I'm happy to make some of Alex's ideas into something more reusable, packaged with the connector.


Hi Pat, @llvtt is maintaining this project now - he can take a look at it for you.



Let me see if I'm understanding this correctly:

If you want to analyze only a small fraction of the contents of a MongoDB collection, there isn't an efficient way to do that with the current Splitters: they work based on shard chunks, shards, or $min/$max (via the 'splitVector' command), and thus create more splits than necessary if that data is spread across many chunks or parts of an index.

You're proposing to get around this by adding another means of splitting MongoDB collections by number of documents, done with $skip and $limit in addition to mongo.input.query... is that correct?

This sounds fine to me. Make a pull request, and I'll review it.


Thanks so much for chiming in Lyle! Quick question - I might be missing something, but is there a way to use min / max for split generation against sharded clusters? As near as I could tell, the only way to create splits is shard chunks or actual shards - is there another way?

Then, quick design question, from a config perspective, how would you want a user to enable this split option? Also, I'm not 100% sure I'd use skip / limit since I'm trying to get a more general way to just limit the number of splits, period.



$min/$max is only used for shard chunks and when splitting an unsharded collection (both per the results of the splitVector command). There isn't any way to specify $min/$max on each split directly.

As for configuration, there's already a config option (mongo.splitter.class) to specify the splitter implementation manually. If you wrote a MongoSkipAndLimitSplitter, then the user can just pass the class name to this option. I don't know if there needs to be anything more complicated for choosing the splitter.
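In the document's existing config-snippet style, selecting such a splitter would look like the following. The package and class name are hypothetical; mongo.splitter.class is the real option mentioned above:

    <property><!-- Splitter implementation [hypothetical class name] --><name>mongo.splitter.class</name><value>com.example.MongoSkipAndLimitSplitter</value></property>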


Pinging this discussion, though I know it's been inactive for a while. I'm working on implementing something like this; my current plan is to implement a SkipAndLimitSplitter. This Splitter exchanges more load on MongoDB for splits that are all nonempty and (mostly) evenly sized, even after applying mongo.input.query. This could potentially save a ton of time on the Hadoop end. As I was writing it, it occurred to me that someone may have written it already.

So as not to duplicate effort or step on anyone's toes, I thought I'd ask here if someone has written this. If not, no worries, I'll just use the code I've written.

@patwhite, did you make any progress on your Splitter implementation?

8 core/src/main/java/com/mongodb/hadoop/input/
@@ -136,9 +136,11 @@ public void readFields( DataInput in ) throws IOException{
DBCursor getCursor(){
// Return the cursor with the split's query, etc. already slotted in for
// them.
- // todo - support limit/skip
if ( _cursor == null ){
- _cursor = MongoConfigUtil.getCollection( _mongoURI ).find( _querySpec, _fieldSpec ).sort( _sortSpec );
+ _cursor = MongoConfigUtil.getCollection( _mongoURI ).find( _querySpec, _fieldSpec ).sort( _sortSpec ).limit(_limit);
+ if (_skip > 0) {
+ _cursor.skip(_skip);
+ }
if (_notimeout) _cursor.setOptions( Bytes.QUERYOPTION_NOTIMEOUT );
@@ -160,7 +162,7 @@ BSONDecoder getBSONDecoder(){
public String toString(){
- return "MongoInputSplit{URI=" + _mongoURI + ", keyField=" + _keyField + ", query=" + _querySpec + ", sort=" + _sortSpec + ", fields=" + _fieldSpec + '}';
+ return "MongoInputSplit{URI=" + _mongoURI + ", keyField=" + _keyField + ", query=" + _querySpec + ", sort=" + _sortSpec + ", fields=" + _fieldSpec + ", limit=" + _limit + ", skip=" + _skip +'}';
public MongoInputSplit(){ }