From c8499bdc696c53469a6ea52d12bf34f511912cb9 Mon Sep 17 00:00:00 2001 From: Matt Simmons Date: Tue, 3 Apr 2012 01:36:36 -0400 Subject: [PATCH] test programs working --- receive-parallel.php | 39 +++++++++++++++++++++++++++++++++++++++ receive.php | 23 +++++++++++++---------- send.php | 19 +++++++++++++------ 3 files changed, 65 insertions(+), 16 deletions(-) create mode 100644 receive-parallel.php diff --git a/receive-parallel.php b/receive-parallel.php new file mode 100644 index 0000000..845f359 --- /dev/null +++ b/receive-parallel.php @@ -0,0 +1,39 @@ +connect() or die ("Error connecting\n"); + + +// setup our queue +$chan = new AMQPChannel($connection); +$chan->qos(0,5); +$q = new AMQPQueue($chan); +//$q->setName($queueName); +//$q->setFlags(AMQP_DURABLE); + +$q->declare(); + +$q->bind($exchangeName, $queueName); + + +function consumeQueue($envelope, $queue) { + print "Processing message: " . $envelope->getBody() . "... "; + //sleep(rand(2,7)); + sleep(1); + $queue->ack($envelope->getDeliveryTag()); + print "completed.\n"; +} // end function consumeQueue() + + + +print "Waiting. Press ctrl-c to cancel\n"; +$q->consume("consumeQueue"); + +$q->unbind($exchangeName, $queueName); +$connection->disconnect(); + +?> diff --git a/receive.php b/receive.php index 4044337..6aeb87f 100644 --- a/receive.php +++ b/receive.php @@ -1,27 +1,30 @@ connect() or die ("Error connecting\n"); -$chan = new AMQPChannel($connection); // setup our queue +$chan = new AMQPChannel($connection); $q = new AMQPQueue($chan); -$q->setName = $queueName; +$q->setName($queueName); $q->declare(); -// Bind it on the exchange to routing.key -$q->bind($exchangeName, $routingKey); +$q->bind($exchangeName, $queueName); -// show the message -print_r($q->get()); +$envelope = $q->get(AMQP_AUTOACK); +if ( ! $envelope ) { + print "It appears that the queue is empty\n"; +} else { + print "We may have found something: \n"; + print $envelope->getBody() . "\n"; +} +$q->unbind($exchangeName, $queueName); -// disconnect $connection->disconnect(); ?> diff --git a/send.php b/send.php index 4e1d4d5..030d66b 100644 --- a/send.php +++ b/send.php @@ -1,7 +1,7 @@ connect(); $chan = new AMQPChannel($connection); -$ex = new AMQPExchange($chan); -$ex->setName('new_topic1'); -if ( ! $ex->getType() ) { - $ex->setType('direct'); - $ex->declare(); +if ( $chan->isConnected() ) { + $ex = new AMQPExchange($chan); +} else { + print "Error connecting to channel\n"; + exit; } +$ex->setName("IRClog"); +$ex->setType("fanout"); +$ex->declare(); + + $chan->startTransaction(); +//$ex->publish($message, $routingKey, AMQP_NOPARAM, array("delivery_mode", "2")); $ex->publish($message, $routingKey); $chan->commitTransaction(); +print "Sent: " . $argv[1] . "\n"; ?>