Merge pull request #6954 from mmaslankaprv/v22.2.x
Backport of #6251 #6905
piyushredpanda authored Oct 26, 2022
2 parents c83a187 + e5776ba commit 96ed1cc
Showing 32 changed files with 786 additions and 170 deletions.
8 changes: 8 additions & 0 deletions src/v/cluster/cluster_utils.cc
@@ -329,4 +329,12 @@ cluster::errc map_update_interruption_error_code(std::error_code ec) {
}
}

partition_allocation_domain
get_allocation_domain(const model::topic_namespace_view tp_ns) {
if (tp_ns == model::kafka_consumer_offsets_nt) {
return partition_allocation_domains::consumer_offsets;
}
return partition_allocation_domains::common;
}

} // namespace cluster
6 changes: 6 additions & 0 deletions src/v/cluster/cluster_utils.h
@@ -308,4 +308,10 @@ inline bool contains_node(

cluster::errc map_update_interruption_error_code(std::error_code);

partition_allocation_domain get_allocation_domain(model::topic_namespace_view);
inline partition_allocation_domain
get_allocation_domain(const model::ntp& ntp) {
return get_allocation_domain(model::topic_namespace_view(ntp));
}

} // namespace cluster
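
Taken together, the two overloads above split allocation into exactly two domains: the internal consumer offsets topic (model::kafka_consumer_offsets_nt) maps to partition_allocation_domains::consumer_offsets, and every other topic falls into partition_allocation_domains::common. A minimal usage sketch grounded in these declarations; the surrounding function and parameter names are illustrative only:

#include "cluster/cluster_utils.h"

// Sketch: resolving the allocation domain for two partitions. The ntp
// overload simply forwards the topic/namespace part to the view overload.
void domain_example(const model::ntp& offsets_ntp, const model::ntp& user_ntp) {
    auto a = cluster::get_allocation_domain(offsets_ntp);
    // a == cluster::partition_allocation_domains::consumer_offsets
    // (assuming offsets_ntp points at the consumer offsets topic)
    auto b = cluster::get_allocation_domain(user_ntp);
    // b == cluster::partition_allocation_domains::common
}

The call sites changed below pass this domain straight into the allocator's reallocate_partition, so consumer offsets replicas are balanced independently of ordinary data partitions.
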
3 changes: 2 additions & 1 deletion src/v/cluster/health_manager.cc
@@ -10,6 +10,7 @@
*/
#include "cluster/health_manager.h"

#include "cluster/cluster_utils.h"
#include "cluster/logger.h"
#include "cluster/members_table.h"
#include "cluster/partition_leaders_table.h"
@@ -82,7 +83,7 @@ ss::future<bool> health_manager::ensure_partition_replication(model::ntp ntp) {
ntp.tp.partition, _target_replication_factor);

auto allocation = _allocator.local().reallocate_partition(
constraints, *assignment);
constraints, *assignment, get_allocation_domain(ntp));
if (!allocation) {
vlog(
clusterlog.warn,
149 changes: 118 additions & 31 deletions src/v/cluster/members_backend.cc
@@ -53,7 +53,10 @@ members_backend::members_backend(
, _members_frontend(members_frontend)
, _raft0(raft0)
, _as(as)
, _retry_timeout(config::shard_local_cfg().members_backend_retry_ms()) {
, _retry_timeout(config::shard_local_cfg().members_backend_retry_ms())
, _max_concurrent_reallocations(
config::shard_local_cfg()
.partition_autobalancing_concurrent_moves.bind()) {
_retry_timer.set_callback([this] { start_reconciliation_loop(); });
}

@@ -166,19 +169,26 @@ bool is_in_replica_set(
});
}

void members_backend::calculate_reallocations(update_meta& meta) {
ss::future<> members_backend::calculate_reallocations(update_meta& meta) {
switch (meta.update.type) {
case members_manager::node_update_type::decommissioned:
calculate_reallocations_after_decommissioned(meta);
return;
co_await calculate_reallocations_after_decommissioned(meta);
co_return;
case members_manager::node_update_type::added:
calculate_reallocations_after_node_added(meta);
return;
co_await calculate_reallocations_after_node_added(
meta, partition_allocation_domains::consumer_offsets);
if (
meta.partition_reallocations.size()
< _max_concurrent_reallocations()) {
co_await calculate_reallocations_after_node_added(
meta, partition_allocation_domains::common);
}
co_return;
case members_manager::node_update_type::recommissioned:
calculate_reallocations_after_recommissioned(meta);
return;
default:
return;
co_await calculate_reallocations_after_recommissioned(meta);
co_return;
case members_manager::node_update_type::reallocation_finished:
co_return;
}
}
/**
@@ -199,8 +209,8 @@ struct replicas_to_move {
}
};

void members_backend::calculate_reallocations_after_decommissioned(
members_backend::update_meta& meta) const {
ss::future<> members_backend::calculate_reallocations_after_decommissioned(
members_backend::update_meta& meta) {
// reallocate all partitions for which any of replicas is placed on
// decommissioned node
for (const auto& [tp_ns, cfg] : _topics.local().topics_map()) {
@@ -209,6 +219,16 @@ void members_backend::calculate_reallocations_after_decommissioned(
}

for (const auto& pas : cfg.get_assignments()) {
// break when we already scheduled more than allowed reallocations
if (
meta.partition_reallocations.size()
>= _max_concurrent_reallocations()) {
vlog(
clusterlog.info,
"reached limit of max concurrent reallocations: {}",
meta.partition_reallocations.size());
break;
}
model::ntp ntp(tp_ns.ns, tp_ns.tp, pas.id);
if (is_in_replica_set(pas.replicas, meta.update.id)) {
auto previous_replica_set
@@ -231,15 +251,27 @@
}
}
}
co_return;
}

bool is_reassigned_to_node(
const members_backend::partition_reallocation& reallocation,
model::node_id node_id) {
if (!reallocation.allocation_units.has_value()) {
return false;
}
return is_in_replica_set(
reallocation.allocation_units->get_assignments().front().replicas,
node_id);
}
void members_backend::calculate_reallocations_after_node_added(
members_backend::update_meta& meta) const {

ss::future<> members_backend::calculate_reallocations_after_node_added(
members_backend::update_meta& meta, partition_allocation_domain domain) {
if (
config::shard_local_cfg().partition_autobalancing_mode()
== model::partition_autobalancing_mode::off) {
return;
co_return;
}
auto& topics = _topics.local().topics_map();
struct node_info {
size_t replicas_count;
size_t max_capacity;
@@ -254,19 +286,27 @@ void members_backend::calculate_reallocations_after_node_added(
id,
node_info{
.replicas_count = 0,
.max_capacity = n->max_capacity(),
.max_capacity = n->domain_partition_capacity(domain),
});
}
auto it = node_replicas.find(id);
it->second.replicas_count += n->allocated_partitions();
total_replicas += n->allocated_partitions();
const auto domain_allocated = n->domain_allocated_partitions(domain);
it->second.replicas_count += domain_allocated;
total_replicas += domain_allocated;
}

// 2. calculate number of replicas per node leading to even replica per
// node distribution
auto target_replicas_per_node
= total_replicas / _allocator.local().state().available_nodes();

vlog(
clusterlog.info,
"[update: {}] there are {} replicas in {} domain, requested to assign {} "
"replicas per node",
meta.update,
total_replicas,
domain,
target_replicas_per_node);
// 3. calculate how many replicas have to be moved from each node
std::vector<replicas_to_move> to_move_from_node;
for (auto& [id, info] : node_replicas) {
@@ -282,16 +322,33 @@ void members_backend::calculate_reallocations_after_node_added(
[](const replicas_to_move& m) { return m.left_to_move == 0; });
// nothing to do, exit early
if (all_empty) {
return;
co_return;
}

auto cmp = [](const replicas_to_move& lhs, const replicas_to_move& rhs) {
return lhs.left_to_move < rhs.left_to_move;
};

// 4. Pass over all partition metadata once, try to move until we reach even
// number of partitions per core on each node
if (clusterlog.is_enabled(ss::log_level::info)) {
for (const auto& [id, cnt] : to_move_from_node) {
vlog(
clusterlog.info,
"[update: {}] there are {} replicas to move from node {} in "
"domain {}, current allocations: {}",
meta.update,
cnt,
id,
domain,
node_replicas[id].replicas_count);
}
}
auto& topics = _topics.local().topics_map();
// 4. Pass over all partition metadata
for (auto& [tp_ns, metadata] : topics) {
// skip partitions outside of current domain
if (get_allocation_domain(tp_ns) != domain) {
continue;
}
// do not try to move internal partitions
if (
tp_ns.ns == model::kafka_internal_namespace
@@ -317,6 +374,10 @@ void members_backend::calculate_reallocations_after_node_added(
std::erase_if(to_move_from_node, [](const replicas_to_move& v) {
return v.left_to_move == 0;
});
// skip if this partition is already replicated on added node
if (is_in_replica_set(p.replicas, meta.update.id)) {
continue;
}

std::sort(to_move_from_node.begin(), to_move_from_node.end(), cmp);
for (auto& to_move : to_move_from_node) {
@@ -325,11 +386,28 @@
&& to_move.left_to_move > 0) {
partition_reallocation reallocation(
model::ntp(tp_ns.ns, tp_ns.tp, p.id), p.replicas.size());

reallocation.replicas_to_remove.emplace(to_move.id);
auto current_assignment = p;
reassign_replicas(current_assignment, reallocation);
// if this reassignment does not involve the node we are
// targeting, do not add it
if (!is_reassigned_to_node(reallocation, meta.update.id)) {
continue;
}
reallocation.state = reallocation_state::reassigned;
meta.partition_reallocations.push_back(
std::move(reallocation));
to_move.left_to_move--;
// reached max concurrent reallocations, yield
if (
meta.partition_reallocations.size()
>= _max_concurrent_reallocations()) {
vlog(
clusterlog.info,
"reached limit of max concurrent reallocations: {}",
meta.partition_reallocations.size());
co_return;
}
break;
}
}
@@ -361,8 +439,8 @@ std::vector<model::ntp> members_backend::ntps_moving_from_node_older_than(
return ret;
}

void members_backend::calculate_reallocations_after_recommissioned(
update_meta& meta) const {
ss::future<> members_backend::calculate_reallocations_after_recommissioned(
update_meta& meta) {
auto it = _decommission_command_revision.find(meta.update.id);
vassert(
it != _decommission_command_revision.end(),
@@ -390,6 +468,7 @@ void members_backend::calculate_reallocations_after_recommissioned(

meta.partition_reallocations.push_back(std::move(reallocation));
}
co_return;
}

ss::future<> members_backend::reconcile() {
@@ -463,14 +542,15 @@ ss::future<> members_backend::reconcile() {

vlog(
clusterlog.info,
"[update: {}] reconciliation loop - reallocations: {}, finished: {}",
"[update: {}] reconciliation loop - pending reallocation count: {}, "
"finished: {}",
meta.update,
meta.partition_reallocations,
meta.partition_reallocations.size(),
meta.finished);

// calculate necessary reallocations
if (meta.partition_reallocations.empty()) {
calculate_reallocations(meta);
co_await calculate_reallocations(meta);
// if there is nothing to reallocate, just finish this update
vlog(
clusterlog.info,
@@ -559,10 +639,15 @@ ss::future<> members_backend::reconcile() {
"[update: {}] decommissioning in progress. recalculating "
"reallocations",
meta.update);
calculate_reallocations(meta);
co_await calculate_reallocations(meta);
}
}
}
// remove finished reallocations
std::erase_if(
meta.partition_reallocations, [](const partition_reallocation& r) {
return r.state == reallocation_state::finished;
});
}

ss::future<>
@@ -621,7 +706,9 @@ void members_backend::reassign_replicas(
});

auto res = _allocator.local().reallocate_partition(
reallocation.constraints.value(), current_assignment);
reallocation.constraints.value(),
current_assignment,
get_allocation_domain(reallocation.ntp));
if (res.has_value()) {
reallocation.set_new_replicas(std::move(res.value()));
}
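
The node-added path above now rebalances one allocation domain at a time (consumer offsets first, then common), targets an even replica count per node within that domain, and stops as soon as the configured number of concurrent reallocations is reached. A self-contained sketch of the surplus computation in steps 2–3, using plain integers in place of model::node_id and the allocator's per-domain counters:

#include <cstddef>
#include <map>
#include <vector>

struct replicas_to_move {
    int node_id;
    size_t left_to_move;
};

// Sketch: given per-node replica counts for one allocation domain (including
// the newly added node, which starts at zero), compute how many replicas each
// node should shed so that the distribution becomes even.
std::vector<replicas_to_move>
surplus_per_node(const std::map<int, size_t>& replicas_per_node) {
    size_t total_replicas = 0;
    for (const auto& [node, count] : replicas_per_node) {
        total_replicas += count;
    }
    const size_t target = total_replicas / replicas_per_node.size();

    std::vector<replicas_to_move> to_move;
    for (const auto& [node, count] : replicas_per_node) {
        if (count > target) {
            to_move.push_back({node, count - target});
        }
    }
    return to_move;
}

In the real loop each planned move decrements left_to_move for its source node, partitions already replicated on the new node are skipped, and the coroutine returns early once meta.partition_reallocations reaches the concurrent-move limit.
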
10 changes: 6 additions & 4 deletions src/v/cluster/members_backend.h
@@ -88,7 +88,7 @@ class members_backend {
ss::future<> reallocate_replica_set(partition_reallocation&);

ss::future<> try_to_finish_update(update_meta&);
void calculate_reallocations(update_meta&);
ss::future<> calculate_reallocations(update_meta&);

ss::future<> handle_updates();
void handle_single_update(members_manager::node_update);
@@ -97,9 +97,10 @@ class members_backend {
void stop_node_addition(model::node_id id);
void handle_reallocation_finished(model::node_id);
void reassign_replicas(partition_assignment&, partition_reallocation&);
void calculate_reallocations_after_node_added(update_meta&) const;
void calculate_reallocations_after_decommissioned(update_meta&) const;
void calculate_reallocations_after_recommissioned(update_meta&) const;
ss::future<> calculate_reallocations_after_node_added(
update_meta&, partition_allocation_domain);
ss::future<> calculate_reallocations_after_decommissioned(update_meta&);
ss::future<> calculate_reallocations_after_recommissioned(update_meta&);
std::vector<model::ntp> ntps_moving_from_node_older_than(
model::node_id, model::revision_id) const;
void setup_metrics();
@@ -122,6 +123,7 @@ class members_backend {
ss::timer<> _retry_timer;
ss::condition_variable _new_updates;
ss::metrics::metric_groups _metrics;
config::binding<size_t> _max_concurrent_reallocations;
/**
* store revision of node decommissioning update, decommissioning command
* revision is stored when node is being decommissioned, it is used to
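
The new _max_concurrent_reallocations member declared above is a config::binding over the partition_autobalancing_concurrent_moves cluster property (see the constructor change in members_backend.cc), so the limit tracks runtime changes to the property rather than being fixed at construction. A small self-contained sketch of that pattern, with a std::function standing in for the binding; only the property name and the size comparison come from this diff:

#include <cstddef>
#include <functional>
#include <vector>

struct partition_reallocation_stub {};

// Sketch: plan moves until the currently configured limit is reached.
// `max_moves` models a config binding: each invocation returns the value
// the property holds right now, which may differ between passes.
void plan_up_to_limit(
  std::vector<partition_reallocation_stub>& pending,
  const std::function<size_t()>& max_moves) {
    while (pending.size() < max_moves()) {
        pending.emplace_back(); // stand-in for planning one partition move
    }
}
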
3 changes: 2 additions & 1 deletion src/v/cluster/partition_balancer_planner.cc
@@ -10,6 +10,7 @@

#include "cluster/partition_balancer_planner.h"

#include "cluster/cluster_utils.h"
#include "cluster/members_table.h"
#include "cluster/partition_balancer_types.h"
#include "cluster/scheduling/constraints.h"
@@ -253,7 +254,7 @@ result<allocation_units> partition_balancer_planner::get_reallocation(
assignments.group, assignments.id, stable_replicas);

auto reallocation = _partition_allocator.reallocate_partition(
std::move(constraints), stable_assigments);
std::move(constraints), stable_assigments, get_allocation_domain(ntp));

if (!reallocation) {
vlog(