Revert "feat(connecteventmanager): block Connected() until accepted (#435)" and tests #444

Merged: 1 commit, Aug 21, 2023
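For context (not part of the PR itself): this revert restores the original fire-and-forget event queue. `Connected()` enqueues a state change and returns immediately, instead of blocking until every `ConnectionListener.PeerConnected()` call has completed as #435 made it do. The sketch below is a simplified illustration of that restored pattern, not the real implementation: plain strings stand in for `peer.ID`, a print stands in for the listener callbacks, and the duplicate suppression done by the real `peers`/`pending` map is omitted.

```go
// Sketch only: a simplified version of the non-blocking event queue this
// revert restores. See bitswap/network/connecteventmanager.go for the real code.
package main

import (
	"fmt"
	"sync"
)

type mgr struct {
	lk    sync.Mutex
	cond  sync.Cond
	queue []string // pending peer IDs; no per-change "handled" channel
	stop  bool
	done  chan struct{}
}

func newMgr() *mgr {
	m := &mgr{done: make(chan struct{})}
	m.cond.L = &m.lk
	return m
}

// Connected enqueues the change and returns immediately; it does not
// block until listeners have observed the event.
func (m *mgr) Connected(p string) {
	m.lk.Lock()
	defer m.lk.Unlock()
	m.queue = append(m.queue, p)
	m.cond.Broadcast()
}

// worker drains the queue and notifies listeners outside the lock,
// mirroring the overall structure of connectEventManager.worker.
func (m *mgr) worker() {
	m.lk.Lock()
	defer m.lk.Unlock()
	defer close(m.done)
	for {
		for len(m.queue) == 0 && !m.stop {
			m.cond.Wait()
		}
		if len(m.queue) == 0 { // stopped and nothing left to deliver
			return
		}
		p := m.queue[0]
		m.queue = m.queue[1:]
		m.lk.Unlock()
		fmt.Println("PeerConnected:", p) // listener callback stand-in
		m.lk.Lock()
	}
}

func (m *mgr) Stop() {
	m.lk.Lock()
	m.stop = true
	m.cond.Broadcast()
	m.lk.Unlock()
	<-m.done
}

func main() {
	m := newMgr()
	go m.worker()
	m.Connected("peerA") // returns right away; delivery happens on the worker
	m.Connected("peerB")
	m.Stop() // lets the queue drain, then waits for the worker to exit
}
```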
1 change: 0 additions & 1 deletion CHANGELOG.md
@@ -31,7 +31,6 @@ The following emojis are used to highlight certain changes:

### Fixed

- Address a Bitswap findpeers / connect race condition that can prevent peer communication ([#435](https://github.com/ipfs/boxo/issues/435))
- HTTP Gateway API: Not having a block will result in a 5xx error rather than 404
- HTTP Gateway API: CAR requests will return 200s and a CAR file proving a requested path does not exist rather than returning an error

102 changes: 35 additions & 67 deletions bitswap/network/connecteventmanager.go
@@ -25,25 +25,16 @@
cond sync.Cond
peers map[peer.ID]*peerState

changeQueue []change
changeQueue []peer.ID
stop bool
done chan struct{}
}

type change struct {
pid peer.ID
handled chan struct{}
}

type peerState struct {
newState, curState state
pending bool
}

type waitFn func()

func waitNoop() {}

func newConnectEventManager(connListeners ...ConnectionListener) *connectEventManager {
evtManager := &connectEventManager{
connListeners: connListeners,
@@ -75,16 +66,7 @@
}
}

func (c *connectEventManager) makeWaitFunc(handled chan struct{}) waitFn {
return func() {
select {
case <-handled:
case <-c.done:
}
}
}

func (c *connectEventManager) setState(p peer.ID, newState state) waitFn {
func (c *connectEventManager) setState(p peer.ID, newState state) {

state, ok := c.peers[p]
if !ok {
state = new(peerState)
@@ -93,20 +75,9 @@
state.newState = newState
if !state.pending && state.newState != state.curState {
state.pending = true
change := change{p, make(chan struct{})}
c.changeQueue = append(c.changeQueue, change)
c.changeQueue = append(c.changeQueue, p)

c.cond.Broadcast()
return c.makeWaitFunc(change.handled)
} else if state.pending {
// Find the change in the queue and return a wait function for it
for _, change := range c.changeQueue {
if change.pid == p {
return c.makeWaitFunc(change.handled)
}
}
log.Error("a peer was marked as change pending but not found in the change queue")
}
return waitNoop
}

// Waits for a change to be enqueued, or for the event manager to be stopped. Returns false if the
@@ -124,70 +95,67 @@
defer close(c.done)

for c.waitChange() {
pch := c.changeQueue[0]
c.changeQueue[0] = change{} // free the resources (slicing won't do that)
pid := c.changeQueue[0]
c.changeQueue[0] = peer.ID("") // free the peer ID (slicing won't do that)

c.changeQueue = c.changeQueue[1:]

state, ok := c.peers[pch.pid]
state, ok := c.peers[pid]

// If we've disconnected and forgotten, continue.
if !ok {
// This shouldn't be possible because _this_ thread is responsible for
// removing peers from this map, and we shouldn't get duplicate entries in
// the change queue.
log.Error("a change was enqueued for a peer we're not tracking")
close(pch.handled)
continue
}

// Is there anything to do?
if state.curState != state.newState {
// Record the state update, then apply it.
oldState := state.curState
state.curState = state.newState

switch state.newState {
case stateDisconnected:
delete(c.peers, pch.pid)
fallthrough
case stateUnresponsive:
// Only trigger a disconnect event if the peer was responsive.
// We could be transitioning from unresponsive to disconnected.
if oldState == stateResponsive {
c.lk.Unlock()
for _, v := range c.connListeners {
v.PeerDisconnected(pch.pid)
}
c.lk.Lock()
}
case stateResponsive:
// Record the fact that this "state" is no longer in the queue.
state.pending = false

// Then, if there's nothing to do, continue.
if state.curState == state.newState {
continue

}

// Or record the state update, then apply it.
oldState := state.curState
state.curState = state.newState

switch state.newState {
case stateDisconnected:
delete(c.peers, pid)
fallthrough
case stateUnresponsive:
// Only trigger a disconnect event if the peer was responsive.
// We could be transitioning from unresponsive to disconnected.
if oldState == stateResponsive {

c.lk.Unlock()
for _, v := range c.connListeners {
v.PeerConnected(pch.pid)
v.PeerDisconnected(pid)

}
c.lk.Lock()
}
case stateResponsive:
c.lk.Unlock()
for _, v := range c.connListeners {
v.PeerConnected(pid)
}
c.lk.Lock()

}

// Record the fact that this "state" is no longer in the queue.
state.pending = false
// Signal that we've handled the state change
close(pch.handled)
}
}

// Called whenever we receive a new connection. May be called many times.
func (c *connectEventManager) Connected(p peer.ID) {
c.lk.Lock()
defer c.lk.Unlock()

// !responsive -> responsive

if c.getState(p) == stateResponsive {
c.lk.Unlock()
return
}
wait := c.setState(p, stateResponsive)
c.lk.Unlock()
wait()
c.setState(p, stateResponsive)

}

// Called when we drop the final connection to a peer.
76 changes: 17 additions & 59 deletions bitswap/network/connecteventmanager_test.go
@@ -6,6 +6,7 @@ import (
"time"

"github.com/ipfs/boxo/bitswap/internal/testutil"
"github.com/ipfs/boxo/internal/test"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/stretchr/testify/require"
)
@@ -17,8 +18,7 @@ type mockConnEvent struct {

type mockConnListener struct {
sync.Mutex
events []mockConnEvent
peerConnectedCb func(p peer.ID)
events []mockConnEvent
}

func newMockConnListener() *mockConnListener {
@@ -29,9 +29,6 @@ func (cl *mockConnListener) PeerConnected(p peer.ID) {
cl.Lock()
defer cl.Unlock()
cl.events = append(cl.events, mockConnEvent{connected: true, peer: p})
if cl.peerConnectedCb != nil {
cl.peerConnectedCb(p)
}
}

func (cl *mockConnListener) PeerDisconnected(p peer.ID) {
@@ -49,6 +46,8 @@ func wait(t *testing.T, c *connectEventManager) {
}

func TestConnectEventManagerConnectDisconnect(t *testing.T) {
test.Flaky(t)

connListener := newMockConnListener()
peers := testutil.GeneratePeers(2)
cem := newConnectEventManager(connListener)
@@ -65,26 +64,31 @@ func TestConnectEventManagerConnectDisconnect(t *testing.T) {
connected: true,
})

// Flush the event queue.
wait(t, cem)
require.Equal(t, expectedEvents, connListener.events)

// Block up the event loop.
connListener.Lock()
cem.Connected(peers[1])
expectedEvents = append(expectedEvents, mockConnEvent{
peer: peers[1],
connected: true,
})
require.Equal(t, expectedEvents, connListener.events)

// We don't expect this to show up.
cem.Disconnected(peers[0])
expectedEvents = append(expectedEvents, mockConnEvent{
peer: peers[0],
connected: false,
})
// Flush the event queue.
cem.Connected(peers[0])

connListener.Unlock()

wait(t, cem)
require.Equal(t, expectedEvents, connListener.events)
}

func TestConnectEventManagerMarkUnresponsive(t *testing.T) {
test.Flaky(t)

connListener := newMockConnListener()
p := testutil.GeneratePeers(1)[0]
cem := newConnectEventManager(connListener)
@@ -134,6 +138,8 @@ func TestConnectEventManagerMarkUnresponsive(t *testing.T) {
}

func TestConnectEventManagerDisconnectAfterMarkUnresponsive(t *testing.T) {
test.Flaky(t)

connListener := newMockConnListener()
p := testutil.GeneratePeers(1)[0]
cem := newConnectEventManager(connListener)
@@ -167,51 +173,3 @@ func TestConnectEventManagerDisconnectAfterMarkUnresponsive(t *testing.T) {
require.Empty(t, cem.peers) // all disconnected
require.Equal(t, expectedEvents, connListener.events)
}

func TestConnectEventManagerConnectFlowSynchronous(t *testing.T) {
connListener := newMockConnListener()
actionsCh := make(chan string)
connListener.peerConnectedCb = func(p peer.ID) {
actionsCh <- "PeerConnected:" + p.String()
time.Sleep(time.Millisecond * 50)
}

peers := testutil.GeneratePeers(2)
cem := newConnectEventManager(connListener)
cem.Start()
t.Cleanup(cem.Stop)

go func() {
actionsCh <- "Connected:" + peers[0].String()
cem.Connected(peers[0])
actionsCh <- "Done:" + peers[0].String()
actionsCh <- "Connected:" + peers[1].String()
cem.Connected(peers[1])
actionsCh <- "Done:" + peers[1].String()
close(actionsCh)
}()

// We expect Done to be sent _after_ PeerConnected, which demonstrates the
// call to Connected() blocks until PeerConnected() returns.
gotActions := make([]string, 0, 3)
for event := range actionsCh {
gotActions = append(gotActions, event)
}
expectedActions := []string{
"Connected:" + peers[0].String(),
"PeerConnected:" + peers[0].String(),
"Done:" + peers[0].String(),
"Connected:" + peers[1].String(),
"PeerConnected:" + peers[1].String(),
"Done:" + peers[1].String(),
}
require.Equal(t, expectedActions, gotActions)

// Flush the event queue.
wait(t, cem)
expectedEvents := []mockConnEvent{
{peer: peers[0], connected: true},
{peer: peers[1], connected: true},
}
require.Equal(t, expectedEvents, connListener.events)
}
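Note (not part of the PR): the deleted test above existed specifically to assert that `Connected()` blocks until `PeerConnected()` returns, which is exactly the behaviour being reverted, so it has no equivalent after this change. If a test still wanted to observe the listener events without that blocking guarantee, one hypothetical option, reusing the helpers already defined in this test file, is to poll with testify's `require.Eventually`:

```go
// Hypothetical example, not part of this PR. Assumes the surrounding test
// file's imports (testing, time, testutil, require) and helpers.
func TestConnectEventuallyObserved(t *testing.T) {
	connListener := newMockConnListener()
	p := testutil.GeneratePeers(1)[0]
	cem := newConnectEventManager(connListener)
	cem.Start()
	t.Cleanup(cem.Stop)

	cem.Connected(p) // returns immediately after this revert

	// Poll until the asynchronous worker has delivered the event.
	require.Eventually(t, func() bool {
		connListener.Lock()
		defer connListener.Unlock()
		return len(connListener.events) == 1 && connListener.events[0].connected
	}, time.Second, 10*time.Millisecond)
}
```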