Skip to content

Commit

Permalink
feat(connecteventmanager): block Connected() until accepted
Browse files Browse the repository at this point in the history
Ref: #432

Minimal attempt at solving #432
  • Loading branch information
rvagg committed Aug 17, 2023
1 parent dd32d67 commit 76f5ee2
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 16 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ The following emojis are used to highlight certain changes:

- Removed mentions of unused ARC algorithm ([#336](https://github.com/ipfs/boxo/issues/366#issuecomment-1597253540))
- Handle `_redirects` file when `If-None-Match` header is present ([#412](https://github.com/ipfs/boxo/pull/412))
- Address a Bitswap findpeers / connect race condition that can prevent peer communication ([#435](https://github.com/ipfs/boxo/issues/435))

### Security

Expand Down
78 changes: 62 additions & 16 deletions bitswap/network/connecteventmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,38 @@ type connectEventManager struct {

type peerState struct {
newState, curState state
pending bool
accepted chan struct{}
}

func newPeerState() *peerState {
return &peerState{accepted: make(chan struct{})}
}

func (p *peerState) isPending() bool {
select {
case <-p.accepted:
return false
default:
}
return true
}

func (p *peerState) accept() {
select {
case <-p.accepted:
default:
close(p.accepted)
}
}

func (p *peerState) setPending() {
if !p.isPending() {
p.accepted = make(chan struct{})
}
}

func (p *peerState) waitAccept() {
<-p.accepted
}

func newConnectEventManager(connListeners ...ConnectionListener) *connectEventManager {
Expand Down Expand Up @@ -61,22 +92,33 @@ func (c *connectEventManager) Stop() {
func (c *connectEventManager) getState(p peer.ID) state {
if state, ok := c.peers[p]; ok {
return state.newState
} else {
return stateDisconnected
}
return stateDisconnected
}

func (c *connectEventManager) setState(p peer.ID, newState state) {
state, ok := c.peers[p]
if !ok {
state = new(peerState)
func (c *connectEventManager) setState(p peer.ID, newState state) bool {
state, isExisting := c.peers[p]
if !isExisting {
state = newPeerState()
c.peers[p] = state
}
state.newState = newState
if !state.pending && state.newState != state.curState {
state.pending = true
c.changeQueue = append(c.changeQueue, p)
c.cond.Broadcast()
if state.newState != state.curState {
if !isExisting || !state.isPending() {
if isExisting {
state.setPending()
}
c.changeQueue = append(c.changeQueue, p)
c.cond.Broadcast()
}
return true
}
return false
}

func (c *connectEventManager) waitStateAccept(p peer.ID) {
if state, ok := c.peers[p]; ok {
state.waitAccept()
}
}

Expand Down Expand Up @@ -109,11 +151,9 @@ func (c *connectEventManager) worker() {
continue
}

// Record the fact that this "state" is no longer in the queue.
state.pending = false

// Then, if there's nothing to do, continue.
if state.curState == state.newState {
state.accept()
continue
}

Expand All @@ -124,6 +164,7 @@ func (c *connectEventManager) worker() {
switch state.newState {
case stateDisconnected:
delete(c.peers, pid)
state.accept()
fallthrough
case stateUnresponsive:
// Only trigger a disconnect event if the peer was responsive.
Expand All @@ -142,20 +183,25 @@ func (c *connectEventManager) worker() {
}
c.lk.Lock()
}
state.accept()
}
}

// Called whenever we receive a new connection. May be called many times.
func (c *connectEventManager) Connected(p peer.ID) {
c.lk.Lock()
defer c.lk.Unlock()

// !responsive -> responsive

if c.getState(p) == stateResponsive {
c.lk.Unlock()
return
}
c.setState(p, stateResponsive)
wait := c.setState(p, stateResponsive)
c.lk.Unlock()
if wait {
c.waitStateAccept(p)
}
}

// Called when we drop the final connection to a peer.
Expand Down

0 comments on commit 76f5ee2

Please sign in to comment.