Permalink
Browse files

map/reduce no longer writes its results to temp collections.

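You now have to specify 'out': either a collection name (string) or a single-field object such as { replace : "coll" }, { merge : "coll" }, { reduce : "coll" } or { inline : 1 }.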
SERVER-1837
  • Loading branch information...
1 parent c249669 commit c0fa67131864a2f2c5cf59f7cb83eb8612ab40b8 @erh erh committed Dec 16, 2010
View
217 db/commands/mr.cpp
@@ -75,7 +75,7 @@ namespace mongo {
// since there are many cases where the point of finalize
// is converting many fields to 1
BSONObjBuilder b;
- b.append( o["_id"] );
+ b.append( o.firstElement() );
s->append( b , "value" , "return" );
return b.obj();
}
@@ -180,47 +180,53 @@ namespace mongo {
ns = dbname + "." + cmdObj.firstElement().valuestr();
verbose = cmdObj["verbose"].trueValue();
- keeptemp = cmdObj["keeptemp"].trueValue();
-
- { // setup names
- stringstream ss;
- if ( ! keeptemp )
- ss << "tmp.";
- ss << "mr." << cmdObj.firstElement().String() << "_" << time(0) << "_" << JOB_NUMBER++;
- tempShort = ss.str();
- tempLong = dbname + "." + tempShort;
- incLong = tempLong + "_inc";
-
- if ( ! keeptemp && markAsTemp )
- cc().addTempCollection( tempLong );
- replicate = keeptemp;
+ uassert( 13602 , "outType is no longer a valid option" , cmdObj["outType"].eoo() );
- if ( cmdObj["out"].type() == String ){
- finalShort = cmdObj["out"].valuestr();
- replicate = true;
- }
- else
- finalShort = tempShort;
-
- finalLong = dbname + "." + finalShort;
-
+ if ( cmdObj["out"].type() == String ){
+ finalShort = cmdObj["out"].String();
+ outType = REPLACE;
}
+ else if ( cmdObj["out"].type() == Object ){
+ BSONObj o = cmdObj["out"].embeddedObject();
+ uassert( 13607 , "'out' has to have a single field" , o.nFields() == 1 );
+
+ BSONElement e = o.firstElement();
+ string t = e.fieldName();
- if ( cmdObj["outType"].type() == String ){
- uassert( 13521 , "need 'out' if using 'outType'" , cmdObj["out"].type() == String );
- string t = cmdObj["outType"].String();
- if ( t == "normal" || t == "replace" )
+ if ( t == "normal" || t == "replace" ){
outType = REPLACE;
- else if ( t == "merge" )
+ finalShort = e.String();
+ }
+ else if ( t == "merge" ){
outType = MERGE;
- else if ( t == "reduce" )
+ finalShort = e.String();
+ }
+ else if ( t == "reduce" ){
outType = REDUCE;
- else
- uasserted( 13522 , str::stream() << "unknown outType [" << t << "]" );
+ finalShort = e.String();
+ }
+ else if ( t == "inline" ){
+ outType = INMEMORY;
+ }
+ else{
+ uasserted( 13522 , str::stream() << "unknown out specifier [" << t << "]" );
+ }
}
else {
- outType = REPLACE;
+ uasserted( 13606 , "'out' has to be a string or an object" );
+ }
+
+ if ( outType != INMEMORY ){ // setup names
+ tempShort = str::stream() << "tmp.mr." << cmdObj.firstElement().String() << "_" << finalShort << "_" << JOB_NUMBER++;
+ tempLong = dbname + "." + tempShort;
+
+ incLong = tempLong + "_inc";
+
+ //if ( markAsTemp )
+ cc().addTempCollection( tempLong );
+
+ finalLong = dbname + "." + finalShort;
}
{ // scope and code
@@ -256,13 +262,16 @@ namespace mongo {
}
void State::prepTempCollection(){
+ if ( ! _onDisk )
+ return;
+
_db.dropCollection( _config.tempLong );
{ // create
writelock lock( _config.tempLong.c_str() );
Client::Context ctx( _config.tempLong.c_str() );
string errmsg;
- assert( userCreateNS( _config.tempLong.c_str() , BSONObj() , errmsg , _config.replicate ) );
+ assert( userCreateNS( _config.tempLong.c_str() , BSONObj() , errmsg , true ) );
}
@@ -291,8 +300,41 @@ namespace mongo {
}
+ void State::appendResults( BSONObjBuilder& final ){
+ if ( _onDisk )
+ return;
+
+ uassert( 13604 , "too much data for in memory map/reduce" , _size < ( BSONObjMaxUserSize / 2 ) );
+
+ BSONArrayBuilder b( (int)(_size * 1.2) ); // _size is data size, doesn't count overhead and keys
+
+ for ( InMemory::iterator i=_temp->begin(); i!=_temp->end(); ++i ){
+ BSONObj key = i->first;
+ BSONList& all = i->second;
+
+ assert( all.size() == 1 );
+
+ BSONObjIterator vi( all[0] );
+ vi.next();
+
+ BSONObjBuilder temp( b.subobjStart() );
+ temp.appendAs( key.firstElement() , "_id" );
+ temp.appendAs( vi.next() , "value" );
+ temp.done();
+ }
+
+ BSONArray res = b.arr();
+ uassert( 13605 , "too much data for in memory map/reduce" , res.objsize() < ( BSONObjMaxUserSize * 2 / 3 ) );
+
+ final.append( "results" , res );
+ }
+
long long State::renameIfNeeded(){
- assertInWriteLock();
+ if ( ! _onDisk )
+ return _temp->size();
+
+ dblock lock;
+
if ( _config.finalLong == _config.tempLong )
return _db.count( _config.finalLong );
@@ -342,26 +384,31 @@ namespace mongo {
_db.dropCollection( _config.tempLong );
break;
}
+ case Config::INMEMORY: {
+ return _temp->size();
+ }
}
+
return _db.count( _config.finalLong );
}
void State::insert( const string& ns , BSONObj& o ){
+ assert( _onDisk );
+
writelock l( ns );
Client::Context ctx( ns );
- if ( _config.replicate )
- theDataFileMgr.insertAndLog( ns.c_str() , o , false );
- else
- theDataFileMgr.insertWithObjMod( ns.c_str() , o , false );
+ theDataFileMgr.insertAndLog( ns.c_str() , o , false );
}
void State::_insertToInc( BSONObj& o ){
+ assert( _onDisk );
theDataFileMgr.insertWithObjMod( _config.incLong.c_str() , o , true );
}
State::State( const Config& c ) : _config( c ), _size(0), _numEmits(0){
_temp.reset( new InMemory() );
+ _onDisk = _config.outType != Config::INMEMORY;
}
bool State::sourceExists(){
@@ -372,9 +419,16 @@ namespace mongo {
return _db.count( _config.ns , _config.filter , 0 , (unsigned) _config.limit );
}
- void State::cleanup(){
- _db.dropCollection( _config.tempLong );
- _db.dropCollection( _config.incLong );
+ State::~State(){
+ if ( _onDisk ){
+ try {
+ _db.dropCollection( _config.tempLong );
+ _db.dropCollection( _config.incLong );
+ }
+ catch ( std::exception& e ){
+ error() << "couldn't cleanup after map reduce: " << e.what() << endl;
+ }
+ }
}
void State::init(){
@@ -391,16 +445,17 @@ namespace mongo {
_config.finalizer->init( this );
_scope->injectNative( "emit" , fast_emit );
-
- // clear temp collections
- _db.dropCollection( _config.tempLong );
- _db.dropCollection( _config.incLong );
-
- writelock l( _config.incLong );
- Client::Context ctx( _config.incLong );
- string err;
- assert( userCreateNS( _config.incLong.c_str() , BSON( "autoIndexId" << 0 ) , err , false ) );
+ if ( _onDisk ){
+ // clear temp collections
+ _db.dropCollection( _config.tempLong );
+ _db.dropCollection( _config.incLong );
+
+ writelock l( _config.incLong );
+ Client::Context ctx( _config.incLong );
+ string err;
+ assert( userCreateNS( _config.incLong.c_str() , BSON( "autoIndexId" << 0 ) , err , false ) );
+ }
}
@@ -415,6 +470,26 @@ namespace mongo {
}
void State::finalReduce( CurOp * op , ProgressMeterHolder& pm ){
+ if ( ! _onDisk ){
+ if ( _config.finalizer ){
+ long size = 0;
+ for ( InMemory::iterator i=_temp->begin(); i!=_temp->end(); ++i ){
+ BSONObj key = i->first;
+ BSONList& all = i->second;
+
+ assert( all.size() == 1 );
+
+ BSONObj res = _config.finalizer->finalize( all[0] );
+
+ all.clear();
+ all.push_back( res );
+ size += res.objsize();
+ }
+ _size = size;
+ }
+ return;
+ }
+
assert( _temp->size() == 0 );
// TODO: this is a bit sketchy
@@ -477,6 +552,7 @@ namespace mongo {
}
void State::reduceInMemory(){
+
InMemory * n = new InMemory(); // for new data
long nSize = 0;
@@ -485,10 +561,15 @@ namespace mongo {
BSONList& all = i->second;
if ( all.size() == 1 ){
- // this key has low cardinality, so just write to db
- writelock l(_config.incLong);
- Client::Context ctx(_config.incLong.c_str());
- _insertToInc( *(all.begin()) );
+ if ( _onDisk ){
+ // this key has low cardinality, so just write to db
+ writelock l(_config.incLong);
+ Client::Context ctx(_config.incLong.c_str());
+ _insertToInc( *(all.begin()) );
+ }
+ else {
+ _add( n , all[0] , nSize );
+ }
}
else if ( all.size() > 1 ){
BSONObj res = _config.reducer->reduce( all );
@@ -501,6 +582,9 @@ namespace mongo {
}
void State::dumpToInc(){
+ if ( ! _onDisk )
+ return;
+
writelock l(_config.incLong);
Client::Context ctx(_config.incLong);
@@ -529,6 +613,9 @@ namespace mongo {
}
void State::checkSize(){
+ if ( ! _onDisk )
+ return;
+
if ( _size < 1024 * 5 )
return;
@@ -670,16 +757,12 @@ namespace mongo {
}
catch ( ... ){
log() << "mr failed, removing collection" << endl;
- state.cleanup();
throw;
}
- long long finalCount = 0;
- {
- dblock lock;
- finalCount = state.renameIfNeeded();
- }
-
+ long long finalCount = state.renameIfNeeded();
+ state.appendResults( result );
+
timingBuilder.append( "total" , t.millis() );
result.append( "result" , config.finalShort );
@@ -783,13 +866,9 @@ namespace mongo {
state.dumpToInc();
- long long finalCount;
- {
- dblock lk;
- finalCount = state.renameIfNeeded();
- }
- log(0) << " mapreducefinishcommand " << config.finalLong << " " << finalCount << endl;
-
+ state.renameIfNeeded();
+ state.appendResults( result );
+
for ( set<ServerAndQuery>::iterator i=servers.begin(); i!=servers.end(); i++ ){
ScopedDbConnection conn( i->_server );
conn->dropCollection( dbname + "." + shardedOutputCollection );
View
44 db/commands/mr.h
@@ -42,7 +42,10 @@ namespace mongo {
virtual ~Finalizer(){}
virtual void init( State * state ) = 0;
- virtual BSONObj finalize( const BSONObj& o ) = 0;
+ /**
+ * this takes a tuple and returns a tuple
+ */
+ virtual BSONObj finalize( const BSONObj& tuple ) = 0;
};
class Reducer : boost::noncopyable {
@@ -150,8 +153,6 @@ namespace mongo {
// options
bool verbose;
- bool keeptemp;
- bool replicate;
// query options
@@ -179,7 +180,8 @@ namespace mongo {
enum { REPLACE , // atomically replace the collection
MERGE , // merge keys, override dups
- REDUCE // merge keys, reduce dups
+ REDUCE , // merge keys, reduce dups
+ INMEMORY // only store in memory, limited in size
} outType;
static AtomicUInt JOB_NUMBER;
@@ -192,6 +194,8 @@ namespace mongo {
class State {
public:
State( const Config& c );
+ ~State();
+
void init();
// ---- prep -----
@@ -231,15 +235,18 @@ namespace mongo {
void finalReduce( CurOp * op , ProgressMeterHolder& pm );
// ------- cleanup/data positioning ----------
-
+
/**
@return number objects in collection
*/
long long renameIfNeeded();
- /** removes temp collections */
- void cleanup();
-
+ /**
+ * if INMEMORY will append
+ * may also append stats or anything else it likes
+ */
+ void appendResults( BSONObjBuilder& b );
+
// -------- util ------------
/**
@@ -263,33 +270,16 @@ namespace mongo {
scoped_ptr<Scope> _scope;
const Config& _config;
+ bool _onDisk; // if the end result of this map reduce is disk or not
DBDirectClient _db;
scoped_ptr<InMemory> _temp;
- long _size;
+ long _size; // bytes in _temp
long long _numEmits;
};
- /**
- * keeps all temporary state in memory
- * if data is larger than can fit in a BSONObj to return
- * will throw an exception
- */
- class StateInMemory : public State {
-
- };
-
- /**
- * keeps some things in memory and pushes
- * to disk when gets too big for ram
- * intended for when output will end up on disk
- */
- class StateOnDisk : public State {
-
- };
-
BSONObj fast_emit( const BSONObj& args );
} // end mr namespace
View
22 jstests/mr1.js
@@ -49,7 +49,7 @@ r2 = function( key , values ){
return total;
};
-res = db.runCommand( { mapreduce : "mr1" , map : m , reduce : r } );
+res = db.runCommand( { mapreduce : "mr1" , map : m , reduce : r , out : "mr1_out" } );
d( res );
if ( ks == "_id" ) assert( res.ok , "not ok" );
assert.eq( 4 , res.counts.input , "A" );
@@ -66,7 +66,7 @@ assert.eq( 3 , z.b , "E" );
assert.eq( 3 , z.c , "F" );
x.drop();
-res = db.runCommand( { mapreduce : "mr1" , map : m , reduce : r , query : { x : { "$gt" : 2 } } } );
+res = db.runCommand( { mapreduce : "mr1" , map : m , reduce : r , query : { x : { "$gt" : 2 } } , out : "mr1_out" } );
d( res );
assert.eq( 2 , res.counts.input , "B" );
x = db[res.result];
@@ -77,7 +77,7 @@ assert.eq( 1 , z.b , "C2" );
assert.eq( 2 , z.c , "C3" );
x.drop();
-res = db.runCommand( { mapreduce : "mr1" , map : m2 , reduce : r2 , query : { x : { "$gt" : 2 } } } );
+res = db.runCommand( { mapreduce : "mr1" , map : m2 , reduce : r2 , query : { x : { "$gt" : 2 } } , out : "mr1_out" } );
d( res );
assert.eq( 2 , res.counts.input , "B" );
x = db[res.result];
@@ -104,7 +104,7 @@ for ( i=5; i<1000; i++ ){
t.save( { x : i , tags : [ "b" , "d" ] } );
}
-res = db.runCommand( { mapreduce : "mr1" , map : m , reduce : r } );
+res = db.runCommand( { mapreduce : "mr1" , map : m , reduce : r , out : "mr1_out" } );
d( res );
assert.eq( 999 , res.counts.input , "Z1" );
x = db[res.result];
@@ -125,20 +125,20 @@ assert.eq( 995 , getk( "d" ).value.count , "ZD" );
x.drop();
if ( true ){
- printjson( db.runCommand( { mapreduce : "mr1" , map : m , reduce : r , verbose : true } ) );
+ printjson( db.runCommand( { mapreduce : "mr1" , map : m , reduce : r , verbose : true , out : "mr1_out" } ) );
}
print( "t1: " + Date.timeFunc(
function(){
- var out = db.runCommand( { mapreduce : "mr1" , map : m , reduce : r } );
+ var out = db.runCommand( { mapreduce : "mr1" , map : m , reduce : r , out : "mr1_out" } );
if ( ks == "_id" ) assert( out.ok , "XXX : " + tojson( out ) );
db[out.result].drop();
} , 10 ) + " (~500 on 2.8ghz) - itcount: " + Date.timeFunc( function(){ db.mr1.find().itcount(); } , 10 ) );
// test doesn't exist
-res = db.runCommand( { mapreduce : "lasjdlasjdlasjdjasldjalsdj12e" , map : m , reduce : r } );
+res = db.runCommand( { mapreduce : "lasjdlasjdlasjdjasldjalsdj12e" , map : m , reduce : r , out : "mr1_out" } );
assert( ! res.ok , "should be not ok" );
if ( true ){
@@ -166,11 +166,15 @@ if ( true ){
}
x.drop();
- res = db.runCommand( { mapreduce : "mr1" , out : "mr1_foo" , map : m2 , reduce : r2 } );
+ res = db.runCommand( { mapreduce : "mr1" , out : "mr1_foo" , map : m2 , reduce : r2 , out : "mr1_out" } );
d(res);
print( "t3: " + res.timeMillis + " (~3500 on 2.8ghz)" );
+
+ res = db.runCommand( { mapreduce : "mr1" , map : m2 , reduce : r2 , out : { inline : true } } );
+ print( "t4: " + res.timeMillis );
+
}
-res = db.runCommand( { mapreduce : "mr1" , map : m , reduce : r } );
+res = db.runCommand( { mapreduce : "mr1" , map : m , reduce : r , out : "mr1_out" } );
assert( res.ok , "should be ok" );
View
26 jstests/mr2.js
@@ -29,7 +29,12 @@ function r( who , values ){
function reformat( r ){
var x = {};
- r.find().forEach(
+ var cursor;
+ if ( r.results )
+ cursor = r.results;
+ else
+ cursor = r.find();
+ cursor.forEach(
function(z){
x[z._id] = z.value;
}
@@ -41,10 +46,21 @@ function f( who , res ){
res.avg = res.totalSize / res.num;
return res;
}
-res = t.mapReduce( m , r , { finalize : f } );
+
+res = t.mapReduce( m , r , { finalize : f , out : "mr2_out" } );
+printjson( res )
x = reformat( res );
-assert.eq( 9 , x.a.avg , "A" );
-assert.eq( 16 , x.b.avg , "B" );
-assert.eq( 18 , x.c.avg , "C" );
+assert.eq( 9 , x.a.avg , "A1" );
+assert.eq( 16 , x.b.avg , "A2" );
+assert.eq( 18 , x.c.avg , "A3" );
res.drop();
+res = t.mapReduce( m , r , { finalize : f , out : { inline : 1 } } );
+printjson( res )
+x = reformat( res );
+assert.eq( 9 , x.a.avg , "B1" );
+assert.eq( 16 , x.b.avg , "B2" );
+assert.eq( 18 , x.c.avg , "B3" );
+res.drop();
+
+
View
10 jstests/mr3.js
@@ -25,7 +25,7 @@ r = function( key , values ){
return { count : total };
};
-res = t.mapReduce( m , r );
+res = t.mapReduce( m , r , { out : "mr3_out" } );
z = res.convertToSingleObject()
assert.eq( 3 , Object.keySet( z ).length , "A1" );
@@ -35,7 +35,7 @@ assert.eq( 3 , z.c.count , "A4" );
res.drop();
-res = t.mapReduce( m , r , { mapparams : [ 2 , 2 ] } );
+res = t.mapReduce( m , r , { out : "mr3_out" , mapparams : [ 2 , 2 ] } );
z = res.convertToSingleObject()
assert.eq( 3 , Object.keySet( z ).length , "B1" );
@@ -52,15 +52,15 @@ realm = m;
m = function(){
emit( this._id , 1 );
}
-res = t.mapReduce( m , r );
+res = t.mapReduce( m , r , { out : "mr3_out" } );
res.drop();
m = function(){
emit( this._id , this.xzz.a );
}
before = db.getCollectionNames().length;
-assert.throws( function(){ t.mapReduce( m , r ); } );
+assert.throws( function(){ t.mapReduce( m , r , { out : "mr3_out" } ); } );
assert.eq( before , db.getCollectionNames().length , "after throw crap" );
@@ -69,5 +69,5 @@ r = function( k , v ){
return v.x.x.x;
}
before = db.getCollectionNames().length;
-assert.throws( function(){ t.mapReduce( m , r ); } );
+assert.throws( function(){ t.mapReduce( m , r , "mr3_out" ) } )
assert.eq( before , db.getCollectionNames().length , "after throw crap" );
View
4 jstests/mr4.js
@@ -23,7 +23,7 @@ r = function( key , values ){
return { count : total };
};
-res = t.mapReduce( m , r , { scope : { xx : 1 } } );
+res = t.mapReduce( m , r , { out : "mr4_out" , scope : { xx : 1 } } );
z = res.convertToSingleObject()
assert.eq( 3 , Object.keySet( z ).length , "A1" );
@@ -34,7 +34,7 @@ assert.eq( 3 , z.c.count , "A4" );
res.drop();
-res = t.mapReduce( m , r , { scope : { xx : 2 } } );
+res = t.mapReduce( m , r , { scope : { xx : 2 } , out : "mr4_out" } );
z = res.convertToSingleObject()
assert.eq( 3 , Object.keySet( z ).length , "A1" );
View
4 jstests/mr5.js
@@ -25,7 +25,7 @@ r = function( k , v ){
return { stats : stats , total : total }
}
-res = t.mapReduce( m , r , { scope : { xx : 1 } } );
+res = t.mapReduce( m , r , { out : "mr5_out" , scope : { xx : 1 } } );
//res.find().forEach( printjson )
z = res.convertToSingleObject()
@@ -44,7 +44,7 @@ m = function(){
-res = t.mapReduce( m , r , { scope : { xx : 1 } } );
+res = t.mapReduce( m , r , { out : "mr5_out" , scope : { xx : 1 } } );
//res.find().forEach( printjson )
z = res.convertToSingleObject()
View
6 jstests/mr_bigobject.js
@@ -18,13 +18,13 @@ r = function( k , v ){
return 1;
}
-assert.throws( function(){ t.mapReduce( m , r ); } , "emit should fail" )
+assert.throws( function(){ t.mapReduce( m , r , "mr_bigobject_out" ); } , "emit should fail" )
m = function(){
emit( 1 , this.s );
}
-assert.eq( { 1 : 1 } , t.mapReduce( m , r ).convertToSingleObject() , "A1" )
+assert.eq( { 1 : 1 } , t.mapReduce( m , r , "mr_bigobject_out" ).convertToSingleObject() , "A1" )
r = function( k , v ){
total = 0;
@@ -38,4 +38,4 @@ r = function( k , v ){
return total;
}
-assert.eq( { 1 : 10 * s.length } , t.mapReduce( m , r ).convertToSingleObject() , "A1" )
+assert.eq( { 1 : 10 * s.length } , t.mapReduce( m , r , "mr_bigobject_out" ).convertToSingleObject() , "A1" )
View
6 jstests/mr_errorhandling.js
@@ -24,15 +24,15 @@ r = function( k , v ){
return total;
}
-res = t.mapReduce( m_good , r );
+res = t.mapReduce( m_good , r , "mr_errorhandling_out" );
assert.eq( { 1 : 1 , 2 : 2 , 3 : 2 , 4 : 1 } , res.convertToSingleObject() , "A" );
res.drop()
res = null;
theerror = null;
try {
- res = t.mapReduce( m_bad , r );
+ res = t.mapReduce( m_bad , r , "mr_errorhandling_out" );
}
catch ( e ){
theerror = e.toString();
@@ -42,6 +42,6 @@ assert( theerror , "B2" );
assert( theerror.indexOf( "emit" ) >= 0 , "B3" );
// test things are still in an ok state
-res = t.mapReduce( m_good , r );
+res = t.mapReduce( m_good , r , "mr_errorhandling_out" );
assert.eq( { 1 : 1 , 2 : 2 , 3 : 2 , 4 : 1 } , res.convertToSingleObject() , "A" );
res.drop()
View
6 jstests/mr_index2.js
@@ -7,16 +7,16 @@ t.save( { arr : [1, 2] } )
map = function() { emit(this._id, 1) }
reduce = function(k,vals) { return Array.sum( vals ); }
-res = t.mapReduce(map,reduce, { query : {} })
+res = t.mapReduce(map,reduce, { out : "mr_index2_out" , query : {} })
assert.eq( 1 ,res.counts.input , "A" )
res.drop()
-res = t.mapReduce(map,reduce, { query : { arr: {$gte:0} } })
+res = t.mapReduce(map,reduce, { out : "mr_index2_out" , query : { arr: {$gte:0} } })
assert.eq( 1 ,res.counts.input , "B" )
res.drop()
t.ensureIndex({arr:1})
-res = t.mapReduce(map,reduce, { query : { arr: {$gte:0} } })
+res = t.mapReduce(map,reduce, { out : "mr_index2_out" , query : { arr: {$gte:0} } })
assert.eq( 1 ,res.counts.input , "C" )
res.drop();
View
2 jstests/mr_merge.js
@@ -39,7 +39,7 @@ assert.eq( tos( expected ) , tos( res.convertToSingleObject() ) , "B" );
t.insert( { a : [ 5 , 6 ] } )
out.insert( { _id : 10 , value : "5" } )
-res = t.mapReduce( m , r , { out : outName , outType : "merge" } )
+res = t.mapReduce( m , r , { out : { merge : outName } } )
expected["5"]++;
expected["10"] = 5
View
2 jstests/mr_outreduce.js
@@ -31,7 +31,7 @@ assert.eq( tos( expected ) , tos( res.convertToSingleObject() ) , "A" );
t.insert( { _id : 4 , a : [ 4 , 5 ] } )
out.insert( { _id : 10 , value : "5" } ) // this is a sentinal to make sure it wasn't killed
-res = t.mapReduce( m , r , { out : outName , outType : "reduce" , query : { _id : { $gt : 3 } } } )
+res = t.mapReduce( m , r , { out : { reduce : outName } , query : { _id : { $gt : 3 } } } )
expected["4"]++;
expected["5"] = 1
View
6 jstests/mr_sort.js
@@ -24,17 +24,17 @@ r = function( k , v ){
}
-res = t.mapReduce( m , r );
+res = t.mapReduce( m , r , "mr_sort_out " );
x = res.convertToSingleObject();
res.drop();
assert.eq( { "a" : 55 } , x , "A1" )
-res = t.mapReduce( m , r , { query : { x : { $lt : 3 } } } )
+res = t.mapReduce( m , r , { out : "mr_sort_out" , query : { x : { $lt : 3 } } } )
x = res.convertToSingleObject();
res.drop();
assert.eq( { "a" : 3 } , x , "A2" )
-res = t.mapReduce( m , r , { sort : { x : 1 } , limit : 2 } );
+res = t.mapReduce( m , r , { out : "mr_sort_out" , sort : { x : 1 } , limit : 2 } );
x = res.convertToSingleObject();
res.drop();
assert.eq( { "a" : 3 } , x , "A3" )
View
31 jstests/sharding/features2.js
@@ -92,8 +92,10 @@ r = function( key , values ){
doMR = function( n ){
print(n);
-
- var res = db.mr.mapReduce( m , r );
+
+ // on-disk
+
+ var res = db.mr.mapReduce( m , r , "smr1_out" );
printjson( res );
assert.eq( new NumberLong(4) , res.counts.input , "MR T0 " + n );
@@ -103,11 +105,30 @@ doMR = function( n ){
var z = {};
x.find().forEach( function(a){ z[a._id] = a.value.count; } );
assert.eq( 3 , Object.keySet( z ).length , "MR T2 " + n );
- assert.eq( 2 , z.a , "MR T2 " + n );
- assert.eq( 3 , z.b , "MR T2 " + n );
- assert.eq( 3 , z.c , "MR T2 " + n );
+ assert.eq( 2 , z.a , "MR T3 " + n );
+ assert.eq( 3 , z.b , "MR T4 " + n );
+ assert.eq( 3 , z.c , "MR T5 " + n );
x.drop();
+
+ // inline
+
+ var res = db.mr.mapReduce( m , r , { out : { inline : 1 } } );
+ printjson( res );
+ assert.eq( new NumberLong(4) , res.counts.input , "MR T6 " + n );
+
+ var z = {};
+ res.find().forEach( function(a){ z[a._id] = a.value.count; } );
+ printjson( z );
+ assert.eq( 3 , Object.keySet( z ).length , "MR T7 " + n ) ;
+ assert.eq( 2 , z.a , "MR T8 " + n );
+ assert.eq( 3 , z.b , "MR T9 " + n );
+ assert.eq( 3 , z.c , "MR TA " + n );
+
+ print( "sleeping for eliot" );
+ sleep( 20000 );
+
+
}
doMR( "before" );
View
4 s/commands_public.cpp
@@ -842,9 +842,7 @@ namespace mongo {
fn == "verbose" ){
b.append( e );
}
- else if ( fn == "keeptemp" ||
- fn == "out" ||
- fn == "outType" ||
+ else if ( fn == "out" ||
fn == "finalize" ){
// we don't want to copy these
}
View
13 shell/collection.js
@@ -541,6 +541,8 @@ MapReduceResult.prototype._simpleKeys = function(){
}
MapReduceResult.prototype.find = function(){
+ if ( this.results )
+ return this.results;
return DBCollection.prototype.find.apply( this._coll , arguments );
}
@@ -560,10 +562,15 @@ MapReduceResult.prototype.convertToSingleObject = function(){
/**
* @param optional object of optional fields;
*/
-DBCollection.prototype.mapReduce = function( map , reduce , optional ){
+DBCollection.prototype.mapReduce = function( map , reduce , optionsOrOutString ){
var c = { mapreduce : this._shortName , map : map , reduce : reduce };
- if ( optional )
- Object.extend( c , optional );
+ assert( optionsOrOutString , "need to an optionsOrOutString" )
+
+ if ( typeof( optionsOrOutString ) == "string" )
+ c["out"] = optionsOrOutString;
+ else
+ Object.extend( c , optionsOrOutString );
+
var raw = this._db.runCommand( c );
if ( ! raw.ok )
throw "map reduce failed: " + tojson( raw );
View
13 shell/mongo_vstudio.cpp
@@ -3289,6 +3289,8 @@ const StringData _jscode_raw_collection =
"}\n"
"\n"
"MapReduceResult.prototype.find = function(){\n"
+"if ( this.results )\n"
+"return this.results;\n"
"return DBCollection.prototype.find.apply( this._coll , arguments );\n"
"}\n"
"\n"
@@ -3308,10 +3310,15 @@ const StringData _jscode_raw_collection =
"/**\n"
"* @param optional object of optional fields;\n"
"*/\n"
-"DBCollection.prototype.mapReduce = function( map , reduce , optional ){\n"
+"DBCollection.prototype.mapReduce = function( map , reduce , optionsOrOutString ){\n"
"var c = { mapreduce : this._shortName , map : map , reduce : reduce };\n"
-"if ( optional )\n"
-"Object.extend( c , optional );\n"
+"assert( optionsOrOutString , \"need to an optionsOrOutString\" )\n"
+"\n"
+"if ( typeof( optionsOrOutString ) == \"string\" )\n"
+"c[\"out\"] = optionsOrOutString;\n"
+"else\n"
+"Object.extend( c , optionsOrOutString );\n"
+"\n"
"var raw = this._db.runCommand( c );\n"
"if ( ! raw.ok )\n"
"throw \"map reduce failed: \" + tojson( raw );\n"

0 comments on commit c0fa671

Please sign in to comment.