diff --git a/src/v/cluster/CMakeLists.txt b/src/v/cluster/CMakeLists.txt index f7378e4c6450..a884a847bf9b 100644 --- a/src/v/cluster/CMakeLists.txt +++ b/src/v/cluster/CMakeLists.txt @@ -202,6 +202,7 @@ v_cc_library( cloud_metadata/producer_id_recovery_manager.cc cloud_metadata/uploader.cc migrations/tx_manager_migrator.cc + tx_topic_manager.cc DEPS Seastar::seastar bootstrap_rpc diff --git a/src/v/cluster/fwd.h b/src/v/cluster/fwd.h index 3a907d771e4d..8e3a3bb85547 100644 --- a/src/v/cluster/fwd.h +++ b/src/v/cluster/fwd.h @@ -73,6 +73,7 @@ struct controller_join_snapshot; class tx_manager_migrator; struct state_machine_factory; class state_machine_registry; +class tx_topic_manager; namespace node { class local_monitor; diff --git a/src/v/cluster/tests/CMakeLists.txt b/src/v/cluster/tests/CMakeLists.txt index dd1b48292451..ab600fcc1795 100644 --- a/src/v/cluster/tests/CMakeLists.txt +++ b/src/v/cluster/tests/CMakeLists.txt @@ -65,7 +65,8 @@ set(srcs ephemeral_credential_test.cc health_monitor_test.cc metadata_dissemination_test.cc - replicas_rebalancing_tests.cc) + replicas_rebalancing_tests.cc + tx_topic_test.cc) foreach(cluster_test_src ${srcs}) get_filename_component(test_name ${cluster_test_src} NAME_WE) diff --git a/src/v/cluster/tests/tx_topic_test.cc b/src/v/cluster/tests/tx_topic_test.cc new file mode 100644 index 000000000000..7856021d3bc3 --- /dev/null +++ b/src/v/cluster/tests/tx_topic_test.cc @@ -0,0 +1,86 @@ +// Copyright 2024 Redpanda Data, Inc. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.md +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0 + +#include "cluster/tx_gateway_frontend.h" +#include "config/configuration.h" +#include "kafka/protocol/types.h" +#include "model/namespace.h" +#include "redpanda/application.h" +#include "redpanda/tests/fixture.h" + +#include + +#include + +FIXTURE_TEST(test_tm_stm_new_tx, redpanda_thread_fixture) { + // call find coordinator to initialize the topic + auto coordinator = app.tx_gateway_frontend.local() + .find_coordinator( + kafka::transactional_id{"test-tx-id"}) + .get(); + + auto cfg = app.controller->get_topics_state().local().get_topic_cfg( + model::tx_manager_nt); + + BOOST_REQUIRE( + cfg->properties.retention_duration.value() + == config::shard_local_cfg() + .transaction_coordinator_delete_retention_ms()); + BOOST_REQUIRE( + cfg->properties.segment_size.value() + == config::shard_local_cfg().transaction_coordinator_log_segment_size()); + + /** + * Change property + */ + static size_t new_segment_size = 1024 * 1024; + config::shard_local_cfg() + .transaction_coordinator_log_segment_size.set_value(new_segment_size); + + RPTEST_REQUIRE_EVENTUALLY(10s, [this] { + auto cfg = app.controller->get_topics_state().local().get_topic_cfg( + model::tx_manager_nt); + + return new_segment_size == cfg->properties.segment_size.value(); + }); + + static std::chrono::milliseconds new_retention_ms(100000); + config::shard_local_cfg() + .transaction_coordinator_delete_retention_ms.set_value(new_retention_ms); + + RPTEST_REQUIRE_EVENTUALLY(10s, [this] { + auto cfg = app.controller->get_topics_state().local().get_topic_cfg( + model::tx_manager_nt); + + return new_retention_ms == cfg->properties.retention_duration.value(); + }); + + cfg = app.controller->get_topics_state().local().get_topic_cfg( + model::tx_manager_nt); + // segment size should state the same + BOOST_REQUIRE_EQUAL(new_segment_size, cfg->properties.segment_size.value()); + /** + * Change both properties at once + */ + static size_t newer_segment_size = 20000000; + static std::chrono::milliseconds newer_retention_ms(500000); + config::shard_local_cfg() + .transaction_coordinator_log_segment_size.set_value(newer_segment_size); + + config::shard_local_cfg() + .transaction_coordinator_delete_retention_ms.set_value( + newer_retention_ms); + RPTEST_REQUIRE_EVENTUALLY(10s, [this] { + auto cfg = app.controller->get_topics_state().local().get_topic_cfg( + model::tx_manager_nt); + + return newer_retention_ms == cfg->properties.retention_duration.value() + && cfg->properties.segment_size.value() == newer_segment_size; + }); +} diff --git a/src/v/cluster/tx_gateway_frontend.cc b/src/v/cluster/tx_gateway_frontend.cc index cb7a023c4ae3..7c32009df01d 100644 --- a/src/v/cluster/tx_gateway_frontend.cc +++ b/src/v/cluster/tx_gateway_frontend.cc @@ -27,6 +27,7 @@ #include "cluster/tx_gateway.h" #include "cluster/tx_gateway_service.h" #include "cluster/tx_helpers.h" +#include "cluster/tx_topic_manager.h" #include "config/configuration.h" #include "errc.h" #include "model/fundamental.h" @@ -46,10 +47,6 @@ namespace cluster { using namespace std::chrono_literals; -namespace { -// use liberate timeout when waiting for tx manager topic creation -static constexpr auto topic_wait_timeout = 20s; -} // namespace template static auto with( ss::shared_ptr stm, @@ -300,6 +297,7 @@ tx_gateway_frontend::tx_gateway_frontend( ss::sharded& rm_partition_frontend, ss::sharded& feature_table, ss::sharded& tm_stm_cache_manager, + ss::sharded& tx_topic_manager, config::binding max_transactions_per_coordinator) : _ssg(ssg) , _partition_manager(partition_manager) @@ -313,6 +311,7 @@ tx_gateway_frontend::tx_gateway_frontend( , _rm_partition_frontend(rm_partition_frontend) , _feature_table(feature_table) , _tm_stm_cache_manager(tm_stm_cache_manager) + , _tx_topic_manager(tx_topic_manager) , _metadata_dissemination_retries( config::shard_local_cfg().metadata_dissemination_retries.value()) , _metadata_dissemination_retry_delay_ms( @@ -365,7 +364,11 @@ tx_gateway_frontend::find_coordinator(kafka::transactional_id tid) { tid, model::tx_manager_nt); - auto ec = co_await create_and_wait_for_coordinator_topic(); + auto ec = co_await _tx_topic_manager.invoke_on( + cluster::tx_topic_manager::shard, [](tx_topic_manager& mgr) { + return mgr.create_and_wait_for_coordinator_topic(); + }); + if (ec != errc::success) { co_return find_coordinator_reply( std::nullopt, std::nullopt, errc::topic_not_exists); @@ -397,98 +400,6 @@ tx_gateway_frontend::find_coordinator(kafka::transactional_id tid) { co_return find_coordinator_reply{leader, std::move(ntp), errc::success}; } -ss::future tx_gateway_frontend::try_create_coordinator_topic() { - // TODO: make configuration options class parameters - int32_t partition_count - = _feature_table.local().is_active( - features::feature::transaction_partitioning) - ? config::shard_local_cfg().transaction_coordinator_partitions() - : 1; - - cluster::topic_configuration topic{ - model::kafka_internal_namespace, - model::tx_manager_topic, - partition_count, - _controller->internal_topic_replication()}; - - topic.properties.segment_size - = config::shard_local_cfg().transaction_coordinator_log_segment_size; - topic.properties.retention_duration = tristate( - config::shard_local_cfg().transaction_coordinator_delete_retention_ms()); - topic.properties.cleanup_policy_bitflags - = config::shard_local_cfg().transaction_coordinator_cleanup_policy(); - - return _controller->get_topics_frontend() - .local() - .autocreate_topics( - {std::move(topic)}, - config::shard_local_cfg().create_topic_timeout_ms() * partition_count) - .then([](std::vector res) { - vassert( - res.size() == 1, - "Expected one result related with tx manager topic creation, " - "received answer with {} results", - res.size()); - if (res[0].ec == cluster::errc::topic_already_exists) { - return true; - } - if (res[0].ec != cluster::errc::success) { - vlog( - clusterlog.warn, - "can not create {} topic - error: {}", - model::tx_manager_nt, - cluster::make_error_code(res[0].ec).message()); - return false; - } - return true; - }) - .handle_exception([](std::exception_ptr e) { - vlog( - txlog.warn, - "can not create {} topic - error: {}", - model::tx_manager_nt, - e); - return false; - }); -} - -ss::future tx_gateway_frontend::create_and_wait_for_coordinator_topic() { - if (!co_await try_create_coordinator_topic()) { - vlog( - txlog.warn, - "Error creating transaction manager topic {}", - model::tx_manager_nt); - co_return errc::topic_not_exists; - } - - try { - auto ec = co_await _controller->get_api().local().wait_for_topic( - model::tx_manager_nt, - topic_wait_timeout + model::timeout_clock::now()); - if (ec) { - vlog( - txlog.warn, - "Error waiting for transaction manager topic {} to be created - " - "{}", - model::tx_manager_nt, - ec); - // topic is creating, reply with not_coordinator error fot - // the client to retry - co_return tx_errc::partition_not_exists; - } - } catch (const ss::timed_out_error& e) { - vlog( - txlog.warn, - "Timeout waiting for transaction manager topic {} to be created - " - "{}", - model::tx_manager_nt, - e); - co_return errc::timeout; - } - - co_return errc::success; -} - std::optional tx_gateway_frontend::ntp_for_tx_id(const kafka::transactional_id& id) { if (!_feature_table.local().is_active( @@ -3408,7 +3319,10 @@ tx_gateway_frontend::get_all_transactions() { auto ntp_meta = _metadata_cache.local().get_topic_metadata( model::tx_manager_nt); if (!ntp_meta) { - auto ec = co_await create_and_wait_for_coordinator_topic(); + auto ec = co_await _tx_topic_manager.invoke_on( + cluster::tx_topic_manager::shard, [](tx_topic_manager& mgr) { + return mgr.create_and_wait_for_coordinator_topic(); + }); if (ec != errc::success) { co_return tx_errc::partition_not_exists; } diff --git a/src/v/cluster/tx_gateway_frontend.h b/src/v/cluster/tx_gateway_frontend.h index 48f8fa44bed2..413b4852edc7 100644 --- a/src/v/cluster/tx_gateway_frontend.h +++ b/src/v/cluster/tx_gateway_frontend.h @@ -41,6 +41,7 @@ class tx_gateway_frontend final ss::sharded&, ss::sharded&, ss::sharded&, + ss::sharded&, config::binding max_transactions_per_coordinator); std::optional ntp_for_tx_id(const kafka::transactional_id&); @@ -93,6 +94,7 @@ class tx_gateway_frontend final ss::sharded& _rm_partition_frontend; ss::sharded& _feature_table; ss::sharded& _tm_stm_cache_manager; + ss::sharded& _tx_topic_manager; int16_t _metadata_dissemination_retries; std::chrono::milliseconds _metadata_dissemination_retry_delay_ms; ss::timer _expire_timer; @@ -129,8 +131,6 @@ class tx_gateway_frontend final ss::future> wait_for_leader(const model::ntp&); - ss::future try_create_coordinator_topic(); - ss::future create_and_wait_for_coordinator_topic(); ss::future> get_tx( model::term_id, ss::shared_ptr, diff --git a/src/v/cluster/tx_topic_manager.cc b/src/v/cluster/tx_topic_manager.cc new file mode 100644 index 000000000000..e18782f26fc2 --- /dev/null +++ b/src/v/cluster/tx_topic_manager.cc @@ -0,0 +1,238 @@ +/* + * Copyright 2024 Redpanda Data, Inc. + * + * Use of this software is governed by the Business Source License + * included in the file licenses/BSL.md + * + * As of the Change Date specified in that file, in accordance with + * the Business Source License, use of this software will be governed + * by the Apache License, Version 2.0 + */ + +#include "cluster/tx_topic_manager.h" + +#include "base/vassert.h" +#include "cluster/controller.h" +#include "cluster/controller_api.h" +#include "cluster/topics_frontend.h" +#include "features/feature_table.h" +#include "model/namespace.h" +#include "model/timeout_clock.h" +#include "ssx/future-util.h" +#include "tristate.h" + +#include +#include + +#include + +namespace cluster { + +namespace { +static constexpr auto topic_operation_timeout = 20s; + +} // namespace + +tx_topic_manager::tx_topic_manager( + controller& controller, + ss::sharded& features, + config::binding partition_count, + config::binding segment_size, + config::binding retention_duration) + : _controller(controller) + , _features(features) + , _partition_count(std::move(partition_count)) + , _segment_size(std::move(segment_size)) + , _retention_duration(std::move(retention_duration)) {} + +ss::future<> tx_topic_manager::start() { + co_await do_reconcile_topic_properties(); + + _segment_size.watch([this] { reconcile_topic_properties(); }); + _retention_duration.watch([this] { reconcile_topic_properties(); }); +} + +void tx_topic_manager::reconcile_topic_properties() { + ssx::spawn_with_gate( + _gate, [this] { return do_reconcile_topic_properties(); }); +} + +ss::future<> tx_topic_manager::do_reconcile_topic_properties() { + /** + * We hold mutex to make sure only one instance of reconciliation loop is + * active. Properties update evens are asynchronous hence there would be a + * possibility to have more than one reconciliation process active. + */ + auto u = co_await _reconciliation_mutex.get_units(); + + vlog(txlog.trace, "Reconciling tx manager topic properties"); + auto tp_md = _controller.get_topics_state().local().get_topic_metadata_ref( + model::tx_manager_nt); + // nothing to do, topic does not exists + if (!tp_md) { + vlog( + txlog.trace, + "Transactional manager topic does not exist. Skipping " + "reconciliation"); + co_return; + } + + const auto& topic_properties + = tp_md.value().get().get_configuration().properties; + const bool needs_update + = topic_properties.retention_duration + != tristate(_retention_duration()) + || topic_properties.segment_size != _segment_size(); + + if (!needs_update) { + vlog( + txlog.trace, + "Transactional manager topic does not need properties update"); + co_return; + } + + topic_properties_update topic_update(model::tx_manager_nt); + topic_update.properties.retention_duration.value + = tristate(_retention_duration()); + topic_update.properties.retention_duration.op + = incremental_update_operation::set; + topic_update.properties.segment_size.value = _segment_size(); + topic_update.properties.segment_size.op = incremental_update_operation::set; + try { + vlog( + txlog.info, + "Updating properties of transactional manager topic with: {}", + topic_update); + + auto results = co_await _controller.get_topics_frontend() + .local() + .update_topic_properties( + {std::move(topic_update)}, + topic_operation_timeout + + model::timeout_clock::now()); + vassert( + results.size() == 1, + "Transaction topic manager update properties requests contains only " + "one topic therefore one result is expected, actual results: {}", + results.size()); + + const auto& result = results[0]; + if (result.ec == errc::success) { + co_return; + } + vlog( + txlog.warn, + "Unable to update transaction manager topic properties - {}", + make_error_code(result.ec).message()); + } catch (...) { + vlog( + txlog.warn, + "Unable to update transaction manager topic properties - {}", + std::current_exception()); + } + /** + * In case of an error, retry after a while + */ + if (!_gate.is_closed()) { + co_await ss::sleep_abortable( + 10s, _controller.get_abort_source().local()); + + reconcile_topic_properties(); + } +} + +ss::future<> tx_topic_manager::stop() { return _gate.close(); } + +ss::future tx_topic_manager::try_create_coordinator_topic() { + const int32_t partition_count + = _features.local().is_active(features::feature::transaction_partitioning) + ? _partition_count() + : 1; + + cluster::topic_configuration topic_cfg{ + model::kafka_internal_namespace, + model::tx_manager_topic, + partition_count, + _controller.internal_topic_replication()}; + + topic_cfg.properties.segment_size = _segment_size(); + topic_cfg.properties.retention_duration + = tristate(_retention_duration()); + + topic_cfg.properties.cleanup_policy_bitflags + = config::shard_local_cfg().transaction_coordinator_cleanup_policy(); + + vlog( + txlog.info, + "Creating transaction manager topic {} with {} partitions", + model::tx_manager_nt, + partition_count); + + return _controller.get_topics_frontend() + .local() + .autocreate_topics( + {std::move(topic_cfg)}, + config::shard_local_cfg().create_topic_timeout_ms() * partition_count) + .then([](std::vector results) { + vassert( + results.size() == 1, + "Expected one result related with tx manager topic creation, " + "received answer with {} results", + results.size()); + const auto& result = results[0]; + return make_error_code(result.ec); + }) + .handle_exception([](std::exception_ptr e) { + vlog( + txlog.warn, + "Error creating tx manager topic {} - {}", + model::tx_manager_nt, + e); + + return make_error_code(errc::topic_operation_error); + }); +} + +ss::future +tx_topic_manager::create_and_wait_for_coordinator_topic() { + auto ec = co_await try_create_coordinator_topic(); + if (!(ec == errc::success || ec == errc::topic_already_exists)) { + vlog( + txlog.warn, + "Error creating tx manager topic {} - {}", + model::tx_manager_nt, + ec.message()); + co_return errc::topic_not_exists; + } + + try { + auto ec = co_await _controller.get_api().local().wait_for_topic( + model::tx_manager_nt, + topic_operation_timeout + model::timeout_clock::now()); + + if (ec) { + vlog( + txlog.warn, + "Error waiting for transaction manager topic {} to be created " + "- " + "{}", + model::tx_manager_nt, + ec); + // topic is creating, reply with not_coordinator error for + // the client to retry + co_return tx_errc::partition_not_exists; + } + } catch (const ss::timed_out_error& e) { + vlog( + txlog.warn, + "Timeout waiting for transaction manager topic {} to be created - " + "{}", + model::tx_manager_nt, + e); + co_return errc::timeout; + } + + co_return errc::success; +} + +} // namespace cluster diff --git a/src/v/cluster/tx_topic_manager.h b/src/v/cluster/tx_topic_manager.h new file mode 100644 index 000000000000..6ab49960d372 --- /dev/null +++ b/src/v/cluster/tx_topic_manager.h @@ -0,0 +1,63 @@ +/* + * Copyright 2024 Redpanda Data, Inc. + * + * Use of this software is governed by the Business Source License + * included in the file licenses/BSL.md + * + * As of the Change Date specified in that file, in accordance with + * the Business Source License, use of this software will be governed + * by the Apache License, Version 2.0 + */ +#pragma once + +#include "base/seastarx.h" +#include "cluster/fwd.h" +#include "config/property.h" +#include "features/fwd.h" +#include "utils/mutex.h" + +#include +#include +/** + * Service responsible for managing transactional coordinator topic. + * + * The tx_topic_manager, creates the 'kafka_internal/tx' topic and handles its + * property updates. + * + * The tx_topic_manager is created on shard 0 only. + */ +namespace cluster { + +class tx_topic_manager { +public: + static constexpr ss::shard_id shard = 0; + + tx_topic_manager( + controller& controller, + ss::sharded& features, + config::binding partition_count, + config::binding segment_size, + config::binding retention_duration); + + ss::future<> start(); + + ss::future<> stop(); + + ss::future create_and_wait_for_coordinator_topic(); + +private: + ss::future try_create_coordinator_topic(); + + void reconcile_topic_properties(); + + ss::future<> do_reconcile_topic_properties(); + + controller& _controller; + ss::sharded& _features; + config::binding _partition_count; + config::binding _segment_size; + config::binding _retention_duration; + ss::gate _gate; + mutex _reconciliation_mutex; +}; +} // namespace cluster diff --git a/src/v/redpanda/application.cc b/src/v/redpanda/application.cc index 4b361325f50f..efc66f7c1477 100644 --- a/src/v/redpanda/application.cc +++ b/src/v/redpanda/application.cc @@ -65,6 +65,7 @@ #include "cluster/topics_frontend.h" #include "cluster/tx_gateway.h" #include "cluster/tx_gateway_frontend.h" +#include "cluster/tx_topic_manager.h" #include "cluster/types.h" #include "compression/async_stream_zstd.h" #include "compression/stream_zstd.h" @@ -1817,6 +1818,19 @@ void application::wire_up_redpanda_services( .get(); syschecks::systemd_message("Creating tx coordinator frontend").get(); + construct_single_service_sharded( + tx_topic_manager, + std::ref(*controller), + std::ref(feature_table), + config::shard_local_cfg().transaction_coordinator_partitions.bind(), + config::shard_local_cfg().transaction_coordinator_log_segment_size.bind(), + config::shard_local_cfg() + .transaction_coordinator_delete_retention_ms.bind()) + .get(); + tx_topic_manager + .invoke_on( + cluster::tx_topic_manager::shard, &cluster::tx_topic_manager::start) + .get(); // usually it'a an anti-pattern to let the same object be accessed // from different cores without precautionary measures like foreign // ptr. we treat exceptions on the case by case basis validating the @@ -1837,6 +1851,7 @@ void application::wire_up_redpanda_services( std::ref(rm_partition_frontend), std::ref(feature_table), std::ref(tm_stm_cache_manager), + std::ref(tx_topic_manager), ss::sharded_parameter([] { return config::shard_local_cfg() .max_transactions_per_coordinator.bind(); diff --git a/src/v/redpanda/application.h b/src/v/redpanda/application.h index 2755ce6d3daa..ef094ed1ebb7 100644 --- a/src/v/redpanda/application.h +++ b/src/v/redpanda/application.h @@ -128,6 +128,8 @@ class application { ss::sharded self_test_frontend; ss::sharded shard_table; ss::sharded tm_stm_cache_manager; + // only one instance on core 0 + ss::sharded tx_topic_manager; ss::sharded tx_gateway_frontend; ss::sharded feature_table;