Skip to content

Commit

Permalink
SERVER-25145 OplogReader::connectToSyncSource selects sync sources ba…
Browse files Browse the repository at this point in the history
…sed on required optime if given
  • Loading branch information
benety committed Nov 3, 2016
1 parent fb1cc32 commit d16d537
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 1 deletion.
51 changes: 51 additions & 0 deletions src/mongo/db/repl/oplogreader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -125,8 +125,36 @@ HostAndPort OplogReader::getHost() const {
return _host;
}

Status OplogReader::_compareRequiredOpTimeWithQueryResponse(const OpTime& requiredOpTime) {
auto containsMinValid = more();
if (!containsMinValid) {
return Status(
ErrorCodes::NoMatchingDocument,
"remote oplog does not contain entry with optime matching our required optime");
}
auto doc = nextSafe();
const auto opTime = fassertStatusOK(40351, OpTime::parseFromOplogEntry(doc));
if (requiredOpTime != opTime) {
return Status(ErrorCodes::BadValue,
str::stream() << "remote oplog contain entry with matching timestamp "
<< opTime.getTimestamp().toString()
<< " but optime "
<< opTime.toString()
<< " does not "
"match our required optime");
}
if (requiredOpTime.getTerm() != opTime.getTerm()) {
return Status(ErrorCodes::BadValue,
str::stream() << "remote oplog contain entry with term " << opTime.getTerm()
<< " that does not "
"match the term in our required optime");
}
return Status::OK();
}

void OplogReader::connectToSyncSource(OperationContext* txn,
const OpTime& lastOpTimeFetched,
const OpTime& requiredOpTime,
ReplicationCoordinator* replCoord) {
const Timestamp sentinelTimestamp(duration_cast<Seconds>(Date_t::now().toDurationSinceEpoch()),
0);
Expand Down Expand Up @@ -192,6 +220,29 @@ void OplogReader::connectToSyncSource(OperationContext* txn,
continue;
}

// Check if sync source contains required optime.
if (!requiredOpTime.isNull()) {
// This query is structured so that it is executed on the sync source using the oplog
// start hack (oplogReplay=true and $gt/$gte predicate over "ts").
auto ts = requiredOpTime.getTimestamp();
tailingQuery(rsOplogName.c_str(), BSON("ts" << BSON("$gte" << ts << "$lte" << ts)));
auto status = _compareRequiredOpTimeWithQueryResponse(requiredOpTime);
if (!status.isOK()) {
const auto blacklistDuration = Seconds(60);
const auto until = Date_t::now() + blacklistDuration;
warning() << "We cannot use " << candidate.toString()
<< " as a sync source because it does not contain the necessary "
"operations for us to reach a consistent state: "
<< status << " last fetched optime: " << lastOpTimeFetched
<< ". required optime: " << requiredOpTime
<< ". Blacklisting this sync source for " << blacklistDuration
<< " until: " << until;
resetConnection();
replCoord->blacklistSyncSource(candidate, until);
continue;
}
resetCursor();
}

// TODO: If we were too stale (recovering with maintenance mode on), then turn it off, to
// allow becoming secondary/etc.
Expand Down
7 changes: 7 additions & 0 deletions src/mongo/db/repl/oplogreader.h
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,14 @@ class OplogReader {
*/
void connectToSyncSource(OperationContext* txn,
const OpTime& lastOpTimeFetched,
const OpTime& requiredOpTime,
ReplicationCoordinator* replCoord);

private:
/**
* Checks query response for required optime.
*/
Status _compareRequiredOpTimeWithQueryResponse(const OpTime& requiredOpTime);
};

} // namespace repl
Expand Down
2 changes: 1 addition & 1 deletion src/mongo/db/repl/rs_initialsync.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ Status _initialSync(OperationContext* txn, BackgroundSync* bgsync) {
while (r.getHost().empty()) {
// We must prime the sync source selector so that it considers all candidates regardless
// of oplog position, by passing in null OpTime as the last op fetched time.
r.connectToSyncSource(txn, OpTime(), replCoord);
r.connectToSyncSource(txn, OpTime(), OpTime(), replCoord);

if (r.getHost().empty()) {
std::string msg =
Expand Down

0 comments on commit d16d537

Please sign in to comment.