Skip to content

Commit

Permalink
Merge branch 'mvonahn-master'
Browse files Browse the repository at this point in the history
  • Loading branch information
John Kelly committed May 27, 2016
2 parents 438392d + 59811a2 commit 68aed1f
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 10 deletions.
27 changes: 27 additions & 0 deletions lib/Thumper/BaseAmqp.php
Expand Up @@ -39,6 +39,9 @@

abstract class BaseAmqp
{
const NON_PERSISTENT = 1;
const PERSISTENT = 2;

/**
* @var AbstractConnection
*/
Expand Down Expand Up @@ -88,6 +91,13 @@ abstract class BaseAmqp
*/
protected $routingKey = '';

/**
* @var array
*/
protected $parameters = array(
'content_type' => 'text/plain'
);

/**
* BaseAmqp constructor.
* @param AbstractConnection $connection
Expand Down Expand Up @@ -226,4 +236,21 @@ private function isValidExchangeName($exchangeName)
{
return preg_match('/^[A-Za-z0-9_\-\.\;]*$/', $exchangeName);
}

/**
* @param string $key
* @param string $value
*/
public function setParameter($key, $value)
{
$this->parameters[$key] = $value;
}

/**
* @return array
*/
public function getParameters()
{
return $this->parameters;
}
}
4 changes: 3 additions & 1 deletion lib/Thumper/Producer.php
Expand Up @@ -61,10 +61,12 @@ public function publish($messageBody, $routingKey = '')
}
$this->exchangeReady = true;
}

$this->setParameter('delivery_mode', self::PERSISTENT);

$message = new AMQPMessage(
$messageBody,
array('content_type' => 'text/plain', 'delivery_mode' => 2)
$this->getParameters()
);
$this->channel->basic_publish(
$message,
Expand Down
8 changes: 3 additions & 5 deletions lib/Thumper/RpcClient.php
Expand Up @@ -80,14 +80,12 @@ public function addRequest($messageBody, $server, $requestId, $routingKey = '')
if (empty($requestId)) {
throw new \InvalidArgumentException("You must provide a request ID");
}
$this->setParameter('correlation_id', $requestId);
$this->setParameter('reply_to', $this->queueName);

$message = new AMQPMessage(
$messageBody,
array(
'content_type' => 'text/plain',
'reply_to' => $this->queueName,
'correlation_id' => $requestId
)
$this->getParameters()
);

$this->channel
Expand Down
6 changes: 2 additions & 4 deletions lib/Thumper/RpcServer.php
Expand Up @@ -103,12 +103,10 @@ public function processMessage(AMQPMessage $message)
*/
protected function sendReply($result, $client, $correlationId)
{
$this->setParameter('correlation_id', $correlationId);
$reply = new AMQPMessage(
$result,
array(
'content_type' => 'text/plain',
'correlation_id' => $correlationId
)
$this->getParameters()
);
$this->channel
->basic_publish($reply, '', $client);
Expand Down

0 comments on commit 68aed1f

Please sign in to comment.