Skip to content

Commit

Permalink
SERVER-27149 Don't sync from nodes in an older term.
Browse files Browse the repository at this point in the history
  • Loading branch information
stbrody committed Nov 22, 2016
1 parent 524cc65 commit 1384027
Show file tree
Hide file tree
Showing 15 changed files with 170 additions and 122 deletions.
3 changes: 1 addition & 2 deletions src/mongo/db/repl/data_replicator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1223,8 +1223,7 @@ void DataReplicator::_setState_inlock(const DataReplicatorState& newState) {
}

StatusWith<HostAndPort> DataReplicator::_chooseSyncSource_inlock() {
auto syncSource =
_opts.syncSourceSelector->chooseNewSyncSource(_lastFetched.opTime.getTimestamp());
auto syncSource = _opts.syncSourceSelector->chooseNewSyncSource(_lastFetched.opTime);
if (syncSource.empty()) {
return Status{ErrorCodes::InvalidSyncSource,
str::stream() << "No valid sync source available. Our last fetched optime: "
Expand Down
6 changes: 3 additions & 3 deletions src/mongo/db/repl/data_replicator_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ class SyncSourceSelectorMock : public SyncSourceSelector {
public:
SyncSourceSelectorMock(const HostAndPort& syncSource) : _syncSource(syncSource) {}
void clearSyncSourceBlacklist() override {}
HostAndPort chooseNewSyncSource(const Timestamp& ts) override {
HostAndPort chooseNewSyncSource(const OpTime& ot) override {
HostAndPort result = _syncSource;
return result;
}
Expand Down Expand Up @@ -120,8 +120,8 @@ class DataReplicatorTest : public executor::ThreadPoolExecutorTest, public SyncS
void clearSyncSourceBlacklist() override {
_syncSourceSelector->clearSyncSourceBlacklist();
}
HostAndPort chooseNewSyncSource(const Timestamp& ts) override {
return _syncSourceSelector->chooseNewSyncSource(ts);
HostAndPort chooseNewSyncSource(const OpTime& ot) override {
return _syncSourceSelector->chooseNewSyncSource(ot);
}
void blacklistSyncSource(const HostAndPort& host, Date_t until) override {
_syncSourceSelector->blacklistSyncSource(host, until);
Expand Down
2 changes: 1 addition & 1 deletion src/mongo/db/repl/oplogreader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ void OplogReader::connectToSyncSource(OperationContext* txn,
invariant(conn() == NULL);

while (true) {
HostAndPort candidate = replCoord->chooseNewSyncSource(lastOpTimeFetched.getTimestamp());
HostAndPort candidate = replCoord->chooseNewSyncSource(lastOpTimeFetched);

if (candidate.empty()) {
if (oldestOpTimeSeen == sentinel) {
Expand Down
6 changes: 3 additions & 3 deletions src/mongo/db/repl/replication_coordinator_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3037,15 +3037,15 @@ bool ReplicationCoordinatorImpl::isReplEnabled() const {
return getReplicationMode() != modeNone;
}

HostAndPort ReplicationCoordinatorImpl::chooseNewSyncSource(const Timestamp& lastTimestampFetched) {
HostAndPort ReplicationCoordinatorImpl::chooseNewSyncSource(const OpTime& lastOpTimeFetched) {
LockGuard topoLock(_topoMutex);

HostAndPort oldSyncSource = _topCoord->getSyncSourceAddress();
auto chainingPreference = isCatchingUp()
? TopologyCoordinator::ChainingPreference::kAllowChaining
: TopologyCoordinator::ChainingPreference::kUseConfiguration;
HostAndPort newSyncSource = _topCoord->chooseNewSyncSource(
_replExecutor.now(), lastTimestampFetched, chainingPreference);
HostAndPort newSyncSource =
_topCoord->chooseNewSyncSource(_replExecutor.now(), lastOpTimeFetched, chainingPreference);

stdx::lock_guard<stdx::mutex> lock(_mutex);
// If we lost our sync source, schedule new heartbeats immediately to update our knowledge
Expand Down
2 changes: 1 addition & 1 deletion src/mongo/db/repl/replication_coordinator_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ class ReplicationCoordinatorImpl : public ReplicationCoordinator {

virtual bool isReplEnabled() const override;

virtual HostAndPort chooseNewSyncSource(const Timestamp& lastTimestampFetched) override;
virtual HostAndPort chooseNewSyncSource(const OpTime& lastOpTimeFetched) override;

virtual void blacklistSyncSource(const HostAndPort& host, Date_t until) override;

Expand Down
2 changes: 1 addition & 1 deletion src/mongo/db/repl/replication_coordinator_mock.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -370,7 +370,7 @@ Status ReplicationCoordinatorMock::checkReplEnabledForCommand(BSONObjBuilder* re
return Status::OK();
}

HostAndPort ReplicationCoordinatorMock::chooseNewSyncSource(const Timestamp& lastTimestampFetched) {
HostAndPort ReplicationCoordinatorMock::chooseNewSyncSource(const OpTime& lastOpTimeFetched) {
return HostAndPort();
}

Expand Down
2 changes: 1 addition & 1 deletion src/mongo/db/repl/replication_coordinator_mock.h
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ class ReplicationCoordinatorMock : public ReplicationCoordinator {

virtual Status checkReplEnabledForCommand(BSONObjBuilder* result);

virtual HostAndPort chooseNewSyncSource(const Timestamp& lastTimestampFetched);
virtual HostAndPort chooseNewSyncSource(const OpTime& lastOpTimeFetched);

virtual void blacklistSyncSource(const HostAndPort& host, Date_t until);

Expand Down
2 changes: 1 addition & 1 deletion src/mongo/db/repl/sync_source_resolver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ bool SyncSourceResolver::_isShuttingDown() const {
StatusWith<HostAndPort> SyncSourceResolver::_chooseNewSyncSource() {
HostAndPort candidate;
try {
candidate = _syncSourceSelector->chooseNewSyncSource(_lastOpTimeFetched.getTimestamp());
candidate = _syncSourceSelector->chooseNewSyncSource(_lastOpTimeFetched);
} catch (...) {
return exceptionToStatus();
}
Expand Down
8 changes: 4 additions & 4 deletions src/mongo/db/repl/sync_source_resolver_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,9 @@ class TaskExecutorWithFailureInScheduleRemoteCommand : public unittest::TaskExec
class SyncSourceSelectorMock : public SyncSourceSelector {
public:
void clearSyncSourceBlacklist() override {}
HostAndPort chooseNewSyncSource(const Timestamp& ts) override {
HostAndPort chooseNewSyncSource(const OpTime& ot) override {
chooseNewSyncSourceHook();
lastTimestampFetched = ts;
lastOpTimeFetched = ot;
return syncSource;
}
void blacklistSyncSource(const HostAndPort& host, Date_t until) override {
Expand All @@ -84,7 +84,7 @@ class SyncSourceSelectorMock : public SyncSourceSelector {
}

HostAndPort syncSource = HostAndPort("host1", 1234);
Timestamp lastTimestampFetched;
OpTime lastOpTimeFetched;
stdx::function<void()> chooseNewSyncSourceHook = []() {};

HostAndPort blacklistHost;
Expand Down Expand Up @@ -264,7 +264,7 @@ TEST_F(SyncSourceResolverTest,
// Resolver invokes callback with empty host and becomes inactive immediately.
ASSERT_FALSE(_resolver->isActive());
ASSERT_EQUALS(HostAndPort(), unittest::assertGet(_response.syncSourceStatus));
ASSERT_EQUALS(lastOpTimeFetched.getTimestamp(), _selector->lastTimestampFetched);
ASSERT_EQUALS(lastOpTimeFetched, _selector->lastOpTimeFetched);

// Cannot restart a completed resolver.
ASSERT_EQUALS(ErrorCodes::ShutdownInProgress, _resolver->startup());
Expand Down
2 changes: 1 addition & 1 deletion src/mongo/db/repl/sync_source_selector.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ class SyncSourceSelector {
/**
* Chooses a viable sync source, or, if none available, returns empty HostAndPort.
*/
virtual HostAndPort chooseNewSyncSource(const Timestamp& lastTimestampFetched) = 0;
virtual HostAndPort chooseNewSyncSource(const OpTime& lastOpTimeFetched) = 0;

/**
* Blacklists choosing 'host' as a sync source until time 'until'.
Expand Down
2 changes: 1 addition & 1 deletion src/mongo/db/repl/topology_coordinator.h
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ class TopologyCoordinator {
* Chooses and sets a new sync source, based on our current knowledge of the world.
*/
virtual HostAndPort chooseNewSyncSource(Date_t now,
const Timestamp& lastTimestampFetched,
const OpTime& lastOpTimeFetched,
ChainingPreference chainingPreference) = 0;

/**
Expand Down
10 changes: 5 additions & 5 deletions src/mongo/db/repl/topology_coordinator_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ HostAndPort TopologyCoordinatorImpl::getSyncSourceAddress() const {
}

HostAndPort TopologyCoordinatorImpl::chooseNewSyncSource(Date_t now,
const Timestamp& lastTimestampFetched,
const OpTime& lastOpTimeFetched,
ChainingPreference chainingPreference) {
// If we are not a member of the current replica set configuration, no sync source is valid.
if (_selfIndex == -1) {
Expand Down Expand Up @@ -305,12 +305,12 @@ HostAndPort TopologyCoordinatorImpl::chooseNewSyncSource(Date_t now,
}
}
// only consider candidates that are ahead of where we are
if (it->getAppliedOpTime().getTimestamp() <= lastTimestampFetched) {
if (it->getAppliedOpTime() <= lastOpTimeFetched) {
LOG(1) << "Cannot select sync source equal to or behind our last fetched optime. "
<< "My last fetched oplog timestamp: " << lastTimestampFetched.toBSON()
<< ", latest oplog timestamp of sync candidate "
<< "My last fetched oplog optime: " << lastOpTimeFetched.toBSON()
<< ", latest oplog optime of sync candidate "
<< itMemberConfig.getHostAndPort() << ": "
<< it->getAppliedOpTime().getTimestamp().toBSON();
<< it->getAppliedOpTime().toBSON();
continue;
}
// Candidate cannot be more latent than anything we've already considered.
Expand Down
4 changes: 2 additions & 2 deletions src/mongo/db/repl/topology_coordinator_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -151,8 +151,8 @@ class TopologyCoordinatorImpl : public TopologyCoordinator {
virtual UpdateTermResult updateTerm(long long term, Date_t now);
virtual void setForceSyncSourceIndex(int index);
virtual HostAndPort chooseNewSyncSource(Date_t now,
const Timestamp& lastTimestampFetched,
ChainingPreference chainingPreference);
const OpTime& lastOpTimeFetched,
ChainingPreference chainingPreference) override;
virtual void blacklistSyncSource(const HostAndPort& host, Date_t until);
virtual void unblacklistSyncSource(const HostAndPort& host, Date_t now);
virtual void clearSyncSourceBlacklist();
Expand Down
Loading

0 comments on commit 1384027

Please sign in to comment.