storage_service: Send heartbeat earlier for node ops
Node ops have the following procedure:

1   for node in sync_nodes
      send prepare cmd to node

2   for node in sync_nodes
      send heartbeat cmd to node

If any of the prepare cmds in step 1 takes longer than the heartbeat
watchdog timeout, the heartbeat in step 2 arrives too late to update the
watchdog, and as a result the watchdog aborts the operation.

To prevent a slow prepare cmd from killing the node operation, start the
heartbeat earlier in the procedure.
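
After this change, the procedure becomes:

1   start heartbeat updater
      (keeps sending heartbeat cmd to each node in sync_nodes)

2   for node in sync_nodes
      send prepare cmd to node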

Fixes #11011
Fixes #12969

Closes #12980
asias authored and tgrabiec committed Feb 24, 2023
1 parent 61e67b8 commit ba919aa
Showing 1 changed file with 38 additions and 36 deletions.
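
The diff below moves the heartbeat-updater block ahead of the prepare
fan-out in all four node operations (decommission, bootstrap, replace,
removenode). The following is a minimal, self-contained sketch of the
pattern, not Scylla's actual code: send_heartbeats, send_prepares, and the
timing constants are invented stand-ins; only the Seastar primitives used
(app_template, async, sleep, make_lw_shared, defer) are real library API.

// heartbeat_first.cc -- illustrative sketch only, not Scylla code.
#include <chrono>
#include <iostream>
#include <seastar/core/app-template.hh>
#include <seastar/core/future.hh>
#include <seastar/core/shared_ptr.hh>
#include <seastar/core/sleep.hh>
#include <seastar/core/thread.hh>
#include <seastar/util/defer.hh>

using namespace std::chrono_literals;

// Stand-in for the heartbeat cmd fan-out to all sync nodes.
static seastar::future<> send_heartbeats() {
    std::cout << "heartbeat -> sync_nodes\n";
    return seastar::make_ready_future<>();
}

// Stand-in for the prepare cmd fan-out; deliberately slower than the
// heartbeat period, to model the failure this commit fixes.
static seastar::future<> send_prepares() {
    return seastar::sleep(3s);
}

int main(int argc, char** argv) {
    seastar::app_template app;
    return app.run(argc, argv, [] {
        return seastar::async([] {
            // New order, step 2: start the heartbeat fiber first.
            auto done = seastar::make_lw_shared<bool>(false);
            auto heartbeat_fiber = seastar::async([done] {
                while (!*done) {
                    send_heartbeats().get();
                    seastar::sleep(1s).get();
                }
            });
            auto stop = seastar::defer([&] () noexcept {
                *done = true;
                heartbeat_fiber.get();
            });
            // New order, step 3: a slow prepare no longer starves the
            // remote watchdog, because heartbeats are already flowing.
            send_prepares().get();
        });
    });
}

The defer action mirrors the diff: on every exit path, success or
exception, it flips the shared done flag and joins the heartbeat fiber.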
service/storage_service.cc: 38 additions & 36 deletions
@@ -2013,11 +2013,19 @@ future<> storage_service::decommission() {
bool raft_available = ss._group0->wait_for_raft().get();
bool left_token_ring = false;

- // Step 2: Prepare to sync data
std::unordered_set<gms::inet_address> nodes_unknown_verb;
std::unordered_set<gms::inet_address> nodes_down;
auto req = node_ops_cmd_request{node_ops_cmd::decommission_prepare, uuid, ignore_nodes, leaving_nodes, {}};
try {
+ // Step 2: Start heartbeat updater
+ auto heartbeat_updater_done = make_lw_shared<bool>(false);
+ auto heartbeat_updater = ss.node_ops_cmd_heartbeat_updater(node_ops_cmd::decommission_heartbeat, uuid, nodes, heartbeat_updater_done);
+ auto stop_heartbeat_updater = defer([&] {
+     *heartbeat_updater_done = true;
+     heartbeat_updater.get();
+ });
+
+ // Step 3: Prepare to sync data
parallel_for_each(nodes, [&ss, &req, &nodes_unknown_verb, &nodes_down, uuid] (const gms::inet_address& node) {
return ss._messaging.local().send_node_ops_cmd(netw::msg_addr(node), req).then([uuid, node] (node_ops_cmd_response resp) {
slogger.debug("decommission[{}]: Got prepare response from node={}", uuid, node);
@@ -2040,14 +2048,6 @@ future<> storage_service::decommission() {
throw std::runtime_error(msg);
}

- // Step 3: Start heartbeat updater
- auto heartbeat_updater_done = make_lw_shared<bool>(false);
- auto heartbeat_updater = ss.node_ops_cmd_heartbeat_updater(node_ops_cmd::decommission_heartbeat, uuid, nodes, heartbeat_updater_done);
- auto stop_heartbeat_updater = defer([&] {
-     *heartbeat_updater_done = true;
-     heartbeat_updater.get();
- });
-
// Step 4: Start to sync data
slogger.info("DECOMMISSIONING: unbootstrap starts");
ss.unbootstrap().get();
@@ -2208,6 +2208,14 @@ void storage_service::run_bootstrap_ops(std::unordered_set<token>& bootstrap_tok
auto req = node_ops_cmd_request(node_ops_cmd::bootstrap_prepare, uuid, ignore_nodes, {}, {}, bootstrap_nodes);
slogger.info("bootstrap[{}]: Started bootstrap operation, bootstrap_nodes={}, sync_nodes={}, ignore_nodes={}", uuid, bootstrap_nodes, sync_nodes, ignore_nodes);
try {
+ // Step 2: Start heartbeat updater
+ auto heartbeat_updater_done = make_lw_shared<bool>(false);
+ auto heartbeat_updater = node_ops_cmd_heartbeat_updater(node_ops_cmd::bootstrap_heartbeat, uuid, sync_nodes, heartbeat_updater_done);
+ auto stop_heartbeat_updater = defer([&] {
+     *heartbeat_updater_done = true;
+     heartbeat_updater.get();
+ });
+
// Step 3: Prepare to sync data
parallel_for_each(sync_nodes, [this, &req, &nodes_unknown_verb, &nodes_down, uuid] (const gms::inet_address& node) {
return _messaging.local().send_node_ops_cmd(netw::msg_addr(node), req).then([uuid, node] (node_ops_cmd_response resp) {
@@ -2231,14 +2239,6 @@ void storage_service::run_bootstrap_ops(std::unordered_set<token>& bootstrap_tok
throw std::runtime_error(msg);
}

- // Step 4: Start heartbeat updater
- auto heartbeat_updater_done = make_lw_shared<bool>(false);
- auto heartbeat_updater = node_ops_cmd_heartbeat_updater(node_ops_cmd::bootstrap_heartbeat, uuid, sync_nodes, heartbeat_updater_done);
- auto stop_heartbeat_updater = defer([&] {
-     *heartbeat_updater_done = true;
-     heartbeat_updater.get();
- });
-
// Step 5: Sync data for bootstrap
_repair.local().bootstrap_with_repair(get_token_metadata_ptr(), bootstrap_tokens).get();

@@ -2302,7 +2302,15 @@ void storage_service::run_replace_ops(std::unordered_set<token>& bootstrap_token
auto req = node_ops_cmd_request{node_ops_cmd::replace_prepare, uuid, ignore_nodes, {}, replace_nodes};
slogger.info("replace[{}]: Started replace operation, replace_nodes={}, sync_nodes={}, ignore_nodes={}", uuid, replace_nodes, sync_nodes, ignore_nodes);
try {
- // Step 2: Prepare to sync data
+ // Step 2: Start heartbeat updater
+ auto heartbeat_updater_done = make_lw_shared<bool>(false);
+ auto heartbeat_updater = node_ops_cmd_heartbeat_updater(node_ops_cmd::replace_heartbeat, uuid, sync_nodes, heartbeat_updater_done);
+ auto stop_heartbeat_updater = defer([&] {
+     *heartbeat_updater_done = true;
+     heartbeat_updater.get();
+ });
+
+ // Step 3: Prepare to sync data
parallel_for_each(sync_nodes, [this, &req, &nodes_unknown_verb, &nodes_down, uuid] (const gms::inet_address& node) {
return _messaging.local().send_node_ops_cmd(netw::msg_addr(node), req).then([uuid, node] (node_ops_cmd_response resp) {
slogger.debug("replace[{}]: Got node_ops_cmd::replace_prepare response from node={}", uuid, node);
@@ -2325,15 +2333,6 @@ void storage_service::run_replace_ops(std::unordered_set<token>& bootstrap_token
throw std::runtime_error(msg);
}

- // Step 3: Start heartbeat updater
- auto heartbeat_updater_done = make_lw_shared<bool>(false);
- auto heartbeat_updater = node_ops_cmd_heartbeat_updater(node_ops_cmd::replace_heartbeat, uuid, sync_nodes, heartbeat_updater_done);
- auto stop_heartbeat_updater = defer([&] {
-     *heartbeat_updater_done = true;
-     heartbeat_updater.get();
- });
-
-
// Step 4: Allow nodes in sync_nodes list to mark the replacing node as alive
_gossiper.advertise_to_nodes(sync_nodes_generations).get();
slogger.info("replace[{}]: Allow nodes={} to mark replacing node={} as alive", uuid, sync_nodes, get_broadcast_address());
@@ -2471,11 +2470,19 @@ future<> storage_service::removenode(locator::host_id host_id, std::list<locator
}
slogger.info("removenode[{}]: Started token movement, node={}, sync_nodes={}, ignore_nodes={}", uuid, endpoint, nodes, ignore_nodes);

- // Step 3: Prepare to sync data
std::unordered_set<gms::inet_address> nodes_unknown_verb;
std::unordered_set<gms::inet_address> nodes_down;
auto req = node_ops_cmd_request{node_ops_cmd::removenode_prepare, uuid, ignore_nodes, {endpoint}, {}};
try {
+ // Step 3: Start heartbeat updater
+ auto heartbeat_updater_done = make_lw_shared<bool>(false);
+ auto heartbeat_updater = ss.node_ops_cmd_heartbeat_updater(node_ops_cmd::removenode_heartbeat, uuid, nodes, heartbeat_updater_done);
+ auto stop_heartbeat_updater = defer([&] {
+     *heartbeat_updater_done = true;
+     heartbeat_updater.get();
+ });
+
+ // Step 4: Prepare to sync data
parallel_for_each(nodes, [&ss, &req, &nodes_unknown_verb, &nodes_down, uuid] (const gms::inet_address& node) {
return ss._messaging.local().send_node_ops_cmd(netw::msg_addr(node), req).then([uuid, node] (node_ops_cmd_response resp) {
slogger.debug("removenode[{}]: Got prepare response from node={}", uuid, node);
@@ -2498,14 +2505,6 @@ future<> storage_service::removenode(locator::host_id host_id, std::list<locator
throw std::runtime_error(msg);
}

- // Step 4: Start heartbeat updater
- auto heartbeat_updater_done = make_lw_shared<bool>(false);
- auto heartbeat_updater = ss.node_ops_cmd_heartbeat_updater(node_ops_cmd::removenode_heartbeat, uuid, nodes, heartbeat_updater_done);
- auto stop_heartbeat_updater = defer([&] {
-     *heartbeat_updater_done = true;
-     heartbeat_updater.get();
- });
-
// Step 5: Start to sync data
req.cmd = node_ops_cmd::removenode_sync_data;
parallel_for_each(nodes, [&ss, &req, uuid] (const gms::inet_address& node) {
@@ -2588,6 +2587,9 @@ void storage_service::node_ops_cmd_check(gms::inet_address coordinator, const no
msg = format("node_ops_cmd_check: Node {} rejected node_ops_cmd={} from node={} with ops_uuid={}, pending_node_ops={}, pending node ops is in progress",
get_broadcast_address(), req.cmd, coordinator, req.ops_uuid, ops_uuids);
}
+ } else if (req.cmd == node_ops_cmd::decommission_heartbeat || req.cmd == node_ops_cmd::removenode_heartbeat ||
+            req.cmd == node_ops_cmd::replace_heartbeat || req.cmd == node_ops_cmd::bootstrap_heartbeat) {
+     // We allow node_ops_cmd heartbeat to be sent before prepare cmd
} else {
if (ops_uuids.size() == 1 && ops_uuids.front() == req.ops_uuid) {
// Check is good, since we know this ops_uuid and this is the only ops_uuid we are working on.
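
The last hunk is the receiving-side counterpart: heartbeats can now arrive
before the prepare cmd has registered the operation, so node_ops_cmd_check
must not reject them for referencing an unknown ops_uuid. A simplified
model of that acceptance rule follows (hypothetical names and types, not
the real function):

#include <stdexcept>
#include <string>
#include <vector>

enum class cmd { prepare, heartbeat, sync_data, done, abort };

// Simplified stand-in for node_ops_cmd_check.
void check(cmd c, const std::string& ops_uuid,
           const std::vector<std::string>& pending_ops) {
    if (c == cmd::prepare) {
        // A prepare cmd starts a new operation; reject if one is running.
        if (!pending_ops.empty()) {
            throw std::runtime_error("pending node ops in progress");
        }
    } else if (c == cmd::heartbeat) {
        // Heartbeats may be sent before prepare now; accept them even if
        // the ops_uuid is not yet registered on this node.
    } else {
        // Every other cmd must match the single operation in progress.
        if (pending_ops.size() != 1 || pending_ops.front() != ops_uuid) {
            throw std::runtime_error("unknown ops_uuid");
        }
    }
}

int main() {
    std::vector<std::string> pending; // nothing registered yet
    check(cmd::heartbeat, "uuid-1", pending); // accepted: before prepare
    check(cmd::prepare, "uuid-1", pending);   // accepted: nothing pending
}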
