Skip to content

Commit

Permalink
[server] Remove Notification from shutdown path (grpc#33953)
Browse files Browse the repository at this point in the history
I'm fairly certain that this path should be non-blocking (and making it
so makes the promise based code far more tractable).

This moves the blocking behavior into the blocking server_cc.cc function
that calls `grpc_server_shutdown_and_notify` instead of in that
non-blocking function.

---------

Co-authored-by: ctiller <ctiller@users.noreply.github.com>
  • Loading branch information
ctiller and ctiller committed Aug 2, 2023
1 parent daa8914 commit 91e7f22
Show file tree
Hide file tree
Showing 5 changed files with 5 additions and 23 deletions.
1 change: 0 additions & 1 deletion BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -1577,7 +1577,6 @@ grpc_cc_library(
"//src/core:memory_quota",
"//src/core:metadata_compression_traits",
"//src/core:no_destruct",
"//src/core:notification",
"//src/core:packed_table",
"//src/core:per_cpu",
"//src/core:pipe",
Expand Down
8 changes: 1 addition & 7 deletions src/core/lib/surface/server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -940,7 +940,6 @@ void DonePublishedShutdown(void* /*done_arg*/, grpc_cq_completion* storage) {
// connection is NOT closed until the server is done with all those calls.
// -- Once there are no more calls in progress, the channel is closed.
void Server::ShutdownAndNotify(grpc_completion_queue* cq, void* tag) {
Notification* await_requests = nullptr;
ChannelBroadcaster broadcaster;
{
// Wait for startup to be finished. Locks mu_global.
Expand All @@ -966,12 +965,7 @@ void Server::ShutdownAndNotify(grpc_completion_queue* cq, void* tag) {
MutexLock lock(&mu_call_);
KillPendingWorkLocked(GRPC_ERROR_CREATE("Server Shutdown"));
}
await_requests = ShutdownUnrefOnShutdownCall();
}
// We expect no new requests but there can still be requests in-flight.
// Wait for them to complete before proceeding.
if (await_requests != nullptr) {
await_requests->WaitForNotification();
ShutdownUnrefOnShutdownCall();
}
StopListening();
broadcaster.BroadcastShutdown(/*send_goaway=*/true, absl::OkStatus());
Expand Down
16 changes: 1 addition & 15 deletions src/core/lib/surface/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@

#include <grpc/grpc.h>
#include <grpc/slice.h>
#include <grpc/support/log.h>
#include <grpc/support/time.h>

#include "src/core/lib/channel/channel_args.h"
Expand All @@ -46,7 +45,6 @@
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/gprpp/cpp_impl_of.h"
#include "src/core/lib/gprpp/dual_ref_counted.h"
#include "src/core/lib/gprpp/notification.h"
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/gprpp/sync.h"
Expand Down Expand Up @@ -411,24 +409,13 @@ class Server : public InternallyRefCounted<Server>,
if (shutdown_refs_.fetch_sub(2, std::memory_order_acq_rel) == 2) {
MutexLock lock(&mu_global_);
MaybeFinishShutdown();
// The last request in-flight during shutdown is now complete.
if (requests_complete_ != nullptr) {
GPR_ASSERT(!requests_complete_->HasBeenNotified());
requests_complete_->Notify();
}
}
}
// Returns a notification pointer to wait on if there are requests in-flight,
// or null.
GRPC_MUST_USE_RESULT Notification* ShutdownUnrefOnShutdownCall()
ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_global_) {
void ShutdownUnrefOnShutdownCall() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_global_) {
if (shutdown_refs_.fetch_sub(1, std::memory_order_acq_rel) == 1) {
// There is no request in-flight.
MaybeFinishShutdown();
return nullptr;
}
requests_complete_ = std::make_unique<Notification>();
return requests_complete_.get();
}

bool ShutdownCalled() const {
Expand Down Expand Up @@ -479,7 +466,6 @@ class Server : public InternallyRefCounted<Server>,
std::atomic<int> shutdown_refs_{1};
bool shutdown_published_ ABSL_GUARDED_BY(mu_global_) = false;
std::vector<ShutdownTag> shutdown_tags_ ABSL_GUARDED_BY(mu_global_);
std::unique_ptr<Notification> requests_complete_ ABSL_GUARDED_BY(mu_global_);

std::list<ChannelData*> channels_;

Expand Down
2 changes: 2 additions & 0 deletions src/cpp/server/server_cc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1266,6 +1266,8 @@ void Server::ShutdownInternal(gpr_timespec deadline) {
// shutdown. We should force a shutdown now by cancelling all inflight calls
if (status == grpc::CompletionQueue::NextStatus::TIMEOUT) {
grpc_server_cancel_all_calls(server_);
status =
shutdown_cq.AsyncNext(&tag, &ok, gpr_inf_future(GPR_CLOCK_MONOTONIC));
}
// Else in case of SHUTDOWN or GOT_EVENT, it means that the server has
// successfully shutdown
Expand Down
1 change: 1 addition & 0 deletions test/cpp/end2end/client_lb_end2end_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
#include "src/core/lib/gprpp/crash.h"
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/env.h"
#include "src/core/lib/gprpp/notification.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/gprpp/time.h"
#include "src/core/lib/iomgr/tcp_client.h"
Expand Down

0 comments on commit 91e7f22

Please sign in to comment.