From c9aec00f8e404ebdfee803a99f0c4aae55503905 Mon Sep 17 00:00:00 2001 From: Kris Kowal Date: Tue, 6 Dec 2016 15:19:24 -0800 Subject: [PATCH] Pivot HTTP constructors to use a shared Transport #540 --- bench_test.go | 17 ++++------- dispatcher_test.go | 3 +- .../crossdock/client/apachethrift/behavior.go | 10 +++---- .../client/ctxpropagation/behavior.go | 16 ++-------- .../crossdock/client/dispatcher/dispatcher.go | 14 ++------- .../crossdock/client/httpserver/behavior.go | 13 ++------- internal/crossdock/client/oneway/oneway.go | 9 ++---- internal/crossdock/server/oneway/server.go | 11 ++----- internal/crossdock/server/yarpc/phone.go | 11 ++----- internal/crossdock/server/yarpc/server.go | 3 +- internal/examples/json/client/main.go | 14 ++------- internal/examples/json/server/main.go | 3 +- internal/examples/thrift/hello/main.go | 13 ++------- .../examples/thrift/keyvalue/client/main.go | 14 ++------- .../examples/thrift/keyvalue/server/main.go | 3 +- transport/http/handler_test.go | 15 ++-------- transport/http/inbound.go | 7 +++-- transport/http/inbound_test.go | 29 +++++++++---------- transport/http/outbound.go | 24 +++++++++++---- transport/http/outbound_test.go | 24 ++++----------- transport/http/transport.go | 15 ++++++++++ transport/roundtrip_test.go | 27 +++++------------ transport/tracer_test.go | 15 ++-------- yarpctest/recorder/recorder_test.go | 25 ++++------------ 24 files changed, 114 insertions(+), 221 deletions(-) diff --git a/bench_test.go b/bench_test.go index 0abf53f86e..4fbfa4c9b6 100644 --- a/bench_test.go +++ b/bench_test.go @@ -12,8 +12,6 @@ import ( "go.uber.org/yarpc" "go.uber.org/yarpc/encoding/raw" - "go.uber.org/yarpc/peer/hostport" - "go.uber.org/yarpc/peer/single" yhttp "go.uber.org/yarpc/transport/http" ytchannel "go.uber.org/yarpc/transport/tchannel" @@ -123,19 +121,17 @@ func runTChannelClient(b *testing.B, c *tchannel.Channel, hostPort string) { } func Benchmark_HTTP_YARPCToYARPC(b *testing.B) { + httpTransport := yhttp.NewTransport() serverCfg := yarpc.Config{ Name: "server", - Inbounds: yarpc.Inbounds{yhttp.NewInbound(":8999")}, + Inbounds: yarpc.Inbounds{httpTransport.NewInbound(":8999")}, } - httpTransport := yhttp.NewTransport() clientCfg := yarpc.Config{ Name: "client", Outbounds: yarpc.Outbounds{ "server": { - Unary: yhttp.NewOutbound( - single.New(hostport.PeerIdentifier("http://localhost:8999"), httpTransport), - ), + Unary: httpTransport.NewSingleOutbound("http://localhost:8999"), }, }, } @@ -155,9 +151,7 @@ func Benchmark_HTTP_YARPCToNetHTTP(b *testing.B) { Name: "client", Outbounds: yarpc.Outbounds{ "server": { - Unary: yhttp.NewOutbound( - single.New(hostport.PeerIdentifier("http://localhost:8998"), httpTransport), - ), + Unary: httpTransport.NewSingleOutbound("http://localhost:8998"), }, }, } @@ -171,9 +165,10 @@ func Benchmark_HTTP_YARPCToNetHTTP(b *testing.B) { } func Benchmark_HTTP_NetHTTPToYARPC(b *testing.B) { + httpTransport := yhttp.NewTransport() serverCfg := yarpc.Config{ Name: "server", - Inbounds: yarpc.Inbounds{yhttp.NewInbound(":8996")}, + Inbounds: yarpc.Inbounds{httpTransport.NewInbound(":8996")}, } withDispatcher(b, serverCfg, func(server yarpc.Dispatcher) { diff --git a/dispatcher_test.go b/dispatcher_test.go index bf3c6516fa..be1647007d 100644 --- a/dispatcher_test.go +++ b/dispatcher_test.go @@ -41,11 +41,12 @@ func basicDispatcher(t *testing.T) Dispatcher { ch, err := tchannel.NewChannel("test", nil) require.NoError(t, err, "failed to create TChannel") + httpTransport := http.NewTransport() return NewDispatcher(Config{ Name: "test", Inbounds: Inbounds{ tch.NewInbound(ch).WithListenAddr(":0"), - http.NewInbound(":0"), + httpTransport.NewInbound(":0"), }, }) } diff --git a/internal/crossdock/client/apachethrift/behavior.go b/internal/crossdock/client/apachethrift/behavior.go index dda0a2c9c6..76b653800c 100644 --- a/internal/crossdock/client/apachethrift/behavior.go +++ b/internal/crossdock/client/apachethrift/behavior.go @@ -26,8 +26,6 @@ import ( "go.uber.org/yarpc" "go.uber.org/yarpc/encoding/thrift" "go.uber.org/yarpc/internal/crossdock/client/gauntlet" - "go.uber.org/yarpc/peer/hostport" - "go.uber.org/yarpc/peer/single" "go.uber.org/yarpc/transport/http" "github.com/crossdock/crossdock-go" @@ -46,13 +44,13 @@ func Run(t crossdock.T) { fatals.NotEmpty(server, "apachethriftserver is required") httpTransport := http.NewTransport() - hostPort := hostport.PeerIdentifier(fmt.Sprintf("%v:%v", server, serverPort)) + url := fmt.Sprintf("http://%v:%v/", server, serverPort) - thriftOutbound := http.NewOutbound(single.New(hostPort, httpTransport)). + thriftOutbound := httpTransport.NewSingleOutbound(url). WithURLTemplate("http://host:port/thrift/ThriftTest") - secondOutbound := http.NewOutbound(single.New(hostPort, httpTransport)). + secondOutbound := httpTransport.NewSingleOutbound(url). WithURLTemplate("http://host:port/thrift/SecondService") - multiplexOutbound := http.NewOutbound(single.New(hostPort, httpTransport)). + multiplexOutbound := httpTransport.NewSingleOutbound(url). WithURLTemplate("http://host:port/thrift/multiplexed") dispatcher := yarpc.NewDispatcher(yarpc.Config{ diff --git a/internal/crossdock/client/ctxpropagation/behavior.go b/internal/crossdock/client/ctxpropagation/behavior.go index 4da7af9c74..0ae0b51cd3 100644 --- a/internal/crossdock/client/ctxpropagation/behavior.go +++ b/internal/crossdock/client/ctxpropagation/behavior.go @@ -30,8 +30,6 @@ import ( "go.uber.org/yarpc/encoding/json" "go.uber.org/yarpc/internal/crossdock/client/params" server "go.uber.org/yarpc/internal/crossdock/server/yarpc" - "go.uber.org/yarpc/peer/hostport" - "go.uber.org/yarpc/peer/single" "go.uber.org/yarpc/transport" "go.uber.org/yarpc/transport/http" tch "go.uber.org/yarpc/transport/tchannel" @@ -324,20 +322,12 @@ func buildDispatcher(t crossdock.T) (dispatcher yarpc.Dispatcher, tconfig server ch, err := tchannel.NewChannel("ctxclient", nil) fatals.NoError(err, "failed to create TChannel") - var httpTransport *http.Transport + httpTransport := http.NewTransport() var outbound transport.UnaryOutbound switch trans := t.Param(params.Transport); trans { case "http": - if httpTransport == nil { - httpTransport = http.NewTransport() - } - outbound = http.NewOutbound( - single.New( - hostport.PeerIdentifier(fmt.Sprintf("%s:8081", subject)), - httpTransport, - ), - ) + outbound = httpTransport.NewSingleOutbound(fmt.Sprintf("http://%s:8081", subject)) tconfig.TChannel = &server.TChannelTransport{Host: self, Port: 8087} case "tchannel": outbound = tch.NewOutbound(ch).WithHostPort(fmt.Sprintf("%s:8082", subject)) @@ -350,7 +340,7 @@ func buildDispatcher(t crossdock.T) (dispatcher yarpc.Dispatcher, tconfig server Name: "ctxclient", Inbounds: yarpc.Inbounds{ tch.NewInbound(ch).WithListenAddr(":8087"), - http.NewInbound(":8086"), + httpTransport.NewInbound(":8086"), }, Outbounds: yarpc.Outbounds{ "yarpc-test": { diff --git a/internal/crossdock/client/dispatcher/dispatcher.go b/internal/crossdock/client/dispatcher/dispatcher.go index f07f79ff11..462cccb680 100644 --- a/internal/crossdock/client/dispatcher/dispatcher.go +++ b/internal/crossdock/client/dispatcher/dispatcher.go @@ -25,8 +25,6 @@ import ( "go.uber.org/yarpc" "go.uber.org/yarpc/internal/crossdock/client/params" - "go.uber.org/yarpc/peer/hostport" - "go.uber.org/yarpc/peer/single" "go.uber.org/yarpc/transport" "go.uber.org/yarpc/transport/http" tch "go.uber.org/yarpc/transport/tchannel" @@ -42,21 +40,13 @@ func Create(t crossdock.T) yarpc.Dispatcher { server := t.Param(params.Server) fatals.NotEmpty(server, "server is required") - var httpTransport *http.Transport + httpTransport := http.NewTransport() var unaryOutbound transport.UnaryOutbound trans := t.Param(params.Transport) switch trans { case "http": - if httpTransport == nil { - httpTransport = http.NewTransport() - } - unaryOutbound = http.NewOutbound( - single.New( - hostport.PeerIdentifier(fmt.Sprintf("%s:8081", server)), - httpTransport, - ), - ) + unaryOutbound = httpTransport.NewSingleOutbound(fmt.Sprintf("http://%s:8081", server)) case "tchannel": ch, err := tchannel.NewChannel("client", nil) fatals.NoError(err, "couldn't create tchannel") diff --git a/internal/crossdock/client/httpserver/behavior.go b/internal/crossdock/client/httpserver/behavior.go index 424ba06c8c..88fac9c931 100644 --- a/internal/crossdock/client/httpserver/behavior.go +++ b/internal/crossdock/client/httpserver/behavior.go @@ -26,11 +26,9 @@ import ( "strings" "time" - yarpc "go.uber.org/yarpc" + "go.uber.org/yarpc" "go.uber.org/yarpc/encoding/raw" "go.uber.org/yarpc/internal/crossdock/client/params" - "go.uber.org/yarpc/peer/hostport" - "go.uber.org/yarpc/peer/single" "go.uber.org/yarpc/transport" "go.uber.org/yarpc/transport/http" @@ -45,18 +43,11 @@ func Run(t crossdock.T) { fatals.NotEmpty(server, "server is required") httpTransport := http.NewTransport() - // TODO http transport lifecycle - disp := yarpc.NewDispatcher(yarpc.Config{ Name: "client", Outbounds: yarpc.Outbounds{ "yarpc-test": { - Unary: http.NewOutbound( - single.New( - hostport.PeerIdentifier(fmt.Sprintf("%s:8085", server)), - httpTransport, - ), - ), + Unary: httpTransport.NewSingleOutbound(fmt.Sprintf("http://%s:8085", server)), }, }, }) diff --git a/internal/crossdock/client/oneway/oneway.go b/internal/crossdock/client/oneway/oneway.go index 341d84693e..4964f8565a 100644 --- a/internal/crossdock/client/oneway/oneway.go +++ b/internal/crossdock/client/oneway/oneway.go @@ -28,8 +28,6 @@ import ( "go.uber.org/yarpc/encoding/raw" "go.uber.org/yarpc/internal/crossdock/client/params" "go.uber.org/yarpc/internal/crossdock/client/random" - "go.uber.org/yarpc/peer/hostport" - "go.uber.org/yarpc/peer/single" "go.uber.org/yarpc/transport/http" "github.com/crossdock/crossdock-go" @@ -73,14 +71,11 @@ func newDispatcher(t crossdock.T) yarpc.Dispatcher { Name: "client", Outbounds: yarpc.Outbounds{ "oneway-test": { - Oneway: http.NewOutbound(single.New( - hostport.PeerIdentifier(fmt.Sprintf("%s:8084", server)), - httpTransport, - )), + Oneway: httpTransport.NewSingleOutbound(fmt.Sprintf("http://%s:8084", server)), }, }, //for call back - Inbounds: yarpc.Inbounds{http.NewInbound(fmt.Sprintf("%s:8089", server))}, + Inbounds: yarpc.Inbounds{httpTransport.NewInbound(fmt.Sprintf("%s:8089", server))}, }) // register procedure for remote server to call us back on diff --git a/internal/crossdock/server/oneway/server.go b/internal/crossdock/server/oneway/server.go index 886159386f..4b8fd4a403 100644 --- a/internal/crossdock/server/oneway/server.go +++ b/internal/crossdock/server/oneway/server.go @@ -28,8 +28,6 @@ import ( "go.uber.org/yarpc/encoding/json" "go.uber.org/yarpc/encoding/raw" "go.uber.org/yarpc/internal/crossdock/thrift/oneway/yarpc/onewayserver" - "go.uber.org/yarpc/peer/hostport" - "go.uber.org/yarpc/peer/single" "go.uber.org/yarpc/transport/http" ) @@ -39,18 +37,13 @@ var dispatcher yarpc.Dispatcher func Start() { httpTransport := http.NewTransport() h := onewayHandler{ - Outbound: http.NewOutbound( - single.New( - hostport.PeerIdentifier("127.0.0.1:8089"), - httpTransport, - ), - ), + Outbound: httpTransport.NewSingleOutbound("http://127.0.0.1:8089"), } dispatcher = yarpc.NewDispatcher(yarpc.Config{ Name: "oneway-test", Inbounds: yarpc.Inbounds{ - http.NewInbound(":8084"), + httpTransport.NewInbound(":8084"), }, Outbounds: yarpc.Outbounds{ "client": {Oneway: h.Outbound}, diff --git a/internal/crossdock/server/yarpc/phone.go b/internal/crossdock/server/yarpc/phone.go index 1ecb0c00f7..12bc300964 100644 --- a/internal/crossdock/server/yarpc/phone.go +++ b/internal/crossdock/server/yarpc/phone.go @@ -29,8 +29,6 @@ import ( "go.uber.org/yarpc" "go.uber.org/yarpc/encoding/json" "go.uber.org/yarpc/internal/clientconfig" - "go.uber.org/yarpc/peer/hostport" - "go.uber.org/yarpc/peer/single" "go.uber.org/yarpc/transport" "go.uber.org/yarpc/transport/http" tch "go.uber.org/yarpc/transport/tchannel" @@ -75,15 +73,12 @@ type PhoneResponse struct { func Phone(ctx context.Context, reqMeta yarpc.ReqMeta, body *PhoneRequest) (*PhoneResponse, yarpc.ResMeta, error) { var outbound transport.UnaryOutbound + httpTransport := http.NewTransport() + switch { case body.Transport.HTTP != nil: t := body.Transport.HTTP - outbound = http.NewOutbound( - single.New( - hostport.PeerIdentifier(fmt.Sprintf("%s:%d", t.Host, t.Port)), - http.NewTransport(), // TODO transport lifecycle - ), - ) + outbound = httpTransport.NewSingleOutbound(fmt.Sprintf("http://%s:%d", t.Host, t.Port)) case body.Transport.TChannel != nil: t := body.Transport.TChannel hostport := fmt.Sprintf("%s:%d", t.Host, t.Port) diff --git a/internal/crossdock/server/yarpc/server.go b/internal/crossdock/server/yarpc/server.go index 71b56cda2c..6005575c1f 100644 --- a/internal/crossdock/server/yarpc/server.go +++ b/internal/crossdock/server/yarpc/server.go @@ -46,10 +46,11 @@ func Start() { log.Fatalln("couldn't create tchannel: %v", err) } + httpTransport := http.NewTransport() dispatcher = yarpc.NewDispatcher(yarpc.Config{ Name: "yarpc-test", Inbounds: yarpc.Inbounds{ - http.NewInbound(":8081"), + httpTransport.NewInbound(":8081"), tch.NewInbound(ch).WithListenAddr(":8082"), }, }) diff --git a/internal/examples/json/client/main.go b/internal/examples/json/client/main.go index de76812bae..8947fb0890 100644 --- a/internal/examples/json/client/main.go +++ b/internal/examples/json/client/main.go @@ -32,8 +32,6 @@ import ( "go.uber.org/yarpc" "go.uber.org/yarpc/encoding/json" - "go.uber.org/yarpc/peer/hostport" - "go.uber.org/yarpc/peer/single" "go.uber.org/yarpc/transport" "go.uber.org/yarpc/transport/http" tch "go.uber.org/yarpc/transport/tchannel" @@ -92,20 +90,12 @@ func main() { flag.Parse() - var httpTransport *http.Transport + httpTransport := http.NewTransport() var outbound transport.UnaryOutbound switch strings.ToLower(outboundName) { case "http": - if httpTransport == nil { - httpTransport = http.NewTransport() - } - outbound = http.NewOutbound( - single.New( - hostport.PeerIdentifier("127.0.0.1:24034"), - httpTransport, - ), - ) + outbound = httpTransport.NewSingleOutbound("http://127.0.0.1:24034") case "tchannel": channel, err := tchannel.NewChannel("keyvalue-client", nil) if err != nil { diff --git a/internal/examples/json/server/main.go b/internal/examples/json/server/main.go index 94129826f4..177ad14b1a 100644 --- a/internal/examples/json/server/main.go +++ b/internal/examples/json/server/main.go @@ -77,11 +77,12 @@ func main() { log.Fatalln(err) } + httpTransport := http.NewTransport() dispatcher := yarpc.NewDispatcher(yarpc.Config{ Name: "keyvalue", Inbounds: yarpc.Inbounds{ tch.NewInbound(channel).WithListenAddr(":28941"), - http.NewInbound(":24034"), + httpTransport.NewInbound(":24034"), }, InboundMiddleware: yarpc.InboundMiddleware{ Unary: requestLogInboundMiddleware{}, diff --git a/internal/examples/thrift/hello/main.go b/internal/examples/thrift/hello/main.go index 0399113a9a..2923c4a18a 100644 --- a/internal/examples/thrift/hello/main.go +++ b/internal/examples/thrift/hello/main.go @@ -29,8 +29,6 @@ import ( "go.uber.org/yarpc/internal/examples/thrift/hello/echo" "go.uber.org/yarpc/internal/examples/thrift/hello/echo/yarpc/helloclient" "go.uber.org/yarpc/internal/examples/thrift/hello/echo/yarpc/helloserver" - "go.uber.org/yarpc/peer/hostport" - "go.uber.org/yarpc/peer/single" "go.uber.org/yarpc" "go.uber.org/yarpc/transport/http" @@ -39,22 +37,15 @@ import ( //go:generate thriftrw --plugin=yarpc echo.thrift func main() { - httpTransport := http.NewTransport() - dispatcher := yarpc.NewDispatcher(yarpc.Config{ Name: "hello", Inbounds: yarpc.Inbounds{ - http.NewInbound(":8086"), + httpTransport.NewInbound(":8086"), }, Outbounds: yarpc.Outbounds{ "hello": { - Unary: http.NewOutbound( - single.New( - hostport.PeerIdentifier("127.0.0.1:8086"), - httpTransport, - ), - ), + Unary: httpTransport.NewSingleOutbound("http://127.0.0.1:8086"), }, }, }) diff --git a/internal/examples/thrift/keyvalue/client/main.go b/internal/examples/thrift/keyvalue/client/main.go index 923138fc8a..af9b628956 100644 --- a/internal/examples/thrift/keyvalue/client/main.go +++ b/internal/examples/thrift/keyvalue/client/main.go @@ -32,8 +32,6 @@ import ( "go.uber.org/yarpc" "go.uber.org/yarpc/internal/examples/thrift/keyvalue/kv/yarpc/keyvalueclient" - "go.uber.org/yarpc/peer/hostport" - "go.uber.org/yarpc/peer/single" "go.uber.org/yarpc/transport" "go.uber.org/yarpc/transport/http" tch "go.uber.org/yarpc/transport/tchannel" @@ -50,20 +48,12 @@ func main() { flag.Parse() - var httpTransport *http.Transport + httpTransport := http.NewTransport() var outbound transport.UnaryOutbound switch strings.ToLower(outboundName) { case "http": - if httpTransport == nil { - httpTransport = http.NewTransport() - } - outbound = http.NewOutbound( - single.New( - hostport.PeerIdentifier("127.0.0.1:24034"), - httpTransport, - ), - ) + outbound = httpTransport.NewSingleOutbound("http://127.0.0.1:24034") case "tchannel": channel, err := tchannel.NewChannel("keyvalue-client", nil) if err != nil { diff --git a/internal/examples/thrift/keyvalue/server/main.go b/internal/examples/thrift/keyvalue/server/main.go index 69af2b7123..0f1ec12c9d 100644 --- a/internal/examples/thrift/keyvalue/server/main.go +++ b/internal/examples/thrift/keyvalue/server/main.go @@ -65,11 +65,12 @@ func main() { log.Fatalln(err) } + httpTransport := http.NewTransport() dispatcher := yarpc.NewDispatcher(yarpc.Config{ Name: "keyvalue", Inbounds: yarpc.Inbounds{ tch.NewInbound(channel).WithListenAddr(":28941"), - http.NewInbound(":24034"), + httpTransport.NewInbound(":24034"), }, }) diff --git a/transport/http/handler_test.go b/transport/http/handler_test.go index 9b08518fb3..74a7773d56 100644 --- a/transport/http/handler_test.go +++ b/transport/http/handler_test.go @@ -33,8 +33,6 @@ import ( yarpc "go.uber.org/yarpc" "go.uber.org/yarpc/encoding/raw" "go.uber.org/yarpc/internal/registrytest" - "go.uber.org/yarpc/peer/hostport" - "go.uber.org/yarpc/peer/single" "go.uber.org/yarpc/transport" "go.uber.org/yarpc/transport/transporttest" @@ -329,7 +327,8 @@ func (th panickedHandler) Handle(context.Context, *transport.Request, transport. } func TestHandlerPanic(t *testing.T) { - inbound := NewInbound("localhost:0") + httpTransport := NewTransport() + inbound := httpTransport.NewInbound("localhost:0") serverDispatcher := yarpc.NewDispatcher(yarpc.Config{ Name: "yarpc-test", Inbounds: yarpc.Inbounds{inbound}, @@ -344,19 +343,11 @@ func TestHandlerPanic(t *testing.T) { require.NoError(t, serverDispatcher.Start()) defer serverDispatcher.Stop() - httpTransport := NewTransport() - // TODO http transport lifecycle - clientDispatcher := yarpc.NewDispatcher(yarpc.Config{ Name: "yarpc-test-client", Outbounds: yarpc.Outbounds{ "yarpc-test": { - Unary: NewOutbound( - single.New( - hostport.PeerIdentifier(inbound.Addr().String()), - httpTransport, - ), - ), + Unary: httpTransport.NewSingleOutbound(fmt.Sprintf("http://%s", inbound.Addr().String())), }, }, }) diff --git a/transport/http/inbound.go b/transport/http/inbound.go index ae0330d163..402dfc08f1 100644 --- a/transport/http/inbound.go +++ b/transport/http/inbound.go @@ -31,11 +31,12 @@ import ( "github.com/opentracing/opentracing-go" ) -// NewInbound builds a new HTTP inbound that listens on the given address. -func NewInbound(addr string) *Inbound { +// NewInbound builds a new HTTP inbound that listens on the given address and +// sharing this transport. +func (t *Transport) NewInbound(addr string) *Inbound { return &Inbound{ addr: addr, - tracer: opentracing.GlobalTracer(), + tracer: t.tracer, } } diff --git a/transport/http/inbound_test.go b/transport/http/inbound_test.go index fb46a90352..33a910c634 100644 --- a/transport/http/inbound_test.go +++ b/transport/http/inbound_test.go @@ -34,8 +34,6 @@ import ( "go.uber.org/yarpc/encoding/raw" "go.uber.org/yarpc/internal/registrytest" - "go.uber.org/yarpc/peer/hostport" - "go.uber.org/yarpc/peer/single" "go.uber.org/yarpc/transport" "go.uber.org/yarpc/transport/transporttest" @@ -45,10 +43,12 @@ import ( ) func TestStartAddrInUse(t *testing.T) { - i1 := NewInbound(":0") + t1 := NewTransport() + i1 := t1.NewInbound(":0") i1.SetRegistry(new(transporttest.MockRegistry)) require.NoError(t, i1.Start(), "inbound 1 must start without an error") - i2 := NewInbound(i1.Addr().String()) + t2 := NewTransport() + i2 := t2.NewInbound(i1.Addr().String()) i2.SetRegistry(new(transporttest.MockRegistry)) err := i2.Start() @@ -64,7 +64,8 @@ func TestStartAddrInUse(t *testing.T) { } func TestNilAddrAfterStop(t *testing.T) { - i := NewInbound(":0") + x := NewTransport() + i := x.NewInbound(":0") i.SetRegistry(new(transporttest.MockRegistry)) require.NoError(t, i.Start()) assert.NotEqual(t, ":0", i.Addr().String()) @@ -74,7 +75,8 @@ func TestNilAddrAfterStop(t *testing.T) { } func TestInboundStartAndStop(t *testing.T) { - i := NewInbound(":0") + x := NewTransport() + i := x.NewInbound(":0") i.SetRegistry(new(transporttest.MockRegistry)) require.NoError(t, i.Start()) assert.NotEqual(t, ":0", i.Addr().String()) @@ -82,14 +84,16 @@ func TestInboundStartAndStop(t *testing.T) { } func TestInboundStartError(t *testing.T) { - err := NewInbound("invalid"). + x := NewTransport() + err := x.NewInbound("invalid"). WithRegistry(new(transporttest.MockRegistry)). Start() assert.Error(t, err, "expected failure") } func TestInboundStopWithoutStarting(t *testing.T) { - i := NewInbound(":8000") + x := NewTransport() + i := x.NewInbound(":8000") assert.Nil(t, i.Addr()) assert.NoError(t, i.Stop()) } @@ -106,7 +110,7 @@ func TestInboundMux(t *testing.T) { w.Write([]byte("healthy")) }) - i := NewInbound(":0").WithMux("/rpc/v1", mux) + i := httpTransport.NewInbound(":0").WithMux("/rpc/v1", mux) h := transporttest.NewMockUnaryHandler(mockCtrl) reg := transporttest.NewMockRegistry(mockCtrl) i.SetRegistry(reg) @@ -125,12 +129,7 @@ func TestInboundMux(t *testing.T) { } // this should fail - o := NewOutbound( - single.New( - hostport.PeerIdentifier(i.Addr().String()), - httpTransport, - ), - ) + o := httpTransport.NewSingleOutbound(addr) require.NoError(t, o.Start(), "failed to start outbound") defer o.Stop() diff --git a/transport/http/outbound.go b/transport/http/outbound.go index 64f9e6f58b..9a310ef841 100644 --- a/transport/http/outbound.go +++ b/transport/http/outbound.go @@ -51,6 +51,19 @@ var ( var defaultURLTemplate, _ = url.Parse("http://localhost") +// NewOutbound builds a new HTTP outbound built around a peer.Chooser +// for getting potential downstream hosts. +// Chooser.Choose MUST return *hostport.Peer objects. +// Chooser.Start MUST be called before Outbound.Start +func (t *Transport) NewOutbound(chooser peer.Chooser) *Outbound { + return &Outbound{ + started: atomic.NewBool(false), + chooser: chooser, + urlTemplate: defaultURLTemplate, + tracer: t.tracer, + } +} + // NewOutbound builds a new HTTP outbound built around a peer.Chooser // for getting potential downstream hosts. // Chooser.Choose MUST return *hostport.Peer objects. @@ -64,16 +77,17 @@ func NewOutbound(chooser peer.Chooser) *Outbound { } } -// NewSingleOutbound creates an outbound from a single URL. +// NewSingleOutbound creates an outbound from a single URL (a bare host:port is +// not sufficient). // This form defers to the underlying HTTP agent's peer selection and load // balancing, using DNS. -func NewSingleOutbound(u string, t *Transport) *Outbound { - parsedURL, err := url.Parse(u) +func (t *Transport) NewSingleOutbound(URL string) *Outbound { + parsedURL, err := url.Parse(URL) if err != nil { panic(err.Error()) } - return NewOutbound(single.New(hostport.PeerIdentifier(parsedURL.Host), t)). - WithURLTemplate(u) + return t.NewOutbound(single.New(hostport.PeerIdentifier(parsedURL.Host), t)). + WithURLTemplate(URL) } // Outbound is an HTTP UnaryOutbound and OnewayOutbound diff --git a/transport/http/outbound_test.go b/transport/http/outbound_test.go index afdba88faf..f2922475ff 100644 --- a/transport/http/outbound_test.go +++ b/transport/http/outbound_test.go @@ -143,7 +143,7 @@ func TestOutboundHeaders(t *testing.T) { defer cancel() } - out := NewSingleOutbound(server.URL, httpTransport) + out := httpTransport.NewSingleOutbound(server.URL) require.NoError(t, out.Start(), "failed to start outbound") defer out.Stop() @@ -189,7 +189,7 @@ func TestCallFailures(t *testing.T) { } for _, tt := range tests { - out := NewSingleOutbound(tt.url, httpTransport) + out := httpTransport.NewSingleOutbound(tt.url) require.NoError(t, out.Start(), "failed to start outbound") defer out.Stop() @@ -211,7 +211,7 @@ func TestCallFailures(t *testing.T) { func TestStartMultiple(t *testing.T) { httpTransport := NewTransport() - out := NewSingleOutbound("http://localhost:9999", httpTransport) + out := httpTransport.NewSingleOutbound("http://localhost:9999") var wg sync.WaitGroup signal := make(chan struct{}) @@ -232,14 +232,7 @@ func TestStartMultiple(t *testing.T) { func TestStopMultiple(t *testing.T) { httpTransport := NewTransport() - // TODO transport lifecycle - - out := NewOutbound( - single.New( - hostport.PeerIdentifier("127.0.0.1:9999"), - httpTransport, - ), - ) + out := httpTransport.NewSingleOutbound("http://127.0.0.1:9999") err := out.Start() require.NoError(t, err) @@ -263,14 +256,7 @@ func TestStopMultiple(t *testing.T) { func TestCallWithoutStarting(t *testing.T) { httpTransport := NewTransport() - // TODO transport lifecycle - - out := NewOutbound( - single.New( - hostport.PeerIdentifier("127.0.0.1:9999"), - httpTransport, - ), - ) + out := httpTransport.NewSingleOutbound("http://127.0.0.1:9999") assert.Panics(t, func() { ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond) defer cancel() diff --git a/transport/http/transport.go b/transport/http/transport.go index 20ca76e6ef..215b3b794f 100644 --- a/transport/http/transport.go +++ b/transport/http/transport.go @@ -26,12 +26,15 @@ import ( "sync" "time" + "github.com/opentracing/opentracing-go" + "go.uber.org/yarpc/peer" "go.uber.org/yarpc/peer/hostport" ) type transportConfig struct { keepAlive time.Duration + tracer opentracing.Tracer } var defaultTransportConfig = transportConfig{keepAlive: 30 * time.Second} @@ -49,9 +52,18 @@ func KeepAlive(t time.Duration) TransportOption { } } +// WithTracer configures a tracer for the transport and all its inbounds and +// outbounds. +func WithTracer(tracer opentracing.Tracer) TransportOption { + return func(c *transportConfig) { + c.tracer = tracer + } +} + // NewTransport creates a new http transport for managing peers and sending requests func NewTransport(opts ...TransportOption) *Transport { cfg := defaultTransportConfig + cfg.tracer = opentracing.GlobalTracer() for _, o := range opts { o(&cfg) } @@ -59,6 +71,7 @@ func NewTransport(opts ...TransportOption) *Transport { return &Transport{ client: buildClient(&cfg), peers: make(map[string]*hostport.Peer), + tracer: cfg.tracer, } } @@ -83,6 +96,8 @@ type Transport struct { client *http.Client peers map[string]*hostport.Peer + + tracer opentracing.Tracer } // RetainPeer gets or creates a Peer for the specified peer.Subscriber (usually a peer.Chooser) diff --git a/transport/roundtrip_test.go b/transport/roundtrip_test.go index 0aa39ff76b..3c0e43c9de 100644 --- a/transport/roundtrip_test.go +++ b/transport/roundtrip_test.go @@ -30,8 +30,6 @@ import ( "go.uber.org/yarpc/encoding/raw" "go.uber.org/yarpc/internal/errors" - "go.uber.org/yarpc/peer/hostport" - "go.uber.org/yarpc/peer/single" "go.uber.org/yarpc/transport" "go.uber.org/yarpc/transport/http" tch "go.uber.org/yarpc/transport/tchannel" @@ -100,18 +98,13 @@ type httpTransport struct{ t *testing.T } func (ht httpTransport) WithRegistry(r transport.Registry, f func(transport.UnaryOutbound)) { httpTransport := http.NewTransport() - // TODO lifecycle - i := http.NewInbound("127.0.0.1:0").WithRegistry(r) + i := httpTransport.NewInbound("127.0.0.1:0") + i.SetRegistry(r) require.NoError(ht.t, i.Start(), "failed to start") defer i.Stop() - o := http.NewOutbound( - single.New( - hostport.PeerIdentifier(i.Addr().String()), - httpTransport, - ), - ) + o := httpTransport.NewSingleOutbound(fmt.Sprintf("http://%s", i.Addr().String())) require.NoError(ht.t, o.Start(), "failed to start outbound") defer o.Stop() f(o) @@ -119,18 +112,13 @@ func (ht httpTransport) WithRegistry(r transport.Registry, f func(transport.Unar func (ht httpTransport) WithRegistryOneway(r transport.Registry, f func(transport.OnewayOutbound)) { httpTransport := http.NewTransport() - // TODO lifecycle - i := http.NewInbound("127.0.0.1:0").WithRegistry(r) + i := httpTransport.NewInbound("127.0.0.1:0") + i.SetRegistry(r) require.NoError(ht.t, i.Start(), "failed to start") defer i.Stop() - o := http.NewOutbound( - single.New( - hostport.PeerIdentifier(i.Addr().String()), - httpTransport, - ), - ) + o := httpTransport.NewSingleOutbound(fmt.Sprintf("http://%s", i.Addr().String())) require.NoError(ht.t, o.Start(), "failed to start outbound") defer o.Stop() f(o) @@ -143,7 +131,8 @@ func (tt tchannelTransport) WithRegistry(r transport.Registry, f func(transport. serverOpts := testutils.NewOpts().SetServiceName(testService) clientOpts := testutils.NewOpts().SetServiceName(testCaller) testutils.WithServer(tt.t, serverOpts, func(ch *tchannel.Channel, hostPort string) { - i := tch.NewInbound(ch).WithRegistry(r) + i := tch.NewInbound(ch) + i.SetRegistry(r) require.NoError(tt.t, i.Start(), "failed to start") defer i.Stop() diff --git a/transport/tracer_test.go b/transport/tracer_test.go index f4eb4e15f4..442115ac72 100644 --- a/transport/tracer_test.go +++ b/transport/tracer_test.go @@ -29,8 +29,6 @@ import ( "github.com/stretchr/testify/require" "go.uber.org/yarpc" "go.uber.org/yarpc/encoding/json" - "go.uber.org/yarpc/peer/hostport" - "go.uber.org/yarpc/peer/single" "go.uber.org/yarpc/transport/http" ytchannel "go.uber.org/yarpc/transport/tchannel" @@ -116,22 +114,15 @@ func createHTTPDispatcher(tracer opentracing.Tracer) yarpc.Dispatcher { // TODO: Use port 0 once https://github.com/yarpc/yarpc-go/issues/381 is // fixed. - httpTransport := http.NewTransport() - // TODO lifecycle - + httpTransport := http.NewTransport(http.WithTracer(tracer)) dispatcher := yarpc.NewDispatcher(yarpc.Config{ Name: "yarpc-test", Inbounds: yarpc.Inbounds{ - http.NewInbound(":18080").WithTracer(tracer), + httpTransport.NewInbound(":18080"), }, Outbounds: yarpc.Outbounds{ "yarpc-test": { - Unary: http.NewOutbound( - single.New( - hostport.PeerIdentifier("127.0.0.1:18080"), - httpTransport, - ), - ).WithTracer(tracer), + Unary: httpTransport.NewSingleOutbound("http://127.0.0.1:18080"), }, }, Tracer: tracer, diff --git a/yarpctest/recorder/recorder_test.go b/yarpctest/recorder/recorder_test.go index 5f878e9853..b38e8d1ff9 100644 --- a/yarpctest/recorder/recorder_test.go +++ b/yarpctest/recorder/recorder_test.go @@ -3,6 +3,7 @@ package recorder import ( "bytes" "context" + "fmt" "io/ioutil" "math/rand" "os" @@ -12,8 +13,6 @@ import ( "go.uber.org/yarpc" "go.uber.org/yarpc/encoding/raw" - "go.uber.org/yarpc/peer/hostport" - "go.uber.org/yarpc/peer/single" "go.uber.org/yarpc/transport" "go.uber.org/yarpc/transport/http" @@ -162,18 +161,12 @@ func (t *testingTMock) Fatal(args ...interface{}) { func withDisconnectedClient(t *testing.T, recorder *Recorder, f func(raw.Client)) { httpTransport := http.NewTransport() - // TODO lifecycle clientDisp := yarpc.NewDispatcher(yarpc.Config{ Name: "client", Outbounds: yarpc.Outbounds{ "server": { - Unary: http.NewOutbound( - single.New( - hostport.PeerIdentifier("127.0.0.1:65535"), - httpTransport, - ), - ), + Unary: httpTransport.NewSingleOutbound("http://127.0.0.1:65535"), }, }, OutboundMiddleware: yarpc.OutboundMiddleware{ @@ -188,8 +181,8 @@ func withDisconnectedClient(t *testing.T, recorder *Recorder, f func(raw.Client) } func withConnectedClient(t *testing.T, recorder *Recorder, f func(raw.Client)) { - serverHTTP := http.NewInbound(":0") - + httpTransport := http.NewTransport() + serverHTTP := httpTransport.NewInbound(":0") serverDisp := yarpc.NewDispatcher(yarpc.Config{ Name: "server", Inbounds: yarpc.Inbounds{serverHTTP}, @@ -203,19 +196,11 @@ func withConnectedClient(t *testing.T, recorder *Recorder, f func(raw.Client)) { require.NoError(t, serverDisp.Start()) defer serverDisp.Stop() - httpTransport := http.NewTransport() - // TODO http transport lifecycle - clientDisp := yarpc.NewDispatcher(yarpc.Config{ Name: "client", Outbounds: yarpc.Outbounds{ "server": { - Unary: http.NewOutbound( - single.New( - hostport.PeerIdentifier(serverHTTP.Addr().String()), - httpTransport, - ), - ), + Unary: httpTransport.NewSingleOutbound(fmt.Sprintf("http://%s", serverHTTP.Addr())), }, }, OutboundMiddleware: yarpc.OutboundMiddleware{