Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avoid fast past update restart race with concurrently created replica #11561

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
30 changes: 29 additions & 1 deletion storage/src/tests/distributor/twophaseupdateoperationtest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -835,7 +835,10 @@ TEST_F(TwoPhaseUpdateOperationTest, update_fails_if_ownership_changes_between_ge
}

TEST_F(TwoPhaseUpdateOperationTest, update_fails_if_ownership_changes_between_get_and_restarted_fast_path_updates) {
do_test_ownership_changed_between_gets_and_second_phase(70, 70, 0); // Timestamps in sync -> Update restart
// TODO find a way to test this case properly again since this test now triggers
// the "replica set has changed" check and does not actually restart with a fast
// update path.
do_test_ownership_changed_between_gets_and_second_phase(70, 70, 70); // Timestamps in sync -> Update restart
}

TEST_F(TwoPhaseUpdateOperationTest, safe_path_condition_mismatch_fails_with_tas_error) {
Expand Down Expand Up @@ -1044,6 +1047,31 @@ TEST_F(TwoPhaseUpdateOperationTest, safe_path_consistent_get_reply_timestamps_do
EXPECT_EQ(0, metrics.fast_path_restarts.getValue());
}

TEST_F(TwoPhaseUpdateOperationTest, fast_path_not_restarted_if_replica_set_altered_between_get_send_and_receive) {
setupDistributor(3, 3, "storage:3 distributor:1");
getConfig().set_update_fast_path_restart_enabled(true);

std::shared_ptr<TwoPhaseUpdateOperation> cb(sendUpdate("0=1/2/3,1=2/3/4")); // Inconsistent replicas.
DistributorMessageSenderStub sender;
cb->start(sender, framework::MilliSecTime(0));

// Replica set changes between time of Get requests sent and
// responses received. This may happen e.g. if concurrent mutations
// to the same bucket create a new replica. If this happens, we
// must not send the Update operations verbatim, as they will
// be started with the _current_ replica set, not the one that
// was present during the Get request.
BucketId bucket(0x400000000000cac4); // Always the same in the test.
addNodesToBucketDB(bucket, "0=1/2/3,1=2/3/4,2=3/3/3");

Timestamp old_timestamp = 500;
ASSERT_EQ("Get => 0,Get => 1", sender.getCommands(true));
replyToGet(*cb, sender, 0, old_timestamp);
replyToGet(*cb, sender, 1, old_timestamp);

ASSERT_EQ("Put => 1,Put => 2,Put => 0", sender.getCommands(true, false, 2));
}

// XXX currently differs in behavior from content nodes in that updates for
// document IDs without explicit doctypes will _not_ be auto-failed on the
// distributor.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,9 @@ GetOperation::assignTargetNodeGroups(const BucketDatabase::ReadGuard& read_guard
for (uint32_t i = 0; i < e->getNodeCount(); i++) {
const BucketCopy& copy = e->getNodeRef(i);

// TODO this could ideally be a set
_replicas_in_db.emplace_back(e.getBucketId(), copy.getNode());

if (!copy.valid()) {
_responses[GroupId(e.getBucketId(), copy.getChecksum(), copy.getNode())].push_back(copy);
} else if (!copy.empty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ class GetOperation : public Operation
// Exposed for unit testing. TODO feels a bit dirty :I
const DistributorBucketSpace& bucketSpace() const noexcept { return _bucketSpace; }

const std::vector<std::pair<document::BucketId, uint16_t>>& replicas_in_db() const noexcept {
return _replicas_in_db;
}

private:
class GroupId {
public:
Expand Down Expand Up @@ -88,6 +92,7 @@ class GetOperation : public Operation

PersistenceOperationMetricSet& _metric;
framework::MilliSecTimer _operationTimer;
std::vector<std::pair<document::BucketId, uint16_t>> _replicas_in_db;
bool _has_replica_inconsistency;

void sendReply(DistributorMessageSender& sender);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ TwoPhaseUpdateOperation::TwoPhaseUpdateOperation(
_updateDocBucketId = idFactory.getBucketId(_updateCmd->getDocumentId());
}

TwoPhaseUpdateOperation::~TwoPhaseUpdateOperation() {}
TwoPhaseUpdateOperation::~TwoPhaseUpdateOperation() = default;

namespace {

Expand Down Expand Up @@ -183,6 +183,7 @@ TwoPhaseUpdateOperation::startSafePathUpdate(DistributorMessageSender& sender)
_manager, _bucketSpace, _bucketSpace.getBucketDatabase().acquire_read_guard(), get, _getMetric);
GetOperation & op = *getOperation;
IntermediateMessageSender intermediate(_sentMessageMap, std::move(getOperation), sender);
_replicas_at_get_send_time = op.replicas_in_db(); // Populated at construction time, not at start()-time
op.start(intermediate, _manager.getClock().getTimeInMillis());
transitionTo(SendState::GETS_SENT);

Expand Down Expand Up @@ -408,7 +409,23 @@ TwoPhaseUpdateOperation::handleSafePathReceivedGet(DistributorMessageSender& sen
bool TwoPhaseUpdateOperation::may_restart_with_fast_path(const api::GetReply& reply) {
return (_manager.getDistributor().getConfig().update_fast_path_restart_enabled() &&
reply.wasFound() &&
reply.had_consistent_replicas());
reply.had_consistent_replicas() &&
replica_set_unchanged_after_get_operation());
}

bool TwoPhaseUpdateOperation::replica_set_unchanged_after_get_operation() const {
std::vector<BucketDatabase::Entry> entries;
_bucketSpace.getBucketDatabase().getParents(_updateDocBucketId, entries);

std::vector<std::pair<document::BucketId, uint16_t>> replicas_in_db_now;
for (uint32_t j = 0; j < entries.size(); ++j) {
const auto& e = entries[j];
for (uint32_t i = 0; i < e->getNodeCount(); i++) {
const auto& copy = e->getNodeRef(i);
replicas_in_db_now.emplace_back(e.getBucketId(), copy.getNode());
}
}
return (replicas_in_db_now == _replicas_at_get_send_time);
}

void TwoPhaseUpdateOperation::restart_with_fast_path_due_to_consistent_get_timestamps(DistributorMessageSender& sender) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ class TwoPhaseUpdateOperation : public SequencedOperation
void replyWithTasFailure(DistributorMessageSender& sender,
vespalib::stringref message);
bool may_restart_with_fast_path(const api::GetReply& reply);
bool replica_set_unchanged_after_get_operation() const;
void restart_with_fast_path_due_to_consistent_get_timestamps(DistributorMessageSender& sender);

UpdateMetricSet& _updateMetric;
Expand All @@ -136,6 +137,7 @@ class TwoPhaseUpdateOperation : public SequencedOperation
Mode _mode;
mbus::TraceNode _trace;
document::BucketId _updateDocBucketId;
std::vector<std::pair<document::BucketId, uint16_t>> _replicas_at_get_send_time;
bool _replySent;
};

Expand Down