Skip to content

Commit

Permalink
Add concept of a 'Backend', but still one to one proxying
Browse files Browse the repository at this point in the history
  • Loading branch information
smira committed Nov 19, 2019
1 parent 7cc4610 commit 2aad63a
Show file tree
Hide file tree
Showing 8 changed files with 314 additions and 86 deletions.
13 changes: 12 additions & 1 deletion .github/workflows/go.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,18 @@ jobs:
go list -mod=readonly all >/dev/null
- name: Test
run: go test -v ./...
run: |
go test -v -coverprofile coverage.txt -covermode=atomic ./...
- name: Test Race
run: |
go test -v -race ./...
- name: Upload Coverage report to CodeCov
uses: codecov/codecov-action@v1.0.0
with:
token: ${{secrets.CODECOV_TOKEN}}
file: ./coverage.txt

lint:
name: Lint
Expand Down
21 changes: 0 additions & 21 deletions checkup.sh

This file was deleted.

20 changes: 3 additions & 17 deletions fixup.sh
Original file line number Diff line number Diff line change
@@ -1,13 +1,6 @@
#!/bin/bash
# Script that checks the code for errors.

SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd -P)"

function print_real_go_files {
grep --files-without-match 'DO NOT EDIT!' $(find . -iname '*.go')
}

function generate_markdown {
generate_markdown() {
echo "Generating markdown"
oldpwd=$(pwd)
for i in $(find . -iname 'doc.go'); do
Expand All @@ -17,15 +10,8 @@ function generate_markdown {
${GOPATH}/bin/godocdown -heading=Title -o DOC.md
ln -s DOC.md README.md 2> /dev/null # can fail
cd ${oldpwd}
done;
}

function goimports_all {
echo "Running goimports"
goimports -l -w $(print_real_go_files)
return $?
done
}

generate_markdown
goimports_all
echo "returning $?"
echo "returning $?"
111 changes: 108 additions & 3 deletions proxy/DOC.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# proxy
--
import "github.com/mwitkow/grpc-proxy/proxy"
import "."

Package proxy provides a reverse proxy handler for gRPC.

Expand All @@ -25,6 +25,8 @@ Codec returns a proxying grpc.Codec with the default protobuf codec as parent.

See CodecWithParent.

nolint: staticcheck

#### func CodecWithParent

```go
Expand All @@ -39,6 +41,8 @@ treats a gRPC message frame as raw bytes. However, if the server handler, or the
client caller are not proxy-internal functions it will fall back to trying to
decode the message using a fallback codec.

nolint: staticcheck

#### func RegisterService

```go
Expand All @@ -64,13 +68,114 @@ backends. It should be used as a `grpc.UnknownServiceHandler`.
This can *only* be used if the `server` also uses grpcproxy.CodecForServer()
ServerOption.

#### type Backend

```go
type Backend interface {
// String provides backend name for logging and errors.
String() string

// GetConnection returns a grpc connection to the backend.
//
// The context returned from this function should be the context for the *outgoing* (to backend) call. In case you want
// to forward any Metadata between the inbound request and outbound requests, you should do it manually. However, you
// *must* propagate the cancel function (`context.WithCancel`) of the inbound context to the one returned.
GetConnection(ctx context.Context) (context.Context, *grpc.ClientConn, error)

// AppendInfo is called to enhance response from the backend with additional data.
//
// Usecase might be appending backend endpoint (or name) to the protobuf serialized response, so that response is enhanced
// with source information. This is particularly important for one to many calls, when it is required to identify
// response from each of the backends participating in the proxying.
//
// If not additional proxying is required, simply returning the buffer without changes works fine.
AppendInfo(resp []byte) ([]byte, error)

// BuildError is called to convert error from upstream into response field.
//
// BuildError is never called for one to one proxying, in that case all the errors are returned back to the caller
// as grpc errors.
//
// When proxying one to many, if one the requests fails or upstream returns an error, it is undesirable to fail the whole
// request and discard responses from other backends. BuildError converts (marshals) error from backend into protobuf encoded
// response which is analyzed by the caller, so that caller reaching out to N upstreams receives N1 successful responses and
// N2 error responses so that N1 + N2 == N.
//
// If BuildError returns nil, error is returned as grpc error (failing whole request).
BuildError(err error) ([]byte, error)
}
```

Backend wraps information about upstream connection.

For simple one-to-one proxying, not much should be done in the Backend, simply
providing a connection is enough.

When proxying one-to-many and aggregating results, Backend might be used to
append additional fields to upstream response to support more complicated
proxying.

#### type SingleBackend

```go
type SingleBackend struct {
// GetConn returns a grpc connection to the backend.
//
// The context returned from this function should be the context for the *outgoing* (to backend) call. In case you want
// to forward any Metadata between the inbound request and outbound requests, you should do it manually. However, you
// *must* propagate the cancel function (`context.WithCancel`) of the inbound context to the one returned.
GetConn func(ctx context.Context) (context.Context, *grpc.ClientConn, error)
}
```

SingleBackend implements a simple wrapper around get connection function of one
to one proxying.

SingleBackend implements Backend interface and might be used as an easy wrapper
for one to one proxying.

#### func (*SingleBackend) AppendInfo

```go
func (sb *SingleBackend) AppendInfo(resp []byte) ([]byte, error)
```
AppendInfo is called to enhance response from the backend with additional data.

#### func (*SingleBackend) BuildError

```go
func (sb *SingleBackend) BuildError(err error) ([]byte, error)
```
BuildError is called to convert error from upstream into response field.

#### func (*SingleBackend) GetConnection

```go
func (sb *SingleBackend) GetConnection(ctx context.Context) (context.Context, *grpc.ClientConn, error)
```
GetConnection returns a grpc connection to the backend.

#### func (*SingleBackend) String

```go
func (sb *SingleBackend) String() string
```

#### type StreamDirector

```go
type StreamDirector func(ctx context.Context, fullMethodName string) (*grpc.ClientConn, error)
type StreamDirector func(ctx context.Context, fullMethodName string) ([]Backend, error)
```

StreamDirector returns a gRPC ClientConn to be used to forward the call to.
StreamDirector returns a list of Backend objects to forward the call to.

There are two proxying modes:

1. one to one: StreamDirector returns a single Backend object - proxying is done verbatim, Backend.AppendInfo might
be used to enhance response with source information (or it might be skipped).
2. one to many: StreamDirector returns more than one Backend object - for unary calls responses from Backend objects
are aggregated by concatenating protobuf responses (requires top-level `repeated` protobuf definition) and errors
are wrapped as responses via BuildError. Responses are potentially enhanced via AppendInfo.

The presence of the `Context` allows for rich filtering, e.g. based on Metadata
(headers). If no handling is meant to be done, a `codes.NotImplemented` gRPC
Expand Down
92 changes: 85 additions & 7 deletions proxy/director.go
Original file line number Diff line number Diff line change
@@ -1,24 +1,102 @@
// Copyright 2017 Michal Witkowski. All Rights Reserved.
// Copyright 2019 Andrey Smirnov. All Rights Reserved.
// See LICENSE for licensing terms.

package proxy

import (
"golang.org/x/net/context"
"context"

"google.golang.org/grpc"
)

// StreamDirector returns a gRPC ClientConn to be used to forward the call to.
// Backend wraps information about upstream connection.
//
// For simple one-to-one proxying, not much should be done in the Backend, simply
// providing a connection is enough.
//
// When proxying one-to-many and aggregating results, Backend might be used to
// append additional fields to upstream response to support more complicated
// proxying.
type Backend interface {
// String provides backend name for logging and errors.
String() string

// GetConnection returns a grpc connection to the backend.
//
// The context returned from this function should be the context for the *outgoing* (to backend) call. In case you want
// to forward any Metadata between the inbound request and outbound requests, you should do it manually. However, you
// *must* propagate the cancel function (`context.WithCancel`) of the inbound context to the one returned.
GetConnection(ctx context.Context) (context.Context, *grpc.ClientConn, error)

// AppendInfo is called to enhance response from the backend with additional data.
//
// Usecase might be appending backend endpoint (or name) to the protobuf serialized response, so that response is enhanced
// with source information. This is particularly important for one to many calls, when it is required to identify
// response from each of the backends participating in the proxying.
//
// If not additional proxying is required, simply returning the buffer without changes works fine.
AppendInfo(resp []byte) ([]byte, error)

// BuildError is called to convert error from upstream into response field.
//
// BuildError is never called for one to one proxying, in that case all the errors are returned back to the caller
// as grpc errors.
//
// When proxying one to many, if one the requests fails or upstream returns an error, it is undesirable to fail the whole
// request and discard responses from other backends. BuildError converts (marshals) error from backend into protobuf encoded
// response which is analyzed by the caller, so that caller reaching out to N upstreams receives N1 successful responses and
// N2 error responses so that N1 + N2 == N.
//
// If BuildError returns nil, error is returned as grpc error (failing whole request).
BuildError(err error) ([]byte, error)
}

// SingleBackend implements a simple wrapper around get connection function of one to one proxying.
//
// SingleBackend implements Backend interface and might be used as an easy wrapper for one to one proxying.
type SingleBackend struct {
// GetConn returns a grpc connection to the backend.
//
// The context returned from this function should be the context for the *outgoing* (to backend) call. In case you want
// to forward any Metadata between the inbound request and outbound requests, you should do it manually. However, you
// *must* propagate the cancel function (`context.WithCancel`) of the inbound context to the one returned.
GetConn func(ctx context.Context) (context.Context, *grpc.ClientConn, error)
}

func (sb *SingleBackend) String() string {
return "backend"
}

// GetConnection returns a grpc connection to the backend.
func (sb *SingleBackend) GetConnection(ctx context.Context) (context.Context, *grpc.ClientConn, error) {
return sb.GetConn(ctx)
}

// AppendInfo is called to enhance response from the backend with additional data.
func (sb *SingleBackend) AppendInfo(resp []byte) ([]byte, error) {
return resp, nil
}

// BuildError is called to convert error from upstream into response field.
func (sb *SingleBackend) BuildError(err error) ([]byte, error) {
return nil, nil
}

// StreamDirector returns a list of Backend objects to forward the call to.
//
// There are two proxying modes:
// 1. one to one: StreamDirector returns a single Backend object - proxying is done verbatim, Backend.AppendInfo might
// be used to enhance response with source information (or it might be skipped).
// 2. one to many: StreamDirector returns more than one Backend object - for unary calls responses from Backend objects
// are aggregated by concatenating protobuf responses (requires top-level `repeated` protobuf definition) and errors
// are wrapped as responses via BuildError. Responses are potentially enhanced via AppendInfo.
//
// The presence of the `Context` allows for rich filtering, e.g. based on Metadata (headers).
// If no handling is meant to be done, a `codes.NotImplemented` gRPC error should be returned.
//
// The context returned from this function should be the context for the *outgoing* (to backend) call. In case you want
// to forward any Metadata between the inbound request and outbound requests, you should do it manually. However, you
// *must* propagate the cancel function (`context.WithCancel`) of the inbound context to the one returned.
//
// It is worth noting that the StreamDirector will be fired *after* all server-side stream interceptors
// are invoked. So decisions around authorization, monitoring etc. are better to be handled there.
//
// See the rather rich example.
type StreamDirector func(ctx context.Context, fullMethodName string) (context.Context, *grpc.ClientConn, error)
type StreamDirector func(ctx context.Context, fullMethodName string) ([]Backend, error)
34 changes: 22 additions & 12 deletions proxy/examples_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@
package proxy_test

import (
"context"
"strings"

"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
Expand Down Expand Up @@ -37,26 +37,36 @@ func ExampleTransparentHandler() {
// Provide sa simple example of a director that shields internal services and dials a staging or production backend.
// This is a *very naive* implementation that creates a new connection on every request. Consider using pooling.
func ExampleStreamDirector() {
director = func(ctx context.Context, fullMethodName string) (context.Context, *grpc.ClientConn, error) {
simpleBackendGen := func(hostname string) proxy.Backend {
return &proxy.SingleBackend{
GetConn: func(ctx context.Context) (context.Context, *grpc.ClientConn, error) {
md, _ := metadata.FromIncomingContext(ctx)

// Copy the inbound metadata explicitly.
outCtx := metadata.NewOutgoingContext(ctx, md.Copy())
// Make sure we use DialContext so the dialing can be cancelled/time out together with the context.
conn, err := grpc.DialContext(ctx, "api-service.staging.svc.local", grpc.WithCodec(proxy.Codec())) // nolint: staticcheck

return outCtx, conn, err
},
}
}

director = func(ctx context.Context, fullMethodName string) ([]proxy.Backend, error) {
// Make sure we never forward internal services.
if strings.HasPrefix(fullMethodName, "/com.example.internal.") {
return nil, nil, status.Errorf(codes.Unimplemented, "Unknown method")
return nil, status.Errorf(codes.Unimplemented, "Unknown method")
}
md, ok := metadata.FromIncomingContext(ctx)
// Copy the inbound metadata explicitly.
outCtx, _ := context.WithCancel(ctx)
outCtx = metadata.NewOutgoingContext(outCtx, md.Copy())

if ok {
// Decide on which backend to dial
if val, exists := md[":authority"]; exists && val[0] == "staging.api.example.com" {
// Make sure we use DialContext so the dialing can be cancelled/time out together with the context.
conn, err := grpc.DialContext(ctx, "api-service.staging.svc.local", grpc.WithCodec(proxy.Codec())) // nolint: staticcheck
return outCtx, conn, err
return []proxy.Backend{simpleBackendGen("api-service.staging.svc.local")}, nil
} else if val, exists := md[":authority"]; exists && val[0] == "api.example.com" {
conn, err := grpc.DialContext(ctx, "api-service.prod.svc.local", grpc.WithCodec(proxy.Codec())) // nolint: staticcheck
return outCtx, conn, err
return []proxy.Backend{simpleBackendGen("api-service.prod.svc.local")}, nil
}
}
return nil, nil, status.Errorf(codes.Unimplemented, "Unknown method")
return nil, status.Errorf(codes.Unimplemented, "Unknown method")
}
}

0 comments on commit 2aad63a

Please sign in to comment.