Skip to content

Commit

Permalink
Pivot HTTP constructors to use a shared Transport #540
Browse files Browse the repository at this point in the history
  • Loading branch information
kriskowal committed Dec 7, 2016
1 parent c91f0f0 commit 95f6fb0
Show file tree
Hide file tree
Showing 24 changed files with 116 additions and 219 deletions.
17 changes: 6 additions & 11 deletions bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@ import (

"go.uber.org/yarpc"
"go.uber.org/yarpc/encoding/raw"
"go.uber.org/yarpc/peer/hostport"
"go.uber.org/yarpc/peer/single"
yhttp "go.uber.org/yarpc/transport/http"
ytchannel "go.uber.org/yarpc/transport/tchannel"

Expand Down Expand Up @@ -123,19 +121,17 @@ func runTChannelClient(b *testing.B, c *tchannel.Channel, hostPort string) {
}

func Benchmark_HTTP_YARPCToYARPC(b *testing.B) {
httpTransport := yhttp.NewTransport()
serverCfg := yarpc.Config{
Name: "server",
Inbounds: yarpc.Inbounds{yhttp.NewInbound(":8999")},
Inbounds: yarpc.Inbounds{httpTransport.NewInbound(":8999")},
}

httpTransport := yhttp.NewTransport()
clientCfg := yarpc.Config{
Name: "client",
Outbounds: yarpc.Outbounds{
"server": {
Unary: yhttp.NewOutbound(
single.New(hostport.PeerIdentifier("http://localhost:8999"), httpTransport),
),
Unary: httpTransport.NewSingleOutbound("http://localhost:8999"),
},
},
}
Expand All @@ -155,9 +151,7 @@ func Benchmark_HTTP_YARPCToNetHTTP(b *testing.B) {
Name: "client",
Outbounds: yarpc.Outbounds{
"server": {
Unary: yhttp.NewOutbound(
single.New(hostport.PeerIdentifier("http://localhost:8998"), httpTransport),
),
Unary: httpTransport.NewSingleOutbound("http://localhost:8998"),
},
},
}
Expand All @@ -171,9 +165,10 @@ func Benchmark_HTTP_YARPCToNetHTTP(b *testing.B) {
}

func Benchmark_HTTP_NetHTTPToYARPC(b *testing.B) {
httpTransport := yhttp.NewTransport()
serverCfg := yarpc.Config{
Name: "server",
Inbounds: yarpc.Inbounds{yhttp.NewInbound(":8996")},
Inbounds: yarpc.Inbounds{httpTransport.NewInbound(":8996")},
}

withDispatcher(b, serverCfg, func(server yarpc.Dispatcher) {
Expand Down
3 changes: 2 additions & 1 deletion dispatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,12 @@ func basicDispatcher(t *testing.T) Dispatcher {
ch, err := tchannel.NewChannel("test", nil)
require.NoError(t, err, "failed to create TChannel")

httpTransport := http.NewTransport()
return NewDispatcher(Config{
Name: "test",
Inbounds: Inbounds{
tch.NewInbound(ch).WithListenAddr(":0"),
http.NewInbound(":0"),
httpTransport.NewInbound(":0"),
},
})
}
Expand Down
10 changes: 4 additions & 6 deletions internal/crossdock/client/apachethrift/behavior.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@ import (
"go.uber.org/yarpc"
"go.uber.org/yarpc/encoding/thrift"
"go.uber.org/yarpc/internal/crossdock/client/gauntlet"
"go.uber.org/yarpc/peer/hostport"
"go.uber.org/yarpc/peer/single"
"go.uber.org/yarpc/transport/http"

"github.com/crossdock/crossdock-go"
Expand All @@ -46,13 +44,13 @@ func Run(t crossdock.T) {
fatals.NotEmpty(server, "apachethriftserver is required")

httpTransport := http.NewTransport()
hostPort := hostport.PeerIdentifier(fmt.Sprintf("%v:%v", server, serverPort))
url := fmt.Sprintf("http://%v:%v/", server, serverPort)

thriftOutbound := http.NewOutbound(single.New(hostPort, httpTransport)).
thriftOutbound := httpTransport.NewSingleOutbound(url).
WithURLTemplate("http://host:port/thrift/ThriftTest")
secondOutbound := http.NewOutbound(single.New(hostPort, httpTransport)).
secondOutbound := httpTransport.NewSingleOutbound(url).
WithURLTemplate("http://host:port/thrift/SecondService")
multiplexOutbound := http.NewOutbound(single.New(hostPort, httpTransport)).
multiplexOutbound := httpTransport.NewSingleOutbound(url).
WithURLTemplate("http://host:port/thrift/multiplexed")

dispatcher := yarpc.NewDispatcher(yarpc.Config{
Expand Down
16 changes: 3 additions & 13 deletions internal/crossdock/client/ctxpropagation/behavior.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,6 @@ import (
"go.uber.org/yarpc/encoding/json"
"go.uber.org/yarpc/internal/crossdock/client/params"
server "go.uber.org/yarpc/internal/crossdock/server/yarpc"
"go.uber.org/yarpc/peer/hostport"
"go.uber.org/yarpc/peer/single"
"go.uber.org/yarpc/transport"
"go.uber.org/yarpc/transport/http"
tch "go.uber.org/yarpc/transport/tchannel"
Expand Down Expand Up @@ -324,20 +322,12 @@ func buildDispatcher(t crossdock.T) (dispatcher yarpc.Dispatcher, tconfig server
ch, err := tchannel.NewChannel("ctxclient", nil)
fatals.NoError(err, "failed to create TChannel")

var httpTransport *http.Transport
httpTransport := http.NewTransport()

var outbound transport.UnaryOutbound
switch trans := t.Param(params.Transport); trans {
case "http":
if httpTransport == nil {
httpTransport = http.NewTransport()
}
outbound = http.NewOutbound(
single.New(
hostport.PeerIdentifier(fmt.Sprintf("%s:8081", subject)),
httpTransport,
),
)
outbound = httpTransport.NewSingleOutbound(fmt.Sprintf("http://%s:8081", subject))
tconfig.TChannel = &server.TChannelTransport{Host: self, Port: 8087}
case "tchannel":
outbound = tch.NewOutbound(ch).WithHostPort(fmt.Sprintf("%s:8082", subject))
Expand All @@ -350,7 +340,7 @@ func buildDispatcher(t crossdock.T) (dispatcher yarpc.Dispatcher, tconfig server
Name: "ctxclient",
Inbounds: yarpc.Inbounds{
tch.NewInbound(ch).WithListenAddr(":8087"),
http.NewInbound(":8086"),
httpTransport.NewInbound(":8086"),
},
Outbounds: yarpc.Outbounds{
"yarpc-test": {
Expand Down
23 changes: 5 additions & 18 deletions internal/crossdock/client/dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@ import (
"go.uber.org/yarpc"
"go.uber.org/yarpc/encoding/raw"
"go.uber.org/yarpc/internal/crossdock/client/params"
"go.uber.org/yarpc/peer/hostport"
"go.uber.org/yarpc/peer/single"
"go.uber.org/yarpc/transport"
"go.uber.org/yarpc/transport/http"
tch "go.uber.org/yarpc/transport/tchannel"
Expand All @@ -43,21 +41,13 @@ func Create(t crossdock.T) yarpc.Dispatcher {
server := t.Param(params.Server)
fatals.NotEmpty(server, "server is required")

var httpTransport *http.Transport
httpTransport := http.NewTransport()

var unaryOutbound transport.UnaryOutbound
trans := t.Param(params.Transport)
switch trans {
case "http":
if httpTransport == nil {
httpTransport = http.NewTransport()
}
unaryOutbound = http.NewOutbound(
single.New(
hostport.PeerIdentifier(fmt.Sprintf("%s:8081", server)),
httpTransport,
),
)
unaryOutbound = httpTransport.NewSingleOutbound(fmt.Sprintf("http://%s:8081", server))
case "tchannel":
ch, err := tchannel.NewChannel("client", nil)
fatals.NoError(err, "couldn't create tchannel")
Expand All @@ -84,21 +74,18 @@ func CreateOnewayDispatcher(t crossdock.T, handler raw.OnewayHandler) (yarpc.Dis
server := t.Param("server_oneway")
fatals.NotEmpty(server, "oneway server is required")

httpTransport := http.NewTransport()
var outbound transport.OnewayOutbound

trans := t.Param("transport_oneway")
switch trans {
case "http":
outbound = http.NewOutbound(
single.New(
hostport.PeerIdentifier(fmt.Sprintf("%s:8084", server)),
http.NewTransport(),
))
outbound = httpTransport.NewSingleOutbound(fmt.Sprintf("http://%s:8084", server))
default:
fatals.Fail("", "unknown transport %q", trans)
}

callBackInbound := http.NewInbound(":0")
callBackInbound := httpTransport.NewInbound(":0")
dispatcher := yarpc.NewDispatcher(yarpc.Config{
Name: "oneway-client",
Outbounds: yarpc.Outbounds{
Expand Down
13 changes: 2 additions & 11 deletions internal/crossdock/client/httpserver/behavior.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,9 @@ import (
"strings"
"time"

yarpc "go.uber.org/yarpc"
"go.uber.org/yarpc"
"go.uber.org/yarpc/encoding/raw"
"go.uber.org/yarpc/internal/crossdock/client/params"
"go.uber.org/yarpc/peer/hostport"
"go.uber.org/yarpc/peer/single"
"go.uber.org/yarpc/transport"
"go.uber.org/yarpc/transport/http"

Expand All @@ -45,18 +43,11 @@ func Run(t crossdock.T) {
fatals.NotEmpty(server, "server is required")

httpTransport := http.NewTransport()
// TODO http transport lifecycle

disp := yarpc.NewDispatcher(yarpc.Config{
Name: "client",
Outbounds: yarpc.Outbounds{
"yarpc-test": {
Unary: http.NewOutbound(
single.New(
hostport.PeerIdentifier(fmt.Sprintf("%s:8085", server)),
httpTransport,
),
),
Unary: httpTransport.NewSingleOutbound(fmt.Sprintf("http://%s:8085", server)),
},
},
})
Expand Down
8 changes: 1 addition & 7 deletions internal/crossdock/server/oneway/echo.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,6 @@ import (
"go.uber.org/yarpc/encoding/json"
"go.uber.org/yarpc/encoding/raw"
"go.uber.org/yarpc/encoding/thrift"
"go.uber.org/yarpc/peer/hostport"
"go.uber.org/yarpc/peer/single"
"go.uber.org/yarpc/transport"
"go.uber.org/yarpc/transport/http"
)
Expand Down Expand Up @@ -73,11 +71,7 @@ func (o *onewayHandler) callHome(ctx context.Context, reqMeta yarpc.ReqMeta, bod
panic("could not find callBackAddr in headers")
}

out := http.NewOutbound(
single.New(
hostport.PeerIdentifier(callBackAddr),
o.httpTransport))

out := o.httpTransport.NewSingleOutbound("http://" + callBackAddr)
out.Start()
defer out.Stop()

Expand Down
3 changes: 2 additions & 1 deletion internal/crossdock/server/oneway/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ var dispatcher yarpc.Dispatcher

// Start starts the test server that clients will make requests to
func Start() {
inbounds := []transport.Inbound{http.NewInbound(":8084")}
httpTransport := http.NewTransport()
inbounds := []transport.Inbound{httpTransport.NewInbound(":8084")}

dispatcher = yarpc.NewDispatcher(yarpc.Config{
Name: "oneway-server",
Expand Down
11 changes: 3 additions & 8 deletions internal/crossdock/server/yarpc/phone.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@ import (
"go.uber.org/yarpc"
"go.uber.org/yarpc/encoding/json"
"go.uber.org/yarpc/internal/clientconfig"
"go.uber.org/yarpc/peer/hostport"
"go.uber.org/yarpc/peer/single"
"go.uber.org/yarpc/transport"
"go.uber.org/yarpc/transport/http"
tch "go.uber.org/yarpc/transport/tchannel"
Expand Down Expand Up @@ -75,15 +73,12 @@ type PhoneResponse struct {
func Phone(ctx context.Context, reqMeta yarpc.ReqMeta, body *PhoneRequest) (*PhoneResponse, yarpc.ResMeta, error) {
var outbound transport.UnaryOutbound

httpTransport := http.NewTransport()

switch {
case body.Transport.HTTP != nil:
t := body.Transport.HTTP
outbound = http.NewOutbound(
single.New(
hostport.PeerIdentifier(fmt.Sprintf("%s:%d", t.Host, t.Port)),
http.NewTransport(), // TODO transport lifecycle
),
)
outbound = httpTransport.NewSingleOutbound(fmt.Sprintf("http://%s:%d", t.Host, t.Port))
case body.Transport.TChannel != nil:
t := body.Transport.TChannel
hostport := fmt.Sprintf("%s:%d", t.Host, t.Port)
Expand Down
3 changes: 2 additions & 1 deletion internal/crossdock/server/yarpc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,11 @@ func Start() {
log.Fatalln("couldn't create tchannel: %v", err)
}

httpTransport := http.NewTransport()
dispatcher = yarpc.NewDispatcher(yarpc.Config{
Name: "yarpc-test",
Inbounds: yarpc.Inbounds{
http.NewInbound(":8081"),
httpTransport.NewInbound(":8081"),
tch.NewInbound(ch).WithListenAddr(":8082"),
},
})
Expand Down
14 changes: 2 additions & 12 deletions internal/examples/json/client/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,6 @@ import (

"go.uber.org/yarpc"
"go.uber.org/yarpc/encoding/json"
"go.uber.org/yarpc/peer/hostport"
"go.uber.org/yarpc/peer/single"
"go.uber.org/yarpc/transport"
"go.uber.org/yarpc/transport/http"
tch "go.uber.org/yarpc/transport/tchannel"
Expand Down Expand Up @@ -92,20 +90,12 @@ func main() {

flag.Parse()

var httpTransport *http.Transport
httpTransport := http.NewTransport()

var outbound transport.UnaryOutbound
switch strings.ToLower(outboundName) {
case "http":
if httpTransport == nil {
httpTransport = http.NewTransport()
}
outbound = http.NewOutbound(
single.New(
hostport.PeerIdentifier("127.0.0.1:24034"),
httpTransport,
),
)
outbound = httpTransport.NewSingleOutbound("http://127.0.0.1:24034")
case "tchannel":
channel, err := tchannel.NewChannel("keyvalue-client", nil)
if err != nil {
Expand Down
3 changes: 2 additions & 1 deletion internal/examples/json/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,11 +77,12 @@ func main() {
log.Fatalln(err)
}

httpTransport := http.NewTransport()
dispatcher := yarpc.NewDispatcher(yarpc.Config{
Name: "keyvalue",
Inbounds: yarpc.Inbounds{
tch.NewInbound(channel).WithListenAddr(":28941"),
http.NewInbound(":24034"),
httpTransport.NewInbound(":24034"),
},
InboundMiddleware: yarpc.InboundMiddleware{
Unary: requestLogInboundMiddleware{},
Expand Down
13 changes: 2 additions & 11 deletions internal/examples/thrift/hello/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@ import (
"go.uber.org/yarpc/internal/examples/thrift/hello/echo"
"go.uber.org/yarpc/internal/examples/thrift/hello/echo/yarpc/helloclient"
"go.uber.org/yarpc/internal/examples/thrift/hello/echo/yarpc/helloserver"
"go.uber.org/yarpc/peer/hostport"
"go.uber.org/yarpc/peer/single"

"go.uber.org/yarpc"
"go.uber.org/yarpc/transport/http"
Expand All @@ -39,22 +37,15 @@ import (
//go:generate thriftrw --plugin=yarpc echo.thrift

func main() {

httpTransport := http.NewTransport()

dispatcher := yarpc.NewDispatcher(yarpc.Config{
Name: "hello",
Inbounds: yarpc.Inbounds{
http.NewInbound(":8086"),
httpTransport.NewInbound(":8086"),
},
Outbounds: yarpc.Outbounds{
"hello": {
Unary: http.NewOutbound(
single.New(
hostport.PeerIdentifier("127.0.0.1:8086"),
httpTransport,
),
),
Unary: httpTransport.NewSingleOutbound("http://127.0.0.1:8086"),
},
},
})
Expand Down
Loading

0 comments on commit 95f6fb0

Please sign in to comment.