diff --git a/docker-compose.yml b/docker-compose.yml index 4ab9cd09b8..233ebc74b1 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -49,6 +49,7 @@ services: - BEHAVIOR_CTXPROPAGATION=ctxclient,ctxserver,transport - BEHAVIOR_APACHETHRIFT=apachethriftclient,apachethriftserver - BEHAVIOR_ONEWAY=client_oneway,server_oneway,transport_oneway,encoding + - BEHAVIOR_ONEWAY_CTXPROPAGATION=client_oneway,server_oneway,transport_oneway - REPORT=compact diff --git a/internal/crossdock/client/onewayctxpropagation/behavior.go b/internal/crossdock/client/onewayctxpropagation/behavior.go new file mode 100644 index 0000000000..b71e41a12c --- /dev/null +++ b/internal/crossdock/client/onewayctxpropagation/behavior.go @@ -0,0 +1,105 @@ +// Copyright (c) 2016 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package onewayctxpropagation + +import ( + "context" + + "go.uber.org/yarpc" + "go.uber.org/yarpc/encoding/raw" + "go.uber.org/yarpc/internal/crossdock/client/dispatcher" + + "github.com/crossdock/crossdock-go" + opentracing "github.com/opentracing/opentracing-go" +) + +// Run starts the behavior, testing oneway context propagation +func Run(t crossdock.T) { + assert := crossdock.Assert(t) + fatals := crossdock.Fatals(t) + + baggage := map[string]string{ + "hello": "world", + "foo": "bar", + } + + // create handler + callBackHandler, serverCalledBack := newCallBackHandler() + dispatcher, callBackAddr := dispatcher.CreateOneway(t, callBackHandler) + defer dispatcher.Stop() + + client := raw.New(dispatcher.ClientConfig("oneway-server")) + + // make call + ack, err := client.CallOneway( + newContextWithBaggage(baggage), + yarpc.NewReqMeta(). + Procedure("echo/raw"). + Headers(yarpc.NewHeaders(). + With("callBackAddr", callBackAddr)), + []byte{}) + + fatals.NoError(err, "call to oneway/raw failed: %v", err) + fatals.NotNil(ack, "ack is nil") + + // wait for server to call us back + gotBaggage := <-serverCalledBack + assert.Equal(baggage, gotBaggage, "server baggage: %s", gotBaggage) +} + +// newCallBackHandler creates a oneway handler that fills a channel +// with the received body +func newCallBackHandler() (raw.OnewayHandler, <-chan map[string]string) { + serverCalledBack := make(chan map[string]string) + handler := func(ctx context.Context, reqMeta yarpc.ReqMeta, body []byte) error { + serverCalledBack <- extractBaggage(ctx) + return nil + } + return handler, serverCalledBack +} + +func newContextWithBaggage(baggage map[string]string) context.Context { + span := opentracing.GlobalTracer().StartSpan("add baggage") + for k, v := range baggage { + span.SetBaggageItem(k, v) + } + return opentracing.ContextWithSpan(context.Background(), span) +} + +func extractBaggage(ctx context.Context) map[string]string { + baggage := make(map[string]string) + + span := opentracing.SpanFromContext(ctx) + if span == nil { + return baggage + } + spanContext := span.Context() + if spanContext == nil { + return baggage + } + + spanContext.ForeachBaggageItem(func(k, v string) bool { + baggage[k] = v + return true + }) + + return baggage +} diff --git a/internal/crossdock/client/start.go b/internal/crossdock/client/start.go index 8a567e4a4e..aa39400deb 100644 --- a/internal/crossdock/client/start.go +++ b/internal/crossdock/client/start.go @@ -30,6 +30,7 @@ import ( "go.uber.org/yarpc/internal/crossdock/client/headers" "go.uber.org/yarpc/internal/crossdock/client/httpserver" "go.uber.org/yarpc/internal/crossdock/client/oneway" + "go.uber.org/yarpc/internal/crossdock/client/onewayctxpropagation" "go.uber.org/yarpc/internal/crossdock/client/tchclient" "go.uber.org/yarpc/internal/crossdock/client/tchserver" "go.uber.org/yarpc/internal/crossdock/client/timeout" @@ -38,20 +39,21 @@ import ( ) var behaviors = crossdock.Behaviors{ - "raw": echo.Raw, - "json": echo.JSON, - "thrift": echo.Thrift, - "headers": headers.Run, - "errors_httpclient": errorshttpclient.Run, - "errors_tchclient": errorstchclient.Run, - "tchclient": tchclient.Run, - "tchserver": tchserver.Run, - "thriftgauntlet": gauntlet.Run, - "timeout": timeout.Run, - "ctxpropagation": ctxpropagation.Run, - "httpserver": httpserver.Run, - "apachethrift": apachethrift.Run, - "oneway": oneway.Run, + "raw": echo.Raw, + "json": echo.JSON, + "thrift": echo.Thrift, + "headers": headers.Run, + "errors_httpclient": errorshttpclient.Run, + "errors_tchclient": errorstchclient.Run, + "tchclient": tchclient.Run, + "tchserver": tchserver.Run, + "thriftgauntlet": gauntlet.Run, + "timeout": timeout.Run, + "ctxpropagation": ctxpropagation.Run, + "httpserver": httpserver.Run, + "apachethrift": apachethrift.Run, + "oneway": oneway.Run, + "oneway_ctxpropagation": onewayctxpropagation.Run, } // Start registers behaviors and begins the Crossdock client diff --git a/internal/crossdock/main_test.go b/internal/crossdock/main_test.go index 03b98794d2..57a1726e3e 100644 --- a/internal/crossdock/main_test.go +++ b/internal/crossdock/main_test.go @@ -137,6 +137,15 @@ func TestCrossdock(t *testing.T) { "transport_oneway": []string{"http"}, }, }, + { + name: "oneway_ctxpropagation", + params: params{ + "server_oneway": "127.0.0.1", + }, + axes: axes{ + "transport_oneway": []string{"http"}, + }, + }, } for _, bb := range behaviors {