diff --git a/RabbitMq/RpcClient.php b/RabbitMq/RpcClient.php index aff159f3..c423522a 100644 --- a/RabbitMq/RpcClient.php +++ b/RabbitMq/RpcClient.php @@ -9,13 +9,13 @@ class RpcClient extends BaseAmqp { protected $requests = 0; protected $replies = array(); - protected $queueName; protected $expectSerializedResponse; protected $timeout = 0; + private $queueName; + public function initClient($expectSerializedResponse = true) { - list($this->queueName, ,) = $this->getChannel()->queue_declare("", false, false, true, true); $this->expectSerializedResponse = $expectSerializedResponse; } @@ -26,7 +26,7 @@ public function addRequest($msgBody, $server, $requestId = null, $routingKey = ' } $msg = new AMQPMessage($msgBody, array('content_type' => 'text/plain', - 'reply_to' => $this->queueName, + 'reply_to' => $this->getQueueName(), 'delivery_mode' => 1, // non durable 'expiration' => $expiration*1000, 'correlation_id' => $requestId)); @@ -43,13 +43,13 @@ public function addRequest($msgBody, $server, $requestId = null, $routingKey = ' public function getReplies() { $this->replies = array(); - $this->getChannel()->basic_consume($this->queueName, '', false, true, false, false, array($this, 'processMessage')); + $this->getChannel()->basic_consume($this->getQueueName(), '', false, true, false, false, array($this, 'processMessage')); while (count($this->replies) < $this->requests) { $this->getChannel()->wait(null, false, $this->timeout); } - $this->getChannel()->basic_cancel($this->queueName); + $this->getChannel()->basic_cancel($this->getQueueName()); $this->requests = 0; $this->timeout = 0; @@ -65,4 +65,13 @@ public function processMessage(AMQPMessage $msg) $this->replies[$msg->get('correlation_id')] = $messageBody; } + + protected function getQueueName() + { + if (null === $this->queueName) { + list($this->queueName, ,) = $this->getChannel()->queue_declare("", false, false, true, true); + } + + return $this->queueName; + } }