diff --git a/src/Provider/Amqp/AmqpQueueProvider.php b/src/Provider/Amqp/AmqpQueueProvider.php index c667a38..54f2f1f 100644 --- a/src/Provider/Amqp/AmqpQueueProvider.php +++ b/src/Provider/Amqp/AmqpQueueProvider.php @@ -96,9 +96,12 @@ public function pushBatch(array $batch, $persistent = null) $mandatory = $this->_getMandatoryFlag(); $autoDeclare = $this->_getAutoDeclare(); $publishConfirm = $this->_getPublishConfirm(); + // max no. of times to attempt declaring the queue + $declareRetryLimit = 2; $needRetry = true; $needDeclare = false; + $declareAttempts = 0; $returnCallback = null; if($mandatory) @@ -108,8 +111,15 @@ public function pushBatch(array $batch, $persistent = null) $replyText, $exchange, $routingKey - ) use (&$needRetry, &$needDeclare, &$autoDeclare) { - if($autoDeclare && (!$needDeclare) && ($replyCode == 312)) + ) use ( + &$needRetry, &$needDeclare, &$autoDeclare, + $declareAttempts, $declareRetryLimit + ) + { + if($autoDeclare + && ($declareAttempts < $declareRetryLimit) + && ($replyCode == 312) + ) { $needDeclare = true; $needRetry = true; @@ -136,6 +146,7 @@ public function pushBatch(array $batch, $persistent = null) if($needDeclare) { $this->_log("Auto-declaring exchange and queue"); + $declareAttempts++; $this->declareExchange(); $this->declareQueue(); $this->bindQueue(); @@ -170,7 +181,10 @@ public function pushBatch(array $batch, $persistent = null) catch(\Exception $e) { $this->disconnectAll(); - if($autoDeclare && (!$needDeclare) && ($e->getCode() == 404)) + if($autoDeclare + && ($declareAttempts < $declareRetryLimit) + && ($e->getCode() == 404) + ) { $needRetry = true; $needDeclare = true;