-
Notifications
You must be signed in to change notification settings - Fork 4.5k
grpc: Fix cardinality violations in non-client streaming RPCs. #8385
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Conversation
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## master #8385 +/- ##
==========================================
- Coverage 82.44% 82.43% -0.01%
==========================================
Files 413 414 +1
Lines 40424 40437 +13
==========================================
+ Hits 33328 33336 +8
- Misses 5742 5743 +1
- Partials 1354 1358 +4
🚀 New features to boost your workflow:
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it possible to implement all of this in server.go
to avoid adding state to the serverStream
?
As discussed this with @arjan-bal offline, Cardinality violations can only be detected when messages are being read from the stream. This reading process occurs specifically within the server.RecvMsg() function. Since RecvMsg() is invoked from the user-implemented handler, it's not possible to detect cardinality violations during the initial stream setup phase. |
@@ -1774,6 +1775,9 @@ func (ss *serverStream) RecvMsg(m any) (err error) { | |||
binlog.Log(ss.ctx, chc) | |||
} | |||
} | |||
if !ss.desc.ClientStreams { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's not immediately clear to me how this conditions indicates RecvMsg is called twice. From my understanding, the handling of cardinality violations in the server stream should be similar to the one in client stream. The check here should be for 0 requests being received for a non-client-streaming request.
Lines 1142 to 1144 in e5de1e2
if !cs.desc.ServerStreams { | |
return status.Error(codes.Internal, "cardinality violation: received no response message from non-streaming RPC") | |
} |
For > 1 requests, there should be a block near the end of this method similar to the following:
Lines 1167 to 1178 in e5de1e2
if cs.desc.ServerStreams { | |
// Subsequent messages should be received by subsequent RecvMsg calls. | |
return nil | |
} | |
// Special handling for non-server-stream rpcs. | |
// This recv expects EOF or errors, so we don't collect inPayload. | |
if err := recv(a.parser, cs.codec, a.transportStream, a.decompressorV0, m, *cs.callInfo.maxReceiveMessageSize, nil, a.decompressorV1, false); err == io.EOF { | |
return a.transportStream.Status().Err() // non-server streaming Recv returns nil on success | |
} else if err != nil { | |
return toRPCErr(err) | |
} | |
return status.Error(codes.Internal, "cardinality violation: expected <EOF> for non server-streaming RPCs, but received another message") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there any issue if we try to follow the same pattern as client streams here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In server-streaming, client can't send more than one message. So the following code will be useless for server.recvmsg().
Lines 1167 to 1178 in e5de1e2
if cs.desc.ServerStreams { | |
// Subsequent messages should be received by subsequent RecvMsg calls. | |
return nil | |
} | |
// Special handling for non-server-stream rpcs. | |
// This recv expects EOF or errors, so we don't collect inPayload. | |
if err := recv(a.parser, cs.codec, a.transportStream, a.decompressorV0, m, *cs.callInfo.maxReceiveMessageSize, nil, a.decompressorV1, false); err == io.EOF { | |
return a.transportStream.Status().Err() // non-server streaming Recv returns nil on success | |
} else if err != nil { | |
return toRPCErr(err) | |
} | |
return status.Error(codes.Internal, "cardinality violation: expected <EOF> for non server-streaming RPCs, but received another message") |
The check here should be for 0 requests being received for a non-client-streaming request.
Lines 1142 to 1144 in e5de1e2
if !cs.desc.ServerStreams { return status.Error(codes.Internal, "cardinality violation: received no response message from non-streaming RPC") }
Right, in client.recvMsg() this check is triggered in case of 0 response from server.
But in server-stream, it is not possible to have 0 request. So the following case will only be triggered when server.recvmsg() is called more than once and receives an io.EOF
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In server-streaming, client can't send more than one message.
What is stopping a client from doing this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Clientstream maintains a variable named sentlast
which will be set to true
when client sends a msg in case of non client-streaming RPCs.
Lines 905 to 910 in e5de1e2
if cs.sentLast { | |
return status.Errorf(codes.Internal, "SendMsg called after CloseSend") | |
} | |
if !cs.desc.ClientStreams { | |
cs.sentLast = true | |
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As we discussed earlier, can we have the client (incorrectly) create a bidi stream by manipulating the strem desc?
Ideally, a well behaving client should never cause cardinality violations. The reason for adding these checks is to fail the RPCs with good error codes and messages when a client misbehaves.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added the logic in server.recvmsg() to call recv() twice in case of non-client-stream RPCs. Also added a test with misbehaving client.
// Tests the behavior for unary RPC when server call | ||
// RecvMsg twice. Second call to RecvMsg should fail with Internal | ||
// error. | ||
func (s) TestUnaryRPC_ServerCallRecvMsgTwice(t *testing.T) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can this be combined with TestServerStreaming_ServerCallRecvMsgTwice
into a single table drive test? They seem to be exactly the same except for a couple of lines. Similarly for the ClientCallSendMsgTwice
tests.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Bump on this.
} | ||
} | ||
|
||
func (s) TestServerStreaming_ClientBehaveAsBidiStreaming(t *testing.T) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think a better suffix is ClientSendsMultipleMessages
because we want to test the case when a client sends multiple messages. Making the client bi-di is just a way to bypass the client side checks that stop a non-streaming client from sending multiple messages.
Can you please add a doc comment describing the test?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Already have a test with suffix ClientCallSendMsgTwice()
to test what happen when a correctly configured client for server-side streaming attempts to call SendMsg() twice.
I'll add a comment to describe what this test is doing.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ClientSendsMultipleMessages
is not the same as ClientCallSendMsgTwice
. The former implies that the client is actually sending two messages, whereas the latter indicates that the client is calling the SendMsg method twice.
} | ||
defer cc.Close() | ||
|
||
desc := &grpc.StreamDesc{ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please add a comment describing the need to use a bi-di streaming client for this test.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't see the comment here.
ss := grpc.UnknownServiceHandler(func(any, grpc.ServerStream) error { | ||
return nil | ||
}) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should use real implementations in tests as much as possible so that the entire RPC stack is used and there are no unintended interactions. By mocking the server, there is a chance that the client side check is removed and the test continues to pass because the fake server doesn't have the required functionality to handle the call.
} | ||
} | ||
|
||
func (s) TestServerStreaming_ClientBehaveAsBidiStreaming(t *testing.T) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ClientSendsMultipleMessages
is not the same as ClientCallSendMsgTwice
. The former implies that the client is actually sending two messages, whereas the latter indicates that the client is calling the SendMsg method twice.
} | ||
defer cc.Close() | ||
|
||
desc := &grpc.StreamDesc{ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't see the comment here.
go s.Serve(lis) | ||
defer s.Stop() | ||
|
||
// s := grpc.NewServer(ss) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please remove this commented code.
defer lis.Close() | ||
|
||
s := grpc.NewServer() | ||
serviceDesc := grpc.ServiceDesc{ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I believe we don't need to change the stream descriptors on the server side for this test. If so, please use a stubserver and override one of the handlers.
defer lis.Close() | ||
|
||
s := grpc.NewServer() | ||
serviceDesc := grpc.ServiceDesc{ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need to fake the stream descriptors in this test? If not, we should use the test service client and server.
// Tests the behavior for unary RPC when server call | ||
// RecvMsg twice. Second call to RecvMsg should fail with Internal | ||
// error. | ||
func (s) TestUnaryRPC_ServerCallRecvMsgTwice(t *testing.T) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Bump on this.
Partially addresses: #7286
In non-client streaming RPCs, the client's SendMsg() method is designed to automatically close the send operation after its initial call. If someone attempts to call Client.SendMsg() twice for non-client streaming RPCs, if will return with error
Internal desc = SendMsg called after CloseSend
.To mirror this behavior, the server-side logic has been updated so that calling RecvMsg() more than once for non-client streaming RPCs will now similarly return an
Internal
error.RELEASE NOTES: