Skip to content

Commit

Permalink
c/partition_balancer/ut: add concurrent topic_table updates test
Browse files Browse the repository at this point in the history
  • Loading branch information
ztlpn committed May 10, 2024
1 parent a57a575 commit 1c63990
Show file tree
Hide file tree
Showing 2 changed files with 144 additions and 2 deletions.
4 changes: 3 additions & 1 deletion src/v/cluster/tests/partition_balancer_planner_fixture.h
Original file line number Diff line number Diff line change
Expand Up @@ -160,14 +160,16 @@ struct partition_balancer_planner_fixture {
cluster::partition_balancer_planner make_planner(
model::partition_autobalancing_mode mode
= model::partition_autobalancing_mode::continuous,
size_t max_concurrent_actions = 2) {
size_t max_concurrent_actions = 2,
bool request_ondemand_rebalance = false) {
return cluster::partition_balancer_planner(
cluster::planner_config{
.mode = mode,
.soft_max_disk_usage_ratio = 0.8,
.hard_max_disk_usage_ratio = 0.95,
.max_concurrent_actions = max_concurrent_actions,
.node_availability_timeout_sec = std::chrono::minutes(1),
.ondemand_rebalance_requested = request_ondemand_rebalance,
.segment_fallocation_step = 16,
.node_responsiveness_timeout = std::chrono::seconds(10),
.topic_aware = true,
Expand Down
142 changes: 141 additions & 1 deletion src/v/cluster/tests/partition_balancer_planner_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,15 @@
// by the Apache License, Version 2.0

#include "base/vlog.h"
#include "cluster/controller_snapshot.h"
#include "cluster/health_monitor_types.h"
#include "cluster/tests/partition_balancer_planner_fixture.h"
#include "utils/stable_iterator_adaptor.h"

#include <seastar/testing/thread_test_case.hh>
#include <seastar/util/defer.hh>

static ss::logger logger("partition_balancer_planner");
static ss::logger logger("pb_planner_test");

// a shorthand to avoid spelling out model::node_id
static model::node_id n(model::node_id::type id) { return model::node_id{id}; };
Expand Down Expand Up @@ -924,3 +927,140 @@ FIXTURE_TEST(balancing_modes, partition_balancer_planner_fixture) {
BOOST_REQUIRE_EQUAL(plan_data.cancellations.size(), 0);
BOOST_REQUIRE_EQUAL(plan_data.failed_actions_count, 0);
}

FIXTURE_TEST(
concurrent_topic_table_updates, partition_balancer_planner_fixture) {
// Apply lots of topic_table update commands, while concurrently invoking
// the planner. The main goal of this test is to pass ASan checks.

allocator_register_nodes(5);
config::shard_local_cfg().disable_metrics.set_value(true);
config::shard_local_cfg().disable_public_metrics.set_value(true);

auto make_create_tp_cmd = [this](ss::sstring name, int partitions) {
int16_t replication_factor = 3;
cluster::topic_configuration cfg(
test_ns, model::topic{name}, partitions, replication_factor);

ss::chunked_fifo<cluster::partition_assignment> assignments;
for (model::partition_id::type i = 0; i < partitions; ++i) {
std::vector<model::broker_shard> replicas;
for (int r = 0; r < replication_factor; ++r) {
replicas.push_back(model::broker_shard{
model::node_id{r},
random_generators::get_int<uint32_t>(0, 3)});
}
std::shuffle(
replicas.begin(),
replicas.end(),
random_generators::internal::gen);

assignments.push_back(cluster::partition_assignment{
raft::group_id{1}, model::partition_id{i}, replicas});
}
return cluster::create_topic_cmd{
make_tp_ns(name),
cluster::topic_configuration_assignment(cfg, std::move(assignments))};
};

size_t successes = 0;
size_t failures = 0;
size_t reassignments = 0;
bool should_stop = false;
ss::future<> planning_fiber = ss::async([&] {
while (!should_stop) {
vlog(logger.trace, "planning fiber: invoking...");
auto hr = create_health_report();
auto planner = make_planner(
model::partition_autobalancing_mode::node_add, 50, true);

try {
auto plan_data = planner.plan_actions(hr, as).get();
successes += 1;
reassignments += plan_data.reassignments.size();
} catch (concurrent_modification_error&) {
failures += 1;
}
vlog(logger.trace, "planning fiber: iteration done");
}
});
auto deferred = ss::defer([&] {
if (!should_stop) {
should_stop = true;
planning_fiber.get();
}
});

cluster::topic_table other_tt;
model::offset controller_offset{0};
std::set<ss::sstring> cur_topics;
bool node_isolated = false;

for (size_t iter = 0; iter < 1'000; ++iter) {
int random_val = random_generators::get_int(0, 10);
if (random_val == 10) {
// allow the planner to make some progress
ss::sleep(50ms).get();
continue;
}

// randomly create and delete topics
auto topic = ssx::sformat("topic_{}", random_val);
if (!cur_topics.contains(topic)) {
vlog(
logger.trace,
"modifying fiber: creating topic {} (isolated: {})",
topic,
node_isolated);
auto cmd = make_create_tp_cmd(
topic, random_generators::get_int(1, 20));
other_tt.apply(cmd, controller_offset).get();
if (!node_isolated) {
workers.dispatch_topic_command(cmd);
}
cur_topics.insert(topic);
} else {
vlog(
logger.trace,
"modifying fiber: deleting topic {} (isolated: {})",
topic,
node_isolated);
cluster::delete_topic_cmd cmd{make_tp_ns(topic), make_tp_ns(topic)};
other_tt.apply(cmd, controller_offset).get();
if (!node_isolated) {
workers.dispatch_topic_command(cmd);
}
cur_topics.erase(topic);
}

if (random_generators::get_int(5) == 0) {
// flip node_isolated flag

if (node_isolated) {
// simulate node coming back from isolation and recovering
// current controller state from a snapshot.
vlog(logger.trace, "modifying fiber: applying snapshot");
node_isolated = false;
cluster::controller_snapshot snap;
other_tt.fill_snapshot(snap).get();
workers.members.local().fill_snapshot(snap);
workers.dispatcher.apply_snapshot(controller_offset, snap)
.get();
} else {
node_isolated = true;
}
}

controller_offset += 1;

vlog(logger.trace, "modifying fiber: iteration done");
}

should_stop = true;
planning_fiber.get();

// sanity-check that planning made some progress.
BOOST_REQUIRE(successes > 0);
BOOST_REQUIRE(failures > 0);
BOOST_REQUIRE(reassignments > 0);
}

0 comments on commit 1c63990

Please sign in to comment.