Skip to content

Commit

Permalink
transform: run transforms on a scheduling group
Browse files Browse the repository at this point in the history
Signed-off-by: Tyler Rockwood <rockwood@redpanda.com>
  • Loading branch information
rockwotj committed Jan 16, 2024
1 parent 4b3251b commit 6928acc
Show file tree
Hide file tree
Showing 6 changed files with 29 additions and 11 deletions.
3 changes: 2 additions & 1 deletion src/v/redpanda/application.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1246,7 +1246,8 @@ void application::wire_up_runtime_services(
&raft_group_manager,
&controller->get_topics_state(),
&partition_manager,
&_transform_rpc_client)
&_transform_rpc_client,
sched_groups.transforms_sg())
.get();
}

Expand Down
10 changes: 7 additions & 3 deletions src/v/transform/api.cc
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@

#include <seastar/core/circular_buffer.hh>
#include <seastar/core/lowres_clock.hh>
#include <seastar/core/scheduling.hh>
#include <seastar/core/sharded.hh>
#include <seastar/core/shared_ptr.hh>
#include <seastar/core/smp.hh>
Expand Down Expand Up @@ -405,15 +406,17 @@ service::service(
ss::sharded<raft::group_manager>* group_manager,
ss::sharded<cluster::topic_table>* topic_table,
ss::sharded<cluster::partition_manager>* partition_manager,
ss::sharded<rpc::client>* rpc_client)
ss::sharded<rpc::client>* rpc_client,
ss::scheduling_group sg)
: _runtime(runtime)
, _self(self)
, _plugin_frontend(plugin_frontend)
, _feature_table(feature_table)
, _group_manager(group_manager)
, _topic_table(topic_table)
, _partition_manager(partition_manager)
, _rpc_client(rpc_client) {}
, _rpc_client(rpc_client)
, _sg(sg) {}

service::~service() = default;

Expand All @@ -433,7 +436,8 @@ ss::future<> service::start() {
&_topic_table->local(),
&_partition_manager->local(),
&_rpc_client->local(),
_batcher.get()));
_batcher.get()),
_sg);
co_await _batcher->start();
co_await _manager->start();
register_notifications();
Expand Down
5 changes: 4 additions & 1 deletion src/v/transform/api.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include "wasm/fwd.h"

#include <seastar/core/lowres_clock.hh>
#include <seastar/core/scheduling.hh>
#include <seastar/core/sharded.hh>
#include <seastar/util/defer.hh>
#include <seastar/util/noncopyable_function.hh>
Expand All @@ -44,7 +45,8 @@ class service : public ss::peering_sharded_service<service> {
ss::sharded<raft::group_manager>* group_manager,
ss::sharded<cluster::topic_table>* topic_table,
ss::sharded<cluster::partition_manager>* partition_manager,
ss::sharded<rpc::client>* rpc_client);
ss::sharded<rpc::client>* rpc_client,
ss::scheduling_group sg);
service(const service&) = delete;
service(service&&) = delete;
service& operator=(const service&) = delete;
Expand Down Expand Up @@ -107,6 +109,7 @@ class service : public ss::peering_sharded_service<service> {
std::unique_ptr<commit_batcher<ss::lowres_clock>> _batcher;
std::vector<ss::deferred_action<ss::noncopyable_function<void()>>>
_notification_cleanups;
ss::scheduling_group _sg;
};

} // namespace transform
6 changes: 5 additions & 1 deletion src/v/transform/tests/transform_manager_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#include <seastar/core/manual_clock.hh>
#include <seastar/core/print.hh>
#include <seastar/core/reactor.hh>
#include <seastar/core/scheduling.hh>
#include <seastar/util/later.hh>
#include <seastar/util/noncopyable_function.hh>

Expand Down Expand Up @@ -290,7 +291,10 @@ class TransformManagerTest : public ::testing::Test {
auto t = std::make_unique<processor_tracker>();
_tracker = t.get();
_manager = std::make_unique<manager<ss::manual_clock>>(
/*self=*/model::node_id(0), std::move(r), std::move(t));
/*self=*/model::node_id(0),
std::move(r),
std::move(t),
ss::current_scheduling_group());
_manager->start().get();
}
void TearDown() override {
Expand Down
12 changes: 8 additions & 4 deletions src/v/transform/transform_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include <seastar/core/future.hh>
#include <seastar/core/loop.hh>
#include <seastar/core/lowres_clock.hh>
#include <seastar/core/scheduling.hh>
#include <seastar/core/shared_ptr.hh>
#include <seastar/core/sleep.hh>
#include <seastar/core/when_all.hh>
Expand Down Expand Up @@ -253,11 +254,14 @@ template<typename ClockType>
manager<ClockType>::manager(
model::node_id self,
std::unique_ptr<registry> r,
std::unique_ptr<processor_factory> f)
std::unique_ptr<processor_factory> f,
ss::scheduling_group sg)
: _self(self)
, _queue([](const std::exception_ptr& ex) {
vlog(tlog.error, "unexpected transform manager error: {}", ex);
})
, _queue(
sg,
[](const std::exception_ptr& ex) {
vlog(tlog.error, "unexpected transform manager error: {}", ex);
})
, _registry(std::move(r))
, _processors(std::make_unique<processor_table<ClockType>>())
, _processor_factory(std::move(f)) {}
Expand Down
4 changes: 3 additions & 1 deletion src/v/transform/transform_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

#include <seastar/core/lowres_clock.hh>
#include <seastar/core/manual_clock.hh>
#include <seastar/core/scheduling.hh>
#include <seastar/core/shared_ptr.hh>
#include <seastar/util/bool_class.hh>

Expand Down Expand Up @@ -107,7 +108,8 @@ class manager {
manager(
model::node_id self,
std::unique_ptr<registry>,
std::unique_ptr<processor_factory>);
std::unique_ptr<processor_factory>,
ss::scheduling_group);
manager(const manager&) = delete;
manager& operator=(const manager&) = delete;
manager(manager&&) = delete;
Expand Down

0 comments on commit 6928acc

Please sign in to comment.