Skip to content

Commit

Permalink
Merge branch 'transport-options' into ✉️
Browse files Browse the repository at this point in the history
  • Loading branch information
abhinav committed Jul 13, 2016
2 parents a0e9f89 + ce9f469 commit 45913bf
Show file tree
Hide file tree
Showing 22 changed files with 144 additions and 142 deletions.
16 changes: 8 additions & 8 deletions crossdock/client/ctxpropagation/behavior.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,15 +160,15 @@ func Run(t crossdock.T) {
}
}

rpc, tconfig := buildRPC(t)
fatals.NoError(rpc.Start(), "%v: RPC failed to start", tt.desc)
defer rpc.Stop()
dispatcher, tconfig := buildDispatcher(t)
fatals.NoError(dispatcher.Start(), "%v: Dispatcher failed to start", tt.desc)
defer dispatcher.Stop()

jsonClient := json.New(rpc.Channel("yarpc-test"))
jsonClient := json.New(dispatcher.Channel("yarpc-test"))
for name, handler := range tt.handlers {
handler.SetClient(jsonClient)
handler.SetTransport(tconfig)
json.Register(rpc, json.Procedure(name, handler.Handle))
json.Register(dispatcher, json.Procedure(name, handler.Handle))
}

ctx := context.Background()
Expand Down Expand Up @@ -276,7 +276,7 @@ func (h *multiHopHandler) Handle(reqMeta yarpc.ReqMeta, body interface{}) (inter
return map[string]interface{}{}, resMeta, err
}

func buildRPC(t crossdock.T) (rpc yarpc.RPC, tconfig server.TransportConfig) {
func buildDispatcher(t crossdock.T) (dispatcher yarpc.Dispatcher, tconfig server.TransportConfig) {
fatals := crossdock.Fatals(t)

self := t.Param("ctxclient")
Expand All @@ -299,7 +299,7 @@ func buildRPC(t crossdock.T) (rpc yarpc.RPC, tconfig server.TransportConfig) {
fatals.Fail("", "unknown transport %q", trans)
}

rpc = yarpc.New(yarpc.Config{
dispatcher = yarpc.NewDispatcher(yarpc.Config{
Name: "ctxclient",
Inbounds: []transport.Inbound{
tch.NewInbound(ch, tch.ListenAddr(":8087")),
Expand All @@ -308,5 +308,5 @@ func buildRPC(t crossdock.T) (rpc yarpc.RPC, tconfig server.TransportConfig) {
Outbounds: transport.Outbounds{"yarpc-test": outbound},
})

return rpc, tconfig
return dispatcher, tconfig
}
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package rpc
package dispatcher

import (
"fmt"
Expand All @@ -35,7 +35,7 @@ import (
)

// Create creates an RPC from the given parameters or fails the whole behavior.
func Create(t crossdock.T) yarpc.RPC {
func Create(t crossdock.T) yarpc.Dispatcher {
fatals := crossdock.Fatals(t)

server := t.Param(params.Server)
Expand All @@ -61,7 +61,7 @@ func Create(t crossdock.T) yarpc.RPC {
fatals.Fail("", "unknown transport %q", trans)
}

return yarpc.New(yarpc.Config{
return yarpc.NewDispatcher(yarpc.Config{
Name: "client",
Outbounds: transport.Outbounds{"yarpc-test": outbound},
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package rpc
package dispatcher

import (
"testing"
Expand Down Expand Up @@ -60,10 +60,10 @@ func TestCreate(t *testing.T) {

for _, tt := range tests {
entries := crossdock.Run(tt.params, func(ct crossdock.T) {
rpc := Create(ct)
dispatcher := Create(ct)

// should get here only if the request succeeded
ch := rpc.Channel("yarpc-test")
ch := dispatcher.Channel("yarpc-test")
assert.Equal(t, "client", ch.Caller)
assert.Equal(t, "yarpc-test", ch.Service)
})
Expand Down
10 changes: 5 additions & 5 deletions crossdock/client/echo/json.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ import (
"time"

"github.com/yarpc/yarpc-go"
disp "github.com/yarpc/yarpc-go/crossdock/client/dispatcher"
"github.com/yarpc/yarpc-go/crossdock/client/random"
"github.com/yarpc/yarpc-go/crossdock/client/rpc"
"github.com/yarpc/yarpc-go/encoding/json"

"github.com/crossdock/crossdock-go"
Expand All @@ -42,11 +42,11 @@ func JSON(t crossdock.T) {
t = createEchoT("json", t)
fatals := crossdock.Fatals(t)

rpc := rpc.Create(t)
fatals.NoError(rpc.Start(), "could not start RPC")
defer rpc.Stop()
dispatcher := disp.Create(t)
fatals.NoError(dispatcher.Start(), "could not start Dispatcher")
defer dispatcher.Stop()

client := json.New(rpc.Channel("yarpc-test"))
client := json.New(dispatcher.Channel("yarpc-test"))
ctx, _ := context.WithTimeout(context.Background(), time.Second)

var response jsonEcho
Expand Down
10 changes: 5 additions & 5 deletions crossdock/client/echo/raw.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ import (
"time"

"github.com/yarpc/yarpc-go"
disp "github.com/yarpc/yarpc-go/crossdock/client/dispatcher"
"github.com/yarpc/yarpc-go/crossdock/client/random"
"github.com/yarpc/yarpc-go/crossdock/client/rpc"
"github.com/yarpc/yarpc-go/encoding/raw"

"github.com/crossdock/crossdock-go"
Expand All @@ -38,11 +38,11 @@ func Raw(t crossdock.T) {
t = createEchoT("raw", t)
fatals := crossdock.Fatals(t)

rpc := rpc.Create(t)
fatals.NoError(rpc.Start(), "could not start RPC")
defer rpc.Stop()
dispatcher := disp.Create(t)
fatals.NoError(dispatcher.Start(), "could not start Dispatcher")
defer dispatcher.Stop()

client := raw.New(rpc.Channel("yarpc-test"))
client := raw.New(dispatcher.Channel("yarpc-test"))
ctx, _ := context.WithTimeout(context.Background(), time.Second)

token := random.Bytes(5)
Expand Down
10 changes: 5 additions & 5 deletions crossdock/client/echo/thrift.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ import (
"time"

"github.com/yarpc/yarpc-go"
disp "github.com/yarpc/yarpc-go/crossdock/client/dispatcher"
"github.com/yarpc/yarpc-go/crossdock/client/random"
"github.com/yarpc/yarpc-go/crossdock/client/rpc"
"github.com/yarpc/yarpc-go/crossdock/thrift/echo"
"github.com/yarpc/yarpc-go/crossdock/thrift/echo/yarpc/echoclient"

Expand All @@ -38,11 +38,11 @@ func Thrift(t crossdock.T) {
t = createEchoT("thrift", t)
fatals := crossdock.Fatals(t)

rpc := rpc.Create(t)
fatals.NoError(rpc.Start(), "could not start RPC")
defer rpc.Stop()
dispatcher := disp.Create(t)
fatals.NoError(dispatcher.Start(), "could not start Dispatcher")
defer dispatcher.Stop()

client := echoclient.New(rpc.Channel("yarpc-test"))
client := echoclient.New(dispatcher.Channel("yarpc-test"))
ctx, _ := context.WithTimeout(context.Background(), time.Second)

token := random.String(5)
Expand Down
14 changes: 7 additions & 7 deletions crossdock/client/gauntlet/behavior.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ import (
"time"

"github.com/yarpc/yarpc-go"
disp "github.com/yarpc/yarpc-go/crossdock/client/dispatcher"
"github.com/yarpc/yarpc-go/crossdock/client/params"
"github.com/yarpc/yarpc-go/crossdock/client/random"
"github.com/yarpc/yarpc-go/crossdock/client/rpc"
"github.com/yarpc/yarpc-go/crossdock/thrift/gauntlet"
"github.com/yarpc/yarpc-go/crossdock/thrift/gauntlet/yarpc/secondserviceclient"
"github.com/yarpc/yarpc-go/crossdock/thrift/gauntlet/yarpc/thrifttestclient"
Expand Down Expand Up @@ -63,15 +63,15 @@ type TT struct {
func Run(t crossdock.T) {
fatals := crossdock.Fatals(t)

rpc := rpc.Create(t)
fatals.NoError(rpc.Start(), "could not start RPC")
defer rpc.Stop()
dispatcher := disp.Create(t)
fatals.NoError(dispatcher.Start(), "could not start Dispatcher")
defer dispatcher.Stop()

RunGauntlet(t, rpc, serverName)
RunGauntlet(t, dispatcher, serverName)
}

// RunGauntlet takes an rpc object and runs the gauntlet
func RunGauntlet(t crossdock.T, rpc yarpc.RPC, serverName string) {
func RunGauntlet(t crossdock.T, dispatcher yarpc.Dispatcher, serverName string) {
t = createGauntletT(t)
checks := crossdock.Checks(t)

Expand Down Expand Up @@ -362,7 +362,7 @@ func RunGauntlet(t crossdock.T, rpc yarpc.RPC, serverName string) {

desc := BuildDesc(tt)

client := buildClient(t, desc, tt.Service, rpc.Channel(serverName))
client := buildClient(t, desc, tt.Service, dispatcher.Channel(serverName))
f := client.MethodByName(tt.Function)
if !checks.True(f.IsValid(), "%v: invalid function", desc) {
continue
Expand Down
14 changes: 7 additions & 7 deletions crossdock/client/headers/behavior.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ import (
"time"

"github.com/yarpc/yarpc-go"
disp "github.com/yarpc/yarpc-go/crossdock/client/dispatcher"
"github.com/yarpc/yarpc-go/crossdock/client/params"
"github.com/yarpc/yarpc-go/crossdock/client/random"
"github.com/yarpc/yarpc-go/crossdock/client/rpc"
"github.com/yarpc/yarpc-go/crossdock/thrift/echo"
"github.com/yarpc/yarpc-go/crossdock/thrift/echo/yarpc/echoclient"
"github.com/yarpc/yarpc-go/encoding/json"
Expand All @@ -51,19 +51,19 @@ func Run(t crossdock.T) {
assert := crossdock.Assert(t)
checks := crossdock.Checks(t)

rpc := rpc.Create(t)
fatals.NoError(rpc.Start(), "could not start RPC")
defer rpc.Stop()
dispatcher := disp.Create(t)
fatals.NoError(dispatcher.Start(), "could not start Dispatcher")
defer dispatcher.Stop()

var caller headerCaller
encoding := t.Param(params.Encoding)
switch encoding {
case "raw":
caller = rawCaller{raw.New(rpc.Channel("yarpc-test"))}
caller = rawCaller{raw.New(dispatcher.Channel("yarpc-test"))}
case "json":
caller = jsonCaller{json.New(rpc.Channel("yarpc-test"))}
caller = jsonCaller{json.New(dispatcher.Channel("yarpc-test"))}
case "thrift":
caller = thriftCaller{echoclient.New(rpc.Channel("yarpc-test"))}
caller = thriftCaller{echoclient.New(dispatcher.Channel("yarpc-test"))}
default:
fatals.Fail("", "unknown encoding %q", encoding)
}
Expand Down
10 changes: 5 additions & 5 deletions crossdock/client/outboundttl/behavior.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
"time"

"github.com/yarpc/yarpc-go"
"github.com/yarpc/yarpc-go/crossdock/client/rpc"
disp "github.com/yarpc/yarpc-go/crossdock/client/dispatcher"
"github.com/yarpc/yarpc-go/encoding/raw"
"github.com/yarpc/yarpc-go/transport"

Expand All @@ -42,11 +42,11 @@ func Run(t crossdock.T) {
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
defer cancel()

rpc := rpc.Create(t)
fatals.NoError(rpc.Start(), "could not start RPC")
defer rpc.Stop()
dispatcher := disp.Create(t)
fatals.NoError(dispatcher.Start(), "could not start Dispatcher")
defer dispatcher.Stop()

ch := raw.New(rpc.Channel("yarpc-test"))
ch := raw.New(dispatcher.Channel("yarpc-test"))
_, _, err := ch.Call(yarpc.NewReqMeta(ctx).Procedure("sleep/raw"), nil)
fatals.Error(err, "expected a failure for timeout")

Expand Down
12 changes: 6 additions & 6 deletions crossdock/client/tchserver/behavior.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,22 +48,22 @@ func Run(t crossdock.T) {
ch, err := tchannel.NewChannel("yarpc-client", nil)
fatals.NoError(err, "could not create channel")

rpc := yarpc.New(yarpc.Config{
dispatcher := yarpc.NewDispatcher(yarpc.Config{
Name: "yarpc-client",
Outbounds: transport.Outbounds{
serverName: tch.NewOutbound(ch, tch.HostPort(serverHostPort)),
},
})
fatals.NoError(rpc.Start(), "could not start RPC")
defer rpc.Stop()
fatals.NoError(dispatcher.Start(), "could not start Dispatcher")
defer dispatcher.Stop()

switch encoding {
case "raw":
runRaw(t, rpc)
runRaw(t, dispatcher)
case "json":
runJSON(t, rpc)
runJSON(t, dispatcher)
case "thrift":
runThrift(t, rpc)
runThrift(t, dispatcher)
default:
fatals.Fail("", "unknown encoding %q", encoding)
}
Expand Down
8 changes: 4 additions & 4 deletions crossdock/client/tchserver/json.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,14 @@ import (
"golang.org/x/net/context"
)

func runJSON(t crossdock.T, rpc yarpc.RPC) {
func runJSON(t crossdock.T, dispatcher yarpc.Dispatcher) {
assert := crossdock.Assert(t)
checks := crossdock.Checks(t)

headers := yarpc.NewHeaders().With("hello", "json")
token := random.String(5)

resBody, resMeta, err := jsonCall(rpc, headers, token)
resBody, resMeta, err := jsonCall(dispatcher, headers, token)
if skipOnConnRefused(t, err) {
return
}
Expand All @@ -52,8 +52,8 @@ type jsonEcho struct {
Token string `json:"token"`
}

func jsonCall(rpc yarpc.RPC, headers yarpc.Headers, token string) (string, yarpc.CallResMeta, error) {
client := json.New(rpc.Channel(serverName))
func jsonCall(dispatcher yarpc.Dispatcher, headers yarpc.Headers, token string) (string, yarpc.CallResMeta, error) {
client := json.New(dispatcher.Channel(serverName))

ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
Expand Down
8 changes: 4 additions & 4 deletions crossdock/client/tchserver/raw.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,15 @@ import (
"golang.org/x/net/context"
)

func runRaw(t crossdock.T, rpc yarpc.RPC) {
func runRaw(t crossdock.T, dispatcher yarpc.Dispatcher) {
assert := crossdock.Assert(t)
checks := crossdock.Checks(t)

// TODO headers should be at yarpc, not transport
headers := yarpc.NewHeaders().With("hello", "raw")
token := random.Bytes(5)

resBody, resMeta, err := rawCall(rpc, headers, token)
resBody, resMeta, err := rawCall(dispatcher, headers, token)
if skipOnConnRefused(t, err) {
return
}
Expand All @@ -49,8 +49,8 @@ func runRaw(t crossdock.T, rpc yarpc.RPC) {
}
}

func rawCall(rpc yarpc.RPC, headers yarpc.Headers, token []byte) ([]byte, yarpc.CallResMeta, error) {
client := raw.New(rpc.Channel(serverName))
func rawCall(dispatcher yarpc.Dispatcher, headers yarpc.Headers, token []byte) ([]byte, yarpc.CallResMeta, error) {
client := raw.New(dispatcher.Channel(serverName))

ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
Expand Down
10 changes: 5 additions & 5 deletions crossdock/client/tchserver/thrift.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,14 @@ import (
"golang.org/x/net/context"
)

func runThrift(t crossdock.T, rpc yarpc.RPC) {
func runThrift(t crossdock.T, dispatcher yarpc.Dispatcher) {
assert := crossdock.Assert(t)
checks := crossdock.Checks(t)

headers := yarpc.NewHeaders().With("hello", "thrift")
token := random.String(5)

resBody, resMeta, err := thriftCall(rpc, headers, token)
resBody, resMeta, err := thriftCall(dispatcher, headers, token)
if skipOnConnRefused(t, err) {
return
}
Expand All @@ -49,11 +49,11 @@ func runThrift(t crossdock.T, rpc yarpc.RPC) {
assert.Equal(headers, resMeta.Headers(), "headers echoed")
}

gauntlet.RunGauntlet(t, rpc, serverName)
gauntlet.RunGauntlet(t, dispatcher, serverName)
}

func thriftCall(rpc yarpc.RPC, headers yarpc.Headers, token string) (string, yarpc.CallResMeta, error) {
client := echoclient.New(rpc.Channel(serverName))
func thriftCall(dispatcher yarpc.Dispatcher, headers yarpc.Headers, token string) (string, yarpc.CallResMeta, error) {
client := echoclient.New(dispatcher.Channel(serverName))

ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
Expand Down

0 comments on commit 45913bf

Please sign in to comment.