Skip to content

Commit

Permalink
Publicize channel root peers
Browse files Browse the repository at this point in the history
  • Loading branch information
kriskowal committed Jan 24, 2017
1 parent 8d9eb1e commit 373af1b
Show file tree
Hide file tree
Showing 5 changed files with 13 additions and 17 deletions.
5 changes: 3 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
Changelog
=========

# (unreleased)
# v1.3.0 (unreleased)

* Exposes the channel's RootPeerList with `channel.RootPeers()`.
* Add OnPeerStatusChanged option to NewChannel to receive notifications when
TChannel drops a connection and potentially other state change notifications.
Accepts a function and sends the affected peer. The event handler function
Expand All @@ -16,7 +17,7 @@ Changelog
but none was found (e.g., exception is from the future). (#566)
* Fix ListenIP selecting docker interfaces over physical networks. (#565)
* Fix for error when a Thrift payload has completed decoding and attempts
to close the argument reader without waiting till EOF. (#564)
to close the argument reader without waiting until EOF. (#564)
* thrift-gen: Fix "namespace go" being ignored even though the Apache thrift
generated code was respecting it. (#559)

Expand Down
16 changes: 8 additions & 8 deletions channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -370,18 +370,18 @@ func (ch *Channel) Peers() *PeerList {
return ch.peers
}

// rootPeers returns the root PeerList for the channel, which is the sole place
// RootPeers returns the root PeerList for the channel, which is the sole place
// new Peers are created. All children of the root list (including ch.Peers())
// automatically re-use peers from the root list and create new peers in the
// root list.
func (ch *Channel) rootPeers() *RootPeerList {
func (ch *Channel) RootPeers() *RootPeerList {
return ch.peers.parent
}

// BeginCall starts a new call to a remote peer, returning an OutboundCall that can
// be used to write the arguments of the call.
func (ch *Channel) BeginCall(ctx context.Context, hostPort, serviceName, methodName string, callOptions *CallOptions) (*OutboundCall, error) {
p := ch.rootPeers().GetOrAdd(hostPort)
p := ch.RootPeers().GetOrAdd(hostPort)
return p.BeginCall(ctx, serviceName, methodName, callOptions)
}

Expand Down Expand Up @@ -438,7 +438,7 @@ func (ch *Channel) serve() {

// Ping sends a ping message to the given hostPort and waits for a response.
func (ch *Channel) Ping(ctx context.Context, hostPort string) error {
peer := ch.rootPeers().GetOrAdd(hostPort)
peer := ch.RootPeers().GetOrAdd(hostPort)
conn, err := peer.GetConnection(ctx)
if err != nil {
return err
Expand Down Expand Up @@ -531,7 +531,7 @@ func (ch *Channel) exchangeUpdated(c *Connection) {
return
}

p, ok := ch.rootPeers().Get(c.remotePeerInfo.HostPort)
p, ok := ch.RootPeers().Get(c.remotePeerInfo.HostPort)
if !ok {
return
}
Expand Down Expand Up @@ -582,7 +582,7 @@ func (ch *Channel) connectionActive(c *Connection, direction connectionDirection
}

func (ch *Channel) addConnectionToPeer(hostPort string, c *Connection, direction connectionDirection) {
p := ch.rootPeers().GetOrAdd(hostPort)
p := ch.RootPeers().GetOrAdd(hostPort)
if err := p.addConnection(c, direction); err != nil {
c.log.WithFields(
LogField{"remoteHostPort", c.remotePeerInfo.HostPort},
Expand Down Expand Up @@ -627,7 +627,7 @@ func (ch *Channel) getMinConnectionState() connectionState {
// connectionCloseStateChange is called when a connection's close state changes.
func (ch *Channel) connectionCloseStateChange(c *Connection) {
ch.removeClosedConn(c)
if peer, ok := ch.rootPeers().Get(c.remotePeerInfo.HostPort); ok {
if peer, ok := ch.RootPeers().Get(c.remotePeerInfo.HostPort); ok {
if ch.onPeerStatusChanged != nil {
ch.onPeerStatusChanged(peer)
}
Expand All @@ -636,7 +636,7 @@ func (ch *Channel) connectionCloseStateChange(c *Connection) {
}
if c.outboundHP != "" && c.outboundHP != c.remotePeerInfo.HostPort {
// Outbound connections may be in multiple peers.
if peer, ok := ch.rootPeers().Get(c.outboundHP); ok {
if peer, ok := ch.RootPeers().Get(c.outboundHP); ok {
peer.connectionCloseStateChange(c)
ch.updatePeer(peer)
}
Expand Down
2 changes: 1 addition & 1 deletion introspection.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ func (ch *Channel) IntrospectState(opts *IntrospectionOptions) *RuntimeState {
CreatedStack: ch.createdStack,
LocalPeer: ch.PeerInfo(),
SubChannels: ch.subChannels.IntrospectState(opts),
RootPeers: ch.rootPeers().IntrospectState(opts),
RootPeers: ch.RootPeers().IntrospectState(opts),
Peers: ch.Peers().IntrospectList(opts),
NumConnections: numConns,
Connections: connIDs,
Expand Down
2 changes: 1 addition & 1 deletion relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ func NewRelayer(ch *Channel, conn *Connection) *Relayer {
localHandler: ch.relayLocal,
outbound: newRelayItems(ch.Logger().WithFields(LogField{"relay", "outbound"})),
inbound: newRelayItems(ch.Logger().WithFields(LogField{"relay", "inbound"})),
peers: ch.rootPeers(),
peers: ch.RootPeers(),
conn: conn,
logger: conn.log,
}
Expand Down
5 changes: 0 additions & 5 deletions utils_for_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,6 @@ import (
// MexChannelBufferSize is the size of the message exchange channel buffer.
const MexChannelBufferSize = mexChannelBufferSize

// RootPeers returns the root peer list from the Channel.
func (ch *Channel) RootPeers() *RootPeerList {
return ch.rootPeers()
}

// SetOnUpdate sets onUpdate for a peer, which is called when the peer's score is
// updated in all peer lists.
func (p *Peer) SetOnUpdate(f func(*Peer)) {
Expand Down

0 comments on commit 373af1b

Please sign in to comment.