Skip to content

Commit

Permalink
tx/producer_state: move to cluster::tx
Browse files Browse the repository at this point in the history
  • Loading branch information
bharathv committed May 9, 2024
1 parent 62915b8 commit dd9a388
Show file tree
Hide file tree
Showing 12 changed files with 44 additions and 43 deletions.
5 changes: 4 additions & 1 deletion src/v/cluster/fwd.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@ class feature_manager;
class drain_manager;
class partition_balancer_backend;
class partition_balancer_state;
class producer_state_manager;
class node_status_backend;
class node_status_table;
class ephemeral_credential_frontend;
Expand All @@ -83,6 +82,10 @@ class id_allocator_stm;
class tm_stm;
class rm_stm;

namespace tx {
class producer_state_manager;
}

namespace node {
class local_monitor;
} // namespace node
Expand Down
6 changes: 2 additions & 4 deletions src/v/cluster/producer_state.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,7 @@
#include "cluster/logger.h"
#include "cluster/rm_stm_types.h"

namespace cluster {

using namespace tx;
namespace cluster::tx {

result_promise_t::future_type request::result() const {
return _result.get_shared_future();
Expand Down Expand Up @@ -344,4 +342,4 @@ producer_state::snapshot(kafka::offset log_start_offset) const {
return snapshot;
}

} // namespace cluster
} // namespace cluster::tx
6 changes: 2 additions & 4 deletions src/v/cluster/producer_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,17 +30,15 @@ using namespace std::chrono_literals;
// Befriended to expose internal state in tests.
struct test_fixture;

namespace cluster {
namespace cluster::tx {

template<class Func>
concept AcceptsUnits = requires(Func f, ssx::semaphore_units units) {
f(std::move(units));
};

class producer_state;
namespace tx {
struct producer_state_snapshot;
}
class request;

using producer_ptr = ss::lw_shared_ptr<producer_state>;
Expand Down Expand Up @@ -236,4 +234,4 @@ class producer_state {
friend struct ::test_fixture;
};

} // namespace cluster
} // namespace cluster::tx
4 changes: 2 additions & 2 deletions src/v/cluster/producer_state_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
#include <seastar/core/metrics.hh>
#include <seastar/util/defer.hh>

namespace cluster {
namespace cluster::tx {

producer_state_manager::producer_state_manager(
config::binding<uint64_t> max_producer_ids,
Expand Down Expand Up @@ -115,4 +115,4 @@ void producer_state_manager::post_eviction_hook::operator()(
_state_manger._eviction_counter++;
return state._post_eviction_hook();
}
}; // namespace cluster
}; // namespace cluster::tx
4 changes: 2 additions & 2 deletions src/v/cluster/producer_state_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

#include <seastar/core/sharded.hh>

namespace cluster {
namespace cluster::tx {
class producer_state_manager {
public:
explicit producer_state_manager(
Expand Down Expand Up @@ -89,4 +89,4 @@ class producer_state_manager {

friend struct ::test_fixture;
};
} // namespace cluster
} // namespace cluster::tx
2 changes: 1 addition & 1 deletion src/v/cluster/rm_stm.cc
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ rm_stm::rm_stm(
raft::consensus* c,
ss::sharded<cluster::tx_gateway_frontend>& tx_gateway_frontend,
ss::sharded<features::feature_table>& feature_table,
ss::sharded<producer_state_manager>& producer_state_manager,
ss::sharded<tx::producer_state_manager>& producer_state_manager,
std::optional<model::vcluster_id> vcluster_id)
: raft::persisted_stm<>(rm_stm_snapshot, logger, c)
, _tx_locks(
Expand Down
24 changes: 12 additions & 12 deletions src/v/cluster/rm_stm.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,15 +64,15 @@ class rm_stm final : public raft::persisted_stm<> {
public:
static constexpr std::string_view name = "rm_stm";

using producers_t = mt::
map_t<absl::btree_map, model::producer_identity, cluster::producer_ptr>;
using producers_t
= mt::map_t<absl::btree_map, model::producer_identity, tx::producer_ptr>;

explicit rm_stm(
ss::logger&,
raft::consensus*,
ss::sharded<cluster::tx_gateway_frontend>&,
ss::sharded<features::feature_table>&,
ss::sharded<producer_state_manager>&,
ss::sharded<tx::producer_state_manager>&,
std::optional<model::vcluster_id>);

ss::future<checked<model::term_id, tx_errc>> begin_tx(
Expand Down Expand Up @@ -167,7 +167,7 @@ class rm_stm final : public raft::persisted_stm<> {
ss::future<> do_remove_persistent_state();
ss::future<fragmented_vector<tx::tx_range>>
do_aborted_transactions(model::offset, model::offset);
producer_ptr maybe_create_producer(model::producer_identity);
tx::producer_ptr maybe_create_producer(model::producer_identity);
void cleanup_producer_state(model::producer_identity);
ss::future<> reset_producers();
ss::future<checked<model::term_id, tx_errc>> do_begin_tx(
Expand All @@ -188,7 +188,7 @@ class rm_stm final : public raft::persisted_stm<> {
ss::future<std::optional<tx::abort_snapshot>>
load_abort_snapshot(tx::abort_index);
ss::future<> save_abort_snapshot(tx::abort_snapshot);
void update_tx_offsets(producer_ptr, const model::record_batch_header&);
void update_tx_offsets(tx::producer_ptr, const model::record_batch_header&);

ss::future<result<kafka_result>> do_replicate(
model::batch_identity,
Expand All @@ -200,14 +200,14 @@ class rm_stm final : public raft::persisted_stm<> {
model::batch_identity, model::record_batch_reader);

ss::future<result<kafka_result>> do_sync_and_transactional_replicate(
producer_ptr,
tx::producer_ptr,
model::batch_identity,
model::record_batch_reader,
ssx::semaphore_units);

ss::future<result<kafka_result>> do_transactional_replicate(
model::term_id,
producer_ptr,
tx::producer_ptr,
model::batch_identity,
model::record_batch_reader);

Expand All @@ -219,15 +219,15 @@ class rm_stm final : public raft::persisted_stm<> {

ss::future<result<kafka_result>> do_idempotent_replicate(
model::term_id,
producer_ptr,
tx::producer_ptr,
model::batch_identity,
model::record_batch_reader,
raft::replicate_options,
ss::lw_shared_ptr<available_promise<>>,
ssx::semaphore_units&);

ss::future<result<kafka_result>> do_sync_and_idempotent_replicate(
producer_ptr,
tx::producer_ptr,
model::batch_identity,
model::record_batch_reader,
raft::replicate_options,
Expand Down Expand Up @@ -426,7 +426,7 @@ class rm_stm final : public raft::persisted_stm<> {
config::binding<std::chrono::seconds> _log_stats_interval_s;
ss::timer<tx::clock_type> _log_stats_timer;
prefix_logger _ctx_log;
ss::sharded<producer_state_manager>& _producer_state_manager;
ss::sharded<tx::producer_state_manager>& _producer_state_manager;
std::optional<model::vcluster_id> _vcluster_id;

producers_t _producers;
Expand All @@ -447,7 +447,7 @@ class rm_stm_factory : public state_machine_factory {
bool enable_transactions,
bool enable_idempotence,
ss::sharded<tx_gateway_frontend>&,
ss::sharded<cluster::producer_state_manager>&,
ss::sharded<cluster::tx::producer_state_manager>&,
ss::sharded<features::feature_table>&,
ss::sharded<cluster::topic_table>&);
bool is_applicable_for(const storage::ntp_config&) const final;
Expand All @@ -457,7 +457,7 @@ class rm_stm_factory : public state_machine_factory {
bool _enable_transactions;
bool _enable_idempotence;
ss::sharded<tx_gateway_frontend>& _tx_gateway_frontend;
ss::sharded<cluster::producer_state_manager>& _producer_state_manager;
ss::sharded<cluster::tx::producer_state_manager>& _producer_state_manager;
ss::sharded<features::feature_table>& _feature_table;
ss::sharded<topic_table>& _topics;
};
Expand Down
21 changes: 11 additions & 10 deletions src/v/cluster/tests/producer_state_tests.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,12 @@
#include <vector>

using namespace std::chrono_literals;
using namespace cluster::tx;

ss::logger logger{"producer_state_test"};

struct test_fixture {
using psm_ptr = std::unique_ptr<cluster::producer_state_manager>;
using psm_ptr = std::unique_ptr<producer_state_manager>;

static constexpr uint64_t default_max_producers = 10;

Expand All @@ -50,7 +51,7 @@ struct test_fixture {

void create_producer_state_manager(
size_t max_producers, size_t min_producers_per_vcluster) {
_psm = std::make_unique<cluster::producer_state_manager>(
_psm = std::make_unique<producer_state_manager>(
config::mock_binding<size_t>(max_producers),
std::chrono::milliseconds::max(),
config::mock_binding<size_t>(min_producers_per_vcluster));
Expand All @@ -65,20 +66,20 @@ struct test_fixture {
}
}

cluster::producer_state_manager& manager() { return *_psm; }
producer_state_manager& manager() { return *_psm; }

cluster::producer_ptr new_producer(
producer_ptr new_producer(
ss::noncopyable_function<void()> f = [] {},
std::optional<model::vcluster_id> vcluster = std::nullopt) {
auto p = ss::make_lw_shared<cluster::producer_state>(
auto p = ss::make_lw_shared<producer_state>(
model::random_producer_identity(),
raft::group_id{_counter++},
std::move(f));
manager().register_producer(*p, vcluster);
return p;
}

void clean(std::vector<cluster::producer_ptr>& producers) {
void clean(std::vector<producer_ptr>& producers) {
for (auto& producer : producers) {
manager().deregister_producer(*producer, std::nullopt);
producer->shutdown_input();
Expand All @@ -93,7 +94,7 @@ struct test_fixture {
FIXTURE_TEST(test_locked_producer_is_not_evicted, test_fixture) {
create_producer_state_manager(10, 10);
const size_t num_producers = 10;
std::vector<cluster::producer_ptr> producers;
std::vector<producer_ptr> producers;
producers.reserve(num_producers);
for (int i = 0; i < num_producers; i++) {
producers.push_back(new_producer());
Expand Down Expand Up @@ -132,7 +133,7 @@ FIXTURE_TEST(test_locked_producer_is_not_evicted, test_fixture) {
FIXTURE_TEST(test_lru_maintenance, test_fixture) {
create_producer_state_manager(10, 10);
const size_t num_producers = 5;
std::vector<cluster::producer_ptr> producers;
std::vector<producer_ptr> producers;
producers.reserve(num_producers);
for (int i = 0; i < num_producers; i++) {
auto prod = new_producer();
Expand All @@ -153,7 +154,7 @@ FIXTURE_TEST(test_lru_maintenance, test_fixture) {
FIXTURE_TEST(test_eviction_max_pids, test_fixture) {
create_producer_state_manager(10, 10);
int evicted_so_far = 0;
std::vector<cluster::producer_ptr> producers;
std::vector<producer_ptr> producers;
producers.reserve(default_max_producers);
for (int i = 0; i < default_max_producers; i++) {
producers.push_back(new_producer([&] { evicted_so_far++; }));
Expand Down Expand Up @@ -199,7 +200,7 @@ FIXTURE_TEST(test_state_management_with_multiple_namespaces, test_fixture) {
create_producer_state_manager(total_producers, 5);
struct vcluster_producer {
model::vcluster_id vcluster;
cluster::producer_ptr producer;
producer_ptr producer;
};
std::vector<vcluster_producer> producers;
producers.reserve(default_max_producers);
Expand Down
4 changes: 2 additions & 2 deletions src/v/cluster/tests/randoms.h
Original file line number Diff line number Diff line change
Expand Up @@ -195,8 +195,8 @@ inline cluster::tx::deprecated_seq_entry random_seq_entry() {

namespace tests {

inline cluster::producer_ptr random_producer_state() {
return ss::make_lw_shared<cluster::producer_state>(
inline cluster::tx::producer_ptr random_producer_state() {
return ss::make_lw_shared<cluster::tx::producer_state>(
model::producer_identity{
random_generators::get_int<int64_t>(),
random_generators::get_int<int16_t>()},
Expand Down
7 changes: 4 additions & 3 deletions src/v/cluster/tests/rm_stm_test_fixture.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,9 @@ struct rm_stm_test_fixture : simple_raft_fixture {
config::mock_binding(std::numeric_limits<uint64_t>::max()))
.get();
producer_state_manager
.invoke_on_all(
[](cluster::producer_state_manager& mgr) { return mgr.start(); })
.invoke_on_all([](cluster::tx::producer_state_manager& mgr) {
return mgr.start();
})
.get();
create_raft(overrides);
raft::state_machine_manager_builder stm_m_builder;
Expand Down Expand Up @@ -72,6 +73,6 @@ struct rm_stm_test_fixture : simple_raft_fixture {
auto get_expired_producers() const { return _stm->get_expired_producers(); }

ss::sharded<cluster::tx_gateway_frontend> tx_gateway_frontend;
ss::sharded<cluster::producer_state_manager> producer_state_manager;
ss::sharded<cluster::tx::producer_state_manager> producer_state_manager;
ss::shared_ptr<cluster::rm_stm> _stm;
};
2 changes: 1 addition & 1 deletion src/v/redpanda/application.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1474,7 +1474,7 @@ void application::wire_up_redpanda_services(
}))
.get();

producer_manager.invoke_on_all(&cluster::producer_state_manager::start)
producer_manager.invoke_on_all(&cluster::tx::producer_state_manager::start)
.get();

syschecks::systemd_message("Adding partition manager").get();
Expand Down
2 changes: 1 addition & 1 deletion src/v/redpanda/application.h
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ class application {
ss::sharded<cluster::node_status_backend> node_status_backend;
ss::sharded<cluster::node_status_table> node_status_table;
ss::sharded<cluster::partition_manager> partition_manager;
ss::sharded<cluster::producer_state_manager> producer_manager;
ss::sharded<cluster::tx::producer_state_manager> producer_manager;
ss::sharded<cluster::rm_partition_frontend> rm_partition_frontend;
ss::sharded<cluster::self_test_backend> self_test_backend;
ss::sharded<cluster::self_test_frontend> self_test_frontend;
Expand Down

0 comments on commit dd9a388

Please sign in to comment.