Skip to content

Commit

Permalink
Merge pull request #157 from halseth/query-batch-stall-fix
Browse files Browse the repository at this point in the history
query: handle closing matchSignal for peer query
  • Loading branch information
halseth committed Jun 20, 2019
2 parents 3eed275 + 0d4a221 commit 3f503ac
Showing 1 changed file with 22 additions and 5 deletions.
27 changes: 22 additions & 5 deletions query.go
Original file line number Diff line number Diff line change
Expand Up @@ -385,13 +385,16 @@ func queryChainServiceBatch(
mtxPeerStates.Lock()
peerStates[sp.Addr()] = queryMsgs[handleQuery]
mtxPeerStates.Unlock()

exiting := false
select {
case <-queryQuit:
return
exiting = true
case <-s.quit:
return
exiting = true
case <-quit:
return
exiting = true

case <-timeout:
// We failed, so set the query state back to
// zero and update our lastFailed state.
Expand All @@ -407,7 +410,7 @@ func queryChainServiceBatch(

// To allow other peers to pick up this query,
// let the peer that just timed out wait a
// cooldown period before handing the next
// cooldown period before handing it the next
// query.
select {
case <-time.After(QueryPeerCooldown):
Expand All @@ -419,7 +422,12 @@ func queryChainServiceBatch(
return
}

case <-matchSignal:
case _, ok := <-matchSignal:
if !ok {
exiting = true
break
}

// We got a match signal so we can mark this
// query a success.
atomic.StoreUint32(&queryStates[handleQuery],
Expand All @@ -428,6 +436,15 @@ func queryChainServiceBatch(
log.Tracef("Query #%v answered, updating state",
handleQuery)
}

// Before exiting the peer goroutine, reset the query
// state to ensure other peers can pick it up.
if exiting {
atomic.StoreUint32(
&queryStates[handleQuery], uint32(queryWaitSubmit),
)
return
}
}
}

Expand Down

0 comments on commit 3f503ac

Please sign in to comment.