Skip to content

Commit

Permalink
Merge 'forward_service: reduce allocations in forward_service' from P…
Browse files Browse the repository at this point in the history
…iotr Sarna

This series refactors the code to get rid of unnecessary
allocations by extracing a helper requires_thread() function,
as well as by removing std::optional usage in forward_result,
now that it's possible to merge empty results with each other,
both ways (#11064).

Closes #11120

* github.com:scylladb/scylla:
  forward_service: remove redundant optional from forward_service
  forward_service: open-code running a Sestar thread
  forward_service: add requires_thread helper
  • Loading branch information
avikivity committed Jul 27, 2022
2 parents 71bec22 + abc5a7b commit a03a33d
Showing 1 changed file with 29 additions and 26 deletions.
55 changes: 29 additions & 26 deletions service/forward_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -69,16 +69,18 @@ class forward_aggregates {

template<typename Func>
auto with_thread_if_needed(Func&& func) const {
bool required = std::any_of(_funcs.cbegin(), _funcs.cend(), [](const ::shared_ptr<db::functions::aggregate_function>& f) {
return f->requires_thread();
});

if (required) {
if (requires_thread()) {
return async(std::move(func));
} else {
return futurize_invoke(std::move(func));
}
}

bool requires_thread() const {
return std::any_of(_funcs.cbegin(), _funcs.cend(), [](const ::shared_ptr<db::functions::aggregate_function>& f) {
return f->requires_thread();
});
}
};

forward_aggregates::forward_aggregates(const query::forward_request& request) {
Expand Down Expand Up @@ -502,12 +504,12 @@ future<query::forward_result> forward_service::dispatch(query::forward_request r
tracing::trace(tr_state, "Dispatching forward_request to {} endpoints", vnodes_per_addr.size());

retrying_dispatcher dispatcher(*this, tr_state);
std::optional<query::forward_result> result;
query::forward_result result;

return do_with(std::move(dispatcher), std::move(result), std::move(vnodes_per_addr), std::move(req), std::move(tr_state),
[] (
retrying_dispatcher& dispatcher,
std::optional<query::forward_result>& result,
query::forward_result& result,
std::map<netw::messaging_service::msg_addr, dht::partition_range_vector>& vnodes_per_addr,
query::forward_request& req,
tracing::trace_state_ptr& tr_state
Expand All @@ -517,7 +519,7 @@ future<query::forward_result> forward_service::dispatch(query::forward_request r
std::pair<netw::messaging_service::msg_addr, dht::partition_range_vector> vnodes_with_addr
) {
netw::messaging_service::msg_addr addr = vnodes_with_addr.first;
std::optional<query::forward_result>& result_ = result;
query::forward_result& result_ = result;
tracing::trace_state_ptr& tr_state_ = tr_state;
retrying_dispatcher& dispatcher_ = dispatcher;

Expand All @@ -540,30 +542,31 @@ future<query::forward_result> forward_service::dispatch(query::forward_request r
flogger.debug("received forward_result={} from {}", partial_result_printer, addr);

return aggrs.with_thread_if_needed([&result_, &aggrs, partial_result = std::move(partial_result)] () mutable {
if (result_) {
aggrs.merge(*result_, std::move(partial_result));
} else {
result_ = partial_result;
}
aggrs.merge(result_, std::move(partial_result));
});
});
}
).then(
[&result, &req, &tr_state] () -> future<query::forward_result> {
forward_aggregates aggrs(req);
return do_with(std::move(aggrs), [&result, &req, &tr_state] (forward_aggregates& aggrs) {
return aggrs.with_thread_if_needed([&result, &req, &tr_state, &aggrs] () mutable {
query::forward_result::printer result_printer{
.functions = get_functions(req),
.res = *result
};
tracing::trace(tr_state, "Merged result is {}", result_printer);
flogger.debug("merged result is {}", result_printer);

aggrs.finalize(*result);
return *result;
});
});
const bool requires_thread = aggrs.requires_thread();

auto merge_result = [&result, &req, &tr_state, aggrs = std::move(aggrs)] () mutable {
query::forward_result::printer result_printer{
.functions = get_functions(req),
.res = result
};
tracing::trace(tr_state, "Merged result is {}", result_printer);
flogger.debug("merged result is {}", result_printer);

aggrs.finalize(result);
return result;
};
if (requires_thread) {
return seastar::async(std::move(merge_result));
} else {
return make_ready_future<query::forward_result>(merge_result());
}
}
);
}
Expand Down

0 comments on commit a03a33d

Please sign in to comment.