fix deadlock in sending connectedness events
sukunrt committed May 2, 2024
1 parent 57c688e commit 2681035
Showing 2 changed files with 87 additions and 48 deletions.
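The change replaces emit-on-the-connection-path bookkeeping with a per-peer event queue drained by one goroutine: addConn and removeConn now call maybeEnqueueConnectednessUnlocked, which appends the transition to s.conns.connectednessEventQueue and does a non-blocking send on the 1-buffered connectednessEventCh; a dedicated connectednessEventEmitter goroutine drains the queue and emits EvtPeerConnectednessChanged, so the connection paths never block on a slow subscriber. Below is a minimal, self-contained Go sketch of that queue-and-single-emitter pattern; the emitter type and all of its names are illustrative, not the swarm's actual API.

package main

import (
	"fmt"
	"sync"
)

// emitter sketches the commit's pattern: producers enqueue under a mutex and
// signal a 1-buffered wakeup channel with a non-blocking send, so they never
// block on a slow consumer; a single goroutine drains the queue and delivers
// events outside the lock.
type emitter struct {
	mu     sync.Mutex
	queue  []string
	wakeup chan struct{} // capacity 1: pending wakeups coalesce
	done   chan struct{}
}

func newEmitter() *emitter {
	e := &emitter{wakeup: make(chan struct{}, 1), done: make(chan struct{})}
	go e.loop()
	return e
}

func (e *emitter) enqueue(ev string) {
	e.mu.Lock()
	e.queue = append(e.queue, ev)
	e.mu.Unlock()
	select {
	case e.wakeup <- struct{}{}: // never blocks; one queued signal is enough
	default:
	}
}

func (e *emitter) loop() {
	defer close(e.done)
	for range e.wakeup {
		for {
			e.mu.Lock()
			if len(e.queue) == 0 {
				e.mu.Unlock()
				break
			}
			ev := e.queue[0]
			e.queue = e.queue[1:]
			e.mu.Unlock()
			fmt.Println("emit:", ev) // delivery happens outside the lock
		}
	}
}

// shutdown closes the wakeup channel only after the last enqueue, mirroring
// how close() waits on s.refs.Wait() before closing connectednessEventCh.
func (e *emitter) shutdown() {
	close(e.wakeup)
	<-e.done
}

func main() {
	e := newEmitter()
	e.enqueue("Connected")
	e.enqueue("NotConnected")
	e.shutdown()
}

The non-blocking send coalesces wakeups: a single buffered signal suffices because the drain loop empties the whole queue before sleeping again.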
125 changes: 83 additions & 42 deletions p2p/net/swarm/swarm.go
@@ -144,7 +144,9 @@ type Swarm struct {
// down before continuing.
refs sync.WaitGroup

-	emitter event.Emitter
+	emitter                  event.Emitter
+	connectednessEventCh     chan struct{}
+	connectednessEmitterDone chan struct{}

rcmgr network.ResourceManager

@@ -156,8 +158,9 @@ type Swarm struct {

conns struct {
sync.RWMutex
-		m             map[peer.ID][]*Conn
-		connectedness map[peer.ID]network.Connectedness
+		m                       map[peer.ID][]*Conn
+		connectednessEventQueue map[peer.ID][]network.Connectedness
+		lastConnectednessEvent  map[peer.ID]network.Connectedness
}

listeners struct {
@@ -217,15 +220,17 @@ func NewSwarm(local peer.ID, peers peerstore.Peerstore, eventBus event.Bus, opts
}
ctx, cancel := context.WithCancel(context.Background())
s := &Swarm{
-		local:            local,
-		peers:            peers,
-		emitter:          emitter,
-		ctx:              ctx,
-		ctxCancel:        cancel,
-		dialTimeout:      defaultDialTimeout,
-		dialTimeoutLocal: defaultDialTimeoutLocal,
-		maResolver:       madns.DefaultResolver,
-		dialRanker:       DefaultDialRanker,
+		local:                    local,
+		peers:                    peers,
+		emitter:                  emitter,
+		connectednessEventCh:     make(chan struct{}, 1),
+		connectednessEmitterDone: make(chan struct{}),
+		ctx:                      ctx,
+		ctxCancel:                cancel,
+		dialTimeout:              defaultDialTimeout,
+		dialTimeoutLocal:         defaultDialTimeoutLocal,
+		maResolver:               madns.DefaultResolver,
+		dialRanker:               DefaultDialRanker,

// A black hole is a binary property. On a network if UDP dials are blocked or there is
// no IPv6 connectivity, all dials will fail. So a low success rate of 5 out 100 dials
@@ -235,7 +240,8 @@
}

s.conns.m = make(map[peer.ID][]*Conn)
-	s.conns.connectedness = make(map[peer.ID]network.Connectedness)
+	s.conns.connectednessEventQueue = make(map[peer.ID][]network.Connectedness)
+	s.conns.lastConnectednessEvent = make(map[peer.ID]network.Connectedness)
s.listeners.m = make(map[transport.Listener]struct{})
s.transports.m = make(map[int]transport.Transport)
s.notifs.m = make(map[network.Notifiee]struct{})
@@ -256,7 +262,7 @@ func NewSwarm(local peer.ID, peers peerstore.Peerstore, eventBus event.Bus, opts
s.backf.init(s.ctx)

	s.bhd = newBlackHoleDetector(s.udpBlackHoleConfig, s.ipv6BlackHoleConfig, s.metricsTracer)
-
+	go s.connectednessEventEmitter()
return s, nil
}

@@ -302,12 +308,15 @@ func (s *Swarm) close() {

// Wait for everything to finish.
s.refs.Wait()
+	close(s.connectednessEventCh)
+	<-s.connectednessEmitterDone
s.emitter.Close()

// Remove the connectedness map only after we have closed the connection and sent all the disconnection
// events
s.conns.Lock()
-	s.conns.connectedness = nil
+	s.conns.connectednessEventQueue = nil
+	s.conns.lastConnectednessEvent = nil
s.conns.Unlock()

// Now close out any transports (if necessary). Do this after closing
@@ -391,12 +400,9 @@ func (s *Swarm) addConn(tc transport.CapableConn, dir network.Direction) (*Conn,
return nil, ErrSwarmClosed
}

-	oldState := s.conns.connectedness[p]
-
c.streams.m = make(map[*Stream]struct{})
s.conns.m[p] = append(s.conns.m[p], c)
-	newState := s.connectednessUnlocked(p)
-	s.conns.connectedness[p] = newState
+	s.maybeEnqueueConnectednessUnlocked(p)

// Add two swarm refs:
// * One will be decremented after the close notifications fire in Conn.doClose
@@ -422,15 +428,6 @@ func (s *Swarm) addConn(tc transport.CapableConn, dir network.Direction) (*Conn,
s.directConnNotifs.Unlock()
}

-	// Emit event after releasing `s.conns` lock so that a consumer can still
-	// use swarm methods that need the `s.conns` lock.
-	if oldState != newState {
-		s.emitter.Emit(event.EvtPeerConnectednessChanged{
-			Peer:          p,
-			Connectedness: newState,
-		})
-	}
-
s.notifyAll(func(f network.Notifiee) {
f.Connected(s, c)
})
@@ -777,8 +774,6 @@ func (s *Swarm) removeConn(c *Conn) {
p := c.RemotePeer()

	s.conns.Lock()
-
-	oldState := s.conns.connectedness[p]
cs := s.conns.m[p]
if len(cs) == 1 {
delete(s.conns.m, p)
@@ -795,24 +790,70 @@
			}
		}
	}
-
-	newState := s.connectednessUnlocked(p)
-	if s.conns.connectedness != nil { // This shoud always be non nil but a check doesn't hurt
-		if newState == network.NotConnected {
-			delete(s.conns.connectedness, p)
-		} else {
-			s.conns.connectedness[p] = newState
-		}
-	}
-	s.conns.Unlock()
-
-	if oldState != newState {
-		fmt.Println("going to emit event", newState)
-		s.emitter.Emit(event.EvtPeerConnectednessChanged{
-			Peer:          p,
-			Connectedness: newState,
-		})
-		fmt.Println("emitted event", newState)
-	}
-}
+	s.maybeEnqueueConnectednessUnlocked(p)
+	s.conns.Unlock()
+}
+
+func (s *Swarm) lastConnectednessEventUnlocked(p peer.ID) network.Connectedness {
+	events := s.conns.connectednessEventQueue[p]
+	if len(events) > 0 {
+		return events[len(events)-1]
+	}
+	return s.conns.lastConnectednessEvent[p]
+}
+
+func (s *Swarm) maybeEnqueueConnectednessUnlocked(p peer.ID) {
+	oldState := s.lastConnectednessEventUnlocked(p)
+	newState := s.connectednessUnlocked(p)
+	if oldState != newState {
+		if s.conns.connectednessEventQueue != nil {
+			s.conns.connectednessEventQueue[p] = append(s.conns.connectednessEventQueue[p], newState)
+			select {
+			case s.connectednessEventCh <- struct{}{}:
+			default:
+			}
+		} else {
+			log.Errorf("SWARM BUG: nil connectedness map")
+		}
+	}
+}
+
+func (s *Swarm) connectednessEventEmitter() {
+	defer close(s.connectednessEmitterDone)
+	for range s.connectednessEventCh {
+		for {
+			var c network.Connectedness
+			var peer peer.ID
+			s.conns.Lock()
+			for p, v := range s.conns.connectednessEventQueue {
+				if len(v) == 0 {
+					// this shouldn't happen
+					delete(s.conns.connectednessEventQueue, p)
+					log.Errorf("SWARM BUG: empty connectedness event slice %v %v", p, v)
+					continue
+				}
+				c = v[0]
+				peer = p
+				s.conns.connectednessEventQueue[p] = v[1:]
+				if len(s.conns.connectednessEventQueue[p]) == 0 {
+					delete(s.conns.connectednessEventQueue, p)
+				}
+				if c == network.NotConnected {
+					delete(s.conns.lastConnectednessEvent, p)
+				} else {
+					s.conns.lastConnectednessEvent[p] = c
+				}
+				break
+			}
+			s.conns.Unlock()
+			if peer == "" {
+				break
+			}
+			s.emitter.Emit(event.EvtPeerConnectednessChanged{
+				Peer:          peer,
+				Connectedness: c,
+			})
+		}
+	}
+}

10 changes: 4 additions & 6 deletions p2p/net/swarm/swarm_event_test.go
@@ -2,7 +2,6 @@ package swarm_test

import (
"context"
"fmt"
"testing"
"time"

@@ -68,6 +67,10 @@ func TestConnectednessEventsSingleConn(t *testing.T) {
}

func TestNoDeadlockWhenConsumingConnectednessEvents(t *testing.T) {
+	ctx := context.Background()
+	ctx, cancel := context.WithCancel(ctx)
+	defer cancel()
+
dialerEventBus := eventbus.NewBus()
dialer := swarmt.GenSwarm(t, swarmt.OptDialOnly, swarmt.EventBus(dialerEventBus))
defer dialer.Close()
@@ -87,10 +90,6 @@ func TestNoDeadlockWhenConsumingConnectednessEvents(t *testing.T) {
sub, err := dialerEventBus.Subscribe(new(event.EvtPeerConnectednessChanged))
require.NoError(t, err)

-	ctx := context.Background()
-	ctx, cancel := context.WithCancel(ctx)
-	defer cancel()
-
// A slow consumer
go func() {
for {
@@ -229,7 +228,6 @@ func TestConnectednessEventDeadlock(t *testing.T) {
continue
}
count++
-			fmt.Println(count)
s1.ClosePeer(evt.Peer)
}
}()
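For reference, a sketch of a consumer of these events, using the same event-bus API the tests above use (the standard go-libp2p eventbus and event packages; wiring the bus into a swarm, as the tests do with swarmt.EventBus, is elided):

package main

import (
	"fmt"

	"github.com/libp2p/go-libp2p/core/event"
	"github.com/libp2p/go-libp2p/p2p/host/eventbus"
)

func main() {
	// The tests attach a bus like this to a swarm via swarmt.EventBus.
	bus := eventbus.NewBus()
	sub, err := bus.Subscribe(new(event.EvtPeerConnectednessChanged))
	if err != nil {
		panic(err)
	}
	defer sub.Close()

	go func() {
		for e := range sub.Out() {
			evt := e.(event.EvtPeerConnectednessChanged)
			fmt.Printf("peer %s is now %s\n", evt.Peer, evt.Connectedness)
		}
	}()

	// ... dial and close peers here; transitions arrive asynchronously ...
}

With the queue in place, a subscriber that stalls, or re-enters the swarm the way TestConnectednessEventDeadlock does with ClosePeer, only delays delivery; it can no longer deadlock addConn or removeConn.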
