Skip to content

Commit

Permalink
relay: Ensure idle sweeper also considers relay calls (#714)
Browse files Browse the repository at this point in the history
The previous fix for #701 avoids the idle sweeper from closing
connections with pending calls. However, it didn't take into account
relay calls (which do not set up exchanges). Ensure we take any pending
relay calls into account as well.

Fixes #701 for pending calls on relays as well as clients/servers.
  • Loading branch information
prashantv committed Aug 9, 2018
1 parent fe44733 commit 065e069
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 14 deletions.
11 changes: 9 additions & 2 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -767,8 +767,15 @@ func (c *Connection) closeSendCh(connID uint32) {
}

// hasExchanges returns whether there's any pending inbound or outbound calls on this connection.
func (c *Connection) hasExchanges() bool {
return c.inbound.count() > 0 || c.outbound.count() > 0
func (c *Connection) hasPendingCalls() bool {
if c.inbound.count() > 0 || c.outbound.count() > 0 {
return true
}
if !c.relay.canClose() {
return true
}

return false
}

// checkExchanges is called whenever an exchange is removed, and when Close is called.
Expand Down
2 changes: 1 addition & 1 deletion idle_sweep.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func (is *idleSweep) checkIdleConnections() {

// We shouldn't get to a state where we have pending calls, but the connection
// is idle. This either means the max-idle time is too low, or there's a stuck call.
if conn.hasExchanges() {
if conn.hasPendingCalls() {
conn.log.Error("Skip closing idle Connection as it has pending calls.")
continue
}
Expand Down
50 changes: 39 additions & 11 deletions idle_sweep_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,13 +293,13 @@ func TestIdleSweepIgnoresConnectionsWithCalls(t *testing.T) {
listener := newPeerStatusListener()
// TODO: Log filtering doesn't require the message to be seen.
serverOpts := testutils.NewOpts().
AddLogFilter("Skip closing idle Connection as it has pending calls.", 1).
AddLogFilter("Skip closing idle Connection as it has pending calls.", 2).
SetOnPeerStatusChanged(listener.onStatusChange).
SetTimeNow(clock.Now).
SetTimeTicker(clientTicker.New).
SetRelayMaxTimeout(time.Hour).
SetMaxIdleTime(3 * time.Minute).
SetIdleCheckInterval(30 * time.Second).
NoRelay()
SetIdleCheckInterval(30 * time.Second)

testutils.WithTestServer(t, serverOpts, func(ts *testutils.TestServer) {
var (
Expand All @@ -326,21 +326,49 @@ func TestIdleSweepIgnoresConnectionsWithCalls(t *testing.T) {
}()
<-gotCall

// Ensure we have 2 connections on the server side.
assert.Equal(t, 2, ts.Server().IntrospectNumConnections(), "Expect connection to client 1 and client 2")
// If we are in no-relay mode, we expect 2 connections to the server (from each client).
// If we are in relay mode, the relay will have the 2 connections from clients + 1 connection to the server.
check := struct {
ch *Channel
preCloseConns int
tick func()
}{
ch: ts.Server(),
preCloseConns: 2,
tick: func() {
clock.Elapse(5 * time.Minute)
clientTicker.Tick()
},
}

// Let the idle checker to close client 1's connection.
clock.Elapse(5 * time.Minute)
clientTicker.Tick()
if relay := ts.Relay(); relay != nil {
check.ch = relay
check.preCloseConns++
oldTick := check.tick
check.tick = func() {
oldTick()

// The same ticker is being used by the server and the relay
// so we need to tick it twice.
clientTicker.Tick()
}
}

assert.Equal(t, check.preCloseConns, check.ch.IntrospectNumConnections(), "Expect connection to client 1 and client 2")

// Let the idle checker close client 1's connection.
check.tick()
listener.waitForZeroConnections(t, c1)

assert.Equal(t, 1, ts.Server().IntrospectNumConnections(), "Expect connection only to client 2")
// Make sure we have only a connection for client 2, which is active.
assert.Equal(t, check.preCloseConns-1, check.ch.IntrospectNumConnections(), "Expect connection only to client 2")
state := check.ch.IntrospectState(nil)
require.Empty(t, state.InactiveConnections, "Ensure all connections are active")

// Unblock the call.
close(block)
<-c2CallComplete
clock.Elapse(5 * time.Minute)
clientTicker.Tick()
check.tick()
listener.waitForZeroConnections(t, ts.Server(), c2)
})
}

0 comments on commit 065e069

Please sign in to comment.