Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

SERVER-2531: added REDUCE mode for M/R to sharded output collection

  • Loading branch information...
commit 1388ecbdb76160bbfbb2df018ed860c2c81f592a 1 parent 5a2f4ce
@agirbal agirbal authored
View
147 db/commands/mr.cpp
@@ -1116,102 +1116,113 @@ namespace mongo {
virtual LockType locktype() const { return NONE; }
bool run(const string& dbname , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool) {
string shardedOutputCollection = cmdObj["shardedOutputCollection"].valuestrsafe();
+ string postProcessCollection = cmdObj["postProcessCollection"].valuestrsafe();
+ bool postProcessOnly = !postProcessCollection.empty();
Config config( dbname , cmdObj.firstElement().embeddedObjectUserCheck() );
+ if (postProcessOnly) {
+ // the temp collection has been decided by mongos
+ config.tempLong = dbname + "." + postProcessCollection;
+ }
+ // no need for incremental collection because records are already sorted
config.incLong = config.tempLong;
- set<ServerAndQuery> servers;
-
+ State state(config);
BSONObj shards = cmdObj["shards"].embeddedObjectUserCheck();
BSONObj shardCounts = cmdObj["shardCounts"].embeddedObjectUserCheck();
BSONObj counts = cmdObj["counts"].embeddedObjectUserCheck();
- vector< auto_ptr<DBClientCursor> > shardCursors;
-
- {
- // parse per shard results
- BSONObjIterator i( shards );
- while ( i.more() ) {
- BSONElement e = i.next();
- string shard = e.fieldName();
-
- BSONObj res = e.embeddedObjectUserCheck();
-
- uassert( 10078 , "something bad happened" , shardedOutputCollection == res["result"].valuestrsafe() );
- servers.insert( shard );
+ if (postProcessOnly) {
+ // this is usually for reduce mode
+ if (!state._db.exists(config.tempLong)) {
+ // nothing to do
+ return 1;
}
+ } else {
+ set<ServerAndQuery> servers;
+ vector< auto_ptr<DBClientCursor> > shardCursors;
- }
-
- State state(config);
- state.prepTempCollection();
+ {
+ // parse per shard results
+ BSONObjIterator i( shards );
+ while ( i.more() ) {
+ BSONElement e = i.next();
+ string shard = e.fieldName();
- {
- // reduce from each stream
+ BSONObj res = e.embeddedObjectUserCheck();
- BSONObj sortKey = BSON( "_id" << 1 );
+ uassert( 10078 , "something bad happened" , shardedOutputCollection == res["result"].valuestrsafe() );
+ servers.insert( shard );
- ParallelSortClusteredCursor cursor( servers , dbname + "." + shardedOutputCollection ,
- Query().sort( sortKey ) );
- cursor.init();
- state.init();
+ }
- BSONList values;
- if (!config.outDB.empty()) {
- BSONObjBuilder loc;
- if ( !config.outDB.empty())
- loc.append( "db" , config.outDB );
- if ( !config.finalShort.empty() )
- loc.append( "collection" , config.finalShort );
- result.append("result", loc.obj());
- }
- else {
- if ( !config.finalShort.empty() )
- result.append( "result" , config.finalShort );
}
- while ( cursor.more() ) {
- BSONObj t = cursor.next().getOwned();
+ state.prepTempCollection();
- if ( values.size() == 0 ) {
- values.push_back( t );
- continue;
+ {
+ // reduce from each stream
+
+ BSONObj sortKey = BSON( "_id" << 1 );
+
+ ParallelSortClusteredCursor cursor( servers , dbname + "." + shardedOutputCollection ,
+ Query().sort( sortKey ) );
+ cursor.init();
+ state.init();
+
+ BSONList values;
+ if (!config.outDB.empty()) {
+ BSONObjBuilder loc;
+ if ( !config.outDB.empty())
+ loc.append( "db" , config.outDB );
+ if ( !config.finalShort.empty() )
+ loc.append( "collection" , config.finalShort );
+ result.append("result", loc.obj());
}
-
- if ( t.woSortOrder( *(values.begin()) , sortKey ) == 0 ) {
- values.push_back( t );
- continue;
+ else {
+ if ( !config.finalShort.empty() )
+ result.append( "result" , config.finalShort );
}
- BSONObj res = config.reducer->finalReduce( values , config.finalizer.get());
- if (state.isOnDisk())
- state.insertToInc(res);
- else
- state.emit(res);
- values.clear();
- values.push_back( t );
+ while ( cursor.more() || !values.empty() ) {
+ BSONObj t;
+ if (cursor.more()) {
+ t = cursor.next().getOwned();
+
+ if ( values.size() == 0 ) {
+ values.push_back( t );
+ continue;
+ }
+
+ if ( t.woSortOrder( *(values.begin()) , sortKey ) == 0 ) {
+ values.push_back( t );
+ continue;
+ }
+ }
+
+ BSONObj res = config.reducer->finalReduce( values , config.finalizer.get());
+ if (state.isOnDisk())
+ state.insertToInc(res);
+ else
+ state.emit(res);
+ values.clear();
+ if (!t.isEmpty())
+ values.push_back( t );
+ }
}
- if ( values.size() ) {
- BSONObj res = config.reducer->finalReduce( values , config.finalizer.get() );
- if (state.isOnDisk())
- state.insertToInc(res);
- else
- state.emit(res);
+ for ( set<ServerAndQuery>::iterator i=servers.begin(); i!=servers.end(); i++ ) {
+ ScopedDbConnection conn( i->_server );
+ conn->dropCollection( dbname + "." + shardedOutputCollection );
+ conn.done();
}
+
+ result.append( "shardCounts" , shardCounts );
}
long long finalCount = state.postProcessCollection();
state.appendResults( result );
- for ( set<ServerAndQuery>::iterator i=servers.begin(); i!=servers.end(); i++ ) {
- ScopedDbConnection conn( i->_server );
- conn->dropCollection( dbname + "." + shardedOutputCollection );
- conn.done();
- }
-
- result.append( "shardCounts" , shardCounts );
-
// fix the global counts
BSONObjBuilder countsB(32);
BSONObjIterator j(counts);
View
3  db/commands/mr.h
@@ -283,6 +283,7 @@ namespace mongo {
void bailFromJS();
const Config& _config;
+ DBDirectClient _db;
protected:
@@ -291,8 +292,6 @@ namespace mongo {
scoped_ptr<Scope> _scope;
bool _onDisk; // if the end result of this map reduce is disk or not
- DBDirectClient _db;
-
scoped_ptr<InMemory> _temp;
long _size; // bytes in _temp
long _dupCount; // number of duplicate key entries
View
81 s/commands_public.cpp
@@ -1003,9 +1003,7 @@ namespace mongo {
// so we allocate them in our thread
// and hand off
// Note: why not use pooled connections? This has been reported to create too many connections
-
vector< shared_ptr<ShardConnection> > shardConns;
-
list< shared_ptr<Future::CommandResult> > futures;
for ( set<Shard>::iterator i=shards.begin(), end=shards.end() ; i != end ; i++ ) {
@@ -1100,9 +1098,7 @@ namespace mongo {
mr_shard::State state(config);
log(1) << "mr sharded output ns: " << config.ns << endl;
- // for now we only support replace output collection in sharded mode
- if (config.outType != mr_shard::Config::REPLACE &&
- config.outType != mr_shard::Config::MERGE) {
+ if (config.outType == mr_shard::Config::INMEMORY) {
errmsg = "This Map Reduce mode is not supported with sharded output";
return false;
}
@@ -1111,18 +1107,22 @@ namespace mongo {
BSONObjBuilder loc;
if ( !config.outDB.empty())
loc.append( "db" , config.outDB );
- if ( !config.finalShort.empty() )
- loc.append( "collection" , config.finalShort );
+ loc.append( "collection" , config.finalShort );
result.append("result", loc.obj());
}
else {
if ( !config.finalShort.empty() )
result.append( "result" , config.finalShort );
}
- string outns = config.finalLong;
- bool merge = (config.outType == mr_shard::Config::MERGE);
- if (!merge) {
+ string outns = config.finalLong;
+ string tempns;
+ if (config.outType == mr_shard::Config::REDUCE) {
+ // result will be inserted into a temp collection to post process
+ const string postProcessCollection = getTmpName( collection );
+ cout << "post process collection is " << postProcessCollection << endl;
+ tempns = dbName + "." + postProcessCollection;
+ } else if (config.outType == mr_shard::Config::REPLACE) {
// drop previous collection
BSONObj dropColCmd = BSON("drop" << config.finalShort);
BSONObjBuilder dropColResult(32);
@@ -1134,14 +1134,20 @@ namespace mongo {
}
}
- // create the sharded collection
BSONObj sortKey = BSON( "_id" << 1 );
- BSONObj shardColCmd = BSON("shardCollection" << outns << "key" << sortKey);
- BSONObjBuilder shardColResult(32);
- bool res = Command::runAgainstRegistered("admin.$cmd", shardColCmd, shardColResult);
- if (!res) {
- errmsg = str::stream() << "Could not create sharded output collection " << outns << ": " << shardColResult.obj().toString();
- return false;
+ if (!conf->isSharded(outns)) {
+ // create the sharded collection
+
+ BSONObj shardColCmd = BSON("shardCollection" << outns << "key" << sortKey);
+ BSONObjBuilder shardColResult(32);
+ bool res = Command::runAgainstRegistered("admin.$cmd", shardColCmd, shardColResult);
+ if (!res) {
+ errmsg = str::stream() << "Could not create sharded output collection " << outns << ": " << shardColResult.obj().toString();
+ return false;
+ }
+
+ // since it's new collection, use replace mode always
+ config.outType = mr_shard::Config::REPLACE;
}
ParallelSortClusteredCursor cursor( servers , dbName + "." + shardedOutputCollection ,
@@ -1152,7 +1158,7 @@ namespace mongo {
mr_shard::BSONList values;
Strategy* s = SHARDED;
long long finalCount = 0;
- while ( cursor.more() || values.size() > 0 ) {
+ while ( cursor.more() || !values.empty() ) {
BSONObj t;
if ( cursor.more() ) {
t = cursor.next().getOwned();
@@ -1169,11 +1175,14 @@ namespace mongo {
}
BSONObj final = config.reducer->finalReduce(values, config.finalizer.get());
- if (merge) {
+ if (config.outType == mr_shard::Config::MERGE) {
BSONObj id = final["_id"].wrap();
- s->updateSharded(conf, outns.c_str(), id, final, UpdateOption_Upsert);
+ s->updateSharded(conf, outns.c_str(), id, final, UpdateOption_Upsert, true);
+ } else if (config.outType == mr_shard::Config::REDUCE) {
+ // insert into temp collection, but final collection's sharding
+ s->insertSharded(conf, tempns.c_str(), final, 0, true, outns.c_str());
} else {
- s->insertSharded(conf, outns.c_str(), final, 0);
+ s->insertSharded(conf, outns.c_str(), final, 0, true);
}
++finalCount;
values.clear();
@@ -1181,6 +1190,36 @@ namespace mongo {
values.push_back( t );
}
+ if (config.outType == mr_shard::Config::REDUCE) {
+ // results were written to temp collection, need final reduce
+ vector< shared_ptr<ShardConnection> > shardConns;
+ list< shared_ptr<Future::CommandResult> > futures;
+ BSONObj finalCmdObj = finalCmd.obj();
+ for ( set<Shard>::iterator i=shards.begin(), end=shards.end() ; i != end ; i++ ) {
+ shared_ptr<ShardConnection> temp( new ShardConnection( i->getConnString() , outns ) );
+ futures.push_back( Future::spawnCommand( i->getConnString() , dbName , finalCmdObj , temp->get() ) );
+ shardConns.push_back( temp );
+ }
+
+ // now wait for the result of all shards
+ bool failed = false;
+ for ( list< shared_ptr<Future::CommandResult> >::iterator i=futures.begin(); i!=futures.end(); i++ ) {
+ shared_ptr<Future::CommandResult> res = *i;
+ if ( ! res->join() ) {
+ error() << "final reduce on sharded output m/r failed on shard: " << res->getServer() << " error: " << res->result() << endl;
+ result.append( "cause" , res->result() );
+ errmsg = "mongod mr failed: ";
+ errmsg += res->result().toString();
+ failed = true;
+ continue;
+ }
+ BSONObj result = res->result();
+ }
+
+ for ( unsigned i=0; i<shardConns.size(); i++ )
+ shardConns[i]->done();
+ }
+
for ( set<ServerAndQuery>::iterator i=servers.begin(); i!=servers.end(); i++ ) {
ScopedDbConnection conn( i->_server );
conn->dropCollection( dbName + "." + shardedOutputCollection );
View
8 s/strategy.cpp
@@ -67,17 +67,19 @@ namespace mongo {
dbcon.done();
}
- void Strategy::insert( const Shard& shard , const char * ns , const BSONObj& obj , int flags ) {
+ void Strategy::insert( const Shard& shard , const char * ns , const BSONObj& obj , int flags, bool safe ) {
ShardConnection dbcon( shard , ns );
if ( dbcon.setVersion() ) {
dbcon.done();
throw StaleConfigException( ns , "for insert" );
}
dbcon->insert( ns , obj , flags);
+ if (safe)
+ dbcon->getLastError();
dbcon.done();
}
- void Strategy::update( const Shard& shard , const char * ns , const BSONObj& query , const BSONObj& toupdate , int flags ) {
+ void Strategy::update( const Shard& shard , const char * ns , const BSONObj& query , const BSONObj& toupdate , int flags, bool safe ) {
bool upsert = flags & UpdateOption_Upsert;
bool multi = flags & UpdateOption_Multi;
@@ -87,6 +89,8 @@ namespace mongo {
throw StaleConfigException( ns , "for insert" );
}
dbcon->update( ns , query , toupdate, upsert, multi);
+ if (safe)
+ dbcon->getLastError();
dbcon.done();
}
View
8 s/strategy.h
@@ -32,15 +32,15 @@ namespace mongo {
virtual void getMore( Request& r ) = 0;
virtual void writeOp( int op , Request& r ) = 0;
- virtual void insertSharded( DBConfigPtr conf, const char* ns, BSONObj& o, int flags ) = 0;
- virtual void updateSharded( DBConfigPtr conf, const char* ns, BSONObj& query, BSONObj& toupdate, int flags ) = 0;
+ virtual void insertSharded( DBConfigPtr conf, const char* ns, BSONObj& o, int flags, bool safe=false, const char* nsChunkLookup=0 ) = 0;
+ virtual void updateSharded( DBConfigPtr conf, const char* ns, BSONObj& query, BSONObj& toupdate, int flags, bool safe=false ) = 0;
protected:
void doWrite( int op , Request& r , const Shard& shard , bool checkVersion = true );
void doQuery( Request& r , const Shard& shard );
- void insert( const Shard& shard , const char * ns , const BSONObj& obj , int flags=0 );
- void update( const Shard& shard , const char * ns , const BSONObj& query , const BSONObj& toupdate , int flags=0 );
+ void insert( const Shard& shard , const char * ns , const BSONObj& obj , int flags=0 , bool safe=false );
+ void update( const Shard& shard , const char * ns , const BSONObj& query , const BSONObj& toupdate , int flags=0, bool safe=false );
};
View
14 s/strategy_shard.cpp
@@ -191,8 +191,10 @@ namespace mongo {
}
}
- void insertSharded( DBConfigPtr conf, const char* ns, BSONObj& o, int flags ) {
- ChunkManagerPtr manager = conf->getChunkManager(ns);
+ void insertSharded( DBConfigPtr conf, const char* ns, BSONObj& o, int flags, bool safe, const char* nsChunkLookup ) {
+ if (!nsChunkLookup)
+ nsChunkLookup = ns;
+ ChunkManagerPtr manager = conf->getChunkManager(nsChunkLookup);
if ( ! manager->hasShardKey( o ) ) {
bool bad = true;
@@ -206,7 +208,7 @@ namespace mongo {
}
if ( bad ) {
- log() << "tried to insert object without shard key: " << ns << " " << o << endl;
+ log() << "tried to insert object without shard key: " << nsChunkLookup << " " << o << endl;
uasserted( 14842 , "tried to insert object without shard key" );
}
@@ -221,7 +223,7 @@ namespace mongo {
try {
ChunkPtr c = manager->findChunk( o );
log(4) << " server:" << c->getShard().toString() << " " << o << endl;
- insert( c->getShard() , ns , o , flags);
+ insert( c->getShard() , ns , o , flags, safe);
// r.gotInsert();
// if ( r.getClientInfo()->autoSplitOk() )
@@ -344,7 +346,7 @@ namespace mongo {
}
}
- void updateSharded( DBConfigPtr conf, const char* ns, BSONObj& query, BSONObj& toupdate, int flags ) {
+ void updateSharded( DBConfigPtr conf, const char* ns, BSONObj& query, BSONObj& toupdate, int flags, bool safe ) {
ChunkManagerPtr manager = conf->getChunkManager(ns);
BSONObj chunkFinder = query;
@@ -410,7 +412,7 @@ namespace mongo {
// int * x = (int*)(r.d().afterNS());
// x[0] |= UpdateOption_Broadcast;
for ( set<Shard>::iterator i=shards.begin(); i!=shards.end(); i++) {
- update(*i, ns, query, toupdate, flags);
+ update(*i, ns, query, toupdate, flags, safe);
}
}
else {
View
4 s/strategy_single.cpp
@@ -262,11 +262,11 @@ namespace mongo {
return true;
}
- void insertSharded( DBConfigPtr conf, const char* ns, BSONObj& o, int flags ) {
+ void insertSharded( DBConfigPtr conf, const char* ns, BSONObj& o, int flags, bool safe, const char* nsChunkLookup ) {
// only useful for shards
}
- void updateSharded( DBConfigPtr conf, const char* ns, BSONObj& query, BSONObj& toupdate, int flags ) {
+ void updateSharded( DBConfigPtr conf, const char* ns, BSONObj& query, BSONObj& toupdate, int flags, bool safe ) {
// only useful for shards
}
Please sign in to comment.
Something went wrong with that request. Please try again.