Skip to content

Commit

Permalink
Merge 'Enable RBNO by default' from Asias He
Browse files Browse the repository at this point in the history
This PR fixes the seastar::rpc::closed_error failures in the test_topology suite and enables repair-based node operations (RBNO) by default.

Closes #12970

* github.com:scylladb/scylladb:
  Revert "Revert "storage_service: Enable Repair Based Node Operations (RBNO) by default for all node ops""
  storage_service: Wait for normal state handler to finish in replace
  storage_service: Wait for normal state handler to finish in bootstrap
  • Loading branch information
denesb committed Mar 1, 2023
2 parents 7dc5477 + 8fb7869 commit 84e26ed
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 1 deletion.
2 changes: 1 addition & 1 deletion db/config.cc
Expand Up @@ -794,7 +794,7 @@ db::config::config(std::shared_ptr<db::extensions> exts)
, ignore_dead_nodes_for_replace(this, "ignore_dead_nodes_for_replace", value_status::Used, "", "List dead nodes to ingore for replace operation using a comma-separated list of host IDs. E.g., scylla --ignore-dead-nodes-for-replace 8d5ed9f4-7764-4dbd-bad8-43fddce94b7c,125ed9f4-7777-1dbn-mac8-43fddce9123e")
, override_decommission(this, "override_decommission", value_status::Used, false, "Set true to force a decommissioned node to join the cluster")
, enable_repair_based_node_ops(this, "enable_repair_based_node_ops", liveness::LiveUpdate, value_status::Used, true, "Set true to use enable repair based node operations instead of streaming based")
, allowed_repair_based_node_ops(this, "allowed_repair_based_node_ops", liveness::LiveUpdate, value_status::Used, "replace", "A comma separated list of node operations which are allowed to enable repair based node operations. The operations can be bootstrap, replace, removenode, decommission and rebuild")
, allowed_repair_based_node_ops(this, "allowed_repair_based_node_ops", liveness::LiveUpdate, value_status::Used, "replace,removenode,rebuild,bootstrap,decommission", "A comma separated list of node operations which are allowed to enable repair based node operations. The operations can be bootstrap, replace, removenode, decommission and rebuild")
, ring_delay_ms(this, "ring_delay_ms", value_status::Used, 30 * 1000, "Time a node waits to hear from other nodes before joining the ring in milliseconds. Same as -Dcassandra.ring_delay_ms in cassandra.")
, shadow_round_ms(this, "shadow_round_ms", value_status::Used, 300 * 1000, "The maximum gossip shadow round time. Can be used to reduce the gossip feature check time during node boot up.")
, fd_max_interval_ms(this, "fd_max_interval_ms", value_status::Used, 2 * 1000, "The maximum failure_detector interval time in milliseconds. Interval larger than the maximum will be ignored. Larger cluster may need to increase the default.")
Expand Down
25 changes: 25 additions & 0 deletions service/storage_service.cc
Expand Up @@ -874,6 +874,7 @@ future<> storage_service::handle_state_normal(inet_address endpoint) {
bool left = std::any_of(nodes.begin(), nodes.end(), [this] (const gms::inet_address& node) { return _gossiper.is_left(node); });
if (left) {
slogger.info("Skip to set host_id={} to be owned by node={}, because the node is removed from the cluster, nodes {} used to own the host_id", host_id, endpoint, nodes);
_normal_state_handled_on_boot.insert(endpoint);
co_return;
}
slogger.info("Set host_id={} to be owned by node={}", host_id, endpoint);
Expand Down Expand Up @@ -992,6 +993,7 @@ future<> storage_service::handle_state_normal(inet_address endpoint) {
slogger.debug("handle_state_normal: token_metadata.ring_version={}, token={} -> endpoint={}", ver, x.first, x.second);
}
}
_normal_state_handled_on_boot.insert(endpoint);
}

future<> storage_service::handle_state_leaving(inet_address endpoint) {
Expand Down Expand Up @@ -2171,6 +2173,7 @@ void storage_service::run_bootstrap_ops(std::unordered_set<token>& bootstrap_tok
sync_nodes.push_back(node);
}
}
wait_for_normal_state_handled_on_boot(sync_nodes, "bootstrap", uuid).get();
sync_nodes.push_front(get_broadcast_address());

// Step 2: Wait until no pending node operations
Expand Down Expand Up @@ -2290,6 +2293,7 @@ void storage_service::run_replace_ops(std::unordered_set<token>& bootstrap_token
sync_nodes.push_back(node);
}
}
wait_for_normal_state_handled_on_boot(sync_nodes, "replace", uuid).get();
sync_nodes.push_front(get_broadcast_address());
auto sync_nodes_generations = _gossiper.get_generation_for_nodes(sync_nodes).get();
// Map existing nodes to replacing nodes
Expand Down Expand Up @@ -3713,6 +3717,27 @@ future<> storage_service::notify_cql_change(inet_address endpoint, bool ready) {
}
}

// Returns whether handle_state_normal() has already completed for the given
// node since this node booted (tracked via _normal_state_handled_on_boot).
bool storage_service::is_normal_state_handled_on_boot(gms::inet_address node) {
    return _normal_state_handled_on_boot.find(node) != _normal_state_handled_on_boot.end();
}

// Wait for normal state handler to finish on boot
//
// Polls until handle_state_normal() has completed for every node in `nodes`,
// sleeping 100ms between checks (abortable via _abort_source). Throws
// std::runtime_error if any node is still unhandled 60 seconds after entry,
// so the node operation `ops` (identified by `uuid`) can be rejected.
future<> storage_service::wait_for_normal_state_handled_on_boot(std::list<gms::inet_address> nodes, sstring ops, node_ops_id uuid) {
    slogger.info("{}[{}]: Started waiting for normal state handler for nodes {}", ops, uuid, nodes);
    // Hoist the fixed deadline out of the loop; equivalent to comparing
    // against start_time + 60s on every iteration.
    const auto deadline = std::chrono::steady_clock::now() + std::chrono::seconds(60);
    for (const auto& node : nodes) {
        while (!is_normal_state_handled_on_boot(node)) {
            slogger.debug("{}[{}]: Waiting for normal state handler for node {}", ops, uuid, node);
            co_await sleep_abortable(std::chrono::milliseconds(100), _abort_source);
            if (std::chrono::steady_clock::now() > deadline) {
                throw std::runtime_error(format("{}[{}]: Node {} did not finish normal state handler, reject the node ops", ops, uuid, node));
            }
        }
    }
    slogger.info("{}[{}]: Finished waiting for normal state handler for nodes {}", ops, uuid, nodes);
    co_return;
}

future<bool> storage_service::is_cleanup_allowed(sstring keyspace) {
return container().invoke_on(0, [keyspace = std::move(keyspace)] (storage_service& ss) {
auto my_address = ss.get_broadcast_address();
Expand Down
5 changes: 5 additions & 0 deletions service/storage_service.hh
Expand Up @@ -760,6 +760,11 @@ private:
public:
future<bool> is_cleanup_allowed(sstring keyspace);
bool is_repair_based_node_ops_enabled(streaming::stream_reason reason);

private:
std::unordered_set<gms::inet_address> _normal_state_handled_on_boot;
bool is_normal_state_handled_on_boot(gms::inet_address);
future<> wait_for_normal_state_handled_on_boot(std::list<gms::inet_address> nodes, sstring ops, node_ops_id uuid);
};

}

0 comments on commit 84e26ed

Please sign in to comment.