Skip to content

Commit

Permalink
merge
Browse files Browse the repository at this point in the history
  • Loading branch information
vgkholla committed Mar 30, 2014
2 parents 8a5ddae + 38afb1b commit 7082a23
Show file tree
Hide file tree
Showing 11 changed files with 696 additions and 266 deletions.
1 change: 1 addition & 0 deletions src/mongo/SConscript
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ commonFiles = [ "pch.cpp",
"util/net/listen.cpp",
"util/startup_test.cpp",
"util/version.cpp",
"util/HungarianAlgo.cpp",
"client/connpool.cpp",
"client/dbclient.cpp",
"client/dbclient_rs.cpp",
Expand Down
24 changes: 9 additions & 15 deletions src/mongo/db/instance.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -448,37 +448,31 @@ namespace mongo {
DbMessage d(m);
if (isWriteThrottled(d.getns()))
{
BSONObjBuilder b;
log() << "[MYCODE] Write Op Throttled" << endl;
b.append("err", "write throttled");
replyToQuery(0, m, dbresponse, b.obj());
return;
uassert(16733, "write throttled", false);
}
receivedInsert(m, currentOp);
else
receivedInsert(m, currentOp);
}
else if ( op == dbUpdate ) {
DbMessage d(m);
if (isWriteThrottled(d.getns()))
{
BSONObjBuilder b;
log() << "[MYCODE] Write Op Throttled" << endl;
b.append("err", "write throttled");
replyToQuery(0, m, dbresponse, b.obj());
return;
uassert(16734, "write throttled", false);
}
receivedUpdate(m, currentOp);
else
receivedUpdate(m, currentOp);
}
else if ( op == dbDelete ) {
DbMessage d(m);
if (isWriteThrottled(d.getns()))
{
BSONObjBuilder b;
log() << "[MYCODE] Write Op Throttled" << endl;
b.append("err", "write throttled");
replyToQuery(0, m, dbresponse, b.obj());
return;
uassert(16735, "write throttled", false);
}
receivedDelete(m, currentOp);
else
receivedDelete(m, currentOp);
}
else {
mongo::log() << " operation isn't supported: " << op << endl;
Expand Down
6 changes: 3 additions & 3 deletions src/mongo/db/oplog.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ namespace mongo {
bool flag = false;
if (cli.exists(rsSettingNS))
{
log() << "[MYCODE] Checking for throttle value" << endl;
//log() << "[MYCODE] Checking for throttle value" << endl;
try
{
scoped_ptr<DBClientCursor> cursor(cli.query( rsSettingNS, Query() ));
Expand All @@ -96,10 +96,10 @@ namespace mongo {
log() << "[MYCODE] dbexception: findOne call failed for " << rsSettingNS << endl;
}

log() << "[MYCODE] Throttle value: " << throttleObj.toString() << endl;
//log() << "[MYCODE] Throttle value: " << throttleObj.toString() << endl;
if (!throttleObj["stopped"].eoo())
{
log() << "[MYCODE] Checking for stopped value" << endl;
//log() << "[MYCODE] Checking for stopped value" << endl;
bool stopped = throttleObj["stopped"].Bool();
if (stopped)
{
Expand Down
180 changes: 7 additions & 173 deletions src/mongo/db/repl/replset_commands.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,7 @@ namespace mongo {
out->push_back(Privilege(AuthorizationManager::SERVER_RESOURCE_NAME, actions));
}
virtual bool run(const string& , BSONObj& cmdObj, int, string& errmsg, BSONObjBuilder& result, bool fromRepl) {
cout << "[MYCODE] Replica Set Write Throttle Command called" << endl;
cout << "[MYCODE] Replica Set Write Throttle Command called: " << cmdObj.toString() << endl;
if( !check(errmsg, result) )
return false;

Expand Down Expand Up @@ -726,177 +726,6 @@ namespace mongo {
}
} cmdReplSetAdd;

/*class CmdReplSetAdd : public ReplSetCommand {
public:
virtual void help( stringstream &help ) const {
help << "{ {replSetAdd : <host>}, {primary: true} }";
help << "'add' member to the replica set. If primary is true then add as primary\n";
help << "\nhttp://dochub.mongodb.org/core/replicasetcommands";
}
virtual void addRequiredPrivileges(const std::string& dbname,
const BSONObj& cmdObj,
std::vector<Privilege>* out) {
ActionSet actions;
actions.addAction(ActionType::replSetAdd);
out->push_back(Privilege(AuthorizationManager::SERVER_RESOURCE_NAME, actions));
}
CmdReplSetAdd() : ReplSetCommand("replSetAdd") { }
virtual bool run(const string& , BSONObj& cmdObj, int, string& errmsg, BSONObjBuilder& result, bool fromRepl) {*/
/*if ( cmdObj["replSetAdd"] != String ) {
errmsg = "no hostname specified";
return false;
}*/
/*if( !check(errmsg, result) )
return false;
cout << "[MYCODE] ReplSetAdd CMDOBJ:" << cmdObj.toString() << endl;
string addedHost = cmdObj["replSetAdd"].String();
bool wantPrimary = cmdObj["primary"].Bool();
int addedHostID = cmdObj["id"].Int();
BSONObj config = theReplSet->getConfig().asBson().getOwned();
cout << "[MYCODE] ReplSetAdd CONFIGPRINT:" << config.toString() << "\n";
string id = config["_id"].String();
int version = config["version"].Int();
vector<ReplSetConfig::MemberCfg> configMembers = theReplSet->config().members;
string myid = theReplSet->config()._id;
int max = 0;
for( vector<ReplSetConfig::MemberCfg>::const_iterator i = configMembers.begin(); i != configMembers.end(); i++ ) {
// we know we're up
if (i->h.isSelf()) {
continue;
}
BSONObj res;
{
bool ok = false;
try {
int theirVersion = -1000;
ok = requestHeartbeat(myid, "", i->h.toString(), res, -1, theirVersion, false);
if( max >= theirVersion ) {
max = theirVersion;
}
}
catch(DBException& e) {
log() << "replSet cmufcc requestHeartbeat " << i->h.toString() << " : " << e.toString() << rsLog;
}
catch(...) {
log() << "replSet cmufcc error exception in requestHeartbeat?" << rsLog;
}
}
}
version = max > version ? max : version;
version++;
vector<BSONElement> members = config["members"].Array();
BSONObjBuilder update;
update.append("_id", id);
update.append("version", version);
BSONArrayBuilder newMember(update.subarrayStart("members"));
double maxPr = 1;
for (vector<BSONElement>::iterator it = members.begin(); it != members.end(); it++)
{
BSONObj hostObj = (*it).Obj();
cout << "[MYCODE] ReplSetAdd MEMBER:" << hostObj.toString() << endl;
newMember.append(*it);
if (hostObj["priority"].ok() && maxPr < hostObj["priority"].Double())
maxPr = hostObj["priority"].Double();
}
if (wantPrimary)
newMember.append(BSON("host" << addedHost << "_id" << addedHostID << "priority" << maxPr + 1));
else
newMember.append(BSON("host" << addedHost << "_id" << addedHostID));
newMember.done();
BSONObj updateObj = update.done();
printf("[MYCODE] ReplSetAdd UPDATE: %s\n", updateObj.toString().c_str());
try
{
scoped_ptr<ReplSetConfig> newConfig
(ReplSetConfig::make(updateObj, true));
log() << "replSet replSetReconfig config object parses ok, " <<
newConfig->members.size() << " members specified" << rsLog;
if( !ReplSetConfig::legalChange(theReplSet->getConfig(), *newConfig, errmsg) ) {
return false;
}
checkMembersUpForConfigChange(*newConfig, result, false);
log() << "replSet replSetReconfig [2]" << rsLog;
theReplSet->haveNewConfig(*newConfig, true);
ReplSet::startupStatusMsg.set("replSetReconfig'd");
}
catch(DBException &e) {
cout << "[MYCODE] ReplSetRemove Trying to remove the host" << addedHost << "threw exception: " << e.toString() << endl;
}
cout << "[MYCODE] Replica Set Current Version:" << theReplSet->config().version << " Local Computed Version:" << version << endl;
BSONObj cmd = BSON("replSetReconfig" << updateObj << "force" << true);
BSONObj info;
for( vector<ReplSetConfig::MemberCfg>::const_iterator i = configMembers.begin(); i != configMembers.end(); i++ ) {
string hostStr = i->h.toString();
cout << "[MYCODE] Sending replSetReconfig to Host:" << hostStr << endl;
if (i->h.isSelf())
continue;
scoped_ptr<ScopedDbConnection> conn(
ScopedDbConnection::getInternalScopedDbConnection(hostStr));
try
{
if (!conn->get()->runCommand("admin", cmd, info, 0))
{
cout << "[MYCODE] ReplSetAdd failed to reconfigure the replica set\n";
}
string errmsg = conn->get()->getLastError();
cout << "[MYCODE] ReplSetAdd Error:" << errmsg << endl;
}
catch(DBException &e) {
cout << "[MYCODE] ReplSetAdd Trying to add the host " << addedHost << " threw exception: " << e.toString() << endl;
}
conn->done();
}
scoped_ptr<ScopedDbConnection> hostConn(
ScopedDbConnection::getInternalScopedDbConnection(addedHost));
try
{
if (!hostConn->get()->runCommand("admin", cmd, info, 0))
{
cout << "[MYCODE] ReplSetAdd failed to reconfigure the replica set\n";
}
string errmsg = hostConn->get()->getLastError();
cout << "[MYCODE] ReplSetAdd Error:" << errmsg << endl;
if (wantPrimary)
{
theReplSet->stepDown(120);
hostConn->get()->runCommand("admin", BSON("replSetLeader" << 1 << "priority" << maxPr + 1), info, 0);
}
}
catch(DBException &e) {
cout << "[MYCODE] ReplSetAdd Trying to add the host " << addedHost << " threw exception: " << e.toString() << endl;
}
hostConn->done();
return true;
}
} cmdReplSetAdd;*/

class CmdReplayOplog : public ReplSetCommand {
private:
OplogReader oplogReader;
Expand Down Expand Up @@ -957,6 +786,11 @@ namespace mongo {
proposedKey, globalMin, globalMax,
splitPoints, assignments,removedReplicas);


int _tailingQueryOptions = QueryOption_SlaveOk;
_tailingQueryOptions |= QueryOption_CursorTailable | QueryOption_OplogReplay;
oplogReader.setTailingQueryOptions(_tailingQueryOptions);

//replay the oplog
success = replayOplog(errmsg, ns, startTime, endTime, primary, shardID, numChunks, replayAllOps,
proposedKey, globalMin, globalMax,
Expand Down Expand Up @@ -1024,7 +858,7 @@ namespace mongo {
}

void printLogID() {
cout<<"[MYCODE_HOLLA] ";
log()<<"[MYCODE_HOLLA] ";
}

void printExtractedArgs(string& ns, OpTime& startTime, OpTime& endTime, string& primary, int& shardID, int& numChunks, bool& replayAllOps,
Expand Down
2 changes: 1 addition & 1 deletion src/mongo/db/repl/rs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ namespace mongo {
// will fail with "not master" (of course client could check result code, but in
// case they are not)
log() << "replSet closing client sockets after relinquishing primary" << rsLog;
MessagingPort::closeAllSockets(ScopedConn::keepOpen);
//MessagingPort::closeAllSockets(ScopedConn::keepOpen);
}
else if( box.getState().startup2() ) {
// This block probably isn't necessary
Expand Down
6 changes: 3 additions & 3 deletions src/mongo/db/repl/rs_sync.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ namespace replset {
bool flag = false;
if (cli.exists(rsSettingNS))
{
log() << "[MYCODE] Checking for throttle value" << endl;
//log() << "[MYCODE] Checking for throttle value" << endl;
try
{
scoped_ptr<DBClientCursor> cursor(cli.query( rsSettingNS, Query() ));
Expand All @@ -102,10 +102,10 @@ namespace replset {
log() << "[MYCODE] dbexception: findOne call failed for " << rsSettingNS << endl;
}

log() << "[MYCODE] Throttle value: " << throttleObj.toString() << endl;
//log() << "[MYCODE] Throttle value: " << throttleObj.toString() << endl;
if (!throttleObj["stopped"].eoo())
{
log() << "[MYCODE] Checking for stopped value" << endl;
//log() << "[MYCODE] Checking for stopped value" << endl;
bool stopped = throttleObj["stopped"].Bool();
if (stopped)
{
Expand Down
Loading

0 comments on commit 7082a23

Please sign in to comment.