Permalink
Browse files

Added possibility to set serialize/unserialize function for rpc serve…

…rs/clients
  • Loading branch information...
igaponov committed Feb 7, 2015
1 parent a15a4eb commit 4b062493f1f6aaa19205af04dd5c5d7da6ca5f2e
Showing with 25 additions and 2 deletions.
  1. +3 −0 CHANGELOG
  2. +6 −0 DependencyInjection/OldSoundRabbitMqExtension.php
  3. +7 −1 RabbitMq/RpcClient.php
  4. +9 −1 RabbitMq/RpcServer.php
View
@@ -1,3 +1,6 @@
- 2015-02-07
* Added possibility to set serialize/unserialize function for rpc servers/clients
- 2014-11-27
* Added interface `OldSound\RabbitMqBundle\Provider\QueuesProviderInterface`
* Added `queues_provider` configuration for multiple consumer
@@ -312,6 +312,9 @@ protected function loadRpcClients()
if ($this->collectorEnabled) {
$this->injectLoggedChannel($definition, $key, $client['connection']);
}
if (array_key_exists('unserializer', $client)) {
$definition->addMethodCall('setUnserializer', array($client['unserializer']));
}
$this->container->setDefinition(sprintf('old_sound_rabbit_mq.%s_rpc', $key), $definition);
}
@@ -340,6 +343,9 @@ protected function loadRpcServers()
if (array_key_exists('queue_options', $server)) {
$definition->addMethodCall('setQueueOptions', array($server['queue_options']));
}
if (array_key_exists('serializer', $server)) {
$definition->addMethodCall('setSerializer', array($server['serializer']));
}
$this->container->setDefinition(sprintf('old_sound_rabbit_mq.%s_server', $key), $definition);
}
}
View
@@ -13,6 +13,7 @@ class RpcClient extends BaseAmqp
protected $timeout = 0;
private $queueName;
private $unserializer = 'unserialize';
public function initClient($expectSerializedResponse = true)
{
@@ -60,7 +61,7 @@ public function processMessage(AMQPMessage $msg)
{
$messageBody = $msg->body;
if ($this->expectSerializedResponse) {
$messageBody = unserialize($messageBody);
$messageBody = call_user_func($this->unserializer, $messageBody);
}
$this->replies[$msg->get('correlation_id')] = $messageBody;
@@ -74,4 +75,9 @@ protected function getQueueName()
return $this->queueName;
}
public function setUnserializer($unserializer)
{
$this->unserializer = $unserializer;
}
}
View
@@ -6,6 +6,8 @@
class RpcServer extends BaseConsumer
{
private $serializer = 'serialize';
public function initServer($name)
{
$this->setExchangeOptions(array('name' => $name, 'type' => 'direct'));
@@ -17,7 +19,8 @@ public function processMessage(AMQPMessage $msg)
try {
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
$result = call_user_func($this->callback, $msg);
$this->sendReply(serialize($result), $msg->get('reply_to'), $msg->get('correlation_id'));
$result = call_user_func($this->serializer, $result);
$this->sendReply($result, $msg->get('reply_to'), $msg->get('correlation_id'));
$this->consumed++;
$this->maybeStopConsumer();
} catch (\Exception $e) {
@@ -30,4 +33,9 @@ protected function sendReply($result, $client, $correlationId)
$reply = new AMQPMessage($result, array('content_type' => 'text/plain', 'correlation_id' => $correlationId));
$this->getChannel()->basic_publish($reply, '', $client);
}
public function setSerializer($serializer)
{
$this->serializer = $serializer;
}
}

0 comments on commit 4b06249

Please sign in to comment.