Skip to content

Commit

Permalink
Add OnPeerStatusChanged option to TChannel options
Browse files Browse the repository at this point in the history
  • Loading branch information
kriskowal committed Jan 23, 2017
1 parent 273dc8b commit 8d9eb1e
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 18 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.md
@@ -1,6 +1,13 @@
Changelog
=========

# (unreleased)

* Add OnPeerStatusChanged option to NewChannel to receive notifications when
TChannel drops a connection and potentially other state change notifications.
Accepts a function and sends the affected peer. The event handler function
must then call methods of the peer to inspect the new state.

# v1.2.3

* Improve error messages when an argument reader is closed without
Expand Down
24 changes: 17 additions & 7 deletions channel.go
Expand Up @@ -62,6 +62,10 @@ type ChannelOptions struct {
// unstable API - breaking changes are likely.
RelayHost RelayHost

// OnPeerStatusChanged receives notifications when a peer has become
// available or unavailable.
OnPeerStatusChanged func(*Peer)

// The list of service names that should be handled locally by this channel.
// This is an unstable API - breaking changes are likely.
RelayLocalHandlers []string
Expand Down Expand Up @@ -114,13 +118,14 @@ const (
type Channel struct {
channelConnectionCommon

chID uint32
createdStack string
commonStatsTags map[string]string
connectionOptions ConnectionOptions
peers *PeerList
relayHost RelayHost
relayMaxTimeout time.Duration
chID uint32
createdStack string
commonStatsTags map[string]string
connectionOptions ConnectionOptions
peers *PeerList
relayHost RelayHost
relayMaxTimeout time.Duration
onPeerStatusChanged func(*Peer)

// mutable contains all the members of Channel which are mutable.
mutable struct {
Expand Down Expand Up @@ -210,6 +215,8 @@ func NewChannel(serviceName string, opts *ChannelOptions) (*Channel, error) {
relayHost: opts.RelayHost,
relayMaxTimeout: validateRelayMaxTimeout(opts.RelayMaxTimeout, logger),
}

ch.onPeerStatusChanged = opts.OnPeerStatusChanged
ch.peers = newRootPeerList(ch).newChild()

ch.mutable.peerInfo = LocalPeerInfo{
Expand Down Expand Up @@ -621,6 +628,9 @@ func (ch *Channel) getMinConnectionState() connectionState {
func (ch *Channel) connectionCloseStateChange(c *Connection) {
ch.removeClosedConn(c)
if peer, ok := ch.rootPeers().Get(c.remotePeerInfo.HostPort); ok {
if ch.onPeerStatusChanged != nil {
ch.onPeerStatusChanged(peer)
}
peer.connectionCloseStateChange(c)
ch.updatePeer(peer)
}
Expand Down
43 changes: 32 additions & 11 deletions peer_test.go
Expand Up @@ -165,8 +165,12 @@ func TestPeerRemoveClosedConnection(t *testing.T) {
ctx, cancel := NewContext(time.Second)
defer cancel()

WithVerifiedServer(t, nil, func(ch *Channel, hostPort string) {
client := testutils.NewClient(t, nil)
opts := &testutils.ChannelOpts{}
serverPeerClosed := withPeerStatusChangedChannel(opts)
WithVerifiedServer(t, opts, func(ch *Channel, hostPort string) {
opts := &testutils.ChannelOpts{}
clientPeerClosed := withPeerStatusChangedChannel(opts)
client := testutils.NewClient(t, opts)
defer client.Close()

p := client.Peers().Add(hostPort)
Expand All @@ -183,20 +187,26 @@ func TestPeerRemoveClosedConnection(t *testing.T) {
_, outConns := p.NumConnections()
assert.Equal(t, 1, outConns, "Expected 1 remaining outgoing connection")

select {
case <-serverPeerClosed:
case <-time.After(time.Second):
t.Errorf("Timeout out waiting for server peer closed notification")
}

c, err := p.GetConnection(ctx)
require.NoError(t, err, "GetConnection failed")
assert.Equal(t, c2, c, "Expected second active connection")
})
}

func TestPeerConnectCancelled(t *testing.T) {
WithVerifiedServer(t, nil, func(ch *Channel, hostPort string) {
ctx, cancel := NewContext(109 * time.Millisecond)
cancel()
require.NoError(t, c2.Close(), "Failed to close second connection")
_, outConns = p.NumConnections()
assert.Equal(t, 0, outConns, "Expected 0 remaining outgoing connections")

_, err := ch.Connect(ctx, "10.255.255.1:1")
require.Error(t, err, "Connect should fail")
assert.EqualError(t, ErrRequestCancelled, err.Error(), "Unknown error")
select {
case peer := <-clientPeerClosed:
assert.Equal(t, peer, hostPort, "Expected client closed notification")
case <-time.After(time.Second):
t.Errorf("Timed out waiting for client peer closed notification")
}
})
}

Expand Down Expand Up @@ -242,6 +252,17 @@ func TestPeerGetConnectionWithNoActiveConnections(t *testing.T) {
})
}

func withPeerStatusChangedChannel(opts *testutils.ChannelOpts) <-chan string {
changed := make(chan string, 1)
opts.ChannelOptions.OnPeerStatusChanged = func(p *Peer) {
select {
case changed <- p.HostPort():
default:
}
}
return changed
}

func TestInboundEphemeralPeerRemoved(t *testing.T) {
ctx, cancel := NewContext(time.Second)
defer cancel()
Expand Down

0 comments on commit 8d9eb1e

Please sign in to comment.