Skip to content

Commit

Permalink
Merge 'Set dc and rack in gossiper when loaded from system.peers and …
Browse files Browse the repository at this point in the history
…load the ignored nodes state for replace' from Benny Halevy

The problem this series solves is correctly ignoring DOWN nodes state
when replacing a node.

When a node is replaced and there are other nodes that are down, the
replacing node is told to ignore those DOWN nodes using the
`ignore_dead_nodes_for_replace` option.

Since the replacing node is bootstrapping, it starts with an empty
system.peers table, so it has no notion of any node's state and it
learns about all other nodes via the gossip shadow round done in
`storage_service::prepare_replacement_info`.

Normally, since the DOWN nodes to ignore already joined the ring, the
remaining nodes will have their endpoint state already in gossip, but if
the whole cluster was restarted while those DOWN nodes did not start,
the remaining nodes will only have a partial endpoint state for them,
which is loaded from system.peers.

Currently, the partial endpoint state contains only `HOST_ID` and
`TOKENS`, and in particular it lacks `STATUS`, `DC`, and `RACK`.

The first part of this series also loads `DC` and `RACK` from
system.peers to make them available to the replacing node, as they are
crucial for building a correct replication map with the network topology
replication strategy.

But still, without a `STATUS` those nodes are not considered as normal
token owners yet, and they do not go through handle_state_normal which
adds them to the topology and token_metadata.

The second part of this series uses the endpoint state retrieved in the
gossip shadow round to explicitly add the ignored nodes' state to
topology (including dc and rack) and token_metadata (tokens) in
`prepare_replacement_info`.  If there are more DOWN nodes that are not
explicitly ignored, replace will fail (as it should).

Fixes #15787

Closes #15788

* github.com:scylladb/scylladb:
  storage_service: join_token_ring: load ignored nodes state if replacing
  storage_service: replacement_info: return ignore_nodes state
  locator: host_id_or_endpoint: keep value as variant
  gms: endpoint_state: add getters for host_id, dc_rack, and tokens
  storage_service: topology_state_load: set local STATUS state using add_saved_endpoint
  gossiper: add_saved_endpoint: set dc and rack
  gossiper: add_saved_endpoint: fixup indentation
  gossiper: add_saved_endpoint: make host_id mandatory
  gossiper: add load_endpoint_state
  gossiper: start_gossiping: log local state
  • Loading branch information
kbr-scylla committed Apr 16, 2024
2 parents 2c3d6fe + 655d624 commit eb9ba91
Show file tree
Hide file tree
Showing 11 changed files with 303 additions and 137 deletions.
2 changes: 1 addition & 1 deletion api/storage_service.cc
Expand Up @@ -103,7 +103,7 @@ static bool any_of_keyspaces_use_tablets(const http_context& ctx) {

// Parse `param` strictly as a host_id (param_type::host_id) and return it.
// Error handling is whatever the host_id_or_endpoint constructor does for
// input that is not a valid host_id.
locator::host_id validate_host_id(const sstring& param) {
    auto hoep = locator::host_id_or_endpoint(param, locator::host_id_or_endpoint::param_type::host_id);
    // The host_id is exposed through the id() accessor; a second, stale
    // `return hoep.id;` used to precede this line and made it unreachable.
    return hoep.id();
}

bool validate_bool(const sstring& param) {
Expand Down
63 changes: 44 additions & 19 deletions db/system_keyspace.cc
Expand Up @@ -1511,25 +1511,6 @@ future<> system_keyspace::peers_table_read_fixup() {
}
}

// Read the datacenter/rack of every peer recorded in system.peers.
// Rows lacking either the `data_center` or the `rack` column are left out of
// the returned map, which is keyed by the peer's address.
future<std::unordered_map<gms::inet_address, locator::endpoint_dc_rack>> system_keyspace::load_dc_rack_info() {
    co_await peers_table_read_fixup();

    const auto rows = co_await execute_cql(format("SELECT peer, data_center, rack from system.{}", PEERS));

    std::unordered_map<gms::inet_address, locator::endpoint_dc_rack> dc_rack_by_peer;
    for (const auto& row : *rows) {
        // Only rows carrying both columns contribute an entry.
        if (row.has("data_center") && row.has("rack")) {
            dc_rack_by_peer.emplace(row.get_as<net::inet_address>("peer"), locator::endpoint_dc_rack {
                row.get_as<sstring>("data_center"),
                row.get_as<sstring>("rack")
            });
        }
    }

    co_return dc_rack_by_peer;
}

future<> system_keyspace::build_bootstrap_info() {
sstring req = format("SELECT bootstrapped FROM system.{} WHERE key = ? ", LOCAL);
return execute_cql(req, sstring(LOCAL)).then([this] (auto msg) {
Expand Down Expand Up @@ -1810,6 +1791,50 @@ future<std::unordered_map<gms::inet_address, locator::host_id>> system_keyspace:
co_return ret;
}

// Load the per-peer state persisted in system.peers, keyed by host_id.
//
// For every row this reads `peer` (the node's address) and `host_id`, plus the
// optional `tokens`, `data_center` and `rack` columns.
//
// A row is skipped, after an error is logged, when:
//   - it has no `host_id`,
//   - `tokens` is present but decodes to an empty set,
//   - `data_center` and `rack` are both present but either value is empty.
// A missing `tokens` column, or a missing `data_center`/`rack` column, only
// produces a warning; the row is still returned with empty tokens and/or a
// disengaged `opt_dc_rack`. `opt_status` is never set by this function.
future<std::unordered_map<locator::host_id, gms::loaded_endpoint_state>> system_keyspace::load_endpoint_state() {
    co_await peers_table_read_fixup();

    const auto msg = co_await execute_cql(format("SELECT peer, host_id, tokens, data_center, rack from system.{}", PEERS));

    std::unordered_map<locator::host_id, gms::loaded_endpoint_state> ret;
    for (const auto& row : *msg) {
        gms::loaded_endpoint_state st;
        auto ep = row.get_as<net::inet_address>("peer");
        if (!row.has("host_id")) {
            // Must never happen after `peers_table_read_fixup` call above
            on_internal_error_noexcept(slogger, format("load_endpoint_state: node {} has no host_id in system.{}", ep, PEERS));
            // NOTE(review): on_internal_error_noexcept may return to the caller
            // (presumably when abort-on-internal-error is off). Skip the row, as
            // the other error branches below do, rather than reading a column
            // that was just found to be absent.
            continue;
        }
        auto host_id = locator::host_id(row.get_as<utils::UUID>("host_id"));
        if (row.has("tokens")) {
            st.tokens = decode_tokens(deserialize_set_column(*peers(), row, "tokens"));
            if (st.tokens.empty()) {
                slogger.error("load_endpoint_state: node {}/{} has tokens column present but tokens are empty", host_id, ep);
                continue;
            }
        } else {
            slogger.warn("Endpoint {} has no tokens in system.{}", ep, PEERS);
        }
        const bool has_dc = row.has("data_center");
        const bool has_rack = row.has("rack");
        if (has_dc && has_rack) {
            st.opt_dc_rack.emplace(locator::endpoint_dc_rack {
                row.get_as<sstring>("data_center"),
                row.get_as<sstring>("rack")
            });
            if (st.opt_dc_rack->dc.empty() || st.opt_dc_rack->rack.empty()) {
                slogger.error("load_endpoint_state: node {}/{} has empty dc={} or rack={}", host_id, ep, st.opt_dc_rack->dc, st.opt_dc_rack->rack);
                continue;
            }
        } else {
            // At least one of the two columns is missing; name exactly which.
            slogger.warn("Endpoint {} has no {} in system.{}", ep,
                    !has_dc && !has_rack ? "data_center nor rack" : !has_dc ? "data_center" : "rack",
                    PEERS);
        }
        st.endpoint = ep;
        ret.emplace(host_id, std::move(st));
    }

    co_return ret;
}

future<std::vector<gms::inet_address>> system_keyspace::load_peers() {
co_await peers_table_read_fixup();

Expand Down
5 changes: 3 additions & 2 deletions db/system_keyspace.hh
Expand Up @@ -15,6 +15,7 @@
#include <utility>
#include <vector>
#include "db/system_auth_keyspace.hh"
#include "gms/gossiper.hh"
#include "schema/schema_fwd.hh"
#include "utils/UUID.hh"
#include "query-result-set.hh"
Expand Down Expand Up @@ -352,9 +353,9 @@ public:


/**
* Return a map of IP addresses containing a map of dc and rack info
* Return a map of nodes and their loaded_endpoint_state
*/
future<std::unordered_map<gms::inet_address, locator::endpoint_dc_rack>> load_dc_rack_info();
future<std::unordered_map<locator::host_id, gms::loaded_endpoint_state>> load_endpoint_state();

enum class bootstrap_state {
NEEDS_BOOTSTRAP,
Expand Down
37 changes: 35 additions & 2 deletions gms/endpoint_state.cc
Expand Up @@ -12,9 +12,12 @@
#include "gms/i_endpoint_state_change_subscriber.hh"
#include <ostream>
#include <boost/lexical_cast.hpp>
#include "log.hh"

namespace gms {

logging::logger logger("endpoint_state");

static_assert(std::is_default_constructible_v<heart_beat_state>);
static_assert(std::is_nothrow_copy_constructible_v<heart_beat_state>);
static_assert(std::is_nothrow_move_constructible_v<heart_beat_state>);
Expand Down Expand Up @@ -51,10 +54,40 @@ bool endpoint_state::is_cql_ready() const noexcept {
}

// Return the host_id carried by the HOST_ID application state, or a
// default-constructed host_id when that state is absent.
// A HOST_ID state that parses to a null host_id is reported through
// on_internal_error_noexcept; if that call returns, the null id is returned.
locator::host_id endpoint_state::get_host_id() const noexcept {
    locator::host_id host_id;
    if (auto app_state = get_application_state_ptr(application_state::HOST_ID)) {
        // No early return here: a stale `return locator::host_id(...)` used to
        // precede this assignment and made the null check below unreachable.
        host_id = locator::host_id(utils::UUID(app_state->value()));
        if (!host_id) {
            on_internal_error_noexcept(logger, format("Node has null host_id"));
        }
    }
    return host_id;
}

std::optional<locator::endpoint_dc_rack> endpoint_state::get_dc_rack() const {
if (auto app_state = get_application_state_ptr(application_state::DC)) {
std::optional<locator::endpoint_dc_rack> ret;
ret->dc = app_state->value();
if ((app_state = get_application_state_ptr(application_state::RACK))) {
ret->rack = app_state->value();
if (ret->dc.empty() || ret->rack.empty()) {
on_internal_error_noexcept(logger, format("Node {} has empty dc={} or rack={}", get_host_id(), ret->dc, ret->rack));
}
return ret;
}
}
return std::nullopt;
}

// Return the tokens parsed from the TOKENS application state, or an empty set
// when that state is absent.
// A TOKENS state that parses to an empty set is reported through
// on_internal_error_noexcept; if that call returns, the empty set is returned.
std::unordered_set<dht::token> endpoint_state::get_tokens() const {
    std::unordered_set<dht::token> ret;
    if (auto app_state = get_application_state_ptr(application_state::TOKENS)) {
        ret = versioned_value::tokens_from_string(app_state->value());
        if (ret.empty()) {
            on_internal_error_noexcept(logger, format("Node {} has empty tokens state", get_host_id()));
        }
    }
    // A stale `return locator::host_id::create_null_id();` used to sit here; it
    // does not match this function's return type and hid the real return.
    return ret;
}

future<> i_endpoint_state_change_subscriber::on_application_state_change(inet_address endpoint,
Expand Down
7 changes: 7 additions & 0 deletions gms/endpoint_state.hh
Expand Up @@ -14,6 +14,7 @@
#include "gms/application_state.hh"
#include "gms/versioned_value.hh"
#include "locator/host_id.hh"
#include "locator/types.hh"

namespace gms {

Expand Down Expand Up @@ -152,6 +153,12 @@ public:
// or a null host_id if the application state is not found.
locator::host_id get_host_id() const noexcept;

std::optional<locator::endpoint_dc_rack> get_dc_rack() const;

// Return the value of the TOKENS application state
// or an empty set if the application state is not found.
std::unordered_set<dht::token> get_tokens() const;

friend fmt::formatter<endpoint_state>;
};

Expand Down
52 changes: 38 additions & 14 deletions gms/gossiper.cc
Expand Up @@ -2020,11 +2020,9 @@ future<> gossiper::start_gossiping(gms::generation_type generation_nbr, applicat
local_state.add_application_state(entry.first, entry.second);
}

auto generation = local_state.get_heart_beat_state().get_generation();
co_await replicate(get_broadcast_address(), local_state, permit.id());

co_await replicate(get_broadcast_address(), std::move(local_state), permit.id());

logger.trace("gossip started with generation {}", generation);
logger.info("Gossip started with local state: {}", local_state);
_enabled = true;
_nr_run = 0;
_scheduled_gossip_task.arm(INTERVAL);
Expand Down Expand Up @@ -2130,11 +2128,21 @@ void gossiper::build_seeds_list() {
}
}

future<> gossiper::add_saved_endpoint(inet_address ep, permit_id pid) {
if (ep == get_broadcast_address()) {
future<> gossiper::add_saved_endpoint(locator::host_id host_id, gms::loaded_endpoint_state st, permit_id pid) {
if (host_id == my_host_id()) {
logger.debug("Attempt to add self as saved endpoint");
co_return;
}
const auto& ep = st.endpoint;
if (!host_id) {
on_internal_error(logger, format("Attempt to add {} with null host_id as saved endpoint", ep));
}
if (ep == inet_address{}) {
on_internal_error(logger, format("Attempt to add {} with null inet_address as saved endpoint", host_id));
}
if (ep == get_broadcast_address()) {
on_internal_error(logger, format("Attempt to add {} with broadcast_address {} as saved endpoint", host_id, ep));
}

auto permit = co_await lock_endpoint(ep, pid);

Expand All @@ -2155,14 +2163,18 @@ future<> gossiper::add_saved_endpoint(inet_address ep, permit_id pid) {
// It will get updated as a whole by handle_major_state_change
// via do_apply_state_locally when (remote_generation > local_generation)
const auto tmptr = get_token_metadata_ptr();
auto host_id = tmptr->get_host_id_if_known(ep);
if (host_id) {
ep_state.add_application_state(gms::application_state::HOST_ID, versioned_value::host_id(host_id.value()));
auto tokens = tmptr->get_tokens(*host_id);
if (!tokens.empty()) {
std::unordered_set<dht::token> tokens_set(tokens.begin(), tokens.end());
ep_state.add_application_state(gms::application_state::TOKENS, versioned_value::tokens(tokens_set));
}
ep_state.add_application_state(gms::application_state::HOST_ID, versioned_value::host_id(host_id));
auto tokens = tmptr->get_tokens(host_id);
if (!tokens.empty()) {
std::unordered_set<dht::token> tokens_set(tokens.begin(), tokens.end());
ep_state.add_application_state(gms::application_state::TOKENS, versioned_value::tokens(tokens_set));
}
if (st.opt_dc_rack) {
ep_state.add_application_state(gms::application_state::DC, gms::versioned_value::datacenter(st.opt_dc_rack->dc));
ep_state.add_application_state(gms::application_state::RACK, gms::versioned_value::datacenter(st.opt_dc_rack->rack));
}
if (st.opt_status) {
ep_state.add_application_state(gms::application_state::STATUS, std::move(*st.opt_status));
}
auto generation = ep_state.get_heart_beat_state().get_generation();
co_await replicate(ep, std::move(ep_state), permit.id());
Expand Down Expand Up @@ -2683,4 +2695,16 @@ locator::token_metadata_ptr gossiper::get_token_metadata_ptr() const noexcept {
return _shared_token_metadata.get();
}

// Stream insertion for loaded_endpoint_state: prints `st` to `os` with the
// "{}" format (i.e. via its fmt::formatter specialisation) and returns `os`.
std::ostream& operator<<(std::ostream& os, const loaded_endpoint_state& st) {
fmt::print(os, "{}", st);
return os;
}

} // namespace gms

// Render a loaded_endpoint_state as "{ endpoint=… dc=… rack=… tokens=… }".
// dc and rack are printed as empty strings when opt_dc_rack is disengaged.
// opt_status is not part of the output.
auto fmt::formatter<gms::loaded_endpoint_state>::format(const gms::loaded_endpoint_state& st, fmt::format_context& ctx) const -> decltype(ctx.out()) {
    const auto& dc_rack = st.opt_dc_rack;
    return fmt::format_to(ctx.out(), "{{ endpoint={} dc={} rack={} tokens={} }}",
            st.endpoint,
            dc_rack ? dc_rack->dc : "",
            dc_rack ? dc_rack->rack : "",
            st.tokens);
}
21 changes: 20 additions & 1 deletion gms/gossiper.hh
Expand Up @@ -36,6 +36,7 @@
#include <seastar/core/abort_source.hh>
#include <seastar/core/scheduling.hh>
#include "locator/token_metadata.hh"
#include "locator/types.hh"

namespace db {
class config;
Expand Down Expand Up @@ -76,6 +77,15 @@ struct gossip_config {
uint32_t skip_wait_for_gossip_to_settle = -1;
};

// Endpoint state of a peer as restored from persistent storage
// (system_keyspace::load_endpoint_state fills it from system.peers) and handed
// to gossiper::add_saved_endpoint.
struct loaded_endpoint_state {
// Address of the peer (the `peer` column of system.peers).
gms::inet_address endpoint;
// Tokens owned by the peer; left empty when the row has no `tokens` column.
std::unordered_set<dht::token> tokens;
// Datacenter and rack; engaged only when both columns were present in the row.
std::optional<locator::endpoint_dc_rack> opt_dc_rack;
// Optional STATUS value; when engaged, add_saved_endpoint installs it as the
// STATUS application state. Not set by load_endpoint_state.
std::optional<gms::versioned_value> opt_status;
};

std::ostream& operator<<(std::ostream& os, const loaded_endpoint_state& st);

/**
* This module is responsible for Gossiping information for the local endpoint. This abstraction
* maintains the list of live and dead endpoints. Periodically i.e. every 1 second this module
Expand Down Expand Up @@ -139,6 +149,9 @@ public:
return _gcfg.partitioner;
}

// Host id of the local node, as recorded in the current token metadata's topology.
locator::host_id my_host_id() const noexcept {
    // Keep the metadata pointer alive while the topology reference is in use.
    const auto tmptr = get_token_metadata_ptr();
    return tmptr->get_topology().my_host_id();
}
// Address of the local node, as recorded in the current token metadata's topology.
inet_address get_broadcast_address() const noexcept {
    // Keep the metadata pointer alive while the topology reference is in use.
    const auto tmptr = get_token_metadata_ptr();
    return tmptr->get_topology().my_address();
}
Expand Down Expand Up @@ -608,7 +621,7 @@ public:
/**
* Add an endpoint we knew about previously, but whose state is unknown
*/
future<> add_saved_endpoint(inet_address ep, permit_id);
future<> add_saved_endpoint(locator::host_id host_id, loaded_endpoint_state st, permit_id);

future<> add_local_application_state(application_state state, versioned_value value);

Expand Down Expand Up @@ -704,3 +717,9 @@ struct gossip_get_endpoint_states_response {
};

} // namespace gms

// fmt support for gms::loaded_endpoint_state; format() is defined out of line.
template <>
struct fmt::formatter<gms::loaded_endpoint_state> {
    // The format spec is not inspected: parsing returns the start of the
    // parse context unchanged.
    constexpr auto parse(format_parse_context& ctx) {
        return ctx.begin();
    }

    auto format(const gms::loaded_endpoint_state&, fmt::format_context& ctx) const -> decltype(ctx.out());
};
46 changes: 23 additions & 23 deletions locator/token_metadata.cc
Expand Up @@ -975,12 +975,6 @@ token_metadata::get_endpoint_for_host_id(host_id host_id) const {
return _impl->get_endpoint_for_host_id(host_id);
}

// Parse `host_id_string` into a host_id_or_endpoint and resolve it against
// this token_metadata before returning it.
host_id_or_endpoint token_metadata::parse_host_id_and_endpoint(const sstring& host_id_string) const {
    host_id_or_endpoint parsed(host_id_string);
    parsed.resolve(*this);
    return parsed;
}

std::unordered_map<inet_address, host_id>
token_metadata::get_endpoint_to_host_id_map_for_reading() const {
return _impl->get_endpoint_to_host_id_map_for_reading();
Expand Down Expand Up @@ -1250,45 +1244,51 @@ host_id_or_endpoint::host_id_or_endpoint(const sstring& s, param_type restrict)
switch (restrict) {
case param_type::host_id:
try {
id = host_id(utils::UUID(s));
_value = host_id(utils::UUID(s));
} catch (const marshal_exception& e) {
throw std::invalid_argument(format("Invalid host_id {}: {}", s, e.what()));
}
break;
case param_type::endpoint:
try {
endpoint = gms::inet_address(s);
_value = gms::inet_address(s);
} catch (std::invalid_argument& e) {
throw std::invalid_argument(format("Invalid inet_address {}: {}", s, e.what()));
}
break;
case param_type::auto_detect:
try {
id = host_id(utils::UUID(s));
_value = host_id(utils::UUID(s));
} catch (const marshal_exception& e) {
try {
endpoint = gms::inet_address(s);
_value = gms::inet_address(s);
} catch (std::invalid_argument& e) {
throw std::invalid_argument(format("Invalid host_id or inet_address {}", s));
}
}
}
}

void host_id_or_endpoint::resolve(const token_metadata& tm) {
if (id) {
auto endpoint_opt = tm.get_endpoint_for_host_id_if_known(id);
if (!endpoint_opt) {
throw std::runtime_error(format("Host ID {} not found in the cluster", id));
}
endpoint = *endpoint_opt;
} else {
auto opt_id = tm.get_host_id_if_known(endpoint);
if (!opt_id) {
throw std::runtime_error(format("Host inet address {} not found in the cluster", endpoint));
}
id = *opt_id;
// Return the host_id this object denotes.
// If it already holds a host_id, that id is returned as-is; otherwise the held
// endpoint is looked up in `tm`.
// Throws std::runtime_error when the endpoint is not known to `tm`.
host_id host_id_or_endpoint::resolve_id(const token_metadata& tm) const {
    if (!has_host_id()) {
        const auto known_id = tm.get_host_id_if_known(endpoint());
        if (known_id) {
            return *known_id;
        }
        throw std::runtime_error(format("Host inet address {} not found in the cluster", endpoint()));
    }
    return id();
}

// Return the endpoint address this object denotes.
// If it already holds an endpoint, that address is returned as-is; otherwise
// the held host_id is looked up in `tm`.
// Throws std::runtime_error when the host_id is not known to `tm`.
gms::inet_address host_id_or_endpoint::resolve_endpoint(const token_metadata& tm) const {
    if (!has_endpoint()) {
        const auto known_endpoint = tm.get_endpoint_for_host_id_if_known(id());
        if (known_endpoint) {
            return *known_endpoint;
        }
        throw std::runtime_error(format("Host ID {} not found in the cluster", id()));
    }
    return endpoint();
}

} // namespace locator

0 comments on commit eb9ba91

Please sign in to comment.