Merge 'Fix node replace with tablets for RF=N' from Tomasz Grabiec
This PR fixes a problem with replacing a node with tablets when
RF=N (i.e. the replication factor equals the number of nodes). Currently, such a
replace fails because tablet replica allocation for the rebuild cannot find a
viable destination, as the replacing node is not considered a candidate. It
cannot be a candidate because replace rolls back on failure, and we cannot roll
back after tablets have been migrated.

The solution taken here is to not drain tablet replicas from the replaced
node during the topology request, but to let the draining happen later, after
the replaced node is in the left state and the replacing node is in the normal state.

On boot, the replacing node waits for this draining to complete before it is
considered booted.
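
As an illustration, here is a minimal sketch of such a wait, assuming ScyllaDB's locator types as changed by this PR. The real boot path is driven by the topology state machine rather than polling, and the `refresh` helper is a hypothetical stand-in for obtaining a fresh token_metadata snapshot:

```cpp
#include <chrono>
#include <functional>
#include <seastar/core/coroutine.hh>
#include <seastar/core/sleep.hh>
#include "locator/tablets.hh"
#include "locator/token_metadata.hh"

// Illustrative only: poll until the replaced host no longer appears in any
// tablet replica set, i.e. all of its replicas have been rebuilt elsewhere.
seastar::future<> wait_until_old_replicas_rebuilt(
        std::function<locator::token_metadata_ptr()> refresh,
        locator::host_id replaced_host) {
    while (refresh()->tablets().has_replica_on(replaced_host)) {
        co_await seastar::sleep(std::chrono::seconds(1));
    }
}
```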

Fixes #17025

Nodes in the left state will be kept in tablet replica sets for a while after the node
replace is done, until the new replicas are rebuilt. So we need to know
those nodes' locations (dc, rack) for two reasons:

 1) Algorithms which work with replica sets filter nodes based on their location. For example, the materialized-view code, which pairs base replicas with view replicas, filters by datacenter first (see the sketch after this list).

 2) The tablet scheduler needs to know each node's location in order to make decisions about new replica placement.
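
The sketch below illustrates reason 1. It mirrors the datacenter filter used by the updated materialized-view pairing code in db/view/view.cc, but as a standalone helper; the function name and the `erm`/`tok` parameters are illustrative, not part of the PR:

```cpp
#include <vector>
#include "locator/abstract_replication_strategy.hh"

// Keep only the replicas located in this node's datacenter. get_replicas()
// returns host IDs and may still include a replaced node in the left state;
// its dc/rack is kept in topology, so the filter (and thus view pairing)
// stays stable while its tablets are being rebuilt.
std::vector<locator::host_id>
replicas_in_my_dc(const locator::effective_replication_map& erm, const dht::token& tok) {
    const auto& topo = erm.get_token_metadata_ptr()->get_topology();
    const auto& my_dc = topo.get_datacenter();
    std::vector<locator::host_id> result;
    for (locator::host_id replica : erm.get_replicas(tok)) {
        if (topo.get_datacenter(replica) == my_dc) {
            result.push_back(replica);
        }
    }
    return result;
}
```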

It's ok not to know the IP, and we don't keep it. Those nodes will not
be present in the IP-based replica sets, e.g. those returned by
get_natural_endpoints(), only in the host_id-based replica sets.
storage_proxy request coordination is not affected.
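
A minimal sketch of that distinction, assuming the accessors introduced or changed by this PR; the function itself is illustrative, not code from the tree:

```cpp
#include <cstddef>
#include <utility>
#include "locator/abstract_replication_strategy.hh"

// Returns {number of host_id-based replicas, number of IP-based replicas}.
// Right after a replace the two can differ: the host_id-based set may still
// contain the replaced (left) node until its tablet replicas are rebuilt,
// while the IP-based set, which storage_proxy coordinates on, never does.
std::pair<size_t, size_t>
replica_set_sizes(const locator::effective_replication_map& erm, const dht::token& tok) {
    auto hosts = erm.get_replicas(tok);               // host_id-based
    auto endpoints = erm.get_natural_endpoints(tok);  // IP-based
    return {hosts.size(), endpoints.size()};
}
```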

Nodes in the left state are still not present in the token ring and are not
considered members of the ring (datacenter endpoints exclude them).

In the future we could make the change even more transparent by only
loading locator::node* for those nodes and keeping node* in tablet replica sets.

Currently, left nodes are never removed from topology, so they will
accumulate in memory. We could garbage-collect them from the topology
coordinator once a left node is no longer present in any replica set. That means we
need a new state - left_for_real.
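
A hypothetical sketch of that rule, using has_replica_on() added by this PR; nothing like this exists in the PR itself, and the function name is made up:

```cpp
#include "locator/tablets.hh"

// A left node would become eligible for garbage collection (e.g. for a
// hypothetical left_for_real transition) once no tablet replica set
// references it; has_replica_on() scans every table's tablet map.
bool left_node_is_collectible(const locator::tablet_metadata& tablets,
                              locator::host_id left_host) {
    return !tablets.has_replica_on(left_host);
}
```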

Closes #17388

* github.com:scylladb/scylladb:
  test: py: Add test for view replica pairing after replace
  raft, api: Add RESTful API to query current leader of a raft group
  test: test_tablets_removenode: Verify replacing when there is no spare node
  doc: topology-on-raft: Document replace behavior with tablets
  tablets, raft topology: Rebuild tablets after replacing node is normal
  tablets: load_balancer: Access node attributes via node struct
  tablets: load_balancer: Extract ensure_node()
  mv: Switch to using host_id-based replica set
  effective_replication_map: Introduce host_id-based get_replicas()
  raft topology: Keep nodes in the left state to topology
  tablets: Introduce read_required_hosts()
avikivity committed Mar 18, 2024
2 parents d1c35f9 + a233a69 commit 72bbe75
Showing 26 changed files with 606 additions and 100 deletions.
24 changes: 24 additions & 0 deletions api/api-doc/raft.json
@@ -38,6 +38,30 @@
]
}
]
},
{
"path":"/raft/leader_host",
"operations":[
{
"method":"GET",
"summary":"Returns host ID of the current leader of the given Raft group",
"type":"string",
"nickname":"get_leader_host",
"produces":[
"application/json"
],
"parameters":[
{
"name":"group_id",
"description":"The ID of the group. When absent, group0 is used.",
"required":false,
"allowMultiple":false,
"type":"string",
"paramType":"query"
}
]
}
]
}
]
}
14 changes: 14 additions & 0 deletions api/raft.cc
@@ -60,10 +60,24 @@ void set_raft(http_context&, httpd::routes& r, sharded<service::raft_group_regis

co_return json_void{};
});
r::get_leader_host.set(r, [&raft_gr] (std::unique_ptr<http::request> req) -> future<json_return_type> {
return smp::submit_to(0, [&] {
auto& srv = std::invoke([&] () -> raft::server& {
if (req->query_parameters.contains("group_id")) {
raft::group_id id{utils::UUID{req->get_query_param("group_id")}};
return raft_gr.local().get_server(id);
} else {
return raft_gr.local().group0();
}
});
return json_return_type(srv.current_leader().to_sstring());
});
});
}

void unset_raft(http_context&, httpd::routes& r) {
r::trigger_snapshot.unset(r);
r::get_leader_host.unset(r);
}

}
5 changes: 4 additions & 1 deletion db/system_keyspace.cc
@@ -2677,7 +2677,7 @@ static std::set<sstring> decode_features(const set_type_impl::native_type& featu
return fset;
}

future<service::topology> system_keyspace::load_topology_state() {
future<service::topology> system_keyspace::load_topology_state(const std::unordered_set<locator::host_id>& force_load_hosts) {
auto rs = co_await execute_cql(
format("SELECT * FROM system.{} WHERE key = '{}'", TOPOLOGY, TOPOLOGY));
assert(rs);
@@ -2793,6 +2793,9 @@ future<service::topology> system_keyspace::load_topology_state() {
}
} else if (nstate == service::node_state::left) {
ret.left_nodes.emplace(host_id);
if (force_load_hosts.contains(locator::host_id(host_id.uuid()))) {
map = &ret.left_nodes_rs;
}
} else if (nstate == service::node_state::none) {
map = &ret.new_nodes;
} else {
4 changes: 3 additions & 1 deletion db/system_keyspace.hh
@@ -535,7 +535,9 @@ public:
// Assumes that the history table exists, i.e. Raft experimental feature is enabled.
future<bool> group0_history_contains(utils::UUID state_id);

future<service::topology> load_topology_state();
// force_load_hosts is a set of hosts which must be loaded even if they are in the left state.
future<service::topology> load_topology_state(const std::unordered_set<locator::host_id>& force_load_hosts);

future<std::optional<service::topology_features>> load_topology_features_state();

// Read CDC generation data with the given UUID as key.
20 changes: 12 additions & 8 deletions db/view/view.cc
@@ -1611,21 +1611,24 @@ get_view_natural_endpoint(
const dht::token& view_token,
bool use_legacy_self_pairing) {
auto& topology = base_erm->get_token_metadata_ptr()->get_topology();
auto my_address = topology.my_address();
auto me = topology.my_host_id();
auto my_datacenter = topology.get_datacenter();
std::vector<gms::inet_address> base_endpoints, view_endpoints;
for (auto&& base_endpoint : base_erm->get_natural_endpoints(base_token)) {
std::vector<locator::host_id> base_endpoints, view_endpoints;

// We need to use get_replicas() for pairing to be stable in case base or view tablet
// is rebuilding a replica which has left the ring. get_natural_endpoints() filters such replicas.
for (auto&& base_endpoint : base_erm->get_replicas(base_token)) {
if (!network_topology || topology.get_datacenter(base_endpoint) == my_datacenter) {
base_endpoints.push_back(base_endpoint);
}
}

for (auto&& view_endpoint : view_erm->get_natural_endpoints(view_token)) {
for (auto&& view_endpoint : view_erm->get_replicas(view_token)) {
if (use_legacy_self_pairing) {
// If this base replica is also one of the view replicas, we use
// ourselves as the view replica.
if (view_endpoint == my_address) {
return view_endpoint;
if (view_endpoint == me) {
return topology.my_address();
}
// We have to remove any endpoint which is shared between the base
// and the view, as it will select itself and throw off the counts
@@ -1645,14 +1648,15 @@
}

assert(base_endpoints.size() == view_endpoints.size());
auto base_it = std::find(base_endpoints.begin(), base_endpoints.end(), my_address);
auto base_it = std::find(base_endpoints.begin(), base_endpoints.end(), me);
if (base_it == base_endpoints.end()) {
// This node is not a base replica of this key, so we return empty
// FIXME: This case shouldn't happen, and if it happens, a view update
// would be lost. We should reported or count this case.
return {};
}
return view_endpoints[base_it - base_endpoints.begin()];
auto replica = view_endpoints[base_it - base_endpoints.begin()];
return topology.get_node(replica).endpoint();
}

static future<> apply_to_remote_endpoints(service::storage_proxy& proxy, locator::effective_replication_map_ptr ermp,
23 changes: 22 additions & 1 deletion docs/dev/topology-over-raft.md
@@ -15,6 +15,10 @@ Node state can be one of those:

Nodes in state left are never removed from the state.

Nodes in state `left` may still appear as tablet replicas in host_id-based replica sets
(`effective_replication_map::get_replicas()`), but they never appear in IP-based replica sets, e.g. those returned by
`effective_replication_map::get_natural_endpoints()`.

State transition diagram for nodes:
```mermaid
stateDiagram-v2
@@ -114,11 +118,28 @@ that there are no tablet transitions in the system.
Tablets are migrated in parallel and independently.

There is a variant of tablet migration track called tablet draining track, which is invoked
as a step of certain topology operations (e.g. decommission, removenode, replace). Its goal is to readjust tablet replicas
as a step of certain topology operations (e.g. decommission, removenode). Its goal is to readjust tablet replicas
so that a given topology change can proceed. For example, when decommissioning a node, we
need to migrate tablet replicas away from the node being decommissioned.
Tablet draining happens before making changes to vnode-based replication.

## Node replace with tablets

Tablet replicas on the replaced node are rebuilt after the replacing node is already in the normal state and
the replaced node is in the left state.

Until the old replicas are rebuilt, availability in the cluster is reduced. If another node becomes unavailable, we
may have two unavailable replicas for some tablets. The admin needs to be aware of this and, for example, not start a rolling restart.
To avoid surprises, the replacing node waits on boot for tablet replicas to finish rebuilding,
so that the admin sees the replace as finished only after availability has been restored.

### Impact on repair

When a tablet is rebuilt in the background after a replace, its primary replica may be on a node which is no
longer in the topology. This means that running repair -pr on all nodes will not repair such a tablet, but that is fine because
we decided that repair can be optimistic. It is safe with regard to tombstone GC because expiry is decided per table, per token range,
based on the actual repair time of that range. Unrepaired tablets will not have their token range marked as repaired.

# Tablet transitions

Tablets can undergo a process called "transition", which performs some maintenance action on the tablet which is
14 changes: 12 additions & 2 deletions locator/abstract_replication_strategy.cc
@@ -492,14 +492,24 @@ auto vnode_effective_replication_map::clone_data_gently() const -> future<std::u
co_return std::move(result);
}

inet_address_vector_replica_set vnode_effective_replication_map::do_get_natural_endpoints(const token& tok,
host_id_vector_replica_set vnode_effective_replication_map::do_get_replicas(const token& tok,
bool is_vnode) const
{
const token& key_token = _rs->natural_endpoints_depend_on_token()
? (is_vnode ? tok : _tmptr->first_token(tok))
: default_replication_map_key;
const auto it = _replication_map.find(key_token);
return resolve_endpoints<inet_address_vector_replica_set>(it->second, *_tmptr);
return it->second;
}

inet_address_vector_replica_set vnode_effective_replication_map::do_get_natural_endpoints(const token& tok,
bool is_vnode) const
{
return resolve_endpoints<inet_address_vector_replica_set>(do_get_replicas(tok, is_vnode), *_tmptr);
}

host_id_vector_replica_set vnode_effective_replication_map::get_replicas(const token& tok) const {
return do_get_replicas(tok, false);
}

inet_address_vector_replica_set vnode_effective_replication_map::get_natural_endpoints(const token& search_token) const {
19 changes: 17 additions & 2 deletions locator/abstract_replication_strategy.hh
@@ -210,10 +210,17 @@ public:
/// operation which adds a replica which has the same address as the replaced replica.
/// Use get_natural_endpoints_without_node_being_replaced() to get replicas without any pending replicas.
/// This won't be necessary after we implement https://github.com/scylladb/scylladb/issues/6403.
///
/// Excludes replicas which are in the left state. After replace, the replaced replica may
/// still be in the replica set of the tablet until tablet scheduler rebuilds the replacing replica.
/// The old replica will not be listed here. This is necessary to support replace-with-the-same-ip
/// scenario. Since we return IPs here, writes to the old replica would be incorrectly routed to the
/// new replica.
///
/// The returned addresses are present in the topology object associated with this instance.
virtual inet_address_vector_replica_set get_natural_endpoints(const token& search_token) const = 0;

/// Returns addresses of replicas for a given token.
/// Does not include pending replicas.
/// Returns a subset of replicas returned by get_natural_endpoints() without the pending replica.
virtual inet_address_vector_replica_set get_natural_endpoints_without_node_being_replaced(const token& search_token) const = 0;

/// Returns the set of pending replicas for a given token.
@@ -224,6 +231,12 @@ public:
/// Returns a list of nodes to which a read request should be directed.
virtual inet_address_vector_replica_set get_endpoints_for_reading(const token& search_token) const = 0;

/// Returns replicas for a given token.
/// During topology change returns replicas which should be targets for writes, excluding the pending replica.
/// Unlike get_natural_endpoints(), the replica set may include nodes in the left state which were
/// replaced but not yet rebuilt.
virtual host_id_vector_replica_set get_replicas(const token& search_token) const = 0;

virtual std::optional<tablet_routing_info> check_locality(const token& token) const = 0;


@@ -311,6 +324,7 @@ public: // effective_replication_map
inet_address_vector_replica_set get_natural_endpoints_without_node_being_replaced(const token& search_token) const override;
inet_address_vector_topology_change get_pending_endpoints(const token& search_token) const override;
inet_address_vector_replica_set get_endpoints_for_reading(const token& search_token) const override;
host_id_vector_replica_set get_replicas(const token& search_token) const override;
std::optional<tablet_routing_info> check_locality(const token& token) const override;
bool has_pending_ranges(locator::host_id endpoint) const override;
std::unique_ptr<token_range_splitter> make_splitter() const override;
@@ -371,6 +385,7 @@ public:
private:
dht::token_range_vector do_get_ranges(noncopyable_function<stop_iteration(bool& add_range, const inet_address& natural_endpoint)> consider_range_for_endpoint) const;
inet_address_vector_replica_set do_get_natural_endpoints(const token& tok, bool is_vnode) const;
host_id_vector_replica_set do_get_replicas(const token& tok, bool is_vnode) const;
stop_iteration for_each_natural_endpoint_until(const token& vnode_tok, const noncopyable_function<stop_iteration(const inet_address&)>& func) const;

public:
63 changes: 48 additions & 15 deletions locator/tablets.cc
@@ -481,6 +481,17 @@ size_t tablet_metadata::external_memory_usage() const {
return result;
}

bool tablet_metadata::has_replica_on(host_id host) const {
for (auto&& [id, map] : _tablets) {
for (auto&& tablet : map.tablet_ids()) {
if (map.get_shard(tablet, host)) {
return true;
}
}
}
return false;
}

future<bool> check_tablet_replica_shards(const tablet_metadata& tm, host_id this_host) {
bool valid = true;
for (const auto& [table_id, tmap] : tm.all_tables()) {
@@ -506,27 +517,30 @@ class tablet_effective_replication_map : public effective_replication_map {
inet_address_vector_replica_set to_replica_set(const tablet_replica_set& replicas) const {
inet_address_vector_replica_set result;
result.reserve(replicas.size());
auto& topo = _tmptr->get_topology();
for (auto&& replica : replicas) {
auto* node = topo.find_node(replica.host);
if (node) {
result.emplace_back(node->endpoint());
}
}
return result;
}

host_id_vector_replica_set to_host_set(const tablet_replica_set& replicas) const {
host_id_vector_replica_set result;
result.reserve(replicas.size());
for (auto&& replica : replicas) {
result.emplace_back(_tmptr->get_endpoint_for_host_id(replica.host));
result.emplace_back(replica.host);
}
return result;
}

const tablet_map& get_tablet_map() const {
return _tmptr->tablets().get_tablet_map(_table);
}
public:
tablet_effective_replication_map(table_id table,
replication_strategy_ptr rs,
token_metadata_ptr tmptr,
size_t replication_factor)
: effective_replication_map(std::move(rs), std::move(tmptr), replication_factor)
, _table(table)
, _sharder(*_tmptr, table)
{ }

virtual ~tablet_effective_replication_map() = default;

virtual inet_address_vector_replica_set get_natural_endpoints(const token& search_token) const override {
const tablet_replica_set& get_replicas_for_write(dht::token search_token) const {
auto&& tablets = get_tablet_map();
auto tablet = tablets.get_tablet_id(search_token);
auto* info = tablets.get_tablet_transition_info(tablet);
@@ -545,8 +559,27 @@
}
on_internal_error(tablet_logger, format("Invalid replica selector", static_cast<int>(info->writes)));
});
tablet_logger.trace("get_natural_endpoints({}): table={}, tablet={}, replicas={}", search_token, _table, tablet, replicas);
return to_replica_set(replicas);
tablet_logger.trace("get_replicas_for_write({}): table={}, tablet={}, replicas={}", search_token, _table, tablet, replicas);
return replicas;
}
public:
tablet_effective_replication_map(table_id table,
replication_strategy_ptr rs,
token_metadata_ptr tmptr,
size_t replication_factor)
: effective_replication_map(std::move(rs), std::move(tmptr), replication_factor)
, _table(table)
, _sharder(*_tmptr, table)
{ }

virtual ~tablet_effective_replication_map() = default;

virtual host_id_vector_replica_set get_replicas(const token& search_token) const override {
return to_host_set(get_replicas_for_write(search_token));
}

virtual inet_address_vector_replica_set get_natural_endpoints(const token& search_token) const override {
return to_replica_set(get_replicas_for_write(search_token));
}

virtual inet_address_vector_replica_set get_natural_endpoints_without_node_being_replaced(const token& search_token) const override {
1 change: 1 addition & 0 deletions locator/tablets.hh
@@ -451,6 +451,7 @@ public:
const table_to_tablet_map& all_tables() const { return _tablets; }
table_to_tablet_map& all_tables() { return _tablets; }
size_t external_memory_usage() const;
bool has_replica_on(host_id) const;
public:
void set_balancing_enabled(bool value) { _balancing_enabled = value; }
void set_tablet_map(table_id, tablet_map);
24 changes: 15 additions & 9 deletions locator/topology.cc
@@ -344,14 +344,20 @@ void topology::index_node(const node* node) {
}
}

const auto& dc = node->dc_rack().dc;
const auto& rack = node->dc_rack().rack;
const auto& endpoint = node->endpoint();
_dc_nodes[dc].emplace(node);
_dc_rack_nodes[dc][rack].emplace(node);
_dc_endpoints[dc].insert(endpoint);
_dc_racks[dc][rack].insert(endpoint);
_datacenters.insert(dc);
// We keep location of left nodes because they may still appear in tablet replica sets
// and algorithms expect to know which dc they belonged to. View replica pairing needs stable
// replica indexes.
// But we don't consider those nodes as members of the cluster so don't update dc registry.
if (!node->left()) {
const auto& dc = node->dc_rack().dc;
const auto& rack = node->dc_rack().rack;
const auto& endpoint = node->endpoint();
_dc_nodes[dc].emplace(node);
_dc_rack_nodes[dc][rack].emplace(node);
_dc_endpoints[dc].insert(endpoint);
_dc_racks[dc][rack].insert(endpoint);
_datacenters.insert(dc);
}

if (node->is_this_node()) {
_this_node = node;
Expand Down Expand Up @@ -551,7 +557,7 @@ std::weak_ordering topology::compare_endpoints(const inet_address& address, cons

void topology::for_each_node(std::function<void(const node*)> func) const {
for (const auto& np : _nodes) {
if (np) {
if (np && !np->left()) {
func(np.get());
}
}
