Skip to content

Commit

Permalink
message: rpc_protocol_impl: define send_message_cancellable overload …
Browse files Browse the repository at this point in the history
…getting rpc::cancellable

Let the caller allocate a seastar::rpc::cancellable
and use it to cancel the message to enable
more flexible cancel functionality, like a combination
of a timeout and abort_source.

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
  • Loading branch information
bhalevy committed May 8, 2023
1 parent 5fa459b commit eb79e45
Showing 1 changed file with 29 additions and 0 deletions.
29 changes: 29 additions & 0 deletions message/rpc_protocol_impl.hh
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,35 @@ auto send_message_cancellable(messaging_service* ms, messaging_verb verb, msg_ad
});
}

// Requesting cancel on the provided rpc::cancellable drops the message from the outgoing queue (if it's still there)
// and causes the returned future to resolve exceptionally with `rpc::cancelled_error`.
// TODO: Remove duplicated code in send_message
template <typename MsgIn, typename... MsgOut>
auto send_message_cancellable(messaging_service* ms, messaging_verb verb, msg_addr id, seastar::rpc::cancellable& c_ref, MsgOut&&... msg) {
auto rpc_handler = ms->rpc()->make_client<MsgIn(MsgOut...)>(verb);
using futurator = futurize<std::result_of_t<decltype(rpc_handler)(rpc_protocol::client&, MsgOut...)>>;
if (ms->is_shutting_down()) {
return futurator::make_exception_future(rpc::closed_error());
}
auto rpc_client_ptr = ms->get_rpc_client(verb, id);
auto& rpc_client = *rpc_client_ptr;

return rpc_handler(rpc_client, c_ref, std::forward<MsgOut>(msg)...).handle_exception([ms = ms->shared_from_this(), id, verb, rpc_client_ptr = std::move(rpc_client_ptr)] (std::exception_ptr&& eptr) {
ms->increment_dropped_messages(verb);
if (try_catch<rpc::closed_error>(eptr)) {
// This is a transport error
ms->remove_error_rpc_client(verb, id);
return futurator::make_exception_future(std::move(eptr));
} else if (try_catch<rpc::canceled_error>(eptr)) {
// Translate low-level canceled_error into high-level abort_requested_exception.
return futurator::make_exception_future(abort_requested_exception{});
} else {
// This is expected to be a rpc server error, e.g., the rpc handler throws a std::runtime_error.
return futurator::make_exception_future(std::move(eptr));
}
});
}

// Send one way message for verb
template <typename... MsgOut>
auto send_message_oneway(messaging_service* ms, messaging_verb verb, msg_addr id, MsgOut&&... msg) {
Expand Down

0 comments on commit eb79e45

Please sign in to comment.