From af0c6cff79c227ded41ef782ff49f84391333e97 Mon Sep 17 00:00:00 2001 From: Kris Kowal Date: Tue, 6 Dec 2016 11:13:53 -0800 Subject: [PATCH] Pivot TChannel constructors to use shared ChannelTransport #543 --- bench_test.go | 36 +++-- dispatcher_test.go | 13 +- .../client/ctxpropagation/behavior.go | 9 +- .../crossdock/client/dispatcher/dispatcher.go | 11 +- .../crossdock/client/tchserver/behavior.go | 9 +- internal/crossdock/server/yarpc/phone.go | 11 +- internal/crossdock/server/yarpc/server.go | 16 +- internal/examples/json/client/main.go | 11 +- internal/examples/json/server/main.go | 16 +- .../examples/thrift/keyvalue/client/main.go | 11 +- .../examples/thrift/keyvalue/server/main.go | 16 +- transport/roundtrip_test.go | 10 +- transport/tchannel/channel_inbound.go | 88 +++++++++++ .../{outbound.go => channel_outbound.go} | 93 +++++------- transport/tchannel/channel_transport.go | 138 ++++++++++++++++++ transport/tchannel/errors.go | 5 + transport/tchannel/inbound.go | 134 ----------------- transport/tchannel/inbound_test.go | 44 ++++-- transport/tchannel/options.go | 79 ++++++++++ transport/tchannel/outbound_test.go | 6 +- transport/tracer_test.go | 17 +-- 21 files changed, 452 insertions(+), 321 deletions(-) create mode 100644 transport/tchannel/channel_inbound.go rename transport/tchannel/{outbound.go => channel_outbound.go} (58%) create mode 100644 transport/tchannel/channel_transport.go create mode 100644 transport/tchannel/errors.go delete mode 100644 transport/tchannel/inbound.go create mode 100644 transport/tchannel/options.go diff --git a/bench_test.go b/bench_test.go index 4fbfa4c9b6..be330a9159 100644 --- a/bench_test.go +++ b/bench_test.go @@ -187,16 +187,17 @@ func Benchmark_HTTP_NetHTTPToNetHTTP(b *testing.B) { } func Benchmark_TChannel_YARPCToYARPC(b *testing.B) { - serverCh, err := tchannel.NewChannel("server", nil) - require.NoError(b, err, "failed to build server TChannel") - + serverTChannel := ytchannel.NewChannelTransport( + ytchannel.WithServiceName("server"), + ) serverCfg := yarpc.Config{ Name: "server", - Inbounds: yarpc.Inbounds{ytchannel.NewInbound(serverCh)}, + Inbounds: yarpc.Inbounds{serverTChannel.NewInbound()}, } - clientCh, err := tchannel.NewChannel("client", nil) - require.NoError(b, err, "failed to build client TChannel") + clientTChannel := ytchannel.NewChannelTransport( + ytchannel.WithServiceName("client"), + ) // no defer close on channels because YARPC will take care of that @@ -208,8 +209,7 @@ func Benchmark_TChannel_YARPCToYARPC(b *testing.B) { Name: "client", Outbounds: yarpc.Outbounds{ "server": { - Unary: ytchannel.NewOutbound(clientCh). - WithHostPort(serverCh.PeerInfo().HostPort), + Unary: clientTChannel.NewSingleOutbound(serverTChannel.ListenAddr()), }, }, } @@ -228,15 +228,12 @@ func Benchmark_TChannel_YARPCToTChannel(b *testing.B) { serverCh.Register(traw.Wrap(tchannelEcho{t: b}), "echo") require.NoError(b, serverCh.ListenAndServe(":0"), "failed to start up TChannel") - clientCh, err := tchannel.NewChannel("client", nil) - require.NoError(b, err, "failed to build client TChannel") - + clientTChannel := ytchannel.NewChannelTransport(ytchannel.WithServiceName("client")) clientCfg := yarpc.Config{ Name: "client", Outbounds: yarpc.Outbounds{ "server": { - Unary: ytchannel.NewOutbound(clientCh). - WithHostPort(serverCh.PeerInfo().HostPort), + Unary: clientTChannel.NewSingleOutbound(serverCh.PeerInfo().HostPort), }, }, } @@ -248,23 +245,24 @@ func Benchmark_TChannel_YARPCToTChannel(b *testing.B) { } func Benchmark_TChannel_TChannelToYARPC(b *testing.B) { - serverCh, err := tchannel.NewChannel("server", nil) - require.NoError(b, err, "failed to build server TChannel") + tchannelTransport := ytchannel.NewChannelTransport( + ytchannel.WithServiceName("server"), + ) serverCfg := yarpc.Config{ Name: "server", - Inbounds: yarpc.Inbounds{ytchannel.NewInbound(serverCh)}, + Inbounds: yarpc.Inbounds{tchannelTransport.NewInbound()}, } - withDispatcher(b, serverCfg, func(server yarpc.Dispatcher) { - server.Register(raw.Procedure("echo", yarpcEcho)) + withDispatcher(b, serverCfg, func(dispatcher yarpc.Dispatcher) { + dispatcher.Register(raw.Procedure("echo", yarpcEcho)) clientCh, err := tchannel.NewChannel("client", nil) require.NoError(b, err, "failed to build client TChannel") defer clientCh.Close() b.ResetTimer() - runTChannelClient(b, clientCh, serverCh.PeerInfo().HostPort) + runTChannelClient(b, clientCh, tchannelTransport.ListenAddr()) }) } diff --git a/dispatcher_test.go b/dispatcher_test.go index e0247ff7d6..93b9dd3503 100644 --- a/dispatcher_test.go +++ b/dispatcher_test.go @@ -28,24 +28,21 @@ import ( . "go.uber.org/yarpc" "go.uber.org/yarpc/transport" "go.uber.org/yarpc/transport/http" - tch "go.uber.org/yarpc/transport/tchannel" + "go.uber.org/yarpc/transport/tchannel" "go.uber.org/yarpc/transport/transporttest" "github.com/golang/mock/gomock" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "github.com/uber/tchannel-go" ) func basicDispatcher(t *testing.T) Dispatcher { - ch, err := tchannel.NewChannel("test", nil) - require.NoError(t, err, "failed to create TChannel") - httpTransport := http.NewTransport() + tchannelTransport := tchannel.NewChannelTransport(tchannel.WithServiceName("test")) return NewDispatcher(Config{ Name: "test", Inbounds: Inbounds{ - tch.NewInbound(ch).WithListenAddr(":0"), + tchannelTransport.NewInbound(), httpTransport.NewInbound(":0"), }, }) @@ -74,7 +71,7 @@ func TestInboundsOrderIsMaintained(t *testing.T) { dispatcher := basicDispatcher(t) // Order must be maintained - _, ok := dispatcher.Inbounds()[0].(*tch.Inbound) + _, ok := dispatcher.Inbounds()[0].(*tchannel.ChannelInbound) assert.True(t, ok, "first inbound must be TChannel") _, ok = dispatcher.Inbounds()[1].(*http.Inbound) @@ -89,7 +86,7 @@ func TestInboundsOrderAfterStart(t *testing.T) { inbounds := dispatcher.Inbounds() - tchInbound := inbounds[0].(*tch.Inbound) + tchInbound := inbounds[0].(*tchannel.ChannelInbound) assert.NotEqual(t, "0.0.0.0:0", tchInbound.Channel().PeerInfo().HostPort) httpInbound := inbounds[1].(*http.Inbound) diff --git a/internal/crossdock/client/ctxpropagation/behavior.go b/internal/crossdock/client/ctxpropagation/behavior.go index 0ae0b51cd3..bda0a224e1 100644 --- a/internal/crossdock/client/ctxpropagation/behavior.go +++ b/internal/crossdock/client/ctxpropagation/behavior.go @@ -36,7 +36,6 @@ import ( "github.com/crossdock/crossdock-go" "github.com/opentracing/opentracing-go" - "github.com/uber/tchannel-go" ) // Run verifies that opentracing context is propagated across multiple hops. @@ -319,10 +318,8 @@ func buildDispatcher(t crossdock.T) (dispatcher yarpc.Dispatcher, tconfig server fatals.NotEmpty(self, "ctxclient is required") fatals.NotEmpty(subject, "ctxserver is required") - ch, err := tchannel.NewChannel("ctxclient", nil) - fatals.NoError(err, "failed to create TChannel") - httpTransport := http.NewTransport() + tchannelTransport := tch.NewChannelTransport(tch.WithListenAddr(":8087"), tch.WithServiceName("ctxclient")) var outbound transport.UnaryOutbound switch trans := t.Param(params.Transport); trans { @@ -330,7 +327,7 @@ func buildDispatcher(t crossdock.T) (dispatcher yarpc.Dispatcher, tconfig server outbound = httpTransport.NewSingleOutbound(fmt.Sprintf("http://%s:8081", subject)) tconfig.TChannel = &server.TChannelTransport{Host: self, Port: 8087} case "tchannel": - outbound = tch.NewOutbound(ch).WithHostPort(fmt.Sprintf("%s:8082", subject)) + outbound = tchannelTransport.NewSingleOutbound(fmt.Sprintf("%s:8082", subject)) tconfig.HTTP = &server.HTTPTransport{Host: self, Port: 8086} default: fatals.Fail("", "unknown transport %q", trans) @@ -339,7 +336,7 @@ func buildDispatcher(t crossdock.T) (dispatcher yarpc.Dispatcher, tconfig server dispatcher = yarpc.NewDispatcher(yarpc.Config{ Name: "ctxclient", Inbounds: yarpc.Inbounds{ - tch.NewInbound(ch).WithListenAddr(":8087"), + tchannelTransport.NewInbound(), httpTransport.NewInbound(":8086"), }, Outbounds: yarpc.Outbounds{ diff --git a/internal/crossdock/client/dispatcher/dispatcher.go b/internal/crossdock/client/dispatcher/dispatcher.go index 8bc096b279..22b52ee2ea 100644 --- a/internal/crossdock/client/dispatcher/dispatcher.go +++ b/internal/crossdock/client/dispatcher/dispatcher.go @@ -28,10 +28,9 @@ import ( "go.uber.org/yarpc/internal/crossdock/client/params" "go.uber.org/yarpc/transport" "go.uber.org/yarpc/transport/http" - tch "go.uber.org/yarpc/transport/tchannel" + "go.uber.org/yarpc/transport/tchannel" "github.com/crossdock/crossdock-go" - "github.com/uber/tchannel-go" ) // Create creates an RPC from the given parameters or fails the whole behavior. @@ -41,17 +40,15 @@ func Create(t crossdock.T) yarpc.Dispatcher { server := t.Param(params.Server) fatals.NotEmpty(server, "server is required") - httpTransport := http.NewTransport() - var unaryOutbound transport.UnaryOutbound trans := t.Param(params.Transport) switch trans { case "http": + httpTransport := http.NewTransport() unaryOutbound = httpTransport.NewSingleOutbound(fmt.Sprintf("http://%s:8081", server)) case "tchannel": - ch, err := tchannel.NewChannel("client", nil) - fatals.NoError(err, "couldn't create tchannel") - unaryOutbound = tch.NewOutbound(ch).WithHostPort(server + ":8082") + tchannelTransport := tchannel.NewChannelTransport(tchannel.WithServiceName("client")) + unaryOutbound = tchannelTransport.NewSingleOutbound(server + ":8082") default: fatals.Fail("", "unknown transport %q", trans) } diff --git a/internal/crossdock/client/tchserver/behavior.go b/internal/crossdock/client/tchserver/behavior.go index 09dfdc65be..bc649db173 100644 --- a/internal/crossdock/client/tchserver/behavior.go +++ b/internal/crossdock/client/tchserver/behavior.go @@ -25,10 +25,9 @@ import ( "go.uber.org/yarpc" "go.uber.org/yarpc/internal/crossdock/client/params" - tch "go.uber.org/yarpc/transport/tchannel" + "go.uber.org/yarpc/transport/tchannel" "github.com/crossdock/crossdock-go" - "github.com/uber/tchannel-go" ) const ( @@ -44,14 +43,12 @@ func Run(t crossdock.T) { server := t.Param(params.Server) serverHostPort := fmt.Sprintf("%v:%v", server, serverPort) - ch, err := tchannel.NewChannel("yarpc-client", nil) - fatals.NoError(err, "could not create channel") - + tchannelTransport := tchannel.NewChannelTransport(tchannel.WithServiceName("yarpc-client")) dispatcher := yarpc.NewDispatcher(yarpc.Config{ Name: "yarpc-client", Outbounds: yarpc.Outbounds{ serverName: { - Unary: tch.NewOutbound(ch).WithHostPort(serverHostPort), + Unary: tchannelTransport.NewSingleOutbound(serverHostPort), }, }, }) diff --git a/internal/crossdock/server/yarpc/phone.go b/internal/crossdock/server/yarpc/phone.go index 12bc300964..5097a67d0a 100644 --- a/internal/crossdock/server/yarpc/phone.go +++ b/internal/crossdock/server/yarpc/phone.go @@ -31,9 +31,7 @@ import ( "go.uber.org/yarpc/internal/clientconfig" "go.uber.org/yarpc/transport" "go.uber.org/yarpc/transport/http" - tch "go.uber.org/yarpc/transport/tchannel" - - "github.com/uber/tchannel-go" + "go.uber.org/yarpc/transport/tchannel" ) // HTTPTransport contains information about an HTTP transport. @@ -74,6 +72,7 @@ func Phone(ctx context.Context, reqMeta yarpc.ReqMeta, body *PhoneRequest) (*Pho var outbound transport.UnaryOutbound httpTransport := http.NewTransport() + tchannelTransport := tchannel.NewChannelTransport(tchannel.WithServiceName("yarpc-test-client")) switch { case body.Transport.HTTP != nil: @@ -82,11 +81,7 @@ func Phone(ctx context.Context, reqMeta yarpc.ReqMeta, body *PhoneRequest) (*Pho case body.Transport.TChannel != nil: t := body.Transport.TChannel hostport := fmt.Sprintf("%s:%d", t.Host, t.Port) - ch, err := tchannel.NewChannel("yarpc-test-client", nil) - if err != nil { - return nil, nil, fmt.Errorf("failed to build TChannel: %v", err) - } - outbound = tch.NewOutbound(ch).WithHostPort(hostport) + outbound = tchannelTransport.NewSingleOutbound(hostport) default: return nil, nil, fmt.Errorf("unconfigured transport") } diff --git a/internal/crossdock/server/yarpc/server.go b/internal/crossdock/server/yarpc/server.go index 6005575c1f..69552cab1c 100644 --- a/internal/crossdock/server/yarpc/server.go +++ b/internal/crossdock/server/yarpc/server.go @@ -22,7 +22,6 @@ package yarpc import ( "fmt" - "log" "go.uber.org/yarpc" "go.uber.org/yarpc/encoding/json" @@ -32,26 +31,23 @@ import ( "go.uber.org/yarpc/internal/crossdock/thrift/gauntlet/yarpc/thrifttestserver" "go.uber.org/yarpc/transport" "go.uber.org/yarpc/transport/http" - tch "go.uber.org/yarpc/transport/tchannel" - - "github.com/uber/tchannel-go" + "go.uber.org/yarpc/transport/tchannel" ) var dispatcher yarpc.Dispatcher // Start starts the test server that clients will make requests to func Start() { - ch, err := tchannel.NewChannel("yarpc-test", nil) - if err != nil { - log.Fatalln("couldn't create tchannel: %v", err) - } - + tchannelTransport := tchannel.NewChannelTransport( + tchannel.WithListenAddr(":8082"), + tchannel.WithServiceName("yarpc-test"), + ) httpTransport := http.NewTransport() dispatcher = yarpc.NewDispatcher(yarpc.Config{ Name: "yarpc-test", Inbounds: yarpc.Inbounds{ + tchannelTransport.NewInbound(), httpTransport.NewInbound(":8081"), - tch.NewInbound(ch).WithListenAddr(":8082"), }, }) diff --git a/internal/examples/json/client/main.go b/internal/examples/json/client/main.go index 8947fb0890..62377e5f24 100644 --- a/internal/examples/json/client/main.go +++ b/internal/examples/json/client/main.go @@ -34,9 +34,7 @@ import ( "go.uber.org/yarpc/encoding/json" "go.uber.org/yarpc/transport" "go.uber.org/yarpc/transport/http" - tch "go.uber.org/yarpc/transport/tchannel" - - "github.com/uber/tchannel-go" + "go.uber.org/yarpc/transport/tchannel" ) type getRequest struct { @@ -91,17 +89,14 @@ func main() { flag.Parse() httpTransport := http.NewTransport() + tchannelTransport := tchannel.NewChannelTransport(tchannel.WithServiceName("keyvalue-client")) var outbound transport.UnaryOutbound switch strings.ToLower(outboundName) { case "http": outbound = httpTransport.NewSingleOutbound("http://127.0.0.1:24034") case "tchannel": - channel, err := tchannel.NewChannel("keyvalue-client", nil) - if err != nil { - log.Fatalln(err) - } - outbound = tch.NewOutbound(channel).WithHostPort("localhost:28941") + outbound = tchannelTransport.NewSingleOutbound("localhost:28941") default: log.Fatalf("invalid outbound: %q\n", outboundName) } diff --git a/internal/examples/json/server/main.go b/internal/examples/json/server/main.go index 177ad14b1a..51ffa64550 100644 --- a/internal/examples/json/server/main.go +++ b/internal/examples/json/server/main.go @@ -23,16 +23,13 @@ package main import ( "context" "fmt" - "log" "os" "sync" "go.uber.org/yarpc" "go.uber.org/yarpc/encoding/json" "go.uber.org/yarpc/transport/http" - tch "go.uber.org/yarpc/transport/tchannel" - - "github.com/uber/tchannel-go" + "go.uber.org/yarpc/transport/tchannel" ) type getRequest struct { @@ -72,16 +69,15 @@ func (h *handler) Set(ctx context.Context, reqMeta yarpc.ReqMeta, body *setReque } func main() { - channel, err := tchannel.NewChannel("keyvalue", nil) - if err != nil { - log.Fatalln(err) - } - + tchannelTransport := tchannel.NewChannelTransport( + tchannel.WithServiceName("keyvalue"), + tchannel.WithListenAddr(":28941"), + ) httpTransport := http.NewTransport() dispatcher := yarpc.NewDispatcher(yarpc.Config{ Name: "keyvalue", Inbounds: yarpc.Inbounds{ - tch.NewInbound(channel).WithListenAddr(":28941"), + tchannelTransport.NewInbound(), httpTransport.NewInbound(":24034"), }, InboundMiddleware: yarpc.InboundMiddleware{ diff --git a/internal/examples/thrift/keyvalue/client/main.go b/internal/examples/thrift/keyvalue/client/main.go index af9b628956..5f25beb3cd 100644 --- a/internal/examples/thrift/keyvalue/client/main.go +++ b/internal/examples/thrift/keyvalue/client/main.go @@ -34,9 +34,7 @@ import ( "go.uber.org/yarpc/internal/examples/thrift/keyvalue/kv/yarpc/keyvalueclient" "go.uber.org/yarpc/transport" "go.uber.org/yarpc/transport/http" - tch "go.uber.org/yarpc/transport/tchannel" - - "github.com/uber/tchannel-go" + "go.uber.org/yarpc/transport/tchannel" ) func main() { @@ -49,17 +47,14 @@ func main() { flag.Parse() httpTransport := http.NewTransport() + tchannelTransport := tchannel.NewChannelTransport(tchannel.WithServiceName("keyvalue-client")) var outbound transport.UnaryOutbound switch strings.ToLower(outboundName) { case "http": outbound = httpTransport.NewSingleOutbound("http://127.0.0.1:24034") case "tchannel": - channel, err := tchannel.NewChannel("keyvalue-client", nil) - if err != nil { - log.Fatalln(err) - } - outbound = tch.NewOutbound(channel).WithHostPort("localhost:28941") + outbound = tchannelTransport.NewSingleOutbound("localhost:28941") default: log.Fatalf("invalid outbound: %q\n", outboundName) } diff --git a/internal/examples/thrift/keyvalue/server/main.go b/internal/examples/thrift/keyvalue/server/main.go index 0f1ec12c9d..f286b5db94 100644 --- a/internal/examples/thrift/keyvalue/server/main.go +++ b/internal/examples/thrift/keyvalue/server/main.go @@ -23,16 +23,13 @@ package main import ( "context" "fmt" - "log" "sync" "go.uber.org/yarpc" "go.uber.org/yarpc/internal/examples/thrift/keyvalue/kv" "go.uber.org/yarpc/internal/examples/thrift/keyvalue/kv/yarpc/keyvalueserver" "go.uber.org/yarpc/transport/http" - tch "go.uber.org/yarpc/transport/tchannel" - - "github.com/uber/tchannel-go" + "go.uber.org/yarpc/transport/tchannel" ) type handler struct { @@ -60,16 +57,15 @@ func (h *handler) SetValue(ctx context.Context, reqMeta yarpc.ReqMeta, key *stri } func main() { - channel, err := tchannel.NewChannel("keyvalue", nil) - if err != nil { - log.Fatalln(err) - } - + tchannelTransport := tchannel.NewChannelTransport( + tchannel.WithServiceName("keyvalue"), + tchannel.WithListenAddr(":28941"), + ) httpTransport := http.NewTransport() dispatcher := yarpc.NewDispatcher(yarpc.Config{ Name: "keyvalue", Inbounds: yarpc.Inbounds{ - tch.NewInbound(channel).WithListenAddr(":28941"), + tchannelTransport.NewInbound(), httpTransport.NewInbound(":24034"), }, }) diff --git a/transport/roundtrip_test.go b/transport/roundtrip_test.go index 3c0e43c9de..a1726edccc 100644 --- a/transport/roundtrip_test.go +++ b/transport/roundtrip_test.go @@ -131,16 +131,20 @@ func (tt tchannelTransport) WithRegistry(r transport.Registry, f func(transport. serverOpts := testutils.NewOpts().SetServiceName(testService) clientOpts := testutils.NewOpts().SetServiceName(testCaller) testutils.WithServer(tt.t, serverOpts, func(ch *tchannel.Channel, hostPort string) { - i := tch.NewInbound(ch) + ix := tch.NewChannelTransport(tch.WithChannel(ch)) + i := ix.NewInbound() i.SetRegistry(r) - require.NoError(tt.t, i.Start(), "failed to start") + require.NoError(tt.t, ix.Start(), "failed to start inbound transport") + require.NoError(tt.t, i.Start(), "failed to start inbound") defer i.Stop() // ^ the server is already listening so this will just set up the // handler. client := testutils.NewClient(tt.t, clientOpts) - o := tch.NewOutbound(client).WithHostPort(hostPort) + ox := tch.NewChannelTransport(tch.WithChannel(client)) + o := ox.NewSingleOutbound(hostPort) + require.NoError(tt.t, ox.Start(), "failed to start outbound transport") require.NoError(tt.t, o.Start(), "failed to start outbound") defer o.Stop() diff --git a/transport/tchannel/channel_inbound.go b/transport/tchannel/channel_inbound.go new file mode 100644 index 0000000000..2d6e3ba6db --- /dev/null +++ b/transport/tchannel/channel_inbound.go @@ -0,0 +1,88 @@ +// Copyright (c) 2016 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package tchannel + +import ( + "github.com/opentracing/opentracing-go" + "go.uber.org/yarpc/internal/errors" + "go.uber.org/yarpc/transport" +) + +type inboundConfig struct { +} + +// ChannelInbound is a TChannel Inbound backed by a pre-existing TChannel +// Channel. +type ChannelInbound struct { + ch Channel + addr string + registry transport.Registry + tracer opentracing.Tracer + transport *ChannelTransport +} + +// NewInbound returns a new TChannel inbound backed by a shared TChannel +// transport. The returned ChannelInbound does not support peer.Chooser +// and uses TChannel's own internal load balancing peer selection. +func (t *ChannelTransport) NewInbound() *ChannelInbound { + return &ChannelInbound{ + ch: t.ch, + tracer: t.tracer, + transport: t, + } +} + +// SetRegistry configures a registry to handle incoming requests. +// This satisfies the transport.Inbound interface, and would be called +// by a dispatcher when it starts. +func (i *ChannelInbound) SetRegistry(registry transport.Registry) { + i.registry = registry +} + +// Transports returns a slice containing the ChannelInbound's underlying +// ChannelTransport. +func (i *ChannelInbound) Transports() []transport.Transport { + return []transport.Transport{i.transport} +} + +// Channel returns the underlying Channel for this Inbound. +func (i *ChannelInbound) Channel() Channel { + return i.ch +} + +func (i *ChannelInbound) Start() error { + if i.registry == nil { + return errors.NoRegistryError{} + } + + // Set up handlers. This must occur after construction because the + // dispatcher, or its equivalent, calls SetRegistry before Start. + // This also means that starting inbounds should block starting the transport. + sc := i.ch.GetSubChannel(i.ch.ServiceName()) + existing := sc.GetHandlers() + sc.SetHandler(handler{existing: existing, Registry: i.registry, tracer: i.tracer}) + + return nil +} + +func (i *ChannelInbound) Stop() error { + return nil +} diff --git a/transport/tchannel/outbound.go b/transport/tchannel/channel_outbound.go similarity index 58% rename from transport/tchannel/outbound.go rename to transport/tchannel/channel_outbound.go index b1422d7bb5..aa834abe77 100644 --- a/transport/tchannel/outbound.go +++ b/transport/tchannel/channel_outbound.go @@ -1,80 +1,55 @@ -// Copyright (c) 2016 Uber Technologies, Inc. -// -// Permission is hereby granted, free of charge, to any person obtaining a copy -// of this software and associated documentation files (the "Software"), to deal -// in the Software without restriction, including without limitation the rights -// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -// copies of the Software, and to permit persons to whom the Software is -// furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -// THE SOFTWARE. - package tchannel import ( "context" "io" + "github.com/uber/tchannel-go" + "go.uber.org/atomic" "go.uber.org/yarpc/internal/encoding" "go.uber.org/yarpc/internal/errors" "go.uber.org/yarpc/transport" - - "github.com/opentracing/opentracing-go" - "github.com/uber/tchannel-go" - "go.uber.org/atomic" ) -var errOutboundNotStarted = errors.ErrOutboundNotStarted("tchannel.Outbound") - -// NewOutbound builds a new TChannel outbound which uses the given Channel to -// make requests. -func NewOutbound(ch Channel) *Outbound { - return &Outbound{channel: ch, started: atomic.NewBool(false)} +// NewOutbound builds a new TChannel outbound using the transport's shared +// channel to make requests to any connected peer. +func (t *ChannelTransport) NewOutbound() *ChannelOutbound { + return &ChannelOutbound{ + started: atomic.NewBool(false), + channel: t.ch, + transport: t, + } } -// Outbound is a TChannel outbound transport. -type Outbound struct { - started *atomic.Bool - channel Channel - - // If specified, this is the address to which the request will be made. - // Otherwise, the global peer list of the Channel will be used. - HostPort string +// NewSingleOutbound builds a new TChannel outbound using the transport's shared +// channel to a specific peer. +func (t *ChannelTransport) NewSingleOutbound(addr string) *ChannelOutbound { + return &ChannelOutbound{ + started: atomic.NewBool(false), + channel: t.ch, + transport: t, + addr: addr, + } } -// WithHostPort specifies that the requests made by this outbound should be to -// the given address. -// -// By default, if HostPort was not specified, the Outbound will use the -// TChannel global peer list. -func (o *Outbound) WithHostPort(hostPort string) *Outbound { - o.HostPort = hostPort - return o -} +// ChannelOutbound is an outbound transport using a shared TChannel. +type ChannelOutbound struct { + started *atomic.Bool + channel Channel + transport *ChannelTransport -// WithTracer configures a tracer for this outbound. -func (o *Outbound) WithTracer(tracer opentracing.Tracer) *Outbound { - // At this time, we delegate tracing responsibility to the underlying TChannel. - return o + // If specified, this is the address to which requests will be made. + // Otherwise, the global peer list of the Channel will be used. + addr string } // Transports returns the underlying TChannel Transport for this outbound. -func (o *Outbound) Transports() []transport.Transport { - // TODO factor out transport and return it here. - return []transport.Transport{} +func (o *ChannelOutbound) Transports() []transport.Transport { + return []transport.Transport{o.transport} } // Start starts the TChannel outbound. -func (o *Outbound) Start() error { +func (o *ChannelOutbound) Start() error { // TODO: Should we create the connection to HostPort (if specified) here or // wait for the first call? o.started.Swap(true) @@ -82,7 +57,7 @@ func (o *Outbound) Start() error { } // Stop stops the TChannel outbound. -func (o *Outbound) Stop() error { +func (o *ChannelOutbound) Stop() error { if o.started.Swap(false) { o.channel.Close() } @@ -90,7 +65,7 @@ func (o *Outbound) Stop() error { } // Call sends an RPC over this TChannel outbound. -func (o *Outbound) Call(ctx context.Context, req *transport.Request) (*transport.Response, error) { +func (o *ChannelOutbound) Call(ctx context.Context, req *transport.Request) (*transport.Response, error) { if !o.started.Load() { // panic because there's no recovery from this panic(errOutboundNotStarted) @@ -109,7 +84,7 @@ func (o *Outbound) Call(ctx context.Context, req *transport.Request) (*transport RoutingKey: req.RoutingKey, RoutingDelegate: req.RoutingDelegate, } - if o.HostPort != "" { + if o.addr != "" { // If the hostport is given, we use the BeginCall on the channel // instead of the subchannel. call, err = o.channel.BeginCall( @@ -118,7 +93,7 @@ func (o *Outbound) Call(ctx context.Context, req *transport.Request) (*transport // (kris): Consider instead moving TimeoutPerAttempt to an outer // layer, just clamp the context on outbound call. ctx, - o.HostPort, + o.addr, req.Service, req.Procedure, &callOptions, diff --git a/transport/tchannel/channel_transport.go b/transport/tchannel/channel_transport.go new file mode 100644 index 0000000000..1936da0f73 --- /dev/null +++ b/transport/tchannel/channel_transport.go @@ -0,0 +1,138 @@ +// Copyright (c) 2016 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package tchannel + +import ( + "fmt" + + "github.com/opentracing/opentracing-go" + "github.com/uber/tchannel-go" +) + +// NewChannelTransport creates a ChannelTransport, suitable for creating inbounds +// and outbounds with an existing, shared channel. +// +// The ChannelTransport uses the underlying TChannel load balancing and peer +// management, so it is not suitable for use with a peer.Chooser. +// A future version of YARPC will add a NewTransport constructor that returns +// a transport suitable for custom peer selection. +func NewChannelTransport(opts ...TransportOption) *ChannelTransport { + var config transportConfig + config.tracer = opentracing.GlobalTracer() + for _, opt := range opts { + opt(&config) + } + + // Attempt to construct a channel on behalf of the caller if none given. + // Defer the error until Start since NewChannelTransport does not have + // an error return. + var err error + ch := config.ch + if config.ch == nil { + if config.name == "" { + err = fmt.Errorf("can't instantiate TChannelChannelTransport without channel or service name option") + } else { + ch, err = tchannel.NewChannel(config.name, &tchannel.ChannelOptions{ + Tracer: config.tracer, + }) + } + } + + return &ChannelTransport{ + ch: ch, + err: err, + addr: config.addr, + tracer: config.tracer, + } +} + +// ChannelTransport maintains TChannel peers and creates inbounds and outbounds for +// TChannel. +// +// In a future version, the channel will be suitable for managing peers in a +// peer.List or other peer.Chooser. +type ChannelTransport struct { + ch Channel + name string + err error + addr string + tracer opentracing.Tracer +} + +// Channel returns the underlying TChannel "Channel" instance. +func (t *ChannelTransport) Channel() Channel { + return t.ch +} + +// ListenAddr exposes the listen address of the transport. +func (t *ChannelTransport) ListenAddr() string { + return t.addr +} + +// Start starts a TChannel transport, opening listening sockets and accepting +// inbound requests, and opening connections to retained peers. +// +// All inbounds must have been assigned a registry to accept inbound requests. +func (t *ChannelTransport) Start() error { + + // Return error deferred from constructor for the construction of a TChannel. + if t.err != nil { + return t.err + } + + if t.ch.State() == tchannel.ChannelListening { + // Channel.Start() was called before RPC.Start(). We still want to + // update the Handler and what t.addr means, but nothing else. + t.addr = t.ch.PeerInfo().HostPort + return nil + } + + // Default to ListenIP if addr wasn't given. + addr := t.addr + if addr == "" { + listenIP, err := tchannel.ListenIP() + if err != nil { + return err + } + + addr = listenIP.String() + ":0" + // TODO(abg): Find a way to export this to users + } + + // TODO(abg): If addr was just the port (":4040"), we want to use + // ListenIP() + ":4040" rather than just ":4040". + + if err := t.ch.ListenAndServe(addr); err != nil { + return err + } + + t.addr = t.ch.PeerInfo().HostPort + return nil +} + +// Stop stops a TChannel transport, closing listening sockets, rejecting +// inbound requests, draining pending requests, and closing all connections. +// +// Stop blocks until the program can gracefully exit. +func (t *ChannelTransport) Stop() error { + t.ch.Close() + return nil +} diff --git a/transport/tchannel/errors.go b/transport/tchannel/errors.go new file mode 100644 index 0000000000..ce2e438b59 --- /dev/null +++ b/transport/tchannel/errors.go @@ -0,0 +1,5 @@ +package tchannel + +import "go.uber.org/yarpc/internal/errors" + +var errOutboundNotStarted = errors.ErrOutboundNotStarted("tchannel.Outbound") diff --git a/transport/tchannel/inbound.go b/transport/tchannel/inbound.go deleted file mode 100644 index 310c8c80f4..0000000000 --- a/transport/tchannel/inbound.go +++ /dev/null @@ -1,134 +0,0 @@ -// Copyright (c) 2016 Uber Technologies, Inc. -// -// Permission is hereby granted, free of charge, to any person obtaining a copy -// of this software and associated documentation files (the "Software"), to deal -// in the Software without restriction, including without limitation the rights -// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -// copies of the Software, and to permit persons to whom the Software is -// furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -// THE SOFTWARE. - -package tchannel - -import ( - "net" - - "go.uber.org/yarpc/internal/errors" - "go.uber.org/yarpc/transport" - - "github.com/opentracing/opentracing-go" - "github.com/uber/tchannel-go" -) - -// NewInbound builds a new TChannel inbound from the given Channel. Existing -// methods registered on the channel remain registered and are preferred when -// a call is received. -func NewInbound(ch Channel) *Inbound { - return &Inbound{ - ch: ch, - tracer: opentracing.GlobalTracer(), - } -} - -// Inbound is a TChannel Inbound. -type Inbound struct { - ch Channel - addr string - listener net.Listener - registry transport.Registry - tracer opentracing.Tracer -} - -// WithListenAddr changes the address on which the TChannel server will listen -// for connections. By default, the server listens on an OS-assigned port. -// -// This option has no effect if the Chanel provided to NewInbound is already -// listening for connections when Start() is called. -func (i *Inbound) WithListenAddr(addr string) *Inbound { - i.addr = addr - return i -} - -// WithTracer configures a tracer on the TChannel inbound. -func (i *Inbound) WithTracer(tracer opentracing.Tracer) *Inbound { - i.tracer = tracer - return i -} - -// SetRegistry configures a registry to handle incoming requests. -// This satisfies the transport.Inbound interface, and would be called -// by a dispatcher when it starts. -func (i *Inbound) SetRegistry(registry transport.Registry) { - i.registry = registry -} - -// Channel returns the underlying Channel for this Inbound. -func (i *Inbound) Channel() Channel { - return i.ch -} - -// Transports returns the underlying Transport for this Inbound. -func (i *Inbound) Transports() []transport.Transport { - // TODO factor out transport and return it here. - return []transport.Transport{} -} - -// Start starts the TChannel inbound transport. This immediately opens a listen -// socket. -func (i *Inbound) Start() error { - if i.registry == nil { - return errors.NoRegistryError{} - } - - sc := i.ch.GetSubChannel(i.ch.ServiceName()) - existing := sc.GetHandlers() - sc.SetHandler(handler{existing: existing, Registry: i.registry, tracer: i.tracer}) - - if i.ch.State() == tchannel.ChannelListening { - // Channel.Start() was called before RPC.Start(). We still want to - // update the Handler and what i.addr means, but nothing else. - i.addr = i.ch.PeerInfo().HostPort - return nil - } - - // Default to ListenIP if addr wasn't given. - addr := i.addr - if addr == "" { - listenIP, err := tchannel.ListenIP() - if err != nil { - return err - } - - addr = listenIP.String() + ":0" - // TODO(abg): Find a way to export this to users - } - - // TODO(abg): If addr was just the port (":4040"), we want to use - // ListenIP() + ":4040" rather than just ":4040". - - if err := i.ch.ListenAndServe(addr); err != nil { - return err - } - - i.addr = i.ch.PeerInfo().HostPort - return nil -} - -// Stop stops the TChannel inbound transport. This immediately stops listening -// for incoming connections. Existing connections begin to drain. -// New inbound requests are rejected. When there are no further pending -// requests, TChannel closes the connection. -func (i *Inbound) Stop() error { - i.ch.Close() - return nil -} diff --git a/transport/tchannel/inbound_test.go b/transport/tchannel/inbound_test.go index d7d7a9e032..17232fdf21 100644 --- a/transport/tchannel/inbound_test.go +++ b/transport/tchannel/inbound_test.go @@ -34,28 +34,34 @@ import ( func TestInboundStartNew(t *testing.T) { tests := []struct { - withInbound func(*tchannel.Channel, func(*Inbound)) + withInbound func(*tchannel.Channel, func(*ChannelInbound)) }{ { - func(ch *tchannel.Channel, f func(*Inbound)) { - i := NewInbound(ch) + func(ch *tchannel.Channel, f func(*ChannelInbound)) { + x := NewChannelTransport(WithChannel(ch)) + i := x.NewInbound() i.SetRegistry(new(transporttest.MockRegistry)) // Can't do Equal because we want to match the pointer, not a // DeepEqual. assert.True(t, ch == i.Channel(), "channel does not match") require.NoError(t, i.Start()) defer i.Stop() + require.NoError(t, x.Start()) + defer x.Stop() f(i) }, }, { - func(ch *tchannel.Channel, f func(*Inbound)) { - i := NewInbound(ch).WithListenAddr(":0") + func(ch *tchannel.Channel, f func(*ChannelInbound)) { + x := NewChannelTransport(WithChannel(ch)) + i := x.NewInbound() i.SetRegistry(new(transporttest.MockRegistry)) assert.True(t, ch == i.Channel(), "channel does not match") require.NoError(t, i.Start()) defer i.Stop() + require.NoError(t, x.Start()) + defer x.Stop() f(i) }, @@ -65,9 +71,11 @@ func TestInboundStartNew(t *testing.T) { for _, tt := range tests { ch, err := tchannel.NewChannel("foo", nil) require.NoError(t, err) - tt.withInbound(ch, func(i *Inbound) { + tt.withInbound(ch, func(i *ChannelInbound) { assert.Equal(t, tchannel.ChannelListening, ch.State()) assert.NoError(t, i.Stop()) + x := i.Transports()[0] + assert.NoError(t, x.Stop()) assert.Equal(t, tchannel.ChannelClosed, ch.State()) }) } @@ -80,13 +88,15 @@ func TestInboundStartAlreadyListening(t *testing.T) { require.NoError(t, ch.ListenAndServe(":0")) assert.Equal(t, tchannel.ChannelListening, ch.State()) - i := NewInbound(ch) + x := NewChannelTransport(WithChannel(ch)) + i := x.NewInbound() i.SetRegistry(new(transporttest.MockRegistry)) require.NoError(t, i.Start()) - defer i.Stop() + require.NoError(t, x.Start()) assert.NoError(t, i.Stop()) + assert.NoError(t, x.Stop()) assert.Equal(t, tchannel.ChannelClosed, ch.State()) } @@ -94,16 +104,19 @@ func TestInboundStopWithoutStarting(t *testing.T) { ch, err := tchannel.NewChannel("foo", nil) require.NoError(t, err) - i := NewInbound(ch) + x := NewChannelTransport(WithChannel(ch)) + i := x.NewInbound() assert.NoError(t, i.Stop()) } func TestInboundInvalidAddress(t *testing.T) { - ch, err := tchannel.NewChannel("foo", nil) - require.NoError(t, err) - i := NewInbound(ch).WithListenAddr("not valid") + x := NewChannelTransport(WithServiceName("foo"), WithListenAddr("not valid")) + i := x.NewInbound() i.SetRegistry(new(transporttest.MockRegistry)) - assert.Error(t, i.Start()) + assert.Nil(t, i.Start()) + defer i.Stop() + assert.Error(t, x.Start()) + defer x.Stop() } func TestInboundExistingMethods(t *testing.T) { @@ -116,10 +129,13 @@ func TestInboundExistingMethods(t *testing.T) { }, }, nil) - i := NewInbound(ch) + x := NewChannelTransport(WithChannel(ch)) + i := x.NewInbound() i.SetRegistry(new(transporttest.MockRegistry)) require.NoError(t, i.Start()) defer i.Stop() + require.NoError(t, x.Start()) + defer x.Stop() // Make a call to the "echo" method which should call our pre-registered method. ctx, cancel := json.NewContext(time.Second) diff --git a/transport/tchannel/options.go b/transport/tchannel/options.go new file mode 100644 index 0000000000..9b8ca44d5a --- /dev/null +++ b/transport/tchannel/options.go @@ -0,0 +1,79 @@ +// Copyright (c) 2016 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package tchannel + +import "github.com/opentracing/opentracing-go" + +// transportConfig is suitable for conveying options to TChannel transport +// constructors. +// At time of writing, there is only a ChannelTransport constructor, which +// supports options like WithChannel that only apply to this constructor form. +// The transportConfig should also be suitable, albeit with extraneous properties, +// if used for NewTransport, which will return a Transport suitable for YARPC +// peer lists. +// TODO update above when NewTransport is real. +type transportConfig struct { + ch Channel + tracer opentracing.Tracer + addr string + name string +} + +// TransportOption is for variadic arguments to NewChannelTransport. +// +// TransportOption will eventually also be suitable for passing to NewTransport. +type TransportOption func(*transportConfig) + +// WithTracer is an option that configures the tracer for a TChannel transport. +func WithTracer(tracer opentracing.Tracer) TransportOption { + return func(t *transportConfig) { + t.tracer = tracer + } +} + +// WithChannel informs NewChannelTransport that it should reuse an existing +// underlying TChannel Channel instance. This instance may already have +// handlers and be listening before this transport starts. Otherwise, +// The TransportChannel will listen on start, albeit with the default address +// ":0" (all interfaces, any port). +func WithChannel(ch Channel) TransportOption { + return func(t *transportConfig) { + t.ch = ch + } +} + +// WithListenAddr informs a transport constructor what address (in the form of +// host:port) to listen on. This option does not apply to NewChannelTransport +// if it is called with WithChannel and a channel that is already listening. +func WithListenAddr(addr string) TransportOption { + return func(t *transportConfig) { + t.addr = addr + } +} + +// WithServiceName informs the NewChannelTransport constructor which service +// name to use if it needs to construct a root Channel object, as when called +// without the WithChannel option. +func WithServiceName(name string) TransportOption { + return func(t *transportConfig) { + t.name = name + } +} diff --git a/transport/tchannel/outbound_test.go b/transport/tchannel/outbound_test.go index ca26467b8f..2e9ab292cf 100644 --- a/transport/tchannel/outbound_test.go +++ b/transport/tchannel/outbound_test.go @@ -41,11 +41,13 @@ import ( // and a hostPort var newOutbounds = []func(*tchannel.Channel, string) transport.UnaryOutbound{ func(ch *tchannel.Channel, hostPort string) transport.UnaryOutbound { + x := NewChannelTransport(WithChannel(ch)) ch.Peers().Add(hostPort) - return NewOutbound(ch) + return x.NewOutbound() }, func(ch *tchannel.Channel, hostPort string) transport.UnaryOutbound { - return NewOutbound(ch).WithHostPort(hostPort) + x := NewChannelTransport(WithChannel(ch)) + return x.NewSingleOutbound(hostPort) }, } diff --git a/transport/tracer_test.go b/transport/tracer_test.go index 442115ac72..2a4c32ecc4 100644 --- a/transport/tracer_test.go +++ b/transport/tracer_test.go @@ -34,7 +34,6 @@ import ( "github.com/opentracing/opentracing-go" "github.com/opentracing/opentracing-go/mocktracer" - "github.com/uber/tchannel-go" ) type echoReqBody struct{} @@ -132,22 +131,22 @@ func createHTTPDispatcher(tracer opentracing.Tracer) yarpc.Dispatcher { } func createTChannelDispatcher(tracer opentracing.Tracer, t *testing.T) yarpc.Dispatcher { - // Establish the TChannel - ch, err := tchannel.NewChannel("yarpc-test", &tchannel.ChannelOptions{ - Tracer: tracer, - }) - assert.NoError(t, err) hp := "127.0.0.1:4040" - ch.ListenAndServe(hp) + + tchannelTransport := ytchannel.NewChannelTransport( + ytchannel.WithListenAddr(hp), + ytchannel.WithTracer(tracer), + ytchannel.WithServiceName("yarpc-test"), + ) dispatcher := yarpc.NewDispatcher(yarpc.Config{ Name: "yarpc-test", Inbounds: yarpc.Inbounds{ - ytchannel.NewInbound(ch).WithTracer(tracer), + tchannelTransport.NewInbound(), }, Outbounds: yarpc.Outbounds{ "yarpc-test": { - Unary: ytchannel.NewOutbound(ch).WithHostPort(hp), + Unary: tchannelTransport.NewSingleOutbound(hp), }, }, Tracer: tracer,