Merge pull request #2929 from mmaslankaprv/v21.10.x
Backport of #2856, #1576, #2917, #2901, #2937
dotnwat committed Nov 11, 2021
2 parents 8df2069 + 0a34ae0 commit e7b6714
Showing 23 changed files with 360 additions and 140 deletions.
2 changes: 1 addition & 1 deletion docs/rfcs/20200421_raft_recovery.md
@@ -210,7 +210,7 @@ In order to make it possible new fields have to be added to
// next index to send to this follower
model::offset next_index;
// timestamp of last append_entries_rpc call
clock_type::time_point last_append_timestamp;
clock_type::time_point last_sent_append_entries_req_timesptamp;
uint64_t failed_appends{0};
bool is_learner = false;
bool is_recovering = false;
8 changes: 7 additions & 1 deletion src/v/cluster/metadata_cache.cc
@@ -111,10 +111,16 @@ ss::future<model::node_id> metadata_cache::get_leader(
}

std::optional<model::node_id>
metadata_cache::get_leader_id(const model::ntp& ntp) {
metadata_cache::get_leader_id(const model::ntp& ntp) const {
return _leaders.local().get_leader(ntp);
}

std::optional<model::node_id>
metadata_cache::get_previous_leader_id(const model::ntp& ntp) const {
return _leaders.local().get_previous_leader(
model::topic_namespace_view(ntp), ntp.tp.partition);
}

/// If present returns a leader of raft0 group
std::optional<model::node_id> metadata_cache::get_controller_leader_id() {
return _leaders.local().get_leader(model::controller_ntp);
5 changes: 4 additions & 1 deletion src/v/cluster/metadata_cache.h
@@ -100,7 +100,10 @@ class metadata_cache {
return contains(model::topic_namespace_view(ntp), ntp.tp.partition);
}

std::optional<model::node_id> get_leader_id(const model::ntp&);
std::optional<model::node_id> get_leader_id(const model::ntp&) const;

std::optional<model::node_id>
get_previous_leader_id(const model::ntp&) const;
/// Returns metadata of all topics in cache internal format
// const cache_t& all_metadata() const { return _cache; }

51 changes: 38 additions & 13 deletions src/v/cluster/partition_leaders_table.cc
@@ -10,6 +10,7 @@
#include "cluster/partition_leaders_table.h"

#include "cluster/cluster_utils.h"
#include "cluster/logger.h"
#include "cluster/topic_table.h"
#include "model/fundamental.h"
#include "model/metadata.h"
@@ -37,19 +38,20 @@ ss::future<> partition_leaders_table::stop() {
return ss::now();
}

std::optional<model::node_id> partition_leaders_table::get_leader(
std::optional<partition_leaders_table::leader_meta>
partition_leaders_table::find_leader_meta(
model::topic_namespace_view tp_ns, model::partition_id pid) const {
const auto& topics_map = _topic_table.local().topics_map();
if (auto it = _leaders.find(leader_key_view{tp_ns, pid});
it != _leaders.end()) {
return it->second.id;
return it->second;
} else if (auto it = topics_map.find(tp_ns); it != topics_map.end()) {
// Possible leadership query for materialized topic, search for it
// in the topics table.
if (!it->second.is_topic_replicable()) {
// Leadership properties of non replicated topic are that of its
// parent
return get_leader(
return find_leader_meta(
model::topic_namespace_view{
tp_ns.ns, it->second.get_source_topic()},
pid);
@@ -58,6 +60,18 @@ std::optional<model::node_id> partition_leaders_table::get_leader(
return std::nullopt;
}

std::optional<model::node_id> partition_leaders_table::get_previous_leader(
model::topic_namespace_view tp_ns, model::partition_id pid) const {
const auto meta = find_leader_meta(tp_ns, pid);
return meta ? meta->previous_leader : std::nullopt;
}

std::optional<model::node_id> partition_leaders_table::get_leader(
model::topic_namespace_view tp_ns, model::partition_id pid) const {
const auto meta = find_leader_meta(tp_ns, pid);
return meta ? meta->current_leader : std::nullopt;
}

std::optional<model::node_id>
partition_leaders_table::get_leader(const model::ntp& ntp) const {
return get_leader(model::topic_namespace_view(ntp), ntp.tp.partition);
@@ -74,18 +88,29 @@ void partition_leaders_table::update_partition_leader(
auto [new_it, _] = _leaders.emplace(
leader_key{
model::topic_namespace(ntp.ns, ntp.tp.topic), ntp.tp.partition},
leader_meta{leader_id, term});
leader_meta{.current_leader = leader_id, .update_term = term});
it = new_it;
} else {
// existing partition
if (it->second.update_term > term) {
// Do nothing if update term is older
return;
}
// if current leader has value, store it as a previous leader
if (it->second.current_leader) {
it->second.previous_leader = it->second.current_leader;
}
it->second.current_leader = leader_id;
it->second.update_term = term;
}

if (it->second.update_term > term) {
// Do nothing if update term is older
return;
}
// existing partition
it->second.id = leader_id;
it->second.update_term = term;

vlog(
clusterlog.trace,
"updated partition: {} leader: {{term: {}, current leader: {}, previous "
"leader: {}}}",
ntp,
it->second.update_term,
it->second.current_leader,
it->second.previous_leader);
// notify waiters if update is setting the leader
if (!leader_id) {
return;
22 changes: 20 additions & 2 deletions src/v/cluster/partition_leaders_table.h
@@ -22,6 +22,8 @@
#include <absl/container/flat_hash_map.h>
#include <absl/container/node_hash_map.h>

#include <optional>

namespace cluster {

/// Partition leaders contains information about currently elected partition
@@ -40,6 +42,14 @@ class partition_leaders_table {
std::optional<model::node_id>
get_leader(model::topic_namespace_view, model::partition_id) const;

/**
* Returns the previous leader of a partition if available. This is required
* by the Kafka metadata API, since it requires us to return the former
* leader id even if the leader is not present at a given point in time.
*/
std::optional<model::node_id> get_previous_leader(
model::topic_namespace_view, model::partition_id) const;

ss::future<model::node_id> wait_for_leader(
const model::ntp&,
ss::lowres_clock::time_point,
@@ -58,7 +68,7 @@
// clang-format on
void for_each_leader(Func&& f) const {
for (auto& [k, v] : _leaders) {
f(k.tp_ns, k.pid, v.id, v.update_term);
f(k.tp_ns, k.pid, v.current_leader, v.update_term);
}
}

@@ -128,10 +138,18 @@ class partition_leaders_table {

// in order to filter out reordered requests we store last update term
struct leader_meta {
std::optional<model::node_id> id;
// current leader id, this may be empty if a group is in the middle of
// a leader election
std::optional<model::node_id> current_leader;
// previous leader id, this is empty if and only if no leader was elected
// for the topic before
std::optional<model::node_id> previous_leader;
model::term_id update_term;
};

std::optional<leader_meta>
find_leader_meta(model::topic_namespace_view, model::partition_id) const;

absl::flat_hash_map<leader_key, leader_meta, leader_key_hash, leader_key_eq>
_leaders;

69 changes: 62 additions & 7 deletions src/v/kafka/server/handlers/metadata.cc
@@ -13,6 +13,7 @@
#include "cluster/topics_frontend.h"
#include "cluster/types.h"
#include "config/configuration.h"
#include "config/node_config.h"
#include "kafka/server/errors.h"
#include "kafka/server/handlers/details/security.h"
#include "kafka/server/handlers/topics/topic_utils.h"
@@ -29,17 +30,69 @@

namespace kafka {

metadata_response::topic
make_topic_response_from_topic_metadata(model::topic_metadata&& tp_md) {
static constexpr model::node_id no_leader(-1);
/**
* We use a simple heuristic to tolerate isolation of a node hosting both a
* partition leader and a follower.
*
* Kafka clients request a metadata refresh when they receive an error
* related to stale metadata, e.g. NOT_LEADER. A metadata request can be
* processed by any broker and there is no general rule for which broker to
* choose to refresh metadata from (e.g. the Java Kafka client uses the least
* loaded broker with an active connection). This may lead to a situation in
* which the client always asks the same broker for metadata. When that
* broker is isolated from the rest of the cluster it will never update its
* metadata view, so the client will keep receiving stale metadata.
*
* This behavior may lead to a live lock in the event of a network partition.
* If the current partition leader is isolated from the cluster, it will keep
* answering with its own id in the leader_id field for that partition
* (according to the policy where we return the former leader - from that
* broker's point of view there is no leader, it is a candidate). The client
* will retry the produce or fetch request, receive a NOT_LEADER error, and
* request a metadata update; the broker will respond with the same metadata
* and the whole cycle will loop indefinitely.
*
* In order to break the loop and force the client to make progress we use
* the following heuristics:
*
* 1) when the current leader is unknown, return the former leader (Kafka
* behavior)
*
* 2) when the current leader is unknown and the previous leader is equal to
* the current node id, select a random replica_id as the leader (indicating
* leader isolation)
*
* With these heuristics we always force the client to communicate with nodes
* that may not be partitioned.
*/
model::node_id get_leader(
const model::ntp& ntp,
const cluster::metadata_cache& md_cache,
const std::vector<model::node_id>& replicas) {
const auto current = md_cache.get_leader_id(ntp);
if (current) {
return *current;
}

const auto previous = md_cache.get_previous_leader_id(ntp);
if (previous == config::node().node_id()) {
auto idx = fast_prng_source() % replicas.size();
return replicas[idx];
}

return previous.value_or(no_leader);
}
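
A minimal standalone sketch of the fallback selection above, for illustration only; select_leader, node_id, self_id and rng are hypothetical stand-ins for the metadata_cache and config::node() plumbing, not code from this commit:

// Sketch of the leader fallback heuristic; names here are illustrative.
#include <cstdint>
#include <iostream>
#include <optional>
#include <random>
#include <vector>

using node_id = int32_t;
constexpr node_id no_leader{-1};

node_id select_leader(
  std::optional<node_id> current,
  std::optional<node_id> previous,
  const std::vector<node_id>& replicas,
  node_id self_id,
  std::mt19937& rng) {
    // 1) a known current leader always wins
    if (current) {
        return *current;
    }
    // 2) the previous leader is this node: it may be the isolated ex-leader,
    //    so point the client at a random replica instead
    if (previous && *previous == self_id && !replicas.empty()) {
        std::uniform_int_distribution<size_t> dist(0, replicas.size() - 1);
        return replicas[dist(rng)];
    }
    // 3) otherwise fall back to the former leader (Kafka behavior), or -1
    return previous.value_or(no_leader);
}

int main() {
    std::mt19937 rng{42};
    std::vector<node_id> replicas{0, 1, 2};
    // election in progress, previous leader was node 1, queried node is 0
    std::cout << select_leader(std::nullopt, node_id{1}, replicas, node_id{0}, rng) << "\n";
    // election in progress, previous leader is the queried node -> random replica
    std::cout << select_leader(std::nullopt, node_id{0}, replicas, node_id{0}, rng) << "\n";
}
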

metadata_response::topic make_topic_response_from_topic_metadata(
const cluster::metadata_cache& md_cache, model::topic_metadata&& tp_md) {
metadata_response::topic tp;
tp.error_code = error_code::none;
auto tp_ns = tp_md.tp_ns;
tp.name = std::move(tp_md.tp_ns.tp);
tp.is_internal = false; // no internal topics yet
std::transform(
tp_md.partitions.begin(),
tp_md.partitions.end(),
std::back_inserter(tp.partitions),
[](model::partition_metadata& p_md) {
[tp_ns = std::move(tp_ns), &md_cache](model::partition_metadata& p_md) {
std::vector<model::node_id> replicas{};
replicas.reserve(p_md.replicas.size());
std::transform(
@@ -50,7 +103,8 @@ make_topic_response_from_topic_metadata(model::topic_metadata&& tp_md) {
metadata_response::partition p;
p.error_code = error_code::none;
p.partition_index = p_md.id;
p.leader_id = p_md.leader_node.value_or(model::node_id(-1));
p.leader_id = get_leader(
model::ntp(tp_ns.ns, tp_ns.tp, p_md.id), md_cache, replicas);
p.replica_nodes = std::move(replicas);
p.isr_nodes = p.replica_nodes;
p.offline_replicas = {};
@@ -95,9 +149,9 @@ create_topic(request_context& ctx, model::topic&& topic) {
res,
ctx.controller_api(),
tout + model::timeout_clock::now())
.then([tp_md = std::move(tp_md)]() mutable {
.then([&ctx, tp_md = std::move(tp_md)]() mutable {
return make_topic_response_from_topic_metadata(
std::move(tp_md.value()));
ctx.metadata_cache(), std::move(tp_md.value()));
});
})
.handle_exception([topic = std::move(topic)](
@@ -125,7 +179,8 @@ static metadata_response::topic make_topic_response(
details::authorized_operations(ctx, md.tp_ns.tp));
}

auto res = make_topic_response_from_topic_metadata(std::move(md));
auto res = make_topic_response_from_topic_metadata(
ctx.metadata_cache(), std::move(md));
res.topic_authorized_operations = auth_operations;
return res;
}
5 changes: 2 additions & 3 deletions src/v/raft/append_entries_buffer.cc
@@ -137,14 +137,13 @@ void append_entries_buffer::propagate_results(
replies.size());
auto resp_it = response_promises.begin();
for (auto& reply : replies) {
auto lstats = _consensus._log.offsets();
ss::visit(
reply,
[&resp_it, &lstats](append_entries_reply r) {
[&resp_it, this](append_entries_reply r) {
// this is important, we want to update response committed
// offset here as we flushed after the response structure was
// created
r.last_committed_log_index = lstats.committed_offset;
r.last_flushed_log_index = _consensus._flushed_offset;
resp_it->set_value(r);
},
[&resp_it](std::exception_ptr& e) {