Skip to content

Commit

Permalink
Disable fast update path restarts by default
Browse files Browse the repository at this point in the history
Even with the fix in #11561 we are still observing replica
divergence warnings in the logs. Disabling this feature entirely
until the issue has been fully investigated and a complete fix
has been implemented.

Also emit a log message when the distributor has forced convergence
of a detected inconsistent update.
  • Loading branch information
vekterli committed Dec 20, 2019
1 parent d1ef9ba commit e577768
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 25 deletions.
44 changes: 22 additions & 22 deletions storage/src/tests/distributor/twophaseupdateoperationtest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1048,28 +1048,28 @@ TEST_F(TwoPhaseUpdateOperationTest, safe_path_consistent_get_reply_timestamps_do
}

TEST_F(TwoPhaseUpdateOperationTest, fast_path_not_restarted_if_replica_set_altered_between_get_send_and_receive) {
setupDistributor(3, 3, "storage:3 distributor:1");
getConfig().set_update_fast_path_restart_enabled(true);

std::shared_ptr<TwoPhaseUpdateOperation> cb(sendUpdate("0=1/2/3,1=2/3/4")); // Inconsistent replicas.
DistributorMessageSenderStub sender;
cb->start(sender, framework::MilliSecTime(0));

// Replica set changes between time of Get requests sent and
// responses received. This may happen e.g. if concurrent mutations
// to the same bucket create a new replica. If this happens, we
// must not send the Update operations verbatim, as they will
// be started with the _current_ replica set, not the one that
// was present during the Get request.
BucketId bucket(0x400000000000cac4); // Always the same in the test.
addNodesToBucketDB(bucket, "0=1/2/3,1=2/3/4,2=3/3/3");

Timestamp old_timestamp = 500;
ASSERT_EQ("Get => 0,Get => 1", sender.getCommands(true));
replyToGet(*cb, sender, 0, old_timestamp);
replyToGet(*cb, sender, 1, old_timestamp);

ASSERT_EQ("Put => 1,Put => 2,Put => 0", sender.getCommands(true, false, 2));
setupDistributor(3, 3, "storage:3 distributor:1");
getConfig().set_update_fast_path_restart_enabled(true);

std::shared_ptr<TwoPhaseUpdateOperation> cb(sendUpdate("0=1/2/3,1=2/3/4")); // Inconsistent replicas.
DistributorMessageSenderStub sender;
cb->start(sender, framework::MilliSecTime(0));

// Replica set changes between time of Get requests sent and
// responses received. This may happen e.g. if concurrent mutations
// to the same bucket create a new replica. If this happens, we
// must not send the Update operations verbatim, as they will
// be started with the _current_ replica set, not the one that
// was present during the Get request.
BucketId bucket(0x400000000000cac4); // Always the same in the test.
addNodesToBucketDB(bucket, "0=1/2/3,1=2/3/4,2=3/3/3");

Timestamp old_timestamp = 500;
ASSERT_EQ("Get => 0,Get => 1", sender.getCommands(true));
replyToGet(*cb, sender, 0, old_timestamp);
replyToGet(*cb, sender, 1, old_timestamp);

ASSERT_EQ("Put => 1,Put => 2,Put => 0", sender.getCommands(true, false, 2));
}

// XXX currently differs in behavior from content nodes in that updates for
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,4 +213,4 @@ use_btree_database bool default=false restart
## content nodes, the two-phase update path reverts back to the regular fast path.
## Since all replicas of the document were in sync, applying the update in-place
## shall be considered safe.
restart_with_fast_update_path_if_all_get_timestamps_are_consistent bool default=true
restart_with_fast_update_path_if_all_get_timestamps_are_consistent bool default=false
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ TwoPhaseUpdateOperation::TwoPhaseUpdateOperation(
_bucketSpace(bucketSpace),
_sendState(SendState::NONE_SENT),
_mode(Mode::FAST_PATH),
_fast_path_repair_source_node(0xffff),
_replySent(false)
{
document::BucketIdFactory idFactory;
Expand All @@ -52,7 +53,7 @@ struct IntermediateMessageSender : DistributorMessageSender {
std::shared_ptr<api::StorageReply> _reply;

IntermediateMessageSender(SentMessageMap& mm, std::shared_ptr<Operation> cb, DistributorMessageSender & fwd);
~IntermediateMessageSender();
~IntermediateMessageSender() override;

void sendCommand(const std::shared_ptr<api::StorageCommand>& cmd) override {
msgMap.insert(cmd->getMsgId(), callback);
Expand Down Expand Up @@ -312,11 +313,12 @@ TwoPhaseUpdateOperation::handleFastPathReceive(DistributorMessageSender& sender,
LOG(debug, "Update(%s) fast path: was inconsistent!", _updateCmd->getDocumentId().toString().c_str());

_updateReply = intermediate._reply;
_fast_path_repair_source_node = bestNode.second;
document::Bucket bucket(_updateCmd->getBucket().getBucketSpace(), bestNode.first);
auto cmd = std::make_shared<api::GetCommand>(bucket, _updateCmd->getDocumentId(), "[all]");
copyMessageSettings(*_updateCmd, *cmd);

sender.sendToNode(lib::NodeType::STORAGE, bestNode.second, cmd);
sender.sendToNode(lib::NodeType::STORAGE, _fast_path_repair_source_node, cmd);
transitionTo(SendState::GETS_SENT);
}
}
Expand All @@ -325,6 +327,8 @@ TwoPhaseUpdateOperation::handleFastPathReceive(DistributorMessageSender& sender,
// PUTs are done.
addTraceFromReply(*intermediate._reply);
sendReplyWithResult(sender, intermediate._reply->getResult());
LOG(warning, "Forced convergence of '%s' using document from node %u",
_updateCmd->getDocumentId().toString().c_str(), _fast_path_repair_source_node);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ class TwoPhaseUpdateOperation : public SequencedOperation
mbus::TraceNode _trace;
document::BucketId _updateDocBucketId;
std::vector<std::pair<document::BucketId, uint16_t>> _replicas_at_get_send_time;
uint16_t _fast_path_repair_source_node;
bool _replySent;
};

Expand Down

0 comments on commit e577768

Please sign in to comment.