diff --git a/docker-compose.yml b/docker-compose.yml index 1d8e32d906..4ab9cd09b8 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -27,6 +27,9 @@ services: - AXIS_APACHETHRIFTSERVER=go - AXIS_GAUNTLET=go - AXIS_HTTPSERVER=go + - AXIS_CLIENT_ONEWAY=go + - AXIS_SERVER_ONEWAY=go + - AXIS_TRANSPORT_ONEWAY=http - BEHAVIOR_RAW=client,server,transport - BEHAVIOR_JSON=client,server,transport @@ -45,6 +48,8 @@ services: # BEHAVIOR_INBOUNDTTL TODO - BEHAVIOR_CTXPROPAGATION=ctxclient,ctxserver,transport - BEHAVIOR_APACHETHRIFT=apachethriftclient,apachethriftserver + - BEHAVIOR_ONEWAY=client_oneway,server_oneway,transport_oneway,encoding + - REPORT=compact go: diff --git a/internal/crossdock/client/dispatcher/dispatcher.go b/internal/crossdock/client/dispatcher/dispatcher.go index 67d189a9c9..24c39c4123 100644 --- a/internal/crossdock/client/dispatcher/dispatcher.go +++ b/internal/crossdock/client/dispatcher/dispatcher.go @@ -24,6 +24,7 @@ import ( "fmt" "go.uber.org/yarpc" + "go.uber.org/yarpc/encoding/raw" "go.uber.org/yarpc/internal/crossdock/client/params" "go.uber.org/yarpc/peer/hostport" "go.uber.org/yarpc/peer/single" @@ -74,3 +75,41 @@ func Create(t crossdock.T) yarpc.Dispatcher { }, }) } + +// CreateOneway returns a started dispatcher and returns the address the +// server should call back to (ie this host) +func CreateOneway(t crossdock.T, handler raw.OnewayHandler) (yarpc.Dispatcher, string) { + fatals := crossdock.Fatals(t) + + server := t.Param("server_oneway") + fatals.NotEmpty(server, "oneway server is required") + + var outbound transport.OnewayOutbound + + trans := t.Param("transport_oneway") + switch trans { + case "http": + outbound = http.NewChooserOutbound( + single.New( + hostport.PeerIdentifier(fmt.Sprintf("%s:8084", server)), + http.NewTransport(), + )) + default: + fatals.Fail("", "unknown transport %q", trans) + } + + callBackInbound := http.NewInbound(":0") + dispatcher := yarpc.NewDispatcher(yarpc.Config{ + Name: "oneway-client", + Outbounds: yarpc.Outbounds{ + "oneway-server": {Oneway: outbound}, + }, + Inbounds: yarpc.Inbounds{callBackInbound}, + }) + + // register procedure for server to call us back on + dispatcher.Register(raw.OnewayProcedure("call-back", raw.OnewayHandler(handler))) + fatals.NoError(dispatcher.Start(), "could not start Dispatcher") + + return dispatcher, callBackInbound.Addr().String() +} diff --git a/internal/crossdock/client/dispatcher/dispatcher_test.go b/internal/crossdock/client/dispatcher/dispatcher_test.go index 6cbb418b17..891fdd1a8b 100644 --- a/internal/crossdock/client/dispatcher/dispatcher_test.go +++ b/internal/crossdock/client/dispatcher/dispatcher_test.go @@ -75,3 +75,47 @@ func TestCreate(t *testing.T) { } } } + +func TestCreateOneway(t *testing.T) { + tests := []struct { + params crossdock.Params + errOut string + }{ + { + crossdock.Params{"server_oneway": "localhost"}, + `unknown transport ""`, + }, + { + crossdock.Params{"transport_oneway": "http"}, + "oneway server is required", + }, + { + crossdock.Params{"server_oneway": "localhost", "transport_oneway": "foo"}, + `unknown transport "foo"`, + }, + { + params: crossdock.Params{ + "server_oneway": "localhost", + "transport_oneway": "http", + }, + }, + } + + for _, tt := range tests { + entries := crossdock.Run(tt.params, func(ct crossdock.T) { + dispatcher, callBackAddr := CreateOneway(ct, nil) + + // should get here only if the request succeeded + clientConfig := dispatcher.ClientConfig("yarpc-test") + assert.Equal(t, "client", clientConfig.Caller()) + assert.Equal(t, "yarpc-test", clientConfig.Service()) + assert.NotNil(t, callBackAddr) + }) + + if tt.errOut != "" && assert.Len(t, entries, 1) { + e := entries[0] + assert.Equal(t, crossdock.Failed, e.Status()) + assert.Contains(t, e.Output(), tt.errOut) + } + } +} diff --git a/internal/crossdock/client/oneway/json.go b/internal/crossdock/client/oneway/json.go index f6debda5f4..c056e85f73 100644 --- a/internal/crossdock/client/oneway/json.go +++ b/internal/crossdock/client/oneway/json.go @@ -34,15 +34,18 @@ type jsonToken struct { } // JSON starts an http run using JSON encoding -func JSON(t crossdock.T, dispatcher yarpc.Dispatcher) { +func JSON(t crossdock.T, dispatcher yarpc.Dispatcher, serverCalledBack <-chan []byte, callBackAddr string) { + assert := crossdock.Assert(t) fatals := crossdock.Fatals(t) - client := json.New(dispatcher.ClientConfig("oneway-test")) + client := json.New(dispatcher.ClientConfig("oneway-server")) token := getRandomID() ack, err := client.CallOneway( context.Background(), - yarpc.NewReqMeta().Procedure("echo/json"), + yarpc.NewReqMeta(). + Procedure("echo/json"). + Headers(yarpc.NewHeaders().With("callBackAddr", callBackAddr)), &jsonToken{Token: token}, ) @@ -57,5 +60,5 @@ func JSON(t crossdock.T, dispatcher yarpc.Dispatcher) { fatals.NotNil(ack, "ack is nil") serverToken := <-serverCalledBack - fatals.Equal(token, string(serverToken), "Client/Server token mismatch") + assert.Equal(token, string(serverToken), "JSON token mismatch") } diff --git a/internal/crossdock/client/oneway/oneway.go b/internal/crossdock/client/oneway/oneway.go index 573a18cd6b..0cceef44c1 100644 --- a/internal/crossdock/client/oneway/oneway.go +++ b/internal/crossdock/client/oneway/oneway.go @@ -22,78 +22,44 @@ package oneway import ( "context" - "fmt" "go.uber.org/yarpc" "go.uber.org/yarpc/encoding/raw" + "go.uber.org/yarpc/internal/crossdock/client/dispatcher" "go.uber.org/yarpc/internal/crossdock/client/params" "go.uber.org/yarpc/internal/crossdock/client/random" - "go.uber.org/yarpc/peer/hostport" - "go.uber.org/yarpc/peer/single" - "go.uber.org/yarpc/transport/http" "github.com/crossdock/crossdock-go" ) -var serverCalledBack chan []byte - -// Run starts an http run using encoding types +// Run starts the oneway behavior, testing a combination of encodings and transports func Run(t crossdock.T) { - encoding := t.Param(params.Encoding) - t.Tag("encoding", encoding) - t.Tag("server", t.Param(params.Server)) - - fatals := crossdock.Fatals(t) - dispatcher := newDispatcher(t) - - fatals.NoError(dispatcher.Start(), "could not start Dispatcher") + callBackHandler, serverCalledBack := newCallBackHandler() + dispatcher, callBackAddr := dispatcher.CreateOneway(t, callBackHandler) defer dispatcher.Stop() - serverCalledBack = make(chan []byte) - + encoding := t.Param(params.Encoding) switch encoding { case "raw": - Raw(t, dispatcher) + Raw(t, dispatcher, serverCalledBack, callBackAddr) case "json": - JSON(t, dispatcher) + JSON(t, dispatcher, serverCalledBack, callBackAddr) case "thrift": - Thrift(t, dispatcher) + Thrift(t, dispatcher, serverCalledBack, callBackAddr) default: - fatals.Fail("unknown encoding", "%v", encoding) + crossdock.Fatals(t).Fail("unknown encoding", "%v", encoding) } } -func newDispatcher(t crossdock.T) yarpc.Dispatcher { - server := t.Param(params.Server) - crossdock.Fatals(t).NotEmpty(server, "server is required") - - // TODO httpTransport lifecycle - httpTransport := http.NewTransport() - dispatcher := yarpc.NewDispatcher(yarpc.Config{ - Name: "client", - Outbounds: yarpc.Outbounds{ - "oneway-test": { - Oneway: http.NewChooserOutbound(single.New( - hostport.PeerIdentifier(fmt.Sprintf("%s:8084", server)), - httpTransport, - )), - }, - }, - //for call back - Inbounds: yarpc.Inbounds{http.NewInbound(fmt.Sprintf("%s:8089", server))}, - }) - - // register procedure for remote server to call us back on - dispatcher.Register(raw.OnewayProcedure("call-back", callBack)) - - return dispatcher -} - -func getRandomID() string { - return random.String(10) +// newCallBackHandler creates a oneway handler that fills a channel +// with the received body +func newCallBackHandler() (raw.OnewayHandler, <-chan []byte) { + serverCalledBack := make(chan []byte) + handler := func(ctx context.Context, reqMeta yarpc.ReqMeta, body []byte) error { + serverCalledBack <- body + return nil + } + return handler, serverCalledBack } -func callBack(ctx context.Context, reqMeta yarpc.ReqMeta, body []byte) error { - serverCalledBack <- body - return nil -} +func getRandomID() string { return random.String(10) } diff --git a/internal/crossdock/client/oneway/raw.go b/internal/crossdock/client/oneway/raw.go index f6aadb4fed..57e3b8ab61 100644 --- a/internal/crossdock/client/oneway/raw.go +++ b/internal/crossdock/client/oneway/raw.go @@ -30,14 +30,19 @@ import ( ) // Raw starts an http run using raw encoding -func Raw(t crossdock.T, dispatcher yarpc.Dispatcher) { +func Raw(t crossdock.T, dispatcher yarpc.Dispatcher, serverCalledBack <-chan []byte, callBackAddr string) { + assert := crossdock.Assert(t) fatals := crossdock.Fatals(t) - client := raw.New(dispatcher.ClientConfig("oneway-test")) - ctx := context.Background() - + client := raw.New(dispatcher.ClientConfig("oneway-server")) token := []byte(getRandomID()) - ack, err := client.CallOneway(ctx, yarpc.NewReqMeta().Procedure("echo/raw"), token) + + ack, err := client.CallOneway( + context.Background(), + yarpc.NewReqMeta(). + Procedure("echo/raw"). + Headers(yarpc.NewHeaders().With("callBackAddr", callBackAddr)), + token) // ensure channel hasn't been filled yet select { @@ -50,5 +55,5 @@ func Raw(t crossdock.T, dispatcher yarpc.Dispatcher) { fatals.NotNil(ack, "ack is nil") serverToken := <-serverCalledBack - fatals.Equal(token, serverToken, "Client/Server token mismatch.") + assert.Equal(token, serverToken, "Raw token mismatch") } diff --git a/internal/crossdock/client/oneway/thrift.go b/internal/crossdock/client/oneway/thrift.go index 2b9e0ed647..b971b45a48 100644 --- a/internal/crossdock/client/oneway/thrift.go +++ b/internal/crossdock/client/oneway/thrift.go @@ -30,14 +30,18 @@ import ( ) // Thrift starts an http oneway run using Thrift encoding -func Thrift(t crossdock.T, dispatcher yarpc.Dispatcher) { +func Thrift(t crossdock.T, dispatcher yarpc.Dispatcher, serverCalledBack <-chan []byte, callBackAddr string) { + assert := crossdock.Assert(t) fatals := crossdock.Fatals(t) - client := onewayclient.New(dispatcher.ClientConfig("oneway-test")) - ctx := context.Background() - + client := onewayclient.New(dispatcher.ClientConfig("oneway-server")) token := getRandomID() - ack, err := client.Echo(ctx, nil, &token) + + ack, err := client.Echo( + context.Background(), + yarpc.NewReqMeta(). + Headers(yarpc.NewHeaders().With("callBackAddr", callBackAddr)), + &token) // ensure channel hasn't been filled yet select { @@ -50,5 +54,5 @@ func Thrift(t crossdock.T, dispatcher yarpc.Dispatcher) { fatals.NotNil(ack, "ack is nil") serverToken := <-serverCalledBack - fatals.Equal(token, string(serverToken), "Client/Server token mismatch") + assert.Equal(token, string(serverToken), "Thrift token mismatch") } diff --git a/internal/crossdock/main_test.go b/internal/crossdock/main_test.go index e6538cbed4..03b98794d2 100644 --- a/internal/crossdock/main_test.go +++ b/internal/crossdock/main_test.go @@ -55,15 +55,8 @@ func TestCrossdock(t *testing.T) { params params axes axes }{ - { - name: "oneway", - axes: axes{ - "encoding": []string{"raw", "json", "thrift"}, - }, - }, { name: "raw", - axes: axes{"transport": []string{"http", "tchannel"}}, }, { @@ -134,6 +127,16 @@ func TestCrossdock(t *testing.T) { "apachethriftclient": "127.0.0.1", }, }, + { + name: "oneway", + params: params{ + "server_oneway": "127.0.0.1", + }, + axes: axes{ + "encoding": []string{"raw", "json", "thrift"}, + "transport_oneway": []string{"http"}, + }, + }, } for _, bb := range behaviors { diff --git a/internal/crossdock/server/oneway/echo.go b/internal/crossdock/server/oneway/echo.go index d06c2b8290..4445be14bb 100644 --- a/internal/crossdock/server/oneway/echo.go +++ b/internal/crossdock/server/oneway/echo.go @@ -23,21 +23,28 @@ package oneway import ( "bytes" "context" + "fmt" "time" "go.uber.org/yarpc" + "go.uber.org/yarpc/encoding/json" "go.uber.org/yarpc/encoding/raw" + "go.uber.org/yarpc/encoding/thrift" + "go.uber.org/yarpc/peer/hostport" + "go.uber.org/yarpc/peer/single" "go.uber.org/yarpc/transport" + "go.uber.org/yarpc/transport/http" ) +const callBackAddrHeader = "callBackAddr" + type onewayHandler struct { - Outbound transport.OnewayOutbound + httpTransport *http.Transport } // EchoRaw implements the echo/raw procedure. func (o *onewayHandler) EchoRaw(ctx context.Context, reqMeta yarpc.ReqMeta, body []byte) error { - //make call back to the client - o.callHome(body) + o.callHome(ctx, reqMeta, body, raw.Encoding) return nil } @@ -45,27 +52,44 @@ type jsonToken struct{ Token string } // EchoJSON implements the echo/json procedure. func (o *onewayHandler) EchoJSON(ctx context.Context, reqMeta yarpc.ReqMeta, token *jsonToken) error { - //make call back to the client - o.callHome([]byte(token.Token)) + o.callHome(ctx, reqMeta, []byte(token.Token), json.Encoding) return nil } // Echo implements the Oneway::Echo procedure. func (o *onewayHandler) Echo(ctx context.Context, reqMeta yarpc.ReqMeta, Token *string) error { - o.callHome([]byte(*Token)) + o.callHome(ctx, reqMeta, []byte(*Token), thrift.Encoding) return nil } -func (o *onewayHandler) callHome(body []byte) { +// callHome extracts the call back address from headers, and makes a raw HTTP +// request using the same context and body +func (o *onewayHandler) callHome(ctx context.Context, reqMeta yarpc.ReqMeta, body []byte, encoding transport.Encoding) { // reduce the chance of a race condition time.Sleep(time.Millisecond * 100) - ctx := context.Background() - o.Outbound.CallOneway(ctx, &transport.Request{ - Caller: "oneway-test", - Service: "client", + callBackAddr, exists := reqMeta.Headers().Get(callBackAddrHeader) + if !exists { + panic("could not find callBackAddr in headers") + } + + out := http.NewChooserOutbound( + single.New( + hostport.PeerIdentifier(callBackAddr), + o.httpTransport)) + + out.Start() + defer out.Stop() + + _, err := out.CallOneway(ctx, &transport.Request{ + Caller: "oneway-server", + Service: "oneway-client", Procedure: "call-back", Encoding: raw.Encoding, Body: bytes.NewReader(body), }) + + if err != nil { + panic(fmt.Sprintf("could not make call back to client: %s", err)) + } } diff --git a/internal/crossdock/server/oneway/server.go b/internal/crossdock/server/oneway/server.go index 86085a0daf..b3fe5b291b 100644 --- a/internal/crossdock/server/oneway/server.go +++ b/internal/crossdock/server/oneway/server.go @@ -24,12 +24,10 @@ import ( "fmt" "go.uber.org/yarpc" - "go.uber.org/yarpc/encoding/json" "go.uber.org/yarpc/encoding/raw" "go.uber.org/yarpc/internal/crossdock/thrift/oneway/yarpc/onewayserver" - "go.uber.org/yarpc/peer/hostport" - "go.uber.org/yarpc/peer/single" + "go.uber.org/yarpc/transport" "go.uber.org/yarpc/transport/http" ) @@ -37,29 +35,19 @@ var dispatcher yarpc.Dispatcher // Start starts the test server that clients will make requests to func Start() { - httpTransport := http.NewTransport() - h := onewayHandler{ - Outbound: http.NewChooserOutbound( - single.New( - hostport.PeerIdentifier("127.0.0.1:8089"), - httpTransport, - ), - ), - } + inbounds := []transport.Inbound{http.NewInbound(":8084")} dispatcher = yarpc.NewDispatcher(yarpc.Config{ - Name: "oneway-test", - Inbounds: yarpc.Inbounds{ - http.NewInbound(":8084"), - }, - Outbounds: yarpc.Outbounds{ - "client": {Oneway: h.Outbound}, - }, + Name: "oneway-server", + Inbounds: inbounds, }) + // Echo procedures make an RPC back to the caller with the same context, + // and body over http/raw + h := &onewayHandler{http.NewTransport()} dispatcher.Register(raw.OnewayProcedure("echo/raw", h.EchoRaw)) dispatcher.Register(json.OnewayProcedure("echo/json", h.EchoJSON)) - dispatcher.Register(onewayserver.New(&h)) + dispatcher.Register(onewayserver.New(h)) if err := dispatcher.Start(); err != nil { fmt.Println("error:", err.Error())