diff --git a/CHANGELOG.md b/CHANGELOG.md index b3a41d83f..ad4135076 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -22,6 +22,8 @@ The following emojis are used to highlight certain changes: ### Fixed +- Address a Bitswap findpeers / connect race condition that can prevent peer communication ([#432](https://github.com/ipfs/boxo/issues/432)). + ### Security ## [v0.11.0] diff --git a/bitswap/network/connecteventmanager.go b/bitswap/network/connecteventmanager.go deleted file mode 100644 index 88337fce3..000000000 --- a/bitswap/network/connecteventmanager.go +++ /dev/null @@ -1,218 +0,0 @@ -package network - -import ( - "sync" - - "github.com/libp2p/go-libp2p/core/peer" -) - -type ConnectionListener interface { - PeerConnected(peer.ID) - PeerDisconnected(peer.ID) -} - -type state byte - -const ( - stateDisconnected = iota - stateResponsive - stateUnresponsive -) - -type connectEventManager struct { - connListeners []ConnectionListener - lk sync.RWMutex - cond sync.Cond - peers map[peer.ID]*peerState - - changeQueue []peer.ID - stop bool - done chan struct{} -} - -type peerState struct { - newState, curState state - pending bool -} - -func newConnectEventManager(connListeners ...ConnectionListener) *connectEventManager { - evtManager := &connectEventManager{ - connListeners: connListeners, - peers: make(map[peer.ID]*peerState), - done: make(chan struct{}), - } - evtManager.cond = sync.Cond{L: &evtManager.lk} - return evtManager -} - -func (c *connectEventManager) Start() { - go c.worker() -} - -func (c *connectEventManager) Stop() { - c.lk.Lock() - c.stop = true - c.lk.Unlock() - c.cond.Broadcast() - - <-c.done -} - -func (c *connectEventManager) getState(p peer.ID) state { - if state, ok := c.peers[p]; ok { - return state.newState - } else { - return stateDisconnected - } -} - -func (c *connectEventManager) setState(p peer.ID, newState state) { - state, ok := c.peers[p] - if !ok { - state = new(peerState) - c.peers[p] = state - } - state.newState = newState - if !state.pending && state.newState != state.curState { - state.pending = true - c.changeQueue = append(c.changeQueue, p) - c.cond.Broadcast() - } -} - -// Waits for a change to be enqueued, or for the event manager to be stopped. Returns false if the -// connect event manager has been stopped. -func (c *connectEventManager) waitChange() bool { - for !c.stop && len(c.changeQueue) == 0 { - c.cond.Wait() - } - return !c.stop -} - -func (c *connectEventManager) worker() { - c.lk.Lock() - defer c.lk.Unlock() - defer close(c.done) - - for c.waitChange() { - pid := c.changeQueue[0] - c.changeQueue[0] = peer.ID("") // free the peer ID (slicing won't do that) - c.changeQueue = c.changeQueue[1:] - - state, ok := c.peers[pid] - // If we've disconnected and forgotten, continue. - if !ok { - // This shouldn't be possible because _this_ thread is responsible for - // removing peers from this map, and we shouldn't get duplicate entries in - // the change queue. - log.Error("a change was enqueued for a peer we're not tracking") - continue - } - - // Record the fact that this "state" is no longer in the queue. - state.pending = false - - // Then, if there's nothing to do, continue. - if state.curState == state.newState { - continue - } - - // Or record the state update, then apply it. - oldState := state.curState - state.curState = state.newState - - switch state.newState { - case stateDisconnected: - delete(c.peers, pid) - fallthrough - case stateUnresponsive: - // Only trigger a disconnect event if the peer was responsive. - // We could be transitioning from unresponsive to disconnected. - if oldState == stateResponsive { - c.lk.Unlock() - for _, v := range c.connListeners { - v.PeerDisconnected(pid) - } - c.lk.Lock() - } - case stateResponsive: - c.lk.Unlock() - for _, v := range c.connListeners { - v.PeerConnected(pid) - } - c.lk.Lock() - } - } -} - -// Called whenever we receive a new connection. May be called many times. -func (c *connectEventManager) Connected(p peer.ID) { - c.lk.Lock() - defer c.lk.Unlock() - - // !responsive -> responsive - - if c.getState(p) == stateResponsive { - return - } - c.setState(p, stateResponsive) -} - -// Called when we drop the final connection to a peer. -func (c *connectEventManager) Disconnected(p peer.ID) { - c.lk.Lock() - defer c.lk.Unlock() - - // !disconnected -> disconnected - - if c.getState(p) == stateDisconnected { - return - } - - c.setState(p, stateDisconnected) -} - -// Called whenever a peer is unresponsive. -func (c *connectEventManager) MarkUnresponsive(p peer.ID) { - c.lk.Lock() - defer c.lk.Unlock() - - // responsive -> unresponsive - - if c.getState(p) != stateResponsive { - return - } - - c.setState(p, stateUnresponsive) -} - -// Called whenever we receive a message from a peer. -// -// - When we're connected to the peer, this will mark the peer as responsive (from unresponsive). -// - When not connected, we ignore this call. Unfortunately, a peer may disconnect before we process -// -// the "on message" event, so we can't treat this as evidence of a connection. -func (c *connectEventManager) OnMessage(p peer.ID) { - c.lk.RLock() - unresponsive := c.getState(p) == stateUnresponsive - c.lk.RUnlock() - - // Only continue if both connected, and unresponsive. - if !unresponsive { - return - } - - // unresponsive -> responsive - - // We need to make a modification so now take a write lock - c.lk.Lock() - defer c.lk.Unlock() - - // Note: state may have changed in the time between when read lock - // was released and write lock taken, so check again - if c.getState(p) != stateUnresponsive { - return - } - - c.setState(p, stateResponsive) -} diff --git a/bitswap/network/connecteventmanager_test.go b/bitswap/network/connecteventmanager_test.go deleted file mode 100644 index e3904ee55..000000000 --- a/bitswap/network/connecteventmanager_test.go +++ /dev/null @@ -1,175 +0,0 @@ -package network - -import ( - "sync" - "testing" - "time" - - "github.com/ipfs/boxo/bitswap/internal/testutil" - "github.com/ipfs/boxo/internal/test" - "github.com/libp2p/go-libp2p/core/peer" - "github.com/stretchr/testify/require" -) - -type mockConnEvent struct { - connected bool - peer peer.ID -} - -type mockConnListener struct { - sync.Mutex - events []mockConnEvent -} - -func newMockConnListener() *mockConnListener { - return new(mockConnListener) -} - -func (cl *mockConnListener) PeerConnected(p peer.ID) { - cl.Lock() - defer cl.Unlock() - cl.events = append(cl.events, mockConnEvent{connected: true, peer: p}) -} - -func (cl *mockConnListener) PeerDisconnected(p peer.ID) { - cl.Lock() - defer cl.Unlock() - cl.events = append(cl.events, mockConnEvent{connected: false, peer: p}) -} - -func wait(t *testing.T, c *connectEventManager) { - require.Eventually(t, func() bool { - c.lk.RLock() - defer c.lk.RUnlock() - return len(c.changeQueue) == 0 - }, time.Second, time.Millisecond, "connection event manager never processed events") -} - -func TestConnectEventManagerConnectDisconnect(t *testing.T) { - test.Flaky(t) - - connListener := newMockConnListener() - peers := testutil.GeneratePeers(2) - cem := newConnectEventManager(connListener) - cem.Start() - t.Cleanup(cem.Stop) - - var expectedEvents []mockConnEvent - - // Connect A twice, should only see one event - cem.Connected(peers[0]) - cem.Connected(peers[0]) - expectedEvents = append(expectedEvents, mockConnEvent{ - peer: peers[0], - connected: true, - }) - - // Flush the event queue. - wait(t, cem) - require.Equal(t, expectedEvents, connListener.events) - - // Block up the event loop. - connListener.Lock() - cem.Connected(peers[1]) - expectedEvents = append(expectedEvents, mockConnEvent{ - peer: peers[1], - connected: true, - }) - - // We don't expect this to show up. - cem.Disconnected(peers[0]) - cem.Connected(peers[0]) - - connListener.Unlock() - - wait(t, cem) - require.Equal(t, expectedEvents, connListener.events) -} - -func TestConnectEventManagerMarkUnresponsive(t *testing.T) { - test.Flaky(t) - - connListener := newMockConnListener() - p := testutil.GeneratePeers(1)[0] - cem := newConnectEventManager(connListener) - cem.Start() - t.Cleanup(cem.Stop) - - var expectedEvents []mockConnEvent - - // Don't mark as connected when we receive a message (could have been delayed). - cem.OnMessage(p) - wait(t, cem) - require.Equal(t, expectedEvents, connListener.events) - - // Handle connected event. - cem.Connected(p) - wait(t, cem) - - expectedEvents = append(expectedEvents, mockConnEvent{ - peer: p, - connected: true, - }) - require.Equal(t, expectedEvents, connListener.events) - - // Becomes unresponsive. - cem.MarkUnresponsive(p) - wait(t, cem) - - expectedEvents = append(expectedEvents, mockConnEvent{ - peer: p, - connected: false, - }) - require.Equal(t, expectedEvents, connListener.events) - - // We have a new connection, mark them responsive. - cem.Connected(p) - wait(t, cem) - expectedEvents = append(expectedEvents, mockConnEvent{ - peer: p, - connected: true, - }) - require.Equal(t, expectedEvents, connListener.events) - - // No duplicate event. - cem.OnMessage(p) - wait(t, cem) - require.Equal(t, expectedEvents, connListener.events) -} - -func TestConnectEventManagerDisconnectAfterMarkUnresponsive(t *testing.T) { - test.Flaky(t) - - connListener := newMockConnListener() - p := testutil.GeneratePeers(1)[0] - cem := newConnectEventManager(connListener) - cem.Start() - t.Cleanup(cem.Stop) - - var expectedEvents []mockConnEvent - - // Handle connected event. - cem.Connected(p) - wait(t, cem) - - expectedEvents = append(expectedEvents, mockConnEvent{ - peer: p, - connected: true, - }) - require.Equal(t, expectedEvents, connListener.events) - - // Becomes unresponsive. - cem.MarkUnresponsive(p) - wait(t, cem) - - expectedEvents = append(expectedEvents, mockConnEvent{ - peer: p, - connected: false, - }) - require.Equal(t, expectedEvents, connListener.events) - - cem.Disconnected(p) - wait(t, cem) - require.Empty(t, cem.peers) // all disconnected - require.Equal(t, expectedEvents, connListener.events) -} diff --git a/bitswap/network/ipfs_impl.go b/bitswap/network/ipfs_impl.go index 039121cfc..3b557cb30 100644 --- a/bitswap/network/ipfs_impl.go +++ b/bitswap/network/ipfs_impl.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "io" + "sync" "sync/atomic" "time" @@ -49,6 +50,8 @@ func NewFromIpfsHost(host host.Host, r routing.ContentRouting, opts ...NetOpt) B protocolBitswap: s.ProtocolPrefix + ProtocolBitswap, supportedProtocols: s.SupportedProtocols, + + peers: make(map[peer.ID]*peerState), } return &bitswapNetwork @@ -72,9 +75,8 @@ type impl struct { // alignment. stats Stats - host host.Host - routing routing.ContentRouting - connectEvtMgr *connectEventManager + host host.Host + routing routing.ContentRouting protocolBitswapNoVers protocol.ID protocolBitswapOneZero protocol.ID @@ -85,6 +87,16 @@ type impl struct { // inbound messages from the network are forwarded to the receiver receivers []Receiver + + peersLk sync.Mutex + peers map[peer.ID]*peerState +} + +type peerState struct { + lk sync.Mutex + responsive bool + // isAConnectWaiting is protected by impl.peersLk. + isAConnectWaiting bool } type streamMessageSender struct { @@ -166,7 +178,7 @@ func (s *streamMessageSender) multiAttempt(ctx context.Context, fn func() error) // Protocol is not supported, so no need to try multiple times if errors.Is(err, multistream.ErrNotSupported[protocol.ID]{}) { - s.bsnet.connectEvtMgr.MarkUnresponsive(s.to) + s.bsnet.markPeerUnresponsive(s.to) return err } @@ -175,7 +187,7 @@ func (s *streamMessageSender) multiAttempt(ctx context.Context, fn func() error) // Failed too many times so mark the peer as unresponsive and return an error if i == s.opts.MaxRetries-1 { - s.bsnet.connectEvtMgr.MarkUnresponsive(s.to) + s.bsnet.markPeerUnresponsive(s.to) return err } @@ -345,23 +357,13 @@ func (bsnet *impl) newStreamToPeer(ctx context.Context, p peer.ID) (network.Stre func (bsnet *impl) Start(r ...Receiver) { bsnet.receivers = r - { - connectionListeners := make([]ConnectionListener, len(r)) - for i, v := range r { - connectionListeners[i] = v - } - bsnet.connectEvtMgr = newConnectEventManager(connectionListeners...) - } for _, proto := range bsnet.supportedProtocols { bsnet.host.SetStreamHandler(proto, bsnet.handleNewStream) } bsnet.host.Network().Notify((*netNotifiee)(bsnet)) - bsnet.connectEvtMgr.Start() - } func (bsnet *impl) Stop() { - bsnet.connectEvtMgr.Stop() bsnet.host.Network().StopNotify((*netNotifiee)(bsnet)) } @@ -425,7 +427,7 @@ func (bsnet *impl) handleNewStream(s network.Stream) { p := s.Conn().RemotePeer() ctx := context.Background() log.Debugf("bitswap net handleNewStream from %s", s.Conn().RemotePeer()) - bsnet.connectEvtMgr.OnMessage(s.Conn().RemotePeer()) + bsnet.markPeerResponsive(s.Conn().RemotePeer()) atomic.AddUint64(&bsnet.stats.MessagesRecvd, 1) for _, v := range bsnet.receivers { v.ReceiveMessage(ctx, p, received) @@ -444,6 +446,66 @@ func (bsnet *impl) Stats() Stats { } } +func (bsnet *impl) markPeerResponsive(p peer.ID) { + bsnet.peersLk.Lock() + state, ok := bsnet.peers[p] + if !ok { + state = &peerState{} + bsnet.peers[p] = state + } + // isAConnectWaiting let a potential markPeerUnresponsive concurrent + // know that it must not remove it from bsnet.peers. + state.isAConnectWaiting = true + bsnet.peersLk.Unlock() + + state.lk.Lock() + + bsnet.peersLk.Lock() + state.isAConnectWaiting = false + bsnet.peersLk.Unlock() + + defer state.lk.Unlock() + if state.responsive { + return + } + state.responsive = true + + for _, list := range bsnet.receivers { + list.PeerConnected(p) + } +} + +func (bsnet *impl) markPeerUnresponsive(p peer.ID) { + bsnet.peersLk.Lock() + state, ok := bsnet.peers[p] + if !ok { + // we are not connected anyway + bsnet.peersLk.Unlock() + return + } + bsnet.peersLk.Unlock() + + state.lk.Lock() + if !state.responsive { + state.lk.Unlock() + return + } + state.responsive = false + + for _, list := range bsnet.receivers { + list.PeerDisconnected(p) + } + + bsnet.peersLk.Lock() + state.lk.Unlock() + defer bsnet.peersLk.Unlock() + if state.isAConnectWaiting { + // someone is about to transition that back to connected so let them do their thing. + return + } + delete(bsnet.peers, p) +} + type netNotifiee impl func (nn *netNotifiee) impl() *impl { @@ -456,15 +518,21 @@ func (nn *netNotifiee) Connected(n network.Network, v network.Conn) { return } - nn.impl().connectEvtMgr.Connected(v.RemotePeer()) + // optimistically try to contact new peers + nn.impl().markPeerResponsive(v.RemotePeer()) } func (nn *netNotifiee) Disconnected(n network.Network, v network.Conn) { + // ignore transient connections + if v.Stat().Transient { + return + } // Only record a "disconnect" when we actually disconnect. - if n.Connectedness(v.RemotePeer()) == network.Connected { + p := v.RemotePeer() + if n.Connectedness(p) == network.Connected { return } - nn.impl().connectEvtMgr.Disconnected(v.RemotePeer()) + nn.impl().markPeerUnresponsive(p) } func (nn *netNotifiee) OpenedStream(n network.Network, s network.Stream) {} func (nn *netNotifiee) ClosedStream(n network.Network, v network.Stream) {}