From 95f6fb0090075e7a699fd1cd6c9f46d6ceba8f3c Mon Sep 17 00:00:00 2001 From: Kris Kowal Date: Tue, 6 Dec 2016 15:19:24 -0800 Subject: [PATCH] Pivot HTTP constructors to use a shared Transport #540 --- bench_test.go | 17 ++++------- dispatcher_test.go | 3 +- .../crossdock/client/apachethrift/behavior.go | 10 +++---- .../client/ctxpropagation/behavior.go | 16 ++-------- .../crossdock/client/dispatcher/dispatcher.go | 23 ++++----------- .../crossdock/client/httpserver/behavior.go | 13 ++------- internal/crossdock/server/oneway/echo.go | 8 +---- internal/crossdock/server/oneway/server.go | 3 +- internal/crossdock/server/yarpc/phone.go | 11 ++----- internal/crossdock/server/yarpc/server.go | 3 +- internal/examples/json/client/main.go | 14 ++------- internal/examples/json/server/main.go | 3 +- internal/examples/thrift/hello/main.go | 13 ++------- .../examples/thrift/keyvalue/client/main.go | 14 ++------- .../examples/thrift/keyvalue/server/main.go | 3 +- transport/http/handler_test.go | 15 ++-------- transport/http/inbound.go | 7 +++-- transport/http/inbound_test.go | 29 +++++++++---------- transport/http/outbound.go | 24 +++++++++++---- transport/http/outbound_test.go | 24 ++++----------- transport/http/transport.go | 15 ++++++++++ transport/roundtrip_test.go | 27 +++++------------ transport/tracer_test.go | 15 ++-------- yarpctest/recorder/recorder_test.go | 25 ++++------------ 24 files changed, 116 insertions(+), 219 deletions(-) diff --git a/bench_test.go b/bench_test.go index 0abf53f86..4fbfa4c9b 100644 --- a/bench_test.go +++ b/bench_test.go @@ -12,8 +12,6 @@ import ( "go.uber.org/yarpc" "go.uber.org/yarpc/encoding/raw" - "go.uber.org/yarpc/peer/hostport" - "go.uber.org/yarpc/peer/single" yhttp "go.uber.org/yarpc/transport/http" ytchannel "go.uber.org/yarpc/transport/tchannel" @@ -123,19 +121,17 @@ func runTChannelClient(b *testing.B, c *tchannel.Channel, hostPort string) { } func Benchmark_HTTP_YARPCToYARPC(b *testing.B) { + httpTransport := yhttp.NewTransport() serverCfg := yarpc.Config{ Name: "server", - Inbounds: yarpc.Inbounds{yhttp.NewInbound(":8999")}, + Inbounds: yarpc.Inbounds{httpTransport.NewInbound(":8999")}, } - httpTransport := yhttp.NewTransport() clientCfg := yarpc.Config{ Name: "client", Outbounds: yarpc.Outbounds{ "server": { - Unary: yhttp.NewOutbound( - single.New(hostport.PeerIdentifier("http://localhost:8999"), httpTransport), - ), + Unary: httpTransport.NewSingleOutbound("http://localhost:8999"), }, }, } @@ -155,9 +151,7 @@ func Benchmark_HTTP_YARPCToNetHTTP(b *testing.B) { Name: "client", Outbounds: yarpc.Outbounds{ "server": { - Unary: yhttp.NewOutbound( - single.New(hostport.PeerIdentifier("http://localhost:8998"), httpTransport), - ), + Unary: httpTransport.NewSingleOutbound("http://localhost:8998"), }, }, } @@ -171,9 +165,10 @@ func Benchmark_HTTP_YARPCToNetHTTP(b *testing.B) { } func Benchmark_HTTP_NetHTTPToYARPC(b *testing.B) { + httpTransport := yhttp.NewTransport() serverCfg := yarpc.Config{ Name: "server", - Inbounds: yarpc.Inbounds{yhttp.NewInbound(":8996")}, + Inbounds: yarpc.Inbounds{httpTransport.NewInbound(":8996")}, } withDispatcher(b, serverCfg, func(server yarpc.Dispatcher) { diff --git a/dispatcher_test.go b/dispatcher_test.go index bbc35ac0a..e0247ff7d 100644 --- a/dispatcher_test.go +++ b/dispatcher_test.go @@ -41,11 +41,12 @@ func basicDispatcher(t *testing.T) Dispatcher { ch, err := tchannel.NewChannel("test", nil) require.NoError(t, err, "failed to create TChannel") + httpTransport := http.NewTransport() return NewDispatcher(Config{ Name: "test", Inbounds: Inbounds{ tch.NewInbound(ch).WithListenAddr(":0"), - http.NewInbound(":0"), + httpTransport.NewInbound(":0"), }, }) } diff --git a/internal/crossdock/client/apachethrift/behavior.go b/internal/crossdock/client/apachethrift/behavior.go index dda0a2c9c..76b653800 100644 --- a/internal/crossdock/client/apachethrift/behavior.go +++ b/internal/crossdock/client/apachethrift/behavior.go @@ -26,8 +26,6 @@ import ( "go.uber.org/yarpc" "go.uber.org/yarpc/encoding/thrift" "go.uber.org/yarpc/internal/crossdock/client/gauntlet" - "go.uber.org/yarpc/peer/hostport" - "go.uber.org/yarpc/peer/single" "go.uber.org/yarpc/transport/http" "github.com/crossdock/crossdock-go" @@ -46,13 +44,13 @@ func Run(t crossdock.T) { fatals.NotEmpty(server, "apachethriftserver is required") httpTransport := http.NewTransport() - hostPort := hostport.PeerIdentifier(fmt.Sprintf("%v:%v", server, serverPort)) + url := fmt.Sprintf("http://%v:%v/", server, serverPort) - thriftOutbound := http.NewOutbound(single.New(hostPort, httpTransport)). + thriftOutbound := httpTransport.NewSingleOutbound(url). WithURLTemplate("http://host:port/thrift/ThriftTest") - secondOutbound := http.NewOutbound(single.New(hostPort, httpTransport)). + secondOutbound := httpTransport.NewSingleOutbound(url). WithURLTemplate("http://host:port/thrift/SecondService") - multiplexOutbound := http.NewOutbound(single.New(hostPort, httpTransport)). + multiplexOutbound := httpTransport.NewSingleOutbound(url). WithURLTemplate("http://host:port/thrift/multiplexed") dispatcher := yarpc.NewDispatcher(yarpc.Config{ diff --git a/internal/crossdock/client/ctxpropagation/behavior.go b/internal/crossdock/client/ctxpropagation/behavior.go index 4da7af9c7..0ae0b51cd 100644 --- a/internal/crossdock/client/ctxpropagation/behavior.go +++ b/internal/crossdock/client/ctxpropagation/behavior.go @@ -30,8 +30,6 @@ import ( "go.uber.org/yarpc/encoding/json" "go.uber.org/yarpc/internal/crossdock/client/params" server "go.uber.org/yarpc/internal/crossdock/server/yarpc" - "go.uber.org/yarpc/peer/hostport" - "go.uber.org/yarpc/peer/single" "go.uber.org/yarpc/transport" "go.uber.org/yarpc/transport/http" tch "go.uber.org/yarpc/transport/tchannel" @@ -324,20 +322,12 @@ func buildDispatcher(t crossdock.T) (dispatcher yarpc.Dispatcher, tconfig server ch, err := tchannel.NewChannel("ctxclient", nil) fatals.NoError(err, "failed to create TChannel") - var httpTransport *http.Transport + httpTransport := http.NewTransport() var outbound transport.UnaryOutbound switch trans := t.Param(params.Transport); trans { case "http": - if httpTransport == nil { - httpTransport = http.NewTransport() - } - outbound = http.NewOutbound( - single.New( - hostport.PeerIdentifier(fmt.Sprintf("%s:8081", subject)), - httpTransport, - ), - ) + outbound = httpTransport.NewSingleOutbound(fmt.Sprintf("http://%s:8081", subject)) tconfig.TChannel = &server.TChannelTransport{Host: self, Port: 8087} case "tchannel": outbound = tch.NewOutbound(ch).WithHostPort(fmt.Sprintf("%s:8082", subject)) @@ -350,7 +340,7 @@ func buildDispatcher(t crossdock.T) (dispatcher yarpc.Dispatcher, tconfig server Name: "ctxclient", Inbounds: yarpc.Inbounds{ tch.NewInbound(ch).WithListenAddr(":8087"), - http.NewInbound(":8086"), + httpTransport.NewInbound(":8086"), }, Outbounds: yarpc.Outbounds{ "yarpc-test": { diff --git a/internal/crossdock/client/dispatcher/dispatcher.go b/internal/crossdock/client/dispatcher/dispatcher.go index c4c7c2d45..8bc096b27 100644 --- a/internal/crossdock/client/dispatcher/dispatcher.go +++ b/internal/crossdock/client/dispatcher/dispatcher.go @@ -26,8 +26,6 @@ import ( "go.uber.org/yarpc" "go.uber.org/yarpc/encoding/raw" "go.uber.org/yarpc/internal/crossdock/client/params" - "go.uber.org/yarpc/peer/hostport" - "go.uber.org/yarpc/peer/single" "go.uber.org/yarpc/transport" "go.uber.org/yarpc/transport/http" tch "go.uber.org/yarpc/transport/tchannel" @@ -43,21 +41,13 @@ func Create(t crossdock.T) yarpc.Dispatcher { server := t.Param(params.Server) fatals.NotEmpty(server, "server is required") - var httpTransport *http.Transport + httpTransport := http.NewTransport() var unaryOutbound transport.UnaryOutbound trans := t.Param(params.Transport) switch trans { case "http": - if httpTransport == nil { - httpTransport = http.NewTransport() - } - unaryOutbound = http.NewOutbound( - single.New( - hostport.PeerIdentifier(fmt.Sprintf("%s:8081", server)), - httpTransport, - ), - ) + unaryOutbound = httpTransport.NewSingleOutbound(fmt.Sprintf("http://%s:8081", server)) case "tchannel": ch, err := tchannel.NewChannel("client", nil) fatals.NoError(err, "couldn't create tchannel") @@ -84,21 +74,18 @@ func CreateOnewayDispatcher(t crossdock.T, handler raw.OnewayHandler) (yarpc.Dis server := t.Param("server_oneway") fatals.NotEmpty(server, "oneway server is required") + httpTransport := http.NewTransport() var outbound transport.OnewayOutbound trans := t.Param("transport_oneway") switch trans { case "http": - outbound = http.NewOutbound( - single.New( - hostport.PeerIdentifier(fmt.Sprintf("%s:8084", server)), - http.NewTransport(), - )) + outbound = httpTransport.NewSingleOutbound(fmt.Sprintf("http://%s:8084", server)) default: fatals.Fail("", "unknown transport %q", trans) } - callBackInbound := http.NewInbound(":0") + callBackInbound := httpTransport.NewInbound(":0") dispatcher := yarpc.NewDispatcher(yarpc.Config{ Name: "oneway-client", Outbounds: yarpc.Outbounds{ diff --git a/internal/crossdock/client/httpserver/behavior.go b/internal/crossdock/client/httpserver/behavior.go index 424ba06c8..88fac9c93 100644 --- a/internal/crossdock/client/httpserver/behavior.go +++ b/internal/crossdock/client/httpserver/behavior.go @@ -26,11 +26,9 @@ import ( "strings" "time" - yarpc "go.uber.org/yarpc" + "go.uber.org/yarpc" "go.uber.org/yarpc/encoding/raw" "go.uber.org/yarpc/internal/crossdock/client/params" - "go.uber.org/yarpc/peer/hostport" - "go.uber.org/yarpc/peer/single" "go.uber.org/yarpc/transport" "go.uber.org/yarpc/transport/http" @@ -45,18 +43,11 @@ func Run(t crossdock.T) { fatals.NotEmpty(server, "server is required") httpTransport := http.NewTransport() - // TODO http transport lifecycle - disp := yarpc.NewDispatcher(yarpc.Config{ Name: "client", Outbounds: yarpc.Outbounds{ "yarpc-test": { - Unary: http.NewOutbound( - single.New( - hostport.PeerIdentifier(fmt.Sprintf("%s:8085", server)), - httpTransport, - ), - ), + Unary: httpTransport.NewSingleOutbound(fmt.Sprintf("http://%s:8085", server)), }, }, }) diff --git a/internal/crossdock/server/oneway/echo.go b/internal/crossdock/server/oneway/echo.go index eeebe5141..235567635 100644 --- a/internal/crossdock/server/oneway/echo.go +++ b/internal/crossdock/server/oneway/echo.go @@ -30,8 +30,6 @@ import ( "go.uber.org/yarpc/encoding/json" "go.uber.org/yarpc/encoding/raw" "go.uber.org/yarpc/encoding/thrift" - "go.uber.org/yarpc/peer/hostport" - "go.uber.org/yarpc/peer/single" "go.uber.org/yarpc/transport" "go.uber.org/yarpc/transport/http" ) @@ -73,11 +71,7 @@ func (o *onewayHandler) callHome(ctx context.Context, reqMeta yarpc.ReqMeta, bod panic("could not find callBackAddr in headers") } - out := http.NewOutbound( - single.New( - hostport.PeerIdentifier(callBackAddr), - o.httpTransport)) - + out := o.httpTransport.NewSingleOutbound("http://" + callBackAddr) out.Start() defer out.Stop() diff --git a/internal/crossdock/server/oneway/server.go b/internal/crossdock/server/oneway/server.go index b3fe5b291..84de2507d 100644 --- a/internal/crossdock/server/oneway/server.go +++ b/internal/crossdock/server/oneway/server.go @@ -35,7 +35,8 @@ var dispatcher yarpc.Dispatcher // Start starts the test server that clients will make requests to func Start() { - inbounds := []transport.Inbound{http.NewInbound(":8084")} + httpTransport := http.NewTransport() + inbounds := []transport.Inbound{httpTransport.NewInbound(":8084")} dispatcher = yarpc.NewDispatcher(yarpc.Config{ Name: "oneway-server", diff --git a/internal/crossdock/server/yarpc/phone.go b/internal/crossdock/server/yarpc/phone.go index 1ecb0c00f..12bc30096 100644 --- a/internal/crossdock/server/yarpc/phone.go +++ b/internal/crossdock/server/yarpc/phone.go @@ -29,8 +29,6 @@ import ( "go.uber.org/yarpc" "go.uber.org/yarpc/encoding/json" "go.uber.org/yarpc/internal/clientconfig" - "go.uber.org/yarpc/peer/hostport" - "go.uber.org/yarpc/peer/single" "go.uber.org/yarpc/transport" "go.uber.org/yarpc/transport/http" tch "go.uber.org/yarpc/transport/tchannel" @@ -75,15 +73,12 @@ type PhoneResponse struct { func Phone(ctx context.Context, reqMeta yarpc.ReqMeta, body *PhoneRequest) (*PhoneResponse, yarpc.ResMeta, error) { var outbound transport.UnaryOutbound + httpTransport := http.NewTransport() + switch { case body.Transport.HTTP != nil: t := body.Transport.HTTP - outbound = http.NewOutbound( - single.New( - hostport.PeerIdentifier(fmt.Sprintf("%s:%d", t.Host, t.Port)), - http.NewTransport(), // TODO transport lifecycle - ), - ) + outbound = httpTransport.NewSingleOutbound(fmt.Sprintf("http://%s:%d", t.Host, t.Port)) case body.Transport.TChannel != nil: t := body.Transport.TChannel hostport := fmt.Sprintf("%s:%d", t.Host, t.Port) diff --git a/internal/crossdock/server/yarpc/server.go b/internal/crossdock/server/yarpc/server.go index 71b56cda2..6005575c1 100644 --- a/internal/crossdock/server/yarpc/server.go +++ b/internal/crossdock/server/yarpc/server.go @@ -46,10 +46,11 @@ func Start() { log.Fatalln("couldn't create tchannel: %v", err) } + httpTransport := http.NewTransport() dispatcher = yarpc.NewDispatcher(yarpc.Config{ Name: "yarpc-test", Inbounds: yarpc.Inbounds{ - http.NewInbound(":8081"), + httpTransport.NewInbound(":8081"), tch.NewInbound(ch).WithListenAddr(":8082"), }, }) diff --git a/internal/examples/json/client/main.go b/internal/examples/json/client/main.go index de76812ba..8947fb089 100644 --- a/internal/examples/json/client/main.go +++ b/internal/examples/json/client/main.go @@ -32,8 +32,6 @@ import ( "go.uber.org/yarpc" "go.uber.org/yarpc/encoding/json" - "go.uber.org/yarpc/peer/hostport" - "go.uber.org/yarpc/peer/single" "go.uber.org/yarpc/transport" "go.uber.org/yarpc/transport/http" tch "go.uber.org/yarpc/transport/tchannel" @@ -92,20 +90,12 @@ func main() { flag.Parse() - var httpTransport *http.Transport + httpTransport := http.NewTransport() var outbound transport.UnaryOutbound switch strings.ToLower(outboundName) { case "http": - if httpTransport == nil { - httpTransport = http.NewTransport() - } - outbound = http.NewOutbound( - single.New( - hostport.PeerIdentifier("127.0.0.1:24034"), - httpTransport, - ), - ) + outbound = httpTransport.NewSingleOutbound("http://127.0.0.1:24034") case "tchannel": channel, err := tchannel.NewChannel("keyvalue-client", nil) if err != nil { diff --git a/internal/examples/json/server/main.go b/internal/examples/json/server/main.go index 94129826f..177ad14b1 100644 --- a/internal/examples/json/server/main.go +++ b/internal/examples/json/server/main.go @@ -77,11 +77,12 @@ func main() { log.Fatalln(err) } + httpTransport := http.NewTransport() dispatcher := yarpc.NewDispatcher(yarpc.Config{ Name: "keyvalue", Inbounds: yarpc.Inbounds{ tch.NewInbound(channel).WithListenAddr(":28941"), - http.NewInbound(":24034"), + httpTransport.NewInbound(":24034"), }, InboundMiddleware: yarpc.InboundMiddleware{ Unary: requestLogInboundMiddleware{}, diff --git a/internal/examples/thrift/hello/main.go b/internal/examples/thrift/hello/main.go index 0399113a9..2923c4a18 100644 --- a/internal/examples/thrift/hello/main.go +++ b/internal/examples/thrift/hello/main.go @@ -29,8 +29,6 @@ import ( "go.uber.org/yarpc/internal/examples/thrift/hello/echo" "go.uber.org/yarpc/internal/examples/thrift/hello/echo/yarpc/helloclient" "go.uber.org/yarpc/internal/examples/thrift/hello/echo/yarpc/helloserver" - "go.uber.org/yarpc/peer/hostport" - "go.uber.org/yarpc/peer/single" "go.uber.org/yarpc" "go.uber.org/yarpc/transport/http" @@ -39,22 +37,15 @@ import ( //go:generate thriftrw --plugin=yarpc echo.thrift func main() { - httpTransport := http.NewTransport() - dispatcher := yarpc.NewDispatcher(yarpc.Config{ Name: "hello", Inbounds: yarpc.Inbounds{ - http.NewInbound(":8086"), + httpTransport.NewInbound(":8086"), }, Outbounds: yarpc.Outbounds{ "hello": { - Unary: http.NewOutbound( - single.New( - hostport.PeerIdentifier("127.0.0.1:8086"), - httpTransport, - ), - ), + Unary: httpTransport.NewSingleOutbound("http://127.0.0.1:8086"), }, }, }) diff --git a/internal/examples/thrift/keyvalue/client/main.go b/internal/examples/thrift/keyvalue/client/main.go index 923138fc8..af9b62895 100644 --- a/internal/examples/thrift/keyvalue/client/main.go +++ b/internal/examples/thrift/keyvalue/client/main.go @@ -32,8 +32,6 @@ import ( "go.uber.org/yarpc" "go.uber.org/yarpc/internal/examples/thrift/keyvalue/kv/yarpc/keyvalueclient" - "go.uber.org/yarpc/peer/hostport" - "go.uber.org/yarpc/peer/single" "go.uber.org/yarpc/transport" "go.uber.org/yarpc/transport/http" tch "go.uber.org/yarpc/transport/tchannel" @@ -50,20 +48,12 @@ func main() { flag.Parse() - var httpTransport *http.Transport + httpTransport := http.NewTransport() var outbound transport.UnaryOutbound switch strings.ToLower(outboundName) { case "http": - if httpTransport == nil { - httpTransport = http.NewTransport() - } - outbound = http.NewOutbound( - single.New( - hostport.PeerIdentifier("127.0.0.1:24034"), - httpTransport, - ), - ) + outbound = httpTransport.NewSingleOutbound("http://127.0.0.1:24034") case "tchannel": channel, err := tchannel.NewChannel("keyvalue-client", nil) if err != nil { diff --git a/internal/examples/thrift/keyvalue/server/main.go b/internal/examples/thrift/keyvalue/server/main.go index 69af2b712..0f1ec12c9 100644 --- a/internal/examples/thrift/keyvalue/server/main.go +++ b/internal/examples/thrift/keyvalue/server/main.go @@ -65,11 +65,12 @@ func main() { log.Fatalln(err) } + httpTransport := http.NewTransport() dispatcher := yarpc.NewDispatcher(yarpc.Config{ Name: "keyvalue", Inbounds: yarpc.Inbounds{ tch.NewInbound(channel).WithListenAddr(":28941"), - http.NewInbound(":24034"), + httpTransport.NewInbound(":24034"), }, }) diff --git a/transport/http/handler_test.go b/transport/http/handler_test.go index 9b08518fb..74a7773d5 100644 --- a/transport/http/handler_test.go +++ b/transport/http/handler_test.go @@ -33,8 +33,6 @@ import ( yarpc "go.uber.org/yarpc" "go.uber.org/yarpc/encoding/raw" "go.uber.org/yarpc/internal/registrytest" - "go.uber.org/yarpc/peer/hostport" - "go.uber.org/yarpc/peer/single" "go.uber.org/yarpc/transport" "go.uber.org/yarpc/transport/transporttest" @@ -329,7 +327,8 @@ func (th panickedHandler) Handle(context.Context, *transport.Request, transport. } func TestHandlerPanic(t *testing.T) { - inbound := NewInbound("localhost:0") + httpTransport := NewTransport() + inbound := httpTransport.NewInbound("localhost:0") serverDispatcher := yarpc.NewDispatcher(yarpc.Config{ Name: "yarpc-test", Inbounds: yarpc.Inbounds{inbound}, @@ -344,19 +343,11 @@ func TestHandlerPanic(t *testing.T) { require.NoError(t, serverDispatcher.Start()) defer serverDispatcher.Stop() - httpTransport := NewTransport() - // TODO http transport lifecycle - clientDispatcher := yarpc.NewDispatcher(yarpc.Config{ Name: "yarpc-test-client", Outbounds: yarpc.Outbounds{ "yarpc-test": { - Unary: NewOutbound( - single.New( - hostport.PeerIdentifier(inbound.Addr().String()), - httpTransport, - ), - ), + Unary: httpTransport.NewSingleOutbound(fmt.Sprintf("http://%s", inbound.Addr().String())), }, }, }) diff --git a/transport/http/inbound.go b/transport/http/inbound.go index ae0330d16..402dfc08f 100644 --- a/transport/http/inbound.go +++ b/transport/http/inbound.go @@ -31,11 +31,12 @@ import ( "github.com/opentracing/opentracing-go" ) -// NewInbound builds a new HTTP inbound that listens on the given address. -func NewInbound(addr string) *Inbound { +// NewInbound builds a new HTTP inbound that listens on the given address and +// sharing this transport. +func (t *Transport) NewInbound(addr string) *Inbound { return &Inbound{ addr: addr, - tracer: opentracing.GlobalTracer(), + tracer: t.tracer, } } diff --git a/transport/http/inbound_test.go b/transport/http/inbound_test.go index fb46a9035..33a910c63 100644 --- a/transport/http/inbound_test.go +++ b/transport/http/inbound_test.go @@ -34,8 +34,6 @@ import ( "go.uber.org/yarpc/encoding/raw" "go.uber.org/yarpc/internal/registrytest" - "go.uber.org/yarpc/peer/hostport" - "go.uber.org/yarpc/peer/single" "go.uber.org/yarpc/transport" "go.uber.org/yarpc/transport/transporttest" @@ -45,10 +43,12 @@ import ( ) func TestStartAddrInUse(t *testing.T) { - i1 := NewInbound(":0") + t1 := NewTransport() + i1 := t1.NewInbound(":0") i1.SetRegistry(new(transporttest.MockRegistry)) require.NoError(t, i1.Start(), "inbound 1 must start without an error") - i2 := NewInbound(i1.Addr().String()) + t2 := NewTransport() + i2 := t2.NewInbound(i1.Addr().String()) i2.SetRegistry(new(transporttest.MockRegistry)) err := i2.Start() @@ -64,7 +64,8 @@ func TestStartAddrInUse(t *testing.T) { } func TestNilAddrAfterStop(t *testing.T) { - i := NewInbound(":0") + x := NewTransport() + i := x.NewInbound(":0") i.SetRegistry(new(transporttest.MockRegistry)) require.NoError(t, i.Start()) assert.NotEqual(t, ":0", i.Addr().String()) @@ -74,7 +75,8 @@ func TestNilAddrAfterStop(t *testing.T) { } func TestInboundStartAndStop(t *testing.T) { - i := NewInbound(":0") + x := NewTransport() + i := x.NewInbound(":0") i.SetRegistry(new(transporttest.MockRegistry)) require.NoError(t, i.Start()) assert.NotEqual(t, ":0", i.Addr().String()) @@ -82,14 +84,16 @@ func TestInboundStartAndStop(t *testing.T) { } func TestInboundStartError(t *testing.T) { - err := NewInbound("invalid"). + x := NewTransport() + err := x.NewInbound("invalid"). WithRegistry(new(transporttest.MockRegistry)). Start() assert.Error(t, err, "expected failure") } func TestInboundStopWithoutStarting(t *testing.T) { - i := NewInbound(":8000") + x := NewTransport() + i := x.NewInbound(":8000") assert.Nil(t, i.Addr()) assert.NoError(t, i.Stop()) } @@ -106,7 +110,7 @@ func TestInboundMux(t *testing.T) { w.Write([]byte("healthy")) }) - i := NewInbound(":0").WithMux("/rpc/v1", mux) + i := httpTransport.NewInbound(":0").WithMux("/rpc/v1", mux) h := transporttest.NewMockUnaryHandler(mockCtrl) reg := transporttest.NewMockRegistry(mockCtrl) i.SetRegistry(reg) @@ -125,12 +129,7 @@ func TestInboundMux(t *testing.T) { } // this should fail - o := NewOutbound( - single.New( - hostport.PeerIdentifier(i.Addr().String()), - httpTransport, - ), - ) + o := httpTransport.NewSingleOutbound(addr) require.NoError(t, o.Start(), "failed to start outbound") defer o.Stop() diff --git a/transport/http/outbound.go b/transport/http/outbound.go index 64f9e6f58..9a310ef84 100644 --- a/transport/http/outbound.go +++ b/transport/http/outbound.go @@ -51,6 +51,19 @@ var ( var defaultURLTemplate, _ = url.Parse("http://localhost") +// NewOutbound builds a new HTTP outbound built around a peer.Chooser +// for getting potential downstream hosts. +// Chooser.Choose MUST return *hostport.Peer objects. +// Chooser.Start MUST be called before Outbound.Start +func (t *Transport) NewOutbound(chooser peer.Chooser) *Outbound { + return &Outbound{ + started: atomic.NewBool(false), + chooser: chooser, + urlTemplate: defaultURLTemplate, + tracer: t.tracer, + } +} + // NewOutbound builds a new HTTP outbound built around a peer.Chooser // for getting potential downstream hosts. // Chooser.Choose MUST return *hostport.Peer objects. @@ -64,16 +77,17 @@ func NewOutbound(chooser peer.Chooser) *Outbound { } } -// NewSingleOutbound creates an outbound from a single URL. +// NewSingleOutbound creates an outbound from a single URL (a bare host:port is +// not sufficient). // This form defers to the underlying HTTP agent's peer selection and load // balancing, using DNS. -func NewSingleOutbound(u string, t *Transport) *Outbound { - parsedURL, err := url.Parse(u) +func (t *Transport) NewSingleOutbound(URL string) *Outbound { + parsedURL, err := url.Parse(URL) if err != nil { panic(err.Error()) } - return NewOutbound(single.New(hostport.PeerIdentifier(parsedURL.Host), t)). - WithURLTemplate(u) + return t.NewOutbound(single.New(hostport.PeerIdentifier(parsedURL.Host), t)). + WithURLTemplate(URL) } // Outbound is an HTTP UnaryOutbound and OnewayOutbound diff --git a/transport/http/outbound_test.go b/transport/http/outbound_test.go index afdba88fa..f2922475f 100644 --- a/transport/http/outbound_test.go +++ b/transport/http/outbound_test.go @@ -143,7 +143,7 @@ func TestOutboundHeaders(t *testing.T) { defer cancel() } - out := NewSingleOutbound(server.URL, httpTransport) + out := httpTransport.NewSingleOutbound(server.URL) require.NoError(t, out.Start(), "failed to start outbound") defer out.Stop() @@ -189,7 +189,7 @@ func TestCallFailures(t *testing.T) { } for _, tt := range tests { - out := NewSingleOutbound(tt.url, httpTransport) + out := httpTransport.NewSingleOutbound(tt.url) require.NoError(t, out.Start(), "failed to start outbound") defer out.Stop() @@ -211,7 +211,7 @@ func TestCallFailures(t *testing.T) { func TestStartMultiple(t *testing.T) { httpTransport := NewTransport() - out := NewSingleOutbound("http://localhost:9999", httpTransport) + out := httpTransport.NewSingleOutbound("http://localhost:9999") var wg sync.WaitGroup signal := make(chan struct{}) @@ -232,14 +232,7 @@ func TestStartMultiple(t *testing.T) { func TestStopMultiple(t *testing.T) { httpTransport := NewTransport() - // TODO transport lifecycle - - out := NewOutbound( - single.New( - hostport.PeerIdentifier("127.0.0.1:9999"), - httpTransport, - ), - ) + out := httpTransport.NewSingleOutbound("http://127.0.0.1:9999") err := out.Start() require.NoError(t, err) @@ -263,14 +256,7 @@ func TestStopMultiple(t *testing.T) { func TestCallWithoutStarting(t *testing.T) { httpTransport := NewTransport() - // TODO transport lifecycle - - out := NewOutbound( - single.New( - hostport.PeerIdentifier("127.0.0.1:9999"), - httpTransport, - ), - ) + out := httpTransport.NewSingleOutbound("http://127.0.0.1:9999") assert.Panics(t, func() { ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond) defer cancel() diff --git a/transport/http/transport.go b/transport/http/transport.go index 20ca76e6e..4b6d4ea57 100644 --- a/transport/http/transport.go +++ b/transport/http/transport.go @@ -28,10 +28,13 @@ import ( "go.uber.org/yarpc/peer" "go.uber.org/yarpc/peer/hostport" + + "github.com/opentracing/opentracing-go" ) type transportConfig struct { keepAlive time.Duration + tracer opentracing.Tracer } var defaultTransportConfig = transportConfig{keepAlive: 30 * time.Second} @@ -49,9 +52,18 @@ func KeepAlive(t time.Duration) TransportOption { } } +// WithTracer configures a tracer for the transport and all its inbounds and +// outbounds. +func WithTracer(tracer opentracing.Tracer) TransportOption { + return func(c *transportConfig) { + c.tracer = tracer + } +} + // NewTransport creates a new http transport for managing peers and sending requests func NewTransport(opts ...TransportOption) *Transport { cfg := defaultTransportConfig + cfg.tracer = opentracing.GlobalTracer() for _, o := range opts { o(&cfg) } @@ -59,6 +71,7 @@ func NewTransport(opts ...TransportOption) *Transport { return &Transport{ client: buildClient(&cfg), peers: make(map[string]*hostport.Peer), + tracer: cfg.tracer, } } @@ -83,6 +96,8 @@ type Transport struct { client *http.Client peers map[string]*hostport.Peer + + tracer opentracing.Tracer } // RetainPeer gets or creates a Peer for the specified peer.Subscriber (usually a peer.Chooser) diff --git a/transport/roundtrip_test.go b/transport/roundtrip_test.go index 0aa39ff76..3c0e43c9d 100644 --- a/transport/roundtrip_test.go +++ b/transport/roundtrip_test.go @@ -30,8 +30,6 @@ import ( "go.uber.org/yarpc/encoding/raw" "go.uber.org/yarpc/internal/errors" - "go.uber.org/yarpc/peer/hostport" - "go.uber.org/yarpc/peer/single" "go.uber.org/yarpc/transport" "go.uber.org/yarpc/transport/http" tch "go.uber.org/yarpc/transport/tchannel" @@ -100,18 +98,13 @@ type httpTransport struct{ t *testing.T } func (ht httpTransport) WithRegistry(r transport.Registry, f func(transport.UnaryOutbound)) { httpTransport := http.NewTransport() - // TODO lifecycle - i := http.NewInbound("127.0.0.1:0").WithRegistry(r) + i := httpTransport.NewInbound("127.0.0.1:0") + i.SetRegistry(r) require.NoError(ht.t, i.Start(), "failed to start") defer i.Stop() - o := http.NewOutbound( - single.New( - hostport.PeerIdentifier(i.Addr().String()), - httpTransport, - ), - ) + o := httpTransport.NewSingleOutbound(fmt.Sprintf("http://%s", i.Addr().String())) require.NoError(ht.t, o.Start(), "failed to start outbound") defer o.Stop() f(o) @@ -119,18 +112,13 @@ func (ht httpTransport) WithRegistry(r transport.Registry, f func(transport.Unar func (ht httpTransport) WithRegistryOneway(r transport.Registry, f func(transport.OnewayOutbound)) { httpTransport := http.NewTransport() - // TODO lifecycle - i := http.NewInbound("127.0.0.1:0").WithRegistry(r) + i := httpTransport.NewInbound("127.0.0.1:0") + i.SetRegistry(r) require.NoError(ht.t, i.Start(), "failed to start") defer i.Stop() - o := http.NewOutbound( - single.New( - hostport.PeerIdentifier(i.Addr().String()), - httpTransport, - ), - ) + o := httpTransport.NewSingleOutbound(fmt.Sprintf("http://%s", i.Addr().String())) require.NoError(ht.t, o.Start(), "failed to start outbound") defer o.Stop() f(o) @@ -143,7 +131,8 @@ func (tt tchannelTransport) WithRegistry(r transport.Registry, f func(transport. serverOpts := testutils.NewOpts().SetServiceName(testService) clientOpts := testutils.NewOpts().SetServiceName(testCaller) testutils.WithServer(tt.t, serverOpts, func(ch *tchannel.Channel, hostPort string) { - i := tch.NewInbound(ch).WithRegistry(r) + i := tch.NewInbound(ch) + i.SetRegistry(r) require.NoError(tt.t, i.Start(), "failed to start") defer i.Stop() diff --git a/transport/tracer_test.go b/transport/tracer_test.go index f4eb4e15f..442115ac7 100644 --- a/transport/tracer_test.go +++ b/transport/tracer_test.go @@ -29,8 +29,6 @@ import ( "github.com/stretchr/testify/require" "go.uber.org/yarpc" "go.uber.org/yarpc/encoding/json" - "go.uber.org/yarpc/peer/hostport" - "go.uber.org/yarpc/peer/single" "go.uber.org/yarpc/transport/http" ytchannel "go.uber.org/yarpc/transport/tchannel" @@ -116,22 +114,15 @@ func createHTTPDispatcher(tracer opentracing.Tracer) yarpc.Dispatcher { // TODO: Use port 0 once https://github.com/yarpc/yarpc-go/issues/381 is // fixed. - httpTransport := http.NewTransport() - // TODO lifecycle - + httpTransport := http.NewTransport(http.WithTracer(tracer)) dispatcher := yarpc.NewDispatcher(yarpc.Config{ Name: "yarpc-test", Inbounds: yarpc.Inbounds{ - http.NewInbound(":18080").WithTracer(tracer), + httpTransport.NewInbound(":18080"), }, Outbounds: yarpc.Outbounds{ "yarpc-test": { - Unary: http.NewOutbound( - single.New( - hostport.PeerIdentifier("127.0.0.1:18080"), - httpTransport, - ), - ).WithTracer(tracer), + Unary: httpTransport.NewSingleOutbound("http://127.0.0.1:18080"), }, }, Tracer: tracer, diff --git a/yarpctest/recorder/recorder_test.go b/yarpctest/recorder/recorder_test.go index 5f878e985..b38e8d1ff 100644 --- a/yarpctest/recorder/recorder_test.go +++ b/yarpctest/recorder/recorder_test.go @@ -3,6 +3,7 @@ package recorder import ( "bytes" "context" + "fmt" "io/ioutil" "math/rand" "os" @@ -12,8 +13,6 @@ import ( "go.uber.org/yarpc" "go.uber.org/yarpc/encoding/raw" - "go.uber.org/yarpc/peer/hostport" - "go.uber.org/yarpc/peer/single" "go.uber.org/yarpc/transport" "go.uber.org/yarpc/transport/http" @@ -162,18 +161,12 @@ func (t *testingTMock) Fatal(args ...interface{}) { func withDisconnectedClient(t *testing.T, recorder *Recorder, f func(raw.Client)) { httpTransport := http.NewTransport() - // TODO lifecycle clientDisp := yarpc.NewDispatcher(yarpc.Config{ Name: "client", Outbounds: yarpc.Outbounds{ "server": { - Unary: http.NewOutbound( - single.New( - hostport.PeerIdentifier("127.0.0.1:65535"), - httpTransport, - ), - ), + Unary: httpTransport.NewSingleOutbound("http://127.0.0.1:65535"), }, }, OutboundMiddleware: yarpc.OutboundMiddleware{ @@ -188,8 +181,8 @@ func withDisconnectedClient(t *testing.T, recorder *Recorder, f func(raw.Client) } func withConnectedClient(t *testing.T, recorder *Recorder, f func(raw.Client)) { - serverHTTP := http.NewInbound(":0") - + httpTransport := http.NewTransport() + serverHTTP := httpTransport.NewInbound(":0") serverDisp := yarpc.NewDispatcher(yarpc.Config{ Name: "server", Inbounds: yarpc.Inbounds{serverHTTP}, @@ -203,19 +196,11 @@ func withConnectedClient(t *testing.T, recorder *Recorder, f func(raw.Client)) { require.NoError(t, serverDisp.Start()) defer serverDisp.Stop() - httpTransport := http.NewTransport() - // TODO http transport lifecycle - clientDisp := yarpc.NewDispatcher(yarpc.Config{ Name: "client", Outbounds: yarpc.Outbounds{ "server": { - Unary: http.NewOutbound( - single.New( - hostport.PeerIdentifier(serverHTTP.Addr().String()), - httpTransport, - ), - ), + Unary: httpTransport.NewSingleOutbound(fmt.Sprintf("http://%s", serverHTTP.Addr())), }, }, OutboundMiddleware: yarpc.OutboundMiddleware{