Skip to content

Commit

Permalink
Merge pull request #2020 from nspcc-dev/fix-infinite-loop-in-discoverer
Browse files Browse the repository at this point in the history
Fix infinite loop in discoverer
  • Loading branch information
roman-khimov committed Jun 18, 2021
2 parents 33db3f7 + 38d14d4 commit 0888dda
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 7 deletions.
25 changes: 18 additions & 7 deletions pkg/network/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ type DefaultDiscovery struct {
connectedAddrs map[string]bool
goodAddrs map[string]bool
unconnectedAddrs map[string]int
attempted map[string]bool
isDead bool
requestCh chan int
pool chan string
Expand All @@ -52,6 +53,7 @@ func NewDefaultDiscovery(addrs []string, dt time.Duration, ts Transporter) *Defa
connectedAddrs: make(map[string]bool),
goodAddrs: make(map[string]bool),
unconnectedAddrs: make(map[string]int),
attempted: make(map[string]bool),
requestCh: make(chan int),
pool: make(chan string, maxPoolSize),
}
Expand Down Expand Up @@ -174,11 +176,13 @@ func (d *DefaultDiscovery) RegisterConnectedAddr(addr string) {
}

func (d *DefaultDiscovery) tryAddress(addr string) {
if err := d.transport.Dial(addr, d.dialTimeout); err != nil {
err := d.transport.Dial(addr, d.dialTimeout)
d.lock.Lock()
delete(d.attempted, addr)
d.lock.Unlock()
if err != nil {
d.RegisterBadAddr(addr)
d.RequestRemote(1)
} else {
d.RegisterConnectedAddr(addr)
}
}

Expand Down Expand Up @@ -212,24 +216,31 @@ func (d *DefaultDiscovery) run() {
requested = r
}
case addr := <-d.pool:
d.lock.RLock()
addrIsConnected := d.connectedAddrs[addr]
d.lock.RUnlock()
updatePoolCountMetric(d.PoolCount())
if !addrIsConnected {
d.lock.Lock()
if !d.connectedAddrs[addr] && !d.attempted[addr] {
d.attempted[addr] = true
go d.tryAddress(addr)
requested--
}
d.lock.Unlock()
default: // Empty pool
var added int
d.lock.Lock()
for _, addr := range d.seeds {
if !d.connectedAddrs[addr] {
delete(d.badAddrs, addr)
d.unconnectedAddrs[addr] = connRetries
d.pushToPoolOrDrop(addr)
added++
}
}
d.lock.Unlock()
// The pool is empty, but all seed nodes are already connected,
// we can end up in an infinite loop here, so drop the request.
if added == 0 {
requested = 0
}
}
}
if !ok {
Expand Down
1 change: 1 addition & 0 deletions pkg/network/discovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ func TestDefaultDiscoverer(t *testing.T) {
select {
case a := <-ts.dialCh:
dialled = append(dialled, a)
d.RegisterConnectedAddr(a)
case <-time.After(time.Second):
t.Fatalf("timeout expecting for transport dial")
}
Expand Down

0 comments on commit 0888dda

Please sign in to comment.