Skip to content

Commit

Permalink
Handle parallel incoming connection attempts
Browse files Browse the repository at this point in the history
One major change as of #591 is to do the handshake synchronously instead
of using a Connection object and the associated message exchanges. This
is fine during the outbound path since the caller is waiting for an
active connection. However on the incoming path, we don't want a single
slow connection to affect other incoming connection attempts.

Instead of performing the incoming connection handshake on the accept
goroutine, start a new goroutine to perform the exchange.
  • Loading branch information
prashantv committed Feb 27, 2017
1 parent 631664f commit 5951b0e
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 12 deletions.
26 changes: 14 additions & 12 deletions channel.go
Expand Up @@ -445,18 +445,20 @@ func (ch *Channel) serve() {

acceptBackoff = 0

// Register the connection in the peer once the channel is set up.
events := connectionEvents{
OnActive: ch.inboundConnectionActive,
OnCloseStateChange: ch.connectionCloseStateChange,
OnExchangeUpdated: ch.exchangeUpdated,
}
if _, err := ch.inboundHandshake(context.Background(), netConn, events); err != nil {
// Server is getting overloaded - begin rejecting new connections
ch.log.WithFields(ErrField(err)).Error("Couldn't create new TChannelConnection for incoming conn.")
netConn.Close()
continue
}
// Perform the connection handshake in a background goroutine.
go func() {
// Register the connection in the peer once the channel is set up.
events := connectionEvents{
OnActive: ch.inboundConnectionActive,
OnCloseStateChange: ch.connectionCloseStateChange,
OnExchangeUpdated: ch.exchangeUpdated,
}
if _, err := ch.inboundHandshake(context.Background(), netConn, events); err != nil {
// Server is getting overloaded - begin rejecting new connections
ch.log.WithFields(ErrField(err)).Error("Couldn't create new TChannelConnection for incoming conn.")
netConn.Close()
}
}()
}
}

Expand Down
19 changes: 19 additions & 0 deletions connection_test.go
Expand Up @@ -24,6 +24,7 @@ import (
"errors"
"fmt"
"io"
"net"
"runtime"
"strings"
"sync"
Expand Down Expand Up @@ -774,3 +775,21 @@ func TestConnectTimeout(t *testing.T) {
close(testComplete)
})
}

func TestParallelConnectionAccepts(t *testing.T) {
opts := testutils.NewOpts().AddLogFilter("Couldn't create new TChannelConnection", 1)
testutils.WithTestServer(t, opts, func(ts *testutils.TestServer) {
testutils.RegisterEcho(ts.Server(), nil)

// Start a connection attempt that should timeout.
conn, err := net.Dial("tcp", ts.HostPort())
defer conn.Close()
require.NoError(t, err, "Dial failed")

// When we try to make a call using a new client, it will require a
// new connection, and this verifies that the previous connection attempt
// and handshake do not impact the call.
client := ts.NewClient(nil)
testutils.AssertEcho(t, client, ts.HostPort(), ts.ServiceName())
})
}

0 comments on commit 5951b0e

Please sign in to comment.