Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

SERVER-6671 end batch early if oplog version change is detected

  • Loading branch information...
commit edd2fa6fea3f52e1e9ac06f26a7f60d5758adae9 1 parent 8e66053
@milkie milkie authored
View
24 src/mongo/db/repl/rs_sync.cpp
@@ -39,7 +39,7 @@ namespace mongo {
namespace replset {
SyncTail::SyncTail(BackgroundSyncInterface *q) :
- Sync(""), _networkQueue(q)
+ Sync(""), oplogVersion(0), _networkQueue(q)
{}
SyncTail::~SyncTail() {}
@@ -444,6 +444,28 @@ namespace replset {
return true;
}
+ // check for oplog version change
+ BSONElement elemVersion = op["v"];
+ int curVersion = 0;
+ if (elemVersion.eoo())
+ // missing version means version 1
+ curVersion = 1;
+ else
+ curVersion = elemVersion.Int();
+
+ if (curVersion != oplogVersion) {
+ // Version changes cause us to end a batch.
+ // If we are starting a new batch, reset version number
+ // and continue.
+ if (ops->empty()) {
+ oplogVersion = curVersion;
+ }
+ else {
+ // End batch early
+ return true;
+ }
+ }
+
// Copy the op to the deque and remove it from the bgsync queue.
ops->push_back(op);
_networkQueue->consume();
View
3  src/mongo/db/repl/rs_sync.h
@@ -105,6 +105,9 @@ namespace replset {
// Initial Sync and Sync Tail each use a different function.
void multiApply(std::deque<BSONObj>& ops, MultiSyncApplyFunc applyFunc);
+ // The version of the last op to be read
+ int oplogVersion;
+
private:
BackgroundSyncInterface* _networkQueue;
View
18 src/mongo/dbtests/replsettests.cpp
@@ -407,7 +407,8 @@ namespace ReplSetTests {
class TestRSSync : public Base {
- void addOp(const string& op, BSONObj o, BSONObj* o2 = 0, const char* coll = 0) {
+ void addOp(const string& op, BSONObj o, BSONObj* o2 = NULL, const char* coll = NULL,
+ int version = 0) {
OpTime ts;
{
Lock::GlobalWrite lk;
@@ -416,6 +417,9 @@ namespace ReplSetTests {
BSONObjBuilder b;
b.appendTimestamp("ts", ts.asLL());
+ if (version != 0) {
+ b.append("v", version);
+ }
b.append("op", op);
b.append("o", o);
@@ -439,6 +443,12 @@ namespace ReplSetTests {
}
}
+ void addVersionedInserts(int expected) {
+ for (int i=0; i < expected; i++) {
+ addOp("i", BSON("_id" << i << "x" << 789), NULL, NULL, i);
+ }
+ }
+
void addUpdates() {
BSONObj id = BSON("_id" << "123456something");
addOp("i", id);
@@ -475,6 +485,12 @@ namespace ReplSetTests {
ASSERT_EQUALS(expected, static_cast<int>(client()->count(ns())));
drop();
+ addVersionedInserts(100);
+ applyOplog();
+
+ ASSERT_EQUALS(expected, static_cast<int>(client()->count(ns())));
+
+ drop();
addUpdates();
applyOplog();
Please sign in to comment.
Something went wrong with that request. Please try again.