diff --git a/src/HumusAmqpModule/Consumer.php b/src/HumusAmqpModule/Consumer.php index 1a63e43..c11b161 100644 --- a/src/HumusAmqpModule/Consumer.php +++ b/src/HumusAmqpModule/Consumer.php @@ -256,12 +256,11 @@ public function consume($msgAmount = 0) { $this->target = $msgAmount; - do { + foreach ($this->queues as $index => $queue) { if (!$this->timestampLastAck) { $this->timestampLastAck = microtime(1); } - $queue = $this->fetchNextQueue(); $message = $queue->get(); if ($message instanceof AMQPEnvelope) { @@ -272,7 +271,7 @@ public function consume($msgAmount = 0) $processFlag = false; } $this->handleProcessFlag($message, $processFlag); - } else { + } elseif (0 == $index) { // all queues checked, no messages found usleep($this->waitTimeout); } @@ -280,12 +279,15 @@ public function consume($msgAmount = 0) if ($this->countMessagesUnacked > 0 && ($this->countMessagesUnacked == $this->blockSize - || ($now - $this->timestampLastAck) > $this->idleTimeout - )) { + || ($now - $this->timestampLastAck) > $this->idleTimeout + )) { $this->ackOrNackBlock(); } - } while ($this->keepAlive && ($this->countMessagesConsumed < $this->target || 0 == $this->target)); + if (!$this->keepAlive || (0 != $this->target && $this->countMessagesConsumed >= $this->target)) { + break; + } + } } /**