From 19690d1b1dc06c692ce142c396e37c35e9fe5c8d Mon Sep 17 00:00:00 2001 From: Alexey Zatelepin Date: Mon, 15 Apr 2024 14:10:36 +0200 Subject: [PATCH 01/17] cluster: track group id in shard_placement_table We track group ids in partition assignments and current state because we want to use them as kvstore keys - group id is just a number so it is better in this regard than the ntp and it uniquely identifies the current incarnation of the ntp (i.e. each group id is uniquely mapped to ntp, but ntp can be mapped to several group ids, though only one will be current). --- src/v/cluster/cluster_utils.cc | 9 +++- src/v/cluster/shard_placement_table.cc | 49 ++++++++++--------- src/v/cluster/shard_placement_table.h | 34 ++++++------- src/v/cluster/tests/cluster_utils_tests.cc | 8 +-- .../tests/shard_placement_table_test.cc | 1 + src/v/cluster/types.cc | 9 +++- src/v/cluster/types.h | 7 ++- 7 files changed, 67 insertions(+), 50 deletions(-) diff --git a/src/v/cluster/cluster_utils.cc b/src/v/cluster/cluster_utils.cc index 2f1e98cb8741..6bf1e3446dc6 100644 --- a/src/v/cluster/cluster_utils.cc +++ b/src/v/cluster/cluster_utils.cc @@ -237,12 +237,16 @@ std::optional placement_target_on_node( // expected shard is determined by the resulting assignment // (including cancellation effects). return shard_placement_target{ - log_revision, resulting_shard_on_node.value()}; + replicas_view.assignment.group, + log_revision, + resulting_shard_on_node.value()}; } else { // partition is moved away from this node, but we keep the original // replica until update is finished. return shard_placement_target{ - log_revision, orig_shard_on_node.value()}; + replicas_view.assignment.group, + log_revision, + orig_shard_on_node.value()}; } } else if (replicas_view.update) { // if partition appears on the node as a result of the update, create @@ -252,6 +256,7 @@ std::optional placement_target_on_node( replicas_view.update->get_target_replicas(), node); if (updated_shard_on_node) { return shard_placement_target{ + replicas_view.assignment.group, replicas_view.update->get_update_revision(), updated_shard_on_node.value()}; } diff --git a/src/v/cluster/shard_placement_table.cc b/src/v/cluster/shard_placement_table.cc index 320b22d0a553..a054c5c4bfb7 100644 --- a/src/v/cluster/shard_placement_table.cc +++ b/src/v/cluster/shard_placement_table.cc @@ -24,7 +24,8 @@ std::ostream& operator<<( std::ostream& o, const shard_placement_table::shard_local_assignment& as) { fmt::print( o, - "{{log_revision: {}, shard_revision: {}}}", + "{{group: {}, log_revision: {}, shard_revision: {}}}", + as.group, as.log_revision, as.shard_revision); return o; @@ -47,7 +48,8 @@ std::ostream& operator<<( std::ostream& o, const shard_placement_table::shard_local_state& ls) { fmt::print( o, - "{{log_revision: {}, status: {}, shard_revision: {}}}", + "{{group: {}, log_revision: {}, status: {}, shard_revision: {}}}", + ls.group, ls.log_revision, ls.status, ls.shard_revision); @@ -152,13 +154,15 @@ ss::future<> shard_placement_table::initialize( auto placement = placement_state(); auto assigned = shard_local_assignment{ + .group = target->group, .log_revision = target->log_revision, .shard_revision = _cur_shard_revision}; if (orig_shard && target->shard != orig_shard) { // cross-shard transfer, orig_shard gets the hosted marker if (ss::this_shard_id() == orig_shard) { - placement.current = shard_local_state::initial(assigned); + placement.current = shard_local_state( + assigned, hosted_status::hosted); _states.emplace(ntp, placement); } else if (ss::this_shard_id() == target->shard) { placement.assigned = assigned; @@ -166,7 +170,8 @@ ss::future<> shard_placement_table::initialize( } } else if (ss::this_shard_id() == target->shard) { // in other cases target shard gets the hosted marker - placement.current = shard_local_state::initial(assigned); + placement.current = shard_local_state( + assigned, hosted_status::hosted); placement.assigned = assigned; _states.emplace(ntp, placement); } @@ -235,16 +240,17 @@ ss::future<> shard_placement_table::set_target( if (target) { const bool is_initial = (!prev_target || prev_target->log_revision != target->log_revision); + shard_local_assignment as{ + .group = target->group, + .log_revision = target->log_revision, + .shard_revision = shard_rev, + }; co_await container().invoke_on( target->shard, - [&ntp, target, shard_rev, is_initial, shard_callback]( + [&ntp, &as, is_initial, shard_callback]( shard_placement_table& other) { return other.set_assigned_on_this_shard( - ntp, - target->log_revision, - shard_rev, - is_initial, - shard_callback); + ntp, as, is_initial, shard_callback); }); } @@ -259,26 +265,20 @@ ss::future<> shard_placement_table::set_target( ss::future<> shard_placement_table::set_assigned_on_this_shard( const model::ntp& ntp, - model::revision_id target_log_rev, - model::shard_revision_id shard_rev, + const shard_local_assignment& as, bool is_initial, shard_callback_t shard_callback) { vlog( clusterlog.trace, - "[{}] setting assigned on this shard, " - "log_revision: {}, shard_revision: {}, is_initial: {}", + "[{}] setting assigned on this shard to: {}, is_initial: {}", ntp, - target_log_rev, - shard_rev, + as, is_initial); auto& state = _states.try_emplace(ntp).first->second; - state.assigned = shard_local_assignment{ - .log_revision = target_log_rev, - .shard_revision = shard_rev, - }; + state.assigned = as; if (is_initial) { - state._is_initial_for = target_log_rev; + state._is_initial_for = as.log_revision; } // Notify the caller that something has changed on this shard. @@ -334,7 +334,8 @@ ss::future shard_placement_table::prepare_create( if (!state.current) { if (state._is_initial_for == expected_log_rev) { - state.current = shard_local_state::initial(*state.assigned); + state.current = shard_local_state( + *state.assigned, hosted_status::hosted); state._is_initial_for = std::nullopt; } else { // x-shard transfer hasn't started yet, wait for it. @@ -443,8 +444,8 @@ ss::future> shard_placement_table::prepare_transfer( // at this point we commit to the transfer on the // destination shard - dest_state.current = shard_local_state::receiving( - dest_state.assigned.value()); + dest_state.current = shard_local_state( + dest_state.assigned.value(), hosted_status::receiving); if (dest_state._is_initial_for <= expected_log_rev) { dest_state._is_initial_for = std::nullopt; } diff --git a/src/v/cluster/shard_placement_table.h b/src/v/cluster/shard_placement_table.h index 818377e0be8d..566dc11b74b2 100644 --- a/src/v/cluster/shard_placement_table.h +++ b/src/v/cluster/shard_placement_table.h @@ -46,6 +46,7 @@ class shard_placement_table /// Struct used to express the fact that a partition replica of some ntp is /// expected on this shard. struct shard_local_assignment { + raft::group_id group; model::revision_id log_revision; model::shard_revision_id shard_revision; @@ -65,25 +66,25 @@ class shard_placement_table /// Current state of shard-local partition kvstore data on this shard. struct shard_local_state { + raft::group_id group; model::revision_id log_revision; hosted_status status; model::shard_revision_id shard_revision; - static shard_local_state initial(const shard_local_assignment& as) { - return shard_local_state{ - .log_revision = as.log_revision, - .status = hosted_status::hosted, - .shard_revision = as.shard_revision, - }; - } - - static shard_local_state receiving(const shard_local_assignment& as) { - return shard_local_state{ - .log_revision = as.log_revision, - .status = hosted_status::receiving, - .shard_revision = as.shard_revision, - }; - } + shard_local_state( + raft::group_id g, + model::revision_id lr, + hosted_status s, + model::shard_revision_id sr) + : group(g) + , log_revision(lr) + , status(s) + , shard_revision(sr) {} + + shard_local_state( + const shard_local_assignment& as, hosted_status status) + : shard_local_state( + as.group, as.log_revision, status, as.shard_revision) {} friend std::ostream& operator<<(std::ostream&, const shard_local_state&); @@ -186,8 +187,7 @@ class shard_placement_table private: ss::future<> set_assigned_on_this_shard( const model::ntp&, - model::revision_id target_log_revision, - model::shard_revision_id, + const shard_local_assignment&, bool is_initial, shard_callback_t); diff --git a/src/v/cluster/tests/cluster_utils_tests.cc b/src/v/cluster/tests/cluster_utils_tests.cc index 89c707f4c9ff..2ccf6188ac3a 100644 --- a/src/v/cluster/tests/cluster_utils_tests.cc +++ b/src/v/cluster/tests/cluster_utils_tests.cc @@ -39,13 +39,15 @@ SEASTAR_THREAD_TEST_CASE(test_find_shard_on_node) { } SEASTAR_THREAD_TEST_CASE(test_placement_target_on_node) { + raft::group_id group{111}; + cluster::replicas_t orig_replicas{ model::broker_shard{model::node_id(1), 2}, model::broker_shard{model::node_id(2), 1}, model::broker_shard{model::node_id(3), 0}}; cluster::partition_assignment orig_assignment( - raft::group_id(111), model::partition_id(23), orig_replicas); + group, model::partition_id(23), orig_replicas); cluster::topic_table::partition_meta partition_meta{ .replicas_revisions = { @@ -59,13 +61,13 @@ SEASTAR_THREAD_TEST_CASE(test_placement_target_on_node) { // node_id, log_revision, shard_id using expected_list_t = std::vector>; - auto check = []( + auto check = [group]( std::string_view case_id, const cluster::topic_table::partition_replicas_view& rv, expected_list_t expected_list) { for (auto [node_id, log_revision, shard_id] : expected_list) { cluster::shard_placement_target expected{ - model::revision_id(log_revision), shard_id}; + group, model::revision_id(log_revision), shard_id}; auto actual = cluster::placement_target_on_node( rv, model::node_id(node_id)); BOOST_REQUIRE_MESSAGE( diff --git a/src/v/cluster/tests/shard_placement_table_test.cc b/src/v/cluster/tests/shard_placement_table_test.cc index 7d20c80115bf..5400e7c4bfa1 100644 --- a/src/v/cluster/tests/shard_placement_table_test.cc +++ b/src/v/cluster/tests/shard_placement_table_test.cc @@ -644,6 +644,7 @@ class shard_assigner { std::optional target; if (auto it = _ntpt.ntp2meta.find(ntp); it != _ntpt.ntp2meta.end()) { target = shard_placement_target( + it->second.group, it->second.log_revision, random_generators::get_int(get_max_shard_id())); } diff --git a/src/v/cluster/types.cc b/src/v/cluster/types.cc index 7c6a84c6f343..7eb7da4ff80f 100644 --- a/src/v/cluster/types.cc +++ b/src/v/cluster/types.cc @@ -438,8 +438,13 @@ std::ostream& operator<<(std::ostream& o, const partition_assignment& p_as) { return o; } -std::ostream& operator<<(std::ostream& o, const shard_placement_target& eg) { - fmt::print(o, "{{log_revision: {}, shard: {}}}", eg.log_revision, eg.shard); +std::ostream& operator<<(std::ostream& o, const shard_placement_target& spt) { + fmt::print( + o, + "{{group: {}, log_revision: {}, shard: {}}}", + spt.group, + spt.log_revision, + spt.shard); return o; } diff --git a/src/v/cluster/types.h b/src/v/cluster/types.h index fbe1ea9d099b..809f88987017 100644 --- a/src/v/cluster/types.h +++ b/src/v/cluster/types.h @@ -2442,10 +2442,13 @@ using replicas_revision_map /// Log revision is needed to distinguish different incarnations of the /// partition. struct shard_placement_target { - shard_placement_target(model::revision_id lr, ss::shard_id s) - : log_revision(lr) + shard_placement_target( + raft::group_id g, model::revision_id lr, ss::shard_id s) + : group(g) + , log_revision(lr) , shard(s) {} + raft::group_id group; model::revision_id log_revision; ss::shard_id shard; From d7ec7c73fd328da00d6cc9fdb0207c1466f9d178 Mon Sep 17 00:00:00 2001 From: Alexey Zatelepin Date: Mon, 15 Apr 2024 15:19:00 +0200 Subject: [PATCH 02/17] c/shard_placement_table: persist changes to kvstore --- src/v/cluster/controller.cc | 5 +- src/v/cluster/shard_placement_table.cc | 298 ++++++++++++++---- src/v/cluster/shard_placement_table.h | 13 +- .../tests/shard_placement_table_test.cc | 3 +- src/v/storage/kvstore.h | 1 + 5 files changed, 247 insertions(+), 73 deletions(-) diff --git a/src/v/cluster/controller.cc b/src/v/cluster/controller.cc index 89ee02a0909d..1cf7326cdeb7 100644 --- a/src/v/cluster/controller.cc +++ b/src/v/cluster/controller.cc @@ -187,7 +187,10 @@ ss::future<> controller::wire_up() { std::ref(_partition_allocator), std::ref(_node_status_table)); }) - .then([this] { return _shard_placement.start(); }) + .then([this] { + return _shard_placement.start(ss::sharded_parameter( + [this] { return std::ref(_storage.local().kvs()); })); + }) .then([this] { _probe.start(); }); } diff --git a/src/v/cluster/shard_placement_table.cc b/src/v/cluster/shard_placement_table.cc index a054c5c4bfb7..012a1790fc9d 100644 --- a/src/v/cluster/shard_placement_table.cc +++ b/src/v/cluster/shard_placement_table.cc @@ -96,6 +96,64 @@ operator<<(std::ostream& o, const shard_placement_table::placement_state& ps) { return o; } +namespace { + +static constexpr auto kvstore_key_space + = storage::kvstore::key_space::shard_placement; + +// enum type is irrelevant, serde will serialize to 32 bit anyway +enum class kvstore_key_type { + assignment = 1, + current_state = 2, +}; + +struct assignment_marker + : serde:: + envelope, serde::compat_version<0>> { + model::revision_id log_revision; + model::shard_revision_id shard_revision; + + auto serde_fields() { return std::tie(log_revision, shard_revision); } +}; + +bytes assignment_kvstore_key(const raft::group_id group) { + iobuf buf; + serde::write(buf, kvstore_key_type::assignment); + serde::write(buf, group); + return iobuf_to_bytes(buf); +} + +struct current_state_marker + : serde::envelope< + current_state_marker, + serde::version<0>, + serde::compat_version<0>> { + // NOTE: we need ntp in this marker because we want to be able to find and + // clean garbage kvstore state for old groups that have already been deleted + // from topic_table. Some of the partition kvstore state items use keys + // based on group id and some - based on ntp, so we need both. + model::ntp ntp; + model::revision_id log_revision; + model::shard_revision_id shard_revision; + bool is_complete = false; + + auto serde_fields() { + return std::tie(ntp, log_revision, shard_revision, is_complete); + } +}; + +bytes current_state_kvstore_key(const raft::group_id group) { + iobuf buf; + serde::write(buf, kvstore_key_type::current_state); + serde::write(buf, group); + return iobuf_to_bytes(buf); +} + +} // namespace + +shard_placement_table::shard_placement_table(storage::kvstore& kvstore) + : _kvstore(kvstore) {} + ss::future<> shard_placement_table::initialize( const topic_table& topics, model::node_id self) { // We expect topic_table to remain unchanged throughout the loop because the @@ -221,8 +279,6 @@ ss::future<> shard_placement_table::set_target( co_return; } - // 1. update node-wide map - const model::shard_revision_id shard_rev = _cur_shard_revision; _cur_shard_revision += 1; @@ -233,9 +289,69 @@ ss::future<> shard_placement_table::set_target( prev_target, target, shard_rev); + + // 1. Persist the new target in kvstore + + if (target) { + co_await container().invoke_on( + target->shard, + [&target, shard_rev, &ntp](shard_placement_table& other) { + auto marker_buf = serde::to_iobuf(assignment_marker{ + .log_revision = target->log_revision, + .shard_revision = shard_rev, + }); + vlog( + clusterlog.trace, + "[{}] put assigned marker, lr: {} sr: {}", + ntp, + target->log_revision, + shard_rev); + return other._kvstore.put( + kvstore_key_space, + assignment_kvstore_key(target->group), + std::move(marker_buf)); + }); + } else { + co_await container().invoke_on( + prev_target.value().shard, + [group = prev_target->group, &ntp](shard_placement_table& other) { + vlog(clusterlog.trace, "[{}] remove assigned marker", ntp); + return other._kvstore.remove( + kvstore_key_space, assignment_kvstore_key(group)); + }); + } + + // 2. At this point we've successfully committed the new target to + // persistent storage. Update in-memory state. + entry.target = target; - // 2. update shard-local state + if (prev_target && (!target || target->shard != prev_target->shard)) { + co_await container().invoke_on( + prev_target->shard, + [&ntp, shard_callback](shard_placement_table& other) { + auto it = other._states.find(ntp); + if (it == other._states.end() || !it->second.assigned) { + return; + } + + vlog( + clusterlog.trace, + "[{}] removing assigned on this shard (was: {})", + ntp, + it->second.assigned); + + it->second.assigned = std::nullopt; + if (it->second.is_empty()) { + // We are on a shard that was previously a target, but didn't + // get to starting the transfer. + other._states.erase(it); + } + + // Notify the caller that something has changed on this shard. + shard_callback(ntp); + }); + } if (target) { const bool is_initial @@ -247,65 +363,52 @@ ss::future<> shard_placement_table::set_target( }; co_await container().invoke_on( target->shard, - [&ntp, &as, is_initial, shard_callback]( - shard_placement_table& other) { - return other.set_assigned_on_this_shard( - ntp, as, is_initial, shard_callback); + [&ntp, &as, is_initial, shard_callback](shard_placement_table& spt) { + auto& state = spt._states.try_emplace(ntp).first->second; + + vlog( + clusterlog.trace, + "[{}] setting assigned on this shard to: {} (was: {}), " + "is_initial: {}", + ntp, + as, + state.assigned, + is_initial); + + state.assigned = as; + if (is_initial) { + state._is_initial_for = as.log_revision; + } + + // Notify the caller that something has changed on this shard. + shard_callback(ntp); }); } + // 3. Lastly, remove obsolete kvstore marker + if (prev_target && (!target || target->shard != prev_target->shard)) { co_await container().invoke_on( prev_target->shard, - [&ntp, shard_callback](shard_placement_table& other) { - return other.remove_assigned_on_this_shard(ntp, shard_callback); + [group = prev_target->group, &ntp](shard_placement_table& other) { + vlog( + clusterlog.trace, "[{}] remove obsolete assigned marker", ntp); + return other._kvstore + .remove(kvstore_key_space, assignment_kvstore_key(group)) + .handle_exception([group](std::exception_ptr ex) { + // Ignore the exception because the update has already been + // committed. Obsolete marker will be deleted after the next + // restart. + vlog( + clusterlog.debug, + "failed to remove assignment marker for group {}: {}", + group, + ex); + }); }); } } -ss::future<> shard_placement_table::set_assigned_on_this_shard( - const model::ntp& ntp, - const shard_local_assignment& as, - bool is_initial, - shard_callback_t shard_callback) { - vlog( - clusterlog.trace, - "[{}] setting assigned on this shard to: {}, is_initial: {}", - ntp, - as, - is_initial); - - auto& state = _states.try_emplace(ntp).first->second; - state.assigned = as; - if (is_initial) { - state._is_initial_for = as.log_revision; - } - - // Notify the caller that something has changed on this shard. - shard_callback(ntp); - co_return; -} - -ss::future<> shard_placement_table::remove_assigned_on_this_shard( - const model::ntp& ntp, shard_callback_t shard_callback) { - vlog(clusterlog.trace, "[{}] removing assigned on this shard", ntp); - - auto it = _states.find(ntp); - if (it == _states.end()) { - co_return; - } - - it->second.assigned = std::nullopt; - if (it->second.is_empty()) { - // We are on a shard that was previously a target, but didn't get to - // starting the transfer. - _states.erase(it); - } - - // Notify the caller that something has changed on this shard. - shard_callback(ntp); -} - std::optional shard_placement_table::state_on_this_shard(const model::ntp& ntp) const { auto it = _states.find(ntp); @@ -332,11 +435,33 @@ ss::future shard_placement_table::prepare_create( co_return errc::waiting_for_reconfiguration_finish; } + // copy assigned as it may change while we are updating kvstore + auto assigned = *state.assigned; + if (!state.current) { if (state._is_initial_for == expected_log_rev) { - state.current = shard_local_state( - *state.assigned, hosted_status::hosted); - state._is_initial_for = std::nullopt; + auto marker_buf = serde::to_iobuf(current_state_marker{ + .ntp = ntp, + .log_revision = expected_log_rev, + .shard_revision = assigned.shard_revision, + .is_complete = true, + }); + vlog( + clusterlog.trace, + "[{}] put initial cur state marker, lr: {} sr: {}", + ntp, + expected_log_rev, + assigned.shard_revision); + co_await _kvstore.put( + kvstore_key_space, + current_state_kvstore_key(assigned.group), + std::move(marker_buf)); + + state.current = shard_local_state(assigned, hosted_status::hosted); + if (state._is_initial_for == expected_log_rev) { + // could have changed while we were updating kvstore. + state._is_initial_for = std::nullopt; + } } else { // x-shard transfer hasn't started yet, wait for it. co_return errc::waiting_for_partition_shutdown; @@ -424,22 +549,26 @@ ss::future> shard_placement_table::prepare_transfer( // We are in the middle of shard_placement_table update, and // the destination shard doesn't yet know that it is the // destination. Wait for the update to finish. - return errc::waiting_for_shard_placement_update; + return ss::make_ready_future( + errc::waiting_for_shard_placement_update); } auto& dest_state = dest_it->second; if (dest_state._next) { // probably still finishing a previous transfer to this // shard and we are already trying to transfer it back. - return errc::waiting_for_partition_shutdown; + return ss::make_ready_future( + errc::waiting_for_partition_shutdown); } else if (dest_state.current) { if (dest_state.current->log_revision != expected_log_rev) { // someone has to delete obsolete log revision first - return errc::waiting_for_reconfiguration_finish; + return ss::make_ready_future( + errc::waiting_for_partition_shutdown); } // probably still finishing a previous transfer to this // shard and we are already trying to transfer it back. - return errc::waiting_for_partition_shutdown; + return ss::make_ready_future( + errc::waiting_for_partition_shutdown); } // at this point we commit to the transfer on the @@ -449,7 +578,27 @@ ss::future> shard_placement_table::prepare_transfer( if (dest_state._is_initial_for <= expected_log_rev) { dest_state._is_initial_for = std::nullopt; } - return errc::success; + + // TODO: immediate hosted or _is_initial_for if source is empty. + + auto marker_buf = serde::to_iobuf(current_state_marker{ + .ntp = ntp, + .log_revision = expected_log_rev, + .shard_revision = dest_state.current.value().shard_revision, + .is_complete = false, + }); + vlog( + clusterlog.trace, + "[{}] put receiving cur state marker, lr: {} sr: {}", + ntp, + expected_log_rev, + dest_state.current->shard_revision); + return dest._kvstore + .put( + kvstore_key_space, + current_state_kvstore_key(dest_state.current->group), + std::move(marker_buf)) + .then([] { return errc::success; }); }); if (ec != errc::success) { @@ -477,7 +626,25 @@ ss::future<> shard_placement_table::finish_transfer_on_destination( "[{}] unexpected local status, current: {}", ntp, it->second.current); - it->second.current->status = hosted_status::hosted; + + auto marker_buf = serde::to_iobuf(current_state_marker{ + .ntp = ntp, + .log_revision = expected_log_rev, + .shard_revision = state.current->shard_revision, + .is_complete = true, + }); + vlog( + clusterlog.trace, + "[{}] put transferred cur state marker, lr: {} sr: {}", + ntp, + expected_log_rev, + state.current->shard_revision); + co_await _kvstore.put( + kvstore_key_space, + current_state_kvstore_key(state.current->group), + std::move(marker_buf)); + + state.current->status = hosted_status::hosted; } vlog( clusterlog.trace, @@ -575,7 +742,14 @@ ss::future<> shard_placement_table::finish_delete( ss::future<> shard_placement_table::do_delete( const model::ntp& ntp, placement_state& state) { state._next = std::nullopt; - state.current = std::nullopt; + + if (state.current) { + vlog(clusterlog.trace, "[{}] remove cur state marker", ntp); + co_await _kvstore.remove( + kvstore_key_space, current_state_kvstore_key(state.current->group)); + state.current = std::nullopt; + } + if (state.is_empty()) { _states.erase(ntp); } diff --git a/src/v/cluster/shard_placement_table.h b/src/v/cluster/shard_placement_table.h index 566dc11b74b2..4ed8ba92a8ba 100644 --- a/src/v/cluster/shard_placement_table.h +++ b/src/v/cluster/shard_placement_table.h @@ -14,6 +14,7 @@ #include "base/seastarx.h" #include "cluster/types.h" #include "container/chunked_hash_map.h" +#include "storage/fwd.h" #include "utils/mutex.h" #include @@ -138,6 +139,8 @@ class shard_placement_table std::optional _next; }; + explicit shard_placement_table(storage::kvstore&); + // must be called on each shard ss::future<> initialize(const topic_table&, model::node_id self); @@ -185,15 +188,6 @@ class shard_placement_table finish_delete(const model::ntp&, model::revision_id expected_log_rev); private: - ss::future<> set_assigned_on_this_shard( - const model::ntp&, - const shard_local_assignment&, - bool is_initial, - shard_callback_t); - - ss::future<> - remove_assigned_on_this_shard(const model::ntp&, shard_callback_t); - ss::future<> do_delete(const model::ntp&, placement_state&); private: @@ -203,6 +197,7 @@ class shard_placement_table // // node_hash_map for pointer stability absl::node_hash_map _states; + storage::kvstore& _kvstore; // only on shard 0, _ntp2entry will hold targets for all ntps on this node. struct entry_t { diff --git a/src/v/cluster/tests/shard_placement_table_test.cc b/src/v/cluster/tests/shard_placement_table_test.cc index 5400e7c4bfa1..aaae8645c5b2 100644 --- a/src/v/cluster/tests/shard_placement_table_test.cc +++ b/src/v/cluster/tests/shard_placement_table_test.cc @@ -813,7 +813,8 @@ class shard_placement_test_fixture : public seastar_test { co_await kvs.invoke_on_all( [](storage::kvstore& kvs) { return kvs.start(); }); - co_await spt.start(); + co_await spt.start( + ss::sharded_parameter([this] { return std::ref(kvs.local()); })); co_await rb.start(std::ref(ntpt), std::ref(spt), std::ref(_ntp2shards)); diff --git a/src/v/storage/kvstore.h b/src/v/storage/kvstore.h index 8f5a2f0dcddc..9b8a7c06fae7 100644 --- a/src/v/storage/kvstore.h +++ b/src/v/storage/kvstore.h @@ -101,6 +101,7 @@ class kvstore { offset_translator = 4, usage = 5, stms = 6, + shard_placement = 7, /* your sub-system here */ }; From 3928208dca42bf407a310c4a41b9cbfe8321cac9 Mon Sep 17 00:00:00 2001 From: Alexey Zatelepin Date: Tue, 7 May 2024 22:57:22 +0200 Subject: [PATCH 03/17] offline_log_viewer: add support for shard_placement kvstore items --- tools/offline_log_viewer/kvstore.py | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/tools/offline_log_viewer/kvstore.py b/tools/offline_log_viewer/kvstore.py index 3abdacfd0135..5fe53bd63b62 100644 --- a/tools/offline_log_viewer/kvstore.py +++ b/tools/offline_log_viewer/kvstore.py @@ -91,6 +91,8 @@ def _decode_ks(self, ks): return "usage" elif ks == 6: return "stms" + elif ks == 7: + return "shard_placement" return "unknown" def decode(self): @@ -259,6 +261,23 @@ def decode_storage_key(k): return ret +def decode_shard_placement_key(k): + rdr = Reader(BytesIO(k)) + ret = {} + ret['type'] = rdr.read_int32() + if ret['type'] == 0: + ret['name'] = "persistence_enabled" + elif ret['type'] == 1: + ret['name'] = "assignment" + ret['group'] = rdr.read_int64() + elif ret['type'] == 2: + ret['name'] = "current_state" + ret['group'] = rdr.read_int64() + else: + ret['name'] = "unknown" + return ret + + def decode_key(ks, key): data = key if ks == "consensus": @@ -269,6 +288,8 @@ def decode_key(ks, key): data = decode_offset_translator_key(key) elif ks == "stms": data = decode_stm_snapshot_key(key) + elif ks == "shard_placement": + data = decode_shard_placement_key(key) else: data = key.hex() return {'keyspace': ks, 'data': data} From 94d3543df7f1ed23a40fbdac14b59671c5e547ba Mon Sep 17 00:00:00 2001 From: Alexey Zatelepin Date: Wed, 24 Apr 2024 14:40:27 +0200 Subject: [PATCH 04/17] kvstore: add for_each method --- src/v/storage/kvstore.cc | 20 ++++++++++++++++++++ src/v/storage/kvstore.h | 6 ++++++ src/v/storage/tests/kvstore_test.cc | 12 ++++++++++++ 3 files changed, 38 insertions(+) diff --git a/src/v/storage/kvstore.cc b/src/v/storage/kvstore.cc index 8f003ac4b9c9..f913e7629fe8 100644 --- a/src/v/storage/kvstore.cc +++ b/src/v/storage/kvstore.cc @@ -17,6 +17,7 @@ #include "model/namespace.h" #include "prometheus/prometheus_sanitize.h" #include "reflection/adl.h" +#include "ssx/async_algorithm.h" #include "storage/parser.h" #include "storage/record_batch_builder.h" #include "storage/segment.h" @@ -198,6 +199,25 @@ ss::future<> kvstore::put(key_space ks, bytes key, std::optional value) { }); } +ss::future<> kvstore::for_each( + key_space ks, + ss::noncopyable_function visitor) { + vassert(_started, "kvstore has not been started"); + auto gh = _gate.hold(); + auto units = co_await _db_mut.get_units(); + + auto prefix = make_spaced_key(ks, bytes_view{}); + co_await ssx::async_for_each( + _db.begin(), _db.end(), [&](const map_t::value_type& kv) { + auto spaced_key = bytes_view{kv.first}; + if (!spaced_key.starts_with(prefix)) { + return; + } + auto key = spaced_key.substr(prefix.length()); + visitor(key, kv.second); + }); +} + void kvstore::apply_op( bytes key, std::optional value, ssx::semaphore_units const&) { auto it = _db.find(key); diff --git a/src/v/storage/kvstore.h b/src/v/storage/kvstore.h index 9b8a7c06fae7..3ec6b89a1042 100644 --- a/src/v/storage/kvstore.h +++ b/src/v/storage/kvstore.h @@ -118,6 +118,12 @@ class kvstore { ss::future<> put(key_space ks, bytes key, iobuf value); ss::future<> remove(key_space ks, bytes key); + /// Iterate over all key-value pairs in a keyspace. + /// NOTE: this will stall all updates, so use with a lot of caution. + ss::future<> for_each( + key_space ks, + ss::noncopyable_function visitor); + bool empty() const { vassert(_started, "kvstore has not been started"); return _db.empty(); diff --git a/src/v/storage/tests/kvstore_test.cc b/src/v/storage/tests/kvstore_test.cc index c122409df9d9..0be160c70566 100644 --- a/src/v/storage/tests/kvstore_test.cc +++ b/src/v/storage/tests/kvstore_test.cc @@ -59,6 +59,18 @@ FIXTURE_TEST(key_space, kvstore_test_fixture) { kvs->get(storage::kvstore::key_space::consensus, empty_key).value() == value_d); + std::map testing_kvs; + kvs + ->for_each( + storage::kvstore::key_space::testing, + [&](bytes_view key, const iobuf& val) { + BOOST_REQUIRE(testing_kvs.emplace(key, val.copy()).second); + }) + .get(); + BOOST_REQUIRE_EQUAL(testing_kvs.size(), 2); + BOOST_REQUIRE(testing_kvs.at(key) == value_a); + BOOST_REQUIRE(testing_kvs.at(empty_key) == value_c); + kvs->stop().get(); // still all true after recovery From bbac4e936a21e7615358d30a31c61b0412e26c9b Mon Sep 17 00:00:00 2001 From: Alexey Zatelepin Date: Mon, 6 May 2024 18:49:52 +0200 Subject: [PATCH 05/17] c/shard_placement_table: implement migration to kvstore persistence --- src/v/cluster/shard_placement_table.cc | 358 +++++++++++++++++-------- src/v/cluster/shard_placement_table.h | 16 +- 2 files changed, 268 insertions(+), 106 deletions(-) diff --git a/src/v/cluster/shard_placement_table.cc b/src/v/cluster/shard_placement_table.cc index 012a1790fc9d..d704f00ea07c 100644 --- a/src/v/cluster/shard_placement_table.cc +++ b/src/v/cluster/shard_placement_table.cc @@ -103,10 +103,17 @@ static constexpr auto kvstore_key_space // enum type is irrelevant, serde will serialize to 32 bit anyway enum class kvstore_key_type { + persistence_enabled = 0, assignment = 1, current_state = 2, }; +bytes persistence_enabled_kvstore_key() { + iobuf buf; + serde::write(buf, kvstore_key_type::persistence_enabled); + return iobuf_to_bytes(buf); +} + struct assignment_marker : serde:: envelope, serde::compat_version<0>> { @@ -154,6 +161,101 @@ bytes current_state_kvstore_key(const raft::group_id group) { shard_placement_table::shard_placement_table(storage::kvstore& kvstore) : _kvstore(kvstore) {} +bool shard_placement_table::is_persistence_enabled() const { + vassert( + ss::this_shard_id() == assignment_shard_id, + "method can only be invoked on shard {}", + assignment_shard_id); + + if (_persistence_enabled) { + return true; + } + + return _kvstore.get(kvstore_key_space, persistence_enabled_kvstore_key()) + .has_value(); +} + +ss::future<> shard_placement_table::enable_persistence() { + vassert( + ss::this_shard_id() == assignment_shard_id, + "method can only be invoked on shard {}", + assignment_shard_id); + + if (is_persistence_enabled()) { + co_return; + } + + vlog(clusterlog.info, "enabling table persistence..."); + + auto write_locks = container().map([](shard_placement_table& local) { + return local._persistence_lock.hold_write_lock().then( + [](ss::rwlock::holder holder) { + return ss::make_foreign( + std::make_unique(std::move(holder))); + }); + }); + + co_await container().invoke_on_all([](shard_placement_table& local) { + return local.persist_shard_local_state(); + }); + + co_await _kvstore.put( + kvstore_key_space, persistence_enabled_kvstore_key(), iobuf{}); + + co_await container().invoke_on_all( + [](shard_placement_table& local) { local._persistence_enabled = true; }); + + vlog(clusterlog.debug, "persistence enabled"); +} + +ss::future<> shard_placement_table::persist_shard_local_state() { + // 1. delete existing state + + chunked_vector old_keys; + co_await _kvstore.for_each( + kvstore_key_space, + [&](bytes_view key, const iobuf&) { old_keys.emplace_back(key); }); + + co_await ss::max_concurrent_for_each( + old_keys, 512, [this](const bytes& key) { + return _kvstore.remove(kvstore_key_space, key); + }); + + // 2. persist current map + + co_await ss::max_concurrent_for_each( + _states, 512, [this](const decltype(_states)::value_type& kv) { + const auto& [ntp, pstate] = kv; + auto f1 = ss::now(); + if (pstate.assigned) { + auto marker = assignment_marker{ + .log_revision = pstate.assigned->log_revision, + .shard_revision = pstate.assigned->shard_revision, + }; + f1 = _kvstore.put( + kvstore_key_space, + assignment_kvstore_key(pstate.assigned->group), + serde::to_iobuf(marker)); + } + + auto f2 = ss::now(); + if (pstate.current) { + auto marker = current_state_marker{ + .ntp = ntp, + .log_revision = pstate.current->log_revision, + .shard_revision = pstate.current->shard_revision, + .is_complete = pstate.current->status == hosted_status::hosted, + }; + f2 = _kvstore.put( + kvstore_key_space, + current_state_kvstore_key(pstate.current->group), + serde::to_iobuf(marker)); + } + + return ss::when_all(std::move(f1), std::move(f2)).discard_result(); + }); +} + ss::future<> shard_placement_table::initialize( const topic_table& topics, model::node_id self) { // We expect topic_table to remain unchanged throughout the loop because the @@ -250,6 +352,9 @@ ss::future<> shard_placement_table::set_target( "method can only be invoked on shard {}", assignment_shard_id); + // ensure that there is no concurrent enable_persistence() call + auto persistence_lock_holder = co_await _persistence_lock.hold_read_lock(); + if (!target && !_ntp2entry.contains(ntp)) { co_return; } @@ -292,33 +397,35 @@ ss::future<> shard_placement_table::set_target( // 1. Persist the new target in kvstore - if (target) { - co_await container().invoke_on( - target->shard, - [&target, shard_rev, &ntp](shard_placement_table& other) { - auto marker_buf = serde::to_iobuf(assignment_marker{ - .log_revision = target->log_revision, - .shard_revision = shard_rev, + if (_persistence_enabled) { + if (target) { + co_await container().invoke_on( + target->shard, + [&target, shard_rev, &ntp](shard_placement_table& other) { + auto marker_buf = serde::to_iobuf(assignment_marker{ + .log_revision = target->log_revision, + .shard_revision = shard_rev, + }); + vlog( + clusterlog.trace, + "[{}] put assigned marker, lr: {} sr: {}", + ntp, + target->log_revision, + shard_rev); + return other._kvstore.put( + kvstore_key_space, + assignment_kvstore_key(target->group), + std::move(marker_buf)); }); - vlog( - clusterlog.trace, - "[{}] put assigned marker, lr: {} sr: {}", - ntp, - target->log_revision, - shard_rev); - return other._kvstore.put( - kvstore_key_space, - assignment_kvstore_key(target->group), - std::move(marker_buf)); - }); - } else { - co_await container().invoke_on( - prev_target.value().shard, - [group = prev_target->group, &ntp](shard_placement_table& other) { - vlog(clusterlog.trace, "[{}] remove assigned marker", ntp); - return other._kvstore.remove( - kvstore_key_space, assignment_kvstore_key(group)); - }); + } else { + co_await container().invoke_on( + prev_target.value().shard, + [group = prev_target->group, &ntp](shard_placement_table& other) { + vlog(clusterlog.trace, "[{}] remove assigned marker", ntp); + return other._kvstore.remove( + kvstore_key_space, assignment_kvstore_key(group)); + }); + } } // 2. At this point we've successfully committed the new target to @@ -387,7 +494,9 @@ ss::future<> shard_placement_table::set_target( // 3. Lastly, remove obsolete kvstore marker - if (prev_target && (!target || target->shard != prev_target->shard)) { + if ( + _persistence_enabled && prev_target + && (!target || target->shard != prev_target->shard)) { co_await container().invoke_on( prev_target->shard, [group = prev_target->group, &ntp](shard_placement_table& other) { @@ -420,15 +529,20 @@ shard_placement_table::state_on_this_shard(const model::ntp& ntp) const { ss::future shard_placement_table::prepare_create( const model::ntp& ntp, model::revision_id expected_log_rev) { + // ensure that there is no concurrent enable_persistence() call + auto persistence_lock_holder = co_await _persistence_lock.hold_read_lock(); + auto state_it = _states.find(ntp); - vassert(state_it != _states.end(), "[{}] expected state", ntp); + if (state_it == _states.end()) { + // assignments got updated while we were waiting for the lock + co_return errc::waiting_for_shard_placement_update; + } auto& state = state_it->second; - vassert( - state.assigned && state.assigned->log_revision == expected_log_rev, - "[{}] unexpected assigned: {} (expected log revision: {})", - ntp, - state.assigned, - expected_log_rev); + + if (state.assigned->log_revision != expected_log_rev) { + // assignments got updated while we were waiting for the lock + co_return errc::waiting_for_shard_placement_update; + } if (state.current && state.current->log_revision != expected_log_rev) { // wait until partition with obsolete log revision is removed @@ -440,22 +554,24 @@ ss::future shard_placement_table::prepare_create( if (!state.current) { if (state._is_initial_for == expected_log_rev) { - auto marker_buf = serde::to_iobuf(current_state_marker{ - .ntp = ntp, - .log_revision = expected_log_rev, - .shard_revision = assigned.shard_revision, - .is_complete = true, - }); - vlog( - clusterlog.trace, - "[{}] put initial cur state marker, lr: {} sr: {}", - ntp, - expected_log_rev, - assigned.shard_revision); - co_await _kvstore.put( - kvstore_key_space, - current_state_kvstore_key(assigned.group), - std::move(marker_buf)); + if (_persistence_enabled) { + auto marker_buf = serde::to_iobuf(current_state_marker{ + .ntp = ntp, + .log_revision = expected_log_rev, + .shard_revision = assigned.shard_revision, + .is_complete = true, + }); + vlog( + clusterlog.trace, + "[{}] put initial cur state marker, lr: {} sr: {}", + ntp, + expected_log_rev, + assigned.shard_revision); + co_await _kvstore.put( + kvstore_key_space, + current_state_kvstore_key(assigned.group), + std::move(marker_buf)); + } state.current = shard_local_state(assigned, hosted_status::hosted); if (state._is_initial_for == expected_log_rev) { @@ -479,18 +595,27 @@ ss::future shard_placement_table::prepare_create( ss::future> shard_placement_table::prepare_transfer( const model::ntp& ntp, model::revision_id expected_log_rev) { + // ensure that there is no concurrent enable_persistence() call + auto persistence_lock_holder = co_await _persistence_lock.hold_read_lock(); + auto state_it = _states.find(ntp); vassert(state_it != _states.end(), "[{}] expected state", ntp); auto& state = state_it->second; if (state.current) { vassert( - state.current->log_revision == expected_log_rev, + state.current->log_revision >= expected_log_rev, "[{}] unexpected current: {} (expected log revision: {})", ntp, state.current, expected_log_rev); + if (state.current->log_revision > expected_log_rev) { + // New log revision transferred from another shard, but we don't + // know about it yet. Wait for the assignment update. + co_return errc::waiting_for_shard_placement_update; + } + if (state.current->status == hosted_status::receiving) { // This shard needs to transfer partition state somewhere else, but // haven't yet received it itself. Wait for it. @@ -499,25 +624,26 @@ ss::future> shard_placement_table::prepare_transfer( if (state.current->status == hosted_status::obsolete) { // Previous finish_transfer_on_source() failed? Retry it. - co_await do_delete(ntp, state); + co_await do_delete(ntp, state, persistence_lock_holder); co_return errc::success; } } else { vassert( - state._is_initial_for == expected_log_rev, + state._is_initial_for >= expected_log_rev, "[{}] unexpected is_initial_for: {} (expected log revision: {})", ntp, state._is_initial_for, expected_log_rev); + + if (state._is_initial_for > expected_log_rev) { + co_return errc::waiting_for_shard_placement_update; + } } if (!state._next) { - vassert( - !state.assigned, - "[{}] unexpected assigned: {} (expected log revision: {})", - ntp, - state.assigned, - expected_log_rev); + if (state.assigned) { + co_return errc::waiting_for_shard_placement_update; + } auto maybe_dest = co_await container().invoke_on( assignment_shard_id, @@ -581,24 +707,28 @@ ss::future> shard_placement_table::prepare_transfer( // TODO: immediate hosted or _is_initial_for if source is empty. - auto marker_buf = serde::to_iobuf(current_state_marker{ - .ntp = ntp, - .log_revision = expected_log_rev, - .shard_revision = dest_state.current.value().shard_revision, - .is_complete = false, - }); - vlog( - clusterlog.trace, - "[{}] put receiving cur state marker, lr: {} sr: {}", - ntp, - expected_log_rev, - dest_state.current->shard_revision); - return dest._kvstore - .put( - kvstore_key_space, - current_state_kvstore_key(dest_state.current->group), - std::move(marker_buf)) - .then([] { return errc::success; }); + if (dest._persistence_enabled) { + auto marker_buf = serde::to_iobuf(current_state_marker{ + .ntp = ntp, + .log_revision = expected_log_rev, + .shard_revision = dest_state.current.value().shard_revision, + .is_complete = false, + }); + vlog( + clusterlog.trace, + "[{}] put receiving cur state marker, lr: {} sr: {}", + ntp, + expected_log_rev, + dest_state.current->shard_revision); + return dest._kvstore + .put( + kvstore_key_space, + current_state_kvstore_key(dest_state.current->group), + std::move(marker_buf)) + .then([] { return errc::success; }); + } else { + return ss::make_ready_future(errc::success); + } }); if (ec != errc::success) { @@ -615,6 +745,9 @@ ss::future> shard_placement_table::prepare_transfer( ss::future<> shard_placement_table::finish_transfer_on_destination( const model::ntp& ntp, model::revision_id expected_log_rev) { + // ensure that there is no concurrent enable_persistence() call + auto persistence_lock_holder = co_await _persistence_lock.hold_read_lock(); + auto it = _states.find(ntp); if (it == _states.end()) { co_return; @@ -627,22 +760,24 @@ ss::future<> shard_placement_table::finish_transfer_on_destination( ntp, it->second.current); - auto marker_buf = serde::to_iobuf(current_state_marker{ - .ntp = ntp, - .log_revision = expected_log_rev, - .shard_revision = state.current->shard_revision, - .is_complete = true, - }); - vlog( - clusterlog.trace, - "[{}] put transferred cur state marker, lr: {} sr: {}", - ntp, - expected_log_rev, - state.current->shard_revision); - co_await _kvstore.put( - kvstore_key_space, - current_state_kvstore_key(state.current->group), - std::move(marker_buf)); + if (_persistence_enabled) { + auto marker_buf = serde::to_iobuf(current_state_marker{ + .ntp = ntp, + .log_revision = expected_log_rev, + .shard_revision = state.current->shard_revision, + .is_complete = true, + }); + vlog( + clusterlog.trace, + "[{}] put transferred cur state marker, lr: {} sr: {}", + ntp, + expected_log_rev, + state.current->shard_revision); + co_await _kvstore.put( + kvstore_key_space, + current_state_kvstore_key(state.current->group), + std::move(marker_buf)); + } state.current->status = hosted_status::hosted; } @@ -655,6 +790,9 @@ ss::future<> shard_placement_table::finish_transfer_on_destination( ss::future<> shard_placement_table::finish_transfer_on_source( const model::ntp& ntp, model::revision_id expected_log_rev) { + // ensure that there is no concurrent enable_persistence() call + auto persistence_lock_holder = co_await _persistence_lock.hold_read_lock(); + auto it = _states.find(ntp); vassert(it != _states.end(), "[{}] expected state", ntp); auto& state = it->second; @@ -670,11 +808,14 @@ ss::future<> shard_placement_table::finish_transfer_on_source( state._is_initial_for = std::nullopt; } - co_await do_delete(ntp, state); + co_await do_delete(ntp, state, persistence_lock_holder); } ss::future shard_placement_table::prepare_delete( const model::ntp& ntp, model::revision_id cmd_revision) { + // ensure that there is no concurrent enable_persistence() call + auto persistence_lock_holder = co_await _persistence_lock.hold_read_lock(); + auto it = _states.find(ntp); vassert(it != _states.end(), "[{}] expected state", ntp); auto& state = it->second; @@ -688,12 +829,11 @@ ss::future shard_placement_table::prepare_delete( } if (state.current) { - vassert( - state.current->log_revision < cmd_revision, - "[{}] unexpected current: {} (cmd revision: {})", - ntp, - state.current, - cmd_revision); + if (state.current->log_revision >= cmd_revision) { + // New log revision transferred from another shard, but we didn't + // expect it. Wait for the update. + co_return errc::waiting_for_shard_placement_update; + } if (state.current->status == hosted_status::receiving) { // If transfer to this shard is still in progress, we'll wait for @@ -709,6 +849,9 @@ ss::future shard_placement_table::prepare_delete( ss::future<> shard_placement_table::finish_delete( const model::ntp& ntp, model::revision_id expected_log_rev) { + // ensure that there is no concurrent enable_persistence() call + auto persistence_lock_holder = co_await _persistence_lock.hold_read_lock(); + auto it = _states.find(ntp); vassert(it != _states.end(), "[{}] expected state", ntp); auto& state = it->second; @@ -736,17 +879,22 @@ ss::future<> shard_placement_table::finish_delete( }); } - co_await do_delete(ntp, state); + co_await do_delete(ntp, state, persistence_lock_holder); } ss::future<> shard_placement_table::do_delete( - const model::ntp& ntp, placement_state& state) { + const model::ntp& ntp, + placement_state& state, + ss::rwlock::holder& /*persistence_lock_holder*/) { state._next = std::nullopt; if (state.current) { - vlog(clusterlog.trace, "[{}] remove cur state marker", ntp); - co_await _kvstore.remove( - kvstore_key_space, current_state_kvstore_key(state.current->group)); + if (_persistence_enabled) { + vlog(clusterlog.trace, "[{}] remove cur state marker", ntp); + co_await _kvstore.remove( + kvstore_key_space, + current_state_kvstore_key(state.current->group)); + } state.current = std::nullopt; } diff --git a/src/v/cluster/shard_placement_table.h b/src/v/cluster/shard_placement_table.h index 4ed8ba92a8ba..f1e53979518f 100644 --- a/src/v/cluster/shard_placement_table.h +++ b/src/v/cluster/shard_placement_table.h @@ -16,6 +16,7 @@ #include "container/chunked_hash_map.h" #include "storage/fwd.h" #include "utils/mutex.h" +#include "utils/rwlock.h" #include @@ -141,6 +142,10 @@ class shard_placement_table explicit shard_placement_table(storage::kvstore&); + /// Must be called on assignment_shard_id. + bool is_persistence_enabled() const; + ss::future<> enable_persistence(); + // must be called on each shard ss::future<> initialize(const topic_table&, model::node_id self); @@ -188,7 +193,12 @@ class shard_placement_table finish_delete(const model::ntp&, model::revision_id expected_log_rev); private: - ss::future<> do_delete(const model::ntp&, placement_state&); + ss::future<> do_delete( + const model::ntp&, + placement_state&, + ss::rwlock::holder& persistence_lock_holder); + + ss::future<> persist_shard_local_state(); private: friend class shard_placement_test_fixture; @@ -197,6 +207,10 @@ class shard_placement_table // // node_hash_map for pointer stability absl::node_hash_map _states; + // lock is needed to sync enabling persistence with shard-local + // modifications. + ssx::rwlock _persistence_lock; + bool _persistence_enabled = false; storage::kvstore& _kvstore; // only on shard 0, _ntp2entry will hold targets for all ntps on this node. From 29785578cf6bc2a11b1be484bd0678ae38b0625f Mon Sep 17 00:00:00 2001 From: Alexey Zatelepin Date: Mon, 6 May 2024 13:44:26 +0200 Subject: [PATCH 06/17] bytes: add bytes_to_iobuf(bytes_view) --- src/v/bytes/include/bytes/bytes.h | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/src/v/bytes/include/bytes/bytes.h b/src/v/bytes/include/bytes/bytes.h index 52e054d0ab99..3d958a7244b2 100644 --- a/src/v/bytes/include/bytes/bytes.h +++ b/src/v/bytes/include/bytes/bytes.h @@ -104,6 +104,13 @@ inline iobuf bytes_to_iobuf(const bytes& in) { return out; } +inline iobuf bytes_to_iobuf(bytes_view in) { + iobuf out; + // NOLINTNEXTLINE + out.append(reinterpret_cast(in.data()), in.size()); + return out; +} + // NOLINTNEXTLINE(cert-dcl58-cpp): hash<> specialization namespace std { template<> From a3d0bfe35a1f2e3127c9dba19d745d26d7abde72 Mon Sep 17 00:00:00 2001 From: Alexey Zatelepin Date: Mon, 13 May 2024 14:02:35 +0200 Subject: [PATCH 07/17] c/shard_placement_table: initialize from kvstore --- src/v/cluster/shard_balancer.cc | 5 +- src/v/cluster/shard_placement_table.cc | 301 ++++++++++++++++++++++++- src/v/cluster/shard_placement_table.h | 31 ++- 3 files changed, 322 insertions(+), 15 deletions(-) diff --git a/src/v/cluster/shard_balancer.cc b/src/v/cluster/shard_balancer.cc index e6ae21e68d01..cf5ae32e857f 100644 --- a/src/v/cluster/shard_balancer.cc +++ b/src/v/cluster/shard_balancer.cc @@ -34,9 +34,8 @@ ss::future<> shard_balancer::start() { auto tt_version = _topics.local().topics_map_revision(); - co_await _shard_placement.invoke_on_all([this](shard_placement_table& spt) { - return spt.initialize(_topics.local(), _self); - }); + co_await _shard_placement.local().initialize_from_topic_table( + _topics, _self); // we shouldn't be receiving any controller updates at this point, so no // risk of missing a notification between initializing shard_placement_table diff --git a/src/v/cluster/shard_placement_table.cc b/src/v/cluster/shard_placement_table.cc index d704f00ea07c..c651d9eba6da 100644 --- a/src/v/cluster/shard_placement_table.cc +++ b/src/v/cluster/shard_placement_table.cc @@ -256,7 +256,302 @@ ss::future<> shard_placement_table::persist_shard_local_state() { }); } -ss::future<> shard_placement_table::initialize( +/// A struct used during initialization to merge kvstore markers recovered from +/// different shards and restore the most up-to-date shard_placement_table +/// state. +struct shard_placement_table::ntp_init_data { + struct versioned_shard { + std::optional shard; + model::shard_revision_id revision; + + void update( + std::optional s, model::shard_revision_id new_rev) { + if (new_rev > revision) { + shard = s; + revision = new_rev; + } + } + }; + + raft::group_id group; + model::revision_id log_revision; + versioned_shard hosted; + versioned_shard receiving; + versioned_shard assigned; + + void update_log_revision(raft::group_id gr, model::revision_id new_rev) { + if (new_rev > log_revision) { + *this = ntp_init_data{}; + group = gr; + log_revision = new_rev; + } + } + + void update(ss::shard_id s, const shard_local_assignment& new_assigned) { + update_log_revision(new_assigned.group, new_assigned.log_revision); + if (new_assigned.log_revision == log_revision) { + assigned.update(s, new_assigned.shard_revision); + } + } + + void update(ss::shard_id s, const shard_local_state& new_current) { + update_log_revision(new_current.group, new_current.log_revision); + if (new_current.log_revision == log_revision) { + switch (new_current.status) { + case hosted_status::hosted: + hosted.update(s, new_current.shard_revision); + receiving.update(std::nullopt, new_current.shard_revision); + break; + case hosted_status::receiving: + receiving.update(s, new_current.shard_revision); + break; + default: + break; + } + } + } +}; + +ss::future<> shard_placement_table::initialize_from_kvstore( + const chunked_hash_map& local_group2ntp) { + vassert( + ss::this_shard_id() == assignment_shard_id, + "method can only be invoked on shard {}", + assignment_shard_id); + + vassert( + is_persistence_enabled(), + "can't initialize from kvstore, persistence hasn't been enabled yet"); + co_await container().invoke_on_all( + [](shard_placement_table& spt) { spt._persistence_enabled = true; }); + + // 1. gather kvstore markers from all shards + + auto shard2init_states = co_await container().map( + [&local_group2ntp](shard_placement_table& spt) { + return spt.gather_init_states(local_group2ntp); + }); + + // 2. merge into up-to-date shard_placement_table state + + chunked_hash_map ntp2init_data; + model::shard_revision_id max_shard_revision; + ssx::async_counter counter; + for (ss::shard_id s = 0; s < shard2init_states.size(); ++s) { + co_await ssx::async_for_each_counter( + counter, + shard2init_states[s]->begin(), + shard2init_states[s]->end(), + [&](const ntp2state_t::value_type& kv) { + const auto& [ntp, state] = kv; + auto& init_data = ntp2init_data.try_emplace(ntp).first->second; + + if (state.assigned) { + init_data.update(s, state.assigned.value()); + max_shard_revision = std::max( + max_shard_revision, state.assigned->shard_revision); + } + + if (state.current) { + init_data.update(s, state.current.value()); + max_shard_revision = std::max( + max_shard_revision, state.current->shard_revision); + } + }); + } + + // 3. based on merged data, update in-memory state everywhere + + if (max_shard_revision != model::shard_revision_id{}) { + _cur_shard_revision = max_shard_revision + model::shard_revision_id{1}; + } + + co_await container().invoke_on_all( + [&ntp2init_data](shard_placement_table& spt) { + return spt.scatter_init_data(ntp2init_data); + }); + + co_await ssx::async_for_each( + ntp2init_data.begin(), + ntp2init_data.end(), + [this](const decltype(ntp2init_data)::value_type& kv) { + const auto& [ntp, init_data] = kv; + vlog( + clusterlog.trace, + "[{}] init data: group: {}, log_revision: {}, " + "assigned: {}, hosted: {}, receiving: {}", + ntp, + init_data.group, + init_data.log_revision, + init_data.assigned.shard, + init_data.hosted.shard, + init_data.receiving.shard); + + if (init_data.assigned.shard) { + auto entry = std::make_unique(); + entry->target = shard_placement_target( + init_data.group, + init_data.log_revision, + init_data.assigned.shard.value()); + _ntp2entry.emplace(ntp, std::move(entry)); + } + }); +} + +ss::future>> +shard_placement_table::gather_init_states( + const chunked_hash_map& partitions) { + chunked_vector orphan_assignments; + + co_await _kvstore.for_each( + kvstore_key_space, [&](bytes_view key_str, const iobuf& val) { + iobuf_parser key_parser(bytes_to_iobuf(key_str)); + + auto key_type = serde::read_nested(key_parser, 0); + switch (key_type) { + default: + return; + case kvstore_key_type::assignment: { + auto group = serde::read_nested(key_parser, 0); + auto ntp_it = partitions.find(group); + if (ntp_it == partitions.end()) { + vlog( + clusterlog.trace, + "recovered orphan assigned marker, group: {}", + group); + orphan_assignments.push_back(group); + } else { + auto marker = serde::from_iobuf( + val.copy()); + vlog( + clusterlog.trace, + "[{}] recovered assigned marker, lr: {} sr: {}", + ntp_it->second, + marker.log_revision, + marker.shard_revision); + + _states[ntp_it->second].assigned = shard_local_assignment{ + .group = group, + .log_revision = marker.log_revision, + .shard_revision = marker.shard_revision}; + } + break; + } + case kvstore_key_type::current_state: { + auto group = serde::read_nested(key_parser, 0); + auto marker = serde::from_iobuf(val.copy()); + vlog( + clusterlog.trace, + "[{}] recovered cur state marker, lr: {} sr: {} complete: {}", + marker.ntp, + marker.log_revision, + marker.shard_revision, + marker.is_complete); + + auto& state = _states[marker.ntp]; + if (state.current) { + throw std::runtime_error(fmt_with_ctx( + ssx::sformat, + "duplicate ntp {} in kvstore map on shard {}", + marker.ntp, + ss::this_shard_id())); + } + state.current = shard_local_state( + group, + marker.log_revision, + marker.is_complete ? hosted_status::hosted + : hosted_status::receiving, + marker.shard_revision); + break; + } + } + }); + + co_await ss::max_concurrent_for_each( + orphan_assignments.begin(), + orphan_assignments.end(), + 512, + [this](raft::group_id group) { + return _kvstore.remove( + kvstore_key_space, assignment_kvstore_key(group)); + }); + + co_return ss::make_foreign(std::make_unique(_states)); +} + +ss::future<> shard_placement_table::scatter_init_data( + const chunked_hash_map& + ntp2init_data) { + return ss::max_concurrent_for_each( + ntp2init_data.begin(), + ntp2init_data.end(), + 512, + [this](const std::pair& kv) { + const auto& [ntp, init_data] = kv; + auto it = _states.find(ntp); + if (it == _states.end()) { + return ss::now(); + } + auto& state = it->second; + + if (state.current) { + if (ss::this_shard_id() == init_data.hosted.shard) { + if (init_data.receiving.shard) { + state._next = init_data.receiving.shard; + } + } else if (ss::this_shard_id() != init_data.receiving.shard) { + state.current->status = hosted_status::obsolete; + } + } + + ss::future<> fut = ss::now(); + if (state.assigned) { + if (ss::this_shard_id() != init_data.assigned.shard) { + fut = _kvstore.remove( + kvstore_key_space, + assignment_kvstore_key(state.assigned->group)); + state.assigned = std::nullopt; + } else if (!init_data.hosted.shard) { + state._is_initial_for = init_data.log_revision; + } + } + + if (state.is_empty()) { + _states.erase(it); + } else { + vlog( + clusterlog.info, + "[{}] recovered placement state: {}", + ntp, + state); + } + + return fut; + }); +} + +ss::future<> shard_placement_table::initialize_from_topic_table( + ss::sharded& topics, model::node_id self) { + vassert( + ss::this_shard_id() == assignment_shard_id, + "method can only be invoked on shard {}", + assignment_shard_id); + vassert( + !is_persistence_enabled(), + "can't initialize from topic_table, persistence has already been " + "enabled"); + + co_await container().invoke_on_all( + [&topics, self](shard_placement_table& spt) { + return spt.do_initialize_from_topic_table(topics.local(), self); + }); + + if (!_ntp2entry.empty()) { + _cur_shard_revision += 1; + } +} + +ss::future<> shard_placement_table::do_initialize_from_topic_table( const topic_table& topics, model::node_id self) { // We expect topic_table to remain unchanged throughout the loop because the // method is supposed to be called after local controller replay is finished @@ -337,10 +632,6 @@ ss::future<> shard_placement_table::initialize( } }); } - - if (!_ntp2entry.empty()) { - _cur_shard_revision += 1; - } } ss::future<> shard_placement_table::set_target( diff --git a/src/v/cluster/shard_placement_table.h b/src/v/cluster/shard_placement_table.h index f1e53979518f..1f2ff9ae3264 100644 --- a/src/v/cluster/shard_placement_table.h +++ b/src/v/cluster/shard_placement_table.h @@ -140,14 +140,23 @@ class shard_placement_table std::optional _next; }; + using ntp2state_t = absl::node_hash_map; + explicit shard_placement_table(storage::kvstore&); /// Must be called on assignment_shard_id. bool is_persistence_enabled() const; ss::future<> enable_persistence(); - // must be called on each shard - ss::future<> initialize(const topic_table&, model::node_id self); + /// Must be called on assignment_shard_id. + /// precondition: is_persistence_enabled() == true + ss::future<> initialize_from_kvstore( + const chunked_hash_map& local_group2ntp); + + /// Must be called on assignment_shard_id. + /// precondition: is_persistence_enabled() == false + ss::future<> + initialize_from_topic_table(ss::sharded&, model::node_id self); using shard_callback_t = std::function; @@ -162,10 +171,7 @@ class shard_placement_table std::optional state_on_this_shard(const model::ntp&) const; - const absl::node_hash_map& - shard_local_states() const { - return _states; - } + const ntp2state_t& shard_local_states() const { return _states; } // partition lifecycle methods @@ -200,13 +206,24 @@ class shard_placement_table ss::future<> persist_shard_local_state(); + ss::future>> + gather_init_states(const chunked_hash_map&); + + struct ntp_init_data; + + ss::future<> + scatter_init_data(const chunked_hash_map&); + + ss::future<> + do_initialize_from_topic_table(const topic_table&, model::node_id self); + private: friend class shard_placement_test_fixture; // per-shard state // // node_hash_map for pointer stability - absl::node_hash_map _states; + ntp2state_t _states; // lock is needed to sync enabling persistence with shard-local // modifications. ssx::rwlock _persistence_lock; From 30887072849e8dd8cb8cce377cc8c7e148813d24 Mon Sep 17 00:00:00 2001 From: Alexey Zatelepin Date: Fri, 17 May 2024 13:13:28 +0200 Subject: [PATCH 08/17] c/shard_placement_table: update class comment --- src/v/cluster/shard_placement_table.h | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/v/cluster/shard_placement_table.h b/src/v/cluster/shard_placement_table.h index 1f2ff9ae3264..4a53bd33605a 100644 --- a/src/v/cluster/shard_placement_table.h +++ b/src/v/cluster/shard_placement_table.h @@ -30,10 +30,10 @@ namespace cluster { /// shard_balancer and current shard-local state is supposed to be modified by /// controller_backend as it creates/deletes/moves partitions. /// -/// Currently shard-local and target states are in-memory and target states -/// duplicate shard assignments that are stored in topic_table, but in the -/// future they will be persisted in the kvstore and target states will be set -/// independently. +/// Shard-local assignments and current states are persisted to kvstore on every +/// change. On startup this kvstore state is used to recover in-memory state. +/// In other words, we read kvstore only when initializing, and during normal +/// operation we only write to it. /// /// Note that in contrast to `cluster::shard_table` (that helps other parts of /// the system to find the shard where the `cluster::partition` object for some From 77107561c60689f15ec9c6fb2d30a301da6a15e0 Mon Sep 17 00:00:00 2001 From: Alexey Zatelepin Date: Tue, 30 Apr 2024 14:16:05 +0200 Subject: [PATCH 09/17] c/shard_placement_table: add get_target(ntp) method --- src/v/cluster/shard_placement_table.cc | 13 +++++++++++++ src/v/cluster/shard_placement_table.h | 3 +++ 2 files changed, 16 insertions(+) diff --git a/src/v/cluster/shard_placement_table.cc b/src/v/cluster/shard_placement_table.cc index c651d9eba6da..8517c2d251a0 100644 --- a/src/v/cluster/shard_placement_table.cc +++ b/src/v/cluster/shard_placement_table.cc @@ -818,6 +818,19 @@ shard_placement_table::state_on_this_shard(const model::ntp& ntp) const { return std::nullopt; } +std::optional +shard_placement_table::get_target(const model::ntp& ntp) const { + vassert( + ss::this_shard_id() == assignment_shard_id, + "method can only be invoked on shard {}", + assignment_shard_id); + auto it = _ntp2entry.find(ntp); + if (it != _ntp2entry.end()) { + return it->second->target; + } + return std::nullopt; +} + ss::future shard_placement_table::prepare_create( const model::ntp& ntp, model::revision_id expected_log_rev) { // ensure that there is no concurrent enable_persistence() call diff --git a/src/v/cluster/shard_placement_table.h b/src/v/cluster/shard_placement_table.h index 4a53bd33605a..9ae2f36b4bca 100644 --- a/src/v/cluster/shard_placement_table.h +++ b/src/v/cluster/shard_placement_table.h @@ -169,6 +169,9 @@ class shard_placement_table // getters + /// Must be called on assignment_shard_id. + std::optional get_target(const model::ntp&) const; + std::optional state_on_this_shard(const model::ntp&) const; const ntp2state_t& shard_local_states() const { return _states; } From ed1e25be7079611d9c43817a9751ea0c27191cd6 Mon Sep 17 00:00:00 2001 From: Alexey Zatelepin Date: Mon, 13 May 2024 14:38:08 +0200 Subject: [PATCH 10/17] c/shard_placement_table/ut: improve logging Make different components log with different prefixes in test to more easily disambiguate their messages. --- .../tests/shard_placement_table_test.cc | 61 +++++++++++++------ 1 file changed, 42 insertions(+), 19 deletions(-) diff --git a/src/v/cluster/tests/shard_placement_table_test.cc b/src/v/cluster/tests/shard_placement_table_test.cc index aaae8645c5b2..32c63b0bc944 100644 --- a/src/v/cluster/tests/shard_placement_table_test.cc +++ b/src/v/cluster/tests/shard_placement_table_test.cc @@ -18,6 +18,7 @@ #include "storage/storage_resources.h" #include "test_utils/randoms.h" #include "test_utils/test.h" +#include "utils/prefix_logger.h" #include #include @@ -91,7 +92,8 @@ class reconciliation_backend ss::sharded& ntp2shards) : _ntpt(ntpt.local()) , _shard_placement(spt.local()) - , _ntp2shards(ntp2shards) {} + , _ntp2shards(ntp2shards) + , _logger(clusterlog, "RB") {} ss::future<> stop() { for (auto& [_, rs] : _states) { @@ -115,7 +117,7 @@ class reconciliation_backend auto& rs = *rs_it->second; rs.pending_notifies += 1; vlog( - clusterlog.trace, + _logger.trace, "[{}] notify reconciliation, pending_notifies: {}", ntp, rs.pending_notifies); @@ -175,7 +177,7 @@ class reconciliation_backend auto ex = std::current_exception(); if (!ssx::is_shutdown_exception(ex)) { vlog( - clusterlog.error, + _logger.error, "[{}] unexpected exception during reconciliation: {}", ntp, ex); @@ -205,7 +207,7 @@ class reconciliation_backend if (res.has_value()) { if (res.value() == ss::stop_iteration::yes) { vlog( - clusterlog.trace, + _logger.trace, "[{}] reconciled, notify count: {}", ntp, notifies); @@ -217,7 +219,7 @@ class reconciliation_backend continue; } else { vlog( - clusterlog.trace, + _logger.trace, "[{}] reconciliation attempt error: {}", ntp, res.error()); @@ -226,7 +228,7 @@ class reconciliation_backend } catch (ss::abort_requested_exception const&) { } catch (...) { vlog( - clusterlog.warn, + _logger.warn, "[{}] exception occured during reconciliation: {}", ntp, std::current_exception()); @@ -250,7 +252,7 @@ class reconciliation_backend } vlog( - clusterlog.trace, + _logger.trace, "[{}] placement state on this shard: {}, expected_log_revision: {}", ntp, placement, @@ -298,12 +300,17 @@ class reconciliation_backend model::revision_id log_revision, bool state_expected) { auto ec = co_await _shard_placement.prepare_create(ntp, log_revision); - vlog(clusterlog.trace, "[{}] creating partition: {}", ntp, ec); + vlog(_logger.trace, "[{}] creating partition: {}", ntp, ec); if (ec) { co_return ec; } _launched.insert(ntp); + vlog( + _logger.trace, + "[{}] started partition log_revision: {}", + ntp, + log_revision); co_await ss::sleep(1ms * random_generators::get_int(30)); co_await _ntp2shards.invoke_on( @@ -358,7 +365,7 @@ class reconciliation_backend model::revision_id cmd_revision) { auto ec = co_await _shard_placement.prepare_delete(ntp, cmd_revision); vlog( - clusterlog.trace, + _logger.trace, "[{}] deleting partition at cmd_revision: {}, ec: {}", ntp, cmd_revision, @@ -373,6 +380,13 @@ class reconciliation_backend } bool launched_expected = _launched.erase(ntp); + if (launched_expected) { + vlog( + _logger.trace, + "[{}] stopped partition log_revision: {}", + ntp, + placement.current->log_revision); + } co_await ss::sleep(1ms * random_generators::get_int(30)); co_await _ntp2shards.invoke_on( @@ -442,7 +456,7 @@ class reconciliation_backend ntp, log_revision); if (maybe_dest.has_error()) { vlog( - clusterlog.trace, + _logger.trace, "[{}] preparing transfer error: {}", ntp, maybe_dest.error()); @@ -450,13 +464,20 @@ class reconciliation_backend } vlog( - clusterlog.trace, + _logger.trace, "[{}] preparing transfer dest: {}", ntp, maybe_dest.value()); ss::shard_id destination = maybe_dest.value(); bool launched_expected = _launched.erase(ntp); + if (launched_expected) { + vlog( + _logger.trace, + "[{}] stopped partition for transfer, log_revision: {}", + ntp, + log_revision); + } co_await _ntp2shards.invoke_on( 0, @@ -562,7 +583,7 @@ class reconciliation_backend }); co_await _shard_placement.finish_transfer_on_source(ntp, log_revision); - vlog(clusterlog.trace, "[{}] transferred", ntp); + vlog(_logger.trace, "[{}] transferred", ntp); co_return errc::success; } @@ -575,6 +596,7 @@ class reconciliation_backend _states; absl::flat_hash_set _launched; ss::gate _gate; + prefix_logger _logger; }; // Limit concurrency to 4 so that there are more interesting repeats in randomly @@ -859,12 +881,13 @@ class shard_placement_test_fixture : public seastar_test { TEST_F_CORO(shard_placement_test_fixture, StressTest) { model::revision_id cur_revision{1}; raft::group_id cur_group{1}; + prefix_logger logger(clusterlog, "TEST"); co_await start(); for (size_t i = 0; i < 10'000; ++i) { if (random_generators::get_int(15) == 0) { - vlog(clusterlog.info, "waiting for reconciliation"); + vlog(logger.info, "waiting for reconciliation"); for (size_t i = 0;; ++i) { ASSERT_TRUE_CORO(i < 50) << "taking too long to reconcile"; if (!(_shard_assigner->is_reconciled() @@ -875,7 +898,7 @@ TEST_F_CORO(shard_placement_test_fixture, StressTest) { } } - vlog(clusterlog.info, "reconciled"); + vlog(logger.info, "reconciled"); co_await quiescent_state_checks(); } @@ -889,7 +912,7 @@ TEST_F_CORO(shard_placement_test_fixture, StressTest) { auto group = cur_group++; auto revision = cur_revision++; vlog( - clusterlog.info, + logger.info, "[{}] OP: add, group: {}, log revision: {}", ntp, group, @@ -917,11 +940,11 @@ TEST_F_CORO(shard_placement_test_fixture, StressTest) { {op_t::transfer, op_t::remove, op_t::increase_log_rev}); switch (op) { case op_t::transfer: - vlog(clusterlog.info, "[{}] OP: reassign shard", ntp); + vlog(logger.info, "[{}] OP: reassign shard", ntp); _shard_assigner->assign_eventually(ntp); break; case op_t::remove: { - vlog(clusterlog.info, "[{}] OP: remove", ntp); + vlog(logger.info, "[{}] OP: remove", ntp); auto revision = cur_revision++; co_await ntpt.invoke_on_all([&](ntp_table& ntpt) { ntpt.ntp2meta.erase(ntp); @@ -935,7 +958,7 @@ TEST_F_CORO(shard_placement_test_fixture, StressTest) { case op_t::increase_log_rev: ntp_meta.log_revision = cur_revision++; vlog( - clusterlog.info, + logger.info, "[{}] OP: increase log revision to: {}", ntp, ntp_meta.log_revision); @@ -951,7 +974,7 @@ TEST_F_CORO(shard_placement_test_fixture, StressTest) { } } - vlog(clusterlog.info, "finished"); + vlog(logger.info, "finished"); } } // namespace cluster From c65c602140e0814e9e065687669c49c027a8b993 Mon Sep 17 00:00:00 2001 From: Alexey Zatelepin Date: Wed, 20 Mar 2024 15:23:39 +0100 Subject: [PATCH 11/17] c/shard_placement_table/ut: restart in between ops --- .../tests/shard_placement_table_test.cc | 147 ++++++++++++++---- 1 file changed, 115 insertions(+), 32 deletions(-) diff --git a/src/v/cluster/tests/shard_placement_table_test.cc b/src/v/cluster/tests/shard_placement_table_test.cc index 32c63b0bc944..9b35ee57c673 100644 --- a/src/v/cluster/tests/shard_placement_table_test.cc +++ b/src/v/cluster/tests/shard_placement_table_test.cc @@ -620,8 +620,14 @@ class shard_assigner { , _rb(rb) {} ss::future<> start() { - // TODO: bootstrap _shard_placement. We don't need it for now because we - // don't restart in this test yet. + for (const auto& [ntp, meta] : _ntpt.ntp2meta) { + auto maybe_target = _shard_placement.get_target(ntp); + if ( + !maybe_target + || maybe_target->log_revision != meta.log_revision) { + assign_eventually(ntp); + } + } ssx::background = assign_fiber(); co_return; @@ -632,6 +638,13 @@ class shard_assigner { return _gate.close(); } + void enable_persistence_eventually() { + if (!_enable_persistence) { + _enable_persistence = true; + _wakeup_event.set(); + } + } + void assign_eventually(const model::ntp& ntp) { _to_assign.insert(ntp); _wakeup_event.set(); @@ -652,6 +665,10 @@ class shard_assigner { co_return; } + if (_enable_persistence) { + co_await _shard_placement.enable_persistence(); + } + auto to_assign = std::exchange(_to_assign, {}); _in_progress = true; co_await ss::max_concurrent_for_each( @@ -696,6 +713,7 @@ class shard_assigner { ntp2shards_t& _ntp2shards; ss::sharded& _rb; + bool _enable_persistence = false; chunked_hash_set _to_assign; bool _in_progress = false; ssx::event _wakeup_event{"shard_assigner"}; @@ -710,7 +728,7 @@ class shard_placement_test_fixture : public seastar_test { : test_dir("test.data." + random_generators::gen_alphanum_string(10)) {} ss::future<> quiescent_state_checks() { - auto shard2states = co_await spt.map( + auto shard2states = co_await spt->map( [](shard_placement_table& spt) { return spt._states; }); absl::node_hash_map< @@ -750,8 +768,8 @@ class shard_placement_test_fixture : public seastar_test { << "ntp: " << ntp; const auto& shard2state = states_it->second; - auto entry_it = spt.local()._ntp2entry.find(ntp); - ASSERT_TRUE_CORO(entry_it != spt.local()._ntp2entry.end()) + auto entry_it = spt->local()._ntp2entry.find(ntp); + ASSERT_TRUE_CORO(entry_it != spt->local()._ntp2entry.end()) << "ntp: " << ntp; ASSERT_TRUE_CORO(entry_it->second->target) << "ntp: " << ntp; ASSERT_TRUE_CORO(entry_it->second->mtx.ready()) << "ntp: " << ntp; @@ -817,14 +835,55 @@ class shard_placement_test_fixture : public seastar_test { co_await ft.start(); co_await ft.invoke_on_all( [](features::feature_table& ft) { ft.testing_activate_all(); }); - co_await ntpt.start(); - co_await _ntp2shards.start_single(); - co_await sr.start(); - co_await kvs.start( + co_await restart_node(true); + } + + ss::future<> stop() { + if (_shard_assigner) { + co_await _shard_assigner->stop(); + } + if (rb) { + co_await rb->stop(); + } + if (spt) { + co_await spt->stop(); + } + if (kvs) { + co_await kvs->stop(); + } + co_await sr.stop(); + co_await _ntp2shards.stop(); + co_await ntpt.stop(); + co_await ft.stop(); + } + + ss::future<> restart_node(bool first_start) { + if (_shard_assigner) { + co_await _shard_assigner->stop(); + } + if (rb) { + co_await rb->stop(); + } + if (spt) { + co_await spt->stop(); + } + if (kvs) { + co_await kvs->stop(); + } + + for (auto& [ntp, shards] : _ntp2shards.local()) { + for (auto& [lr, p_shards] : shards.rev2shards) { + // "stop" mock partitions + p_shards.launched_on = std::nullopt; + } + } + + kvs = std::make_unique(); + co_await kvs->start( storage::kvstore_config( 1_MiB, config::mock_binding(10ms), @@ -832,35 +891,42 @@ class shard_placement_test_fixture : public seastar_test { storage::make_sanitized_file_config()), ss::sharded_parameter([this] { return std::ref(sr.local()); }), std::ref(ft)); - co_await kvs.invoke_on_all( + co_await kvs->invoke_on_all( [](storage::kvstore& kvs) { return kvs.start(); }); - co_await spt.start( - ss::sharded_parameter([this] { return std::ref(kvs.local()); })); + spt = std::make_unique(); + co_await spt->start( + ss::sharded_parameter([this] { return std::ref(kvs->local()); })); - co_await rb.start(std::ref(ntpt), std::ref(spt), std::ref(_ntp2shards)); + if (!first_start) { + chunked_hash_map local_group2ntp; + for (const auto& [ntp, meta] : ntpt.local().ntp2meta) { + local_group2ntp.emplace(meta.group, ntp); + } + co_await spt->local().initialize_from_kvstore(local_group2ntp); + + for (auto& [ntp, shards] : _ntp2shards.local()) { + if ( + shards.target + && !local_group2ntp.contains(shards.target->group)) { + // clear obsolete targets + shards.target = std::nullopt; + } + } + } + + rb = std::make_unique(); + co_await rb->start( + std::ref(ntpt), std::ref(*spt), std::ref(_ntp2shards)); _shard_assigner = std::make_unique( - ntpt, spt, _ntp2shards, rb); + ntpt, *spt, _ntp2shards, *rb); co_await _shard_assigner->start(); - co_await rb.invoke_on_all( + co_await rb->invoke_on_all( [](reconciliation_backend& rb) { return rb.start(); }); } - ss::future<> stop() { - if (_shard_assigner) { - co_await _shard_assigner->stop(); - } - co_await rb.stop(); - co_await spt.stop(); - co_await kvs.stop(); - co_await sr.stop(); - co_await _ntp2shards.stop(); - co_await ntpt.stop(); - co_await ft.stop(); - } - ss::future<> TearDownAsync() override { co_await stop(); co_await ss::recursive_remove_directory( @@ -872,9 +938,9 @@ class shard_placement_test_fixture : public seastar_test { ss::sharded ntpt; ss::sharded _ntp2shards; // only on shard 0 ss::sharded sr; - ss::sharded kvs; - ss::sharded spt; - ss::sharded rb; + std::unique_ptr> kvs; + std::unique_ptr> spt; + std::unique_ptr> rb; std::unique_ptr _shard_assigner; }; @@ -885,13 +951,20 @@ TEST_F_CORO(shard_placement_test_fixture, StressTest) { co_await start(); + // enable persistence midway through the test + size_t enable_persistence_at = random_generators::get_int(4'000, 6'000); + for (size_t i = 0; i < 10'000; ++i) { + if (i == enable_persistence_at) { + _shard_assigner->enable_persistence_eventually(); + } + if (random_generators::get_int(15) == 0) { vlog(logger.info, "waiting for reconciliation"); for (size_t i = 0;; ++i) { ASSERT_TRUE_CORO(i < 50) << "taking too long to reconcile"; if (!(_shard_assigner->is_reconciled() - && co_await rb.local().is_reconciled())) { + && co_await rb->local().is_reconciled())) { co_await ss::sleep(100ms); } else { break; @@ -900,6 +973,16 @@ TEST_F_CORO(shard_placement_test_fixture, StressTest) { vlog(logger.info, "reconciled"); co_await quiescent_state_checks(); + continue; + } + + if ( + spt->local().is_persistence_enabled() + && random_generators::get_int(50) == 0) { + vlog(logger.info, "restarting"); + co_await restart_node(false); + vlog(logger.info, "restarted"); + continue; } // small set of ntps to ensure frequent overlaps From 1e353b9a92b0b4c4f7e110776abf1e56b4e177d7 Mon Sep 17 00:00:00 2001 From: Alexey Zatelepin Date: Mon, 13 May 2024 16:23:38 +0200 Subject: [PATCH 12/17] c/shard_placement_table/ut: add helper to assert key set equality --- .../tests/shard_placement_table_test.cc | 37 ++++++++++++++++++- 1 file changed, 35 insertions(+), 2 deletions(-) diff --git a/src/v/cluster/tests/shard_placement_table_test.cc b/src/v/cluster/tests/shard_placement_table_test.cc index 9b35ee57c673..e9fd047840cf 100644 --- a/src/v/cluster/tests/shard_placement_table_test.cc +++ b/src/v/cluster/tests/shard_placement_table_test.cc @@ -720,6 +720,31 @@ class shard_assigner { ss::gate _gate; }; +template +void assert_key_sets_equal( + const Left& left, + std::string_view left_str, + const Right& right, + std::string_view right_str) { + std::vector keys1; + for (const auto& kv : left) { + if (!right.contains(kv.first)) { + keys1.push_back(kv.first); + } + } + ASSERT_TRUE(keys1.empty()) << "keys in " << left_str << ", but not in " + << right_str << ": " << keys1; + + std::vector keys2; + for (const auto& kv : right) { + if (!left.contains(kv.first)) { + keys2.push_back(kv.first); + } + } + ASSERT_TRUE(keys2.empty()) << "keys in " << right_str << ", but not in " + << left_str << ": " << keys2; +} + } // namespace class shard_placement_test_fixture : public seastar_test { @@ -741,7 +766,11 @@ class shard_placement_test_fixture : public seastar_test { } } - ASSERT_EQ_CORO(ntp2shard2state.size(), ntpt.local().ntp2meta.size()); + assert_key_sets_equal( + ntp2shard2state, + "spt placement state map", + ntpt.local().ntp2meta, + "ntp2meta map"); auto& ntp2shards = _ntp2shards.local(); for (auto it = ntp2shards.begin(); it != ntp2shards.end();) { @@ -760,7 +789,11 @@ class shard_placement_test_fixture : public seastar_test { } } - ASSERT_EQ_CORO(ntp2shards.size(), ntpt.local().ntp2meta.size()); + assert_key_sets_equal( + ntp2shards, + "reference ntp state map", + ntpt.local().ntp2meta, + "ntp2meta map"); for (const auto& [ntp, meta] : ntpt.local().ntp2meta) { auto states_it = ntp2shard2state.find(ntp); From 8d67f79d6d8fef691599f0812e34170e2cd33ce6 Mon Sep 17 00:00:00 2001 From: Alexey Zatelepin Date: Wed, 20 Mar 2024 23:16:49 +0100 Subject: [PATCH 13/17] c/shard_placement_table/ut: add after-restart checks --- .../tests/shard_placement_table_test.cc | 93 ++++++++++++++++--- 1 file changed, 82 insertions(+), 11 deletions(-) diff --git a/src/v/cluster/tests/shard_placement_table_test.cc b/src/v/cluster/tests/shard_placement_table_test.cc index e9fd047840cf..a69c6f4b42a8 100644 --- a/src/v/cluster/tests/shard_placement_table_test.cc +++ b/src/v/cluster/tests/shard_placement_table_test.cc @@ -752,26 +752,25 @@ class shard_placement_test_fixture : public seastar_test { shard_placement_test_fixture() : test_dir("test.data." + random_generators::gen_alphanum_string(10)) {} - ss::future<> quiescent_state_checks() { + using ntp2shard2state_t = absl::node_hash_map< + model::ntp, + std::map>; + + ss::future get_ntp2shard2state() const { auto shard2states = co_await spt->map( [](shard_placement_table& spt) { return spt._states; }); - absl::node_hash_map< - model::ntp, - std::map> - ntp2shard2state; + ntp2shard2state_t ntp2shard2state; for (size_t s = 0; s < shard2states.size(); ++s) { for (const auto& [ntp, state] : shard2states[s]) { ntp2shard2state[ntp].emplace(s, state); } } - assert_key_sets_equal( - ntp2shard2state, - "spt placement state map", - ntpt.local().ntp2meta, - "ntp2meta map"); + co_return ntp2shard2state; + } + void clean_ntp2shards() { auto& ntp2shards = _ntp2shards.local(); for (auto it = ntp2shards.begin(); it != ntp2shards.end();) { auto it_copy = it++; @@ -788,7 +787,18 @@ class shard_placement_test_fixture : public seastar_test { ntp2shards.erase(it_copy); } } + } + + ss::future<> quiescent_state_checks() { + auto ntp2shard2state = co_await get_ntp2shard2state(); + assert_key_sets_equal( + ntp2shard2state, + "spt placement state map", + ntpt.local().ntp2meta, + "ntp2meta map"); + clean_ntp2shards(); + const auto& ntp2shards = _ntp2shards.local(); assert_key_sets_equal( ntp2shards, "reference ntp state map", @@ -853,7 +863,7 @@ class shard_placement_test_fixture : public seastar_test { ASSERT_EQ_CORO(shards.rev2shards.size(), 1) << "ntp: " << ntp; auto p_shards_it = shards.rev2shards.find(meta.log_revision); ASSERT_TRUE_CORO(p_shards_it != shards.rev2shards.end()) - << "ntp: " << ntp; + << "ntp: " << ntp << ", log_revision: " << meta.log_revision; const auto& p_shards = p_shards_it->second; ASSERT_EQ_CORO(p_shards.launched_on, target.shard) << "ntp: " << ntp; @@ -864,6 +874,65 @@ class shard_placement_test_fixture : public seastar_test { } } + ss::future<> check_spt_recovery() { + clean_ntp2shards(); + const auto& ntp2shards = _ntp2shards.local(); + auto ntp2shard2state = co_await get_ntp2shard2state(); + assert_key_sets_equal( + ntp2shards, + "reference ntp state map", + ntp2shard2state, + "spt placement state map"); + + for (const auto& [ntp, expected] : ntp2shards) { + auto states_it = ntp2shard2state.find(ntp); + ASSERT_TRUE_CORO(states_it != ntp2shard2state.end()) + << "ntp: " << ntp; + const auto& shard2state = states_it->second; + + // check main target map + auto entry_it = spt->local()._ntp2entry.find(ntp); + if (expected.target) { + ASSERT_TRUE_CORO(entry_it != spt->local()._ntp2entry.end()) + << "ntp: " << ntp; + ASSERT_EQ_CORO(entry_it->second->target, expected.target) + << "ntp: " << ntp; + ASSERT_TRUE_CORO(entry_it->second->mtx.ready()) + << "ntp: " << ntp; + } else { + ASSERT_TRUE_CORO(entry_it == spt->local()._ntp2entry.end()) + << "ntp: " << ntp; + } + + // check assigned markers + if (expected.target) { + ASSERT_TRUE_CORO(shard2state.contains(expected.target->shard)); + } + for (const auto& [s, placement] : shard2state) { + if (expected.target && s == expected.target->shard) { + ASSERT_TRUE_CORO(placement.assigned) + << "ntp: " << ntp << ", shard: " << s; + ASSERT_EQ_CORO( + placement.assigned->log_revision, + expected.target->log_revision) + << "ntp: " << ntp << ", shard: " << s; + } else { + ASSERT_TRUE_CORO(!placement.assigned) + << "ntp: " << ntp << ", shard: " << s; + } + } + + // check that all shards with state are known in the placement map. + for (ss::shard_id s : expected.shards_with_some_state) { + auto state_it = shard2state.find(s); + ASSERT_TRUE_CORO(state_it != shard2state.end()) + << "ntp: " << ntp << ", shard: " << s; + ASSERT_TRUE_CORO(state_it->second.current) + << "ntp: " << ntp << ", shard: " << s; + } + } + } + ss::future<> start() { co_await ft.start(); co_await ft.invoke_on_all( @@ -948,6 +1017,8 @@ class shard_placement_test_fixture : public seastar_test { } } + co_await check_spt_recovery(); + rb = std::make_unique(); co_await rb->start( std::ref(ntpt), std::ref(*spt), std::ref(_ntp2shards)); From 26366e266881cec42a7078dc2d038c07df48f00d Mon Sep 17 00:00:00 2001 From: Alexey Zatelepin Date: Tue, 7 May 2024 13:35:22 +0200 Subject: [PATCH 14/17] c/shard_balancer: use local shard_placement_table instance everywhere --- src/v/cluster/controller.cc | 2 +- src/v/cluster/shard_balancer.cc | 12 +++++------- src/v/cluster/shard_balancer.h | 4 ++-- 3 files changed, 8 insertions(+), 10 deletions(-) diff --git a/src/v/cluster/controller.cc b/src/v/cluster/controller.cc index 1cf7326cdeb7..1897932cbc2e 100644 --- a/src/v/cluster/controller.cc +++ b/src/v/cluster/controller.cc @@ -441,8 +441,8 @@ ss::future<> controller::start( }) .then([this] { return _shard_balancer.start_single( - std::ref(_tp_state), std::ref(_shard_placement), + std::ref(_tp_state), std::ref(_backend)); }) .then( diff --git a/src/v/cluster/shard_balancer.cc b/src/v/cluster/shard_balancer.cc index cf5ae32e857f..b6c96fb63a40 100644 --- a/src/v/cluster/shard_balancer.cc +++ b/src/v/cluster/shard_balancer.cc @@ -18,11 +18,11 @@ namespace cluster { shard_balancer::shard_balancer( - ss::sharded& topics, ss::sharded& spt, + ss::sharded& topics, ss::sharded& cb) - : _topics(topics) - , _shard_placement(spt) + : _shard_placement(spt.local()) + , _topics(topics) , _controller_backend(cb) , _self(*config::node().node_id()) {} @@ -34,8 +34,7 @@ ss::future<> shard_balancer::start() { auto tt_version = _topics.local().topics_map_revision(); - co_await _shard_placement.local().initialize_from_topic_table( - _topics, _self); + co_await _shard_placement.initialize_from_topic_table(_topics, _self); // we shouldn't be receiving any controller updates at this point, so no // risk of missing a notification between initializing shard_placement_table @@ -117,8 +116,7 @@ ss::future<> shard_balancer::assign_ntp(const model::ntp& ntp) { target); try { - co_await _shard_placement.local().set_target( - ntp, target, shard_callback); + co_await _shard_placement.set_target(ntp, target, shard_callback); } catch (...) { auto ex = std::current_exception(); if (!ssx::is_shutdown_exception(ex)) { diff --git a/src/v/cluster/shard_balancer.h b/src/v/cluster/shard_balancer.h index 3bc6feb29968..577390e68f7f 100644 --- a/src/v/cluster/shard_balancer.h +++ b/src/v/cluster/shard_balancer.h @@ -31,8 +31,8 @@ class shard_balancer { static constexpr ss::shard_id shard_id = 0; shard_balancer( - ss::sharded&, ss::sharded&, + ss::sharded&, ss::sharded&); ss::future<> start(); @@ -46,8 +46,8 @@ class shard_balancer { ss::future<> assign_ntp(const model::ntp&); private: + shard_placement_table& _shard_placement; ss::sharded& _topics; - ss::sharded& _shard_placement; ss::sharded& _controller_backend; model::node_id _self; From 91fb69b84f0bfba24bf8efc75b14e94d4f7980f3 Mon Sep 17 00:00:00 2001 From: Alexey Zatelepin Date: Tue, 7 May 2024 13:40:06 +0200 Subject: [PATCH 15/17] c/shard_balancer: initialize from kvstore if persistence is enabled --- src/v/cluster/shard_balancer.cc | 68 +++++++++++++++++++++++++++++++-- 1 file changed, 64 insertions(+), 4 deletions(-) diff --git a/src/v/cluster/shard_balancer.cc b/src/v/cluster/shard_balancer.cc index b6c96fb63a40..e924507f2355 100644 --- a/src/v/cluster/shard_balancer.cc +++ b/src/v/cluster/shard_balancer.cc @@ -14,6 +14,7 @@ #include "cluster/cluster_utils.h" #include "cluster/logger.h" #include "config/node_config.h" +#include "ssx/async_algorithm.h" namespace cluster { @@ -32,17 +33,76 @@ ss::future<> shard_balancer::start() { "method can only be invoked on shard {}", shard_id); + // We expect topic_table to remain unchanged throughout the method + // invocation because it is supposed to be called after local controller + // replay is finished but before we start getting new controller updates + // from the leader. auto tt_version = _topics.local().topics_map_revision(); - co_await _shard_placement.initialize_from_topic_table(_topics, _self); + if (_shard_placement.is_persistence_enabled()) { + // 1. collect the set of node-local ntps from topic_table + + chunked_hash_map local_group2ntp; + chunked_hash_map local_ntp2log_revision; + const auto& topics = _topics.local(); + ssx::async_counter counter; + for (const auto& [ns_tp, md_item] : topics.all_topics_metadata()) { + vassert( + tt_version == topics.topics_map_revision(), + "topic_table unexpectedly changed"); + + co_await ssx::async_for_each_counter( + counter, + md_item.get_assignments().begin(), + md_item.get_assignments().end(), + [&](const partition_assignment& p_as) { + vassert( + tt_version == topics.topics_map_revision(), + "topic_table unexpectedly changed"); + + model::ntp ntp{ns_tp.ns, ns_tp.tp, p_as.id}; + auto replicas_view = topics.get_replicas_view( + ntp, md_item, p_as); + auto log_rev = log_revision_on_node(replicas_view, _self); + if (log_rev) { + local_group2ntp.emplace( + replicas_view.assignment.group, ntp); + local_ntp2log_revision.emplace(ntp, *log_rev); + } + }); + } + + // 2. restore shard_placement_table from the kvstore + + co_await _shard_placement.initialize_from_kvstore(local_group2ntp); + + // 3. assign non-assigned ntps that have to be assigned + + co_await ssx::async_for_each_counter( + counter, + local_ntp2log_revision.begin(), + local_ntp2log_revision.end(), + [&](const std::pair kv) { + const auto& [ntp, log_revision] = kv; + auto existing_target = _shard_placement.get_target(ntp); + if ( + !existing_target + || existing_target->log_revision != log_revision) { + _to_assign.insert(ntp); + } + }); + co_await do_assign_ntps(); + } else { + co_await _shard_placement.initialize_from_topic_table(_topics, _self); + } - // we shouldn't be receiving any controller updates at this point, so no - // risk of missing a notification between initializing shard_placement_table - // and subscribing. vassert( tt_version == _topics.local().topics_map_revision(), "topic_table unexpectedly changed"); + // we shouldn't be receiving any controller updates at this point, so no + // risk of missing a notification between initializing shard_placement_table + // and subscribing. _topic_table_notify_handle = _topics.local().register_delta_notification( [this](topic_table::delta_range_t deltas_range) { for (const auto& delta : deltas_range) { From caa89ca1963d1a2bf4a418209376ca4a5eaae145 Mon Sep 17 00:00:00 2001 From: Alexey Zatelepin Date: Tue, 7 May 2024 13:59:37 +0200 Subject: [PATCH 16/17] features: add shard_placement_persistence feature flag This flag will signal nodes that they can switch to persisting their shard_placement_table locally in the kvstore. --- src/v/features/feature_table.cc | 2 ++ src/v/features/feature_table.h | 7 +++++++ 2 files changed, 9 insertions(+) diff --git a/src/v/features/feature_table.cc b/src/v/features/feature_table.cc index e784527d3533..8dc3f2539b45 100644 --- a/src/v/features/feature_table.cc +++ b/src/v/features/feature_table.cc @@ -101,6 +101,8 @@ std::string_view to_string_view(feature f) { return "role_based_access_control"; case feature::cluster_topic_manifest_format_v2: return "cluster_topic_manifest_format_v2"; + case feature::shard_placement_persistence: + return "shard_placement_persistence"; /* * testing features diff --git a/src/v/features/feature_table.h b/src/v/features/feature_table.h index fad0212ca210..67bc8ef1c5df 100644 --- a/src/v/features/feature_table.h +++ b/src/v/features/feature_table.h @@ -75,6 +75,7 @@ enum class feature : std::uint64_t { partition_shard_in_health_report = 1ULL << 43U, role_based_access_control = 1ULL << 44U, cluster_topic_manifest_format_v2 = 1ULL << 45U, + shard_placement_persistence = 1ULL << 46U, // Dummy features for testing only test_alpha = 1ULL << 61U, @@ -380,6 +381,12 @@ constexpr static std::array feature_schema{ feature::cluster_topic_manifest_format_v2, feature_spec::available_policy::always, feature_spec::prepare_policy::always}, + feature_spec{ + cluster::cluster_version{13}, + "shard_placement_persistence", + feature::shard_placement_persistence, + feature_spec::available_policy::always, + feature_spec::prepare_policy::always}, }; std::string_view to_string_view(feature); From faa76fd8f26ed39511e9b8c0999fcda55363dddf Mon Sep 17 00:00:00 2001 From: Alexey Zatelepin Date: Tue, 7 May 2024 14:24:07 +0200 Subject: [PATCH 17/17] c/shard_balancer: enable persistence when the feature barrier allows --- src/v/cluster/controller.cc | 1 + src/v/cluster/shard_balancer.cc | 13 +++++++++++++ src/v/cluster/shard_balancer.h | 2 ++ 3 files changed, 16 insertions(+) diff --git a/src/v/cluster/controller.cc b/src/v/cluster/controller.cc index 1897932cbc2e..0ee689268772 100644 --- a/src/v/cluster/controller.cc +++ b/src/v/cluster/controller.cc @@ -442,6 +442,7 @@ ss::future<> controller::start( .then([this] { return _shard_balancer.start_single( std::ref(_shard_placement), + std::ref(_feature_table), std::ref(_tp_state), std::ref(_backend)); }) diff --git a/src/v/cluster/shard_balancer.cc b/src/v/cluster/shard_balancer.cc index e924507f2355..46979fe912b6 100644 --- a/src/v/cluster/shard_balancer.cc +++ b/src/v/cluster/shard_balancer.cc @@ -20,9 +20,11 @@ namespace cluster { shard_balancer::shard_balancer( ss::sharded& spt, + ss::sharded& features, ss::sharded& topics, ss::sharded& cb) : _shard_placement(spt.local()) + , _features(features.local()) , _topics(topics) , _controller_backend(cb) , _self(*config::node().node_id()) {} @@ -94,6 +96,11 @@ ss::future<> shard_balancer::start() { co_await do_assign_ntps(); } else { co_await _shard_placement.initialize_from_topic_table(_topics, _self); + + if (_features.is_active( + features::feature::shard_placement_persistence)) { + co_await _shard_placement.enable_persistence(); + } } vassert( @@ -146,6 +153,12 @@ ss::future<> shard_balancer::assign_fiber() { co_return; } + if ( + _features.is_active(features::feature::shard_placement_persistence) + && !_shard_placement.is_persistence_enabled()) { + co_await _shard_placement.enable_persistence(); + } + co_await do_assign_ntps(); } } diff --git a/src/v/cluster/shard_balancer.h b/src/v/cluster/shard_balancer.h index 577390e68f7f..e5d66712a4dc 100644 --- a/src/v/cluster/shard_balancer.h +++ b/src/v/cluster/shard_balancer.h @@ -32,6 +32,7 @@ class shard_balancer { shard_balancer( ss::sharded&, + ss::sharded&, ss::sharded&, ss::sharded&); @@ -47,6 +48,7 @@ class shard_balancer { private: shard_placement_table& _shard_placement; + features::feature_table& _features; ss::sharded& _topics; ss::sharded& _controller_backend; model::node_id _self;