Skip to content

Commit

Permalink
starting some sharding mongod logic cleaning
Browse files Browse the repository at this point in the history
  • Loading branch information
erh committed Jun 11, 2010
1 parent 37e6e26 commit 951ac0f
Show file tree
Hide file tree
Showing 4 changed files with 85 additions and 31 deletions.
2 changes: 1 addition & 1 deletion SConstruct
Original file line number Diff line number Diff line change
Expand Up @@ -434,7 +434,7 @@ coreServerFiles += scriptingFiles

coreShardFiles = []
shardServerFiles = coreShardFiles + Glob( "s/strategy*.cpp" ) + [ "s/commands_admin.cpp" , "s/commands_public.cpp" , "s/request.cpp" , "s/cursors.cpp" , "s/server.cpp" , "s/chunk.cpp" , "s/shard.cpp" , "s/shardkey.cpp" , "s/config.cpp" , "s/config_migrate.cpp" , "s/s_only.cpp" , "s/stats.cpp" , "s/balance.cpp" , "s/balancer_policy.cpp" , "db/cmdline.cpp" ]
serverOnlyFiles += coreShardFiles + [ "s/d_logic.cpp" ]
serverOnlyFiles += coreShardFiles + [ "s/d_logic.cpp" , "s/d_writeback.cpp" ]

serverOnlyFiles += [ "db/module.cpp" ] + Glob( "db/modules/*.cpp" )

Expand Down
32 changes: 2 additions & 30 deletions s/d_logic.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
#include "../util/queue.h"

#include "shard.h"
#include "d_logic.h"

using namespace std;

Expand All @@ -49,7 +50,6 @@ namespace mongo {
string shardConfigServer;

boost::thread_specific_ptr<OID> clientServerIds;
map< string , BlockingQueue<BSONObj>* > clientQueues;

unsigned long long getVersion( BSONElement e , string& errmsg ){
if ( e.eoo() ){
Expand Down Expand Up @@ -80,32 +80,7 @@ namespace mongo {
}
};

class WriteBackCommand : public MongodShardCommand {
public:
virtual LockType locktype() const { return NONE; }
WriteBackCommand() : MongodShardCommand( "writebacklisten" ){}
void help(stringstream& h) const { h<<"internal"; }
bool run(const string& , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool){

BSONElement e = cmdObj.firstElement();
if ( e.type() != jstOID ){
errmsg = "need oid as first value";
return 0;
}

const OID id = e.__oid();

if ( ! clientQueues[id.str()] )
clientQueues[id.str()] = new BlockingQueue<BSONObj>();

BSONObj z = clientQueues[id.str()]->blockingPop();
log(1) << "WriteBackCommand got : " << z << endl;

result.append( "data" , z );

return true;
}
} writeBackCommand;

// setShardVersion( ns )

Expand Down Expand Up @@ -158,9 +133,6 @@ namespace mongo {
OID * nid = new OID();
nid->init( s );
clientServerIds.reset( nid );

if ( ! clientQueues[s] )
clientQueues[s] = new BlockingQueue<BSONObj>();
}
else if ( clientId != *clientServerIds.get() ){
errmsg = "server id has changed!";
Expand Down Expand Up @@ -550,7 +522,7 @@ namespace mongo {
b.append( "ns" , ns );
b.appendBinData( "msg" , m.header()->len , bdtCustom , (char*)(m.singleData()) );
log() << "writing back msg with len: " << m.header()->len << " op: " << m.operation() << endl;
clientQueues[clientID->str()]->push( b.obj() );
queueWriteBack( clientID->str() , b.obj() );

return true;
}
Expand Down
3 changes: 3 additions & 0 deletions s/d_logic.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@

namespace mongo {

/* queue a write back on a remote server for a failed write */
void queueWriteBack( const string& remote , const BSONObj& o );

/**
* @return true if we have any shard info for the ns
*/
Expand Down
79 changes: 79 additions & 0 deletions s/d_writeback.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
// d_logic.cpp

/**
* Copyright (C) 2008 10gen Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/

#include "../pch.h"

#include "../db/commands.h"
#include "../db/jsobj.h"
#include "../db/dbmessage.h"
#include "../db/query.h"

#include "../client/connpool.h"

#include "../util/queue.h"

#include "shard.h"

using namespace std;

namespace mongo {

map< string , BlockingQueue<BSONObj>* > writebackQueue;
mongo::mutex writebackQueueLock("sharding:writebackQueueLock");

BlockingQueue<BSONObj>* getWritebackQueue( const string& remote ){
scoped_lock lk (writebackQueueLock );
BlockingQueue<BSONObj>*& q = writebackQueue[remote];
if ( ! q )
q = new BlockingQueue<BSONObj>();
return q;
}

void queueWriteBack( const string& remote , const BSONObj& o ){
getWritebackQueue( remote )->push( o );
}

class WriteBackCommand : public Command {
public:
virtual LockType locktype() const { return NONE; }
virtual bool slaveOk() const { return false; }
virtual bool adminOnly() const { return true; }

WriteBackCommand() : Command( "writebacklisten" ){}

void help(stringstream& h) const { h<<"internal"; }

bool run(const string& , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool){

BSONElement e = cmdObj.firstElement();
if ( e.type() != jstOID ){
errmsg = "need oid as first value";
return 0;
}

const OID id = e.__oid();
BSONObj z = getWritebackQueue(id.str())->blockingPop();
log(1) << "WriteBackCommand got : " << z << endl;

result.append( "data" , z );

return true;
}
} writeBackCommand;

}

0 comments on commit 951ac0f

Please sign in to comment.