diff --git a/CHANGELOG.md b/CHANGELOG.md index 22af7ce2..75d602de 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,32 +1,37 @@ Changelog ========= -# v1.6.0 +# v1.7.0 (2017-08-04) -* Locks Apache Thrift to version 0.9.3 to maintain backward-compatibility. +* Cancel the context on incoming calls if the client connection is closed. +* Add `WithoutHeaders` to remove TChannel keys from a context. + +# v1.6.0 (2017-06-02) + +* Locks Apache Thrift to version 0.9.3, 0.10.0 to maintain backward-compatibility. * Add `OnPeerStatusChanged` channel option to receive a notification each time the number of available connections changes for any given peer. * Set DiffServ (QoS) bit on outbound connections. * Improve resilience of the frame parser. -# v1.5.0 +# v1.5.0 (2017-03-21) * Add `PeerList.Len` to expose the number of peers in the peer list. * Add `PeerList.GetNew` to only return previously unselected peers. -# v1.4.0 +# v1.4.0 (2017-03-01) * Add version information to the channel's LocalPeerInfo. * Add peers package for peer management utilities such as consistent peer selection. * Fix SetScoreStrategy not rescoring existing peers. (#583). -# v1.3.0 +# v1.3.0 (2017-02-01) * Exposes the channel's RootPeerList with `channel.RootPeers()`. * Support Thrift namespaces for thrift-gen. -# v1.2.3 +# v1.2.3 (2017-01-19) * Improve error messages when an argument reader is closed without reading the EOF. (#567) @@ -38,7 +43,7 @@ Changelog * thrift-gen: Fix "namespace go" being ignored even though the Apache thrift generated code was respecting it. (#559) -# v1.2.2 +# v1.2.2 (2016-12-21) * Add a unique channel ID for introspection (#548) * Don't drop existing headers on a context when using Wrap(ctx) (#547) @@ -50,15 +55,15 @@ Changelog * Add remote peer info to connection logger and introspection (#514) * Treat hostPorts ending in ":0" in the init headers as ephemeral (#513) -# v1.2.1 +# v1.2.1 (2016-09-29) * Fix data race on headers when making concurrent calls using the same context. (#505) -# v1.2.0 +# v1.2.0 (2016-09-15) * Adds support for routing keys (the TChannel rk transport header). -# v1.1.0 +# v1.1.0 (2016-08-25) * Integrate OpenTracing for distributed tracing and context propagation. As long as a Zipkin-style tracing is configured, TChannel frames still @@ -72,7 +77,7 @@ Changelog * Drop go1.4 support and add support for go1.7. * Pass thrift.Context to the thrift.Server's response callback (#465) -# 1.0.9 +# 1.0.9 (2016-07-20) * Expose meta endpoints on the "tchannel" service name. (#459) * Add Go version and tchannel-go library version to introspection. (#457) @@ -80,11 +85,11 @@ Changelog connection's reported host:port. (#452) * Expose the number of connections on a channel. (#451) -# 1.0.8 +# 1.0.8 (2016-07-15) * Remove dependency on "testing" from "tchannel-go" introduced in v1.0.7. -# 1.0.7 +# 1.0.7 (2016-07-15) * Add CallOptions() to IncomingCall which can be used as the call option when making outbound calls to proxy all transport headers. @@ -96,11 +101,11 @@ Changelog streaming calls where the stream timeout may be much longer than the connection timeout. -# 1.0.6 +# 1.0.6 (2016-06-16) * Fix trace span encoding fields in the wrong order -# 1.0.5 +# 1.0.5 (2016-04-04) * Use `context.Context` storage for headers so `thrift.Context` and `tchannel.ContextWithHeaders` can be passed to functions that use @@ -110,7 +115,7 @@ Changelog * Store goroutine stack traces on channel creation that can be accessed via introspection. -# 1.0.4 +# 1.0.4 (2016-03-09) * Improve handling of network failures during pending calls. Previously, calls would timeout, but now they fail as soon as the network failure is detected. @@ -119,7 +124,7 @@ Changelog * #228: Add registered methods to the introspection output. * Add ability to set a global handler for a SubChannel. -# 1.0.3 +# 1.0.3 (2016-02-15) * Improved performance when writing Thrift structs * Make closing message exchanges less disruptive, changes a panic due to @@ -127,7 +132,7 @@ Changelog * Introspection now includes information about all channels created in the current process. -# 1.0.2 +# 1.0.2 (2016-01-29) * Extend the `ContextBuilder` API to support setting the transport-level routing delegate header. @@ -136,7 +141,7 @@ Changelog timeouts. * Assorted logging and test improvements. -# 1.0.1 +# 1.0.1 (2016-01-19) * Bug fix for #181: Shuffle peers on PeerList.Add to avoid biases in peer selection. @@ -152,7 +157,7 @@ Changelog thrift.ReadHeaders, thrift.WriteHeaders) so callers can easily send Thrift payloads over the streaming interface. -# 1.0.0 +# 1.0.0 (2016-01-11) * First stable release. * Support making calls with JSON, Thrift or raw payloads. diff --git a/connection_test.go b/connection_test.go index 0060fdc3..6675e144 100644 --- a/connection_test.go +++ b/connection_test.go @@ -970,3 +970,73 @@ func TestPeerStatusChangeServer(t *testing.T) { }) assert.Len(t, changes, 0, "unexpected peer status changes") } + +func TestContextCanceledOnTCPClose(t *testing.T) { + // 1. Context canceled warning is expected as part of this test + // add log filter to ignore this error + // 2. We use our own relay in this test, so disable the relay + // that comes with the test server + opts := testutils.NewOpts().NoRelay().AddLogFilter("simpleHandler OnError", 1) + + testutils.WithTestServer(t, opts, func(ts *testutils.TestServer) { + serverDoneC := make(chan struct{}) + callForwarded := make(chan struct{}) + + ts.RegisterFunc("test", func(ctx context.Context, args *raw.Args) (*raw.Res, error) { + defer close(serverDoneC) + close(callForwarded) + <-ctx.Done() + assert.EqualError(t, ctx.Err(), "context canceled") + return &raw.Res{}, nil + }) + + // Set up a relay that can be used to terminate conns + // on both sides i.e. client and server + relayFunc := func(outgoing bool, f *Frame) *Frame { + return f + } + relayHostPort, shutdown := testutils.FrameRelay(t, ts.HostPort(), relayFunc) + + // Make a call with a long timeout. We shutdown the relay + // immediately after the server receives the call. Expected + // behavior is for both client/server to be done with the call + // immediately after relay shutsdown + ctx, cancel := NewContext(20 * time.Second) + defer cancel() + + clientCh := ts.NewClient(nil) + // initiate the call in a background routine and + // make it wait for the response + clientDoneC := make(chan struct{}) + go func() { + raw.Call(ctx, clientCh, relayHostPort, ts.ServiceName(), "test", nil, nil) + close(clientDoneC) + }() + + // wait for server to receive the call + select { + case <-callForwarded: + case <-time.After(2 * time.Second): + assert.Fail(t, "timed waiting for call to be forwarded") + } + + // now shutdown the relay to close conns + // on both sides + shutdown() + + // wait for both the client & server to be done + select { + case <-serverDoneC: + case <-time.After(2 * time.Second): + assert.Fail(t, "timed out waiting for server handler to exit") + } + + select { + case <-clientDoneC: + case <-time.After(2 * time.Second): + assert.Fail(t, "timed out waiting for client to exit") + } + + clientCh.Close() + }) +} diff --git a/context_header.go b/context_header.go index f0e21dca..3a333af0 100644 --- a/context_header.go +++ b/context_header.go @@ -114,3 +114,8 @@ func WrapWithHeaders(ctx context.Context, headers map[string]string) ContextWith newCtx := context.WithValue(ctx, contextKeyHeaders, h) return headerCtx{Context: newCtx} } + +// WithoutHeaders hides any TChannel headers from the given context. +func WithoutHeaders(ctx context.Context) context.Context { + return context.WithValue(context.WithValue(ctx, contextKeyTChannel, nil), contextKeyHeaders, nil) +} diff --git a/context_internal_test.go b/context_internal_test.go index d0c29183..b27d3ca9 100644 --- a/context_internal_test.go +++ b/context_internal_test.go @@ -55,3 +55,20 @@ func TestCurrentSpan(t *testing.T) { require.NotNil(t, span, "CurrentSpan() should always return something") assert.NotEqual(t, uint64(0), span.TraceID(), "mock tracer is now Zipkin-compatible") } + +func TestContextWithoutHeadersKeyHeaders(t *testing.T) { + ctx := WrapWithHeaders(context.Background(), map[string]string{"k1": "v1"}) + assert.Equal(t, map[string]string{"k1": "v1"}, ctx.Headers()) + ctx2 := WithoutHeaders(ctx) + assert.Nil(t, ctx2.Value(contextKeyHeaders)) + _, ok := ctx2.(ContextWithHeaders) + assert.False(t, ok) +} + +func TestContextWithoutHeadersKeyTChannel(t *testing.T) { + ctx, _ := NewContextBuilder(time.Second).SetShardKey("s1").Build() + ctx2 := WithoutHeaders(ctx) + assert.Nil(t, ctx2.Value(contextKeyTChannel)) + _, ok := ctx2.(ContextWithHeaders) + assert.False(t, ok) +} diff --git a/glide.lock b/glide.lock index 829c7de4..2b9abd17 100644 --- a/glide.lock +++ b/glide.lock @@ -2,7 +2,7 @@ hash: 111bdf9797d23d4b9f8833a9780e382dd53153af70ad286f99815cea4c2fdf4b updated: 2016-08-05T18:02:17.171302481-04:00 imports: - name: github.com/apache/thrift - version: 0e9fed1e12ed066865e46c6903782b2ef95f4650 + version: b2a4d4ae21c789b689dd162deb819665567f481c subpackages: - lib/go/thrift - name: github.com/bmizerany/perks @@ -10,20 +10,22 @@ imports: subpackages: - quantile - name: github.com/cactus/go-statsd-client - version: 91c326c3f7bd20f0226d3d1c289dd9f8ce28d33d + version: ad551ee7f9f3465fb1ce1695899c612f7808a06a subpackages: - statsd +- name: github.com/codahale/hdrhistogram + version: 3a0bb77429bd3a61596f5e8a3172445844342120 - name: github.com/crossdock/crossdock-go version: 228792ab861c86f8900b2db0439577feef9ec9d8 subpackages: - assert - require - name: github.com/davecgh/go-spew - version: 5215b55f46b2b919f50a1df0eaa5886afe4e3b3d + version: adab96458c51a58dc1783b3335dcce5461522e75 subpackages: - spew - name: github.com/jessevdk/go-flags - version: f2785f5820ec967043de79c8be97edfc464ca745 + version: 6cf8f02b4ae8ba723ddc64dcfd403e530c06d927 - name: github.com/opentracing/opentracing-go version: 855519783f479520497c6b3445611b05fc42f009 subpackages: @@ -44,7 +46,7 @@ imports: - name: github.com/stretchr/objx version: 1a9d0bb9f541897e62256577b352fdbc1fb4fd94 - name: github.com/stretchr/testify - version: d77da356e56a7428ad25149ca77381849a6a5232 + version: 05e8a0eda380579888eb53c394909df027f06991 subpackages: - assert - mock @@ -60,10 +62,10 @@ imports: - utils - thrift-gen/agent - name: golang.org/x/net - version: b400c2eff1badec7022a8c8f5bea058b6315eed7 + version: f5079bd7f6f74e23c4d65efa0f4ce14cbd6a3c0f subpackages: - context - context/ctxhttp - name: gopkg.in/yaml.v2 - version: a83829b6f1293c91addabc89d0571c246397bbf4 -devImports: [] + version: 25c4ec802a7d637f88d584ab26798e94ad14c13b +testImports: [] diff --git a/glide.yaml b/glide.yaml index e307e732..6375bf23 100644 --- a/glide.yaml +++ b/glide.yaml @@ -1,7 +1,7 @@ package: github.com/uber/tchannel-go import: - package: github.com/apache/thrift - version: master + version: ">=0.9.3, <0.11.0" subpackages: - lib/go/thrift - package: github.com/cactus/go-statsd-client @@ -46,4 +46,5 @@ import: - package: github.com/prashantv/protectmem - package: github.com/opentracing/opentracing-go - package: github.com/uber/jaeger-client-go + version: ^2.7 - package: github.com/crossdock/crossdock-go diff --git a/inbound.go b/inbound.go index b967b8ea..87a80260 100644 --- a/inbound.go +++ b/inbound.go @@ -189,6 +189,11 @@ func (c *Connection) dispatchInbound(_ uint32, _ uint32, call *InboundCall, fram if c.log.Enabled(LogLevelDebug) { call.log.Debugf("Wait for timeout/cancellation interrupted by error: %v", call.mex.errCh.err) } + // when an exchange errors out, mark the exchange as expired + // and call cancel so the server handler's context is canceled + // TODO: move the cancel to the parent context at connnection level + call.response.cancel() + call.mex.inboundExpired() } }() diff --git a/relay.go b/relay.go index 0939a1dd..a09bd182 100644 --- a/relay.go +++ b/relay.go @@ -191,8 +191,8 @@ func NewRelayer(ch *Channel, conn *Connection) *Relayer { relayHost: ch.RelayHost(), maxTimeout: ch.relayMaxTimeout, localHandler: ch.relayLocal, - outbound: newRelayItems(ch.Logger().WithFields(LogField{"relay", "outbound"})), - inbound: newRelayItems(ch.Logger().WithFields(LogField{"relay", "inbound"})), + outbound: newRelayItems(conn.log.WithFields(LogField{"relayItems", "outbound"})), + inbound: newRelayItems(conn.log.WithFields(LogField{"relayItems", "inbound"})), peers: ch.RootPeers(), conn: conn, logger: conn.log, diff --git a/version.go b/version.go index 697b0e96..bedc7441 100644 --- a/version.go +++ b/version.go @@ -23,4 +23,4 @@ package tchannel // VersionInfo identifies the version of the TChannel library. // Due to lack of proper package management, this version string will // be maintained manually. -const VersionInfo = "1.6.0" +const VersionInfo = "1.7.0"