Skip to content

Commit

Permalink
Merge pull request #17012 from BenPope/backport-pr-16982-v23.3.x
Browse files Browse the repository at this point in the history
[v23.3.x] cluster: Avoid oversize allocs for topic creation and configuration
  • Loading branch information
BenPope committed Mar 12, 2024
2 parents 323e436 + dab1205 commit a16674d
Show file tree
Hide file tree
Showing 24 changed files with 147 additions and 112 deletions.
10 changes: 5 additions & 5 deletions src/v/cloud_storage/tests/topic_recovery_service_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -278,13 +278,13 @@ FIXTURE_TEST(recovery_with_missing_topic_manifest, fixture) {
}

FIXTURE_TEST(recovery_with_existing_topic, fixture) {
cluster::topic_configuration cfg{
model::ns{"kafka"}, model::topic{"test"}, 1, 1};
std::vector<cluster::custom_assignable_topic_configuration> topic_cfg = {
cluster::custom_assignable_topic_configuration{std::move(cfg)}};
cluster::custom_assignable_topic_configuration_vector topic_cfg{
{cluster::custom_assignable_topic_configuration{
{model::ns{"kafka"}, model::topic{"test"}, 1, 1}}}};
auto topic_create_result = app.controller->get_topics_frontend()
.local()
.create_topics(topic_cfg, model::no_timeout)
.create_topics(
std::move(topic_cfg), model::no_timeout)
.get();
wait_for_topics(std::move(topic_create_result)).get();
set_expectations_and_listen(
Expand Down
4 changes: 2 additions & 2 deletions src/v/cluster/cloud_metadata/cluster_recovery_backend.cc
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ ss::future<cluster::errc> cluster_recovery_backend::do_action(
case recovery_stage::recovered_remote_topic_data: {
retry_chain_node topics_retry(&parent_retry);
// TODO: batch this up.
std::vector<topic_configuration> topics;
topic_configuration_vector topics;
for (size_t i = 0; i < actions.remote_topics.size(); i++) {
auto& topic_cfg = actions.remote_topics[i];
if (topic_cfg.is_internal()) {
Expand Down Expand Up @@ -259,7 +259,7 @@ ss::future<cluster::errc> cluster_recovery_backend::do_action(
case recovery_stage::recovered_topic_data: {
retry_chain_node topics_retry(&parent_retry);
// TODO: batch this up.
std::vector<topic_configuration> topics;
topic_configuration_vector topics;
for (size_t i = 0; i < actions.local_topics.size(); i++) {
topics.emplace_back(std::move(actions.local_topics[i]));
vlog(clusterlog.debug, "Creating topic {}", topics.back().tp_ns);
Expand Down
11 changes: 6 additions & 5 deletions src/v/cluster/cluster_utils.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include <seastar/core/future.hh>

#include <chrono>
#include <iterator>

namespace cluster {

Expand Down Expand Up @@ -110,13 +111,13 @@ bool are_replica_sets_equal(
return l_sorted == r_sorted;
}

std::vector<custom_assignable_topic_configuration>
without_custom_assignments(std::vector<topic_configuration> topics) {
std::vector<custom_assignable_topic_configuration> assignable_topics;
custom_assignable_topic_configuration_vector
without_custom_assignments(topic_configuration_vector topics) {
custom_assignable_topic_configuration_vector assignable_topics;
assignable_topics.reserve(topics.size());
std::transform(
topics.begin(),
topics.end(),
std::make_move_iterator(topics.begin()),
std::make_move_iterator(topics.end()),
std::back_inserter(assignable_topics),
[](topic_configuration cfg) {
return custom_assignable_topic_configuration(std::move(cfg));
Expand Down
68 changes: 36 additions & 32 deletions src/v/cluster/cluster_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,24 +26,48 @@

#include <seastar/core/sharded.hh>

#include <concepts>
#include <ranges>
#include <system_error>
#include <utility>

namespace detail {

template<typename T, typename Fn>
template<std::ranges::range Rng, typename Fn>
requires std::same_as<
std::invoke_result_t<Fn, std::ranges::range_value_t<Rng>>,
cluster::topic_result>
std::vector<cluster::topic_result>
make_error_topic_results(const std::vector<T>& topics, Fn fn) {
make_error_topic_results(const Rng& topics, Fn fn) {
std::vector<cluster::topic_result> results;
results.reserve(topics.size());
std::transform(
topics.cbegin(),
topics.cend(),
std::back_inserter(results),
[&fn](const T& t) { return fn(t); });
topics.cbegin(), topics.cend(), std::back_inserter(results), fn);
return results;
}

template<typename T>
concept has_tp_ns = requires {
{
std::declval<T>().tp_ns
} -> std::convertible_to<const model::topic_namespace&>;
};

template<typename T>
const model::topic_namespace& extract_tp_ns(const T& t) {
if constexpr (std::same_as<T, model::topic_namespace>) {
return t;
} else if constexpr (has_tp_ns<T>) {
return t.tp_ns;
} else if constexpr (std::same_as<
T,
cluster::custom_assignable_topic_configuration>) {
return t.cfg.tp_ns;
} else {
static_assert(always_false_v<T>, "couldn't extract tp_ns");
}
}

} // namespace detail

namespace config {
Expand All @@ -56,31 +80,11 @@ class metadata_cache;
class partition;

/// Creates the same topic_result for all requests
template<typename T>
requires requires(const T& req) {
{ req.tp_ns } -> std::convertible_to<const model::topic_namespace&>;
}
std::vector<topic_result>
make_error_topic_results(const std::vector<T>& requests, errc error_code) {
return detail::make_error_topic_results(requests, [error_code](const T& r) {
return topic_result(r.tp_ns, error_code);
});
}

inline std::vector<topic_result> make_error_topic_results(
const std::vector<model::topic_namespace>& topics, errc error_code) {
return detail::make_error_topic_results(
topics, [error_code](const model::topic_namespace& t) {
return topic_result(t, error_code);
});
}

inline std::vector<topic_result> make_error_topic_results(
const std::vector<custom_assignable_topic_configuration>& requests,
errc error_code) {
std::vector<topic_result> make_error_topic_results(
const std::ranges::range auto& topics, errc error_code) {
return detail::make_error_topic_results(
requests, [error_code](const custom_assignable_topic_configuration& r) {
return topic_result(r.cfg.tp_ns, error_code);
topics, [error_code](const auto& t) {
return topic_result(detail::extract_tp_ns(t), error_code);
});
}

Expand Down Expand Up @@ -189,8 +193,8 @@ ss::future<std::error_code> replicate_and_wait(
});
}

std::vector<custom_assignable_topic_configuration>
without_custom_assignments(std::vector<topic_configuration>);
custom_assignable_topic_configuration_vector
without_custom_assignments(topic_configuration_vector);

template<class T>
inline std::vector<T>
Expand Down
6 changes: 3 additions & 3 deletions src/v/cluster/service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -158,10 +158,10 @@ service::purged_topic(purged_topic_request r, rpc::streaming_context&) {
[](topic_result res) { return purged_topic_reply(std::move(res)); });
}

std::pair<std::vector<model::topic_metadata>, std::vector<topic_configuration>>
std::pair<std::vector<model::topic_metadata>, topic_configuration_vector>
service::fetch_metadata_and_cfg(const std::vector<topic_result>& res) {
std::vector<model::topic_metadata> md;
std::vector<topic_configuration> cfg;
topic_configuration_vector cfg;
md.reserve(res.size());
for (const auto& r : res) {
if (r.ec == errc::success) {
Expand Down Expand Up @@ -237,7 +237,7 @@ service::do_update_topic_properties(update_topic_properties_request req) {
// local topic frontend instance will eventually dispatch request to _raft0
// core
auto res = co_await _topics_frontend.local().update_topic_properties(
req.updates,
std::move(req).updates,
config::shard_local_cfg().replicate_append_timeout_ms()
+ model::timeout_clock::now());

Expand Down
5 changes: 2 additions & 3 deletions src/v/cluster/service.h
Original file line number Diff line number Diff line change
Expand Up @@ -137,9 +137,8 @@ class service : public controller_service {

private:
static constexpr auto default_move_interruption_timeout = 10s;
std::
pair<std::vector<model::topic_metadata>, std::vector<topic_configuration>>
fetch_metadata_and_cfg(const std::vector<topic_result>&);
std::pair<std::vector<model::topic_metadata>, topic_configuration_vector>
fetch_metadata_and_cfg(const std::vector<topic_result>&);

ss::future<finish_partition_update_reply>
do_finish_partition_update(finish_partition_update_request);
Expand Down
4 changes: 2 additions & 2 deletions src/v/cluster/tests/autocreate_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@
#include <chrono>
#include <vector>

std::vector<cluster::topic_configuration> test_topics_configuration(
cluster::topic_configuration_vector test_topics_configuration(
cluster::replication_factor rf = cluster::replication_factor{1}) {
return std::vector<cluster::topic_configuration>{
return cluster::topic_configuration_vector{
cluster::topic_configuration(test_ns, model::topic("tp-1"), 10, rf),
cluster::topic_configuration(test_ns, model::topic("tp-2"), 10, rf),
cluster::topic_configuration(test_ns, model::topic("tp-3"), 10, rf),
Expand Down
2 changes: 1 addition & 1 deletion src/v/cluster/tests/cluster_test_fixture.h
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ class cluster_test_fixture {
return it.second->app.controller->is_raft0_leader();
});
auto& app_0 = leader_it->second->app;
std::vector<cluster::topic_configuration> cfgs = {
cluster::topic_configuration_vector cfgs = {
cluster::topic_configuration{
tp_ns.ns, tp_ns.tp, partitions, replication_factor}};
auto results = app_0.controller->get_topics_frontend()
Expand Down
4 changes: 2 additions & 2 deletions src/v/cluster/tests/controller_api_tests.cc
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,13 @@ FIXTURE_TEST(test_querying_ntp_status, cluster_test_fixture) {
auto leader = get_node_application(*leader_id);

// create topic
std::vector<cluster::topic_configuration> topics;
cluster::topic_configuration_vector topics;
topics.emplace_back(test_ntp.ns, test_ntp.tp.topic, 3, 1);

leader->controller->get_topics_frontend()
.local()
.create_topics(
cluster::without_custom_assignments(topics),
cluster::without_custom_assignments(std::move(topics)),
1s + model::timeout_clock::now())
.get();

Expand Down
11 changes: 5 additions & 6 deletions src/v/cluster/tests/health_monitor_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -221,12 +221,11 @@ FIXTURE_TEST(test_ntp_filter, cluster_test_fixture) {
}).get();

// create topics
std::vector<cluster::topic_configuration> topics;
topics.push_back(topic_cfg(model::kafka_namespace, "tp-1", 3, 3));
topics.push_back(topic_cfg(model::kafka_namespace, "tp-2", 3, 2));
topics.push_back(topic_cfg(model::kafka_namespace, "tp-3", 3, 1));
topics.push_back(
topic_cfg(model::kafka_internal_namespace, "internal-1", 3, 2));
cluster::topic_configuration_vector topics{
topic_cfg(model::kafka_namespace, "tp-1", 3, 3),
topic_cfg(model::kafka_namespace, "tp-2", 3, 2),
topic_cfg(model::kafka_namespace, "tp-3", 3, 1),
topic_cfg(model::kafka_internal_namespace, "internal-1", 3, 2)};

n1->controller->get_topics_frontend()
.local()
Expand Down
4 changes: 2 additions & 2 deletions src/v/cluster/tests/metadata_dissemination_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ FIXTURE_TEST(
BOOST_REQUIRE_EQUAL(cache_2.node_count(), 3);

// Create topic with replication factor 1
std::vector<cluster::topic_configuration> topics;
cluster::topic_configuration_vector topics;
topics.emplace_back(model::ns("default"), model::topic("test_1"), 3, 1);
cntrl_0->controller->get_topics_frontend()
.local()
Expand Down Expand Up @@ -122,7 +122,7 @@ FIXTURE_TEST(test_metadata_dissemination_joining_node, cluster_test_fixture) {
BOOST_REQUIRE_EQUAL(cache_1.node_count(), 2);

// Create topic with replication factor 1
std::vector<cluster::topic_configuration> topics;
cluster::topic_configuration_vector topics;
topics.emplace_back(model::ns("default"), model::topic("test_1"), 3, 1);
cntrl_0->controller->get_topics_frontend()
.local()
Expand Down
19 changes: 9 additions & 10 deletions src/v/cluster/tests/serialization_rt_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1337,7 +1337,7 @@ SEASTAR_THREAD_TEST_CASE(serde_reflection_roundtrip) {
roundtrip_test(data);
}
{
std::vector<cluster::topic_properties_update> updates;
cluster::topic_properties_update_vector updates;
for (int i = 0, mi = random_generators::get_int(10); i < mi; i++) {
cluster::property_update<std::optional<v8_engine::data_policy>>
data_policy;
Expand All @@ -1350,16 +1350,15 @@ SEASTAR_THREAD_TEST_CASE(serde_reflection_roundtrip) {
cluster::incremental_topic_custom_updates custom_properties{
.data_policy = data_policy,
};
updates.push_back(cluster::topic_properties_update{
updates.emplace_back(
model::random_topic_namespace(),
random_incremental_topic_updates(),
custom_properties,
});
custom_properties);
}
cluster::update_topic_properties_request data{
.updates = updates,
.updates = std::move(updates),
};
roundtrip_test(data);
roundtrip_test(std::move(data));
}
{
cluster::topic_result data{
Expand Down Expand Up @@ -1698,18 +1697,18 @@ SEASTAR_THREAD_TEST_CASE(serde_reflection_roundtrip) {
roundtrip_test(data);
}
{
std::vector<cluster::topic_configuration> topics;
cluster::topic_configuration_vector topics;
for (auto i = 0, mi = random_generators::get_int(20); i < mi; ++i) {
topics.push_back(random_topic_configuration());
}
cluster::create_topics_request data{
.topics = topics,
.topics = std::move(topics),
.timeout = random_timeout_clock_duration(),
};
// adl encoding for topic_configuration doesn't encode/decode to exact
// equality, but also already existed prior to serde support being added
// so only testing the serde case.
roundtrip_test(data);
roundtrip_test(std::move(data));
}
{
auto data = random_partition_metadata();
Expand Down Expand Up @@ -1743,7 +1742,7 @@ SEASTAR_THREAD_TEST_CASE(serde_reflection_roundtrip) {
// adl serialization doesn't preserve equality for topic_configuration.
// serde serialization does and was added after support for adl so adl
// semantics are preserved.
roundtrip_test(data);
roundtrip_test(std::move(data));
}
{
raft::transfer_leadership_request data{
Expand Down
8 changes: 5 additions & 3 deletions src/v/cluster/topic_recovery_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include "cloud_storage/topic_manifest.h"
#include "cluster/topic_recovery_status_frontend.h"
#include "cluster/topics_frontend.h"
#include "cluster/types.h"

#include <seastar/http/request.hh>
#include <seastar/util/defer.hh>
Expand Down Expand Up @@ -409,7 +410,7 @@ static cluster::topic_configuration make_topic_config(

ss::future<std::vector<cluster::topic_result>>
topic_recovery_service::create_topics(const recovery_request& request) {
std::vector<cluster::topic_configuration> topic_configs;
cluster::topic_configuration_vector topic_configs;
topic_configs.reserve(_downloaded_manifests->size());

std::transform(
Expand All @@ -419,7 +420,8 @@ topic_recovery_service::create_topics(const recovery_request& request) {
[&request](const auto& m) { return make_topic_config(m, request); });

co_return co_await _topics_frontend.local().autocreate_topics(
topic_configs, config::shard_local_cfg().create_topic_timeout_ms());
std::move(topic_configs),
config::shard_local_cfg().create_topic_timeout_ms());
}

ss::future<std::vector<cloud_storage::topic_manifest>>
Expand Down Expand Up @@ -531,7 +533,7 @@ ss::future<> topic_recovery_service::reset_topic_configurations() {
co_return;
}

std::vector<cluster::topic_properties_update> updates;
cluster::topic_properties_update_vector updates;
updates.reserve(_downloaded_manifests->size());
std::transform(
std::make_move_iterator(_downloaded_manifests->begin()),
Expand Down
Loading

0 comments on commit a16674d

Please sign in to comment.