Skip to content

Commit

Permalink
rpc: pass requests by value
Browse files Browse the repository at this point in the history
Pass RPC requests by value, as we start to write some handlers using
coroutines, it can be a footgun to access have a suspension point, then
the request is destroyed. An example that motivated this:

```c++
ss::future<reply> service::handle(request&& r, rpc::streaming_context&) {
  co_await ss::coroutine::switch_to(get_scheduling_group());
  co_return co_await handle_request(std::move(r));
}
```

In this case `r` is destroyed because the coroutine does not own the
value, and handle request would have a stack-use-after-return issue.

Now that we pass by value, we don't have to remember to move the value
into the current method.

Signed-off-by: Tyler Rockwood <rockwood@redpanda.com>
  • Loading branch information
rockwotj committed Jan 5, 2024
1 parent 47956d8 commit 5becd89
Show file tree
Hide file tree
Showing 27 changed files with 161 additions and 162 deletions.
2 changes: 1 addition & 1 deletion src/v/cluster/bootstrap_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ namespace cluster {

ss::future<cluster_bootstrap_info_reply>
bootstrap_service::cluster_bootstrap_info(
cluster_bootstrap_info_request&&, rpc::streaming_context&) {
cluster_bootstrap_info_request, rpc::streaming_context&) {
cluster_bootstrap_info_reply r{};
r.broker = make_self_broker(config::node());
r.version = features::feature_table::get_latest_logical_version();
Expand Down
2 changes: 1 addition & 1 deletion src/v/cluster/bootstrap_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ class bootstrap_service : public cluster_bootstrap_service {
, _storage(storage) {}

ss::future<cluster_bootstrap_info_reply> cluster_bootstrap_info(
cluster_bootstrap_info_request&&, rpc::streaming_context&) override;
cluster_bootstrap_info_request, rpc::streaming_context&) override;

private:
ss::sharded<storage::api>& _storage;
Expand Down
6 changes: 3 additions & 3 deletions src/v/cluster/cloud_metadata/offsets_recovery_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,19 +36,19 @@ class offsets_recovery_rpc_service : public offsets_recovery_service {
.cloud_storage_cluster_metadata_upload_timeout_ms.bind()) {}

ss::future<offsets_lookup_reply> offsets_lookup(
offsets_lookup_request&& req, rpc::streaming_context&) override {
offsets_lookup_request req, rpc::streaming_context&) override {
co_return co_await _offsets_lookup.local().lookup(std::move(req));
}

ss::future<offsets_upload_reply> offsets_upload(
offsets_upload_request&& req, rpc::streaming_context& ctx) override {
offsets_upload_request req, rpc::streaming_context& ctx) override {
auto ntp = req.offsets_ntp;
co_return co_await _offsets_upload_router.local().process_or_dispatch(
std::move(req), std::move(ntp), _metadata_timeout_ms());
}

ss::future<offsets_recovery_reply> offsets_recovery(
offsets_recovery_request&& req, rpc::streaming_context& ctx) override {
offsets_recovery_request req, rpc::streaming_context& ctx) override {
auto ntp = req.offsets_ntp;
co_return co_await _offsets_recovery_router.local().process_or_dispatch(
std::move(req), std::move(ntp), 30s);
Expand Down
2 changes: 1 addition & 1 deletion src/v/cluster/ephemeral_credential_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ namespace cluster {

ss::future<put_ephemeral_credential_reply>
ephemeral_credential_service::put_ephemeral_credential(
put_ephemeral_credential_request&& r, rpc::streaming_context&) {
put_ephemeral_credential_request r, rpc::streaming_context&) {
co_await _fe.local().put(r.principal, r.user, r.credential);
co_return errc::success;
}
Expand Down
2 changes: 1 addition & 1 deletion src/v/cluster/ephemeral_credential_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ class ephemeral_credential_service final
, _fe(fe) {}

ss::future<put_ephemeral_credential_reply> put_ephemeral_credential(
put_ephemeral_credential_request&&, rpc::streaming_context&) override;
put_ephemeral_credential_request, rpc::streaming_context&) override;

private:
ss::sharded<ephemeral_credential_frontend>& _fe;
Expand Down
4 changes: 2 additions & 2 deletions src/v/cluster/id_allocator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ id_allocator::id_allocator(
, _id_allocator_frontend(id_allocator_frontend) {}

ss::future<allocate_id_reply>
id_allocator::allocate_id(allocate_id_request&& req, rpc::streaming_context&) {
id_allocator::allocate_id(allocate_id_request req, rpc::streaming_context&) {
auto timeout = req.timeout;
return _id_allocator_frontend.local()
.allocator_router()
Expand All @@ -36,7 +36,7 @@ id_allocator::allocate_id(allocate_id_request&& req, rpc::streaming_context&) {
}

ss::future<reset_id_allocator_reply> id_allocator::reset_id_allocator(
reset_id_allocator_request&& req, rpc::streaming_context&) {
reset_id_allocator_request req, rpc::streaming_context&) {
auto timeout = req.timeout;
return _id_allocator_frontend.local().id_reset_router().process_or_dispatch(
std::move(req), model::id_allocator_ntp, timeout);
Expand Down
4 changes: 2 additions & 2 deletions src/v/cluster/id_allocator.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@ class id_allocator final : public id_allocator_service {
ss::sharded<cluster::id_allocator_frontend>&);

virtual ss::future<allocate_id_reply>
allocate_id(allocate_id_request&&, rpc::streaming_context&) final;
allocate_id(allocate_id_request, rpc::streaming_context&) final;

virtual ss::future<reset_id_allocator_reply> reset_id_allocator(
reset_id_allocator_request&&, rpc::streaming_context&) final;
reset_id_allocator_request, rpc::streaming_context&) final;

private:
ss::sharded<cluster::id_allocator_frontend>& _id_allocator_frontend;
Expand Down
4 changes: 2 additions & 2 deletions src/v/cluster/metadata_dissemination_handler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ metadata_dissemination_handler::metadata_dissemination_handler(

ss::future<update_leadership_reply>
metadata_dissemination_handler::update_leadership_v2(
update_leadership_request_v2&& req, rpc::streaming_context&) {
update_leadership_request_v2 req, rpc::streaming_context&) {
return ss::with_scheduling_group(
get_scheduling_group(), [this, req = std::move(req)]() mutable {
return do_update_leadership(std::move(req.leaders));
Expand Down Expand Up @@ -88,7 +88,7 @@ make_get_leadership_reply(const partition_leaders_table& leaders) {
}

ss::future<get_leadership_reply> metadata_dissemination_handler::get_leadership(
get_leadership_request&&, rpc::streaming_context&) {
get_leadership_request, rpc::streaming_context&) {
return ss::with_scheduling_group(get_scheduling_group(), [this]() mutable {
return ss::make_ready_future<get_leadership_reply>(
make_get_leadership_reply(_leaders.local()));
Expand Down
4 changes: 2 additions & 2 deletions src/v/cluster/metadata_dissemination_handler.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,10 @@ class metadata_dissemination_handler
ss::sharded<partition_leaders_table>&);

ss::future<get_leadership_reply>
get_leadership(get_leadership_request&&, rpc::streaming_context&) final;
get_leadership(get_leadership_request, rpc::streaming_context&) final;

ss::future<update_leadership_reply> update_leadership_v2(
update_leadership_request_v2&&, rpc::streaming_context&) final;
update_leadership_request_v2, rpc::streaming_context&) final;

private:
ss::future<update_leadership_reply>
Expand Down
4 changes: 2 additions & 2 deletions src/v/cluster/migrations/tx_manager_migrator_handler.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,14 @@ class tx_manager_migrator_handler : public tx_manager_migrator_service {
pm, st, metadata_cache, connection_cache, leaders, self) {}

ss::future<tx_manager_replicate_reply> tx_manager_replicate(
tx_manager_replicate_request&& request, ::rpc::streaming_context&) final {
tx_manager_replicate_request request, ::rpc::streaming_context&) final {
auto ntp = request.ntp;
return _replicate_router.process_or_dispatch(
std::move(request), ntp, tx_manager_migrator::default_timeout);
}

ss::future<tx_manager_read_reply> tx_manager_read(
tx_manager_read_request&& request, ::rpc::streaming_context&) final {
tx_manager_read_request request, ::rpc::streaming_context&) final {
auto ntp = request.ntp;
return _read_router.process_or_dispatch(
std::move(request), ntp, tx_manager_migrator::default_timeout);
Expand Down
2 changes: 1 addition & 1 deletion src/v/cluster/node_status_rpc_handler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ node_status_rpc_handler::node_status_rpc_handler(
, _node_status_backend(node_status_backend) {}

ss::future<node_status_reply> node_status_rpc_handler::node_status(
node_status_request&& r, rpc::streaming_context&) {
node_status_request r, rpc::streaming_context&) {
return _node_status_backend.invoke_on(
node_status_backend::shard, [r = std::move(r)](auto& service) {
return service.process_request(std::move(r));
Expand Down
2 changes: 1 addition & 1 deletion src/v/cluster/node_status_rpc_handler.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ class node_status_rpc_handler final : public node_status_rpc_service {
ss::sharded<node_status_backend>&);

virtual ss::future<node_status_reply>
node_status(node_status_request&&, rpc::streaming_context&) override;
node_status(node_status_request, rpc::streaming_context&) override;

private:
ss::sharded<node_status_backend>& _node_status_backend;
Expand Down
2 changes: 1 addition & 1 deletion src/v/cluster/partition_balancer_rpc_handler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ partition_balancer_rpc_handler::partition_balancer_rpc_handler(

ss::future<partition_balancer_overview_reply>
partition_balancer_rpc_handler::overview(
partition_balancer_overview_request&&, rpc::streaming_context&) {
partition_balancer_overview_request, rpc::streaming_context&) {
auto overview = co_await _backend.invoke_on(
partition_balancer_backend::shard,
[](partition_balancer_backend& backend) { return backend.overview(); });
Expand Down
2 changes: 1 addition & 1 deletion src/v/cluster/partition_balancer_rpc_handler.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ class partition_balancer_rpc_handler final
ss::sharded<partition_balancer_backend>&);

virtual ss::future<partition_balancer_overview_reply> overview(
partition_balancer_overview_request&&, rpc::streaming_context&) override;
partition_balancer_overview_request, rpc::streaming_context&) override;

private:
ss::sharded<partition_balancer_backend>& _backend;
Expand Down
8 changes: 4 additions & 4 deletions src/v/cluster/self_test_rpc_handler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,28 +21,28 @@ self_test_rpc_handler::self_test_rpc_handler(
, _self_test_backend(backend) {}

ss::future<get_status_response> self_test_rpc_handler::start_test(
start_test_request&& r, rpc::streaming_context&) {
start_test_request r, rpc::streaming_context&) {
return _self_test_backend.invoke_on(
self_test_backend::shard,
[r](auto& service) { return service.start_test(r); });
}

ss::future<get_status_response>
self_test_rpc_handler::stop_test(empty_request&&, rpc::streaming_context&) {
self_test_rpc_handler::stop_test(empty_request, rpc::streaming_context&) {
return _self_test_backend.invoke_on(
self_test_backend::shard,
[](auto& service) { return service.stop_test(); });
}

ss::future<get_status_response>
self_test_rpc_handler::get_status(empty_request&&, rpc::streaming_context&) {
self_test_rpc_handler::get_status(empty_request, rpc::streaming_context&) {
return _self_test_backend.invoke_on(
self_test_backend::shard,
[](auto& service) { return service.get_status(); });
}

ss::future<netcheck_response>
self_test_rpc_handler::netcheck(netcheck_request&& r, rpc::streaming_context&) {
self_test_rpc_handler::netcheck(netcheck_request r, rpc::streaming_context&) {
return _self_test_backend.invoke_on(
self_test_backend::shard, [r = std::move(r)](auto& service) mutable {
return service.netcheck(r.source, std::move(r.buf));
Expand Down
8 changes: 4 additions & 4 deletions src/v/cluster/self_test_rpc_handler.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,16 @@ class self_test_rpc_handler final : public self_test_rpc_service {
ss::sharded<self_test_backend>&);

ss::future<get_status_response>
start_test(start_test_request&&, rpc::streaming_context&) final;
start_test(start_test_request, rpc::streaming_context&) final;

ss::future<get_status_response>
stop_test(empty_request&&, rpc::streaming_context&) final;
stop_test(empty_request, rpc::streaming_context&) final;

ss::future<get_status_response>
get_status(empty_request&&, rpc::streaming_context&) final;
get_status(empty_request, rpc::streaming_context&) final;

ss::future<netcheck_response>
netcheck(netcheck_request&&, rpc::streaming_context&) final;
netcheck(netcheck_request, rpc::streaming_context&) final;

private:
ss::sharded<self_test_backend>& _self_test_backend;
Expand Down
Loading

0 comments on commit 5becd89

Please sign in to comment.