forward_service: introduce shutdown checks
This commit introduces a new boolean flag, `shutdown`, to the
forward_service, along with a corresponding shutdown method. It also
adds checks throughout the forward_service to verify the value of the
shutdown flag before retrying or invoking functions that might use the
messaging service under the hood.

The flag is set before the messaging service shuts down, by invoking
forward_service::shutdown in main. By checking the flag before each call
that potentially involves the messaging service, we can ensure that the
messaging service is still operational. If the flag is false, indicating
that the messaging service is still active, we can proceed with the
call. If the messaging service is shut down during the call, appropriate
exceptions should be thrown somewhere down in the called functions,
avoiding potential hangs.

This fix should resolve the issue where forward_service retries could
block the shutdown.

Fixes #12604

Closes #13922
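
The pattern described above can be summarized with a small, self-contained
sketch. This is not the ScyllaDB code: the names (forwarder, send_request)
are hypothetical, and the real forward_service uses seastar futures rather
than blocking calls. The point is only that every attempt, including retries,
first consults the shutdown flag and fails fast instead of touching a
messaging layer that may be going away.

#include <atomic>
#include <stdexcept>
#include <string>

// Hypothetical stand-in for a service that forwards requests over RPC.
class forwarder {
    std::atomic<bool> _shutdown{false};

public:
    // Called from main() before the messaging layer is drained.
    void shutdown() noexcept { _shutdown = true; }

    std::string forward(const std::string& req) {
        for (;;) {
            // Fail fast if a shutdown was requested; otherwise an RPC
            // issued below could hang once the messaging layer is gone.
            if (_shutdown) {
                throw std::runtime_error("forwarder is shutting down");
            }
            try {
                return send_request(req); // hypothetical RPC call
            } catch (const std::exception&) {
                // Retry only while the service is still operational;
                // the flag is re-checked at the top of the loop.
                if (_shutdown) {
                    throw;
                }
            }
        }
    }

private:
    // Placeholder; a real implementation would send req to another node.
    std::string send_request(const std::string& req) { return req; }
};

In the actual commit the same check appears twice: once before
send_forward_request and once before each fetch_page call, and
forward_service::shutdown simply sets the flag and returns a ready future.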
havaker authored and xemul committed Jun 13, 2023
1 parent 9d1f62f commit e0855b1
Showing 3 changed files with 37 additions and 0 deletions.
5 changes: 5 additions & 0 deletions main.cc
@@ -1855,6 +1855,11 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
ss.local().drain_on_shutdown().get();
});

// Signal shutdown to the forward service before draining the messaging service.
auto shutdown_forward_service = defer_verbose_shutdown("forward service", [&forward_service] {
forward_service.invoke_on_all(&service::forward_service::shutdown).get();
});

auto drain_view_builder = defer_verbose_shutdown("view builder ops", [cfg] {
if (cfg->view_building()) {
view_builder.invoke_on_all(&db::view::view_builder::drain).get();
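For context on why registering the guard at this point in main sets the flag
early enough: assuming defer_verbose_shutdown behaves like a scope guard
(deferred actions run in reverse order of registration when main unwinds), a
guard registered after the messaging-service guard runs before it during
shutdown. A minimal sketch of that ordering, with hypothetical names:

#include <functional>
#include <iostream>
#include <string>

// Hypothetical stand-in for defer_verbose_shutdown: the action runs in the
// destructor, so guards created later run their actions first on unwind.
struct shutdown_guard {
    std::string name;
    std::function<void()> action;
    ~shutdown_guard() {
        std::cout << "shutting down " << name << "\n";
        action();
    }
};

int main() {
    shutdown_guard drain_messaging{"messaging service", [] { /* stop RPC */ }};
    shutdown_guard stop_forwarding{"forward service", [] { /* set the flag */ }};
    // On return, "forward service" is printed before "messaging service":
    // the flag is already set by the time the messaging layer is drained.
}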
29 changes: 29 additions & 0 deletions service/forward_service.cc
@@ -272,11 +272,25 @@ class retrying_dispatcher {

_forwarder._stats.requests_dispatched_to_other_nodes += 1;

// Check for a shutdown request before sending a forward_request to
// another node. During the drain process, the messaging service is shut
// down early (but not earlier than the forward_service::shutdown
// invocation), so by performing this check, we can prevent hanging on
// the RPC call.
if (_forwarder._shutdown) {
return make_exception_future<query::forward_result>(std::runtime_error("forward_service is shutting down"));
}

// Try to send this forward_request to another node.
return do_with(id, req, [this] (netw::msg_addr& id, query::forward_request& req) -> future<query::forward_result> {
return ser::forward_request_rpc_verbs::send_forward_request(
&_forwarder._messaging, id, req, _tr_info
).handle_exception_type([this, &req, &id] (rpc::closed_error& e) -> future<query::forward_result> {
if (_forwarder._shutdown) {
// Do not retry if shutting down.
return make_exception_future<query::forward_result>(e);
}

// In case of forwarding failure, retry using super-coordinator as a coordinator
flogger.warn("retrying forward_request={} on a super-coordinator after failing to send it to {} ({})", req, id, e.what());
tracing::trace(_tr_state, "retrying forward_request={} on a super-coordinator after failing to send it to {} ({})", req, id, e.what());
@@ -291,6 +305,11 @@ locator::token_metadata_ptr forward_service::get_token_metadata_ptr() const noex
return _shared_token_metadata.get();
}

future<> forward_service::shutdown() {
_shutdown = true;
return make_ready_future<>();
}

future<> forward_service::stop() {
return uninit_messaging_service();
}
@@ -461,6 +480,16 @@ future<query::forward_result> forward_service::execute_on_this_shard(

// Execute query.
while (!pager->is_exhausted()) {
// It is necessary to check for a shutdown request before each
// fetch_page operation. During the drain process, the messaging
// service is shut down early (but not earlier than the
// forward_service::shutdown invocation), so by performing this
// check, we can prevent hanging on the RPC call (which can be made
// during fetching a page).
if (_shutdown) {
throw std::runtime_error("forward_service is shutting down");
}

co_await pager->fetch_page(rs_builder, DEFAULT_INTERNAL_PAGING_SIZE, now, timeout);
}

3 changes: 3 additions & 0 deletions service/forward_service.hh
@@ -131,6 +131,8 @@ class forward_service : public seastar::peering_sharded_service<forward_service>
} _stats;
seastar::metrics::metric_groups _metrics;

bool _shutdown = false;

public:
forward_service(netw::messaging_service& ms, service::storage_proxy& p, distributed<replica::database> &db,
const locator::shared_token_metadata& stm)
@@ -142,6 +144,7 @@ public:
init_messaging_service();
}

future<> shutdown();
future<> stop();

// Splits given `forward_request` and distributes execution of resulting
