Permalink
Browse files

HADOOP-48 add config option to use of / instead of min/max

  • Loading branch information...
mpobrien committed Jan 17, 2013
1 parent 4ac8b3d commit 6e4ee2a346e4b895c2cd50008ee5e680dc9025a8
@@ -302,6 +302,15 @@ public boolean isShardChunkedSplittingEnabled() {
public void setShardChunkSplittingEnabled( boolean value) {
MongoConfigUtil.setShardChunkSplittingEnabled( _conf, value );
}
+
+
+ public boolean isRangeQueryEnabled(){
+ return MongoConfigUtil.isRangeQueryEnabled( _conf );
+ }
+
+ public void setRangeQueryEnabled(boolean value){
+ MongoConfigUtil.setRangeQueryEnabled( _conf, value );
+ }
public boolean canReadSplitsFromSecondary() {
return MongoConfigUtil.canReadSplitsFromSecondary( _conf );
@@ -132,6 +132,13 @@
*/
public static final String SPLITS_SLAVE_OK = "mongo.input.split.allow_read_from_secondaries";
+ /**
+ * If true then queries for splits will be constructed using $lt/$gt instead of $min and $max.
+ *
+ * Defaults to {@code false}
+ */
+ public static final String SPLITS_USE_RANGEQUERY = "mongo.input.split.use_range_queries";
+
public static boolean isJobVerbose( Configuration conf ){
return conf.getBoolean( JOB_VERBOSE, false );
}
@@ -439,10 +446,28 @@ public static void setSplitSize( Configuration conf, int value ){
conf.setInt( INPUT_SPLIT_SIZE, value );
}
+ /**
+ * if TRUE,
+ * Splits will be queried using $lt/$gt instead of $max and $min.
+ * This allows the database's query optimizer to choose the best index,
+ * instead of being forced to use the one in the $max/$min keys.
+ * This will only work if the key used for splitting is *not* a compound key.
+ * Make sure that all values under the splitting key are of the same type, or
+ * this will cause incomplete results.
+ * @return
+ */
+ public static boolean isRangeQueryEnabled( Configuration conf ){
+ return conf.getBoolean( SPLITS_USE_RANGEQUERY, false );
+ }
+
+ public static void setRangeQueryEnabled( Configuration conf, boolean value ){
+ conf.setBoolean( SPLITS_USE_RANGEQUERY, value );
+ }
+
/**
* if TRUE,
* Splits will be read by connecting to the individual shard servers,
- * however this really isn't safe unless you know what you're doing.
+ * Only use this
* ( issue has to do with chunks moving / relocating during balancing phases)
* @return
*/
@@ -5,6 +5,8 @@
import com.mongodb.hadoop.input.*;
import org.apache.commons.logging.*;
import org.apache.hadoop.mapreduce.*;
+import org.bson.types.MinKey;
+import org.bson.types.MaxKey;
import java.net.UnknownHostException;
import java.util.*;
@@ -55,13 +57,14 @@
final boolean slaveOk = conf.canReadSplitsFromSecondary();
- log.info(" Calculate Splits Code ... Use Shards? " + useShards + ", Use Chunks? " + useChunks + "; Collection Sharded? " + isSharded);
+ final boolean useRangeQuery = conf.isRangeQueryEnabled();
+
+ log.info(" Calculate Splits Code ... Use Shards? " + useShards + ", Use Chunks? " + useChunks + "; Collection Sharded? " + isSharded + "; Use Rang queries? " + useRangeQuery);
if (conf.createInputSplits()) {
log.info( "Creation of Input Splits is enabled." );
if (isSharded && (useShards || useChunks)){ // todo I don't think these settings can be run together
if (useShards && useChunks)
log.warn( "Combining 'use chunks' and 'read from shards directly' can have unexpected & erratic behavior in a live system due to chunk migrations. " );
-
log.info( "Sharding mode calculation entering." );
return calculateShardedSplits( conf, useShards, useChunks, slaveOk, uri, mongo );
}
@@ -247,6 +250,11 @@ else if ( useShards ){
log.warn( "WARNING getting splits that connect directly to the backend mongods"
+ " is risky and might not produce correct results" );
+ if ( conf.isRangeQueryEnabled() ){
+ log.warn( "WARNING using range queries can produce incorrect results if values"
+ + " stored under the splitting key have different types.");
+ }
+
if ( log.isDebugEnabled() ){
log.debug( "getSplitsUsingChunks(): originalQuery: " + originalQuery );
}
@@ -307,8 +315,8 @@ else if ( useShards ){
final BasicDBObject row = (BasicDBObject) cur.next();
DBObject minObj = ( (DBObject) row.get( "min" ) );
DBObject shardKeyQuery = new BasicDBObject();
- DBObject min = new BasicDBObject();
- DBObject max = new BasicDBObject();
+ BasicDBObject min = new BasicDBObject();
+ BasicDBObject max = new BasicDBObject();
for ( String keyName : minObj.keySet() ){
Object tMin = minObj.get( keyName );
@@ -319,11 +327,49 @@ else if ( useShards ){
if ( !( tMax == SplitFriendlyDBCallback.MAX_KEY_TYPE || tMax.equals( "MaxKey" ) ) )
max.put( keyName, tMax );
}
+
+
/** We have to put something for $query or we'll fail; if no original query use an empty DBObj */
if ( originalQuery == null )
originalQuery = new BasicDBObject();
+
+ DBObject splitQuery = originalQuery;
+
+ boolean useMinMax = true;
+ if( conf.isRangeQueryEnabled() ){
+ Map.Entry<String, Object> minKey = min.size() == 1 ?
+ min.entrySet().iterator().next() : null;
+ Map.Entry<String, Object> maxKey = max.size() == 1 ?
+ max.entrySet().iterator().next() : null;
+ if(minKey == null && maxKey == null ){
+ log.error("Range query is enabled but one or more split boundaries contains a compound key:\n" +
+ "minKey: " + min.toString() + "\n" +
+ "maxKey: " + max.toString());
+ }
+
+ if( (minKey != null && originalQuery.containsKey(minKey.getKey())) ||
+ (maxKey != null && originalQuery.containsKey(maxKey.getKey())) ){
+ log.error("Range query is enabled but split key conflicts with query filter:\n" +
+ "minKey: " + min.toString() + "\n" +
+ "maxKey: " + max.toString() + "\n" +
+ "query: " + originalQuery.toString());
+ }
+ BasicDBObject rangeObj = new BasicDBObject();
+ if( minKey!=null )//&& !SplitFriendlyDBCallback.MIN_KEY_TYPE.equals(minKey.getValue())){
+ rangeObj.put("$gte", minKey.getValue());
+ //}
+ if( maxKey!=null )//&& !SplitFriendlyDBCallback.MAX_KEY_TYPE.equals(maxKey.getValue())){
+ rangeObj.put("$lt", maxKey.getValue());
+ //}
+ splitQuery = new BasicDBObject();
+ splitQuery.putAll(originalQuery);
+ splitQuery.put(minKey.getKey(), rangeObj);
+ useMinMax = false;
+ }
+
shardKeyQuery.put( "$query", originalQuery );
+
if ( log.isDebugEnabled() ){
log.debug( "[" + numChunks + "/" + numExpectedChunks + "] new query is: " + shardKeyQuery );
}
@@ -339,15 +385,17 @@ else if ( useShards ){
}
MongoInputSplit split = new MongoInputSplit( inputURI,
conf.getInputKey(),
- originalQuery,
+ splitQuery,
conf.getFields(),
conf.getSort(), // TODO - should inputKey be the shard key?
conf.getLimit(),
conf.getSkip(),
conf.isNoTimeout());
- split.setSpecialMin(min);
- split.setSpecialMax(max);
- splits.add(split);
+ if(useMinMax){
+ split.setSpecialMin(min);
+ split.setSpecialMax(max);
+ }
+ splits.add(split);
}
if ( log.isDebugEnabled() ){

0 comments on commit 6e4ee2a

Please sign in to comment.