Skip to content

Commit

Permalink
tests: wait for leaders in balancer test
Browse files Browse the repository at this point in the history
Signed-off-by: Michal Maslanka <michal@redpanda.com>
  • Loading branch information
mmaslankaprv committed Dec 15, 2023
1 parent 60e235f commit bc310c3
Showing 1 changed file with 28 additions and 0 deletions.
28 changes: 28 additions & 0 deletions src/v/cluster/tests/partition_balancer_planner_fixture.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,19 @@
#include "cluster/node_status_table.h"
#include "cluster/partition_balancer_planner.h"
#include "cluster/partition_balancer_state.h"
#include "cluster/partition_leaders_table.h"
#include "cluster/tests/utils.h"
#include "cluster/topic_updates_dispatcher.h"
#include "cluster/types.h"
#include "model/fundamental.h"
#include "model/metadata.h"
#include "random/generators.h"
#include "test_utils/fixture.h"
#include "units.h"

#include <seastar/core/chunked_fifo.hh>
#include <seastar/core/loop.hh>
#include <seastar/core/lowres_clock.hh>

#include <chrono>
#include <optional>
Expand Down Expand Up @@ -55,6 +59,7 @@ struct controller_workers {
controller_workers()
: dispatcher(allocator, table, leaders, state) {
table.start().get();
leaders.start(std::ref(table)).get();
members.start_single().get();
allocator
.start_single(
Expand Down Expand Up @@ -132,6 +137,7 @@ struct controller_workers {
~controller_workers() {
state.stop().get();
node_status_table.stop().get();
leaders.stop().get();
table.stop().get();
allocator.stop().get();
members.stop().get();
Expand Down Expand Up @@ -196,12 +202,21 @@ struct partition_balancer_planner_fixture {
make_tp_ns(name),
workers.make_tp_configuration(name, partitions, replication_factor)};
workers.dispatch_topic_command(std::move(cmd));
ss::parallel_for_each(boost::irange(partitions), [this, name](int i) {
return workers.leaders.local()
.wait_for_leader(
model::ntp(test_ns, model::topic{name}, model::partition_id(i)),
ss::lowres_clock::now() + 5s,
std::nullopt)
.discard_result();
}).get();
}

void create_topic(
const ss::sstring& name,
std::vector<std::vector<model::node_id>> partition_nodes) {
BOOST_REQUIRE(!partition_nodes.empty());
auto partition_count = partition_nodes.size();
int16_t replication_factor = partition_nodes.front().size();
cluster::topic_configuration cfg(
test_ns,
Expand All @@ -226,6 +241,19 @@ struct partition_balancer_planner_fixture {
cluster::topic_configuration_assignment(cfg, std::move(assignments))};

workers.dispatch_topic_command(std::move(cmd));

ss::parallel_for_each(
boost::irange(partition_count),
[this, name](int i) {
return workers.leaders.local()
.wait_for_leader(
model::ntp(
test_ns, model::topic{name}, model::partition_id(i)),
ss::lowres_clock::now() + 5s,
std::nullopt)
.discard_result();
})
.get();
}

void allocator_register_nodes(
Expand Down

0 comments on commit bc310c3

Please sign in to comment.