Skip to content

Commit

Permalink
close peer's connection to avoid fd leak
Browse files Browse the repository at this point in the history
Fixes #2967
  • Loading branch information
melekes committed Jan 18, 2019
1 parent bc88740 commit e45ade5
Show file tree
Hide file tree
Showing 5 changed files with 40 additions and 14 deletions.
8 changes: 8 additions & 0 deletions p2p/conn_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ type ConnSet interface {
HasIP(net.IP) bool
Set(net.Conn, []net.IP)
Remove(net.Conn)
RemoveAddr(net.Addr)
}

type connSetItem struct {
Expand Down Expand Up @@ -62,6 +63,13 @@ func (cs *connSet) Remove(c net.Conn) {
delete(cs.conns, c.RemoteAddr().String())
}

func (cs *connSet) RemoveAddr(addr net.Addr) {
cs.Lock()
defer cs.Unlock()

delete(cs.conns, addr.String())
}

func (cs *connSet) Set(c net.Conn, ips []net.IP) {
cs.Lock()
defer cs.Unlock()
Expand Down
8 changes: 8 additions & 0 deletions p2p/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,13 @@ type Peer interface {

ID() ID // peer's cryptographic ID
RemoteIP() net.IP // remote IP of the connection
Addr() net.Addr // remote addr

IsOutbound() bool // did we dial the peer
IsPersistent() bool // do we redial this peer when we disconnect

CloseConn() error // close original connection

NodeInfo() NodeInfo // peer's info
Status() tmconn.ConnectionStatus
OriginalAddr() *NetAddress
Expand Down Expand Up @@ -296,6 +299,11 @@ func (p *peer) hasChannel(chID byte) bool {
return false
}

// CloseConn closes original connection. Used for cleaning up in cases where the peer had not been started at all.
func (p *peer) CloseConn() error {
return p.peerConn.conn.Close()
}

//---------------------------------------------------
// methods only used for testing
// TODO: can we remove these?
Expand Down
2 changes: 2 additions & 0 deletions p2p/peer_set_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ func (mp *mockPeer) Get(s string) interface{} { return s }
func (mp *mockPeer) Set(string, interface{}) {}
func (mp *mockPeer) RemoteIP() net.IP { return mp.ip }
func (mp *mockPeer) OriginalAddr() *NetAddress { return nil }
func (mp *mockPeer) Addr() net.Addr { return nil }
func (mp *mockPeer) CloseConn() error { return nil }

// Returns a mock peer
func newMockPeer(ip net.IP) *mockPeer {
Expand Down
23 changes: 15 additions & 8 deletions p2p/switch.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,7 @@ func (sw *Switch) stopAndRemovePeer(peer Peer, reason interface{}) {
sw.metrics.Peers.Add(float64(-1))
}
peer.Stop()
sw.transport.Cleanup(peer.Addr())
for _, reactor := range sw.reactors {
reactor.RemovePeer(peer, reason)
}
Expand Down Expand Up @@ -529,13 +530,18 @@ func (sw *Switch) acceptRoutine() {
"max", sw.config.MaxNumInboundPeers,
)

_ = p.Stop()
_ = p.CloseConn()
sw.transport.Cleanup(p.Addr())

continue
}

if err := sw.addPeer(p); err != nil {
_ = p.Stop()
_ = p.CloseConn()
sw.transport.Cleanup(p.Addr())
if p.IsRunning() {
_ = p.Stop()
}
sw.Logger.Info(
"Ignoring inbound connection: error while adding peer",
"err", err,
Expand Down Expand Up @@ -593,7 +599,9 @@ func (sw *Switch) addOutboundPeerWithConfig(
}

if err := sw.addPeer(p); err != nil {
_ = p.Stop()
if p.IsRunning() {
_ = p.Stop()
}
return err
}

Expand Down Expand Up @@ -628,7 +636,8 @@ func (sw *Switch) filterPeer(p Peer) error {
return nil
}

// addPeer starts up the Peer and adds it to the Switch.
// addPeer starts up the Peer and adds it to the Switch. Error is returned if
// the peer is filtered out or failed to start or can't be added.
func (sw *Switch) addPeer(p Peer) error {
if err := sw.filterPeer(p); err != nil {
return err
Expand All @@ -637,10 +646,8 @@ func (sw *Switch) addPeer(p Peer) error {
p.SetLogger(sw.Logger.With("peer", p.NodeInfo().NetAddress()))

// All good. Start peer
if sw.IsRunning() {
if err := sw.startInitPeer(p); err != nil {
return err
}
if err := sw.startInitPeer(p); err != nil {
return err
}

// Add the peer to .peers.
Expand Down
13 changes: 7 additions & 6 deletions p2p/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ type Transport interface {

// Dial connects to the Peer for the address.
Dial(NetAddress, peerConfig) (Peer, error)

Cleanup(addr net.Addr)
}

// transportLifecycle bundles the methods for callers to control start and stop
Expand Down Expand Up @@ -274,6 +276,11 @@ func (mt *MultiplexTransport) acceptPeers() {
}
}

// Cleanup removes the given address from the connections set.
func (mt *MultiplexTransport) Cleanup(addr net.Addr) {
mt.conns.RemoveAddr(addr)
}

func (mt *MultiplexTransport) cleanup(c net.Conn) error {
mt.conns.Remove(c)

Expand Down Expand Up @@ -418,12 +425,6 @@ func (mt *MultiplexTransport) wrapPeer(
PeerMetrics(cfg.metrics),
)

// Wait for Peer to Stop so we can cleanup.
go func(c net.Conn) {
<-p.Quit()
_ = mt.cleanup(c)
}(c)

return p
}

Expand Down

0 comments on commit e45ade5

Please sign in to comment.