Fix quadratic behavior in notify_waiters #7782
Conversation
From issue 7665: consider recovering a large topic with many deltas while _waiters is empty. The deltas are applied from the log one by one, and notify_waiters is called after each addition. The copy_if then traverses the entire set of pending deltas on every call, so we get O(n^2) behavior in the number of deltas. This commit reduces that to O(n) by storing the last sent index of _pending_deltas.
```diff
-    _pending_deltas.end(),
-    std::back_inserter(changes),
-    [this](const delta& d) { return d.offset > _last_consumed_by_notifier; });
+    std::span changes{starting_iter, _pending_deltas.cend()};
```
Maybe we should iterate through the _pending_deltas and assert that d.offset > _last_consumed_by_notifier instead of assuming it. Alternatively, the commit message should explain why it's always true now.
I think the old code assumed offsets were increasing, and if they weren't, we would skip notification of the listeners in the _notifications array for some changes (but _waiters would be notified), which just seems like a bug.

The new logic doesn't assume anything about the offsets now; it just sends whatever suffix of the _pending_deltas array hasn't already been sent to the notifier in a previous call, and keeps track of the offset of that suffix within the vector.

So I'm not sure there's anything to assert here.
_pending_deltas should be monotonically increasing if the offsets applied to the topic_table by the controller_stm are in order. I believe this to be the case, since if out-of-order offsets were applied to the topic_table then it could end up in an incorrect state, e.g. if a delete-topic and a create-topic command were swapped.
I think with the current solution we can remove _last_consumed_by_notifier; it is no longer needed, as we use iterators to provide the same semantics. The previous code didn't leverage the fact that deltas were sorted by offset, while the current code does.
Unfortunately I don't believe we can remove _last_consumed_by_notifier quite yet, as other code in the topic_table relies on it.
src/v/cluster/topic_table.cc
Outdated
```diff
-for (auto& cb : _notifications) {
-    cb.second(changes);
-}
+if (!changes.empty()) {
+    for (auto& cb : _notifications) {
+        cb.second(changes);
```
by moving the for-loop under the changes.empty check, i assume there are no consumers that are interested in the case where changes is empty? i'd assume not, but it is a semantic change for the callback consumers.
Right you are. Definitely a semantic change. I went through and checked all of the consumers of this callback and none do anything for an empty _pending_deltas. Should I maybe add a comment to make note of this behavior?
the other semantic change is that late registration of a callback might result in missing some deltas notifications that were previously notified and now skipped.
these semantic changes are only important insomuch as the use cases are safe, so verifying that users are tolerant of the changes is important. we don't have to maintain the previous semantics since this is just internal api / utility.
```cpp
}

active_waiters[active_waiters.size() - 1]->promise.set_value(
    std::move(_pending_deltas));
```
i'm into the spirit of this optimization, but IIUC it only saves 1 out of N copies at the expense of extra code and complexity, so the benefit isn't really clear.

if you decide to keep the optimization, though, you'll probably want to fix the final std::move(_pending_deltas) to explicitly express the desire to move and clear _pending_deltas (i.e. something like std::exchange(_pending_deltas, {})). generally std::move(X) doesn't have semantics that guarantee anything is actually moved. for example, if X was const then std::move(X) will just do a copy.
The main assumption here is that there tends to only be 1 waiter. The interface even warns against having multiple waiters. So in that sense we are eliminating all copies with this optimization.

I will add a std::exchange(_pending_deltas, {}) after the move just to ensure the intent to clear it is obvious.
maybe the deltas should be moved into a shared pointer and then change the interface so that the promises all get copies of a lw_shared_ptr? then it's 1 copy for any number of waiters?
It may be better to just restrict the interface to only allow for one waiter. Beyond unit tests, src/v/cluster/controller_backend.cc is the only user of this interface currently, so it wouldn't be hard to change it to only allow for one waiter. And I don't believe the interface will ever work well with more than one waiter, since unless the waiters all register at the same time none of them will get every pending delta.
Barring that interface change, though, using a shared_ptr is definitely a great idea. Will switch the PR over to that if we decide not to change the interface.
i wrote this with an assumption that there is only one waiter; now i would probably use ss::queue for this
Will be making a separate PR to change the interface for wait_for_changes. This PR is related to an incident and needs to be merged asap, and I don't want to add any more complications to it.
This changes the delta_cb_t to take a std::span instead of a reference to a vector. The new interface allows us to avoid copying a subset of `_pending_deltas` before calling all of the delta callbacks. Note that this change assumes that offsets in `_pending_deltas` are monotonically increasing from `_pending_deltas.begin()` to `_pending_deltas.end()`.
```cpp
    _last_consumed_by_notifier = changes.back().offset;
}

_last_consumed_by_notifier_offset = _pending_deltas.end()
                                    - _pending_deltas.begin();
```
I think end() - begin() is just size().
It's equivalent according to the standard. But the standard doesn't guarantee that you can safely cast from size_t to ptrdiff_t. It may just be pedantic, though. I can switch to using size() with a static cast if you think it's fine.
```cpp
for (auto& cb : _notifications) {
    cb.second(changes);
}

_last_consumed_by_notifier = changes.back().offset;
```
Where is this even used now? It seems like we can just use the [...]_offset variant now?
It has one current use here. It might be replaceable with _last_applied_revision_id. However, in some cases _last_applied_revision_id is greater than _last_consumed_by_notifier, since _last_consumed_by_notifier won't be updated if an apply function returns an error but _last_applied_revision_id will be. So just to be safe I'll leave it in there for now.
Hmmm, OK. That line honestly looks like a bug. It is comparing a model::revision with a model::offset, which I think does not ever make sense.

It's a shame because I feel like things are getting complicated: we now have _last_consumed_by_notifier and _last_consumed_by_notifier_offset, yet the former is actually an offset (in the log sense) and the latter is an offset into the pending deltas, and they serve basically the same purpose.
Nevermind I guess it is not a bug: it is valid to compare the revision ID to the offset in this case (see slack). My uneasiness remains regarding these two very similar bits of state but I guess it is what it is.
```diff
@@ -678,6 +694,11 @@ topic_table::wait_for_changes(ss::abort_source& as) {
     if (!_pending_deltas.empty()) {
         ret_t ret;
         ret.swap(_pending_deltas);
```
Does anyone think this swap idiom (which appears elsewhere in this method too) is better than just ret_t ret(std::move(_pending_deltas));?

I guess the swap variant is more explicit in that it doesn't require you to understand that a moved-from vector is guaranteed to be empty.
> I guess the swap variant is more explicit in that it doesn't require you to understand that a moved-from vector is guaranteed to be empty.

yeh, guaranteeing the state of the moved-from vector is the important bit. the example i use is that if _pending_deltas happened to be a const-ref then the compiler will not complain and std::move(const ref) will really be a copy.

here i might even avoid swap, and be even more explicit: auto ret = std::exchange(_pending_deltas, {})
Really good point, @dotnwat - I hadn't considered that the move might sometimes fail and result in a copy: normally that's "only" a performance problem but where we rely on the well-defined moved-from state as we do here it would also be a hard-to-spot bug.
```diff
 std::copy_if(
-    _pending_deltas.begin(),
+    starting_iter,
     _pending_deltas.end(),
     std::back_inserter(changes),
     [this](const delta& d) { return d.offset > _last_consumed_by_notifier; });
```
with the new optimization i think we no longer need _last_consumed_by_notifier_offset, since all the deltas in changes will have offset greater than _last_consumed_by_notifier?
This copy is removed in a later commit, so we'll still need _last_consumed_by_notifier_offset to skip ahead past deltas we've already sent to notifiers.
I guess a lot of the messiness here is because notify_waiters sort of has to rebuild information that was lost in the caller: the pattern in the caller is to add some deltas to _pending_deltas and then call notify_waiters, but then this function has to reconstruct which deltas are the "new" deltas (just added) and which deltas are lying around from a previous call. It needs to make this distinction because the "new" deltas should be passed to the _notifications callbacks immediately. That's why we need to track some state which points to the dividing line, like _last_consumed....

Perhaps a simpler interface would be add_deltas, which takes an explicit list of deltas to add. It can just pass this list as-is to the _notifiers list immediately, and then do the _waiters logic: append to _pending_deltas if there are no waiters, otherwise pass the combination of _pending_deltas + new_changes to the waiters. None of that relies on any ordering of the offsets or anything like that, and it removes the need to track this dividing-line state.
The CI failure was #7816 which has been fixed
/backport v22.3.x
/backport v22.2.x
/backport v22.1.x
Failed to run cherry-pick command. I executed the below command:
@ballard26 fyi i updated the release notes section of the PR body from
From issue 7665: This PR reduces the behavior to O(n) by storing the last sent index of _pending_deltas.

Fixes #7665
Backports Required
UX Changes
Release Notes
Improvements