Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[v23.3.x] transform: create smp and scheduling groups #16139

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/v/cluster/bootstrap_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ namespace cluster {

ss::future<cluster_bootstrap_info_reply>
bootstrap_service::cluster_bootstrap_info(
cluster_bootstrap_info_request&&, rpc::streaming_context&) {
cluster_bootstrap_info_request, rpc::streaming_context&) {
cluster_bootstrap_info_reply r{};
r.broker = make_self_broker(config::node());
r.version = features::feature_table::get_latest_logical_version();
Expand Down
2 changes: 1 addition & 1 deletion src/v/cluster/bootstrap_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ class bootstrap_service : public cluster_bootstrap_service {
, _storage(storage) {}

ss::future<cluster_bootstrap_info_reply> cluster_bootstrap_info(
cluster_bootstrap_info_request&&, rpc::streaming_context&) override;
cluster_bootstrap_info_request, rpc::streaming_context&) override;

private:
ss::sharded<storage::api>& _storage;
Expand Down
6 changes: 3 additions & 3 deletions src/v/cluster/cloud_metadata/offsets_recovery_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,19 +36,19 @@ class offsets_recovery_rpc_service : public offsets_recovery_service {
.cloud_storage_cluster_metadata_upload_timeout_ms.bind()) {}

ss::future<offsets_lookup_reply> offsets_lookup(
offsets_lookup_request&& req, rpc::streaming_context&) override {
offsets_lookup_request req, rpc::streaming_context&) override {
co_return co_await _offsets_lookup.local().lookup(std::move(req));
}

ss::future<offsets_upload_reply> offsets_upload(
offsets_upload_request&& req, rpc::streaming_context& ctx) override {
offsets_upload_request req, rpc::streaming_context& ctx) override {
auto ntp = req.offsets_ntp;
co_return co_await _offsets_upload_router.local().process_or_dispatch(
std::move(req), std::move(ntp), _metadata_timeout_ms());
}

ss::future<offsets_recovery_reply> offsets_recovery(
offsets_recovery_request&& req, rpc::streaming_context& ctx) override {
offsets_recovery_request req, rpc::streaming_context& ctx) override {
auto ntp = req.offsets_ntp;
co_return co_await _offsets_recovery_router.local().process_or_dispatch(
std::move(req), std::move(ntp), 30s);
Expand Down
2 changes: 1 addition & 1 deletion src/v/cluster/ephemeral_credential_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ namespace cluster {

ss::future<put_ephemeral_credential_reply>
ephemeral_credential_service::put_ephemeral_credential(
put_ephemeral_credential_request&& r, rpc::streaming_context&) {
put_ephemeral_credential_request r, rpc::streaming_context&) {
co_await _fe.local().put(r.principal, r.user, r.credential);
co_return errc::success;
}
Expand Down
2 changes: 1 addition & 1 deletion src/v/cluster/ephemeral_credential_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ class ephemeral_credential_service final
, _fe(fe) {}

ss::future<put_ephemeral_credential_reply> put_ephemeral_credential(
put_ephemeral_credential_request&&, rpc::streaming_context&) override;
put_ephemeral_credential_request, rpc::streaming_context&) override;

private:
ss::sharded<ephemeral_credential_frontend>& _fe;
Expand Down
4 changes: 2 additions & 2 deletions src/v/cluster/id_allocator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ id_allocator::id_allocator(
, _id_allocator_frontend(id_allocator_frontend) {}

ss::future<allocate_id_reply>
id_allocator::allocate_id(allocate_id_request&& req, rpc::streaming_context&) {
id_allocator::allocate_id(allocate_id_request req, rpc::streaming_context&) {
auto timeout = req.timeout;
return _id_allocator_frontend.local()
.allocator_router()
Expand All @@ -36,7 +36,7 @@ id_allocator::allocate_id(allocate_id_request&& req, rpc::streaming_context&) {
}

ss::future<reset_id_allocator_reply> id_allocator::reset_id_allocator(
reset_id_allocator_request&& req, rpc::streaming_context&) {
reset_id_allocator_request req, rpc::streaming_context&) {
auto timeout = req.timeout;
return _id_allocator_frontend.local().id_reset_router().process_or_dispatch(
std::move(req), model::id_allocator_ntp, timeout);
Expand Down
4 changes: 2 additions & 2 deletions src/v/cluster/id_allocator.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@ class id_allocator final : public id_allocator_service {
ss::sharded<cluster::id_allocator_frontend>&);

virtual ss::future<allocate_id_reply>
allocate_id(allocate_id_request&&, rpc::streaming_context&) final;
allocate_id(allocate_id_request, rpc::streaming_context&) final;

virtual ss::future<reset_id_allocator_reply> reset_id_allocator(
reset_id_allocator_request&&, rpc::streaming_context&) final;
reset_id_allocator_request, rpc::streaming_context&) final;

private:
ss::sharded<cluster::id_allocator_frontend>& _id_allocator_frontend;
Expand Down
4 changes: 2 additions & 2 deletions src/v/cluster/metadata_dissemination_handler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ metadata_dissemination_handler::metadata_dissemination_handler(

ss::future<update_leadership_reply>
metadata_dissemination_handler::update_leadership_v2(
update_leadership_request_v2&& req, rpc::streaming_context&) {
update_leadership_request_v2 req, rpc::streaming_context&) {
return ss::with_scheduling_group(
get_scheduling_group(), [this, req = std::move(req)]() mutable {
return do_update_leadership(std::move(req.leaders));
Expand Down Expand Up @@ -88,7 +88,7 @@ make_get_leadership_reply(const partition_leaders_table& leaders) {
}

ss::future<get_leadership_reply> metadata_dissemination_handler::get_leadership(
get_leadership_request&&, rpc::streaming_context&) {
get_leadership_request, rpc::streaming_context&) {
return ss::with_scheduling_group(get_scheduling_group(), [this]() mutable {
return ss::make_ready_future<get_leadership_reply>(
make_get_leadership_reply(_leaders.local()));
Expand Down
4 changes: 2 additions & 2 deletions src/v/cluster/metadata_dissemination_handler.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,10 @@ class metadata_dissemination_handler
ss::sharded<partition_leaders_table>&);

ss::future<get_leadership_reply>
get_leadership(get_leadership_request&&, rpc::streaming_context&) final;
get_leadership(get_leadership_request, rpc::streaming_context&) final;

ss::future<update_leadership_reply> update_leadership_v2(
update_leadership_request_v2&&, rpc::streaming_context&) final;
update_leadership_request_v2, rpc::streaming_context&) final;

private:
ss::future<update_leadership_reply>
Expand Down
4 changes: 2 additions & 2 deletions src/v/cluster/migrations/tx_manager_migrator_handler.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,14 @@ class tx_manager_migrator_handler : public tx_manager_migrator_service {
pm, st, metadata_cache, connection_cache, leaders, self) {}

ss::future<tx_manager_replicate_reply> tx_manager_replicate(
tx_manager_replicate_request&& request, ::rpc::streaming_context&) final {
tx_manager_replicate_request request, ::rpc::streaming_context&) final {
auto ntp = request.ntp;
return _replicate_router.find_shard_and_process(
std::move(request), ntp, tx_manager_migrator::default_timeout);
}

ss::future<tx_manager_read_reply> tx_manager_read(
tx_manager_read_request&& request, ::rpc::streaming_context&) final {
tx_manager_read_request request, ::rpc::streaming_context&) final {
auto ntp = request.ntp;
return _read_router.find_shard_and_process(
std::move(request), ntp, tx_manager_migrator::default_timeout);
Expand Down
2 changes: 1 addition & 1 deletion src/v/cluster/node_status_rpc_handler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ node_status_rpc_handler::node_status_rpc_handler(
, _node_status_backend(node_status_backend) {}

ss::future<node_status_reply> node_status_rpc_handler::node_status(
node_status_request&& r, rpc::streaming_context&) {
node_status_request r, rpc::streaming_context&) {
return _node_status_backend.invoke_on(
node_status_backend::shard, [r = std::move(r)](auto& service) {
return service.process_request(std::move(r));
Expand Down
2 changes: 1 addition & 1 deletion src/v/cluster/node_status_rpc_handler.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ class node_status_rpc_handler final : public node_status_rpc_service {
ss::sharded<node_status_backend>&);

virtual ss::future<node_status_reply>
node_status(node_status_request&&, rpc::streaming_context&) override;
node_status(node_status_request, rpc::streaming_context&) override;

private:
ss::sharded<node_status_backend>& _node_status_backend;
Expand Down
2 changes: 1 addition & 1 deletion src/v/cluster/partition_balancer_rpc_handler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ partition_balancer_rpc_handler::partition_balancer_rpc_handler(

ss::future<partition_balancer_overview_reply>
partition_balancer_rpc_handler::overview(
partition_balancer_overview_request&&, rpc::streaming_context&) {
partition_balancer_overview_request, rpc::streaming_context&) {
auto overview = co_await _backend.invoke_on(
partition_balancer_backend::shard,
[](partition_balancer_backend& backend) { return backend.overview(); });
Expand Down
2 changes: 1 addition & 1 deletion src/v/cluster/partition_balancer_rpc_handler.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ class partition_balancer_rpc_handler final
ss::sharded<partition_balancer_backend>&);

virtual ss::future<partition_balancer_overview_reply> overview(
partition_balancer_overview_request&&, rpc::streaming_context&) override;
partition_balancer_overview_request, rpc::streaming_context&) override;

private:
ss::sharded<partition_balancer_backend>& _backend;
Expand Down
8 changes: 4 additions & 4 deletions src/v/cluster/self_test_rpc_handler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,28 +21,28 @@ self_test_rpc_handler::self_test_rpc_handler(
, _self_test_backend(backend) {}

ss::future<get_status_response> self_test_rpc_handler::start_test(
start_test_request&& r, rpc::streaming_context&) {
start_test_request r, rpc::streaming_context&) {
return _self_test_backend.invoke_on(
self_test_backend::shard,
[r](auto& service) { return service.start_test(r); });
}

ss::future<get_status_response>
self_test_rpc_handler::stop_test(empty_request&&, rpc::streaming_context&) {
self_test_rpc_handler::stop_test(empty_request, rpc::streaming_context&) {
return _self_test_backend.invoke_on(
self_test_backend::shard,
[](auto& service) { return service.stop_test(); });
}

ss::future<get_status_response>
self_test_rpc_handler::get_status(empty_request&&, rpc::streaming_context&) {
self_test_rpc_handler::get_status(empty_request, rpc::streaming_context&) {
return _self_test_backend.invoke_on(
self_test_backend::shard,
[](auto& service) { return service.get_status(); });
}

ss::future<netcheck_response>
self_test_rpc_handler::netcheck(netcheck_request&& r, rpc::streaming_context&) {
self_test_rpc_handler::netcheck(netcheck_request r, rpc::streaming_context&) {
return _self_test_backend.invoke_on(
self_test_backend::shard, [r = std::move(r)](auto& service) mutable {
return service.netcheck(r.source, std::move(r.buf));
Expand Down
8 changes: 4 additions & 4 deletions src/v/cluster/self_test_rpc_handler.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,16 @@ class self_test_rpc_handler final : public self_test_rpc_service {
ss::sharded<self_test_backend>&);

ss::future<get_status_response>
start_test(start_test_request&&, rpc::streaming_context&) final;
start_test(start_test_request, rpc::streaming_context&) final;

ss::future<get_status_response>
stop_test(empty_request&&, rpc::streaming_context&) final;
stop_test(empty_request, rpc::streaming_context&) final;

ss::future<get_status_response>
get_status(empty_request&&, rpc::streaming_context&) final;
get_status(empty_request, rpc::streaming_context&) final;

ss::future<netcheck_response>
netcheck(netcheck_request&&, rpc::streaming_context&) final;
netcheck(netcheck_request, rpc::streaming_context&) final;

private:
ss::sharded<self_test_backend>& _self_test_backend;
Expand Down
Loading