Skip to content

Commit

Permalink
WIP fix
Browse files Browse the repository at this point in the history
  • Loading branch information
prashantv committed Apr 1, 2016
1 parent d3110ad commit 66a0e08
Showing 1 changed file with 26 additions and 4 deletions.
30 changes: 26 additions & 4 deletions channel.go
Expand Up @@ -394,6 +394,12 @@ func (ch *Channel) serve() {
}
}

// Check whether the channel is still active.
if ch.State() != ChannelListening {
netConn.Close()
continue
}

acceptBackoff = 0

// Register the connection in the peer once the channel is set up.
Expand Down Expand Up @@ -524,8 +530,28 @@ func (ch *Channel) updatePeer(p *Peer) {
p.callOnUpdateComplete()
}

// addIncomingConnection returns whether the connection could be added to the channel.
// Incoming connections can only be added while a channel is listening.
func (ch *Channel) addIncomingConnection(c *Connection) bool {
ch.mutable.Lock()
defer ch.mutable.Unlock()

if ch.mutable.state != ChannelListening {
return false
}

ch.mutable.conns[c.connID] = c
return true
}

// incomingConnectionActive adds a new active connection to our peer list.
func (ch *Channel) incomingConnectionActive(c *Connection) {
c.log.Debugf("New incoming connection active: %v", c.remotePeerInfo.HostPort)
if !ch.addIncomingConnection(c) {
c.Close()
return
}

c.log.Debugf("Add connection as an active peer for %v", c.remotePeerInfo.HostPort)
// TODO: Alter TChannel spec to allow optionally include the service name
// when initializing a connection. As-is, we have to keep these peers in
Expand All @@ -534,10 +560,6 @@ func (ch *Channel) incomingConnectionActive(c *Connection) {
p := ch.rootPeers().GetOrAdd(c.remotePeerInfo.HostPort)
p.AddInboundConnection(c)
ch.updatePeer(p)

ch.mutable.Lock()
ch.mutable.conns[c.connID] = c
ch.mutable.Unlock()
}

// removeClosedConn removes a connection if it's closed.
Expand Down

0 comments on commit 66a0e08

Please sign in to comment.