shard_placement_table: stress test and fixes #17194

Conversation
ducktape was retried in https://buildkite.com/redpanda/redpanda/builds/46472#018e5935-7d6d-4145-a44e-ffea99a170fa

ducktape was retried in https://buildkite.com/redpanda/redpanda/builds/46622#018e670d-c4e5-45b9-8309-b2a322f471c2

new failures in https://buildkite.com/redpanda/redpanda/builds/46622#018e670d-c4ea-4d8b-afc4-ee9eee8f1675

Test failure looks like an instance of #15261

/ci-repeat 1
auto dest_it = dest._states.find(ntp);
if (
  dest_it == dest._states.end()
  || dest_it->second.shard_revision < shard_rev) {
are there any anomalies that could be identified by a dest_it->second.shard_revision > shard_rev relationship? I assumed this would be equality, but I could also see > having the meaning that the destination shard has at least the minimum amount of information to pass this consistency check.
are there any anomalies that could be identified by a dest_it->second.shard_revision > shard_rev relationship?

This is not as big of a deal as the < anomaly. The important thing is to ensure that the current state is never newer than target (or assigned); if this is not true, reconciliation can go wrong. If OTOH target is a bit newer than expected, we'll just take a shortcut in the partition's path across shards.
But this is a partial fix anyway (the main one is 092fa5f)
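To make the cases concrete, here is a minimal sketch of the check's possible outcomes; the names and the three-way result type are illustrative assumptions, not the PR's actual API:

#include <cstdint>
#include <optional>

// Hypothetical sketch of the consistency cases discussed above. dest_rev
// is the shard revision the destination recorded for the ntp, shard_rev
// is the revision the source is transferring under.
using shard_revision_id = int64_t;

enum class dest_check {
    wait,     // destination is behind the source: not safe to transfer yet
    proceed,  // revisions match: source and destination are in sync
    shortcut, // destination is ahead: benign, the target moved on and the
              // partition just takes a shortcut across shards
};

dest_check check_destination(
  std::optional<shard_revision_id> dest_rev, shard_revision_id shard_rev) {
    if (!dest_rev || *dest_rev < shard_rev) {
        return dest_check::wait;
    }
    return *dest_rev == shard_rev ? dest_check::proceed
                                  : dest_check::shortcut;
}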
when we update the target before even creating the initial instance

is target the same as destination shard?
is target the same as destination shard?

Destination or initial. The shard where the partition is supposed to be.
/// If this shard is the initial shard for some incarnation of this
/// partition on this node, this field will contain the corresponding
/// log revision.
std::optional<model::revision_id> _is_initial_for;
Why is tracking the initial shard important? Is it because some residual state sticks around on the initial shard instead? I didn't look at the RFC; is this explained there? I'll go take a look at that.
Well, we have to remember that the initial shard can create the partition without waiting for anybody. But we can't do it by immediately setting the current state to "hosted", because the shard can still be occupied by the previous log revision. So _is_initial_for becomes an intermediate place where we can store this information while the previous log revision is being cleaned up.
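A minimal sketch of that idea, assuming simplified, hypothetical types (the real field lives in shard_placement_table's per-ntp state):

#include <cstdint>
#include <optional>

// The initial shard may create the partition without waiting for anybody,
// but only after any leftover state from a previous log revision of the
// same ntp is gone; is_initial_for holds the intent in the meantime.
using log_revision_id = int64_t;

struct placement_state {
    // log revision of the instance currently hosted on this shard, if any
    std::optional<log_revision_id> current;
    // set if this shard is the initial shard for some incarnation
    std::optional<log_revision_id> is_initial_for;
};

bool can_create_initial_instance(
  const placement_state& st, log_revision_id expected) {
    // A previous log revision still occupies this shard: cleanup first.
    if (st.current && *st.current != expected) {
        return false;
    }
    // Nothing hosted yet and we remembered we are the initial shard.
    return !st.current && st.is_initial_for == expected;
}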
// Revision check protects against race conditions between operations
// on instances of the same ntp with different log revisions (e.g. after
// a topic was deleted and then re-created). These operations can happen
// on different shards, therefore erase() corresponding to the old
// instance can happen after update() corresponding to the new one. Note
// that concurrent updates are not a problem during cross-shard
// transfers because even though corresponding erase() and update() will
// have the same log_revision, update() will always come after erase().
🙏
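A minimal sketch of the guard that comment describes, with hypothetical simplified types standing in for the real table:

#include <cstdint>
#include <map>
#include <string>

// An erase() belonging to an old incarnation of a topic may arrive after
// the update() for the new incarnation, so the erase checks the stored
// log revision before applying itself.
using log_revision_id = int64_t;

struct entry {
    log_revision_id log_revision;
};

std::map<std::string, entry> table; // keyed by ntp in this sketch

void erase_instance(const std::string& ntp, log_revision_id log_rev) {
    auto it = table.find(ntp);
    // Only erase the exact incarnation we were asked about; if a newer
    // update() already replaced the entry, the stale erase is a no-op.
    if (it != table.end() && it->second.log_revision == log_rev) {
        table.erase(it);
    }
}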
co_await spt.local().set_target(
I can't quite tell, but it looks like concurrent partition movements are allowed and occur in the stress test. Is that true?
In the test, a new shard assignment or a new log revision can be introduced concurrently while an old transfer is still ongoing. Transfers for several log revisions can be concurrent.
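A rough sketch of the event mix this implies, under the assumption (not taken from the PR's actual test code) that every new target bumps the shard revision and a topic delete + recreate bumps the log revision:

#include <cstdint>
#include <cstdio>
#include <random>

// While a transfer for one log revision may still be in flight, the test
// can issue a new shard assignment (same log revision, new shard
// revision) or bump the log revision, so transfers for several log
// revisions can overlap.
int main() {
    std::mt19937 rng{42};
    int64_t log_rev = 1;
    int64_t shard_rev = 1;
    for (int i = 0; i < 10; ++i) {
        if (std::uniform_int_distribution<int>(0, 3)(rng) == 0) {
            ++log_rev; // delete + recreate: new incarnation of the ntp
        }
        ++shard_rev; // every new target gets a fresh shard revision
        std::printf(
          "set_target(log_rev=%lld, shard_rev=%lld)\n",
          static_cast<long long>(log_rev),
          static_cast<long long>(shard_rev));
    }
}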
@@ -37,15 +37,33 @@ struct ntp_table {
    model::revision_id revision;
};

using partition_key = std::pair<model::ntp, model::revision_id>;
and that there is max 1 transfer in progress.

Oh, I just saw this in the next commit message!
It will be used in cases when shard_placement_table is in the middle of an update and controller_backend has to wait for it to finish before reconciling partitions.
Introduce additional check when preparing xshard-transfer that shard-local states at source and destination are consistent (i.e. that the destination knows that it is the destination)
Use log revision instead of shard revision for tracking the initial shard for a partition. This is important in the scenario when we update the target before even creating the initial instance. E.g.:
1. set initial target to s1 (log revision lr, shard revision sr1)
2. quickly update target to s2 (same log revision lr, shard revision sr2)
3. shard s1 must still be confident that it can create the partition, as there is no previous instance with the same log revision.
Add a function that calculates expected log revision of a partition on a particular node based on the corresponding partition_replicas_view
Introduce a helper that calculates the required reconciliation action for an NTP on this shard, and use it in controller_backend (a rough sketch follows these commit messages).
Concurrent updates are a problem only for partitions with different log revisions, as updates corresponding to x-shard movements (when log revision stays the same but the shard revision changes) are sequential.
Previously we stored the target shard for the ntp on all shards with state related to that ntp. This way, during a cross-shard transfer the source shard knows immediately where to transfer state to. The drawback is that it is hard to synchronize state updates coming from different controller_backend reconciliation fibers and from updating targets. Introduce a simple assignment marker instead that shows whether the partition should be present on the current shard (to get the destination shard we have to query the node-wide map on shard 0).
Even if we haven't yet started the partition instance anywhere (and therefore state.current is nullopt on all shards) we must still allow transfers to happen. In this case the role of state.current is played by state._is_initial_for.
Check that in the process of reconciliation we don't launch several partition instances concurrently; and that there is max 1 transfer in progress.
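A rough sketch of the reconciliation-action helper mentioned above; all names are hypothetical and the state is heavily simplified, but it also illustrates the assignment marker from the commits: a local flag saying whether the partition should be present on this shard, instead of storing the full target shard everywhere.

#include <cstdint>
#include <optional>

using log_revision_id = int64_t;

enum class shard_local_action { nothing, create, remove, transfer };

struct local_state {
    std::optional<log_revision_id> current; // hosted instance, if any
    bool assigned = false;                  // assignment marker
};

shard_local_action required_action(
  const local_state& st, std::optional<log_revision_id> expected) {
    // The ntp is no longer expected on this node, or a stale incarnation
    // is still hosted here: clean it up.
    if (!expected || (st.current && *st.current != *expected)) {
        return st.current ? shard_local_action::remove
                          : shard_local_action::nothing;
    }
    if (st.assigned) {
        // Expected on this shard: create if no instance is running yet.
        return st.current ? shard_local_action::nothing
                          : shard_local_action::create;
    }
    // The instance is here but the assignment moved to another shard
    // (looked up in the node-wide map on shard 0): hand it off.
    return st.current ? shard_local_action::transfer
                      : shard_local_action::nothing;
}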
Force-pushed from e05e6c3 to fe2f0c8 (Compare)

force-push to resolve conflicts
Add a stress test testing the logic in shard_placement_table and fix a few bugs that it found.

Backports Required

Release Notes