Fix some concurrent memory access problems in partition balancer #18305

Merged
16 changes: 15 additions & 1 deletion src/v/cluster/scheduling/types.cc
@@ -13,6 +13,7 @@

#include "cluster/logger.h"
#include "cluster/scheduling/allocation_state.h"
#include "utils/exceptions.h"
#include "utils/to_string.h"

#include <fmt/ostream.h>
@@ -47,6 +48,9 @@ allocation_units::allocation_units(

allocation_units::~allocation_units() {
oncore_debug_verify(_oncore);
if (unlikely(!_state)) {
return;
}
for (const auto& replica : _added_replicas) {
_state->remove_allocation(replica, _domain);
_state->remove_final_count(replica, _domain);
@@ -80,6 +84,11 @@ allocated_partition::prepare_move(model::node_id prev_node) const {

model::broker_shard allocated_partition::add_replica(
model::node_id node, const std::optional<previous_replica>& prev) {
if (unlikely(!_state)) {
throw concurrent_modification_error(
"allocation_state was concurrently replaced");
}

if (!_original_node2shard) {
_original_node2shard.emplace();
for (const auto& bs : _replicas) {
@@ -155,7 +164,12 @@ bool allocated_partition::is_original(model::node_id node) const {
}

errc allocated_partition::try_revert(const reallocation_step& step) {
if (!_original_node2shard || !_state) {
if (unlikely(!_state)) {
throw concurrent_modification_error(
"allocation_state was concurrently replaced");
}

if (!_original_node2shard) {
return errc::no_update_in_progress;
}

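The guards added above convert a potential use-after-free into an explicit concurrent_modification_error when the allocation_state behind an allocation_units or allocated_partition object has been replaced concurrently. A minimal sketch of that pattern, using standard-library stand-ins rather than the real cluster types (allocation_state_stub, partition_handle, and concurrent_modification below are hypothetical names, not part of the PR):

```cpp
#include <memory>
#include <stdexcept>

struct allocation_state_stub {}; // stand-in for cluster::allocation_state

struct concurrent_modification : std::runtime_error {
    using std::runtime_error::runtime_error;
};

class partition_handle {
public:
    explicit partition_handle(std::weak_ptr<allocation_state_stub> s)
      : _state(std::move(s)) {}

    void add_replica() {
        auto locked = _state.lock();
        if (!locked) {
            // The state this handle captured is gone: fail loudly instead of
            // touching freed memory (mirrors the check added to
            // allocated_partition::add_replica above).
            throw concurrent_modification(
              "allocation state was concurrently replaced");
        }
        // ... mutate *locked here ...
    }

private:
    std::weak_ptr<allocation_state_stub> _state;
};
```

In the planner test added in this PR, this surfaces as a catch (concurrent_modification_error&) branch that counts the failure and simply re-plans on the next iteration.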
4 changes: 3 additions & 1 deletion src/v/cluster/tests/partition_balancer_planner_fixture.h
@@ -160,14 +160,16 @@ struct partition_balancer_planner_fixture {
cluster::partition_balancer_planner make_planner(
model::partition_autobalancing_mode mode
= model::partition_autobalancing_mode::continuous,
size_t max_concurrent_actions = 2) {
size_t max_concurrent_actions = 2,
bool request_ondemand_rebalance = false) {
return cluster::partition_balancer_planner(
cluster::planner_config{
.mode = mode,
.soft_max_disk_usage_ratio = 0.8,
.hard_max_disk_usage_ratio = 0.95,
.max_concurrent_actions = max_concurrent_actions,
.node_availability_timeout_sec = std::chrono::minutes(1),
.ondemand_rebalance_requested = request_ondemand_rebalance,
.segment_fallocation_step = 16,
.node_responsiveness_timeout = std::chrono::seconds(10),
.topic_aware = true,
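The new request_ondemand_rebalance fixture parameter is forwarded to planner_config::ondemand_rebalance_requested. A usage sketch, with argument values taken from the test below rather than a recommended configuration:

```cpp
// Third positional argument enables on-demand rebalance in the planner config.
auto planner = make_planner(
  model::partition_autobalancing_mode::node_add,
  /*max_concurrent_actions=*/50,
  /*request_ondemand_rebalance=*/true);
```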
142 changes: 141 additions & 1 deletion src/v/cluster/tests/partition_balancer_planner_test.cc
@@ -8,12 +8,15 @@
// by the Apache License, Version 2.0

#include "base/vlog.h"
#include "cluster/controller_snapshot.h"
#include "cluster/health_monitor_types.h"
#include "cluster/tests/partition_balancer_planner_fixture.h"
#include "utils/stable_iterator_adaptor.h"

#include <seastar/testing/thread_test_case.hh>
#include <seastar/util/defer.hh>

static ss::logger logger("partition_balancer_planner");
static ss::logger logger("pb_planner_test");

// a shorthand to avoid spelling out model::node_id
static model::node_id n(model::node_id::type id) { return model::node_id{id}; };
@@ -924,3 +927,140 @@ FIXTURE_TEST(balancing_modes, partition_balancer_planner_fixture) {
BOOST_REQUIRE_EQUAL(plan_data.cancellations.size(), 0);
BOOST_REQUIRE_EQUAL(plan_data.failed_actions_count, 0);
}

FIXTURE_TEST(
concurrent_topic_table_updates, partition_balancer_planner_fixture) {
// Apply lots of topic_table update commands, while concurrently invoking
// the planner. The main goal of this test is to pass ASan checks.

allocator_register_nodes(5);
config::shard_local_cfg().disable_metrics.set_value(true);
config::shard_local_cfg().disable_public_metrics.set_value(true);

auto make_create_tp_cmd = [this](ss::sstring name, int partitions) {
int16_t replication_factor = 3;
cluster::topic_configuration cfg(
test_ns, model::topic{name}, partitions, replication_factor);

ss::chunked_fifo<cluster::partition_assignment> assignments;
for (model::partition_id::type i = 0; i < partitions; ++i) {
std::vector<model::broker_shard> replicas;
for (int r = 0; r < replication_factor; ++r) {
replicas.push_back(model::broker_shard{
model::node_id{r},
random_generators::get_int<uint32_t>(0, 3)});
}
std::shuffle(
replicas.begin(),
replicas.end(),
random_generators::internal::gen);

assignments.push_back(cluster::partition_assignment{
raft::group_id{1}, model::partition_id{i}, replicas});
}
return cluster::create_topic_cmd{
make_tp_ns(name),
cluster::topic_configuration_assignment(cfg, std::move(assignments))};
};

size_t successes = 0;
size_t failures = 0;
size_t reassignments = 0;
bool should_stop = false;
ss::future<> planning_fiber = ss::async([&] {
while (!should_stop) {
vlog(logger.trace, "planning fiber: invoking...");
auto hr = create_health_report();
auto planner = make_planner(
model::partition_autobalancing_mode::node_add, 50, true);

try {
auto plan_data = planner.plan_actions(hr, as).get();
successes += 1;
reassignments += plan_data.reassignments.size();
} catch (concurrent_modification_error&) {
failures += 1;
}
vlog(logger.trace, "planning fiber: iteration done");
}
});
auto deferred = ss::defer([&] {
if (!should_stop) {
should_stop = true;
planning_fiber.get();
}
});

cluster::topic_table other_tt;
model::offset controller_offset{0};
std::set<ss::sstring> cur_topics;
bool node_isolated = false;

for (size_t iter = 0; iter < 1'000; ++iter) {
int random_val = random_generators::get_int(0, 10);
if (random_val == 10) {
// allow the planner to make some progress
ss::sleep(50ms).get();
continue;
}

// randomly create and delete topics
auto topic = ssx::sformat("topic_{}", random_val);
if (!cur_topics.contains(topic)) {
vlog(
logger.trace,
"modifying fiber: creating topic {} (isolated: {})",
topic,
node_isolated);
auto cmd = make_create_tp_cmd(
topic, random_generators::get_int(1, 20));
other_tt.apply(cmd, controller_offset).get();
if (!node_isolated) {
workers.dispatch_topic_command(cmd);
}
cur_topics.insert(topic);
} else {
vlog(
logger.trace,
"modifying fiber: deleting topic {} (isolated: {})",
topic,
node_isolated);
cluster::delete_topic_cmd cmd{make_tp_ns(topic), make_tp_ns(topic)};
other_tt.apply(cmd, controller_offset).get();
if (!node_isolated) {
workers.dispatch_topic_command(cmd);
}
cur_topics.erase(topic);
}

if (random_generators::get_int(5) == 0) {
// flip node_isolated flag

if (node_isolated) {
// simulate node coming back from isolation and recovering
// current controller state from a snapshot.
vlog(logger.trace, "modifying fiber: applying snapshot");
node_isolated = false;
cluster::controller_snapshot snap;
other_tt.fill_snapshot(snap).get();
workers.members.local().fill_snapshot(snap);
workers.dispatcher.apply_snapshot(controller_offset, snap)
.get();
} else {
node_isolated = true;
}
}

controller_offset += 1;

vlog(logger.trace, "modifying fiber: iteration done");
}

should_stop = true;
planning_fiber.get();

// sanity-check that planning made some progress.
BOOST_REQUIRE(successes > 0);
BOOST_REQUIRE(failures > 0);
BOOST_REQUIRE(reassignments > 0);
}
38 changes: 31 additions & 7 deletions src/v/cluster/topic_table.cc
@@ -342,6 +342,8 @@ topic_table::apply(finish_moving_partition_replicas_cmd cmd, model::offset o) {

_updates_in_progress.erase(it);

_topics_map_revision++;

on_partition_move_finish(cmd.key, cmd.value);

// notify backend about finished update
@@ -416,6 +418,8 @@ topic_table::apply(cancel_moving_partition_replicas_cmd cmd, model::offset o) {
current_assignment_it->replicas
= in_progress_it->second.get_previous_replicas();

_topics_map_revision++;

_pending_deltas.emplace_back(
std::move(cmd.key),
current_assignment_it->group,
@@ -459,6 +463,11 @@ topic_table::apply(revert_cancel_partition_move_cmd cmd, model::offset o) {
co_return errc::no_update_in_progress;
}

auto p_meta_it = tp->second.partitions.find(ntp.tp.partition);
if (p_meta_it == tp->second.partitions.end()) {
co_return errc::partition_not_exists;
}

// revert replica set update
current_assignment_it->replicas
= in_progress_it->second.get_target_replicas();
Expand All @@ -469,11 +478,7 @@ topic_table::apply(revert_cancel_partition_move_cmd cmd, model::offset o) {
current_assignment_it->replicas,
};

// update partition_meta object
auto p_meta_it = tp->second.partitions.find(ntp.tp.partition);
if (p_meta_it == tp->second.partitions.end()) {
co_return errc::partition_not_exists;
}
// update partition_meta object:
// the cancellation was reverted and update went through, we must
// update replicas_revisions.
p_meta_it->second.replicas_revisions = update_replicas_revisions(
@@ -485,6 +490,8 @@
/// Since the update is already finished we drop in_progress state
_updates_in_progress.erase(in_progress_it);

_topics_map_revision++;

// notify backend about finished update
_pending_deltas.emplace_back(
ntp,
@@ -670,6 +677,7 @@ topic_table::apply(set_topic_partitions_disabled_cmd cmd, model::offset o) {
}
}

_topics_map_revision++;
notify_waiters();

co_return errc::success;
@@ -998,6 +1006,7 @@ class topic_table::snapshot_applier {
disabled_partitions_t& _disabled_partitions;
fragmented_vector<delta>& _pending_deltas;
topic_table_probe& _probe;
model::revision_id& _topics_map_revision;
model::revision_id _snap_revision;

public:
@@ -1006,14 +1015,17 @@
, _disabled_partitions(parent._disabled_partitions)
, _pending_deltas(parent._pending_deltas)
, _probe(parent._probe)
, _topics_map_revision(parent._topics_map_revision)
, _snap_revision(snap_revision) {}

void delete_ntp(
const model::topic_namespace& ns_tp, const partition_assignment& p_as) {
auto ntp = model::ntp(ns_tp.ns, ns_tp.tp, p_as.id);
vlog(
clusterlog.trace, "deleting ntp {} not in controller snapshot", ntp);
_updates_in_progress.erase(ntp);
if (_updates_in_progress.erase(ntp)) {
_topics_map_revision++;
};

_pending_deltas.emplace_back(
std::move(ntp),
Expand All @@ -1035,7 +1047,9 @@ class topic_table::snapshot_applier {
delete_ntp(ns_tp, p_as);
co_await ss::coroutine::maybe_yield();
}
_disabled_partitions.erase(ns_tp);
if (_disabled_partitions.erase(ns_tp)) {
_topics_map_revision++;
};
_probe.handle_topic_deletion(ns_tp);
// topic_metadata_item object is supposed to be removed from _topics by
// the caller
@@ -1050,6 +1064,9 @@
vlog(clusterlog.trace, "adding ntp {} from controller snapshot", ntp);
size_t pending_deltas_start_idx = _pending_deltas.size();

// we are going to modify md_item so increment the revision right away.
_topics_map_revision++;

const model::partition_id p_id = ntp.tp.partition;

// 1. reconcile the _topics state (the md_item object) and generate
@@ -1191,7 +1208,9 @@ class topic_table::snapshot_applier {
topic_metadata_item ret{topic_metadata{topic.metadata, {}}};
if (topic.disabled_set) {
_disabled_partitions[ns_tp] = *topic.disabled_set;
_topics_map_revision++;
}

for (const auto& [p_id, partition] : topic.partitions) {
auto ntp = model::ntp(ns_tp.ns, ns_tp.tp, p_id);
add_ntp(ntp, topic, partition, ret, false);
@@ -1230,6 +1249,7 @@ ss::future<> topic_table::apply_snapshot(
// The topic was re-created, delete and add it anew.
co_await applier.delete_topic(ns_tp, md_item);
md_item = co_await applier.create_topic(ns_tp, topic_snapshot);
_topics_map_revision++;
} else {
// The topic was present in the previous set, now we need to
// reconcile individual partitions.
@@ -1247,10 +1267,12 @@ ss::future<> topic_table::apply_snapshot(
old_disabled_set = std::exchange(
_disabled_partitions[ns_tp],
*topic_snapshot.disabled_set);
_topics_map_revision++;
} else if (auto it = _disabled_partitions.find(ns_tp);
it != _disabled_partitions.end()) {
old_disabled_set = std::move(it->second);
_disabled_partitions.erase(it);
_topics_map_revision++;
}

// 2. For each partition in the new set, reconcile assignments
@@ -1288,6 +1310,7 @@ ss::future<> topic_table::apply_snapshot(
if (!topic_snapshot.partitions.contains(as_it_copy->id)) {
applier.delete_ntp(ns_tp, *as_it_copy);
md_item.get_assignments().erase(as_it_copy);
_topics_map_revision++;
}
co_await ss::coroutine::maybe_yield();
}
@@ -1633,6 +1656,7 @@ void topic_table::change_partition_replicas(
auto previous_assignment = current_assignment.replicas;
// replace partition replica set
current_assignment.replicas = new_assignment;
_topics_map_revision++;

// calculate delta for backend

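The topic_table changes above all follow one rule: any mutation of the maps that long-running readers iterate over must bump _topics_map_revision, so that iterators built on stable_iterator_adaptor notice the change and raise concurrent_modification_error instead of walking freed nodes. The sketch below illustrates that general mechanism with standard-library stand-ins only (revision_guarded_map and concurrent_modification are hypothetical names); the real stable_iterator_adaptor in the Redpanda tree may differ in detail.

```cpp
#include <cstdint>
#include <map>
#include <stdexcept>
#include <string>

struct concurrent_modification : std::runtime_error {
    using std::runtime_error::runtime_error;
};

class revision_guarded_map {
public:
    void insert(std::string key, int value) {
        _map.emplace(std::move(key), value);
        ++_revision; // every mutation must bump the revision
    }

    // Returns a checker bound to the current revision; a long-running reader
    // calls it after every scheduling point to detect concurrent mutation.
    auto make_checker() const {
        return [this, expected = _revision] {
            if (_revision != expected) {
                throw concurrent_modification("map changed during iteration");
            }
        };
    }

    const std::map<std::string, int>& entries() const { return _map; }

private:
    std::map<std::string, int> _map;
    uint64_t _revision = 0;
};
```

Missing a single increment on any mutation path, which is the bug class fixed here, lets a reader keep dereferencing iterators into a map whose nodes may already have been erased across a scheduling point.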