Skip to content

Commit

Permalink
relay: Add support for unary request relays
Browse files Browse the repository at this point in the history
  • Loading branch information
kriskowal committed Apr 20, 2017
1 parent 3cdb472 commit b1477d3
Show file tree
Hide file tree
Showing 4 changed files with 244 additions and 0 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ v1.8.0 (unreleased)
if the response body was empty.
- Adds support for the `UnrecognizedProcedureError` error and error checker,
indicating that the router was unable to find a handler for the request.
- Adds support for unary relay router middleware, so unrecognized requests
can be forwarded to a unary outbound.


v1.7.1 (2017-03-29)
Expand Down
63 changes: 63 additions & 0 deletions api/middleware/relay.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
// Copyright (c) 2017 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package middleware

import (
"context"

"go.uber.org/yarpc/api/transport"
)

// NewRelayMiddleware returns a middleware that forwards all unrecognized
// procedures through a configured outbound client.
// The relay middleware currently only supports unary RPC.
func NewRelayMiddleware() *RelayMiddleware {
return &RelayMiddleware{}
}

// RelayMiddleware forwards requests for unrecognized procedures to the
// configured client.
type RelayMiddleware struct {
spec transport.HandlerSpec
}

// Procedures enumerates the recognized procedures through the middleware.
func (r *RelayMiddleware) Procedures(next transport.Router) []transport.Procedure {
return next.Procedures()
}

// SetClientConfig configures the middleware to send requests through an
// outbound configured on a dispatcher, including through all of its outbound
// middleware.
func (r *RelayMiddleware) SetClientConfig(cc transport.ClientConfig) {
// TODO support RPC types generally, not merely Unary
r.spec = transport.NewUnaryHandlerSpec(transport.NewUnaryRelay(cc.GetUnaryOutbound()))
}

// Choose returns the spec for all recognized procedures or the spec for
// forwarding unrecognized procedures.
func (r *RelayMiddleware) Choose(ctx context.Context, req *transport.Request, next transport.Router) (transport.HandlerSpec, error) {
handlerSpec, err := next.Choose(ctx, req)
if transport.IsUnrecognizedProcedureError(err) {
return r.spec, nil
}
return handlerSpec, err
}
117 changes: 117 additions & 0 deletions api/middleware/relay_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
// Copyright (c) 2017 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package middleware_test

import (
"context"
"testing"
"time"

"go.uber.org/yarpc"
"go.uber.org/yarpc/api/middleware"
"go.uber.org/yarpc/api/transport"
"go.uber.org/yarpc/encoding/json"
"go.uber.org/yarpc/transport/http"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

// TestRelay exercises relay middleware by setting up a service and a proxy for
// that service.
// The test sends a requests from the service to the proxy.
// The proxy handles the first request and forwards the second request back to
// the originating service.
func TestRelay(t *testing.T) {

s := setupService()
s.Register(json.Procedure("echo", (&handler{"self"}).handle))
require.NoError(t, s.Start(), "service should start")
defer s.Stop()

p := setupProxy()
require.NoError(t, p.Start(), "proxy should start")
s.Register(json.Procedure("proxy-echo", (&handler{"proxy"}).handle))
defer p.Stop()

client := json.New(s.ClientConfig("service"))
var res body
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, time.Second)
defer cancel()

err := client.Call(ctx, "proxy-echo", &body{"Hello, World!"}, &res)
require.NoError(t, err, "call to proxy without error")
assert.Equal(t, res, body{"proxy: Hello, World!"}, "receive echo response through proxy")

err = client.Call(ctx, "echo", &body{"Hello, World!"}, &res)
require.NoError(t, err, "call through proxy without error")
assert.Equal(t, res, body{"self: Hello, World!"}, "receive echo response through proxy")
}

// Listens on :30000
func setupService() *yarpc.Dispatcher {
t := http.NewTransport()
d := yarpc.NewDispatcher(yarpc.Config{
Name: "service",
Inbounds: yarpc.Inbounds{
t.NewInbound(":30000"),
},
Outbounds: yarpc.Outbounds{
"service": transport.Outbounds{
Unary: t.NewSingleOutbound("http://127.0.0.1:30001"),
},
},
})
return d
}

// Listens on :30001, forwards all requests to :30000
func setupProxy() *yarpc.Dispatcher {
t := http.NewTransport()
m := middleware.NewRelayMiddleware()
d := yarpc.NewDispatcher(yarpc.Config{
Name: "relay",
Inbounds: yarpc.Inbounds{
t.NewInbound(":30001"),
},
Outbounds: yarpc.Outbounds{
"forward": transport.Outbounds{
Unary: t.NewSingleOutbound("http://127.0.0.1:30000"),
},
},
RouterMiddleware: m,
})
m.SetClientConfig(d.ClientConfig("forward"))
return d
}

type body struct {
Message string `json:"message"`
}

type handler struct {
responder string
}

func (h *handler) handle(ctx context.Context, req *body) (*body, error) {
return &body{h.responder + ": " + req.Message}, nil
}
62 changes: 62 additions & 0 deletions api/transport/relay.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
// Copyright (c) 2017 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package transport

import (
"bytes"
"context"

"go.uber.org/yarpc/internal/iopool"
)

// NewUnaryRelay creates an inbound unary request handler that relays a request
// to the given unary outbound caller. A relay is a utility for forwarding
// requests from a proxy or handle-or-forward registry.
func NewUnaryRelay(outbound UnaryOutbound) UnaryHandler {
return &unaryRelay{outbound: outbound}
}

type unaryRelay struct {
outbound UnaryOutbound
}

func (r *unaryRelay) Handle(ctx context.Context, request *Request, resw ResponseWriter) error {
res, err := r.outbound.Call(ctx, request)
if err != nil {
return err
}

// Read
var buff bytes.Buffer
if _, err := iopool.Copy(&buff, res.Body); err != nil {
return err
}

resw.AddHeaders(res.Headers)

// Write
_, err = iopool.Copy(resw, &buff)
if err != nil {
return err
}

return nil
}

0 comments on commit b1477d3

Please sign in to comment.