Skip to content

Commit

Permalink
consumer: don't allow redistribute to run with no connections
Browse files Browse the repository at this point in the history
  • Loading branch information
mreiferson committed Apr 8, 2015
1 parent aebe75c commit 2e31ccc
Showing 1 changed file with 11 additions and 6 deletions.
17 changes: 11 additions & 6 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -955,24 +955,29 @@ func (r *Consumer) redistributeRDY() {
return
}

numConns := int32(len(r.conns()))
// if an external heuristic set needRDYRedistributed we want to wait
// until we can actually redistribute to proceed
conns := r.conns()
if len(conns) == 0 {
return
}

maxInFlight := r.getMaxInFlight()
if numConns > maxInFlight {
if len(conns) > int(maxInFlight) {
r.log(LogLevelDebug, "redistributing RDY state (%d conns > %d max_in_flight)",
numConns, maxInFlight)
len(conns), maxInFlight)
atomic.StoreInt32(&r.needRDYRedistributed, 1)
}

if r.inBackoff() && numConns > 1 {
r.log(LogLevelDebug, "redistributing RDY state (in backoff and %d conns > 1)", numConns)
if r.inBackoff() && len(conns) > 1 {
r.log(LogLevelDebug, "redistributing RDY state (in backoff and %d conns > 1)", len(conns))
atomic.StoreInt32(&r.needRDYRedistributed, 1)
}

if !atomic.CompareAndSwapInt32(&r.needRDYRedistributed, 1, 0) {
return
}

conns := r.conns()
possibleConns := make([]*Conn, 0, len(conns))
for _, c := range conns {
lastMsgDuration := time.Now().Sub(c.LastMessageTime())
Expand Down

0 comments on commit 2e31ccc

Please sign in to comment.