Permalink
Browse files

SERVER-6671 add a 'v' version field to each oplog document

The version field will allow us to detect the primary's version.
We need to know which version because only newer primary oplog streams
should prevent a secondary from enforcing unique index constraints
in initial sync or recovering states.  The version field will also be
useful in the future when we want to make schema changes in the oplog.
  • Loading branch information...
1 parent 8ac8994 commit 206094501a570a75b2fae5a5eb515dc5b55182bb @milkie milkie committed Oct 3, 2012
Showing with 53 additions and 30 deletions.
  1. +21 −20 src/mongo/db/oplog.cpp
  2. +1 −0 src/mongo/db/repl/rs.cpp
  3. +13 −8 src/mongo/db/repl/rs.h
  4. +17 −2 src/mongo/db/repl/rs_sync.cpp
  5. +1 −0 src/mongo/db/repl/rs_sync.h
View
@@ -125,10 +125,30 @@ namespace mongo {
*b = EOO;
}
+ /* we write to local.oplog.rs:
+ { ts : ..., h: ..., v: ..., op: ..., ns: ..., o: ... }
+ ts: an OpTime timestamp
+ h: hash
+ v: version
+ op:
+ "i" insert
+ "u" update
+ "d" delete
+ "c" db cmd
+ "db" declares presence of a database (ns is set to the db name + '.')
+ "n" no op
+ logNS: always null! DEPRECATED
+ bb:
+ if not null, specifies a boolean to pass along to the other side as b: param.
+ used for "justOne" or "upsert" flags on 'd', 'u'
+
+ */
+
// global is safe as we are in write lock. we put the static outside the function to avoid the implicit mutex
// the compiler would use if inside the function. the reason this is static is to avoid a malloc/free for this
// on every logop call.
static BufBuilder logopbufbuilder(8*1024);
+ const static int OPLOG_VERSION = 2;
static void _logOpRS(const char *opstr, const char *ns, const char *logNS, const BSONObj& obj, BSONObj *o2, bool *bb, bool fromMigrate ) {
Lock::DBWrite lk1("local");
@@ -160,6 +180,7 @@ namespace mongo {
BSONObjBuilder b(logopbufbuilder);
b.appendTimestamp("ts", ts.asDate());
b.append("h", hashNew);
+ b.append("v", OPLOG_VERSION);
b.append("op", opstr);
b.append("ns", ns);
if (fromMigrate)
@@ -206,26 +227,6 @@ namespace mongo {
}
}
- /* we write to local.oplog.$main:
- { ts : ..., op: ..., ns: ..., o: ... }
- ts: an OpTime timestamp
- op:
- "i" insert
- "u" update
- "d" delete
- "c" db cmd
- "db" declares presence of a database (ns is set to the db name + '.')
- "n" no op
- logNS: where to log it. 0/null means "local.oplog.$main".
- bb:
- if not null, specifies a boolean to pass along to the other side as b: param.
- used for "justOne" or "upsert" flags on 'd', 'u'
- first: true
- when set, indicates this is the first thing we have logged for this database.
- thus, the slave does not need to copy down all the data when it sees this.
-
- note this is used for single collection logging even when --replSet is enabled.
- */
static void _logOpOld(const char *opstr, const char *ns, const char *logNS, const BSONObj& obj, BSONObj *o2, bool *bb, bool fromMigrate ) {
Lock::DBWrite lk("local");
static BufBuilder bufbuilder(8*1024); // todo there is likely a mutex on this constructor
View
@@ -419,6 +419,7 @@ namespace mongo {
ghost(0),
_writerPool(replWriterThreadCount),
_prefetcherPool(replPrefetcherThreadCount),
+ oplogVersion(0),
_indexPrefetchConfig(PREFETCH_ALL) {
}
View
@@ -544,6 +544,8 @@ namespace mongo {
void syncThread();
const OpTime lastOtherOpTime() const;
static void setMinValid(BSONObj obj);
+
+ int oplogVersion;
private:
IndexPrefetchConfig _indexPrefetchConfig;
};
@@ -682,15 +684,18 @@ namespace mongo {
if (!theReplSet) return false;
// see SERVER-6671
MemberState ms = theReplSet->state();
- if ((ms == MemberState::RS_STARTUP2) ||
- (ms == MemberState::RS_RECOVERING) ||
- (ms == MemberState::RS_ROLLBACK)) {
- // Never ignore _id index
- if (!idx.isIdIndex()) {
- return true;
- }
+ if (! ((ms == MemberState::RS_STARTUP2) ||
+ (ms == MemberState::RS_RECOVERING) ||
+ (ms == MemberState::RS_ROLLBACK))) {
+ return false;
}
- return false;
+ // 2 is the oldest oplog version where operations
+ // are fully idempotent.
+ if (theReplSet->oplogVersion < 2) return false;
+ // Never ignore _id index
+ if (idx.isIdIndex()) return false;
+
+ return true;
}
}
@@ -265,8 +265,8 @@ namespace replset {
break;
}
}
-
-
+ setOplogVersion(ops.getDeque().front());
+
multiApply(ops.getDeque(), func);
n += ops.getDeque().size();
@@ -311,6 +311,19 @@ namespace replset {
oplogApplySegment(applyGTEObj, minValidObj, multiSyncApply);
}
+ void SyncTail::setOplogVersion(const BSONObj& op) {
+ BSONElement version = op["v"];
+ // old primaries do not get the unique index ignoring feature
+ // because some of their ops are not imdepotent, see
+ // SERVER-7186
+ if (version.eoo()) {
+ theReplSet->oplogVersion = 1;
+ RARELY log() << "warning replset primary is an older version than we are; upgrade recommended" << endl;
+ } else {
+ theReplSet->oplogVersion = version.Int();
+ }
+ }
+
/* tail an oplog. ok to return, will be re-called. */
void SyncTail::oplogApplication() {
while( 1 ) {
@@ -379,6 +392,7 @@ namespace replset {
}
const BSONObj& lastOp = ops.getDeque().back();
+ setOplogVersion(lastOp);
handleSlaveDelay(lastOp);
// Set minValid to the last op to be applied in this next batch.
@@ -415,6 +429,7 @@ namespace replset {
// otherwise, apply what we have
return true;
}
+
// check for commands
if ((op["op"].valuestrsafe()[0] == 'c') ||
// Index builds are acheived through the use of an insert op, not a command op.
@@ -98,6 +98,7 @@ namespace replset {
void fillWriterVectors(const std::deque<BSONObj>& ops,
std::vector< std::vector<BSONObj> >* writerVectors);
void handleSlaveDelay(const BSONObj& op);
+ void setOplogVersion(const BSONObj& op);
};
/**

0 comments on commit 2060945

Please sign in to comment.