Permalink
Browse files

fix a possible issue with initial sync with replication when updates …

…are in flight.

also correct UpdateResult in one case

Conflicts:

	db/oplog.cpp
	db/repl/rs_sync.cpp
  • Loading branch information...
kchodorow committed Oct 5, 2011
1 parent ca86100 commit 2e48cfb4d798f2a5d3d3535a1d2cbe442d086325
Showing with 91 additions and 22 deletions.
  1. +47 −4 db/oplog.cpp
  2. +1 −1 db/oplog.h
  3. +1 −1 db/repl/rs.h
  4. +40 −15 db/repl/rs_sync.cpp
  5. +2 −1 db/update.cpp
View
@@ -480,7 +480,14 @@ namespace mongo {
}
}
void applyOperation_inlock(const BSONObj& op , bool fromRepl ) {
/** @param fromRepl false if from ApplyOpsCmd
@return true if was and update should have happened and the document DNE. see replset initial sync code.
*/
bool applyOperation_inlock(const BSONObj& op , bool fromRepl ) {
assertInWriteLock();
LOG(6) << "applying op: " << op << endl;
bool failedUpdate = false;
OpCounters * opCounters = fromRepl ? &replOpCounters : &globalOpCounters;
if( logLevel >= 6 )
@@ -530,9 +537,45 @@ namespace mongo {
}
else if ( *opType == 'u' ) {
opCounters->gotUpdate();
// dm do we create this for a capped collection?
// - if not, updates would be slow
// - but if were by id would be slow on primary too so maybe ok
// - if on primary was by another key and there are other indexes, this could be very bad w/out an index
// - if do create, odd to have on secondary but not primary. also can cause secondary to block for
// quite a while on creation.
RARELY ensureHaveIdIndex(ns); // otherwise updates will be super slow
updateObjects(ns, o, op.getObjectField("o2"), /*upsert*/ op.getBoolField("b"), /*multi*/ false, /*logop*/ false , debug );
OpDebug debug;
BSONObj updateCriteria = op.getObjectField("o2");
bool upsert = fields[3].booleanSafe();
UpdateResult ur = updateObjects(ns, o, updateCriteria, upsert, /*multi*/ false, /*logop*/ false , debug );
if( ur.num == 0 ) {
if( ur.mod ) {
if( updateCriteria.nFields() == 1 ) {
// was a simple { _id : ... } update criteria
failedUpdate = true;
// todo: probably should assert in these failedUpdate cases if not in initialSync
}
// need to check to see if it isn't present so we can set failedUpdate correctly.
// note that adds some overhead for this extra check in some cases, such as an updateCriteria
// of the form
// { _id:..., { x : {$size:...} }
// thus this is not ideal.
else if( Helpers::findById(nsdetails(ns), updateCriteria).isNull() ) {
failedUpdate = true;
}
else {
// it's present; zero objects were updated because of additional specifiers in the query for idempotence
}
}
else {
// this could happen benignly on an oplog duplicate replay of an upsert
// (because we are idempotent),
// if an regular non-mod update fails the item is (presumably) missing.
if( !upsert ) {
failedUpdate = true;
}
}
}
}
else if ( *opType == 'd' ) {
opCounters->gotDelete();
@@ -557,7 +600,7 @@ namespace mongo {
ss << "unknown opType [" << opType << "]";
throw MsgAssertionException( 13141 , ss.str() );
}
return failedUpdate;
}
class ApplyOpsCmd : public Command {
View
@@ -213,5 +213,5 @@ namespace mongo {
* used for applying from an oplog
* @param fromRepl really from replication or for testing/internal/command/etc...
*/
void applyOperation_inlock(const BSONObj& op , bool fromRepl = true );
bool applyOperation_inlock(const BSONObj& op , bool fromRepl = true );
}
View
@@ -344,7 +344,7 @@ namespace mongo {
void _syncThread();
bool tryToGoLiveAsASecondary(OpTime&); // readlocks
void syncTail();
void syncApply(const BSONObj &o);
bool syncApply(const BSONObj &o);
unsigned _syncRollback(OplogReader& r);
void syncRollback(OplogReader& r);
void syncFixUp(HowToFixUp& h, OplogReader& r);
View
@@ -25,24 +25,27 @@ namespace mongo {
using namespace bson;
extern unsigned replSetForceInitialSyncFailure;
/* apply the log op that is in param o */
void ReplSetImpl::syncApply(const BSONObj &o) {
char db[MaxDatabaseNameLen];
void NOINLINE_DECL blank(const BSONObj& o) {
if( *o.getStringField("op") != 'n' ) {
log() << "replSet skipping bad op in oplog: " << o.toString() << rsLog;
}
}
/* apply the log op that is in param o
@return bool failedUpdate
*/
bool ReplSetImpl::syncApply(const BSONObj &o) {
const char *ns = o.getStringField("ns");
nsToDatabase(ns, db);
if ( *ns == '.' || *ns == 0 ) {
if( *o.getStringField("op") == 'n' )
return;
log() << "replSet skipping bad op in oplog: " << o.toString() << endl;
return;
blank(o);
return false;
}
Client::Context ctx(ns);
ctx.getClient()->curop()->reset();
/* todo : if this asserts, do we want to ignore or not? */
applyOperation_inlock(o);
return applyOperation_inlock(o);
}
/* initial oplog application, during initial sync, after cloning.
@@ -57,6 +60,7 @@ namespace mongo {
const string hn = source->h().toString();
OplogReader r;
OplogReader missingObjReader;
try {
if( !r.connect(hn) ) {
log() << "replSet initial sync error can't connect to " << hn << " to read " << rsoplog << rsLog;
@@ -133,17 +137,38 @@ namespace mongo {
throw DBException("primary changed",0);
}
if( ts >= applyGTE ) {
// optimes before we started copying need not be applied.
syncApply(o);
if( ts >= applyGTE ) { // optimes before we started copying need not be applied.
bool failedUpdate = syncApply(o);
if( failedUpdate ) {
// we don't have the object yet, which is possible on initial sync. get it.
log() << "replSet info adding missing object" << endl; // rare enough we can log
if( !missingObjReader.connect(hn) ) { // ok to call more than once
log() << "replSet initial sync fails, couldn't connect to " << hn << endl;
return false;
}
const char *ns = o.getStringField("ns");
BSONObj query = BSONObjBuilder().append(o.getObjectField("o2")[_id]).obj(); // might be more than just _id in the update criteria
BSONObj missingObj = missingObjReader.findOne(
ns,
query );
assert( !missingObj.isEmpty() );
DiskLoc d = theDataFileMgr.insert(ns, (void*) missingObj.objdata(), missingObj.objsize());
assert( !d.isNull() );
// now reapply the update from above
bool failed = syncApply(o);
if( failed ) {
log() << "replSet update still fails after adding missing object " << ns << endl;
assert(false);
}
}
}
_logOpObjRS(o); /* with repl sets we write the ops to our oplog too */
}
if( ++n % 100000 == 0 ) {
// simple progress metering
log() << "replSet initialSyncOplogApplication " << n << rsLog;
}
getDur().commitIfNeeded();
}
catch (DBException& e) {
@@ -364,7 +389,7 @@ namespace mongo {
{
const Member *primary = box.getPrimary();
if( !target->hbinfo().hbstate.readable() ||
// if we are not syncing from the primary, return (if
// it's up) so that we can try accessing it again
View
@@ -1271,7 +1271,8 @@ namespace mongo {
logOp( "i", ns, no );
return UpdateResult( 0 , 0 , 1 , no );
}
return UpdateResult( 0 , 0 , 0 );
return UpdateResult( 0 , isOperatorUpdate , 0 );
}
UpdateResult updateObjects(const char *ns, const BSONObj& updateobj, BSONObj patternOrig, bool upsert, bool multi, bool logop , OpDebug& debug ) {

0 comments on commit 2e48cfb

Please sign in to comment.