[v23.2.x] c/topics_dispatcher: do not guesstimate leader ids #16239

19 changes: 0 additions & 19 deletions src/v/cluster/partition_leaders_table.cc
@@ -221,25 +221,6 @@ void partition_leaders_table::update_partition_leader(
}
}

ss::future<> partition_leaders_table::update_with_estimates() {
for (const auto& [ns_tp, topic] :
_topic_table.local().all_topics_metadata()) {
for (const auto& part : topic.metadata.get_assignments()) {
if (!_leaders.contains(leader_key_view{ns_tp, part.id})) {
model::ntp ntp{ns_tp.ns, ns_tp.tp, part.id};
vassert(
!part.replicas.empty(),
"set of replicas for ntp {} can't be empty",
ntp);
update_partition_leader(
ntp, model::term_id{1}, part.replicas.begin()->node_id);
}

co_await ss::coroutine::maybe_yield();
}
}
}

ss::future<model::node_id> partition_leaders_table::wait_for_leader(
const model::ntp& ntp,
ss::lowres_clock::time_point timeout,
6 changes: 0 additions & 6 deletions src/v/cluster/partition_leaders_table.h
@@ -88,12 +88,6 @@ class partition_leaders_table {
model::term_id,
std::optional<model::node_id>);

// Intended to be called when we apply the topic snapshot: add leader
// guesses for partitions that are present in the topic table but we have
// not leader information for (i.e. all as-yet unseen partitions). Assumes
// that topic_table doesn't change during the call.
ss::future<> update_with_estimates();

struct leader_info_t {
model::topic_namespace tp_ns;
model::partition_id pid;
80 changes: 40 additions & 40 deletions src/v/cluster/tests/metadata_dissemination_test.cc
@@ -28,33 +28,40 @@
#include <vector>
using namespace std::chrono_literals; // NOLINT

static auto timeout = 20s;
static ss::logger test_logger("test-logger");
std::vector<model::node_id>
wait_for_leaders_updates(int id, cluster::metadata_cache& cache) {
std::vector<model::node_id> leaders;
tests::cooperative_spin_wait_with_timeout(
std::chrono::seconds(10),
[&cache, &leaders] {
leaders.clear();
const model::topic_namespace tn(
model::ns("default"), model::topic("test_1"));
auto tp_md = cache.get_topic_metadata(tn);

if (!tp_md) {
return false;
}
if (tp_md->get_assignments().size() != 3) {
return false;
}
for (auto& p_md : tp_md->get_assignments()) {
auto leader_id = cache.get_leader_id(tn, p_md.id);
if (!leader_id) {
return false;
}
leaders.push_back(*leader_id);
}
return true;
})
.get0();
tests::cooperative_spin_wait_with_timeout(timeout, [&cache, &leaders, id] {
leaders.clear();
const model::topic_namespace tn(
model::ns("default"), model::topic("test_1"));
auto tp_md = cache.get_topic_metadata(tn);
test_logger.info(
"waiting for leaders on node {}, current topic metadata: {}",
id,
tp_md.has_value());
if (!tp_md) {
return false;
}
if (tp_md->get_assignments().size() != 3) {
return false;
}
for (auto& p_md : tp_md->get_assignments()) {
auto leader_id = cache.get_leader_id(tn, p_md.id);
test_logger.info(
"waiting for leaders on node {}, partition {}, leader_id: {}",
id,
p_md.id,
leader_id);
if (!leader_id) {
return false;
}
leaders.push_back(*leader_id);
}
return true;
}).get0();
return leaders;
}

@@ -71,12 +78,9 @@ FIXTURE_TEST(
auto& cache_1 = get_local_cache(n_2);
auto& cache_2 = get_local_cache(n_3);

tests::cooperative_spin_wait_with_timeout(
std::chrono::seconds(10),
[&cache_1, &cache_2] {
return cache_1.node_count() == 3 && cache_2.node_count() == 3;
})
.get0();
tests::cooperative_spin_wait_with_timeout(timeout, [&cache_1, &cache_2] {
return cache_1.node_count() == 3 && cache_2.node_count() == 3;
}).get0();

// Make sure we have 3 working nodes
BOOST_REQUIRE_EQUAL(cache_0.node_count(), 3);
@@ -110,10 +114,9 @@ FIXTURE_TEST(test_metadata_dissemination_joining_node, cluster_test_fixture) {
auto& cache_0 = get_local_cache(n_1);
auto& cache_1 = get_local_cache(n_2);

tests::cooperative_spin_wait_with_timeout(
std::chrono::seconds(10),
[&cache_1] { return cache_1.node_count() == 2; })
.get0();
tests::cooperative_spin_wait_with_timeout(timeout, [&cache_1] {
return cache_1.node_count() == 2;
}).get0();
// Make sure we have 2 working nodes
BOOST_REQUIRE_EQUAL(cache_0.node_count(), 2);
BOOST_REQUIRE_EQUAL(cache_1.node_count(), 2);
@@ -132,12 +135,9 @@ FIXTURE_TEST(test_metadata_dissemination_joining_node, cluster_test_fixture) {
create_node_application(model::node_id{2});
auto& cache_2 = get_local_cache(model::node_id{2});
// Wait for node to join the cluster
tests::cooperative_spin_wait_with_timeout(
std::chrono::seconds(10),
[&cache_1, &cache_2] {
return cache_1.node_count() == 3 && cache_2.node_count() == 3;
})
.get0();
tests::cooperative_spin_wait_with_timeout(timeout, [&cache_1, &cache_2] {
return cache_1.node_count() == 3 && cache_2.node_count() == 3;
}).get0();

auto leaders_0 = wait_for_leaders_updates(0, cache_0);
auto leaders_1 = wait_for_leaders_updates(1, cache_1);
29 changes: 0 additions & 29 deletions src/v/cluster/topic_updates_dispatcher.cc
@@ -124,16 +124,11 @@ ss::future<std::error_code> topic_updates_dispatcher::apply(
if (ec == errc::success) {
add_allocations_for_new_partitions(
assignments, get_allocation_domain(tp_ns));
ss::chunked_fifo<ntp_leader> leaders;
for (const auto& p_as : assignments) {
_partition_balancer_state.local().handle_ntp_update(
tp_ns.ns, tp_ns.tp, p_as.id, {}, p_as.replicas);
leaders.emplace_back(
model::ntp(tp_ns.ns, tp_ns.tp, p_as.id),
p_as.replicas.begin()->node_id);
co_await ss::coroutine::maybe_yield();
}
co_await update_leaders_with_estimates(std::move(leaders));

co_return errc::success;
}
@@ -468,25 +463,6 @@ topic_updates_dispatcher::collect_in_progress(
return in_progress;
}

ss::future<> topic_updates_dispatcher::update_leaders_with_estimates(
ss::chunked_fifo<ntp_leader> leaders) {
return ss::do_with(
std::move(leaders), [this](ss::chunked_fifo<ntp_leader>& leaders) {
return ss::parallel_for_each(leaders, [this](ntp_leader& leader) {
vlog(
clusterlog.debug,
"update_leaders_with_estimates: new NTP {} leader {}",
leader.first,
leader.second);
return _partition_leaders_table.invoke_on_all(
[leader = std::move(leader)](partition_leaders_table& l) {
return l.update_partition_leader(
leader.first, model::term_id(1), leader.second);
});
});
});
}

template<typename Cmd>
ss::future<std::error_code> do_apply(
ss::shard_id shard,
@@ -577,11 +553,6 @@ ss::future<> topic_updates_dispatcher::apply_snapshot(
return topics.apply_snapshot(offset, snap);
});

co_await _partition_leaders_table.invoke_on_all(
[](partition_leaders_table& leaders) {
return leaders.update_with_estimates();
});

co_await _partition_allocator.local().apply_snapshot(snap);

co_await _partition_balancer_state.local().apply_snapshot(snap);
2 changes: 0 additions & 2 deletions src/v/cluster/topic_updates_dispatcher.h
@@ -106,8 +106,6 @@ class topic_updates_dispatcher {

using ntp_leader = std::pair<model::ntp, model::node_id>;

ss::future<>
update_leaders_with_estimates(ss::chunked_fifo<ntp_leader> leaders);
template<typename T>
void
add_allocations_for_new_partitions(const T&, partition_allocation_domain);
9 changes: 8 additions & 1 deletion src/v/kafka/server/handlers/metadata.cc
@@ -80,8 +80,15 @@ std::optional<cluster::leader_term> get_leader_term(
const cluster::metadata_cache& md_cache,
const std::vector<model::node_id>& replicas) {
auto leader_term = md_cache.get_leader_term(tp_ns, p_id);
/**
 * If the current broker does not yet have any information about leadership,
 * we fall back to leader guesstimating: return the first replica from the
 * replica set and term 0. (This is the same logic that used to live in
 * cluster::topic_dispatcher.)
 */
if (!leader_term) {
return std::nullopt;
leader_term.emplace(replicas[0], model::term_id(0));
return leader_term;
}
if (!leader_term->leader.has_value()) {
const auto previous = md_cache.get_previous_leader_id(tp_ns, p_id);
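For context, a minimal standalone sketch of the fallback the hunk above moves into the Kafka metadata handler: when the cache has no leadership information for a partition yet, report the first replica as leader with term 0. The types and the `guess_leader` helper below are hypothetical illustrations, not the Redpanda API.

```cpp
#include <cstdint>
#include <optional>
#include <vector>

// Hypothetical stand-in for cluster::leader_term.
struct leader_term {
    int32_t leader; // node id reported as leader
    int64_t term;   // leadership term / epoch
};

// If real leadership info is known, use it; otherwise guesstimate the first
// replica as leader with term 0, mirroring the behavior added above.
std::optional<leader_term> guess_leader(
  const std::optional<leader_term>& known,
  const std::vector<int32_t>& replicas) {
    if (known) {
        return known; // real leadership information wins
    }
    if (replicas.empty()) {
        return std::nullopt; // nothing to guess from
    }
    return leader_term{replicas.front(), 0};
}
```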
3 changes: 2 additions & 1 deletion tests/rptest/tests/consumer_group_recovery_tool_test.py
@@ -38,7 +38,8 @@ def __init__(self, test_ctx, *args, **kwargs):
# clear topics from the the kafka_nodelete_topics to allow for
# __consumer_offsets to be configured in this test.
"kafka_nodelete_topics": [],
"group_topic_partitions": self.initial_partition_count
"group_topic_partitions": self.initial_partition_count,
"enable_leader_balancer": False,
},
node_prealloc_count=1,
**kwargs)
18 changes: 14 additions & 4 deletions tests/rptest/tests/partition_force_reconfiguration_test.py
@@ -16,6 +16,7 @@
from random import shuffle
import time
from rptest.tests.partition_movement import PartitionMovementMixin
from rptest.util import wait_until_result


class PartitionForceReconfigurationTest(EndToEndTest, PartitionMovementMixin):
@@ -60,17 +61,26 @@ def no_leader():
def _alive_nodes(self):
return [n.account.hostname for n in self.redpanda.started_nodes()]

def _stop_majority_nodes(self, replication=5, conf=None):
def _stop_majority_nodes(self, replication=5):
"""
Stops a random majority of nodes hosting partition 0 of test topic.
"""
assert self.redpanda
if not conf:
conf = self.redpanda._admin._get_stable_configuration(

def _get_details():
d = self.redpanda._admin._get_stable_configuration(
hosts=self._alive_nodes(),
topic=self.topic,
replication=replication)
replicas = conf.replicas
if d is None:
return (False, None)
return (True, d)

partition_details = wait_until_result(_get_details,
timeout_sec=30,
backoff_sec=2)

replicas = partition_details.replicas
shuffle(replicas)
mid = len(replicas) // 2 + 1
(killed, alive) = (replicas[0:mid], replicas[mid:])
3 changes: 3 additions & 0 deletions tests/rptest/tests/upgrade_test.py
@@ -226,6 +226,9 @@ def test_upgrade_with_all_workloads(self, single_upgrade):
# First version, start up the workload
self._producer.start(clean=False)
self._producer.wait_for_offset_map()
self._producer.wait_for_acks(100,
timeout_sec=10,
backoff_sec=2)
wrote_at_least = self._producer.produce_status.acked
for consumer in self._consumers:
consumer.start(clean=False)