Skip to content

Commit

Permalink
Handle host:port mismatches between peer.Connect and remote peer
Browse files Browse the repository at this point in the history
Currently, if we conect to peer as "a:1" and it sends us an init req
header with host:port set to "b:2", we only add it under the peer "b:2".

After this change, the connection will be added under both peers so that
future calls to "a:1" will be able to use the same connection.

When this happens, we log the host:port mismatch under a debug log.
In future, we may want to emit metrics when this happens.

This information (outboundHostPort vs remoteHostPort) is also exposed
via introspection.
  • Loading branch information
prashantv committed Jul 20, 2016
1 parent a522438 commit dcac85e
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 5 deletions.
27 changes: 25 additions & 2 deletions channel.go
Expand Up @@ -512,7 +512,19 @@ func (ch *Channel) Connect(ctx context.Context, hostPort string) (*Connection, e
return nil, err
}

return c, err
// It's possible that the connection we just created responds with a host:port
// that is not what we tried to connect to. E.g., we may have connected to
// 127.0.0.1:1234, but the returned host:port may be 10.0.0.1:1234.
// In this case, the connection won't be added to 127.0.0.1:1234 peer
// and so future calls to that peer may end up creating new connections. To
// avoid this issue, and to avoid clients being aware of any TCP relays, we
// add the connection to the intended peer.
if hostPort != c.remotePeerInfo.HostPort {
c.log.Debugf("Outbound connection host:port mismatch, adding to peer %v", c.remotePeerInfo.HostPort)
ch.addConnectionToPeer(hostPort, c, outbound)
}

return c, nil
}

// exchangeUpdated updates the peer heap.
Expand Down Expand Up @@ -564,7 +576,11 @@ func (ch *Channel) connectionActive(c *Connection, direction connectionDirection
return
}

p := ch.rootPeers().GetOrAdd(c.remotePeerInfo.HostPort)
ch.addConnectionToPeer(c.remotePeerInfo.HostPort, c, direction)
}

func (ch *Channel) addConnectionToPeer(hostPort string, c *Connection, direction connectionDirection) {
p := ch.rootPeers().GetOrAdd(hostPort)
if err := p.addConnection(c, direction); err != nil {
c.log.WithFields(
LogField{"remoteHostPort", c.remotePeerInfo.HostPort},
Expand Down Expand Up @@ -612,6 +628,13 @@ func (ch *Channel) connectionCloseStateChange(c *Connection) {
peer.connectionCloseStateChange(c)
ch.updatePeer(peer)
}
if c.outboundHP != "" && c.outboundHP != c.remotePeerInfo.HostPort {
// Outbound connections may be in multiple peers.
if peer, ok := ch.rootPeers().Get(c.outboundHP); ok {
peer.connectionCloseStateChange(c)
ch.updatePeer(peer)
}
}

chState := ch.State()
if chState != ChannelStartClose && chState != ChannelInboundClosed {
Expand Down
12 changes: 9 additions & 3 deletions connection.go
Expand Up @@ -157,6 +157,11 @@ type Connection struct {
commonStatsTags map[string]string
relay *Relayer

// outboundHP is the host:port we used to create this outbound connection.
// It may not match remotePeerInfo.HostPort, in which case the connection is
// added to peers for both host:ports. For inbound connections, this is empty.
outboundHP string

// closeNetworkCalled is used to avoid errors from being logged
// when this side closes a connection.
closeNetworkCalled atomic.Int32
Expand Down Expand Up @@ -213,16 +218,16 @@ func (ch *Channel) newOutboundConnection(timeout time.Duration, hostPort string,
return nil, err
}

return ch.newConnection(conn, connectionWaitingToSendInitReq, events), nil
return ch.newConnection(conn, hostPort, connectionWaitingToSendInitReq, events), nil
}

// Creates a new Connection based on an incoming connection from a peer
func (ch *Channel) newInboundConnection(conn net.Conn, events connectionEvents) (*Connection, error) {
return ch.newConnection(conn, connectionWaitingToRecvInitReq, events), nil
return ch.newConnection(conn, "" /* outboundHP */, connectionWaitingToRecvInitReq, events), nil
}

// Creates a new connection in a given initial state
func (ch *Channel) newConnection(conn net.Conn, initialState connectionState, events connectionEvents) *Connection {
func (ch *Channel) newConnection(conn net.Conn, outboundHP string, initialState connectionState, events connectionEvents) *Connection {
opts := &ch.connectionOptions

checksumType := opts.ChecksumType
Expand Down Expand Up @@ -264,6 +269,7 @@ func (ch *Channel) newConnection(conn net.Conn, initialState connectionState, ev
sendCh: make(chan *Frame, sendBufferSize),
stopCh: make(chan struct{}),
localPeerInfo: peerInfo,
outboundHP: outboundHP,
checksumType: checksumType,
inbound: newMessageExchangeSet(log, messageExchangeSetInbound),
outbound: newMessageExchangeSet(log, messageExchangeSetOutbound),
Expand Down
2 changes: 2 additions & 0 deletions introspection.go
Expand Up @@ -125,6 +125,7 @@ type ConnectionRuntimeState struct {
ConnectionState string `json:"connectionState"`
LocalHostPort string `json:"localHostPort"`
RemoteHostPort string `json:"remoteHostPort"`
OutboundHostPort string `json:"outboundHostPort"`
IsEphemeral bool `json:"isEphemeral"`
InboundExchange ExchangeSetRuntimeState `json:"inboundExchange"`
OutboundExchange ExchangeSetRuntimeState `json:"outboundExchange"`
Expand Down Expand Up @@ -314,6 +315,7 @@ func (c *Connection) IntrospectState(opts *IntrospectionOptions) ConnectionRunti
ConnectionState: c.state.String(),
LocalHostPort: c.conn.LocalAddr().String(),
RemoteHostPort: c.conn.RemoteAddr().String(),
OutboundHostPort: c.outboundHP,
IsEphemeral: c.remotePeerInfo.IsEphemeral,
InboundExchange: c.inbound.IntrospectState(opts),
OutboundExchange: c.outbound.IntrospectState(opts),
Expand Down
21 changes: 21 additions & 0 deletions peer_test.go
Expand Up @@ -29,6 +29,7 @@ import (

. "github.com/uber/tchannel-go"

"github.com/uber/tchannel-go/benchmark"
"github.com/uber/tchannel-go/raw"
"github.com/uber/tchannel-go/testutils"

Expand Down Expand Up @@ -1022,6 +1023,26 @@ func TestPeerScoreOnNewConnection(t *testing.T) {
}
}

func TestConnectToPeerHostPortMismatch(t *testing.T) {
testutils.WithTestServer(t, nil, func(ts *testutils.TestServer) {
ctx, cancel := NewContext(time.Second)
defer cancel()

// Set up a relay which will have a different host:port than the
// real TChannel HostPort.
relay, err := benchmark.NewTCPRawRelay([]string{ts.HostPort()})
require.NoError(t, err, "Failed to set up TCP relay")
defer relay.Close()

s2 := ts.NewServer(nil)
for i := 0; i < 10; i++ {
require.NoError(t, s2.Ping(ctx, relay.HostPort()), "Ping failed")
}

assert.Equal(t, 1, s2.IntrospectNumConnections(), "Unexpected number of connections")
})
}

func BenchmarkAddPeers(b *testing.B) {
for i := 0; i < b.N; i++ {
ch := testutils.NewClient(b, nil)
Expand Down

0 comments on commit dcac85e

Please sign in to comment.