Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Merge pull request #42 from brikou/dev_psr

fixed coding standard to be psr compliant
commit 8a393d2e1c041b31186196cc67a17a880153ecea 2 parents a0c3be4 + 12172d2
@videlalvaro authored
Showing with 317 additions and 277 deletions.
  1. +46 −35 PhpAmqpLib/Channel/AMQPChannel.php
  2. +74 −76 PhpAmqpLib/Channel/AbstractChannel.php
  3. +22 −17 PhpAmqpLib/Connection/AMQPConnection.php
  4. +2 −1  PhpAmqpLib/Connection/AMQPSSLConnection.php
  5. +1 −1  PhpAmqpLib/Exception/AMQPChannelException.php
  6. +1 −1  PhpAmqpLib/Exception/AMQPConnectionException.php
  7. +1 −1  PhpAmqpLib/Exception/AMQPException.php
  8. +6 −7 PhpAmqpLib/Helper/MiscHelper.php
  9. +21 −1 PhpAmqpLib/Helper/Protocol/FrameBuilder.php
  10. +1 −1  PhpAmqpLib/Message/AMQPMessage.php
  11. +1 −1  PhpAmqpLib/Tests/Functional/Bug40Test.php
  12. +1 −1  PhpAmqpLib/Tests/Functional/FileTransferTest.php
  13. +1 −1  PhpAmqpLib/Tests/Functional/PublishConsumeTest.php
  14. +1 −2  PhpAmqpLib/Tests/Unit/Helper/Protocol/FrameBuilderTest.php
  15. +7 −9 PhpAmqpLib/Tests/Unit/Wire/AMQPWriterTest.php
  16. +1 −3 PhpAmqpLib/Tests/Unit/WireTest.php
  17. +1 −1  PhpAmqpLib/Wire/AMQPDecimal.php
  18. +21 −14 PhpAmqpLib/Wire/AMQPReader.php
  19. +29 −16 PhpAmqpLib/Wire/AMQPWriter.php
  20. +11 −13 PhpAmqpLib/Wire/BufferedInput.php
  21. +12 −11 PhpAmqpLib/Wire/GenericContent.php
  22. +4 −5 benchmark/consumer.php
  23. +3 −4 benchmark/file_consume.php
  24. +0 −2  benchmark/file_publish.php
  25. +0 −2  benchmark/producer.php
  26. +4 −4 demo/amqp_consumer.php
  27. +7 −7 demo/amqp_consumer_exclusive.php
  28. +5 −5 demo/amqp_consumer_fanout_1.php
  29. +5 −5 demo/amqp_consumer_fanout_2.php
  30. +4 −4 demo/amqp_consumer_non_blocking.php
  31. +16 −16 demo/amqp_ha_consumer.php
  32. +0 −1  demo/amqp_publisher.php
  33. +0 −1  demo/amqp_publisher_exclusive.php
  34. +0 −1  demo/amqp_publisher_fanout.php
  35. +1 −1  demo/basic_get.php
  36. +3 −3 demo/basic_return.php
  37. +1 −1  demo/config.php
  38. +3 −2 demo/ssl_connection.php
View
81 PhpAmqpLib/Channel/AMQPChannel.php
@@ -36,14 +36,14 @@ class AMQPChannel extends AbstractChannel
"90,31" => "tx_rollback_ok"
);
/**
- *
- * @var callable these parameters will be passed to function
+ *
+ * @var callable these parameters will be passed to function
* in case of basic_return:
- * param int $reply_code
- * param string $reply_text
- * param string $exchange
- * param string $routing_key
- * param AMQPMessage $msg
+ * param int $reply_code
+ * param string $reply_text
+ * param string $exchange
+ * param string $routing_key
+ * param AMQPMessage $msg
*/
protected $basic_return_callback = null;
@@ -122,6 +122,7 @@ public function close($reply_code=0,
);
$this->send_method_frame(array(20, 40), $args);
+
return $this->wait(array(
"20,41" // Channel.close_ok
));
@@ -157,6 +158,7 @@ public function flow($active)
{
$args = $this->frameBuilder->flow($active);
$this->send_method_frame(array(20, 20), $args);
+
return $this->wait(array(
"20,21" //Channel.flow_ok
));
@@ -187,6 +189,7 @@ protected function x_open($out_of_band="")
$args = $this->frameBuilder->xOpen($out_of_band);
$this->send_method_frame(array(20, 10), $args);
+
return $this->wait(array(
"20,11" //Channel.open_ok
));
@@ -211,6 +214,7 @@ public function access_request($realm, $exclusive=false,
$passive, $active,
$write, $read);
$this->send_method_frame(array(30, 10), $args);
+
return $this->wait(array(
"30,11" //Channel.access_request_ok
));
@@ -222,6 +226,7 @@ public function access_request($realm, $exclusive=false,
protected function access_request_ok($args)
{
$this->default_ticket = $args->read_short();
+
return $this->default_ticket;
}
@@ -452,6 +457,7 @@ public function basic_cancel($consumer_tag, $nowait=false)
{
$args = $this->frameBuilder->basicCancel($consumer_tag, $nowait);
$this->send_method_frame(array(60, 30), $args);
+
return $this->wait(array(
"60,31" // Channel.basic_cancel_ok
));
@@ -487,6 +493,7 @@ public function basic_consume($queue="", $consumer_tag="", $no_local=false,
}
$this->callbacks[$consumer_tag] = $callback;
+
return $consumer_tag;
}
@@ -538,6 +545,7 @@ public function basic_get($queue="", $no_ack=false, $ticket=null)
$args = $this->frameBuilder->basicGet($queue, $no_ack, $ticket);
$this->send_method_frame(array(60, 70), $args);
+
return $this->wait(array(
"60,71", //Channel.basic_get_ok
"60,72" // Channel.basic_get_empty
@@ -570,6 +578,7 @@ protected function basic_get_ok($args, $msg)
"routing_key" => $routing_key,
"message_count" => $message_count
);
+
return $msg;
}
@@ -593,7 +602,6 @@ public function basic_publish($msg, $exchange="", $routing_key="",
$msg->body);
}
-
/**
* specify quality of service
*/
@@ -601,12 +609,12 @@ public function basic_qos($prefetch_size, $prefetch_count, $a_global)
{
$args = $this->frameBuilder->basicQos($prefetch_size, $prefetch_count, $a_global);
$this->send_method_frame(array(60, 10), $args);
+
return $this->wait(array(
- "60,11" //Channel.basic_qos_ok
- ));
+ "60,11" //Channel.basic_qos_ok
+ ));
}
-
/**
* confirm the requested qos
*/
@@ -641,25 +649,26 @@ protected function basic_return($args, $msg)
$reply_text = $args->read_shortstr();
$exchange = $args->read_shortstr();
$routing_key = $args->read_shortstr();
-
- if( !is_null($this->basic_return_callback )){
- call_user_func_array($this->basic_return_callback, array(
- $reply_code,
- $reply_text,
- $exchange,
- $routing_key,
- $msg,
- ));
- } else if ($this->debug) {
- MiscHelper::debug_msg("Skipping unhandled basic_return message");
+
+ if ( !is_null($this->basic_return_callback )) {
+ call_user_func_array($this->basic_return_callback, array(
+ $reply_code,
+ $reply_text,
+ $exchange,
+ $routing_key,
+ $msg,
+ ));
+ } elseif ($this->debug) {
+ MiscHelper::debug_msg("Skipping unhandled basic_return message");
}
}
public function tx_commit()
{
$this->send_method_frame(array(90, 20));
+
return $this->wait(array(
- "90,21" //Channel.tx_commit_ok
- ));
+ "90,21" //Channel.tx_commit_ok
+ ));
}
/**
@@ -669,16 +678,16 @@ protected function tx_commit_ok($args)
{
}
-
/**
* abandon the current transaction
*/
public function tx_rollback()
{
$this->send_method_frame(array(90, 30));
+
return $this->wait(array(
- "90,31" //Channel.tx_rollback_ok
- ));
+ "90,31" //Channel.tx_rollback_ok
+ ));
}
/**
@@ -694,9 +703,10 @@ protected function tx_rollback_ok($args)
public function tx_select()
{
$this->send_method_frame(array(90, 10));
+
return $this->wait(array(
- "90,11" //Channel.tx_select_ok
- ));
+ "90,11" //Channel.tx_select_ok
+ ));
}
/**
@@ -717,13 +727,14 @@ protected function getTicket($ticket)
}
/**
* set callback for basic_return
- * @param callable $callback
+ * @param callable $callback
* @throws \InvalidArgumentException if $callback is not callable
*/
- public function set_return_listener($callback){
- if(!is_callable($callback))
- throw new \InvalidArgumentException('$callback should be callable.');
- $this->basic_return_callback = $callback;
+ public function set_return_listener($callback)
+ {
+ if (!is_callable($callback)) {
+ throw new \InvalidArgumentException("$callback should be callable.");
+ }
+ $this->basic_return_callback = $callback;
}
-
}
View
150 PhpAmqpLib/Channel/AbstractChannel.php
@@ -9,17 +9,17 @@
class AbstractChannel
{
- /**
- * taken from http://www.rabbitmq.com/amqp-0-9-1-reference.html#constants
- * @var array
- */
- protected static $FRAME_TYPES = array(
- 1 => 'frame-method',
- 2 => 'frame-header',
- 3 => 'frame-body',
- 8 => 'frame-heartbeat',
- );
-
+ /**
+ * taken from http://www.rabbitmq.com/amqp-0-9-1-reference.html#constants
+ * @var array
+ */
+ protected static $FRAME_TYPES = array(
+ 1 => 'frame-method',
+ 2 => 'frame-header',
+ 3 => 'frame-body',
+ 8 => 'frame-heartbeat',
+ );
+
private static $CONTENT_METHODS = array(
"60,60", // Basic.deliver
"60,71", // Basic.get_ok
@@ -33,66 +33,66 @@ class AbstractChannel
// All the method names
public static $GLOBAL_METHOD_NAMES = array(
- "10,10" => "Connection.start",
- "10,11" => "Connection.start_ok",
- "10,20" => "Connection.secure",
- "10,21" => "Connection.secure_ok",
- "10,30" => "Connection.tune",
- "10,31" => "Connection.tune_ok",
- "10,40" => "Connection.open",
- "10,41" => "Connection.open_ok",
- "10,50" => "Connection.redirect",
- "10,60" => "Connection.close",
- "10,61" => "Connection.close_ok",
- "20,10" => "Channel.open",
- "20,11" => "Channel.open_ok",
- "20,20" => "Channel.flow",
- "20,21" => "Channel.flow_ok",
- "20,30" => "Channel.alert",
- "20,40" => "Channel.close",
- "20,41" => "Channel.close_ok",
- "30,10" => "Channel.access_request",
- "30,11" => "Channel.access_request_ok",
- "40,10" => "Channel.exchange_declare",
- "40,11" => "Channel.exchange_declare_ok",
- "40,20" => "Channel.exchange_delete",
- "40,21" => "Channel.exchange_delete_ok",
- "50,10" => "Channel.queue_declare",
- "50,11" => "Channel.queue_declare_ok",
- "50,20" => "Channel.queue_bind",
- "50,21" => "Channel.queue_bind_ok",
- "50,30" => "Channel.queue_purge",
- "50,31" => "Channel.queue_purge_ok",
- "50,40" => "Channel.queue_delete",
- "50,41" => "Channel.queue_delete_ok",
- "50,50" => "Channel.queue_unbind",
- "50,51" => "Channel.queue_unbind_ok",
- "60,10" => "Channel.basic_qos",
- "60,11" => "Channel.basic_qos_ok",
- "60,20" => "Channel.basic_consume",
- "60,21" => "Channel.basic_consume_ok",
- "60,30" => "Channel.basic_cancel",
- "60,31" => "Channel.basic_cancel_ok",
- "60,40" => "Channel.basic_publish",
- "60,50" => "Channel.basic_return",
- "60,60" => "Channel.basic_deliver",
- "60,70" => "Channel.basic_get",
- "60,71" => "Channel.basic_get_ok",
- "60,72" => "Channel.basic_get_empty",
- "60,80" => "Channel.basic_ack",
- "60,90" => "Channel.basic_reject",
- "60,100" => "Channel.basic_recover",
- "90,10" => "Channel.tx_select",
- "90,11" => "Channel.tx_select_ok",
- "90,20" => "Channel.tx_commit",
- "90,21" => "Channel.tx_commit_ok",
- "90,30" => "Channel.tx_rollback",
- "90,31" => "Channel.tx_rollback_ok"
+ "10,10" => "Connection.start",
+ "10,11" => "Connection.start_ok",
+ "10,20" => "Connection.secure",
+ "10,21" => "Connection.secure_ok",
+ "10,30" => "Connection.tune",
+ "10,31" => "Connection.tune_ok",
+ "10,40" => "Connection.open",
+ "10,41" => "Connection.open_ok",
+ "10,50" => "Connection.redirect",
+ "10,60" => "Connection.close",
+ "10,61" => "Connection.close_ok",
+ "20,10" => "Channel.open",
+ "20,11" => "Channel.open_ok",
+ "20,20" => "Channel.flow",
+ "20,21" => "Channel.flow_ok",
+ "20,30" => "Channel.alert",
+ "20,40" => "Channel.close",
+ "20,41" => "Channel.close_ok",
+ "30,10" => "Channel.access_request",
+ "30,11" => "Channel.access_request_ok",
+ "40,10" => "Channel.exchange_declare",
+ "40,11" => "Channel.exchange_declare_ok",
+ "40,20" => "Channel.exchange_delete",
+ "40,21" => "Channel.exchange_delete_ok",
+ "50,10" => "Channel.queue_declare",
+ "50,11" => "Channel.queue_declare_ok",
+ "50,20" => "Channel.queue_bind",
+ "50,21" => "Channel.queue_bind_ok",
+ "50,30" => "Channel.queue_purge",
+ "50,31" => "Channel.queue_purge_ok",
+ "50,40" => "Channel.queue_delete",
+ "50,41" => "Channel.queue_delete_ok",
+ "50,50" => "Channel.queue_unbind",
+ "50,51" => "Channel.queue_unbind_ok",
+ "60,10" => "Channel.basic_qos",
+ "60,11" => "Channel.basic_qos_ok",
+ "60,20" => "Channel.basic_consume",
+ "60,21" => "Channel.basic_consume_ok",
+ "60,30" => "Channel.basic_cancel",
+ "60,31" => "Channel.basic_cancel_ok",
+ "60,40" => "Channel.basic_publish",
+ "60,50" => "Channel.basic_return",
+ "60,60" => "Channel.basic_deliver",
+ "60,70" => "Channel.basic_get",
+ "60,71" => "Channel.basic_get_ok",
+ "60,72" => "Channel.basic_get_empty",
+ "60,80" => "Channel.basic_ack",
+ "60,90" => "Channel.basic_reject",
+ "60,100" => "Channel.basic_recover",
+ "90,10" => "Channel.tx_select",
+ "90,11" => "Channel.tx_select_ok",
+ "90,20" => "Channel.tx_commit",
+ "90,21" => "Channel.tx_commit_ok",
+ "90,30" => "Channel.tx_rollback",
+ "90,31" => "Channel.tx_rollback_ok"
);
-
+
protected $debug;
/**
- *
+ *
* @var AMQPConnection
*/
protected $connection;
@@ -113,7 +113,6 @@ public function getChannelId()
return $this->channel_id;
}
-
public function dispatch($method_sig, $args, $content)
{
if (!array_key_exists($method_sig, $this->method_map)) {
@@ -135,7 +134,7 @@ public function next_frame()
MiscHelper::debug_msg("waiting for a new frame");
}
- if(!empty($this->frame_queue)) {
+ if (!empty($this->frame_queue)) {
return array_shift($this->frame_queue);
}
@@ -174,7 +173,7 @@ public function wait_content()
if ($frame_type != 3) {
throw new \Exception("Expecting Content body, received frame type $frame_type ("
- .self::$FRAME_TYPES[$frame_type].")");
+ .self::$FRAME_TYPES[$frame_type].")");
}
$body_parts[] = $payload;
@@ -239,10 +238,10 @@ public function wait($allowed_methods=null, $non_blocking = false)
$frm = $this->next_frame();
$frame_type = $frm[0];
$payload = $frm[1];
-
+
if ($frame_type != 1) {
throw new \Exception("Expecting AMQP method, received frame type: $frame_type ("
- .self::$FRAME_TYPES[$frame_type].")");
+ .self::$FRAME_TYPES[$frame_type].")");
}
if (strlen($payload) < 4) {
@@ -256,8 +255,7 @@ public function wait($allowed_methods=null, $non_blocking = false)
if ($this->debug) {
MiscHelper::debug_msg("> $method_sig: " . self::$GLOBAL_METHOD_NAMES[MiscHelper::methodSig($method_sig)]);
}
-
-
+
if (in_array($method_sig, self::$CONTENT_METHODS)) {
$content = $this->wait_content();
} else {
@@ -283,4 +281,4 @@ public function wait($allowed_methods=null, $non_blocking = false)
}
}
-}
+}
View
39 PhpAmqpLib/Connection/AMQPConnection.php
@@ -31,12 +31,12 @@ class AMQPConnection extends AbstractChannel
* contructor parameters for clone
* @var array
*/
- protected $construct_params ;
- /**
- * close the connection in destructur
- * @var bool
- */
- protected $close_on_destruct = true ;
+ protected $construct_params ;
+ /**
+ * close the connection in destructur
+ * @var bool
+ */
+ protected $close_on_destruct = true ;
public function __construct($host, $port,
$user, $password,
@@ -48,7 +48,7 @@ public function __construct($host, $port,
$read_write_timeout = 3,
$context = null)
{
- $this->construct_params = func_get_args();
+ $this->construct_params = func_get_args();
if ($user && $password) {
$login_response = new AMQPWriter();
@@ -112,14 +112,14 @@ public function __construct($host, $port,
/**
* clossing will use the old properties to make a new connection to the same server
*/
- function __clone()
+ public function __clone()
{
- call_user_func_array(array($this, '__construct'), $this->construct_params);
+ call_user_func_array(array($this, '__construct'), $this->construct_params);
}
public function __destruct()
{
- if($this->close_on_destruct){
+ if ($this->close_on_destruct) {
if (isset($this->input) && $this->input) {
// close() always tries to connect to the server to shutdown
// the connection. If the server has gone away, it will
@@ -136,9 +136,10 @@ public function __destruct()
* it`s useful after the fork when you don`t want to close parent process connection
* @param bool $close
*/
- public function set_close_on_destruct($close = true){
- $this->close_on_destruct = (bool) $close;
- }
+ public function set_close_on_destruct($close = true)
+ {
+ $this->close_on_destruct = (bool) $close;
+ }
protected function close_socket()
{
@@ -146,7 +147,7 @@ protected function close_socket()
MiscHelper::debug_msg("closing socket");
}
- if(is_resource($this->sock)) {
+ if (is_resource($this->sock)) {
fclose($this->sock);
}
$this->sock = null;
@@ -214,7 +215,7 @@ public function send_content($channel, $class_id, $weight, $body_size,
$pkt = $pkt->getvalue();
$this->write($pkt);
- while($body) {
+ while ($body) {
$payload = substr($body,0, $this->frame_max-8);
$body = substr($body,$this->frame_max-8);
$pkt = new AMQPWriter();
@@ -289,7 +290,7 @@ protected function wait_channel($channel_id)
return array($frame_type, $payload);
}
- // Not the channel we were looking for. Queue this frame
+ // Not the channel we were looking for. Queue this frame
//for later, when the other channel is looking for frames.
array_push($this->channels[$frame_channel]->frame_queue,
array($frame_type, $payload));
@@ -315,6 +316,7 @@ public function channel($channel_id = null)
$channel_id = $channel_id ? $channel_id : $this->get_free_channel_id();
$ch = new AMQPChannel($this->connection, $channel_id);
$this->channels[$channel_id] = $ch;
+
return $ch;
}
}
@@ -330,6 +332,7 @@ public function close($reply_code=0, $reply_text="", $method_sig=array(0, 0))
$args->write_short($method_sig[0]); // class_id
$args->write_short($method_sig[1]); // method_id
$this->send_method_frame(array(10, 60), $args);
+
return $this->wait(array(
"10,61", // Connection.close_ok
));
@@ -354,6 +357,7 @@ public static function dump_table($table)
}
$tokens[] = $name . '=' . $val;
}
+
return implode(', ', $tokens);
}
@@ -395,6 +399,7 @@ protected function x_open($virtual_host, $capabilities="", $insist=false)
$args->write_shortstr($capabilities);
$args->write_bit($insist);
$this->send_method_frame(array(10, 40), $args);
+
return $this->wait(array(
"10,41", // Connection.open_ok
"10,50" // Connection.redirect
@@ -426,6 +431,7 @@ protected function redirect($args)
if ($this->debug) {
MiscHelper::debug_msg("Redirected to [". $host . "], known_hosts [" . $this->known_hosts . "]" );
}
+
return $host;
}
@@ -469,7 +475,6 @@ protected function start($args)
}
-
protected function x_start_ok($client_properties, $mechanism, $response, $locale)
{
$args = new AMQPWriter();
View
3  PhpAmqpLib/Connection/AMQPSSLConnection.php
@@ -27,6 +27,7 @@ private function create_ssl_context($options)
foreach ($options as $k => $v) {
stream_context_set_option($ssl_context, 'ssl', $k, $v);
}
+
return $ssl_context;
}
-}
+}
View
2  PhpAmqpLib/Exception/AMQPChannelException.php
@@ -10,4 +10,4 @@ public function __construct($reply_code, $reply_text, $method_sig)
{
parent::__construct($reply_code, $reply_text, $method_sig);
}
-}
+}
View
2  PhpAmqpLib/Exception/AMQPConnectionException.php
@@ -10,4 +10,4 @@ public function __construct($reply_code, $reply_text, $method_sig)
{
parent::__construct($reply_code, $reply_text, $method_sig);
}
-}
+}
View
2  PhpAmqpLib/Exception/AMQPException.php
@@ -29,4 +29,4 @@ public function __construct($reply_code, $reply_text, $method_sig)
$mn
);
}
-}
View
13 PhpAmqpLib/Helper/MiscHelper.php
@@ -36,10 +36,10 @@ public static function saveBytes($bytes)
* @author Aidan Lister <aidan@php.net>
* @author Peter Waller <iridum@php.net>
* @link http://aidanlister.com/repos/v/function.hexdump.php
- * @param string $data The string to be dumped
- * @param bool $htmloutput Set to false for non-HTML output
- * @param bool $uppercase Set to true for uppercase hex
- * @param bool $return Set to true to return the dump
+ * @param string $data The string to be dumped
+ * @param bool $htmloutput Set to false for non-HTML output
+ * @param bool $uppercase Set to true for uppercase hex
+ * @param bool $return Set to true to return the dump
*/
public static function hexdump($data, $htmloutput = true, $uppercase = false, $return = false)
{
@@ -54,8 +54,7 @@ public static function hexdump($data, $htmloutput = true, $uppercase = false, $r
$x = ($uppercase === false) ? 'x' : 'X';
// Iterate string
- for ($i = $j = 0; $i < $len; $i++)
- {
+ for ($i = $j = 0; $i < $len; $i++) {
// Convert to hexidecimal
$hexi .= sprintf("%02$x ", ord($data[$i]));
@@ -104,4 +103,4 @@ public static function hexdump($data, $htmloutput = true, $uppercase = false, $r
return $dump;
}
}
-}
+}
View
22 PhpAmqpLib/Helper/Protocol/FrameBuilder.php
@@ -13,6 +13,7 @@ public function channelClose($reply_code, $reply_text, $class_id, $method_id)
->write_shortstr($reply_text)
->write_short($class_id)
->write_short($method_id);
+
return $args;
}
@@ -20,6 +21,7 @@ public function flow($active)
{
$args = new AMQPWriter();
$args->write_bit($active);
+
return $args;
}
@@ -27,6 +29,7 @@ public function xFlowOk($active)
{
$args = new AMQPWriter();
$args->write_bit($active);
+
return $args;
}
@@ -34,6 +37,7 @@ public function xOpen($out_of_band)
{
$args = new AMQPWriter();
$args->write_shortstr($out_of_band);
+
return $args;
}
@@ -51,6 +55,7 @@ public function accessRequest($realm,
->write_bit($active)
->write_bit($write)
->write_bit($read);
+
return $args;
}
@@ -74,6 +79,7 @@ public function exchangeDeclare($exchange,
->write_bit($internal)
->write_bit($nowait)
->write_table($arguments);
+
return $args;
}
@@ -84,6 +90,7 @@ public function exchangeDelete($exchange, $if_unused, $nowait, $ticket)
->write_shortstr($exchange)
->write_bit($if_unused)
->write_bit($nowait);
+
return $args;
}
@@ -97,6 +104,7 @@ public function queueBind($queue, $exchange, $routing_key, $nowait, $arguments,
->write_bit($nowait)
->write_table($arguments)
;
+
return $args;
}
@@ -109,6 +117,7 @@ public function queueUnbind($queue, $exchange, $routing_key, $arguments, $ticket
->write_shortstr($routing_key)
->write_table($arguments)
;
+
return $args;
}
@@ -130,6 +139,7 @@ public function queueDeclare($queue,
->write_bit($auto_delete)
->write_bit($nowait)
->write_table($arguments);
+
return $args;
}
@@ -142,6 +152,7 @@ public function queueDelete($queue, $if_unused, $if_empty, $nowait, $ticket)
->write_bit($if_empty)
->write_bit($nowait)
;
+
return $args;
}
@@ -152,6 +163,7 @@ public function queuePurge($queue, $nowait, $ticket)
->write_shortstr($queue)
->write_bit($nowait)
;
+
return $args;
}
@@ -161,6 +173,7 @@ public function basicAck($delivery_tag, $multiple)
$args->write_longlong($delivery_tag)
->write_bit($multiple)
;
+
return $args;
}
@@ -170,6 +183,7 @@ public function basicCancel($consumer_tag, $nowait)
$args->write_shortstr($consumer_tag)
->write_bit($nowait)
;
+
return $args;
}
@@ -185,6 +199,7 @@ public function basicConsume($queue, $consumer_tag, $no_local,
->write_bit($exclusive)
->write_bit($nowait)
;
+
return $args;
}
@@ -195,6 +210,7 @@ public function basicGet($queue, $no_ack, $ticket)
->write_shortstr($queue)
->write_bit($no_ack)
;
+
return $args;
}
@@ -207,6 +223,7 @@ public function basicPublish($exchange, $routing_key, $mandatory, $immediate, $t
->write_bit($mandatory)
->write_bit($immediate)
;
+
return $args;
}
@@ -217,6 +234,7 @@ public function basicQos($prefetch_size, $prefetch_count, $a_global)
->write_short($prefetch_count)
->write_bit($a_global)
;
+
return $args;
}
@@ -224,6 +242,7 @@ public function basicRecover($requeue)
{
$args = new AMQPWriter();
$args->write_bit($requeue);
+
return $args;
}
@@ -233,6 +252,7 @@ public function basicReject($delivery_tag, $requeue)
$args->write_longlong($delivery_tag)
->write_bit($requeue)
;
+
return $args;
}
-}
+}
View
2  PhpAmqpLib/Message/AMQPMessage.php
@@ -32,4 +32,4 @@ public function __construct($body = '', $properties = null)
parent::__construct($properties, static::$PROPERTIES);
}
-}
+}
View
2  PhpAmqpLib/Tests/Functional/Bug40Test.php
@@ -86,4 +86,4 @@ public function tearDown()
$this->ch->close();
$this->conn->close();
}
-}
+}
View
2  PhpAmqpLib/Tests/Functional/FileTransferTest.php
@@ -59,4 +59,4 @@ public function tearDown()
$this->ch->close();
$this->conn->close();
}
-}
+}
View
2  PhpAmqpLib/Tests/Functional/PublishConsumeTest.php
@@ -78,4 +78,4 @@ public function tearDown()
$this->ch->close();
$this->conn->close();
}
-}
+}
View
3  PhpAmqpLib/Tests/Unit/Helper/Protocol/FrameBuilderTest.php
@@ -103,7 +103,6 @@ public function testQueueUnbind()
$this->assertEquals($expected, $args->getvalue());
}
-
public function testQueueDeclare()
{
$expected = "\x00\x00\x03foo\x00\x00\x00\x00\x00";
@@ -193,4 +192,4 @@ public function testBasicReject()
$args = $this->frameBuilder->basicReject(1, false);
$this->assertEquals($expected, $args->getvalue());
}
-}
+}
View
16 PhpAmqpLib/Tests/Unit/Wire/AMQPWriterTest.php
@@ -2,37 +2,35 @@
namespace PhpAmqpLib\Tests\Unit;
-use PhpAmqpLib\Wire\AMQPReader;
use PhpAmqpLib\Wire\AMQPWriter;
-
class AMQPWriterTest extends \PHPUnit_Framework_TestCase
{
protected $_writer;
-
+
public function setUp()
{
$this->_writer = new AMQPWriter();
}
-
+
public function tearDown()
{
$this->_writer = null;
}
-
+
public function testWriteArray()
{
$this->_writer->write_array(array(
'rabbit@localhost',
'hare@localhost'
));
-
+
$out = $this->_writer->getvalue();
-
+
$this->assertEquals(44, strlen($out));
-
+
$expected = "\x00\x00\x00(S\x00\x00\x00\x10rabbit@localhostS\x00\x00\x00\x0Ehare@localhost";
-
+
$this->assertEquals($expected, $out);
}
}
View
4 PhpAmqpLib/Tests/Unit/WireTest.php
@@ -92,8 +92,6 @@ protected function longstrWriteRead($v)
$this->writeAndRead($v, 'write_longstr', 'read_longstr');
}
-
-
public function testLongLongWriteRead()
{
// First test with values represented as strings
@@ -121,4 +119,4 @@ protected function writeAndRead($v, $write_method, $read_method)
$r = new AMQPReader($w->getvalue());
$this->assertEquals($v, $r->{$read_method}());
}
-}
+}
View
2  PhpAmqpLib/Wire/AMQPDecimal.php
@@ -27,4 +27,4 @@ public function asBCvalue()
{
return bcdiv($this->n, bcpow(10,$this->e));
}
-}
+}
View
35 PhpAmqpLib/Wire/AMQPReader.php
@@ -12,18 +12,16 @@ class AMQPReader
public function __construct($str, $sock=null)
{
$this->str = $str;
- if ($sock !== null)
- {
+ if ($sock !== null) {
$this->sock = new BufferedInput($sock);
- } else
- {
+ } else {
$this->sock = null;
}
$this->offset = 0;
$this->bitcount = $this->bits = 0;
- if(((int)4294967296)!=0)
+ if(((int) 4294967296)!=0)
$this->is64bits = true;
else
$this->is64bits = false;
@@ -43,6 +41,7 @@ public function close()
public function read($n)
{
$this->bitcount = $this->bits = 0;
+
return $this->rawread($n);
}
@@ -58,7 +57,7 @@ private function rawread($n)
$res .= $buf;
}
- if(strlen($res)!=$n) {
+ if (strlen($res)!=$n) {
throw new \Exception("Error reading data. Received " .
strlen($res) . " instead of expected $n bytes");
}
@@ -72,19 +71,20 @@ private function rawread($n)
$this->str = substr($this->str,$n);
$this->offset += $n;
}
+
return $res;
}
public function read_bit()
{
- if(!$this->bitcount)
- {
+ if (!$this->bitcount) {
$this->bits = ord($this->rawread(1));
$this->bitcount = 8;
}
$result = ($this->bits & 1) == 1;
$this->bits >>= 1;
$this->bitcount -= 1;
+
return $result;
}
@@ -92,6 +92,7 @@ public function read_octet()
{
$this->bitcount = $this->bits = 0;
list(,$res) = unpack('C', $this->rawread(1));
+
return $res;
}
@@ -99,6 +100,7 @@ public function read_short()
{
$this->bitcount = $this->bits = 0;
list(,$res) = unpack('n', $this->rawread(2));
+
return $res;
}
@@ -116,10 +118,10 @@ public function read_short()
public function read_php_int()
{
list(,$res) = unpack('N', $this->rawread(4));
- if($this->is64bits)
- {
+ if ($this->is64bits) {
$sres = sprintf ( "%u", $res );
- return (int)$sres;
+
+ return (int) $sres;
} else {
return $res;
}
@@ -132,6 +134,7 @@ public function read_long()
$this->bitcount = $this->bits = 0;
list(,$res) = unpack('N', $this->rawread(4));
$sres = sprintf ( "%u", $res );
+
return $sres;
}
@@ -141,6 +144,7 @@ private function read_signed_long()
// In PHP unpack('N') always return signed value,
// on both 32 and 64 bit systems!
list(,$res) = unpack('N', $this->rawread(4));
+
return $res;
}
@@ -168,6 +172,7 @@ public function read_shortstr()
{
$this->bitcount = $this->bits = 0;
list(,$slen) = unpack('C', $this->rawread(1));
+
return $this->rawread($slen);
}
@@ -182,6 +187,7 @@ public function read_longstr()
$slen = $this->read_php_int();
if($slen<0)
throw new \Exception("Strings longer than supported on this platform");
+
return $this->rawread($slen);
}
@@ -212,6 +218,7 @@ public function read_table()
$val = $table_data->read_value($ftype);
$result[$name] = array($ftype,$val);
}
+
return $result;
}
@@ -230,7 +237,7 @@ public function read_array()
$result = array();
// Read values until we reach the end of the array
- while($this->offset < $endOffset) {
+ while ($this->offset < $endOffset) {
$fieldType = $this->rawread(1);
$result[] = $this->read_value($fieldType);
}
@@ -241,7 +248,7 @@ public function read_array()
/**
* Reads the next value as the provided field type.
*
- * @param string $fieldType the char field type
+ * @param string $fieldType the char field type
* @return mixed
*/
public function read_value($fieldType)
@@ -249,7 +256,7 @@ public function read_value($fieldType)
$this->bitcount = $this->bits = 0;
$val = NULL;
- switch($fieldType) {
+ switch ($fieldType) {
case 'S': // Long string
$val = $this->read_longstr();
break;
View
45 PhpAmqpLib/Wire/AMQPWriter.php
@@ -32,7 +32,7 @@ private static function bytesplit($x, $bytes)
while ($bytes > 0) {
$b = bcmod($x,'256');
- $res[] = (int)$b;
+ $res[] = (int) $b;
$x = bcdiv($x,'256', 0);
$bytes--;
}
@@ -61,6 +61,7 @@ private function flushbits()
public function getvalue()
{
$this->flushbits();
+
return $this->out;
}
@@ -71,6 +72,7 @@ public function write($s)
{
$this->flushbits();
$this->out .= $s;
+
return $this;
}
@@ -97,6 +99,7 @@ public function write_bit($b)
array_push($this->bits, $last);
$this->bitcount += 1;
+
return $this;
}
@@ -111,6 +114,7 @@ public function write_octet($n)
$this->flushbits();
$this->out .= chr($n);
+
return $this;
}
@@ -125,6 +129,7 @@ public function write_short($n)
$this->flushbits();
$this->out .= pack('n', $n);
+
return $this;
}
@@ -135,6 +140,7 @@ public function write_long($n)
{
$this->flushbits();
$this->out .= implode("", AMQPWriter::chrbytesplit($n,4));
+
return $this;
}
@@ -144,6 +150,7 @@ private function write_signed_long($n)
// although format spec for 'N' mentions unsigned
// it will deal with sinned integers as well. tested.
$this->out .= pack('N', $n);
+
return $this;
}
@@ -154,6 +161,7 @@ public function write_longlong($n)
{
$this->flushbits();
$this->out .= implode("", AMQPWriter::chrbytesplit($n,8));
+
return $this;
}
@@ -170,6 +178,7 @@ public function write_shortstr($s)
$this->write_octet(strlen($s));
$this->out .= $s;
+
return $this;
}
@@ -182,51 +191,54 @@ public function write_longstr($s)
$this->flushbits();
$this->write_long(strlen($s));
$this->out .= $s;
+
return $this;
}
-
+
/**
* Supports the writing of Array types, so that you can implement
* array methods, like Rabbitmq's HA parameters
- *
+ *
* @param array $a
- *
+ *
* @return self
*/
public function write_array($a)
{
$this->flushbits();
$data = new AMQPWriter();
-
+
foreach ($a as $v) {
if (is_string($v)) {
$data->write('S');
$data->write_longstr($v);
- } else if (is_int($v)) {
+ } elseif (is_int($v)) {
$data->write('I');
$data->write_signed_long($v);
- } else if ($v instanceof AMQPDecimal) {
+ } elseif ($v instanceof AMQPDecimal) {
$data->write('D');
$data->write_octet($v->e);
$data->write_signed_long($v->n);
- } else if (is_array($v)) {
+ } elseif (is_array($v)) {
$data->write('A');
$data->write_array($v);
}
}
-
+
$data = $data->getvalue();
$this->write_long(strlen($data));
$this->write($data);
+
return $this;
}
-
+
/**
* Write unix time_t value as 64 bit timestamp.
*/
public function write_timestamp($v)
{
$this->write_longlong($v);
+
return $this;
}
@@ -244,29 +256,30 @@ public function write_table($d)
if ($ftype=='S') {
$table_data->write('S');
$table_data->write_longstr($v);
- } else if ($ftype=='I') {
+ } elseif ($ftype=='I') {
$table_data->write('I');
$table_data->write_signed_long($v);
- } else if ($ftype=='D') {
+ } elseif ($ftype=='D') {
// 'D' type values are passed AMQPDecimal instances.
$table_data->write('D');
$table_data->write_octet($v->e);
$table_data->write_signed_long($v->n);
- } else if ($ftype=='T') {
+ } elseif ($ftype=='T') {
$table_data->write('T');
$table_data->write_timestamp($v);
- } else if ($ftype=='F') {
+ } elseif ($ftype=='F') {
$table_data->write('F');
$table_data->write_table($v);
- } else if ($ftype = 'A') {
+ } elseif ($ftype = 'A') {
$table_data->write('A');
$table_data->write_array($v);
}
}
-
+
$table_data = $table_data->getvalue();
$this->write_long(strlen($table_data));
$this->write($table_data);
+
return $this;
}
}
View
24 PhpAmqpLib/Wire/BufferedInput.php
@@ -20,13 +20,12 @@ public function real_sock()
public function read($n)
{
- if ($this->offset >= strlen($this->buffer))
- {
- if (!($rv = $this->populate_buffer()))
- {
+ if ($this->offset >= strlen($this->buffer)) {
+ if (!($rv = $this->populate_buffer())) {
return $rv;
}
}
+
return $this->read_buffer($n);
}
@@ -39,8 +38,7 @@ public function close()
private function read_buffer($n)
{
$n = min($n, strlen($this->buffer) - $this->offset);
- if ($n === 0)
- {
+ if ($n === 0) {
// substr("", 0, 0) => false, which screws up read loops that are
// expecting non-blocking reads to return "". This avoids that edge
// case when the buffer is empty/used up.
@@ -48,6 +46,7 @@ private function read_buffer($n)
}
$block = substr($this->buffer, $this->offset, $n);
$this->offset += $n;
+
return $block;
}
@@ -59,20 +58,19 @@ private function reset($block)
private function populate_buffer()
{
- if(feof($this->sock))
- {
+ if (feof($this->sock)) {
$this->reset("");
+
return false;
}
$block = fread($this->sock, $this->block_size);
- if ($block !== false)
- {
+ if ($block !== false) {
$this->reset($block);
+
return true;
- } else
- {
+ } else {
return $block;
}
}
-}
+}
View
23 PhpAmqpLib/Wire/GenericContent.php
@@ -29,7 +29,6 @@ public function __construct($props, $prop_types=null)
$this->properties = $d;
}
-
/**
* Look for additional properties in the 'properties' dictionary,
* and if present - the 'delivery_info' dictionary.
@@ -37,22 +36,25 @@ public function __construct($props, $prop_types=null)
public function get($name)
{
if(isset($this->properties[$name]))
+
return $this->properties[$name];
if(isset($this->delivery_info) && isset($this->delivery_info[$name]))
+
return $this->delivery_info[$name];
throw new \OutOfBoundsException("No '$name' property");
}
- /**
- * allows to set the property after creation of the object
- */
- public function set($name, $value){
- if(array_key_exists($name, $this->prop_types))
- $this->properties[$name] = $value;
- else
- throw new \OutOfBoundsException("No '$name' property");
+ /**
+ * allows to set the property after creation of the object
+ */
+ public function set($name, $value)
+ {
+ if(array_key_exists($name, $this->prop_types))
+ $this->properties[$name] = $value;
+ else
+ throw new \OutOfBoundsException("No '$name' property");
}
/**
* Given the raw bytes containing the property-flags and
@@ -92,7 +94,6 @@ public function load_properties($raw_bytes)
$this->properties = $d;
}
-
/**
* serialize the 'properties' attribute (a dictionary) into the
* raw bytes making up a set of property flags and a property
@@ -138,4 +139,4 @@ public function serialize_properties()
return $result->getvalue();
}
-}
+}
View
9 benchmark/consumer.php
@@ -2,7 +2,6 @@
include(__DIR__ . '/config.php');
use PhpAmqpLib\Connection\AMQPConnection;
-use PhpAmqpLib\Message\AMQPMessage;
$exchange = 'bench_exchange';
$queue = 'bench_queue';
@@ -15,14 +14,14 @@
$ch->exchange_declare($exchange, 'direct', false, false, false);
$ch->queue_bind($queue, $exchange);
-class Consumer
+class consumer
{
protected $msgCount = 0;
protected $startTime = null;
public function process_message($msg)
{
- if($this->startTime === null) {
+ if ($this->startTime === null) {
$this->startTime = microtime(true);
}
@@ -36,7 +35,8 @@ public function process_message($msg)
$ch->basic_consume($queue, '', false, true, false, false, array(new Consumer(), 'process_message'));
-function shutdown($ch, $conn){
+function shutdown($ch, $conn)
+{
$ch->close();
$conn->close();
}
@@ -45,4 +45,3 @@ function shutdown($ch, $conn){
while (count($ch->callbacks)) {
$ch->wait();
}
-?>
View
7 benchmark/file_consume.php
@@ -6,7 +6,6 @@
include(__DIR__ . '/config.php');
use PhpAmqpLib\Connection\AMQPConnection;
-use PhpAmqpLib\Message\AMQPMessage;
$exchange = 'file_exchange';
$queue = 'file_queue';
@@ -26,7 +25,7 @@ class Consumer
public function process_message($msg)
{
- if($this->startTime === null) {
+ if ($this->startTime === null) {
$this->startTime = microtime(true);
}
@@ -40,7 +39,8 @@ public function process_message($msg)
$ch->basic_consume($queue, '', false, true, false, false, array(new Consumer(), 'process_message'));
-function shutdown($ch, $conn){
+function shutdown($ch, $conn)
+{
$ch->close();
$conn->close();
}
@@ -49,4 +49,3 @@ function shutdown($ch, $conn){
while (count($ch->callbacks)) {
$ch->wait();
}
-?>
View
2  benchmark/file_publish.php
@@ -62,5 +62,3 @@ function generate_random_content($bytes)
$ch->close();
$conn->close();
-
-?>
View
2  benchmark/producer.php
@@ -23,7 +23,6 @@
$ch->queue_bind($queue, $exchange);
-
$msg_body = <<<EOT
abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz
abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz
@@ -54,4 +53,3 @@
$ch->close();
$conn->close();
-?>
View
8 demo/amqp_consumer.php
@@ -37,8 +37,8 @@
$ch->queue_bind($queue, $exchange);
-function process_message($msg) {
-
+function process_message($msg)
+{
echo "\n--------\n";
echo $msg->body;
echo "\n--------\n";
@@ -65,7 +65,8 @@ function process_message($msg) {
$ch->basic_consume($queue, $consumer_tag, false, false, false, false, 'process_message');
-function shutdown($ch, $conn){
+function shutdown($ch, $conn)
+{
$ch->close();
$conn->close();
}
@@ -75,4 +76,3 @@ function shutdown($ch, $conn){
while (count($ch->callbacks)) {
$ch->wait();
}
-?>
View
14 demo/amqp_consumer_exclusive.php
@@ -16,7 +16,7 @@
$ch = $conn->channel();
/*
- name: $queue // should be unique in fanout exchange. Let RabbitMQ create
+ name: $queue // should be unique in fanout exchange. Let RabbitMQ create
// a queue name for us
passive: false // don't check if a queue with the same name exists
durable: false // the queue will not survive server restarts
@@ -37,8 +37,8 @@
$ch->queue_bind($queue_name, $exchange);
-function process_message($msg) {
-
+function process_message($msg)
+{
echo "\n--------\n";
echo $msg->body;
echo "\n--------\n";
@@ -59,21 +59,21 @@ function process_message($msg) {
no_local: Don't receive messages published by this consumer.
no_ack: Tells the server if the consumer will acknowledge the messages.
exclusive: Request exclusive consumer access, meaning only this consumer can access the queue
- nowait: don't wait for a server response. In case of error the server will raise a channel
+ nowait: don't wait for a server response. In case of error the server will raise a channel
exception
callback: A PHP Callback
*/
$ch->basic_consume($queue_name, $consumer_tag, false, false, true, false, 'process_message');
-function shutdown($ch, $conn){
+function shutdown($ch, $conn)
+{
$ch->close();
$conn->close();
}
register_shutdown_function('shutdown', $ch, $conn);
// Loop as long as the channel has callbacks registered
-while(count($ch->callbacks)) {
+while (count($ch->callbacks)) {
$ch->wait();
}
-?>
View
10 demo/amqp_consumer_fanout_1.php
@@ -34,8 +34,8 @@
$ch->queue_bind($queue, $exchange);
-function process_message($msg) {
-
+function process_message($msg)
+{
echo "\n--------\n";
echo $msg->body;
echo "\n--------\n";
@@ -63,14 +63,14 @@ function process_message($msg) {
$ch->basic_consume($queue, $consumer_tag, false, false, false, false, 'process_message');
-function shutdown($ch, $conn){
+function shutdown($ch, $conn)
+{
$ch->close();
$conn->close();
}
register_shutdown_function('shutdown', $ch, $conn);
// Loop as long as the channel has callbacks registered
-while(count($ch->callbacks)) {
+while (count($ch->callbacks)) {
$ch->wait();
}
-?>
View
10 demo/amqp_consumer_fanout_2.php
@@ -34,8 +34,8 @@
$ch->queue_bind($queue, $exchange);
-function process_message($msg) {
-
+function process_message($msg)
+{
echo "\n--------\n";
echo $msg->body;
echo "\n--------\n";
@@ -63,14 +63,14 @@ function process_message($msg) {
$ch->basic_consume($queue, $consumer_tag, false, false, false, false, 'process_message');
-function shutdown($ch, $conn){
+function shutdown($ch, $conn)
+{
$ch->close();
$conn->close();
}
register_shutdown_function('shutdown', $ch, $conn);
// Loop as long as the channel has callbacks registered
-while(count($ch->callbacks)) {
+while (count($ch->callbacks)) {
$ch->wait();
}
-?>
View
8 demo/amqp_consumer_non_blocking.php
@@ -37,8 +37,8 @@
$ch->queue_bind($queue, $exchange);
-function process_message($msg) {
-
+function process_message($msg)
+{
echo "\n--------\n";
echo $msg->body;
echo "\n--------\n";
@@ -65,7 +65,8 @@ function process_message($msg) {
$ch->basic_consume($queue, $consumer_tag, false, false, false, false, 'process_message');
-function shutdown($ch, $conn){
+function shutdown($ch, $conn)
+{
$ch->close();
$conn->close();
}
@@ -82,4 +83,3 @@ function shutdown($ch, $conn){
$ch->wait();
}
}
-?>
View
32 demo/amqp_ha_consumer.php
@@ -19,21 +19,21 @@
*/
$ha_connection = array(
- 'x-ha-policy' => array(
- 'S', 'all'
- ),
+ 'x-ha-policy' => array(
+ 'S', 'all'
+ ),
);
$ha_specific_connection = array(
- 'x-ha-policy' => array(
- 'S', 'nodes'
- ),
- 'x-ha-policy-params' => array(
- 'A', array(
- 'rabbit@' . HOST,
- 'hare@' . HOST,
- ),
- ),
+ 'x-ha-policy' => array(
+ 'S', 'nodes'
+ ),
+ 'x-ha-policy-params' => array(
+ 'A', array(
+ 'rabbit@' . HOST,
+ 'hare@' . HOST,
+ ),
+ ),
);
/*
@@ -61,8 +61,8 @@
$ch->queue_bind($queue, $exchange);
$ch->queue_bind($specific_queue, $exchange);
-function process_message($msg) {
-
+function process_message($msg)
+{
echo "\n--------\n";
echo $msg->body;
echo "\n--------\n";
@@ -89,7 +89,8 @@ function process_message($msg) {
$ch->basic_consume($queue, $consumer_tag, false, false, false, false, 'process_message');
-function shutdown($ch, $conn){
+function shutdown($ch, $conn)
+{
$ch->close();
$conn->close();
}
@@ -99,4 +100,3 @@ function shutdown($ch, $conn){
while (count($ch->callbacks)) {
$ch->wait();
}
-?>
View
1  demo/amqp_publisher.php
@@ -43,4 +43,3 @@
$ch->close();
$conn->close();
-?>
View
1  demo/amqp_publisher_exclusive.php
@@ -25,4 +25,3 @@
$ch->close();
$conn->close();
-?>
View
1  demo/amqp_publisher_fanout.php
@@ -25,4 +25,3 @@
$ch->close();
$conn->close();
-?>
View
2  demo/basic_get.php
@@ -47,4 +47,4 @@
var_dump($msg->body);
$ch->close();
-$conn->close();
+$conn->close();
View
6 demo/basic_return.php
@@ -22,7 +22,7 @@
$wait = true;
$return_listener = function ($reply_code, $reply_text,
- $exchange, $routing_key, $msg) use ($wait){
+ $exchange, $routing_key, $msg) use ($wait) {
$GLOBALS['wait'] = false;
echo "return: ",
@@ -42,9 +42,9 @@
true );
echo " done.\n";
-while($wait) {
+while ($wait) {
$channel->wait();
}
$channel->close();
-$connection->close();
+$connection->close();
View
2  demo/config.php
@@ -9,4 +9,4 @@
define('VHOST', '/');
//If this is enabled you can see AMQP output on the CLI
-define('AMQP_DEBUG', true);
+define('AMQP_DEBUG', true);
View
5 demo/ssl_connection.php
@@ -14,10 +14,11 @@
$conn = new AMQPSSLConnection(HOST, PORT, USER, PASS, VHOST, $ssl_options);
-function shutdown($conn){
+function shutdown($conn)
+{
$conn->close();
}
register_shutdown_function('shutdown', $conn);
-while (true) {}
+while (true) {}
Please sign in to comment.
Something went wrong with that request. Please try again.