Skip to content

Commit

Permalink
feat: allow different formats for messages streaming/unary
Browse files Browse the repository at this point in the history
Streaming response are never aggregated, so wrapping messages into
`repeated` container looks like overhead as there will always be just a
single entry in `repeated` container.

Pass `streaming` flag down to `Backend` response building methods to
support this flow.

Test service was adjusted with the new proto layout.

Signed-off-by: Andrey Smirnov <smirnov.andrey@gmail.com>
  • Loading branch information
smira committed Nov 29, 2019
1 parent 6c9f7b3 commit 5c579a7
Show file tree
Hide file tree
Showing 7 changed files with 120 additions and 145 deletions.
23 changes: 8 additions & 15 deletions proxy/DOC.md
Original file line number Diff line number Diff line change
Expand Up @@ -84,25 +84,27 @@ type Backend interface {

// AppendInfo is called to enhance response from the backend with additional data.
//
// Parameter streaming indicates if response is delivered in streaming mode or not.
//
// Usecase might be appending backend endpoint (or name) to the protobuf serialized response, so that response is enhanced
// with source information. This is particularly important for one to many calls, when it is required to identify
// response from each of the backends participating in the proxying.
//
// If not additional proxying is required, simply returning the buffer without changes works fine.
AppendInfo(resp []byte) ([]byte, error)
AppendInfo(streaming bool, resp []byte) ([]byte, error)

// BuildError is called to convert error from upstream into response field.
//
// BuildError is never called for one to one proxying, in that case all the errors are returned back to the caller
// as grpc errors.
// as grpc errors. Parameter streaming indicates if response is delivered in streaming mode or not.
//
// When proxying one to many, if one the requests fails or upstream returns an error, it is undesirable to fail the whole
// request and discard responses from other backends. BuildError converts (marshals) error from backend into protobuf encoded
// response which is analyzed by the caller, so that caller reaching out to N upstreams receives N1 successful responses and
// N2 error responses so that N1 + N2 == N.
//
// If BuildError returns nil, error is returned as grpc error (failing whole request).
BuildError(err error) ([]byte, error)
BuildError(streaming bool, err error) ([]byte, error)
}
```

Expand Down Expand Up @@ -148,15 +150,6 @@ func WithMethodNames(methodNames ...string) Option
WithMethodNames configures list of method names to proxy for non-transparent
handler.

#### func WithMode

```go
func WithMode(mode Mode) Option
```
WithMode sets proxying mode: One2One or One2Many.

Default mode is One2One.

#### func WithStreamedDetector

```go
Expand Down Expand Up @@ -258,14 +251,14 @@ for one to one proxying.
#### func (*SingleBackend) AppendInfo

```go
func (sb *SingleBackend) AppendInfo(resp []byte) ([]byte, error)
func (sb *SingleBackend) AppendInfo(streaming bool, resp []byte) ([]byte, error)
```
AppendInfo is called to enhance response from the backend with additional data.

#### func (*SingleBackend) BuildError

```go
func (sb *SingleBackend) BuildError(err error) ([]byte, error)
func (sb *SingleBackend) BuildError(streaming bool, err error) ([]byte, error)
```
BuildError is called to convert error from upstream into response field.

Expand All @@ -285,7 +278,7 @@ func (sb *SingleBackend) String() string
#### type StreamDirector

```go
type StreamDirector func(ctx context.Context, fullMethodName string) ([]Backend, error)
type StreamDirector func(ctx context.Context, fullMethodName string) (Mode, []Backend, error)
```

StreamDirector returns a list of Backend objects to forward the call to.
Expand Down
12 changes: 7 additions & 5 deletions proxy/director.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,25 +31,27 @@ type Backend interface {

// AppendInfo is called to enhance response from the backend with additional data.
//
// Parameter streaming indicates if response is delivered in streaming mode or not.
//
// Usecase might be appending backend endpoint (or name) to the protobuf serialized response, so that response is enhanced
// with source information. This is particularly important for one to many calls, when it is required to identify
// response from each of the backends participating in the proxying.
//
// If not additional proxying is required, simply returning the buffer without changes works fine.
AppendInfo(resp []byte) ([]byte, error)
AppendInfo(streaming bool, resp []byte) ([]byte, error)

// BuildError is called to convert error from upstream into response field.
//
// BuildError is never called for one to one proxying, in that case all the errors are returned back to the caller
// as grpc errors.
// as grpc errors. Parameter streaming indicates if response is delivered in streaming mode or not.
//
// When proxying one to many, if one the requests fails or upstream returns an error, it is undesirable to fail the whole
// request and discard responses from other backends. BuildError converts (marshals) error from backend into protobuf encoded
// response which is analyzed by the caller, so that caller reaching out to N upstreams receives N1 successful responses and
// N2 error responses so that N1 + N2 == N.
//
// If BuildError returns nil, error is returned as grpc error (failing whole request).
BuildError(err error) ([]byte, error)
BuildError(streaming bool, err error) ([]byte, error)
}

// SingleBackend implements a simple wrapper around get connection function of one to one proxying.
Expand All @@ -74,12 +76,12 @@ func (sb *SingleBackend) GetConnection(ctx context.Context) (context.Context, *g
}

// AppendInfo is called to enhance response from the backend with additional data.
func (sb *SingleBackend) AppendInfo(resp []byte) ([]byte, error) {
func (sb *SingleBackend) AppendInfo(streaming bool, resp []byte) ([]byte, error) {
return resp, nil
}

// BuildError is called to convert error from upstream into response field.
func (sb *SingleBackend) BuildError(err error) ([]byte, error) {
func (sb *SingleBackend) BuildError(streaming bool, err error) ([]byte, error) {
return nil, nil
}

Expand Down
16 changes: 8 additions & 8 deletions proxy/handler_one2many.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@ func (s *handler) handlerOne2Many(fullMethodName string, serverStream grpc.Serve
}

// formatError tries to format error from upstream as message to the client
func (s *handler) formatError(src *backendConnection, backendErr error) ([]byte, error) {
payload, err := src.backend.BuildError(backendErr)
func (s *handler) formatError(streaming bool, src *backendConnection, backendErr error) ([]byte, error) {
payload, err := src.backend.BuildError(streaming, backendErr)
if err != nil {
return nil, fmt.Errorf("error building error for %s: %w", src.backend, err)
}
Expand All @@ -76,7 +76,7 @@ func (s *handler) formatError(src *backendConnection, backendErr error) ([]byte,
// if sendError fails to deliver the error, error is returned
// if sendError successfully delivers the error, nil is returned
func (s *handler) sendError(src *backendConnection, dst grpc.ServerStream, backendErr error) error {
payload, err := s.formatError(src, backendErr)
payload, err := s.formatError(true, src, backendErr)
if err != nil {
return err
}
Expand All @@ -100,7 +100,7 @@ func (s *handler) forwardClientsToServerMultiUnary(sources []backendConnection,
go func(src *backendConnection) {
errCh <- func() error {
if src.connError != nil {
payload, err := s.formatError(src, src.connError)
payload, err := s.formatError(false, src, src.connError)
if err != nil {
return err
}
Expand All @@ -120,7 +120,7 @@ func (s *handler) forwardClientsToServerMultiUnary(sources []backendConnection,
return nil
}

payload, err := s.formatError(src, err)
payload, err := s.formatError(false, src, err)
if err != nil {
return err
}
Expand All @@ -134,7 +134,7 @@ func (s *handler) forwardClientsToServerMultiUnary(sources []backendConnection,
// This is the only place to do it nicely.
md, err := src.clientStream.Header()
if err != nil {
payload, err := s.formatError(src, err)
payload, err := s.formatError(false, src, err)
if err != nil {
return err
}
Expand All @@ -149,7 +149,7 @@ func (s *handler) forwardClientsToServerMultiUnary(sources []backendConnection,
}

var err error
f.payload, err = src.backend.AppendInfo(f.payload)
f.payload, err = src.backend.AppendInfo(false, f.payload)
if err != nil {
return fmt.Errorf("error appending info for %s: %w", src.backend, err)
}
Expand Down Expand Up @@ -224,7 +224,7 @@ func (s *handler) forwardClientsToServerMultiStreaming(sources []backendConnecti
}

var err error
f.payload, err = src.backend.AppendInfo(f.payload)
f.payload, err = src.backend.AppendInfo(true, f.payload)
if err != nil {
return fmt.Errorf("error appending info for %s: %w", src.backend, err)
}
Expand Down
85 changes: 42 additions & 43 deletions proxy/handler_one2many_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,14 +82,10 @@ func (s *assertingMultiService) PingList(ping *pb.PingRequest, stream pb.MultiSe
// Send user trailers and headers.
stream.SendHeader(metadata.Pairs(serverHeaderMdKey, "I like turtles.")) //nolint: errcheck
for i := 0; i < countListResponses; i++ {
stream.Send(&pb.MultiPingReply{ //nolint: errcheck
Response: []*pb.MultiPingResponse{
{
Value: ping.Value,
Counter: int32(i),
Server: s.server,
},
},
stream.Send(&pb.MultiPingResponse{ //nolint: errcheck
Value: ping.Value,
Counter: int32(i),
Server: s.server,
})
}
stream.SetTrailer(metadata.Pairs(serverTrailerMdKey, "I like ending turtles.")) //nolint: errcheck
Expand All @@ -107,14 +103,10 @@ func (s *assertingMultiService) PingStream(stream pb.MultiService_PingStreamServ
require.NoError(s.t, err, "can't fail reading stream")
return err
}
pong := &pb.MultiPingReply{
Response: []*pb.MultiPingResponse{
{
Value: ping.Value,
Counter: counter,
Server: s.server,
},
},
pong := &pb.MultiPingResponse{
Value: ping.Value,
Counter: counter,
Server: s.server,
}
if err := stream.Send(pong); err != nil {
require.NoError(s.t, err, "can't fail sending back a pong")
Expand Down Expand Up @@ -162,7 +154,17 @@ func (b *assertingBackend) GetConnection(ctx context.Context) (context.Context,
return outCtx, b.conn, err
}

func (b *assertingBackend) AppendInfo(resp []byte) ([]byte, error) {
func (b *assertingBackend) AppendInfo(streaming bool, resp []byte) ([]byte, error) {
payload, err := proto.Marshal(&pb.ResponseMetadataPrepender{
Metadata: &pb.ResponseMetadata{
Hostname: fmt.Sprintf("server%d", b.i),
},
})

if streaming {
return append(resp, payload...), err
}

// decode protobuf embedded header
typ, n1 := proto.DecodeVarint(resp)
_, n2 := proto.DecodeVarint(resp[n1:]) // length
Expand All @@ -171,12 +173,6 @@ func (b *assertingBackend) AppendInfo(resp []byte) ([]byte, error) {
return nil, fmt.Errorf("unexpected message format: %d", typ)
}

payload, err := proto.Marshal(&pb.ResponseMetadataPrepender{
Metadata: &pb.ResponseMetadata{
Hostname: fmt.Sprintf("server%d", b.i),
},
})

// cut off embedded message header
resp = resp[n1+n2:]
// build new embedded message header
Expand All @@ -186,8 +182,8 @@ func (b *assertingBackend) AppendInfo(resp []byte) ([]byte, error) {
return append(resp, payload...), err
}

func (b *assertingBackend) BuildError(err error) ([]byte, error) {
return proto.Marshal(&pb.EmptyReply{
func (b *assertingBackend) BuildError(streaming bool, err error) ([]byte, error) {
resp := &pb.EmptyReply{
Response: []*pb.EmptyResponse{
{
Metadata: &pb.ResponseMetadata{
Expand All @@ -196,7 +192,13 @@ func (b *assertingBackend) BuildError(err error) ([]byte, error) {
},
},
},
})
}

if streaming {
return proto.Marshal(resp.Response[0])
}

return proto.Marshal(resp)
}

type ProxyOne2ManySuite struct {
Expand Down Expand Up @@ -350,8 +352,7 @@ func (s *ProxyOne2ManySuite) TestPingStreamErrorPropagatesAppError() {
resp, err := stream.Recv()
s.Require().NoError(err)

s.Assert().Len(resp.Response, 1)
s.Assert().Equal("rpc error: code = FailedPrecondition desc = Userspace error.", resp.Response[0].Metadata.UpstreamError)
s.Assert().Equal("rpc error: code = FailedPrecondition desc = Userspace error.", resp.Metadata.UpstreamError)
}

require.NoError(s.T(), stream.CloseSend(), "no error on close send")
Expand All @@ -373,8 +374,7 @@ func (s *ProxyOne2ManySuite) TestPingStreamConnError() {
resp, err := stream.Recv()
s.Require().NoError(err)

s.Assert().Len(resp.Response, 1)
s.Assert().Equal("rpc error: code = Unavailable desc = backend connection failed", resp.Response[0].Metadata.UpstreamError)
s.Assert().Equal("rpc error: code = Unavailable desc = backend connection failed", resp.Metadata.UpstreamError)

_, err = stream.Recv()
require.Equal(s.T(), io.EOF, err, "stream should close with io.EOF, meaning OK")
Expand Down Expand Up @@ -407,11 +407,10 @@ func (s *ProxyOne2ManySuite) TestPingStream_FullDuplexWorks() {
resp, err := stream.Recv()
s.Require().NoError(err)

s.Assert().Len(resp.Response, 1)
s.Assert().EqualValues(i, resp.Response[0].Counter, "ping roundtrip must succeed with the correct id")
s.Assert().EqualValues(resp.Response[0].Metadata.Hostname, resp.Response[0].Server)
s.Assert().EqualValues(i, resp.Counter, "ping roundtrip must succeed with the correct id")
s.Assert().EqualValues(resp.Metadata.Hostname, resp.Server)

delete(expectedUpstreams, resp.Response[0].Metadata.Hostname)
delete(expectedUpstreams, resp.Metadata.Hostname)
}

s.Require().Empty(expectedUpstreams)
Expand Down Expand Up @@ -464,24 +463,24 @@ func (s *ProxyOne2ManySuite) TestPingStream_FullDuplexConcurrent() {
return err
}

if len(resp.Response) != 1 {
return fmt.Errorf("single response expected: %d", len(resp.Response))
if resp.Metadata == nil {
return fmt.Errorf("response metadata expected: %v", resp)
}

if resp.Response[0].Metadata.Hostname != resp.Response[0].Server {
return fmt.Errorf("mismatch on host metadata: %v != %v", resp.Response[0].Metadata.Hostname, resp.Response[0].Server)
if resp.Metadata.Hostname != resp.Server {
return fmt.Errorf("mismatch on host metadata: %v != %v", resp.Metadata.Hostname, resp.Server)
}

expectedCounter, ok := expectedUpstreams[resp.Response[0].Server]
expectedCounter, ok := expectedUpstreams[resp.Server]
if !ok {
return fmt.Errorf("unexpected host: %v", resp.Response[0].Server)
return fmt.Errorf("unexpected host: %v", resp.Server)
}

if expectedCounter != resp.Response[0].Counter {
return fmt.Errorf("unexpected counter value: %d != %d", expectedCounter, resp.Response[0].Counter)
if expectedCounter != resp.Counter {
return fmt.Errorf("unexpected counter value: %d != %d", expectedCounter, resp.Counter)
}

expectedUpstreams[resp.Response[0].Server]++
expectedUpstreams[resp.Server]++
}

return nil
Expand Down
7 changes: 0 additions & 7 deletions proxy/handler_one2one.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,13 +64,6 @@ func (s *handler) forwardClientToServer(src *backendConnection, dst grpc.ServerS
break
}

var err error
f.payload, err = src.backend.AppendInfo(f.payload)
if err != nil {
ret <- err
break
}

if i == 0 {
// This is a bit of a hack, but client to server headers are only readable after first client msg is
// received but must be written to server stream before the first msg is flushed.
Expand Down

0 comments on commit 5c579a7

Please sign in to comment.