Skip to content

Commit

Permalink
fix: allow mode to be set for each request being proxied
Browse files Browse the repository at this point in the history
This is critical for us to allow single proxy instance to proxy across
the nodes and down to filesocket listeners.

Signed-off-by: Andrey Smirnov <smirnov.andrey@gmail.com>
  • Loading branch information
smira committed Nov 27, 2019
1 parent cc91c09 commit 6c9f7b3
Show file tree
Hide file tree
Showing 6 changed files with 18 additions and 29 deletions.
2 changes: 1 addition & 1 deletion proxy/director.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,4 +99,4 @@ func (sb *SingleBackend) BuildError(err error) ([]byte, error) {
// are invoked. So decisions around authorization, monitoring etc. are better to be handled there.
//
// See the rather rich example.
type StreamDirector func(ctx context.Context, fullMethodName string) ([]Backend, error)
type StreamDirector func(ctx context.Context, fullMethodName string) (Mode, []Backend, error)
11 changes: 5 additions & 6 deletions proxy/examples_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ func ExampleRegisterService() {
// Register a TestService with 4 of its methods explicitly.
proxy.RegisterService(server, director,
"talos.testproto.TestService",
proxy.WithMode(proxy.One2Many),
proxy.WithMethodNames("PingEmpty", "Ping", "PingError", "PingList"),
proxy.WithStreamedMethodNames("PingList"),
)
Expand Down Expand Up @@ -55,21 +54,21 @@ func ExampleStreamDirector() {
}
}

director = func(ctx context.Context, fullMethodName string) ([]proxy.Backend, error) {
director = func(ctx context.Context, fullMethodName string) (proxy.Mode, []proxy.Backend, error) {
// Make sure we never forward internal services.
if strings.HasPrefix(fullMethodName, "/com.example.internal.") {
return nil, status.Errorf(codes.Unimplemented, "Unknown method")
return proxy.One2One, nil, status.Errorf(codes.Unimplemented, "Unknown method")
}
md, ok := metadata.FromIncomingContext(ctx)

if ok {
// Decide on which backend to dial
if val, exists := md[":authority"]; exists && val[0] == "staging.api.example.com" {
return []proxy.Backend{simpleBackendGen("api-service.staging.svc.local")}, nil
return proxy.One2One, []proxy.Backend{simpleBackendGen("api-service.staging.svc.local")}, nil
} else if val, exists := md[":authority"]; exists && val[0] == "api.example.com" {
return []proxy.Backend{simpleBackendGen("api-service.prod.svc.local")}, nil
return proxy.One2One, []proxy.Backend{simpleBackendGen("api-service.prod.svc.local")}, nil
}
}
return nil, status.Errorf(codes.Unimplemented, "Unknown method")
return proxy.One2One, nil, status.Errorf(codes.Unimplemented, "Unknown method")
}
}
5 changes: 2 additions & 3 deletions proxy/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ var (
)

type handlerOptions struct {
mode Mode
serviceName string
methodNames []string
streamedMethods map[string]struct{}
Expand Down Expand Up @@ -51,7 +50,7 @@ func (s *handler) handler(srv interface{}, serverStream grpc.ServerStream) error
return status.Errorf(codes.Internal, "lowLevelServerStream not exists in context")
}

backends, err := s.director(serverStream.Context(), fullMethodName)
mode, backends, err := s.director(serverStream.Context(), fullMethodName)
if err != nil {
return err
}
Expand Down Expand Up @@ -80,7 +79,7 @@ func (s *handler) handler(srv interface{}, serverStream grpc.ServerStream) error
}
}

switch s.options.mode {
switch mode {
case One2One:
if len(backendConnections) != 1 {
return status.Errorf(codes.Internal, "one2one proxying can't should have exactly one connection (got %d)", len(backendConnections))
Expand Down
12 changes: 6 additions & 6 deletions proxy/handler_one2many_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,7 @@ func (s *ProxyOne2ManySuite) TestPingEmptyTargets() {
{"1", "2"},
{"3", "2", "1"},
{"0", "4"},
{"3"},
} {
md := metadata.Pairs(clientMdKey, "true")
md.Set("targets", targets...)
Expand Down Expand Up @@ -550,20 +551,20 @@ func (s *ProxyOne2ManySuite) SetupSuite() {
}

// Setup of the proxy's Director.
director := func(ctx context.Context, fullName string) ([]proxy.Backend, error) {
director := func(ctx context.Context, fullName string) (proxy.Mode, []proxy.Backend, error) {
var targets []int

md, ok := metadata.FromIncomingContext(ctx)
if ok {
if _, exists := md[rejectingMdKey]; exists {
return nil, status.Errorf(codes.PermissionDenied, "testing rejection")
return proxy.One2Many, nil, status.Errorf(codes.PermissionDenied, "testing rejection")
}

if mdTargets, exists := md["targets"]; exists {
for _, strTarget := range mdTargets {
t, err := strconv.Atoi(strTarget)
if err != nil {
return nil, err
return proxy.One2Many, nil, err
}

targets = append(targets, t)
Expand All @@ -587,17 +588,16 @@ func (s *ProxyOne2ManySuite) SetupSuite() {
}
}

return result, nil
return proxy.One2Many, result, nil
}

s.proxy = grpc.NewServer(
grpc.CustomCodec(proxy.Codec()),
grpc.UnknownServiceHandler(proxy.TransparentHandler(director, proxy.WithMode(proxy.One2Many))),
grpc.UnknownServiceHandler(proxy.TransparentHandler(director)),
)
// Ping handler is handled as an explicit registration and not as a TransparentHandler.
proxy.RegisterService(s.proxy, director,
"talos.testproto.MultiService",
proxy.WithMode(proxy.One2Many),
proxy.WithMethodNames("Ping", "PingStream", "PingStreamError"),
proxy.WithStreamedMethodNames("PingStream", "PingStreamError"),
)
Expand Down
8 changes: 4 additions & 4 deletions proxy/handler_one2one_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,15 +204,15 @@ func (s *ProxyOne2OneSuite) SetupSuite() {
// Setup of the proxy's Director.
s.serverClientConn, err = grpc.Dial(s.serverListener.Addr().String(), grpc.WithInsecure(), grpc.WithCodec(proxy.Codec())) // nolint: staticcheck
require.NoError(s.T(), err, "must not error on deferred client Dial")
director := func(ctx context.Context, fullName string) ([]proxy.Backend, error) {
director := func(ctx context.Context, fullName string) (proxy.Mode, []proxy.Backend, error) {
md, ok := metadata.FromIncomingContext(ctx)
if ok {
if _, exists := md[rejectingMdKey]; exists {
return nil, status.Errorf(codes.PermissionDenied, "testing rejection")
return proxy.One2One, nil, status.Errorf(codes.PermissionDenied, "testing rejection")
}
}

return []proxy.Backend{
return proxy.One2One, []proxy.Backend{
&proxy.SingleBackend{
GetConn: func(ctx context.Context) (context.Context, *grpc.ClientConn, error) {
md, _ := metadata.FromIncomingContext(ctx)
Expand All @@ -225,7 +225,7 @@ func (s *ProxyOne2OneSuite) SetupSuite() {
}
s.proxy = grpc.NewServer(
grpc.CustomCodec(proxy.Codec()),
grpc.UnknownServiceHandler(proxy.TransparentHandler(director, proxy.WithMode(proxy.One2One))),
grpc.UnknownServiceHandler(proxy.TransparentHandler(director)),
)
// Ping handler is handled as an explicit registration and not as a TransparentHandler.
proxy.RegisterService(s.proxy, director,
Expand Down
9 changes: 0 additions & 9 deletions proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,15 +57,6 @@ func WithStreamedDetector(detector StreamedDetectorFunc) Option {
}
}

// WithMode sets proxying mode: One2One or One2Many.
//
// Default mode is One2One.
func WithMode(mode Mode) Option {
return func(o *handlerOptions) {
o.mode = mode
}
}

// RegisterService sets up a proxy handler for a particular gRPC service and method.
// The behavior is the same as if you were registering a handler method, e.g. from a codegenerated pb.go file.
//
Expand Down

0 comments on commit 6c9f7b3

Please sign in to comment.