Skip to content

Loading…

fixed coding standard to be psr compliant #42

Merged
merged 2 commits into from

2 participants

@brikou

fixed cs using php cs fixer

@videlalvaro videlalvaro commented on an outdated diff
PhpAmqpLib/Channel/AMQPChannel.php
((6 lines not shown))
* @throws \InvalidArgumentException if $callback is not callable
*/
- public function set_return_listener($callback){
- if(!is_callable($callback))
- throw new \InvalidArgumentException('$callback should be callable.');
- $this->basic_return_callback = $callback;
+ public function set_return_listener($callback)
+ {
+ if(!is_callable($callback))
+ throw new \InvalidArgumentException('$callback should be callable.');
@videlalvaro Owner

Now that I see we need double quotes here right? Else $callback won't be in the output

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
@videlalvaro
Owner

Would you mind fixing that small issue I've found that was there even before your changes? After that I press the button :)

@videlalvaro videlalvaro merged commit 8a393d2 into videlalvaro:master

1 check passed

Details default The Travis build passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
This page is out of date. Refresh to see the latest.
Showing with 317 additions and 277 deletions.
  1. +46 −35 PhpAmqpLib/Channel/AMQPChannel.php
  2. +74 −76 PhpAmqpLib/Channel/AbstractChannel.php
  3. +22 −17 PhpAmqpLib/Connection/AMQPConnection.php
  4. +2 −1 PhpAmqpLib/Connection/AMQPSSLConnection.php
  5. +1 −1 PhpAmqpLib/Exception/AMQPChannelException.php
  6. +1 −1 PhpAmqpLib/Exception/AMQPConnectionException.php
  7. +1 −1 PhpAmqpLib/Exception/AMQPException.php
  8. +6 −7 PhpAmqpLib/Helper/MiscHelper.php
  9. +21 −1 PhpAmqpLib/Helper/Protocol/FrameBuilder.php
  10. +1 −1 PhpAmqpLib/Message/AMQPMessage.php
  11. +1 −1 PhpAmqpLib/Tests/Functional/Bug40Test.php
  12. +1 −1 PhpAmqpLib/Tests/Functional/FileTransferTest.php
  13. +1 −1 PhpAmqpLib/Tests/Functional/PublishConsumeTest.php
  14. +1 −2 PhpAmqpLib/Tests/Unit/Helper/Protocol/FrameBuilderTest.php
  15. +7 −9 PhpAmqpLib/Tests/Unit/Wire/AMQPWriterTest.php
  16. +1 −3 PhpAmqpLib/Tests/Unit/WireTest.php
  17. +1 −1 PhpAmqpLib/Wire/AMQPDecimal.php
  18. +21 −14 PhpAmqpLib/Wire/AMQPReader.php
  19. +29 −16 PhpAmqpLib/Wire/AMQPWriter.php
  20. +11 −13 PhpAmqpLib/Wire/BufferedInput.php
  21. +12 −11 PhpAmqpLib/Wire/GenericContent.php
  22. +4 −5 benchmark/consumer.php
  23. +3 −4 benchmark/file_consume.php
  24. +0 −2 benchmark/file_publish.php
  25. +0 −2 benchmark/producer.php
  26. +4 −4 demo/amqp_consumer.php
  27. +7 −7 demo/amqp_consumer_exclusive.php
  28. +5 −5 demo/amqp_consumer_fanout_1.php
  29. +5 −5 demo/amqp_consumer_fanout_2.php
  30. +4 −4 demo/amqp_consumer_non_blocking.php
  31. +16 −16 demo/amqp_ha_consumer.php
  32. +0 −1 demo/amqp_publisher.php
  33. +0 −1 demo/amqp_publisher_exclusive.php
  34. +0 −1 demo/amqp_publisher_fanout.php
  35. +1 −1 demo/basic_get.php
  36. +3 −3 demo/basic_return.php
  37. +1 −1 demo/config.php
  38. +3 −2 demo/ssl_connection.php
View
81 PhpAmqpLib/Channel/AMQPChannel.php
@@ -36,14 +36,14 @@ class AMQPChannel extends AbstractChannel
"90,31" => "tx_rollback_ok"
);
/**
- *
- * @var callable these parameters will be passed to function
+ *
+ * @var callable these parameters will be passed to function
* in case of basic_return:
- * param int $reply_code
- * param string $reply_text
- * param string $exchange
- * param string $routing_key
- * param AMQPMessage $msg
+ * param int $reply_code
+ * param string $reply_text
+ * param string $exchange
+ * param string $routing_key
+ * param AMQPMessage $msg
*/
protected $basic_return_callback = null;
@@ -122,6 +122,7 @@ public function close($reply_code=0,
);
$this->send_method_frame(array(20, 40), $args);
+
return $this->wait(array(
"20,41" // Channel.close_ok
));
@@ -157,6 +158,7 @@ public function flow($active)
{
$args = $this->frameBuilder->flow($active);
$this->send_method_frame(array(20, 20), $args);
+
return $this->wait(array(
"20,21" //Channel.flow_ok
));
@@ -187,6 +189,7 @@ protected function x_open($out_of_band="")
$args = $this->frameBuilder->xOpen($out_of_band);
$this->send_method_frame(array(20, 10), $args);
+
return $this->wait(array(
"20,11" //Channel.open_ok
));
@@ -211,6 +214,7 @@ public function access_request($realm, $exclusive=false,
$passive, $active,
$write, $read);
$this->send_method_frame(array(30, 10), $args);
+
return $this->wait(array(
"30,11" //Channel.access_request_ok
));
@@ -222,6 +226,7 @@ public function access_request($realm, $exclusive=false,
protected function access_request_ok($args)
{
$this->default_ticket = $args->read_short();
+
return $this->default_ticket;
}
@@ -452,6 +457,7 @@ public function basic_cancel($consumer_tag, $nowait=false)
{
$args = $this->frameBuilder->basicCancel($consumer_tag, $nowait);
$this->send_method_frame(array(60, 30), $args);
+
return $this->wait(array(
"60,31" // Channel.basic_cancel_ok
));
@@ -487,6 +493,7 @@ public function basic_consume($queue="", $consumer_tag="", $no_local=false,
}
$this->callbacks[$consumer_tag] = $callback;
+
return $consumer_tag;
}
@@ -538,6 +545,7 @@ public function basic_get($queue="", $no_ack=false, $ticket=null)
$args = $this->frameBuilder->basicGet($queue, $no_ack, $ticket);
$this->send_method_frame(array(60, 70), $args);
+
return $this->wait(array(
"60,71", //Channel.basic_get_ok
"60,72" // Channel.basic_get_empty
@@ -570,6 +578,7 @@ protected function basic_get_ok($args, $msg)
"routing_key" => $routing_key,
"message_count" => $message_count
);
+
return $msg;
}
@@ -593,7 +602,6 @@ public function basic_publish($msg, $exchange="", $routing_key="",
$msg->body);
}
-
/**
* specify quality of service
*/
@@ -601,12 +609,12 @@ public function basic_qos($prefetch_size, $prefetch_count, $a_global)
{
$args = $this->frameBuilder->basicQos($prefetch_size, $prefetch_count, $a_global);
$this->send_method_frame(array(60, 10), $args);
+
return $this->wait(array(
- "60,11" //Channel.basic_qos_ok
- ));
+ "60,11" //Channel.basic_qos_ok
+ ));
}
-
/**
* confirm the requested qos
*/
@@ -641,25 +649,26 @@ protected function basic_return($args, $msg)
$reply_text = $args->read_shortstr();
$exchange = $args->read_shortstr();
$routing_key = $args->read_shortstr();
-
- if( !is_null($this->basic_return_callback )){
- call_user_func_array($this->basic_return_callback, array(
- $reply_code,
- $reply_text,
- $exchange,
- $routing_key,
- $msg,
- ));
- } else if ($this->debug) {
- MiscHelper::debug_msg("Skipping unhandled basic_return message");
+
+ if ( !is_null($this->basic_return_callback )) {
+ call_user_func_array($this->basic_return_callback, array(
+ $reply_code,
+ $reply_text,
+ $exchange,
+ $routing_key,
+ $msg,
+ ));
+ } elseif ($this->debug) {
+ MiscHelper::debug_msg("Skipping unhandled basic_return message");
}
}
public function tx_commit()
{
$this->send_method_frame(array(90, 20));
+
return $this->wait(array(
- "90,21" //Channel.tx_commit_ok
- ));
+ "90,21" //Channel.tx_commit_ok
+ ));
}
/**
@@ -669,16 +678,16 @@ protected function tx_commit_ok($args)
{
}
-
/**
* abandon the current transaction
*/
public function tx_rollback()
{
$this->send_method_frame(array(90, 30));
+
return $this->wait(array(
- "90,31" //Channel.tx_rollback_ok
- ));
+ "90,31" //Channel.tx_rollback_ok
+ ));
}
/**
@@ -694,9 +703,10 @@ protected function tx_rollback_ok($args)
public function tx_select()
{
$this->send_method_frame(array(90, 10));
+
return $this->wait(array(
- "90,11" //Channel.tx_select_ok
- ));
+ "90,11" //Channel.tx_select_ok
+ ));
}
/**
@@ -717,13 +727,14 @@ protected function getTicket($ticket)
}
/**
* set callback for basic_return
- * @param callable $callback
+ * @param callable $callback
* @throws \InvalidArgumentException if $callback is not callable
*/
- public function set_return_listener($callback){
- if(!is_callable($callback))
- throw new \InvalidArgumentException('$callback should be callable.');
- $this->basic_return_callback = $callback;
+ public function set_return_listener($callback)
+ {
+ if (!is_callable($callback)) {
+ throw new \InvalidArgumentException("$callback should be callable.");
+ }
+ $this->basic_return_callback = $callback;
}
-
}
View
150 PhpAmqpLib/Channel/AbstractChannel.php
@@ -9,17 +9,17 @@
class AbstractChannel
{
- /**
- * taken from http://www.rabbitmq.com/amqp-0-9-1-reference.html#constants
- * @var array
- */
- protected static $FRAME_TYPES = array(
- 1 => 'frame-method',
- 2 => 'frame-header',
- 3 => 'frame-body',
- 8 => 'frame-heartbeat',
- );
-
+ /**
+ * taken from http://www.rabbitmq.com/amqp-0-9-1-reference.html#constants
+ * @var array
+ */
+ protected static $FRAME_TYPES = array(
+ 1 => 'frame-method',
+ 2 => 'frame-header',
+ 3 => 'frame-body',
+ 8 => 'frame-heartbeat',
+ );
+
private static $CONTENT_METHODS = array(
"60,60", // Basic.deliver
"60,71", // Basic.get_ok
@@ -33,66 +33,66 @@ class AbstractChannel
// All the method names
public static $GLOBAL_METHOD_NAMES = array(
- "10,10" => "Connection.start",
- "10,11" => "Connection.start_ok",
- "10,20" => "Connection.secure",
- "10,21" => "Connection.secure_ok",
- "10,30" => "Connection.tune",
- "10,31" => "Connection.tune_ok",
- "10,40" => "Connection.open",
- "10,41" => "Connection.open_ok",
- "10,50" => "Connection.redirect",
- "10,60" => "Connection.close",
- "10,61" => "Connection.close_ok",
- "20,10" => "Channel.open",
- "20,11" => "Channel.open_ok",
- "20,20" => "Channel.flow",
- "20,21" => "Channel.flow_ok",
- "20,30" => "Channel.alert",
- "20,40" => "Channel.close",
- "20,41" => "Channel.close_ok",
- "30,10" => "Channel.access_request",
- "30,11" => "Channel.access_request_ok",
- "40,10" => "Channel.exchange_declare",
- "40,11" => "Channel.exchange_declare_ok",
- "40,20" => "Channel.exchange_delete",
- "40,21" => "Channel.exchange_delete_ok",
- "50,10" => "Channel.queue_declare",
- "50,11" => "Channel.queue_declare_ok",
- "50,20" => "Channel.queue_bind",
- "50,21" => "Channel.queue_bind_ok",
- "50,30" => "Channel.queue_purge",
- "50,31" => "Channel.queue_purge_ok",
- "50,40" => "Channel.queue_delete",
- "50,41" => "Channel.queue_delete_ok",
- "50,50" => "Channel.queue_unbind",
- "50,51" => "Channel.queue_unbind_ok",
- "60,10" => "Channel.basic_qos",
- "60,11" => "Channel.basic_qos_ok",
- "60,20" => "Channel.basic_consume",
- "60,21" => "Channel.basic_consume_ok",
- "60,30" => "Channel.basic_cancel",
- "60,31" => "Channel.basic_cancel_ok",
- "60,40" => "Channel.basic_publish",
- "60,50" => "Channel.basic_return",
- "60,60" => "Channel.basic_deliver",
- "60,70" => "Channel.basic_get",
- "60,71" => "Channel.basic_get_ok",
- "60,72" => "Channel.basic_get_empty",
- "60,80" => "Channel.basic_ack",
- "60,90" => "Channel.basic_reject",
- "60,100" => "Channel.basic_recover",
- "90,10" => "Channel.tx_select",
- "90,11" => "Channel.tx_select_ok",
- "90,20" => "Channel.tx_commit",
- "90,21" => "Channel.tx_commit_ok",
- "90,30" => "Channel.tx_rollback",
- "90,31" => "Channel.tx_rollback_ok"
+ "10,10" => "Connection.start",
+ "10,11" => "Connection.start_ok",
+ "10,20" => "Connection.secure",
+ "10,21" => "Connection.secure_ok",
+ "10,30" => "Connection.tune",
+ "10,31" => "Connection.tune_ok",
+ "10,40" => "Connection.open",
+ "10,41" => "Connection.open_ok",
+ "10,50" => "Connection.redirect",
+ "10,60" => "Connection.close",
+ "10,61" => "Connection.close_ok",
+ "20,10" => "Channel.open",
+ "20,11" => "Channel.open_ok",
+ "20,20" => "Channel.flow",
+ "20,21" => "Channel.flow_ok",
+ "20,30" => "Channel.alert",
+ "20,40" => "Channel.close",
+ "20,41" => "Channel.close_ok",
+ "30,10" => "Channel.access_request",
+ "30,11" => "Channel.access_request_ok",
+ "40,10" => "Channel.exchange_declare",
+ "40,11" => "Channel.exchange_declare_ok",
+ "40,20" => "Channel.exchange_delete",
+ "40,21" => "Channel.exchange_delete_ok",
+ "50,10" => "Channel.queue_declare",
+ "50,11" => "Channel.queue_declare_ok",
+ "50,20" => "Channel.queue_bind",
+ "50,21" => "Channel.queue_bind_ok",
+ "50,30" => "Channel.queue_purge",
+ "50,31" => "Channel.queue_purge_ok",
+ "50,40" => "Channel.queue_delete",
+ "50,41" => "Channel.queue_delete_ok",
+ "50,50" => "Channel.queue_unbind",
+ "50,51" => "Channel.queue_unbind_ok",
+ "60,10" => "Channel.basic_qos",
+ "60,11" => "Channel.basic_qos_ok",
+ "60,20" => "Channel.basic_consume",
+ "60,21" => "Channel.basic_consume_ok",
+ "60,30" => "Channel.basic_cancel",
+ "60,31" => "Channel.basic_cancel_ok",
+ "60,40" => "Channel.basic_publish",
+ "60,50" => "Channel.basic_return",
+ "60,60" => "Channel.basic_deliver",
+ "60,70" => "Channel.basic_get",
+ "60,71" => "Channel.basic_get_ok",
+ "60,72" => "Channel.basic_get_empty",
+ "60,80" => "Channel.basic_ack",
+ "60,90" => "Channel.basic_reject",
+ "60,100" => "Channel.basic_recover",
+ "90,10" => "Channel.tx_select",
+ "90,11" => "Channel.tx_select_ok",
+ "90,20" => "Channel.tx_commit",
+ "90,21" => "Channel.tx_commit_ok",
+ "90,30" => "Channel.tx_rollback",
+ "90,31" => "Channel.tx_rollback_ok"
);
-
+
protected $debug;
/**
- *
+ *
* @var AMQPConnection
*/
protected $connection;
@@ -113,7 +113,6 @@ public function getChannelId()
return $this->channel_id;
}
-
public function dispatch($method_sig, $args, $content)
{
if (!array_key_exists($method_sig, $this->method_map)) {
@@ -135,7 +134,7 @@ public function next_frame()
MiscHelper::debug_msg("waiting for a new frame");
}
- if(!empty($this->frame_queue)) {
+ if (!empty($this->frame_queue)) {
return array_shift($this->frame_queue);
}
@@ -174,7 +173,7 @@ public function wait_content()
if ($frame_type != 3) {
throw new \Exception("Expecting Content body, received frame type $frame_type ("
- .self::$FRAME_TYPES[$frame_type].")");
+ .self::$FRAME_TYPES[$frame_type].")");
}
$body_parts[] = $payload;
@@ -239,10 +238,10 @@ public function wait($allowed_methods=null, $non_blocking = false)
$frm = $this->next_frame();
$frame_type = $frm[0];
$payload = $frm[1];
-
+
if ($frame_type != 1) {
throw new \Exception("Expecting AMQP method, received frame type: $frame_type ("
- .self::$FRAME_TYPES[$frame_type].")");
+ .self::$FRAME_TYPES[$frame_type].")");
}
if (strlen($payload) < 4) {
@@ -256,8 +255,7 @@ public function wait($allowed_methods=null, $non_blocking = false)
if ($this->debug) {
MiscHelper::debug_msg("> $method_sig: " . self::$GLOBAL_METHOD_NAMES[MiscHelper::methodSig($method_sig)]);
}
-
-
+
if (in_array($method_sig, self::$CONTENT_METHODS)) {
$content = $this->wait_content();
} else {
@@ -283,4 +281,4 @@ public function wait($allowed_methods=null, $non_blocking = false)
}
}
-}
+}
View
39 PhpAmqpLib/Connection/AMQPConnection.php
@@ -31,12 +31,12 @@ class AMQPConnection extends AbstractChannel
* contructor parameters for clone
* @var array
*/
- protected $construct_params ;
- /**
- * close the connection in destructur
- * @var bool
- */
- protected $close_on_destruct = true ;
+ protected $construct_params ;
+ /**
+ * close the connection in destructur
+ * @var bool
+ */
+ protected $close_on_destruct = true ;
public function __construct($host, $port,
$user, $password,
@@ -48,7 +48,7 @@ public function __construct($host, $port,
$read_write_timeout = 3,
$context = null)
{
- $this->construct_params = func_get_args();
+ $this->construct_params = func_get_args();
if ($user && $password) {
$login_response = new AMQPWriter();
@@ -112,14 +112,14 @@ public function __construct($host, $port,
/**
* clossing will use the old properties to make a new connection to the same server
*/
- function __clone()
+ public function __clone()
{
- call_user_func_array(array($this, '__construct'), $this->construct_params);
+ call_user_func_array(array($this, '__construct'), $this->construct_params);
}
public function __destruct()
{
- if($this->close_on_destruct){
+ if ($this->close_on_destruct) {
if (isset($this->input) && $this->input) {
// close() always tries to connect to the server to shutdown
// the connection. If the server has gone away, it will
@@ -136,9 +136,10 @@ public function __destruct()
* it`s useful after the fork when you don`t want to close parent process connection
* @param bool $close
*/
- public function set_close_on_destruct($close = true){
- $this->close_on_destruct = (bool) $close;
- }
+ public function set_close_on_destruct($close = true)
+ {
+ $this->close_on_destruct = (bool) $close;
+ }
protected function close_socket()
{
@@ -146,7 +147,7 @@ protected function close_socket()
MiscHelper::debug_msg("closing socket");
}
- if(is_resource($this->sock)) {
+ if (is_resource($this->sock)) {
fclose($this->sock);
}
$this->sock = null;
@@ -214,7 +215,7 @@ public function send_content($channel, $class_id, $weight, $body_size,
$pkt = $pkt->getvalue();
$this->write($pkt);
- while($body) {
+ while ($body) {
$payload = substr($body,0, $this->frame_max-8);
$body = substr($body,$this->frame_max-8);
$pkt = new AMQPWriter();
@@ -289,7 +290,7 @@ protected function wait_channel($channel_id)
return array($frame_type, $payload);
}
- // Not the channel we were looking for. Queue this frame
+ // Not the channel we were looking for. Queue this frame
//for later, when the other channel is looking for frames.
array_push($this->channels[$frame_channel]->frame_queue,
array($frame_type, $payload));
@@ -315,6 +316,7 @@ public function channel($channel_id = null)
$channel_id = $channel_id ? $channel_id : $this->get_free_channel_id();
$ch = new AMQPChannel($this->connection, $channel_id);
$this->channels[$channel_id] = $ch;
+
return $ch;
}
}
@@ -330,6 +332,7 @@ public function close($reply_code=0, $reply_text="", $method_sig=array(0, 0))
$args->write_short($method_sig[0]); // class_id
$args->write_short($method_sig[1]); // method_id
$this->send_method_frame(array(10, 60), $args);
+
return $this->wait(array(
"10,61", // Connection.close_ok
));
@@ -354,6 +357,7 @@ public static function dump_table($table)
}
$tokens[] = $name . '=' . $val;
}
+
return implode(', ', $tokens);
}
@@ -395,6 +399,7 @@ protected function x_open($virtual_host, $capabilities="", $insist=false)
$args->write_shortstr($capabilities);
$args->write_bit($insist);
$this->send_method_frame(array(10, 40), $args);
+
return $this->wait(array(
"10,41", // Connection.open_ok
"10,50" // Connection.redirect
@@ -426,6 +431,7 @@ protected function redirect($args)
if ($this->debug) {
MiscHelper::debug_msg("Redirected to [". $host . "], known_hosts [" . $this->known_hosts . "]" );
}
+
return $host;
}
@@ -469,7 +475,6 @@ protected function start($args)
}
-
protected function x_start_ok($client_properties, $mechanism, $response, $locale)
{
$args = new AMQPWriter();
View
3 PhpAmqpLib/Connection/AMQPSSLConnection.php
@@ -27,6 +27,7 @@ private function create_ssl_context($options)
foreach ($options as $k => $v) {
stream_context_set_option($ssl_context, 'ssl', $k, $v);
}
+
return $ssl_context;
}
-}
+}
View
2 PhpAmqpLib/Exception/AMQPChannelException.php
@@ -10,4 +10,4 @@ public function __construct($reply_code, $reply_text, $method_sig)
{
parent::__construct($reply_code, $reply_text, $method_sig);
}
-}
+}
View
2 PhpAmqpLib/Exception/AMQPConnectionException.php
@@ -10,4 +10,4 @@ public function __construct($reply_code, $reply_text, $method_sig)
{
parent::__construct($reply_code, $reply_text, $method_sig);
}
-}
+}
View
2 PhpAmqpLib/Exception/AMQPException.php
@@ -29,4 +29,4 @@ public function __construct($reply_code, $reply_text, $method_sig)
$mn
);
}
-}
+}
View
13 PhpAmqpLib/Helper/MiscHelper.php
@@ -36,10 +36,10 @@ public static function saveBytes($bytes)
* @author Aidan Lister <aidan@php.net>
* @author Peter Waller <iridum@php.net>
* @link http://aidanlister.com/repos/v/function.hexdump.php
- * @param string $data The string to be dumped
- * @param bool $htmloutput Set to false for non-HTML output
- * @param bool $uppercase Set to true for uppercase hex
- * @param bool $return Set to true to return the dump
+ * @param string $data The string to be dumped
+ * @param bool $htmloutput Set to false for non-HTML output
+ * @param bool $uppercase Set to true for uppercase hex
+ * @param bool $return Set to true to return the dump
*/
public static function hexdump($data, $htmloutput = true, $uppercase = false, $return = false)
{
@@ -54,8 +54,7 @@ public static function hexdump($data, $htmloutput = true, $uppercase = false, $r
$x = ($uppercase === false) ? 'x' : 'X';
// Iterate string
- for ($i = $j = 0; $i < $len; $i++)
- {
+ for ($i = $j = 0; $i < $len; $i++) {
// Convert to hexidecimal
$hexi .= sprintf("%02$x ", ord($data[$i]));
@@ -104,4 +103,4 @@ public static function hexdump($data, $htmloutput = true, $uppercase = false, $r
return $dump;
}
}
-}
+}
View
22 PhpAmqpLib/Helper/Protocol/FrameBuilder.php
@@ -13,6 +13,7 @@ public function channelClose($reply_code, $reply_text, $class_id, $method_id)
->write_shortstr($reply_text)
->write_short($class_id)
->write_short($method_id);
+
return $args;
}
@@ -20,6 +21,7 @@ public function flow($active)
{
$args = new AMQPWriter();
$args->write_bit($active);
+
return $args;
}
@@ -27,6 +29,7 @@ public function xFlowOk($active)
{
$args = new AMQPWriter();
$args->write_bit($active);
+
return $args;
}
@@ -34,6 +37,7 @@ public function xOpen($out_of_band)
{
$args = new AMQPWriter();
$args->write_shortstr($out_of_band);
+
return $args;
}
@@ -51,6 +55,7 @@ public function accessRequest($realm,
->write_bit($active)
->write_bit($write)
->write_bit($read);
+
return $args;
}
@@ -74,6 +79,7 @@ public function exchangeDeclare($exchange,
->write_bit($internal)
->write_bit($nowait)
->write_table($arguments);
+
return $args;
}
@@ -84,6 +90,7 @@ public function exchangeDelete($exchange, $if_unused, $nowait, $ticket)
->write_shortstr($exchange)
->write_bit($if_unused)
->write_bit($nowait);
+
return $args;
}
@@ -97,6 +104,7 @@ public function queueBind($queue, $exchange, $routing_key, $nowait, $arguments,
->write_bit($nowait)
->write_table($arguments)
;
+
return $args;
}
@@ -109,6 +117,7 @@ public function queueUnbind($queue, $exchange, $routing_key, $arguments, $ticket
->write_shortstr($routing_key)
->write_table($arguments)
;
+
return $args;
}
@@ -130,6 +139,7 @@ public function queueDeclare($queue,
->write_bit($auto_delete)
->write_bit($nowait)
->write_table($arguments);
+
return $args;
}
@@ -142,6 +152,7 @@ public function queueDelete($queue, $if_unused, $if_empty, $nowait, $ticket)
->write_bit($if_empty)
->write_bit($nowait)
;
+
return $args;
}
@@ -152,6 +163,7 @@ public function queuePurge($queue, $nowait, $ticket)
->write_shortstr($queue)
->write_bit($nowait)
;
+
return $args;
}
@@ -161,6 +173,7 @@ public function basicAck($delivery_tag, $multiple)
$args->write_longlong($delivery_tag)
->write_bit($multiple)
;
+
return $args;
}
@@ -170,6 +183,7 @@ public function basicCancel($consumer_tag, $nowait)
$args->write_shortstr($consumer_tag)
->write_bit($nowait)
;
+
return $args;
}
@@ -185,6 +199,7 @@ public function basicConsume($queue, $consumer_tag, $no_local,
->write_bit($exclusive)
->write_bit($nowait)
;
+
return $args;
}
@@ -195,6 +210,7 @@ public function basicGet($queue, $no_ack, $ticket)
->write_shortstr($queue)
->write_bit($no_ack)
;
+
return $args;
}
@@ -207,6 +223,7 @@ public function basicPublish($exchange, $routing_key, $mandatory, $immediate, $t
->write_bit($mandatory)
->write_bit($immediate)
;
+
return $args;
}
@@ -217,6 +234,7 @@ public function basicQos($prefetch_size, $prefetch_count, $a_global)
->write_short($prefetch_count)
->write_bit($a_global)
;
+
return $args;
}
@@ -224,6 +242,7 @@ public function basicRecover($requeue)
{
$args = new AMQPWriter();
$args->write_bit($requeue);
+
return $args;
}
@@ -233,6 +252,7 @@ public function basicReject($delivery_tag, $requeue)
$args->write_longlong($delivery_tag)
->write_bit($requeue)
;
+
return $args;
}
-}
+}
View
2 PhpAmqpLib/Message/AMQPMessage.php
@@ -32,4 +32,4 @@ public function __construct($body = '', $properties = null)
parent::__construct($properties, static::$PROPERTIES);
}
-}
+}
View
2 PhpAmqpLib/Tests/Functional/Bug40Test.php
@@ -86,4 +86,4 @@ public function tearDown()
$this->ch->close();
$this->conn->close();
}
-}
+}
View
2 PhpAmqpLib/Tests/Functional/FileTransferTest.php
@@ -59,4 +59,4 @@ public function tearDown()
$this->ch->close();
$this->conn->close();
}
-}
+}
View
2 PhpAmqpLib/Tests/Functional/PublishConsumeTest.php
@@ -78,4 +78,4 @@ public function tearDown()
$this->ch->close();
$this->conn->close();
}
-}
+}
View
3 PhpAmqpLib/Tests/Unit/Helper/Protocol/FrameBuilderTest.php
@@ -103,7 +103,6 @@ public function testQueueUnbind()
$this->assertEquals($expected, $args->getvalue());
}
-
public function testQueueDeclare()
{
$expected = "\x00\x00\x03foo\x00\x00\x00\x00\x00";
@@ -193,4 +192,4 @@ public function testBasicReject()
$args = $this->frameBuilder->basicReject(1, false);
$this->assertEquals($expected, $args->getvalue());
}
-}
+}
View
16 PhpAmqpLib/Tests/Unit/Wire/AMQPWriterTest.php
@@ -2,37 +2,35 @@
namespace PhpAmqpLib\Tests\Unit;
-use PhpAmqpLib\Wire\AMQPReader;
use PhpAmqpLib\Wire\AMQPWriter;
-
class AMQPWriterTest extends \PHPUnit_Framework_TestCase
{
protected $_writer;
-
+
public function setUp()
{
$this->_writer = new AMQPWriter();
}
-
+
public function tearDown()
{
$this->_writer = null;
}
-
+
public function testWriteArray()
{
$this->_writer->write_array(array(
'rabbit@localhost',
'hare@localhost'
));
-
+
$out = $this->_writer->getvalue();
-
+
$this->assertEquals(44, strlen($out));
-
+
$expected = "\x00\x00\x00(S\x00\x00\x00\x10rabbit@localhostS\x00\x00\x00\x0Ehare@localhost";
-
+
$this->assertEquals($expected, $out);
}
}
View
4 PhpAmqpLib/Tests/Unit/WireTest.php
@@ -92,8 +92,6 @@ protected function longstrWriteRead($v)
$this->writeAndRead($v, 'write_longstr', 'read_longstr');
}
-
-
public function testLongLongWriteRead()
{
// First test with values represented as strings
@@ -121,4 +119,4 @@ protected function writeAndRead($v, $write_method, $read_method)
$r = new AMQPReader($w->getvalue());
$this->assertEquals($v, $r->{$read_method}());
}
-}
+}
View
2 PhpAmqpLib/Wire/AMQPDecimal.php
@@ -27,4 +27,4 @@ public function asBCvalue()
{
return bcdiv($this->n, bcpow(10,$this->e));
}
-}
+}
View
35 PhpAmqpLib/Wire/AMQPReader.php
@@ -12,18 +12,16 @@ class AMQPReader
public function __construct($str, $sock=null)
{
$this->str = $str;
- if ($sock !== null)
- {
+ if ($sock !== null) {
$this->sock = new BufferedInput($sock);
- } else
- {
+ } else {
$this->sock = null;
}
$this->offset = 0;
$this->bitcount = $this->bits = 0;
- if(((int)4294967296)!=0)
+ if(((int) 4294967296)!=0)
$this->is64bits = true;
else
$this->is64bits = false;
@@ -43,6 +41,7 @@ public function close()
public function read($n)
{
$this->bitcount = $this->bits = 0;
+
return $this->rawread($n);
}
@@ -58,7 +57,7 @@ private function rawread($n)
$res .= $buf;
}
- if(strlen($res)!=$n) {
+ if (strlen($res)!=$n) {
throw new \Exception("Error reading data. Received " .
strlen($res) . " instead of expected $n bytes");
}
@@ -72,19 +71,20 @@ private function rawread($n)
$this->str = substr($this->str,$n);
$this->offset += $n;
}
+
return $res;
}
public function read_bit()
{
- if(!$this->bitcount)
- {
+ if (!$this->bitcount) {
$this->bits = ord($this->rawread(1));
$this->bitcount = 8;
}
$result = ($this->bits & 1) == 1;
$this->bits >>= 1;
$this->bitcount -= 1;
+
return $result;
}
@@ -92,6 +92,7 @@ public function read_octet()
{
$this->bitcount = $this->bits = 0;
list(,$res) = unpack('C', $this->rawread(1));
+
return $res;
}
@@ -99,6 +100,7 @@ public function read_short()
{
$this->bitcount = $this->bits = 0;
list(,$res) = unpack('n', $this->rawread(2));
+
return $res;
}
@@ -116,10 +118,10 @@ public function read_short()
public function read_php_int()
{
list(,$res) = unpack('N', $this->rawread(4));
- if($this->is64bits)
- {
+ if ($this->is64bits) {
$sres = sprintf ( "%u", $res );
- return (int)$sres;
+
+ return (int) $sres;
} else {
return $res;
}
@@ -132,6 +134,7 @@ public function read_long()
$this->bitcount = $this->bits = 0;
list(,$res) = unpack('N', $this->rawread(4));
$sres = sprintf ( "%u", $res );
+
return $sres;
}
@@ -141,6 +144,7 @@ private function read_signed_long()
// In PHP unpack('N') always return signed value,
// on both 32 and 64 bit systems!
list(,$res) = unpack('N', $this->rawread(4));
+
return $res;
}
@@ -168,6 +172,7 @@ public function read_shortstr()
{
$this->bitcount = $this->bits = 0;
list(,$slen) = unpack('C', $this->rawread(1));
+
return $this->rawread($slen);
}
@@ -182,6 +187,7 @@ public function read_longstr()
$slen = $this->read_php_int();
if($slen<0)
throw new \Exception("Strings longer than supported on this platform");
+
return $this->rawread($slen);
}
@@ -212,6 +218,7 @@ public function read_table()
$val = $table_data->read_value($ftype);
$result[$name] = array($ftype,$val);
}
+
return $result;
}
@@ -230,7 +237,7 @@ public function read_array()
$result = array();
// Read values until we reach the end of the array
- while($this->offset < $endOffset) {
+ while ($this->offset < $endOffset) {
$fieldType = $this->rawread(1);
$result[] = $this->read_value($fieldType);
}
@@ -241,7 +248,7 @@ public function read_array()
/**
* Reads the next value as the provided field type.
*
- * @param string $fieldType the char field type
+ * @param string $fieldType the char field type
* @return mixed
*/
public function read_value($fieldType)
@@ -249,7 +256,7 @@ public function read_value($fieldType)
$this->bitcount = $this->bits = 0;
$val = NULL;
- switch($fieldType) {
+ switch ($fieldType) {
case 'S': // Long string
$val = $this->read_longstr();
break;
View
45 PhpAmqpLib/Wire/AMQPWriter.php
@@ -32,7 +32,7 @@ private static function bytesplit($x, $bytes)
while ($bytes > 0) {
$b = bcmod($x,'256');
- $res[] = (int)$b;
+ $res[] = (int) $b;
$x = bcdiv($x,'256', 0);
$bytes--;
}
@@ -61,6 +61,7 @@ private function flushbits()
public function getvalue()
{
$this->flushbits();
+
return $this->out;
}
@@ -71,6 +72,7 @@ public function write($s)
{
$this->flushbits();
$this->out .= $s;
+
return $this;
}
@@ -97,6 +99,7 @@ public function write_bit($b)
array_push($this->bits, $last);
$this->bitcount += 1;
+
return $this;
}
@@ -111,6 +114,7 @@ public function write_octet($n)
$this->flushbits();
$this->out .= chr($n);
+
return $this;
}
@@ -125,6 +129,7 @@ public function write_short($n)
$this->flushbits();
$this->out .= pack('n', $n);
+
return $this;
}
@@ -135,6 +140,7 @@ public function write_long($n)
{
$this->flushbits();
$this->out .= implode("", AMQPWriter::chrbytesplit($n,4));
+
return $this;
}
@@ -144,6 +150,7 @@ private function write_signed_long($n)
// although format spec for 'N' mentions unsigned
// it will deal with sinned integers as well. tested.
$this->out .= pack('N', $n);
+
return $this;
}
@@ -154,6 +161,7 @@ public function write_longlong($n)
{
$this->flushbits();
$this->out .= implode("", AMQPWriter::chrbytesplit($n,8));
+
return $this;
}
@@ -170,6 +178,7 @@ public function write_shortstr($s)
$this->write_octet(strlen($s));
$this->out .= $s;
+
return $this;
}
@@ -182,51 +191,54 @@ public function write_longstr($s)
$this->flushbits();
$this->write_long(strlen($s));
$this->out .= $s;
+
return $this;
}
-
+
/**
* Supports the writing of Array types, so that you can implement
* array methods, like Rabbitmq's HA parameters
- *
+ *
* @param array $a
- *
+ *
* @return self
*/
public function write_array($a)
{
$this->flushbits();
$data = new AMQPWriter();
-
+
foreach ($a as $v) {
if (is_string($v)) {
$data->write('S');
$data->write_longstr($v);
- } else if (is_int($v)) {
+ } elseif (is_int($v)) {
$data->write('I');
$data->write_signed_long($v);
- } else if ($v instanceof AMQPDecimal) {
+ } elseif ($v instanceof AMQPDecimal) {
$data->write('D');
$data->write_octet($v->e);
$data->write_signed_long($v->n);
- } else if (is_array($v)) {
+ } elseif (is_array($v)) {
$data->write('A');
$data->write_array($v);
}
}
-
+
$data = $data->getvalue();
$this->write_long(strlen($data));
$this->write($data);
+
return $this;
}
-
+
/**
* Write unix time_t value as 64 bit timestamp.
*/
public function write_timestamp($v)
{
$this->write_longlong($v);
+
return $this;
}
@@ -244,29 +256,30 @@ public function write_table($d)
if ($ftype=='S') {
$table_data->write('S');
$table_data->write_longstr($v);
- } else if ($ftype=='I') {
+ } elseif ($ftype=='I') {
$table_data->write('I');
$table_data->write_signed_long($v);
- } else if ($ftype=='D') {
+ } elseif ($ftype=='D') {
// 'D' type values are passed AMQPDecimal instances.
$table_data->write('D');
$table_data->write_octet($v->e);
$table_data->write_signed_long($v->n);
- } else if ($ftype=='T') {
+ } elseif ($ftype=='T') {
$table_data->write('T');
$table_data->write_timestamp($v);
- } else if ($ftype=='F') {
+ } elseif ($ftype=='F') {
$table_data->write('F');
$table_data->write_table($v);
- } else if ($ftype = 'A') {
+ } elseif ($ftype = 'A') {
$table_data->write('A');
$table_data->write_array($v);
}
}
-
+
$table_data = $table_data->getvalue();
$this->write_long(strlen($table_data));
$this->write($table_data);
+
return $this;
}
}
View
24 PhpAmqpLib/Wire/BufferedInput.php
@@ -20,13 +20,12 @@ public function real_sock()
public function read($n)
{
- if ($this->offset >= strlen($this->buffer))
- {
- if (!($rv = $this->populate_buffer()))
- {
+ if ($this->offset >= strlen($this->buffer)) {
+ if (!($rv = $this->populate_buffer())) {
return $rv;
}
}
+
return $this->read_buffer($n);
}
@@ -39,8 +38,7 @@ public function close()
private function read_buffer($n)
{
$n = min($n, strlen($this->buffer) - $this->offset);
- if ($n === 0)
- {
+ if ($n === 0) {
// substr("", 0, 0) => false, which screws up read loops that are
// expecting non-blocking reads to return "". This avoids that edge
// case when the buffer is empty/used up.
@@ -48,6 +46,7 @@ private function read_buffer($n)
}
$block = substr($this->buffer, $this->offset, $n);
$this->offset += $n;
+
return $block;
}
@@ -59,20 +58,19 @@ private function reset($block)
private function populate_buffer()
{
- if(feof($this->sock))
- {
+ if (feof($this->sock)) {
$this->reset("");
+
return false;
}
$block = fread($this->sock, $this->block_size);
- if ($block !== false)
- {
+ if ($block !== false) {
$this->reset($block);
+
return true;
- } else
- {
+ } else {
return $block;
}
}
-}
+}
View
23 PhpAmqpLib/Wire/GenericContent.php
@@ -29,7 +29,6 @@ public function __construct($props, $prop_types=null)
$this->properties = $d;
}
-
/**
* Look for additional properties in the 'properties' dictionary,
* and if present - the 'delivery_info' dictionary.
@@ -37,22 +36,25 @@ public function __construct($props, $prop_types=null)
public function get($name)
{
if(isset($this->properties[$name]))
+
return $this->properties[$name];
if(isset($this->delivery_info) && isset($this->delivery_info[$name]))
+
return $this->delivery_info[$name];
throw new \OutOfBoundsException("No '$name' property");
}
- /**
- * allows to set the property after creation of the object
- */
- public function set($name, $value){
- if(array_key_exists($name, $this->prop_types))
- $this->properties[$name] = $value;
- else
- throw new \OutOfBoundsException("No '$name' property");
+ /**
+ * allows to set the property after creation of the object
+ */
+ public function set($name, $value)
+ {
+ if(array_key_exists($name, $this->prop_types))
+ $this->properties[$name] = $value;
+ else
+ throw new \OutOfBoundsException("No '$name' property");
}
/**
* Given the raw bytes containing the property-flags and
@@ -92,7 +94,6 @@ public function load_properties($raw_bytes)
$this->properties = $d;
}
-
/**
* serialize the 'properties' attribute (a dictionary) into the
* raw bytes making up a set of property flags and a property
@@ -138,4 +139,4 @@ public function serialize_properties()
return $result->getvalue();
}
-}
+}
View
9 benchmark/consumer.php
@@ -2,7 +2,6 @@
include(__DIR__ . '/config.php');
use PhpAmqpLib\Connection\AMQPConnection;
-use PhpAmqpLib\Message\AMQPMessage;
$exchange = 'bench_exchange';
$queue = 'bench_queue';
@@ -15,14 +14,14 @@
$ch->exchange_declare($exchange, 'direct', false, false, false);
$ch->queue_bind($queue, $exchange);
-class Consumer
+class consumer
{
protected $msgCount = 0;
protected $startTime = null;
public function process_message($msg)
{
- if($this->startTime === null) {
+ if ($this->startTime === null) {
$this->startTime = microtime(true);
}
@@ -36,7 +35,8 @@ public function process_message($msg)
$ch->basic_consume($queue, '', false, true, false, false, array(new Consumer(), 'process_message'));
-function shutdown($ch, $conn){
+function shutdown($ch, $conn)
+{
$ch->close();
$conn->close();
}
@@ -45,4 +45,3 @@ function shutdown($ch, $conn){
while (count($ch->callbacks)) {
$ch->wait();
}
-?>
View
7 benchmark/file_consume.php
@@ -6,7 +6,6 @@
include(__DIR__ . '/config.php');
use PhpAmqpLib\Connection\AMQPConnection;
-use PhpAmqpLib\Message\AMQPMessage;
$exchange = 'file_exchange';
$queue = 'file_queue';
@@ -26,7 +25,7 @@ class Consumer
public function process_message($msg)
{
- if($this->startTime === null) {
+ if ($this->startTime === null) {
$this->startTime = microtime(true);
}
@@ -40,7 +39,8 @@ public function process_message($msg)
$ch->basic_consume($queue, '', false, true, false, false, array(new Consumer(), 'process_message'));
-function shutdown($ch, $conn){
+function shutdown($ch, $conn)
+{
$ch->close();
$conn->close();
}
@@ -49,4 +49,3 @@ function shutdown($ch, $conn){
while (count($ch->callbacks)) {
$ch->wait();
}
-?>
View
2 benchmark/file_publish.php
@@ -62,5 +62,3 @@ function generate_random_content($bytes)
$ch->close();
$conn->close();
-
-?>
View
2 benchmark/producer.php
@@ -23,7 +23,6 @@
$ch->queue_bind($queue, $exchange);
-
$msg_body = <<<EOT
abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz
abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz
@@ -54,4 +53,3 @@
$ch->close();
$conn->close();
-?>
View
8 demo/amqp_consumer.php
@@ -37,8 +37,8 @@
$ch->queue_bind($queue, $exchange);
-function process_message($msg) {
-
+function process_message($msg)
+{
echo "\n--------\n";
echo $msg->body;
echo "\n--------\n";
@@ -65,7 +65,8 @@ function process_message($msg) {
$ch->basic_consume($queue, $consumer_tag, false, false, false, false, 'process_message');
-function shutdown($ch, $conn){
+function shutdown($ch, $conn)
+{
$ch->close();
$conn->close();
}
@@ -75,4 +76,3 @@ function shutdown($ch, $conn){
while (count($ch->callbacks)) {
$ch->wait();
}
-?>
View
14 demo/amqp_consumer_exclusive.php
@@ -16,7 +16,7 @@
$ch = $conn->channel();
/*
- name: $queue // should be unique in fanout exchange. Let RabbitMQ create
+ name: $queue // should be unique in fanout exchange. Let RabbitMQ create
// a queue name for us
passive: false // don't check if a queue with the same name exists
durable: false // the queue will not survive server restarts
@@ -37,8 +37,8 @@
$ch->queue_bind($queue_name, $exchange);
-function process_message($msg) {
-
+function process_message($msg)
+{
echo "\n--------\n";
echo $msg->body;
echo "\n--------\n";
@@ -59,21 +59,21 @@ function process_message($msg) {
no_local: Don't receive messages published by this consumer.
no_ack: Tells the server if the consumer will acknowledge the messages.
exclusive: Request exclusive consumer access, meaning only this consumer can access the queue
- nowait: don't wait for a server response. In case of error the server will raise a channel
+ nowait: don't wait for a server response. In case of error the server will raise a channel
exception
callback: A PHP Callback
*/
$ch->basic_consume($queue_name, $consumer_tag, false, false, true, false, 'process_message');
-function shutdown($ch, $conn){
+function shutdown($ch, $conn)
+{
$ch->close();
$conn->close();
}
register_shutdown_function('shutdown', $ch, $conn);
// Loop as long as the channel has callbacks registered
-while(count($ch->callbacks)) {
+while (count($ch->callbacks)) {
$ch->wait();
}
-?>
View
10 demo/amqp_consumer_fanout_1.php
@@ -34,8 +34,8 @@
$ch->queue_bind($queue, $exchange);
-function process_message($msg) {
-
+function process_message($msg)
+{
echo "\n--------\n";
echo $msg->body;
echo "\n--------\n";
@@ -63,14 +63,14 @@ function process_message($msg) {
$ch->basic_consume($queue, $consumer_tag, false, false, false, false, 'process_message');
-function shutdown($ch, $conn){
+function shutdown($ch, $conn)
+{
$ch->close();
$conn->close();
}
register_shutdown_function('shutdown', $ch, $conn);
// Loop as long as the channel has callbacks registered
-while(count($ch->callbacks)) {
+while (count($ch->callbacks)) {
$ch->wait();
}
-?>
View
10 demo/amqp_consumer_fanout_2.php
@@ -34,8 +34,8 @@
$ch->queue_bind($queue, $exchange);
-function process_message($msg) {
-
+function process_message($msg)
+{
echo "\n--------\n";
echo $msg->body;
echo "\n--------\n";
@@ -63,14 +63,14 @@ function process_message($msg) {
$ch->basic_consume($queue, $consumer_tag, false, false, false, false, 'process_message');
-function shutdown($ch, $conn){
+function shutdown($ch, $conn)
+{
$ch->close();
$conn->close();
}
register_shutdown_function('shutdown', $ch, $conn);
// Loop as long as the channel has callbacks registered
-while(count($ch->callbacks)) {
+while (count($ch->callbacks)) {
$ch->wait();
}
-?>
View
8 demo/amqp_consumer_non_blocking.php
@@ -37,8 +37,8 @@
$ch->queue_bind($queue, $exchange);
-function process_message($msg) {
-
+function process_message($msg)
+{
echo "\n--------\n";
echo $msg->body;