Skip to content

Commit

Permalink
fix oneway crossdock test
Browse files Browse the repository at this point in the history
  • Loading branch information
peats-bond committed Dec 2, 2016
1 parent 2d617e0 commit 7cbb7f0
Show file tree
Hide file tree
Showing 10 changed files with 188 additions and 107 deletions.
5 changes: 5 additions & 0 deletions docker-compose.yml
Expand Up @@ -27,6 +27,9 @@ services:
- AXIS_APACHETHRIFTSERVER=go
- AXIS_GAUNTLET=go
- AXIS_HTTPSERVER=go
- AXIS_CLIENT_ONEWAY=go
- AXIS_SERVER_ONEWAY=go
- AXIS_TRANSPORT_ONEWAY=http

- BEHAVIOR_RAW=client,server,transport
- BEHAVIOR_JSON=client,server,transport
Expand All @@ -45,6 +48,8 @@ services:
# BEHAVIOR_INBOUNDTTL TODO
- BEHAVIOR_CTXPROPAGATION=ctxclient,ctxserver,transport
- BEHAVIOR_APACHETHRIFT=apachethriftclient,apachethriftserver
- BEHAVIOR_ONEWAY=client_oneway,server_oneway,transport_oneway,encoding

- REPORT=compact

go:
Expand Down
39 changes: 39 additions & 0 deletions internal/crossdock/client/dispatcher/dispatcher.go
Expand Up @@ -24,6 +24,7 @@ import (
"fmt"

"go.uber.org/yarpc"
"go.uber.org/yarpc/encoding/raw"
"go.uber.org/yarpc/internal/crossdock/client/params"
"go.uber.org/yarpc/peer/hostport"
"go.uber.org/yarpc/peer/single"
Expand Down Expand Up @@ -74,3 +75,41 @@ func Create(t crossdock.T) yarpc.Dispatcher {
},
})
}

// CreateOneway returns a started dispatcher and returns the address the
// server should call back to (ie this host)
func CreateOneway(t crossdock.T, handler raw.OnewayHandler) (yarpc.Dispatcher, string) {
fatals := crossdock.Fatals(t)

server := t.Param("server_oneway")
fatals.NotEmpty(server, "oneway server is required")

var outbound transport.OnewayOutbound

trans := t.Param("transport_oneway")
switch trans {
case "http":
outbound = http.NewChooserOutbound(
single.New(
hostport.PeerIdentifier(fmt.Sprintf("%s:8084", server)),
http.NewTransport(),
))
default:
fatals.Fail("", "unknown transport %q", trans)
}

callBackInbound := http.NewInbound(":0")
dispatcher := yarpc.NewDispatcher(yarpc.Config{
Name: "oneway-client",
Outbounds: yarpc.Outbounds{
"oneway-server": {Oneway: outbound},
},
Inbounds: yarpc.Inbounds{callBackInbound},
})

// register procedure for server to call us back on
dispatcher.Register(raw.OnewayProcedure("call-back", raw.OnewayHandler(handler)))
fatals.NoError(dispatcher.Start(), "could not start Dispatcher")

return dispatcher, callBackInbound.Addr().String()
}
44 changes: 44 additions & 0 deletions internal/crossdock/client/dispatcher/dispatcher_test.go
Expand Up @@ -75,3 +75,47 @@ func TestCreate(t *testing.T) {
}
}
}

func TestCreateOneway(t *testing.T) {
tests := []struct {
params crossdock.Params
errOut string
}{
{
crossdock.Params{"server_oneway": "localhost"},
`unknown transport ""`,
},
{
crossdock.Params{"transport_oneway": "http"},
"oneway server is required",
},
{
crossdock.Params{"server_oneway": "localhost", "transport_oneway": "foo"},
`unknown transport "foo"`,
},
{
params: crossdock.Params{
"server_oneway": "localhost",
"transport_oneway": "http",
},
},
}

for _, tt := range tests {
entries := crossdock.Run(tt.params, func(ct crossdock.T) {
dispatcher, callBackAddr := CreateOneway(ct, nil)

// should get here only if the request succeeded
clientConfig := dispatcher.ClientConfig("yarpc-test")
assert.Equal(t, "client", clientConfig.Caller())
assert.Equal(t, "yarpc-test", clientConfig.Service())
assert.NotNil(t, callBackAddr)
})

if tt.errOut != "" && assert.Len(t, entries, 1) {
e := entries[0]
assert.Equal(t, crossdock.Failed, e.Status())
assert.Contains(t, e.Output(), tt.errOut)
}
}
}
11 changes: 7 additions & 4 deletions internal/crossdock/client/oneway/json.go
Expand Up @@ -34,15 +34,18 @@ type jsonToken struct {
}

// JSON starts an http run using JSON encoding
func JSON(t crossdock.T, dispatcher yarpc.Dispatcher) {
func JSON(t crossdock.T, dispatcher yarpc.Dispatcher, serverCalledBack <-chan []byte, callBackAddr string) {
assert := crossdock.Assert(t)
fatals := crossdock.Fatals(t)

client := json.New(dispatcher.ClientConfig("oneway-test"))
client := json.New(dispatcher.ClientConfig("oneway-server"))
token := getRandomID()

ack, err := client.CallOneway(
context.Background(),
yarpc.NewReqMeta().Procedure("echo/json"),
yarpc.NewReqMeta().
Procedure("echo/json").
Headers(yarpc.NewHeaders().With("callBackAddr", callBackAddr)),
&jsonToken{Token: token},
)

Expand All @@ -57,5 +60,5 @@ func JSON(t crossdock.T, dispatcher yarpc.Dispatcher) {
fatals.NotNil(ack, "ack is nil")

serverToken := <-serverCalledBack
fatals.Equal(token, string(serverToken), "Client/Server token mismatch")
assert.Equal(token, string(serverToken), "JSON token mismatch")
}
72 changes: 19 additions & 53 deletions internal/crossdock/client/oneway/oneway.go
Expand Up @@ -22,78 +22,44 @@ package oneway

import (
"context"
"fmt"

"go.uber.org/yarpc"
"go.uber.org/yarpc/encoding/raw"
"go.uber.org/yarpc/internal/crossdock/client/dispatcher"
"go.uber.org/yarpc/internal/crossdock/client/params"
"go.uber.org/yarpc/internal/crossdock/client/random"
"go.uber.org/yarpc/peer/hostport"
"go.uber.org/yarpc/peer/single"
"go.uber.org/yarpc/transport/http"

"github.com/crossdock/crossdock-go"
)

var serverCalledBack chan []byte

// Run starts an http run using encoding types
// Run starts the oneway behavior, testing a combination of encodings and transports
func Run(t crossdock.T) {
encoding := t.Param(params.Encoding)
t.Tag("encoding", encoding)
t.Tag("server", t.Param(params.Server))

fatals := crossdock.Fatals(t)
dispatcher := newDispatcher(t)

fatals.NoError(dispatcher.Start(), "could not start Dispatcher")
callBackHandler, serverCalledBack := newCallBackHandler()
dispatcher, callBackAddr := dispatcher.CreateOneway(t, callBackHandler)
defer dispatcher.Stop()

serverCalledBack = make(chan []byte)

encoding := t.Param(params.Encoding)
switch encoding {
case "raw":
Raw(t, dispatcher)
Raw(t, dispatcher, serverCalledBack, callBackAddr)
case "json":
JSON(t, dispatcher)
JSON(t, dispatcher, serverCalledBack, callBackAddr)
case "thrift":
Thrift(t, dispatcher)
Thrift(t, dispatcher, serverCalledBack, callBackAddr)
default:
fatals.Fail("unknown encoding", "%v", encoding)
crossdock.Fatals(t).Fail("unknown encoding", "%v", encoding)
}
}

func newDispatcher(t crossdock.T) yarpc.Dispatcher {
server := t.Param(params.Server)
crossdock.Fatals(t).NotEmpty(server, "server is required")

// TODO httpTransport lifecycle
httpTransport := http.NewTransport()
dispatcher := yarpc.NewDispatcher(yarpc.Config{
Name: "client",
Outbounds: yarpc.Outbounds{
"oneway-test": {
Oneway: http.NewChooserOutbound(single.New(
hostport.PeerIdentifier(fmt.Sprintf("%s:8084", server)),
httpTransport,
)),
},
},
//for call back
Inbounds: yarpc.Inbounds{http.NewInbound(fmt.Sprintf("%s:8089", server))},
})

// register procedure for remote server to call us back on
dispatcher.Register(raw.OnewayProcedure("call-back", callBack))

return dispatcher
}

func getRandomID() string {
return random.String(10)
// newCallBackHandler creates a oneway handler that fills a channel
// with the received body
func newCallBackHandler() (raw.OnewayHandler, <-chan []byte) {
serverCalledBack := make(chan []byte)
handler := func(ctx context.Context, reqMeta yarpc.ReqMeta, body []byte) error {
serverCalledBack <- body
return nil
}
return handler, serverCalledBack
}

func callBack(ctx context.Context, reqMeta yarpc.ReqMeta, body []byte) error {
serverCalledBack <- body
return nil
}
func getRandomID() string { return random.String(10) }
17 changes: 11 additions & 6 deletions internal/crossdock/client/oneway/raw.go
Expand Up @@ -30,14 +30,19 @@ import (
)

// Raw starts an http run using raw encoding
func Raw(t crossdock.T, dispatcher yarpc.Dispatcher) {
func Raw(t crossdock.T, dispatcher yarpc.Dispatcher, serverCalledBack <-chan []byte, callBackAddr string) {
assert := crossdock.Assert(t)
fatals := crossdock.Fatals(t)

client := raw.New(dispatcher.ClientConfig("oneway-test"))
ctx := context.Background()

client := raw.New(dispatcher.ClientConfig("oneway-server"))
token := []byte(getRandomID())
ack, err := client.CallOneway(ctx, yarpc.NewReqMeta().Procedure("echo/raw"), token)

ack, err := client.CallOneway(
context.Background(),
yarpc.NewReqMeta().
Procedure("echo/raw").
Headers(yarpc.NewHeaders().With("callBackAddr", callBackAddr)),
token)

// ensure channel hasn't been filled yet
select {
Expand All @@ -50,5 +55,5 @@ func Raw(t crossdock.T, dispatcher yarpc.Dispatcher) {
fatals.NotNil(ack, "ack is nil")

serverToken := <-serverCalledBack
fatals.Equal(token, serverToken, "Client/Server token mismatch.")
assert.Equal(token, serverToken, "Raw token mismatch")
}
16 changes: 10 additions & 6 deletions internal/crossdock/client/oneway/thrift.go
Expand Up @@ -30,14 +30,18 @@ import (
)

// Thrift starts an http oneway run using Thrift encoding
func Thrift(t crossdock.T, dispatcher yarpc.Dispatcher) {
func Thrift(t crossdock.T, dispatcher yarpc.Dispatcher, serverCalledBack <-chan []byte, callBackAddr string) {
assert := crossdock.Assert(t)
fatals := crossdock.Fatals(t)

client := onewayclient.New(dispatcher.ClientConfig("oneway-test"))
ctx := context.Background()

client := onewayclient.New(dispatcher.ClientConfig("oneway-server"))
token := getRandomID()
ack, err := client.Echo(ctx, nil, &token)

ack, err := client.Echo(
context.Background(),
yarpc.NewReqMeta().
Headers(yarpc.NewHeaders().With("callBackAddr", callBackAddr)),
&token)

// ensure channel hasn't been filled yet
select {
Expand All @@ -50,5 +54,5 @@ func Thrift(t crossdock.T, dispatcher yarpc.Dispatcher) {
fatals.NotNil(ack, "ack is nil")

serverToken := <-serverCalledBack
fatals.Equal(token, string(serverToken), "Client/Server token mismatch")
assert.Equal(token, string(serverToken), "Thrift token mismatch")
}
17 changes: 10 additions & 7 deletions internal/crossdock/main_test.go
Expand Up @@ -55,15 +55,8 @@ func TestCrossdock(t *testing.T) {
params params
axes axes
}{
{
name: "oneway",
axes: axes{
"encoding": []string{"raw", "json", "thrift"},
},
},
{
name: "raw",

axes: axes{"transport": []string{"http", "tchannel"}},
},
{
Expand Down Expand Up @@ -134,6 +127,16 @@ func TestCrossdock(t *testing.T) {
"apachethriftclient": "127.0.0.1",
},
},
{
name: "oneway",
params: params{
"server_oneway": "127.0.0.1",
},
axes: axes{
"encoding": []string{"raw", "json", "thrift"},
"transport_oneway": []string{"http"},
},
},
}

for _, bb := range behaviors {
Expand Down

0 comments on commit 7cbb7f0

Please sign in to comment.