Skip to content

Commit

Permalink
Implement a separate connect timeout
Browse files Browse the repository at this point in the history
This allows a shorter timeout to apply than the call timeout, which is
useful for long lived streams
  • Loading branch information
prashantv committed Apr 21, 2016
1 parent 2d25830 commit a76a82f
Show file tree
Hide file tree
Showing 4 changed files with 72 additions and 9 deletions.
10 changes: 9 additions & 1 deletion channel.go
Expand Up @@ -461,7 +461,7 @@ func getTimeout(ctx context.Context) time.Duration {
return deadline.Sub(time.Now())
}

// Connect connects the channel.
// Connect creates a new outbound connection to hostPort.
func (ch *Channel) Connect(ctx context.Context, hostPort string) (*Connection, error) {
switch state := ch.State(); state {
case ChannelClient, ChannelListening:
Expand All @@ -474,6 +474,14 @@ func (ch *Channel) Connect(ctx context.Context, hostPort string) (*Connection, e
return nil, errInvalidStateForOp
}

// The context timeout applies to the whole call, but users may want a lower
// connect timeout (e.g. for streams).
if params := getTChannelParams(ctx); params != nil && params.connectTimeout > 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, params.connectTimeout)
defer cancel()
}

events := connectionEvents{
OnCloseStateChange: ch.connectionCloseStateChange,
OnExchangeUpdated: ch.exchangeUpdated,
Expand Down
42 changes: 42 additions & 0 deletions connection_test.go
Expand Up @@ -639,3 +639,45 @@ func TestNetDialTimeout(t *testing.T) {
assert.Equal(t, ErrTimeout, err, "Ping expected to fail with timeout")
assert.True(t, d >= timeoutPeriod, "Timeout should take more than %v, took %v", timeoutPeriod, d)
}

func TestConnectTimeout(t *testing.T) {
opts := testutils.NewOpts().DisableLogVerification()
testutils.WithTestServer(t, opts, func(ts *testutils.TestServer) {
// Set up a relay that will delay the initial init req.
testComplete := make(chan struct{})

relayFunc := func(outgoing bool, f *Frame) *Frame {
select {
case <-time.After(200 * time.Millisecond):
return f
case <-testComplete:
// TODO: We should be able to forward the frame and have this test not fail.
// Currently, it fails since the sequence of events is:
// Server receives a TCP connection
// Channel.Close() is called on the server
// Server's TCP connection receives an init req
// Since we don't currently track pending connections, the open TCP connection is not closed, and
// we process the init req. This leaves an open connection at the end of the test.
return nil
}
}
relay, shutdown := testutils.FrameRelay(t, ts.HostPort(), relayFunc)
defer shutdown()

// Make a call with a long timeout, but short connect timeout.
// We expect the call to fall almost immediately with ErrTimeout.
ctx, cancel := NewContextBuilder(2 * time.Second).
SetConnectTimeout(time.Millisecond).
Build()
defer cancel()

client := ts.NewClient(opts)
err := client.Ping(ctx, relay)
assert.Equal(t, ErrTimeout, err, "Ping should timeout due to timeout relay")

// Note: we do not defer this, as we need to close(testComplete) before
// we call shutdown since shutdown waits for the relay to close, which
// is stuck waiting inside of our custom relay function.
close(testComplete)
})
}
9 changes: 5 additions & 4 deletions context.go
Expand Up @@ -36,10 +36,11 @@ const (
)

type tchannelCtxParams struct {
span *Span
call IncomingCall
options *CallOptions
retryOptions *RetryOptions
span *Span
call IncomingCall
options *CallOptions
retryOptions *RetryOptions
connectTimeout time.Duration
}

// IncomingCall exposes properties for incoming calls through the context.
Expand Down
20 changes: 16 additions & 4 deletions context_builder.go
Expand Up @@ -44,6 +44,9 @@ type ContextBuilder struct {
// RetryOptions are the retry options for this call.
RetryOptions *RetryOptions

// ConnectTimeout is the timeout for creating a TChannel connection.
ConnectTimeout time.Duration

// ParentContext to build the new context from. If empty, context.Background() is used.
// The new (child) context inherits a number of properties from the parent context:
// - the tracing Span, unless replaced via SetExternalSpan()
Expand Down Expand Up @@ -119,6 +122,14 @@ func (cb *ContextBuilder) SetRoutingDelegate(rd string) *ContextBuilder {
return cb
}

// SetConnectTimeout sets the ConnectionTimeout for this context.
// The context timeout applies to the whole call, while the connect
// timeout only applies to creating a new connection.
func (cb *ContextBuilder) SetConnectTimeout(d time.Duration) *ContextBuilder {
cb.ConnectTimeout = d
return cb
}

// DisableTracing disables tracing.
func (cb *ContextBuilder) DisableTracing() *ContextBuilder {
cb.TracingDisabled = true
Expand Down Expand Up @@ -216,10 +227,11 @@ func (cb *ContextBuilder) Build() (ContextWithHeaders, context.CancelFunc) {
}

params := &tchannelCtxParams{
options: cb.CallOptions,
span: span,
call: cb.incomingCall,
retryOptions: cb.RetryOptions,
options: cb.CallOptions,
span: span,
call: cb.incomingCall,
retryOptions: cb.RetryOptions,
connectTimeout: cb.ConnectTimeout,
}

parent := cb.ParentContext
Expand Down

0 comments on commit a76a82f

Please sign in to comment.