Skip to content

Commit

Permalink
Revert "feat(connecteventmanager): block Connected() until accepted (#…
Browse files Browse the repository at this point in the history
…435)" and tests

This reverts commit 7ec68c5.
This reverts commit 59a2bca.
This reverts commit 1d2f5e5.
  • Loading branch information
Jorropo committed Aug 21, 2023
1 parent db4e43a commit b15f2a5
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 127 deletions.
1 change: 0 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ The following emojis are used to highlight certain changes:

### Fixed

- Address a Bitswap findpeers / connect race condition that can prevent peer communication ([#435](https://github.com/ipfs/boxo/issues/435))
- HTTP Gateway API: Not having a block will result in a 5xx error rather than 404
- HTTP Gateway API: CAR requests will return 200s and a CAR file proving a requested path does not exist rather than returning an error

Expand Down
102 changes: 35 additions & 67 deletions bitswap/network/connecteventmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,25 +25,16 @@ type connectEventManager struct {
cond sync.Cond
peers map[peer.ID]*peerState

changeQueue []change
changeQueue []peer.ID
stop bool
done chan struct{}
}

type change struct {
pid peer.ID
handled chan struct{}
}

type peerState struct {
newState, curState state
pending bool
}

type waitFn func()

func waitNoop() {}

func newConnectEventManager(connListeners ...ConnectionListener) *connectEventManager {
evtManager := &connectEventManager{
connListeners: connListeners,
Expand Down Expand Up @@ -75,16 +66,7 @@ func (c *connectEventManager) getState(p peer.ID) state {
}
}

func (c *connectEventManager) makeWaitFunc(handled chan struct{}) waitFn {
return func() {
select {
case <-handled:
case <-c.done:
}
}
}

func (c *connectEventManager) setState(p peer.ID, newState state) waitFn {
func (c *connectEventManager) setState(p peer.ID, newState state) {

Check warning on line 69 in bitswap/network/connecteventmanager.go

View check run for this annotation

Codecov / codecov/patch

bitswap/network/connecteventmanager.go#L69

Added line #L69 was not covered by tests
state, ok := c.peers[p]
if !ok {
state = new(peerState)
Expand All @@ -93,20 +75,9 @@ func (c *connectEventManager) setState(p peer.ID, newState state) waitFn {
state.newState = newState
if !state.pending && state.newState != state.curState {
state.pending = true
change := change{p, make(chan struct{})}
c.changeQueue = append(c.changeQueue, change)
c.changeQueue = append(c.changeQueue, p)

Check warning on line 78 in bitswap/network/connecteventmanager.go

View check run for this annotation

Codecov / codecov/patch

bitswap/network/connecteventmanager.go#L78

Added line #L78 was not covered by tests
c.cond.Broadcast()
return c.makeWaitFunc(change.handled)
} else if state.pending {
// Find the change in the queue and return a wait function for it
for _, change := range c.changeQueue {
if change.pid == p {
return c.makeWaitFunc(change.handled)
}
}
log.Error("a peer was marked as change pending but not found in the change queue")
}
return waitNoop
}

// Waits for a change to be enqueued, or for the event manager to be stopped. Returns false if the
Expand All @@ -124,70 +95,67 @@ func (c *connectEventManager) worker() {
defer close(c.done)

for c.waitChange() {
pch := c.changeQueue[0]
c.changeQueue[0] = change{} // free the resources (slicing won't do that)
pid := c.changeQueue[0]
c.changeQueue[0] = peer.ID("") // free the peer ID (slicing won't do that)

Check warning on line 99 in bitswap/network/connecteventmanager.go

View check run for this annotation

Codecov / codecov/patch

bitswap/network/connecteventmanager.go#L98-L99

Added lines #L98 - L99 were not covered by tests
c.changeQueue = c.changeQueue[1:]

state, ok := c.peers[pch.pid]
state, ok := c.peers[pid]

Check warning on line 102 in bitswap/network/connecteventmanager.go

View check run for this annotation

Codecov / codecov/patch

bitswap/network/connecteventmanager.go#L102

Added line #L102 was not covered by tests
// If we've disconnected and forgotten, continue.
if !ok {
// This shouldn't be possible because _this_ thread is responsible for
// removing peers from this map, and we shouldn't get duplicate entries in
// the change queue.
log.Error("a change was enqueued for a peer we're not tracking")
close(pch.handled)
continue
}

// Is there anything to do?
if state.curState != state.newState {
// Record the state update, then apply it.
oldState := state.curState
state.curState = state.newState

switch state.newState {
case stateDisconnected:
delete(c.peers, pch.pid)
fallthrough
case stateUnresponsive:
// Only trigger a disconnect event if the peer was responsive.
// We could be transitioning from unresponsive to disconnected.
if oldState == stateResponsive {
c.lk.Unlock()
for _, v := range c.connListeners {
v.PeerDisconnected(pch.pid)
}
c.lk.Lock()
}
case stateResponsive:
// Record the fact that this "state" is no longer in the queue.
state.pending = false

// Then, if there's nothing to do, continue.
if state.curState == state.newState {
continue

Check warning on line 117 in bitswap/network/connecteventmanager.go

View check run for this annotation

Codecov / codecov/patch

bitswap/network/connecteventmanager.go#L113-L117

Added lines #L113 - L117 were not covered by tests
}

// Or record the state update, then apply it.
oldState := state.curState
state.curState = state.newState

switch state.newState {
case stateDisconnected:
delete(c.peers, pid)
fallthrough
case stateUnresponsive:
// Only trigger a disconnect event if the peer was responsive.
// We could be transitioning from unresponsive to disconnected.
if oldState == stateResponsive {

Check warning on line 131 in bitswap/network/connecteventmanager.go

View check run for this annotation

Codecov / codecov/patch

bitswap/network/connecteventmanager.go#L121-L131

Added lines #L121 - L131 were not covered by tests
c.lk.Unlock()
for _, v := range c.connListeners {
v.PeerConnected(pch.pid)
v.PeerDisconnected(pid)

Check warning on line 134 in bitswap/network/connecteventmanager.go

View check run for this annotation

Codecov / codecov/patch

bitswap/network/connecteventmanager.go#L134

Added line #L134 was not covered by tests
}
c.lk.Lock()
}
case stateResponsive:
c.lk.Unlock()
for _, v := range c.connListeners {
v.PeerConnected(pid)
}
c.lk.Lock()

Check warning on line 143 in bitswap/network/connecteventmanager.go

View check run for this annotation

Codecov / codecov/patch

bitswap/network/connecteventmanager.go#L138-L143

Added lines #L138 - L143 were not covered by tests
}

// Record the fact that this "state" is no longer in the queue.
state.pending = false
// Signal that we've handled the state change
close(pch.handled)
}
}

// Called whenever we receive a new connection. May be called many times.
func (c *connectEventManager) Connected(p peer.ID) {
c.lk.Lock()
defer c.lk.Unlock()

Check warning on line 151 in bitswap/network/connecteventmanager.go

View check run for this annotation

Codecov / codecov/patch

bitswap/network/connecteventmanager.go#L151

Added line #L151 was not covered by tests

// !responsive -> responsive

if c.getState(p) == stateResponsive {
c.lk.Unlock()
return
}
wait := c.setState(p, stateResponsive)
c.lk.Unlock()
wait()
c.setState(p, stateResponsive)

Check warning on line 158 in bitswap/network/connecteventmanager.go

View check run for this annotation

Codecov / codecov/patch

bitswap/network/connecteventmanager.go#L158

Added line #L158 was not covered by tests
}

// Called when we drop the final connection to a peer.
Expand Down
76 changes: 17 additions & 59 deletions bitswap/network/connecteventmanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"time"

"github.com/ipfs/boxo/bitswap/internal/testutil"
"github.com/ipfs/boxo/internal/test"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/stretchr/testify/require"
)
Expand All @@ -17,8 +18,7 @@ type mockConnEvent struct {

type mockConnListener struct {
sync.Mutex
events []mockConnEvent
peerConnectedCb func(p peer.ID)
events []mockConnEvent
}

func newMockConnListener() *mockConnListener {
Expand All @@ -29,9 +29,6 @@ func (cl *mockConnListener) PeerConnected(p peer.ID) {
cl.Lock()
defer cl.Unlock()
cl.events = append(cl.events, mockConnEvent{connected: true, peer: p})
if cl.peerConnectedCb != nil {
cl.peerConnectedCb(p)
}
}

func (cl *mockConnListener) PeerDisconnected(p peer.ID) {
Expand All @@ -49,6 +46,8 @@ func wait(t *testing.T, c *connectEventManager) {
}

func TestConnectEventManagerConnectDisconnect(t *testing.T) {
test.Flaky(t)

connListener := newMockConnListener()
peers := testutil.GeneratePeers(2)
cem := newConnectEventManager(connListener)
Expand All @@ -65,26 +64,31 @@ func TestConnectEventManagerConnectDisconnect(t *testing.T) {
connected: true,
})

// Flush the event queue.
wait(t, cem)
require.Equal(t, expectedEvents, connListener.events)

// Block up the event loop.
connListener.Lock()
cem.Connected(peers[1])
expectedEvents = append(expectedEvents, mockConnEvent{
peer: peers[1],
connected: true,
})
require.Equal(t, expectedEvents, connListener.events)

// We don't expect this to show up.
cem.Disconnected(peers[0])
expectedEvents = append(expectedEvents, mockConnEvent{
peer: peers[0],
connected: false,
})
// Flush the event queue.
cem.Connected(peers[0])

connListener.Unlock()

wait(t, cem)
require.Equal(t, expectedEvents, connListener.events)
}

func TestConnectEventManagerMarkUnresponsive(t *testing.T) {
test.Flaky(t)

connListener := newMockConnListener()
p := testutil.GeneratePeers(1)[0]
cem := newConnectEventManager(connListener)
Expand Down Expand Up @@ -134,6 +138,8 @@ func TestConnectEventManagerMarkUnresponsive(t *testing.T) {
}

func TestConnectEventManagerDisconnectAfterMarkUnresponsive(t *testing.T) {
test.Flaky(t)

connListener := newMockConnListener()
p := testutil.GeneratePeers(1)[0]
cem := newConnectEventManager(connListener)
Expand Down Expand Up @@ -167,51 +173,3 @@ func TestConnectEventManagerDisconnectAfterMarkUnresponsive(t *testing.T) {
require.Empty(t, cem.peers) // all disconnected
require.Equal(t, expectedEvents, connListener.events)
}

func TestConnectEventManagerConnectFlowSynchronous(t *testing.T) {
connListener := newMockConnListener()
actionsCh := make(chan string)
connListener.peerConnectedCb = func(p peer.ID) {
actionsCh <- "PeerConnected:" + p.String()
time.Sleep(time.Millisecond * 50)
}

peers := testutil.GeneratePeers(2)
cem := newConnectEventManager(connListener)
cem.Start()
t.Cleanup(cem.Stop)

go func() {
actionsCh <- "Connected:" + peers[0].String()
cem.Connected(peers[0])
actionsCh <- "Done:" + peers[0].String()
actionsCh <- "Connected:" + peers[1].String()
cem.Connected(peers[1])
actionsCh <- "Done:" + peers[1].String()
close(actionsCh)
}()

// We expect Done to be sent _after_ PeerConnected, which demonstrates the
// call to Connected() blocks until PeerConnected() returns.
gotActions := make([]string, 0, 3)
for event := range actionsCh {
gotActions = append(gotActions, event)
}
expectedActions := []string{
"Connected:" + peers[0].String(),
"PeerConnected:" + peers[0].String(),
"Done:" + peers[0].String(),
"Connected:" + peers[1].String(),
"PeerConnected:" + peers[1].String(),
"Done:" + peers[1].String(),
}
require.Equal(t, expectedActions, gotActions)

// Flush the event queue.
wait(t, cem)
expectedEvents := []mockConnEvent{
{peer: peers[0], connected: true},
{peer: peers[1], connected: true},
}
require.Equal(t, expectedEvents, connListener.events)
}

0 comments on commit b15f2a5

Please sign in to comment.