Skip to content

Commit

Permalink
Add support for connection.blocked and connection.unblocked
Browse files Browse the repository at this point in the history
  • Loading branch information
Eric Berg committed Nov 21, 2013
1 parent a095c5e commit 49091e5
Show file tree
Hide file tree
Showing 9 changed files with 100 additions and 11 deletions.
11 changes: 2 additions & 9 deletions PhpAmqpLib/Channel/AMQPChannel.php
Original file line number Diff line number Diff line change
Expand Up @@ -607,13 +607,6 @@ function ($keys, $key) use ($value) {
return $keys;
}

protected function dispatch_to_handler($handler, array $arguments)
{
if (is_callable($handler)) {
call_user_func_array($handler, $arguments);
}
}

/**
* reject one or several received messages.
*/
Expand Down Expand Up @@ -1059,7 +1052,7 @@ public function set_return_listener($callback)
*
* @param callable $callback
*/
public function set_nack_handler(Callable $callback)
public function set_nack_handler($callback)
{
$this->nack_handler = $callback;
}
Expand All @@ -1069,7 +1062,7 @@ public function set_nack_handler(Callable $callback)
*
* @param callable $callback
*/
public function set_ack_handler(Callable $callback)
public function set_ack_handler($callback)
{
$this->ack_handler = $callback;
}
Expand Down
7 changes: 7 additions & 0 deletions PhpAmqpLib/Channel/AbstractChannel.php
Original file line number Diff line number Diff line change
Expand Up @@ -272,4 +272,11 @@ public function wait($allowed_methods=null, $non_blocking = false, $timeout = 0)
};
}
}

protected function dispatch_to_handler($handler, array $arguments)
{
if (is_callable($handler)) {
call_user_func_array($handler, $arguments);
}
}
}
58 changes: 57 additions & 1 deletion PhpAmqpLib/Connection/AbstractConnection.php
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,26 @@ class AbstractConnection extends AbstractChannel
'publisher_confirms' => array('t', true),
'consumer_cancel_notify' => array('t', true),
'exchange_exchange_bindings' => array('t', true),
'basic.nack' => array('t', true)
'basic.nack' => array('t', true),
'connection.blocked' => array('t', true)
)
)
);

/**
* Handles connection blocking from the server
*
* @var Callable
*/
private $connection_block_handler = null;

/**
* Handles connection unblocking from the server
*
* @var Callable
*/
private $connection_unblock_handler = null;

/**
* contructor parameters for clone
* @var array
Expand Down Expand Up @@ -596,4 +611,45 @@ protected function getIO() {
return $this->io;
}

/**
* Handles connection blocked notifications
*
* @param AMQPReader $args
*/
protected function connection_blocked(\PhpAmqpLib\Wire\AMQPReader $args)
{
// Call the block handler and pass in the reason
$this->dispatch_to_handler($this->connection_block_handler, array($args->read_shortstr()));
}

/**
* Handles connection unblocked notifications
*
* @param AMQPReader $args
*/
protected function connection_unblocked(\PhpAmqpLib\Wire\AMQPReader $args)
{
// No args to an unblock event
$this->dispatch_to_handler($this->connection_unblock_handler, array());
}

/**
* Sets a handler which is called whenever a connection.block is sent from the server
*
* @param callable $callback
*/
public function set_connection_block_handler($callback)
{
$this->connection_block_handler = $callback;
}

/**
* Sets a handler which is called whenever a connection.block is sent from the server
*
* @param callable $callback
*/
public function set_connection_unblock_handler($callback)
{
$this->connection_unblock_handler = $callback;
}
}
2 changes: 2 additions & 0 deletions PhpAmqpLib/Helper/Protocol/MethodMap091.php
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ class MethodMap091
'10,41' => 'connection_open_ok',
'10,50' => 'connection_close',
'10,51' => 'connection_close_ok',
'10,60' => 'connection_blocked',
'10,61' => 'connection_unblocked',
'20,10' => 'channel_open',
'20,11' => 'channel_open_ok',
'20,20' => 'channel_flow',
Expand Down
11 changes: 11 additions & 0 deletions PhpAmqpLib/Helper/Protocol/Protocol091.php
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,17 @@ public static function connectionCloseOk($args) {
return $ret;
}

public function connectionBlocked($reason = '') {
$args = new AMQPWriter();
$args->write_shortstr($reason);
return array(10, 60, $args);
}

public static function connectionUnblocked($args) {
$ret = array();
return $ret;
}

public function channelOpen($out_of_band = '') {
$args = new AMQPWriter();
$args->write_shortstr($out_of_band);
Expand Down
2 changes: 2 additions & 0 deletions PhpAmqpLib/Helper/Protocol/Wait091.php
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ class Wait091
'connection.open_ok' => '10,41',
'connection.close' => '10,50',
'connection.close_ok' => '10,51',
'connection.blocked' => '10,60',
'connection.unblocked' => '10,61',
'channel.open' => '20,10',
'channel.open_ok' => '20,11',
'channel.flow' => '20,20',
Expand Down
10 changes: 10 additions & 0 deletions PhpAmqpLib/Tests/Unit/Helper/Protocol/Protocol091Test.php
Original file line number Diff line number Diff line change
Expand Up @@ -195,4 +195,14 @@ public function testBasicReject()
list($class_id, $method_id, $args) = $this->protocol091->basicReject(1, false);
$this->assertEquals($expected, $args->getvalue());
}

public function testConnectionBlocked()
{
$expected = 'Low on memory';
list($class_id, $method_id, $args) = $this->protocol091->connectionBlocked($expected);

$this->assertEquals($class_id, 10);
$this->assertEquals($method_id, 60);
$this->assertEquals($expected, trim($args->getValue()));
}
}
2 changes: 2 additions & 0 deletions PhpAmqpLib/Wire/Constants091.php
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ class Constants091
'10,41' => 'Connection.open_ok',
'10,50' => 'Connection.close',
'10,51' => 'Connection.close_ok',
'10,60' => 'Connection.blocked',
'10,61' => 'Connection.unblocked',
'20,10' => 'Channel.open',
'20,11' => 'Channel.open_ok',
'20,20' => 'Channel.flow',
Expand Down
8 changes: 7 additions & 1 deletion spec/amqp-rabbitmq-0.9.1.json
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,13 @@
"synchronous" : true},
{"id": 51,
"arguments": [],
"name": "close-ok"}],
"name": "close-ok"},
{"id": 60,
"arguments": [{"type": "shortstr", "name": "reason", "default-value": ""}],
"name": "blocked"},
{"id": 61,
"arguments": [],
"name": "unblocked"}],
"name": "connection",
"properties": []
},
Expand Down

0 comments on commit 49091e5

Please sign in to comment.