Skip to content

Commit

Permalink
apacheGH-36369: [C++][FlightRPC] Fix a hang bug in FlightClient::Auth…
Browse files Browse the repository at this point in the history
…enticate*()

We need to drain all responses from server before we call gRPC's
Finish() to avoid hanging.
  • Loading branch information
kou committed Jun 30, 2023
1 parent 5a06b2e commit 57e72ea
Showing 1 changed file with 29 additions and 3 deletions.
32 changes: 29 additions & 3 deletions cpp/src/arrow/flight/transport/grpc/grpc_client.cc
Expand Up @@ -285,7 +285,7 @@ class FinishableDataStream : public internal::ClientDataStream {
// reader finishes, so it's OK to assume the client no longer
// wants to read and drain the read side. (If the client wants to
// indicate that it is done writing, but not done reading, it
// should use DoneWriting.
// should use DoneWriting.)
ReadPayloadType message;
while (ReadPayload(stream_.get(), &message)) {
// Drain the read side to avoid gRPC hanging in Finish()
Expand Down Expand Up @@ -740,11 +740,24 @@ class GrpcClientImpl : public internal::ClientTransport {
RETURN_NOT_OK(auth_handler_->Authenticate(&outgoing, &incoming));
// Explicitly close our side of the connection
bool finished_writes = stream->WritesDone();
RETURN_NOT_OK(FromGrpcStatus(stream->Finish(), &rpc.context));
if (!finished_writes) {
return MakeFlightError(FlightStatusCode::Internal,
"Could not finish writing before closing");
}
// Drain the read side, as otherwise gRPC Finish() will hang. We
// only call Finish() when the client closes the writer or the
// reader finishes, so it's OK to assume the client no longer
// wants to read and drain the read side.
pb::HandshakeResponse response;
if (!stream->Read(&response)) {
return MakeFlightError(FlightStatusCode::Internal,
"No handshake response from server");
}
if (stream->Read(&response)) {
return MakeFlightError(FlightStatusCode::Internal,
"Too much handshake response from server");
}
RETURN_NOT_OK(FromGrpcStatus(stream->Finish(), &rpc.context));
return Status::OK();
}

Expand All @@ -759,11 +772,24 @@ class GrpcClientImpl : public internal::ClientTransport {
stream = stub_->Handshake(&rpc.context);
// Explicitly close our side of the connection.
bool finished_writes = stream->WritesDone();
RETURN_NOT_OK(FromGrpcStatus(stream->Finish(), &rpc.context));
if (!finished_writes) {
return MakeFlightError(FlightStatusCode::Internal,
"Could not finish writing before closing");
}
// Drain the read side, as otherwise gRPC Finish() will hang. We
// only call Finish() when the client closes the writer or the
// reader finishes, so it's OK to assume the client no longer
// wants to read and drain the read side.
pb::HandshakeResponse response;
if (!stream->Read(&response)) {
return MakeFlightError(FlightStatusCode::Internal,
"No handshake response from server");
}
if (stream->Read(&response)) {
return MakeFlightError(FlightStatusCode::Internal,
"Too much handshake response from server");
}
RETURN_NOT_OK(FromGrpcStatus(stream->Finish(), &rpc.context));
// Grab bearer token from incoming headers.
return GetBearerTokenHeader(rpc.context);
}
Expand Down

0 comments on commit 57e72ea

Please sign in to comment.