Skip to content

Loading…

php examples based on the python examples of RabbitMQ #18

Closed
wants to merge 1 commit into from

2 participants

@bowika

hello Alvaro,

I had a research on MQs a year ago, and I had to port these:
http://www.rabbitmq.com/getstarted.html
examples as well. Now I rebased them to your repo.
If you find them useful, you could merge them.

Best regards,
David

@videlalvaro

Cool. I will review them. Maybe we should also contribute them back here: https://github.com/rabbitmq/rabbitmq-tutorials/tree/master/php

@videlalvaro videlalvaro commented on the diff
demo/1_example/send.php
((10 lines not shown))
+{
+ die("You must specify the sleeptime in usec as arguments.\n");
+}
+
+try
+{
+ echo "Creating connection\n";
+ $conn = new AMQPConnection(HOST, PORT,
+ USER,
+ PASS);
+
+ echo "Getting channel\n";
+ $ch = $conn->channel();
+
+ echo "Requesting access\n";
+ $ch->access_request('/data', false, false, true, true);

I think access request is not used anymore. If you want to access a specific vhost then pass it as the last argument to connection. The same applies for the other examples

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
@videlalvaro

Hi,

Regarding the exceptions I don't know since I'm not the original developer of this library.

@videlalvaro videlalvaro closed this
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Commits on Apr 26, 2012
  1. @bowika
View
2 .gitignore
@@ -1,3 +1,5 @@
.project
.buildpath
.settings/
+.idea
+
View
53 demo/1_example/receive.php
@@ -0,0 +1,53 @@
+<?php
+
+//AMQP PHP library test
+
+include(__DIR__ . '/../config.php');
+use PhpAmqpLib\Connection\AMQPConnection;
+
+$msg_body = NULL;
+
+function callback($msg)
+{
+ echo "Message: " . $msg->body . "\n";
+}
+
+try
+{
+ echo "Creating connection\n";
+ $conn = new AMQPConnection(HOST, PORT,
+ USER,
+ PASS);
+
+ echo "Getting channel\n";
+ $ch = $conn->channel();
+
+ echo "Requesting access\n";
+ $ch->access_request('/data', false, false, true, true);
+
+ echo "Declaring queue\n";
+ $queue = $ch->queue_declare("hello");
+
+ echo "Consuming message\n";
+ $ch->basic_consume("hello","testtag",false,true,false,true,"callback");
+
+
+ while(True)
+ {
+ $ch->wait();
+ }
+
+ $ch->basic_cancel("testtag");
+
+ echo "Closing channel\n";
+ $ch->close();
+
+ echo "Closing connection\n";
+ $conn->close();
+
+ echo "Done.\n";
+} catch (Exception $e) {
+ echo 'Caught exception: ', $e->getMessage();
+ echo "\nTrace:\n" . $e->getTraceAsString();
+}
+?>
View
50 demo/1_example/send.php
@@ -0,0 +1,50 @@
+<?php
+
+//AMQP PHP library test
+
+include(__DIR__ . '/../config.php');
+use PhpAmqpLib\Connection\AMQPConnection;
+use PhpAmqpLib\Message\AMQPMessage;
+
+if ($argc < 2)
+{
+ die("You must specify the sleeptime in usec as arguments.\n");
+}
+
+try
+{
+ echo "Creating connection\n";
+ $conn = new AMQPConnection(HOST, PORT,
+ USER,
+ PASS);
+
+ echo "Getting channel\n";
+ $ch = $conn->channel();
+
+ echo "Requesting access\n";
+ $ch->access_request('/data', false, false, true, true);

I think access request is not used anymore. If you want to access a specific vhost then pass it as the last argument to connection. The same applies for the other examples

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
+
+ echo "Declaring queue\n";
+ $queue = $ch->queue_declare("hello");
+
+while(true){
+ echo "Creating message\n";
+ $msg = new AMQPMessage("Time: ".microtime(true), array('content_type' => 'text/plain'));
+
+ echo "Publishing message\n";
+ $ch->basic_publish($msg,"","hello");
+ usleep($argv[1]);
+}
+
+ echo "Closing channel\n";
+ $ch->close();
+
+ echo "Closing connection\n";
+ $conn->close();
+
+ echo "Done.\n";
+} catch (Exception $e) {
+ echo 'Caught exception: ', $e->getMessage();
+ echo "\nTrace:\n" . $e->getTraceAsString();
+}
+?>
View
47 demo/2_example/new_task.php
@@ -0,0 +1,47 @@
+<?php
+
+//AMQP PHP library test
+
+include(__DIR__ . '/../config.php');
+use PhpAmqpLib\Connection\AMQPConnection;
+use PhpAmqpLib\Message\AMQPMessage;
+
+if ($argc < 2)
+{
+ die("You must specify a task like 'Hello...' means a 3rd level complex task.\n");
+}
+
+try
+{
+ echo "Creating connection\n";
+ $conn = new AMQPConnection(HOST, PORT,
+ USER,
+ PASS);
+
+ echo "Getting channel\n";
+ $ch = $conn->channel();
+
+ echo "Requesting access\n";
+ $ch->access_request('/data', false, false, true, true);
+
+ echo "Declaring queue\n";
+ $queue = $ch->queue_declare("task_queue",false,true);
+
+ echo "Creating message\n";
+ $msg = new AMQPMessage($argv[1], array('content_type' => 'text/plain','delivery_mode'=>2));
+
+ echo "Publishing message\n";
+ $ch->basic_publish($msg,"","task_queue");
+
+ echo "Closing channel\n";
+ $ch->close();
+
+ echo "Closing connection\n";
+ $conn->close();
+
+ echo "Done.\n";
+} catch (Exception $e) {
+ echo 'Caught exception: ', $e->getMessage();
+ echo "\nTrace:\n" . $e->getTraceAsString();
+}
+?>
View
60 demo/2_example/worker.php
@@ -0,0 +1,60 @@
+<?php
+
+//AMQP PHP library test
+
+include(__DIR__ . '/../config.php');
+use PhpAmqpLib\Connection\AMQPConnection;
+
+//function callback($msg)
+function callback($msg)
+{
+ echo "Task: " . $msg->body . " received... working...";
+ usleep(1000000 * substr_count($msg->body,'.'));
+ echo "DONE.\n";
+ $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
+}
+
+try
+{
+ echo "Creating connection\n";
+ $conn = new AMQPConnection(HOST, PORT,
+ USER,
+ PASS);
+
+ echo "Getting channel\n";
+ $ch = $conn->channel();
+
+ echo "Requesting access\n";
+ $ch->access_request('/data', false, false, true, true);
+
+ echo "Declaring queue\n";
+ $queue = $ch->queue_declare("task_queue",false,true);
+
+
+ $ch->basic_qos(0,1,false);
+
+ /*
+ Here it is quite important that the routing_key is with the same name as the declared queue name.
+ */
+ echo "Consuming message\n";
+ $ch->basic_consume("task_queue","testtag",false,false,false,true,"callback");
+
+ while(True)
+ {
+ $ch->wait();
+ }
+
+ $ch->basic_cancel("testtag");
+
+ echo "Closing channel\n";
+ $ch->close();
+
+ echo "Closing connection\n";
+ $conn->close();
+
+ echo "Done.\n";
+} catch (Exception $e) {
+ echo 'Caught exception: ', $e->getMessage();
+ echo "\nTrace:\n" . $e->getTraceAsString();
+}
+?>
View
47 demo/3_example/emit_log.php
@@ -0,0 +1,47 @@
+<?php
+
+//AMQP PHP library test
+
+include(__DIR__ . '/../config.php');
+use PhpAmqpLib\Connection\AMQPConnection;
+use PhpAmqpLib\Message\AMQPMessage;
+
+if ($argc < 2)
+{
+ die("You must specify a message as an argument.\n");
+}
+
+try
+{
+ echo "Creating connection\n";
+ $conn = new AMQPConnection(HOST, PORT,
+ USER,
+ PASS);
+
+ echo "Getting channel\n";
+ $ch = $conn->channel();
+
+ echo "Requesting access\n";
+ $ch->access_request('/data', false, false, true, true);
+
+ echo "Declaring exchange\n";
+ $ch->exchange_declare("logs","fanout");
+
+ echo "Creating message\n";
+ $msg = new AMQPMessage($argv[1]);
+
+ echo "Publishing message\n";
+ $ch->basic_publish($msg,"logs","");
+
+ echo "Closing channel\n";
+ $ch->close();
+
+ echo "Closing connection\n";
+ $conn->close();
+
+ echo "Done.\n";
+} catch (Exception $e) {
+ echo 'Caught exception: ', $e->getMessage();
+ echo "\nTrace:\n" . $e->getTraceAsString();
+}
+?>
View
54 demo/3_example/receive_logs.php
@@ -0,0 +1,54 @@
+<?php
+
+//AMQP PHP library test
+
+include(__DIR__ . '/../config.php');
+use PhpAmqpLib\Connection\AMQPConnection;
+
+//function callback($msg)
+function callback($msg)
+{
+ echo "Message: " . $msg->body ."\n";
+}
+
+try
+{
+ echo "Creating connection\n";
+ $conn = new AMQPConnection(HOST, PORT,
+ USER,
+ PASS);
+
+ echo "Getting channel\n";
+ $ch = $conn->channel();
+
+ echo "Requesting access\n";
+ $ch->access_request('/data', false, false, true, true);
+
+ echo "Declaring exchange\n";
+ $ch->exchange_declare("logs","fanout");
+
+ echo "Declaring queue\n";
+ $queue = $ch->queue_declare("",false,false,true);
+
+ echo "Consuming message\n";
+ $ch->basic_consume($queue[0],"",false,true,false,false,"callback");
+
+ $ch->queue_bind($queue[0],"logs");
+
+ while(True)
+ {
+ $ch->wait();
+ }
+
+ echo "Closing channel\n";
+ $ch->close();
+
+ echo "Closing connection\n";
+ $conn->close();
+
+ echo "Done.\n";
+} catch (Exception $e) {
+ echo 'Caught exception: ', $e->getMessage();
+ echo "\nTrace:\n" . $e->getTraceAsString();
+}
+?>
View
47 demo/4_example/emit_log_direct.php
@@ -0,0 +1,47 @@
+<?php
+
+//AMQP PHP library test
+
+include(__DIR__ . '/../config.php');
+use PhpAmqpLib\Connection\AMQPConnection;
+use PhpAmqpLib\Message\AMQPMessage;
+
+if ($argc < 3)
+{
+ die("You must specify a severity {error|info|warning|} and the message.\n");
+}
+
+try
+{
+ echo "Creating connection\n";
+ $conn = new AMQPConnection(HOST, PORT,
+ USER,
+ PASS);
+
+ echo "Getting channel\n";
+ $ch = $conn->channel();
+
+ echo "Requesting access\n";
+ $ch->access_request('/data', false, false, true, true);
+
+ echo "Declaring exchange\n";
+ $ch->exchange_declare("direct_logs","direct");
+
+ echo "Creating message\n";
+ $msg = new AMQPMessage($argv[2]);
+
+ echo "Publishing message\n";
+ $ch->basic_publish($msg,"direct_logs",$argv[1]);
+
+ echo "Closing channel\n";
+ $ch->close();
+
+ echo "Closing connection\n";
+ $conn->close();
+
+ echo "Done.\n";
+} catch (Exception $e) {
+ echo 'Caught exception: ', $e->getMessage();
+ echo "\nTrace:\n" . $e->getTraceAsString();
+}
+?>
View
63 demo/4_example/receive_logs_direct.php
@@ -0,0 +1,63 @@
+<?php
+
+//AMQP PHP library test
+
+include(__DIR__ . '/../config.php');
+use PhpAmqpLib\Connection\AMQPConnection;
+
+if ($argc < 2)
+{
+ die("You must specify at least one severity as an argument.\n");
+}
+
+$severities = $argv;
+array_shift($severities);
+
+function callback($msg)
+{
+ echo "Message: " . $msg->body ."\n";
+}
+
+try
+{
+ echo "Creating connection\n";
+ $conn = new AMQPConnection(HOST, PORT,
+ USER,
+ PASS);
+
+ echo "Getting channel\n";
+ $ch = $conn->channel();
+
+ echo "Requesting access\n";
+ $ch->access_request('/data', false, false, true, true);
+
+ echo "Declaring exchange\n";
+ $ch->exchange_declare("direct_logs","direct");
+
+ echo "Declaring queue\n";
+ $queue = $ch->queue_declare("",false,false,true);
+
+ echo "Binding queue\n";
+ foreach($severities as $severity)
+ $ch->queue_bind($queue[0],"direct_logs",$severity);
+
+ echo "Consuming message\n";
+ $ch->basic_consume($queue[0],"",false,true,false,false,"callback");
+
+ while(True)
+ {
+ $ch->wait();
+ }
+
+ echo "Closing channel\n";
+ $ch->close();
+
+ echo "Closing connection\n";
+ $conn->close();
+
+ echo "Done.\n";
+} catch (Exception $e) {
+ echo 'Caught exception: ', $e->getMessage();
+ echo "\nTrace:\n" . $e->getTraceAsString();
+}
+?>
View
47 demo/5_example/emit_log_topic.php
@@ -0,0 +1,47 @@
+<?php
+
+//AMQP PHP library test
+
+include(__DIR__ . '/../config.php');
+use PhpAmqpLib\Connection\AMQPConnection;
+use PhpAmqpLib\Message\AMQPMessage;
+
+if ($argc < 3)
+{
+ die("You must specify <facility>.<severity> and the message.\n");
+}
+
+try
+{
+ echo "Creating connection\n";
+ $conn = new AMQPConnection(HOST, PORT,
+ USER,
+ PASS);
+
+ echo "Getting channel\n";
+ $ch = $conn->channel();
+
+ echo "Requesting access\n";
+ $ch->access_request('/data', false, false, true, true);
+
+ echo "Declaring exchange\n";
+ $ch->exchange_declare("topic_logs","topic");
+
+ echo "Creating message\n";
+ $msg = new AMQPMessage($argv[2]);
+
+ echo "Publishing message\n";
+ $ch->basic_publish($msg,"topic_logs",$argv[1]);
+
+ echo "Closing channel\n";
+ $ch->close();
+
+ echo "Closing connection\n";
+ $conn->close();
+
+ echo "Done.\n";
+} catch (Exception $e) {
+ echo 'Caught exception: ', $e->getMessage();
+ echo "\nTrace:\n" . $e->getTraceAsString();
+}
+?>
View
63 demo/5_example/receive_logs_topic.php
@@ -0,0 +1,63 @@
+<?php
+
+//AMQP PHP library test
+
+include(__DIR__ . '/../config.php');
+use PhpAmqpLib\Connection\AMQPConnection;
+
+if ($argc < 2)
+{
+ die("You must specify at least one <facility>.<severity> as an argument.\n");
+}
+
+$binding_keys = $argv;
+array_shift($binding_keys);
+
+function callback($msg)
+{
+ echo "Message: " . $msg->body ."\n";
+}
+
+try
+{
+ echo "Creating connection\n";
+ $conn = new AMQPConnection(HOST, PORT,
+ USER,
+ PASS);
+
+ echo "Getting channel\n";
+ $ch = $conn->channel();
+
+ echo "Requesting access\n";
+ $ch->access_request('/data', false, false, true, true);
+
+ echo "Declaring exchange\n";
+ $ch->exchange_declare("topic_logs","topic");
+
+ echo "Declaring queue\n";
+ $queue = $ch->queue_declare("",false,false,true);
+
+ echo "Binding queue\n";
+ foreach($binding_keys as $binding_key)
+ $ch->queue_bind($queue[0],"topic_logs",$binding_key);
+
+ echo "Consuming message\n";
+ $ch->basic_consume($queue[0],"",false,true,false,false,"callback");
+
+ while(True)
+ {
+ $ch->wait();
+ }
+
+ echo "Closing channel\n";
+ $ch->close();
+
+ echo "Closing connection\n";
+ $conn->close();
+
+ echo "Done.\n";
+} catch (Exception $e) {
+ echo 'Caught exception: ', $e->getMessage();
+ echo "\nTrace:\n" . $e->getTraceAsString();
+}
+?>
View
77 demo/6_example/rpc_client.php
@@ -0,0 +1,77 @@
+<?php
+
+//AMQP PHP library test
+/* note: unfortunately this example is using non-OOP terminology
+ since the basic_consume function makes only procedural call possible.
+ */
+
+include(__DIR__ . '/../config.php');
+use PhpAmqpLib\Connection\AMQPConnection;
+use PhpAmqpLib\Message\AMQPMessage;
+
+$response = false;
+
+function on_response($msg)
+{
+ global $corr_id,$response;
+ if ($corr_id == $msg->properties['correlation_id'])
+ $response = $msg->body;
+}
+
+function call($n)
+{
+ global $corr_id,$response,$ch,$callback_queue;
+ $corr_id = uniqid();
+ $response = false;
+
+ $message = new AMQPMessage($n,array("correlation_id" => $corr_id,"reply_to" => $callback_queue));
+
+ $ch->basic_publish($message,"","rpc_queue");
+
+ while($response === false)
+ {
+ $ch->wait();
+ }
+
+ }
+
+try
+{
+ echo "Creating connection\n";
+ $conn = new AMQPConnection(HOST, PORT,
+ USER,
+ PASS);
+
+ echo "Getting channel\n";
+ $ch = $conn->channel();
+
+ echo "Requesting access\n";
+ $ch->access_request('/data', false, false, true, true);
+
+ echo "Declaring queue\n";
+ $queue = $ch->queue_declare("",false,false,true);
+
+ $callback_queue = $queue[0];
+
+ $corr_id = Null;
+
+ $ch->basic_consume($callback_queue,"",false,true,false,false,"on_response");
+
+ echo "Requesting fib(30)\n";
+ call(30);
+
+ echo "Got {$response}\n";
+
+ echo "Closing channel\n";
+ $ch->close();
+
+ echo "Closing connection\n";
+ $conn->close();
+
+ echo "Done.\n";
+} catch (Exception $e) {
+ echo 'Caught exception: ', $e->getMessage();
+ echo "\nTrace:\n" . $e->getTraceAsString();
+}
+
+?>
View
65 demo/6_example/rpc_server.php
@@ -0,0 +1,65 @@
+<?php
+
+//AMQP PHP library test
+
+include(__DIR__ . '/../config.php');
+use PhpAmqpLib\Connection\AMQPConnection;
+use PhpAmqpLib\Message\AMQPMessage;
+
+function fib($n)
+{
+ if ($n == 0)
+ return 0;
+ elseif ($n == 1)
+ return 1;
+ else
+ return fib($n-1) + fib($n-2);
+}
+
+function on_request($msg)
+{
+ $n = (int) $msg->body;
+ echo " [.] fib({$n})\n";
+ $response = new AMQPMessage(fib($n),array("correlation_id" => $msg->properties['correlation_id']));
+ $msg->delivery_info['channel']->basic_publish($response,"",$msg->properties['reply_to']);
+ $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
+}
+
+try
+{
+ echo "Creating connection\n";
+ $conn = new AMQPConnection(HOST, PORT,
+ USER,
+ PASS);
+
+ echo "Getting channel\n";
+ $ch = $conn->channel();
+
+ echo "Requesting access\n";
+ $ch->access_request('/data', false, false, true, true);
+
+ echo "Declaring queue\n";
+ $ch->queue_declare("rpc_queue");
+
+ $ch->basic_qos(0,1,false);
+
+ echo "Consuming message\n";
+ $ch->basic_consume("rpc_queue","",false,false,false,false,"on_request");
+
+ while(True)
+ {
+ $ch->wait();
+ }
+
+ echo "Closing channel\n";
+ $ch->close();
+
+ echo "Closing connection\n";
+ $conn->close();
+
+ echo "Done.\n";
+} catch (Exception $e) {
+ echo 'Caught exception: ', $e->getMessage();
+ echo "\nTrace:\n" . $e->getTraceAsString();
+}
+?>
Something went wrong with that request. Please try again.