Skip to content

grpc: Fix cardinality violations in non-client streaming RPCs. #8385

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 11 commits into
base: master
Choose a base branch
from

Conversation

Pranjali-2501
Copy link
Contributor

@Pranjali-2501 Pranjali-2501 commented Jun 6, 2025

Partially addresses: #7286

In non-client streaming RPCs, the client's SendMsg() method is designed to automatically close the send operation after its initial call. If someone attempts to call Client.SendMsg() twice for non-client streaming RPCs, if will return with error Internal desc = SendMsg called after CloseSend.
To mirror this behavior, the server-side logic has been updated so that calling RecvMsg() more than once for non-client streaming RPCs will now similarly return an Internal error.

RELEASE NOTES:

  • grpc: return status code INTERNAL when client send more than one request in unary and server streaming RPC.

@Pranjali-2501 Pranjali-2501 added this to the 1.74 Release milestone Jun 6, 2025
Copy link

codecov bot commented Jun 6, 2025

Codecov Report

Attention: Patch coverage is 78.57143% with 3 lines in your changes missing coverage. Please review.

Project coverage is 82.43%. Comparing base (20bd1e7) to head (dff08b3).
Report is 6 commits behind head on master.

Files with missing lines Patch % Lines
stream.go 76.92% 2 Missing and 1 partial ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##           master    #8385      +/-   ##
==========================================
- Coverage   82.44%   82.43%   -0.01%     
==========================================
  Files         413      414       +1     
  Lines       40424    40437      +13     
==========================================
+ Hits        33328    33336       +8     
- Misses       5742     5743       +1     
- Partials     1354     1358       +4     
Files with missing lines Coverage Δ
server.go 82.08% <100.00%> (+0.01%) ⬆️
stream.go 82.09% <76.92%> (+0.43%) ⬆️

... and 22 files with indirect coverage changes

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

@dfawley dfawley removed their assignment Jun 6, 2025
@dfawley dfawley removed their request for review June 6, 2025 21:16
Copy link
Member

@dfawley dfawley left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible to implement all of this in server.go to avoid adding state to the serverStream?

@Pranjali-2501 Pranjali-2501 changed the title grpc: Fix cardinality violations in server streaming RPC. grpc: Fix cardinality violations in non-client streaming RPCs. Jun 11, 2025
@Pranjali-2501
Copy link
Contributor Author

Is it possible to implement all of this in server.go to avoid adding state to the serverStream?

As discussed this with @arjan-bal offline, Cardinality violations can only be detected when messages are being read from the stream. This reading process occurs specifically within the server.RecvMsg() function. Since RecvMsg() is invoked from the user-implemented handler, it's not possible to detect cardinality violations during the initial stream setup phase.

@@ -1774,6 +1775,9 @@ func (ss *serverStream) RecvMsg(m any) (err error) {
binlog.Log(ss.ctx, chc)
}
}
if !ss.desc.ClientStreams {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not immediately clear to me how this conditions indicates RecvMsg is called twice. From my understanding, the handling of cardinality violations in the server stream should be similar to the one in client stream. The check here should be for 0 requests being received for a non-client-streaming request.

grpc-go/stream.go

Lines 1142 to 1144 in e5de1e2

if !cs.desc.ServerStreams {
return status.Error(codes.Internal, "cardinality violation: received no response message from non-streaming RPC")
}

For > 1 requests, there should be a block near the end of this method similar to the following:

grpc-go/stream.go

Lines 1167 to 1178 in e5de1e2

if cs.desc.ServerStreams {
// Subsequent messages should be received by subsequent RecvMsg calls.
return nil
}
// Special handling for non-server-stream rpcs.
// This recv expects EOF or errors, so we don't collect inPayload.
if err := recv(a.parser, cs.codec, a.transportStream, a.decompressorV0, m, *cs.callInfo.maxReceiveMessageSize, nil, a.decompressorV1, false); err == io.EOF {
return a.transportStream.Status().Err() // non-server streaming Recv returns nil on success
} else if err != nil {
return toRPCErr(err)
}
return status.Error(codes.Internal, "cardinality violation: expected <EOF> for non server-streaming RPCs, but received another message")

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there any issue if we try to follow the same pattern as client streams here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In server-streaming, client can't send more than one message. So the following code will be useless for server.recvmsg().

grpc-go/stream.go

Lines 1167 to 1178 in e5de1e2

if cs.desc.ServerStreams {
// Subsequent messages should be received by subsequent RecvMsg calls.
return nil
}
// Special handling for non-server-stream rpcs.
// This recv expects EOF or errors, so we don't collect inPayload.
if err := recv(a.parser, cs.codec, a.transportStream, a.decompressorV0, m, *cs.callInfo.maxReceiveMessageSize, nil, a.decompressorV1, false); err == io.EOF {
return a.transportStream.Status().Err() // non-server streaming Recv returns nil on success
} else if err != nil {
return toRPCErr(err)
}
return status.Error(codes.Internal, "cardinality violation: expected <EOF> for non server-streaming RPCs, but received another message")

The check here should be for 0 requests being received for a non-client-streaming request.

grpc-go/stream.go

Lines 1142 to 1144 in e5de1e2

if !cs.desc.ServerStreams {
return status.Error(codes.Internal, "cardinality violation: received no response message from non-streaming RPC")
}

Right, in client.recvMsg() this check is triggered in case of 0 response from server.
But in server-stream, it is not possible to have 0 request. So the following case will only be triggered when server.recvmsg() is called more than once and receives an io.EOF.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In server-streaming, client can't send more than one message.

What is stopping a client from doing this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Clientstream maintains a variable named sentlast which will be set to true when client sends a msg in case of non client-streaming RPCs.

grpc-go/stream.go

Lines 905 to 910 in e5de1e2

if cs.sentLast {
return status.Errorf(codes.Internal, "SendMsg called after CloseSend")
}
if !cs.desc.ClientStreams {
cs.sentLast = true
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As we discussed earlier, can we have the client (incorrectly) create a bidi stream by manipulating the strem desc?

Ideally, a well behaving client should never cause cardinality violations. The reason for adding these checks is to fail the RPCs with good error codes and messages when a client misbehaves.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added the logic in server.recvmsg() to call recv() twice in case of non-client-stream RPCs. Also added a test with misbehaving client.

// Tests the behavior for unary RPC when server call
// RecvMsg twice. Second call to RecvMsg should fail with Internal
// error.
func (s) TestUnaryRPC_ServerCallRecvMsgTwice(t *testing.T) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this be combined with TestServerStreaming_ServerCallRecvMsgTwice into a single table drive test? They seem to be exactly the same except for a couple of lines. Similarly for the ClientCallSendMsgTwice tests.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bump on this.

}
}

func (s) TestServerStreaming_ClientBehaveAsBidiStreaming(t *testing.T) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think a better suffix is ClientSendsMultipleMessages because we want to test the case when a client sends multiple messages. Making the client bi-di is just a way to bypass the client side checks that stop a non-streaming client from sending multiple messages.

Can you please add a doc comment describing the test?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Already have a test with suffix ClientCallSendMsgTwice() to test what happen when a correctly configured client for server-side streaming attempts to call SendMsg() twice.

I'll add a comment to describe what this test is doing.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ClientSendsMultipleMessages is not the same as ClientCallSendMsgTwice. The former implies that the client is actually sending two messages, whereas the latter indicates that the client is calling the SendMsg method twice.

}
defer cc.Close()

desc := &grpc.StreamDesc{
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add a comment describing the need to use a bi-di streaming client for this test.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see the comment here.

@Pranjali-2501 Pranjali-2501 requested review from arjan-bal and removed request for easwars July 1, 2025 07:43
Comment on lines +3933 to +3935
ss := grpc.UnknownServiceHandler(func(any, grpc.ServerStream) error {
return nil
})
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should use real implementations in tests as much as possible so that the entire RPC stack is used and there are no unintended interactions. By mocking the server, there is a chance that the client side check is removed and the test continues to pass because the fake server doesn't have the required functionality to handle the call.

}
}

func (s) TestServerStreaming_ClientBehaveAsBidiStreaming(t *testing.T) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ClientSendsMultipleMessages is not the same as ClientCallSendMsgTwice. The former implies that the client is actually sending two messages, whereas the latter indicates that the client is calling the SendMsg method twice.

}
defer cc.Close()

desc := &grpc.StreamDesc{
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see the comment here.

go s.Serve(lis)
defer s.Stop()

// s := grpc.NewServer(ss)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please remove this commented code.

defer lis.Close()

s := grpc.NewServer()
serviceDesc := grpc.ServiceDesc{
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe we don't need to change the stream descriptors on the server side for this test. If so, please use a stubserver and override one of the handlers.

defer lis.Close()

s := grpc.NewServer()
serviceDesc := grpc.ServiceDesc{
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to fake the stream descriptors in this test? If not, we should use the test service client and server.

// Tests the behavior for unary RPC when server call
// RecvMsg twice. Second call to RecvMsg should fail with Internal
// error.
func (s) TestUnaryRPC_ServerCallRecvMsgTwice(t *testing.T) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bump on this.

@arjan-bal arjan-bal assigned Pranjali-2501 and unassigned arjan-bal Jul 4, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants