Quadratic behavior in topic_table::notify_waiters #7665

Closed
travisdowns opened this issue Dec 8, 2022 · 0 comments · Fixed by #7782
Labels: area/controller, kind/bug, performance, sev/medium

travisdowns commented Dec 8, 2022

Version & Environment

Redpanda version: 22.2.2

(It also applies to the tip of dev as of 2022-12-08.)

What went wrong?

Reactor stalls, high CPU use and slow performance when topics with many controller deltas are recovered.

What should have happened instead?

None of the above.

How to reproduce the issue?

  1. Recover a topic with many deltas and unknown additional conditions.
  2. Examine the trace log.

Additional information

The primary hint we have about the behavior is from the following reactor stall backtrace:

_end at ??:0
std::__1::__wrap_iter<cluster::topic_table_delta*>::operator++() at /vectorized/llvm/bin/../include/c++/v1/__iterator/wrap_iter.h:100
 (inlined by) std::__1::back_insert_iterator<std::__1::vector<cluster::topic_table_delta, std::__1::allocator<cluster::topic_table_delta> > > std::__1::copy_if<std::__1::__wrap_iter<cluster::topic_table_delta*>, std::__1::back_insert_iterator<std::__1::vector<cluster::topic_table_delta, std::__1::allocator<cluster::topic_table_delta> > >, cluster::topic_table::notify_waiters()::$_0>(std::__1::__wrap_iter<cluster::topic_table_delta*>, std::__1::__wrap_iter<cluster::topic_table_delta*>, std::__1::back_insert_iterator<std::__1::vector<cluster::topic_table_delta, std::__1::allocator<cluster::topic_table_delta> > >, cluster::topic_table::notify_waiters()::$_0) at /vectorized/llvm/bin/../include/c++/v1/__algorithm/copy_if.h:26
 (inlined by) cluster::topic_table::notify_waiters() at /var/lib/buildkite-agent/builds/buildkite-amd64-xfs-builders-i-0433325f5a753b5ff-1/redpanda/redpanda/vbuild/release/clang/../../../src/v/cluster/topic_table.cc:635
cluster::topic_table::apply(cluster::controller_command<model::topic_namespace, cluster::incremental_topic_updates, (signed char)4, (model::record_batch_type)6>, detail::base_named_type<long, model::model_offset_type, std::__1::integral_constant<bool, true> >) at /var/lib/buildkite-agent/builds/buildkite-amd64-xfs-builders-i-0433325f5a753b5ff-1/redpanda/redpanda/vbuild/release/clang/../../../src/v/cluster/topic_table.cc:576
seastar::future<std::__1::error_code> cluster::do_apply<cluster::controller_command<model::topic_namespace, cluster::incremental_topic_updates, (signed char)4, (model::record_batch_type)6> >(unsigned int, cluster::controller_command<model::topic_namespace, cluster::incremental_topic_updates, (signed char)4, (model::record_batch_type)6>, seastar::sharded<cluster::topic_table>&, detail::base_named_type<long, model::model_offset_type, std::__1::integral_constant<bool, true> >)::'lambda'(cluster::topic_table&)::operator()(cluster::topic_table&) at /var/lib/buildkite-agent/builds/buildkite-amd64-xfs-builders-i-0433325f5a753b5ff-1/redpanda/redpanda/vbuild/release/clang/../../../src/v/cluster/topic_updates_dispatcher.cc:250
 (inlined by) decltype(static_cast<cluster::controller_command<model::topic_namespace, cluster::incremental_topic_updates, (signed char)4, (model::record_batch_type)6>>(fp)(static_cast<cluster::topic_table&>(fp0))) std::__1::__invoke_constexpr<seastar::future<std::__1::error_code> cluster::do_apply<cluster::controller_command<model::topic_namespace, cluster::incremental_topic_updates, (signed char)4, (model::record_batch_type)6> >(unsigned int, cluster::controller_command<model::topic_namespace, cluster::incremental_topic_updates, (signed char)4, (model::record_batch_type)6>, seastar::sharded<cluster::topic_table>&, detail::base_named_type<long, model::model_offset_type, std::__1::integral_constant<bool, true> >)::'lambda'(cluster::topic_table&), cluster::topic_table&>(cluster::controller_command<model::topic_namespace, cluster::incremental_topic_updates, (signed char)4, (model::record_batch_type)6>&&, cluster::topic_table&) at /vectorized/llvm/bin/../include/c++/v1/type_traits:3648
 (inlined by) decltype(auto) std::__1::__apply_tuple_impl<seastar::future<std::__1::error_code> cluster::do_apply<cluster::controller_command<model::topic_namespace, cluster::incremental_topic_updates, (signed char)4, (model::record_batch_type)6> >(unsigned int, cluster::controller_command<model::topic_namespace, cluster::incremental_topic_updates, (signed char)4, (model::record_batch_type)6>, seastar::sharded<cluster::topic_table>&, detail::base_named_type<long, model::model_offset_type, std::__1::integral_constant<bool, true> >)::'lambda'(cluster::topic_table&), std::__1::tuple<cluster::topic_table&>, 0ul>(cluster::controller_command<model::topic_namespace, cluster::incremental_topic_updates, (signed char)4, (model::record_batch_type)6>&&, std::__1::tuple<cluster::topic_table&>&&, std::__1::__tuple_indices<0ul>) at /vectorized/llvm/bin/../include/c++/v1/tuple:1595
 (inlined by) decltype(auto) std::__1::apply<seastar::future<std::__1::error_code> cluster::do_apply<cluster::controller_command<model::topic_namespace, cluster::incremental_topic_updates, (signed char)4, (model::record_batch_type)6> >(unsigned int, cluster::controller_command<model::topic_namespace, cluster::incremental_topic_updates, (signed char)4, (model::record_batch_type)6>, seastar::sharded<cluster::topic_table>&, detail::base_named_type<long, model::model_offset_type, std::__1::integral_constant<bool, true> >)::'lambda'(cluster::topic_table&), std::__1::tuple<cluster::topic_table&> >(cluster::controller_command<model::topic_namespace, cluster::incremental_topic_updates, (signed char)4, (model::record_batch_type)6>&&, std::__1::tuple<cluster::topic_table&>&&) at /vectorized/llvm/bin/../include/c++/v1/tuple:1604
 (inlined by) seastar::future<std::__1::error_code> seastar::sharded<cluster::topic_table>::invoke_on<seastar::future<std::__1::error_code> cluster::do_apply<cluster::controller_command<model::topic_namespace, cluster::incremental_topic_updates, (signed char)4, (model::record_batch_type)6> >(unsigned int, cluster::controller_command<model::topic_namespace, cluster::incremental_topic_updates, (signed char)4, (model::record_batch_type)6>, seastar::sharded<cluster::topic_table>&, detail::base_named_type<long, model::model_offset_type, std::__1::integral_constant<bool, true> >)::'lambda'(cluster::topic_table&), seastar::future<std::__1::error_code> >(unsigned int, seastar::smp_submit_to_options, cluster::controller_command<model::topic_namespace, cluster::incremental_topic_updates, (signed char)4, (model::record_batch_type)6>&&)::'lambda'()::operator()() at /vectorized/include/seastar/core/sharded.hh:421
 (inlined by) seastar::future<std::__1::error_code> seastar::futurize<seastar::future<std::__1::error_code> >::invoke<seastar::future<std::__1::error_code> seastar::sharded<cluster::topic_table>::invoke_on<seastar::future<std::__1::error_code> cluster::do_apply<cluster::controller_command<model::topic_namespace, cluster::incremental_topic_updates, (signed char)4, (model::record_batch_type)6> >(unsigned int, cluster::controller_command<model::topic_namespace, cluster::incremental_topic_updates, (signed char)4, (model::record_batch_type)6>, seastar::sharded<cluster::topic_table>&, detail::base_named_type<long, model::model_offset_type, std::__1::integral_constant<bool, true> >)::'lambda'(cluster::topic_table&), seastar::future<std::__1::error_code> >(unsigned int, seastar::smp_submit_to_options, cluster::controller_command<model::topic_namespace, cluster::incremental_topic_updates, (signed char)4, (model::record_batch_type)6>&&)::'lambda'()&>(cluster::controller_command<model::topic_namespace, cluster::incremental_topic_updates, (signed char)4, (model::record_batch_type)6>&&) at /vectorized/include/seastar/core/future.hh:2142
 (inlined by) seastar::smp_message_queue::async_work_item<seastar::future<std::__1::error_code> seastar::sharded<cluster::topic_table>::invoke_on<seastar::future<std::__1::error_code> cluster::do_apply<cluster::controller_command<model::topic_namespace, cluster::incremental_topic_updates, (signed char)4, (model::record_batch_type)6> >(unsigned int, cluster::controller_command<model::topic_namespace, cluster::incremental_topic_updates, (signed char)4, (model::record_batch_type)6>, seastar::sharded<cluster::topic_table>&, detail::base_named_type<long, model::model_offset_type, std::__1::integral_constant<bool, true> >)::'lambda'(cluster::topic_table&), seastar::future<std::__1::error_code> >(unsigned int, seastar::smp_submit_to_options, cluster::controller_command<model::topic_namespace, cluster::incremental_topic_updates, (signed char)4, (model::record_batch_type)6>&&)::'lambda'()>::run_and_dispose() at /vectorized/include/seastar/core/smp.hh:231

The time is being spent inside the copy_if here:

void topic_table::notify_waiters() {
    /// If by invocation of this method there are no waiters, notify
    /// function_ptrs stored in \ref notifications, without consuming all
    /// pending_deltas for when a subsequent waiter does arrive
    std::vector<delta> changes;
    std::copy_if(
      _pending_deltas.begin(),
      _pending_deltas.end(),
      std::back_inserter(changes),
      [this](const delta& d) { return d.offset > _last_consumed_by_notifier; });
    for (auto& cb : _notifications) {
        cb.second(changes);
    }
    if (!changes.empty()) {
        _last_consumed_by_notifier = changes.back().offset;
    }

    /// Consume all pending deltas
    if (!_waiters.empty()) {
        changes.clear();
        changes.swap(_pending_deltas);
        std::vector<std::unique_ptr<waiter>> active_waiters;
        active_waiters.swap(_waiters);
        for (auto& w : active_waiters) {
            w->promise.set_value(changes);
        }
    }
}

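Consider what happens when recovering a large topic with many deltas while _waiters is empty. The deltas are applied from the log one by one, and notify_waiters is called after each addition. Because _pending_deltas is only swapped out when there are waiters, it is never drained in this case and keeps growing. Each call to copy_if then traverses all of _pending_deltas, even though only the newest entries pass the offset filter, so we get O(n^2) behavior in the number of deltas.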
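As a rough worked count, assuming exactly one delta is appended before each call and nothing is ever drained, the k-th call scans k entries, so replaying n deltas evaluates the predicate this many times:

```latex
\sum_{k=1}^{n} k = \frac{n(n+1)}{2} = O(n^2)
```

For an illustrative n = 100,000 (not a measured figure), that is about 5 × 10^9 predicate evaluations, while only n elements are actually copied.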

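We could avoid this by searching _pending_deltas in reverse for the first delta where d.offset > _last_consumed_by_notifier and copying only from there. Assuming the offsets in _pending_deltas are non-decreasing (which the use of changes.back().offset suggests), each call then only touches the newly added deltas, reducing the complexity to O(n) overall.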
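A minimal sketch of the reverse search, not the actual fix. The `delta` struct and the free function `unseen_deltas` are hypothetical stand-ins for `cluster::topic_table_delta` and the filtering step in `notify_waiters`, and the sketch assumes the pending vector is ordered by non-decreasing offset:

```cpp
#include <algorithm>
#include <cstdint>
#include <iterator>
#include <vector>

// Hypothetical stand-in for cluster::topic_table_delta; only the offset matters here.
struct delta {
    std::int64_t offset;
};

// Returns the deltas with offset > last_consumed, scanning from the back.
// ASSUMPTION: `pending` is sorted by non-decreasing offset.
std::vector<delta> unseen_deltas(
  const std::vector<delta>& pending, std::int64_t last_consumed) {
    // Walk backwards until we hit a delta that was already consumed.
    auto rit = std::find_if(
      pending.rbegin(), pending.rend(), [last_consumed](const delta& d) {
          return d.offset <= last_consumed;
      });
    // rit.base() is the forward iterator just past that delta, i.e. the
    // first unseen one (or begin() if nothing has been consumed yet).
    return std::vector<delta>(rit.base(), pending.end());
}
```

Under that ordering assumption, a call costs time proportional to the number of unseen deltas plus one, rather than to the full length of the pending vector.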

Another optimization is to skip this work entirely if both _notifications and _waiters are empty, though I don't know if this is the case during replay.

travisdowns added the kind/bug label Dec 8, 2022
ballard26 self-assigned this Dec 9, 2022
jcsp added the sev/medium label Dec 19, 2022