Skip to content

Commit

Permalink
Merge pull request #7 from smira/one2many-4
Browse files Browse the repository at this point in the history
More tests, small code fixes, updated README.
  • Loading branch information
smira committed Nov 25, 2019
2 parents d9ce0b1 + fc0d27d commit dbf07a4
Show file tree
Hide file tree
Showing 6 changed files with 370 additions and 54 deletions.
52 changes: 45 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@

[gRPC Go](https://github.com/grpc/grpc-go) Proxy server

This is a fork of awesome [mwitkow/grpc-proxy](https://github.com/mwitkow/grpc-proxy) with support
for one to many proxying added.

## Project Goal

Build a transparent reverse proxy for gRPC targets that will make it easy to expose gRPC services
Expand All @@ -14,29 +17,52 @@ over the internet. This includes:
* easy, declarative definition of backends and their mappings to frontends
* simple round-robin load balancing of inbound requests from a single connection to multiple backends

The project now exists as a **proof of concept**, with the key piece being the `proxy` package that
is a generic gRPC reverse proxy handler.
## Proxying Modes

There are two proxying modes supported:

* one to one: in this mode data passed back and forth is transmitted as is without any modifications;
* one to many: one client connection is mapped into multiple upstream connections, results might be aggregated
(for unary calls), errors translated into response messages; this mode requires special layout of protobuf messages.

## Proxy Handler

The package [`proxy`](proxy/) contains a generic gRPC reverse proxy handler that allows a gRPC server to
not know about registered handlers or their data types. Please consult the docs, here's an exaple usage.
not know about registered handlers or their data types. Please consult the docs, here's an example usage.

First, define `Backend` implementation to identify specific upstream. For one to one proxying, `SingleBackend`
might be used:

```go
backend := &proxy.SingleBackend{
GetConn: func(ctx context.Context) (context.Context, *grpc.ClientConn, error) {
md, _ := metadata.FromIncomingContext(ctx)

// Copy the inbound metadata explicitly.
outCtx := metadata.NewOutgoingContext(ctx, md.Copy())
// Make sure we use DialContext so the dialing can be cancelled/time out together with the context.
conn, err := grpc.DialContext(ctx, "api-service.staging.svc.local", grpc.WithCodec(proxy.Codec())) // nolint: staticcheck

return outCtx, conn, err
},
}
```

Defining a `StreamDirector` that decides where (if at all) to send the request
```go
director = func(ctx context.Context, fullMethodName string) (*grpc.ClientConn, error) {
director = func(ctx context.Context, fullMethodName string) (context.Context, *grpc.ClientConn, error) {
// Make sure we never forward internal services.
if strings.HasPrefix(fullMethodName, "/com.example.internal.") {
return nil, grpc.Errorf(codes.Unimplemented, "Unknown method")
return nil, nil, grpc.Errorf(codes.Unimplemented, "Unknown method")
}
md, ok := metadata.FromContext(ctx)
if ok {
// Decide on which backend to dial
if val, exists := md[":authority"]; exists && val[0] == "staging.api.example.com" {
// Make sure we use DialContext so the dialing can be cancelled/time out together with the context.
return grpc.DialContext(ctx, "api-service.staging.svc.local", grpc.WithCodec(proxy.Codec()))
return ctx, backend1, nil
} else if val, exists := md[":authority"]; exists && val[0] == "api.example.com" {
return grpc.DialContext(ctx, "api-service.prod.svc.local", grpc.WithCodec(proxy.Codec()))
return ctx, backend2, nil
}
}
return nil, grpc.Errorf(codes.Unimplemented, "Unknown method")
Expand All @@ -52,6 +78,18 @@ server := grpc.NewServer(
pb_test.RegisterTestServiceServer(server, &testImpl{})
```

## One to Many Proxying

In one to many proxying mode, it's critical to identify source of each message proxied back from the upstreams.
Also upstream error shouldn't fail whole request and instead return errors as messages back. In order to achieve
this goal, protobuf response message should follow the same structure:

1. Every response should be `repeated` list of response messages, so that responses from multiple upstreams might be
concatenated to build combined response from all the upstreams.

2. Response should contain common metadata fields which allow grpc-proxy to inject source information and error information
into response.

## License

`grpc-proxy` is released under the Apache 2.0 license. See [LICENSE.txt](LICENSE.txt).
2 changes: 1 addition & 1 deletion proxy/examples_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func ExampleStreamDirector() {
// Copy the inbound metadata explicitly.
outCtx := metadata.NewOutgoingContext(ctx, md.Copy())
// Make sure we use DialContext so the dialing can be cancelled/time out together with the context.
conn, err := grpc.DialContext(ctx, "api-service.staging.svc.local", grpc.WithCodec(proxy.Codec())) // nolint: staticcheck
conn, err := grpc.DialContext(ctx, hostname, grpc.WithCodec(proxy.Codec())) // nolint: staticcheck

return outCtx, conn, err
},
Expand Down
28 changes: 17 additions & 11 deletions proxy/handler_multi.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,15 +208,15 @@ func (s *handler) forwardClientsToServerMultiStreaming(sources []backendConnecti
dst.SetTrailer(src.clientStream.Trailer())
return nil
}
return s.sendError(src, dst, fmt.Errorf("error reading from client stream %s: %w", src.backend, err))
return s.sendError(src, dst, err)
}
if j == 0 {
// This is a bit of a hack, but client to server headers are only readable after first client msg is
// received but must be written to server stream before the first msg is flushed.
// This is the only place to do it nicely.
md, err := src.clientStream.Header()
if err != nil {
return s.sendError(src, dst, fmt.Errorf("error getting headers from client stream %s: %w", src.backend, err))
return s.sendError(src, dst, err)
}
if err := dst.SetHeader(md); err != nil {
return fmt.Errorf("error setting headers from client %s: %w", src.backend, err)
Expand Down Expand Up @@ -261,17 +261,23 @@ func (s *handler) forwardServerToClientsMulti(src grpc.ServerStream, destination
return
}

liveDestinations := 0
errCh := make(chan error)

for i := range destinations {
if destinations[i].clientStream == nil || destinations[i].connError != nil {
continue
}
go func(dst *backendConnection) {
errCh <- func() error {
if dst.clientStream == nil || dst.connError != nil {
return nil // skip it
}

if err := destinations[i].clientStream.SendMsg(f); err != nil {
// TODO: race with reading connError (?)
// skip this or keep using?
destinations[i].connError = err
} else {
return dst.clientStream.SendMsg(f)
}()
}(&destinations[i])
}

liveDestinations := 0
for range destinations {
if err := <-errCh; err == nil {
liveDestinations++
}
}
Expand Down

0 comments on commit dbf07a4

Please sign in to comment.