From dd9a388737be6bd6399f7e24be94ea6f984e6b79 Mon Sep 17 00:00:00 2001 From: Bharath Vissapragada Date: Mon, 6 May 2024 18:15:13 -0700 Subject: [PATCH] tx/producer_state: move to cluster::tx --- src/v/cluster/fwd.h | 5 ++++- src/v/cluster/producer_state.cc | 6 ++---- src/v/cluster/producer_state.h | 6 ++---- src/v/cluster/producer_state_manager.cc | 4 ++-- src/v/cluster/producer_state_manager.h | 4 ++-- src/v/cluster/rm_stm.cc | 2 +- src/v/cluster/rm_stm.h | 24 ++++++++++----------- src/v/cluster/tests/producer_state_tests.cc | 21 +++++++++--------- src/v/cluster/tests/randoms.h | 4 ++-- src/v/cluster/tests/rm_stm_test_fixture.h | 7 +++--- src/v/redpanda/application.cc | 2 +- src/v/redpanda/application.h | 2 +- 12 files changed, 44 insertions(+), 43 deletions(-) diff --git a/src/v/cluster/fwd.h b/src/v/cluster/fwd.h index 5a036e7a09b75..c2b6c7f46c134 100644 --- a/src/v/cluster/fwd.h +++ b/src/v/cluster/fwd.h @@ -63,7 +63,6 @@ class feature_manager; class drain_manager; class partition_balancer_backend; class partition_balancer_state; -class producer_state_manager; class node_status_backend; class node_status_table; class ephemeral_credential_frontend; @@ -83,6 +82,10 @@ class id_allocator_stm; class tm_stm; class rm_stm; +namespace tx { +class producer_state_manager; +} + namespace node { class local_monitor; } // namespace node diff --git a/src/v/cluster/producer_state.cc b/src/v/cluster/producer_state.cc index e3071783de46a..f4cf87d753743 100644 --- a/src/v/cluster/producer_state.cc +++ b/src/v/cluster/producer_state.cc @@ -15,9 +15,7 @@ #include "cluster/logger.h" #include "cluster/rm_stm_types.h" -namespace cluster { - -using namespace tx; +namespace cluster::tx { result_promise_t::future_type request::result() const { return _result.get_shared_future(); @@ -344,4 +342,4 @@ producer_state::snapshot(kafka::offset log_start_offset) const { return snapshot; } -} // namespace cluster +} // namespace cluster::tx diff --git a/src/v/cluster/producer_state.h b/src/v/cluster/producer_state.h index a613dc4cec669..0e2e5b4b05227 100644 --- a/src/v/cluster/producer_state.h +++ b/src/v/cluster/producer_state.h @@ -30,7 +30,7 @@ using namespace std::chrono_literals; // Befriended to expose internal state in tests. struct test_fixture; -namespace cluster { +namespace cluster::tx { template concept AcceptsUnits = requires(Func f, ssx::semaphore_units units) { @@ -38,9 +38,7 @@ concept AcceptsUnits = requires(Func f, ssx::semaphore_units units) { }; class producer_state; -namespace tx { struct producer_state_snapshot; -} class request; using producer_ptr = ss::lw_shared_ptr; @@ -236,4 +234,4 @@ class producer_state { friend struct ::test_fixture; }; -} // namespace cluster +} // namespace cluster::tx diff --git a/src/v/cluster/producer_state_manager.cc b/src/v/cluster/producer_state_manager.cc index 0dee65f81707d..3a2bd54cc148e 100644 --- a/src/v/cluster/producer_state_manager.cc +++ b/src/v/cluster/producer_state_manager.cc @@ -21,7 +21,7 @@ #include #include -namespace cluster { +namespace cluster::tx { producer_state_manager::producer_state_manager( config::binding max_producer_ids, @@ -115,4 +115,4 @@ void producer_state_manager::post_eviction_hook::operator()( _state_manger._eviction_counter++; return state._post_eviction_hook(); } -}; // namespace cluster +}; // namespace cluster::tx diff --git a/src/v/cluster/producer_state_manager.h b/src/v/cluster/producer_state_manager.h index 3e582aa743875..6f79693521d9d 100644 --- a/src/v/cluster/producer_state_manager.h +++ b/src/v/cluster/producer_state_manager.h @@ -19,7 +19,7 @@ #include -namespace cluster { +namespace cluster::tx { class producer_state_manager { public: explicit producer_state_manager( @@ -89,4 +89,4 @@ class producer_state_manager { friend struct ::test_fixture; }; -} // namespace cluster +} // namespace cluster::tx diff --git a/src/v/cluster/rm_stm.cc b/src/v/cluster/rm_stm.cc index 1dd4e689760ef..6d1596fa40947 100644 --- a/src/v/cluster/rm_stm.cc +++ b/src/v/cluster/rm_stm.cc @@ -90,7 +90,7 @@ rm_stm::rm_stm( raft::consensus* c, ss::sharded& tx_gateway_frontend, ss::sharded& feature_table, - ss::sharded& producer_state_manager, + ss::sharded& producer_state_manager, std::optional vcluster_id) : raft::persisted_stm<>(rm_stm_snapshot, logger, c) , _tx_locks( diff --git a/src/v/cluster/rm_stm.h b/src/v/cluster/rm_stm.h index dad42b60f6027..cef955444a261 100644 --- a/src/v/cluster/rm_stm.h +++ b/src/v/cluster/rm_stm.h @@ -64,15 +64,15 @@ class rm_stm final : public raft::persisted_stm<> { public: static constexpr std::string_view name = "rm_stm"; - using producers_t = mt:: - map_t; + using producers_t + = mt::map_t; explicit rm_stm( ss::logger&, raft::consensus*, ss::sharded&, ss::sharded&, - ss::sharded&, + ss::sharded&, std::optional); ss::future> begin_tx( @@ -167,7 +167,7 @@ class rm_stm final : public raft::persisted_stm<> { ss::future<> do_remove_persistent_state(); ss::future> do_aborted_transactions(model::offset, model::offset); - producer_ptr maybe_create_producer(model::producer_identity); + tx::producer_ptr maybe_create_producer(model::producer_identity); void cleanup_producer_state(model::producer_identity); ss::future<> reset_producers(); ss::future> do_begin_tx( @@ -188,7 +188,7 @@ class rm_stm final : public raft::persisted_stm<> { ss::future> load_abort_snapshot(tx::abort_index); ss::future<> save_abort_snapshot(tx::abort_snapshot); - void update_tx_offsets(producer_ptr, const model::record_batch_header&); + void update_tx_offsets(tx::producer_ptr, const model::record_batch_header&); ss::future> do_replicate( model::batch_identity, @@ -200,14 +200,14 @@ class rm_stm final : public raft::persisted_stm<> { model::batch_identity, model::record_batch_reader); ss::future> do_sync_and_transactional_replicate( - producer_ptr, + tx::producer_ptr, model::batch_identity, model::record_batch_reader, ssx::semaphore_units); ss::future> do_transactional_replicate( model::term_id, - producer_ptr, + tx::producer_ptr, model::batch_identity, model::record_batch_reader); @@ -219,7 +219,7 @@ class rm_stm final : public raft::persisted_stm<> { ss::future> do_idempotent_replicate( model::term_id, - producer_ptr, + tx::producer_ptr, model::batch_identity, model::record_batch_reader, raft::replicate_options, @@ -227,7 +227,7 @@ class rm_stm final : public raft::persisted_stm<> { ssx::semaphore_units&); ss::future> do_sync_and_idempotent_replicate( - producer_ptr, + tx::producer_ptr, model::batch_identity, model::record_batch_reader, raft::replicate_options, @@ -426,7 +426,7 @@ class rm_stm final : public raft::persisted_stm<> { config::binding _log_stats_interval_s; ss::timer _log_stats_timer; prefix_logger _ctx_log; - ss::sharded& _producer_state_manager; + ss::sharded& _producer_state_manager; std::optional _vcluster_id; producers_t _producers; @@ -447,7 +447,7 @@ class rm_stm_factory : public state_machine_factory { bool enable_transactions, bool enable_idempotence, ss::sharded&, - ss::sharded&, + ss::sharded&, ss::sharded&, ss::sharded&); bool is_applicable_for(const storage::ntp_config&) const final; @@ -457,7 +457,7 @@ class rm_stm_factory : public state_machine_factory { bool _enable_transactions; bool _enable_idempotence; ss::sharded& _tx_gateway_frontend; - ss::sharded& _producer_state_manager; + ss::sharded& _producer_state_manager; ss::sharded& _feature_table; ss::sharded& _topics; }; diff --git a/src/v/cluster/tests/producer_state_tests.cc b/src/v/cluster/tests/producer_state_tests.cc index ef3a9ab068432..9325de3b6ff89 100644 --- a/src/v/cluster/tests/producer_state_tests.cc +++ b/src/v/cluster/tests/producer_state_tests.cc @@ -30,11 +30,12 @@ #include using namespace std::chrono_literals; +using namespace cluster::tx; ss::logger logger{"producer_state_test"}; struct test_fixture { - using psm_ptr = std::unique_ptr; + using psm_ptr = std::unique_ptr; static constexpr uint64_t default_max_producers = 10; @@ -50,7 +51,7 @@ struct test_fixture { void create_producer_state_manager( size_t max_producers, size_t min_producers_per_vcluster) { - _psm = std::make_unique( + _psm = std::make_unique( config::mock_binding(max_producers), std::chrono::milliseconds::max(), config::mock_binding(min_producers_per_vcluster)); @@ -65,12 +66,12 @@ struct test_fixture { } } - cluster::producer_state_manager& manager() { return *_psm; } + producer_state_manager& manager() { return *_psm; } - cluster::producer_ptr new_producer( + producer_ptr new_producer( ss::noncopyable_function f = [] {}, std::optional vcluster = std::nullopt) { - auto p = ss::make_lw_shared( + auto p = ss::make_lw_shared( model::random_producer_identity(), raft::group_id{_counter++}, std::move(f)); @@ -78,7 +79,7 @@ struct test_fixture { return p; } - void clean(std::vector& producers) { + void clean(std::vector& producers) { for (auto& producer : producers) { manager().deregister_producer(*producer, std::nullopt); producer->shutdown_input(); @@ -93,7 +94,7 @@ struct test_fixture { FIXTURE_TEST(test_locked_producer_is_not_evicted, test_fixture) { create_producer_state_manager(10, 10); const size_t num_producers = 10; - std::vector producers; + std::vector producers; producers.reserve(num_producers); for (int i = 0; i < num_producers; i++) { producers.push_back(new_producer()); @@ -132,7 +133,7 @@ FIXTURE_TEST(test_locked_producer_is_not_evicted, test_fixture) { FIXTURE_TEST(test_lru_maintenance, test_fixture) { create_producer_state_manager(10, 10); const size_t num_producers = 5; - std::vector producers; + std::vector producers; producers.reserve(num_producers); for (int i = 0; i < num_producers; i++) { auto prod = new_producer(); @@ -153,7 +154,7 @@ FIXTURE_TEST(test_lru_maintenance, test_fixture) { FIXTURE_TEST(test_eviction_max_pids, test_fixture) { create_producer_state_manager(10, 10); int evicted_so_far = 0; - std::vector producers; + std::vector producers; producers.reserve(default_max_producers); for (int i = 0; i < default_max_producers; i++) { producers.push_back(new_producer([&] { evicted_so_far++; })); @@ -199,7 +200,7 @@ FIXTURE_TEST(test_state_management_with_multiple_namespaces, test_fixture) { create_producer_state_manager(total_producers, 5); struct vcluster_producer { model::vcluster_id vcluster; - cluster::producer_ptr producer; + producer_ptr producer; }; std::vector producers; producers.reserve(default_max_producers); diff --git a/src/v/cluster/tests/randoms.h b/src/v/cluster/tests/randoms.h index a5b78e507d059..6c7d3c0a1c73c 100644 --- a/src/v/cluster/tests/randoms.h +++ b/src/v/cluster/tests/randoms.h @@ -195,8 +195,8 @@ inline cluster::tx::deprecated_seq_entry random_seq_entry() { namespace tests { -inline cluster::producer_ptr random_producer_state() { - return ss::make_lw_shared( +inline cluster::tx::producer_ptr random_producer_state() { + return ss::make_lw_shared( model::producer_identity{ random_generators::get_int(), random_generators::get_int()}, diff --git a/src/v/cluster/tests/rm_stm_test_fixture.h b/src/v/cluster/tests/rm_stm_test_fixture.h index e10e5b45798bf..0a52a2e860b45 100644 --- a/src/v/cluster/tests/rm_stm_test_fixture.h +++ b/src/v/cluster/tests/rm_stm_test_fixture.h @@ -27,8 +27,9 @@ struct rm_stm_test_fixture : simple_raft_fixture { config::mock_binding(std::numeric_limits::max())) .get(); producer_state_manager - .invoke_on_all( - [](cluster::producer_state_manager& mgr) { return mgr.start(); }) + .invoke_on_all([](cluster::tx::producer_state_manager& mgr) { + return mgr.start(); + }) .get(); create_raft(overrides); raft::state_machine_manager_builder stm_m_builder; @@ -72,6 +73,6 @@ struct rm_stm_test_fixture : simple_raft_fixture { auto get_expired_producers() const { return _stm->get_expired_producers(); } ss::sharded tx_gateway_frontend; - ss::sharded producer_state_manager; + ss::sharded producer_state_manager; ss::shared_ptr _stm; }; diff --git a/src/v/redpanda/application.cc b/src/v/redpanda/application.cc index ac703eb044ab1..48c60338713df 100644 --- a/src/v/redpanda/application.cc +++ b/src/v/redpanda/application.cc @@ -1474,7 +1474,7 @@ void application::wire_up_redpanda_services( })) .get(); - producer_manager.invoke_on_all(&cluster::producer_state_manager::start) + producer_manager.invoke_on_all(&cluster::tx::producer_state_manager::start) .get(); syschecks::systemd_message("Adding partition manager").get(); diff --git a/src/v/redpanda/application.h b/src/v/redpanda/application.h index 43583989fff7c..6c66c3324b3e9 100644 --- a/src/v/redpanda/application.h +++ b/src/v/redpanda/application.h @@ -127,7 +127,7 @@ class application { ss::sharded node_status_backend; ss::sharded node_status_table; ss::sharded partition_manager; - ss::sharded producer_manager; + ss::sharded producer_manager; ss::sharded rm_partition_frontend; ss::sharded self_test_backend; ss::sharded self_test_frontend;