Permalink
Browse files

SERVER-2581: MR locks server for a long time in merge and reduce mode…

…s with many keys
  • Loading branch information...
1 parent 0c8d23a commit cf953c164861f764eb7b4333cf97e2159a08ab3d @agirbal agirbal committed Aug 21, 2011
Showing with 37 additions and 1 deletion.
  1. +16 −1 db/commands/mr.cpp
  2. +4 −0 db/commands/mr.h
  3. +9 −0 jstests/mr_merge.js
  4. +8 −0 jstests/mr_outreduce.js
View
@@ -261,6 +261,12 @@ namespace mongo {
if (o.hasElement("db")) {
outDB = o["db"].String();
}
+
+ if (o.hasElement("nonAtomic")) {
+ outNonAtomic = o["nonAtomic"].Bool();
+ if (outNonAtomic)
+ uassert( 15889 , "nonAtomic option cannot be used with this output type", (outType == REDUCE || outType == MERGE) );
+ }
}
else {
uasserted( 13606 , "'out' has to be a string or an object" );
@@ -422,12 +428,19 @@ namespace mongo {
if ( _onDisk == false || _config.outType == Config::INMEMORY )
return _temp->size();
- dblock lock;
+ if (_config.outNonAtomic)
+ return postProcessCollectionNonAtomic();
+ writelock lock;
+ return postProcessCollectionNonAtomic();
+ }
+
+ long long State::postProcessCollectionNonAtomic() {
if ( _config.finalLong == _config.tempLong )
return _db.count( _config.finalLong );
if ( _config.outType == Config::REPLACE || _db.count( _config.finalLong ) == 0 ) {
+ writelock lock;
// replace: just rename from temp to final collection name, dropping previous collection
_db.dropCollection( _config.finalLong );
BSONObj info;
@@ -441,6 +454,7 @@ namespace mongo {
// merge: upsert new docs into old collection
auto_ptr<DBClientCursor> cursor = _db.query( _config.tempLong , BSONObj() );
while ( cursor->more() ) {
+ writelock lock;
BSONObj o = cursor->next();
Helpers::upsert( _config.finalLong , o );
getDur().commitIfNeeded();
@@ -453,6 +467,7 @@ namespace mongo {
auto_ptr<DBClientCursor> cursor = _db.query( _config.tempLong , BSONObj() );
while ( cursor->more() ) {
+ writelock lock;
BSONObj temp = cursor->next();
BSONObj old;
View
@@ -194,6 +194,9 @@ namespace mongo {
INMEMORY // only store in memory, limited in size
} outType;
+ // if true, no lock during output operation
+ bool outNonAtomic;
+
static AtomicUInt JOB_NUMBER;
}; // end MRsetup
@@ -252,6 +255,7 @@ namespace mongo {
@return number objects in collection
*/
long long postProcessCollection();
+ long long postProcessCollectionNonAtomic();
/**
* if INMEMORY will append
View
@@ -47,5 +47,14 @@ expected["6"] = 1
assert.eq( tos( expected ) , tos( res.convertToSingleObject() ) , "C" );
+// test that the nonAtomic output gives valid result
+t.insert( { a : [ 6 , 7 ] } )
+out.insert( { _id : 20 , value : "10" } )
+res = t.mapReduce( m , r , { out : { merge : outName, nonAtomic: true } } )
+expected["6"]++;
+expected["20"] = 10
+expected["7"] = 1
+
+assert.eq( tos( expected ) , tos( res.convertToSingleObject() ) , "D" );
View
@@ -38,4 +38,12 @@ expected["5"] = 1
expected["10"] = 5
assert.eq( tos( expected ) , tos( res.convertToSingleObject() ) , "B" );
+t.insert( { _id : 5 , a : [ 5 , 6 ] } )
+out.insert( { _id : 20 , value : "10" } ) // this is a sentinal to make sure it wasn't killed
+res = t.mapReduce( m , r , { out : { reduce : outName, nonAtomic: true } , query : { _id : { $gt : 4 } } } )
+
+expected["5"]++;
+expected["6"] = 1
+expected["20"] = 10
+assert.eq( tos( expected ) , tos( res.convertToSingleObject() ) , "C" );

0 comments on commit cf953c1

Please sign in to comment.