Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

adds support for propagating grpc server-side cancellations #844

Merged
merged 11 commits into from Jul 11, 2019

Conversation

Projects
None yet
2 participants
@shamsimam
Copy link
Contributor

commented Jul 9, 2019

Jet PR: twosigma/jet#32

Changes proposed in this PR

  • adds support for propagating grpc server-side cancellations

Why are we making these changes?

gRPC headers/trailers including errors codes should be propagated correctly to the client.

@shamsimam shamsimam self-assigned this Jul 9, 2019

@shamsimam shamsimam force-pushed the grpc-server-side-cancel branch 2 times, most recently from 4907cf3 to 6dd5f97 Jul 9, 2019

@shamsimam shamsimam requested a review from sradack Jul 9, 2019

@shamsimam shamsimam force-pushed the grpc-server-side-cancel branch from 24f8236 to 1162f2e Jul 10, 2019

@sradack
Copy link
Contributor

left a comment

Ending this review as I just saw more commits come in.

@@ -148,6 +150,15 @@ public static CourierReply sendPackage(final String host,
} catch (final StatusRuntimeException e) {
logFunction.apply("RPC failed, status: " + e.getStatus());
return null;
} catch (ExecutionException e) {

This comment has been minimized.

Copy link
@sradack

sradack Jul 10, 2019

Contributor
Suggested change
} catch (ExecutionException e) {
} catch (final ExecutionException e) {

This comment has been minimized.

Copy link
@shamsimam

shamsimam Jul 10, 2019

Author Contributor

Done.

@@ -148,6 +150,15 @@ public static CourierReply sendPackage(final String host,
} catch (final StatusRuntimeException e) {
logFunction.apply("RPC failed, status: " + e.getStatus());
return null;
} catch (ExecutionException e) {
final Status status = Status.fromThrowable(e.getCause());

This comment has been minimized.

Copy link
@sradack

sradack Jul 10, 2019

Contributor

Is getCause() necessary? Status.fromThrowable() searches recursively through causes.

This comment has been minimized.

} catch (ExecutionException e) {
final Status status = Status.fromThrowable(e.getCause());
logFunction.apply("RPC execution failed: " + status);
return CourierReply

This comment has been minimized.

Copy link
@sradack

sradack Jul 10, 2019

Contributor

Can we avoid packing into CourierReply? CourierReply is meant to represent a message from the server.

This comment has been minimized.

Copy link
@shamsimam

shamsimam Jul 10, 2019

Author Contributor

Done.

@@ -214,6 +227,16 @@ public void onError(final Throwable throwable) {
logFunction.apply("releasing semaphore after receiving error");
lockStep.release();
}
if (throwable instanceof StatusRuntimeException) {
final StatusRuntimeException exception = (StatusRuntimeException) throwable;
final CourierSummary response = CourierSummary

This comment has been minimized.

Copy link
@sradack

sradack Jul 10, 2019

Contributor

Can we avoid packing into CourierSummary? CourierSummary is meant to represent a message from the server.

This comment has been minimized.

Copy link
@shamsimam

shamsimam Jul 10, 2019

Author Contributor

Done.

if (i >= cancelThreshold) {
logFunction.apply("cancelling sending messages");
awaitChannelTermination.set(false);
channel.shutdownNow();

This comment has been minimized.

Copy link
@sradack

sradack Jul 10, 2019

Contributor

We should also test explicit cancels. I found three ways:

final Context.CancellableContext cancellableContext = Context.current().withCancellation();
cancellableContext.run(() -> {
    futureStub.collectPackages(new StreamObserver<CourierSummary>() {
        // ...
    });
});
// later...
cancellableContext.cancel(new RuntimeException("cancelling grpc request"));
ListenableFuture<CourierReply> future = futureStub.sendPackage(request);
future.cancel(false);
collector.onError(Status.CANCELLED.asRuntimeException());

https://groups.google.com/forum/#!topic/grpc-io/quToVM4NhdQ
https://groups.google.com/forum/#!topic/grpc-io/MdDADccRpAQ

This comment has been minimized.

Copy link
@shamsimam

shamsimam Jul 10, 2019

Author Contributor

I added the client-side streaming, but server-side cancellation case.
The above examples are for client-side cancellation, they will be added in the client-side cancellation PR.

(let [trailers-map (async/<! trailers)
modified-trailers (merge grpc-headers trailers-map)]
(log/info "response trailers:" trailers-map)
(when (seq grpc-headers)

This comment has been minimized.

Copy link
@sradack

sradack Jul 10, 2019

Contributor

Is (seq grpc-headers) always true?

This comment has been minimized.

Copy link
@shamsimam

shamsimam Jul 10, 2019

Author Contributor

Yes, I added the if (seq grpc-headers) later which makes these seq checks unnecessary.

This comment has been minimized.

Copy link
@shamsimam

shamsimam Jul 10, 2019

Author Contributor

removed.

(log/info "response trailers:" trailers-map)
(when (seq grpc-headers)
(log/info "attaching grpc headers into trailer:" grpc-headers))
(when (seq modified-trailers)

This comment has been minimized.

Copy link
@sradack

sradack Jul 10, 2019

Contributor

Is (seq modified-trailers) always true?

This comment has been minimized.

Copy link
@shamsimam

shamsimam Jul 10, 2019

Author Contributor

Yes, I added the if (seq grpc-headers) later which makes these seq checks unnecessary.

This comment has been minimized.

Copy link
@shamsimam

shamsimam Jul 10, 2019

Author Contributor

removed.

;; mark the request as successful, grpc failures are reported in the headers
(deliver reservation-status-promise :success)
;; stop writing any content in the body
(async/close! body)

This comment has been minimized.

Copy link
@sradack

sradack Jul 10, 2019

Contributor

Isn't body being read by stream-http-response? Can we make this comment more clear?

This comment has been minimized.

Copy link
@shamsimam

shamsimam Jul 10, 2019

Author Contributor

Added comments, we want to avoid writing data to body in stream-http-response

This comment has been minimized.

Copy link
@sradack

sradack Jul 11, 2019

Contributor

stream-http-response writes to resp-chan. The body here is the body coming back from the backend.

This comment has been minimized.

Copy link
@shamsimam

shamsimam Jul 11, 2019

Author Contributor

stream-http-response reads from body, closing body triggers request completion of the request by closing resp-chan.

(log/info "eagerly closing response body as grpc status is" grpc-status)
(when abort-ch
;; disallow aborting the request
(async/close! abort-ch))

This comment has been minimized.

Copy link
@sradack

sradack Jul 10, 2019

Contributor

Can we say why we are disallowing aborting the request?

This comment has been minimized.

Copy link
@shamsimam

shamsimam Jul 10, 2019

Author Contributor

Added comment, we have deemed the request as success and will trigger normal completion by closing the body channel.

(deliver reservation-status-promise :success)
;; stop writing any content in the body
(async/close! body)
(async/close! error-chan))

This comment has been minimized.

Copy link
@sradack

sradack Jul 10, 2019

Contributor

Why do we close the error-chan?

This comment has been minimized.

Copy link
@shamsimam

shamsimam Jul 10, 2019

Author Contributor

Added comments, we want to avoid blocking in stream-http-response

@shamsimam shamsimam force-pushed the grpc-server-side-cancel branch 2 times, most recently from f4448b9 to c430a6b Jul 10, 2019

@shamsimam shamsimam force-pushed the grpc-server-side-cancel branch from c430a6b to 542ba84 Jul 10, 2019

initializes grpc client result Status to UNKNOWN instead of OK
asserts on individual fields on the ping response

@shamsimam shamsimam force-pushed the grpc-server-side-cancel branch from 542ba84 to de02079 Jul 11, 2019

public static RpcResult<CourierReply> sendPackage(final String host,
final int port,
final Map<String, Object> headers,
final String id,

This comment has been minimized.

Copy link
@sradack

sradack Jul 11, 2019

Contributor

Can we avoid overloading id with the actions to take? Can we avoid updating the signatures of these functions when we want to make a change on the Clojure side? This request is aspirational and not required for this PR.

This comment has been minimized.

Copy link
@shamsimam

shamsimam Jul 11, 2019

Author Contributor

Will do in a later PR. Created #854.

}

@Override
public void onCompleted() {

This comment has been minimized.

Copy link
@sradack

sradack Jul 11, 2019

Contributor

Can we confirm onCompleted and onError are mutually exclusive?

This comment has been minimized.

Copy link
@shamsimam

shamsimam Jul 11, 2019

Author Contributor

Yes, here are logs from two runs where onCompleted () is not called when there is an error:

When there is an error:

INFO: completed sending packages
INFO: status received from server:Status{code=CANCELLED, description=Cancelled by server, cause=null}
INFO: trailers received from server:Metadata(content-type=application/grpc)
INFO: error in aggregating summaries io.grpc.StatusRuntimeException: CANCELLED: Cancelled by server
INFO: client result: null
INFO: shutting down channel

Successful completion:

INFO: completed sending packages
INFO: headers received from server:Metadata(content-type=application/grpc,x-cid=cid-aggregate-packages-success.1562836825027,grpc-encoding=identity,grpc-accept-encoding=gzip)
INFO: received response CourierSummary{count=10, length=90}
INFO: status received from server:Status{code=OK, description=null, cause=null}
INFO: trailers received from server:Metadata()
INFO: completed aggregating summaries
INFO: client result: num_messages: 10, total_length: 90
INFO: shutting down channel
;; mark the request as successful, grpc failures are reported in the headers
(deliver reservation-status-promise :success)
;; stop writing any content in the body
(async/close! body)

This comment has been minimized.

Copy link
@sradack

sradack Jul 11, 2019

Contributor

stream-http-response writes to resp-chan. The body here is the body coming back from the backend.

shamsimam added some commits Jul 11, 2019

improves assertion messages
avoids warnings due to presence of overloaded methods in grpc client
improves assertion messages
avoids warnings due to presence of overloaded methods in grpc client
adds correlation-id to logs from grpc client

@shamsimam shamsimam force-pushed the grpc-server-side-cancel branch from bc96170 to 1809d61 Jul 11, 2019

@shamsimam

This comment has been minimized.

Copy link
Contributor Author

commented Jul 11, 2019

I have updated the courier client code to avoid race in reporting the error status.

@shamsimam

This comment has been minimized.

Copy link
Contributor Author

commented Jul 11, 2019

Fyi, consistently green runs internally since the client race fix.

@sradack sradack merged commit dd6bd41 into master Jul 11, 2019

2 checks passed

Mergeable Mergeable Run has been Completed!
Details
continuous-integration/travis-ci/pr The Travis CI build passed
Details

@sradack sradack deleted the grpc-server-side-cancel branch Jul 11, 2019

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.