Skip to content

Commit

Permalink
transform/rpc: use smp_group for cross shard calls
Browse files Browse the repository at this point in the history
The transform/rpc subsystem makes cross shard calls when accessing the
correct partition for a transform, use a `smp_group` to manage this,
like other subsystems.

Signed-off-by: Tyler Rockwood <rockwood@redpanda.com>
  • Loading branch information
rockwotj committed Jan 16, 2024
1 parent d018c59 commit cb310c5
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 15 deletions.
7 changes: 5 additions & 2 deletions src/v/redpanda/application.cc
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@
#include "resource_mgmt/io_priority.h"
#include "resource_mgmt/memory_groups.h"
#include "resource_mgmt/memory_sampling.h"
#include "resource_mgmt/smp_groups.h"
#include "rpc/rpc_utils.h"
#include "security/audit/audit_log_manager.h"
#include "ssx/abort_source.h"
Expand Down Expand Up @@ -1204,7 +1205,9 @@ void application::wire_up_runtime_services(
}),
ss::sharded_parameter([this] {
return transform::rpc::partition_manager::make_default(
&shard_table, &partition_manager);
&shard_table,
&partition_manager,
smp_service_groups.transform_smp_sg());
}),
ss::sharded_parameter([this] {
return transform::service::create_reporter(&_transform_service);
Expand Down Expand Up @@ -2636,7 +2639,7 @@ void application::start_runtime_services(
runtime_services.push_back(
std::make_unique<transform::rpc::network_service>(
sched_groups.transforms_sg(),
smp_service_groups.cluster_smp_sg(),
smp_service_groups.transform_smp_sg(),
&_transform_rpc_service));
}

Expand Down
30 changes: 18 additions & 12 deletions src/v/transform/rpc/deps.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include <seastar/core/do_with.hh>
#include <seastar/core/future.hh>
#include <seastar/core/lowres_clock.hh>
#include <seastar/core/smp.hh>

#include <memory>
#include <type_traits>
Expand Down Expand Up @@ -74,9 +75,11 @@ class partition_manager_impl final : public partition_manager {
public:
partition_manager_impl(
ss::sharded<cluster::shard_table>* table,
ss::sharded<cluster::partition_manager>* manager)
ss::sharded<cluster::partition_manager>* manager,
ss::smp_service_group smp_group)
: _table(table)
, _manager(manager) {}
, _manager(manager)
, _smp_group(smp_group) {}

std::optional<ss::shard_id> shard_owner(const model::ktp& ntp) final {
return _table->local().shard_for(ntp);
Expand Down Expand Up @@ -211,19 +214,12 @@ class partition_manager_impl final : public partition_manager {
co_return response;
}

template<class Func>
requires requires(Func f, cluster::partition_manager& mgr) { f(mgr); }
std::invoke_result_t<Func, cluster::partition_manager&>
invoke_func_on_shard_impl(ss::shard_id shard, Func&& func) {
return _manager->invoke_on(shard, std::forward<Func>(func));
}

ss::future<cluster::errc> invoke_on_shard_impl(
ss::shard_id shard,
const model::any_ntp auto& ntp,
ss::noncopyable_function<
ss::future<cluster::errc>(kafka::partition_proxy*)> fn) {
return _manager->invoke_on(
return invoke_func_on_shard_impl(
shard,
[ntp, fn = std::move(fn)](cluster::partition_manager& mgr) mutable {
auto pp = kafka::make_partition_proxy(ntp, mgr);
Expand All @@ -239,8 +235,17 @@ class partition_manager_impl final : public partition_manager {
});
}

template<class Func>
requires requires(Func f, cluster::partition_manager& mgr) { f(mgr); }
std::invoke_result_t<Func, cluster::partition_manager&>
invoke_func_on_shard_impl(ss::shard_id shard, Func&& func) {
return _manager->invoke_on(
shard, {_smp_group}, std::forward<Func>(func));
}

ss::sharded<cluster::shard_table>* _table;
ss::sharded<cluster::partition_manager>* _manager;
ss::smp_service_group _smp_group;
};

class topic_creator_impl : public topic_creator {
Expand Down Expand Up @@ -331,8 +336,9 @@ transform::rpc::topic_metadata_cache::make_default(
std::unique_ptr<partition_manager>
transform::rpc::partition_manager::make_default(
ss::sharded<cluster::shard_table>* table,
ss::sharded<cluster::partition_manager>* manager) {
return std::make_unique<partition_manager_impl>(table, manager);
ss::sharded<cluster::partition_manager>* manager,
ss::smp_service_group smp_group) {
return std::make_unique<partition_manager_impl>(table, manager, smp_group);
}

std::optional<ss::shard_id>
Expand Down
3 changes: 2 additions & 1 deletion src/v/transform/rpc/deps.h
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,8 @@ class partition_manager {

static std::unique_ptr<partition_manager> make_default(
ss::sharded<cluster::shard_table>*,
ss::sharded<cluster::partition_manager>*);
ss::sharded<cluster::partition_manager>*,
ss::smp_service_group smp_group);

/**
* Lookup which shard owns a particular ntp.
Expand Down

0 comments on commit cb310c5

Please sign in to comment.