Skip to content
This repository has been archived by the owner on Jul 16, 2020. It is now read-only.

Commit

Permalink
Merge branch 'master' of git@github.com:rabbitmq/rmq-0mq
Browse files Browse the repository at this point in the history
Conflicts:
	examples/pipeline/rabbitmq.config
	examples/pubsub/rabbitmq.config
	examples/reqrep/rabbitmq.config
  • Loading branch information
squaremo committed Sep 10, 2010
2 parents 7421591 + 41547b1 commit e7bff68
Show file tree
Hide file tree
Showing 34 changed files with 110 additions and 35 deletions.
Binary file added doc/pipeline0.dia
Binary file not shown.
Binary file added doc/pipeline0.png
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file modified doc/pipeline1.dia
Binary file not shown.
Binary file modified doc/pipeline1.png
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file modified doc/pipeline2.dia
Binary file not shown.
Binary file modified doc/pipeline2.png
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added doc/pubsub0.dia
Binary file not shown.
Binary file added doc/pubsub0.png
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file modified doc/pubsub1.dia
Binary file not shown.
Binary file modified doc/pubsub1.png
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added doc/reqrep0.dia
Binary file not shown.
Binary file added doc/reqrep0.png
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file modified doc/reqrep1.dia
Binary file not shown.
Binary file modified doc/reqrep1.png
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file modified doc/reqrep2.dia
Binary file not shown.
Binary file modified doc/reqrep2.png
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file modified doc/reqrep3.dia
Binary file not shown.
Binary file modified doc/reqrep3.png
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file modified doc/reqrep4.dia
Binary file not shown.
Binary file modified doc/reqrep4.png
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file modified doc/reqrep5.dia
Binary file not shown.
Binary file modified doc/reqrep5.png
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
42 changes: 42 additions & 0 deletions examples/pipeline/pull.java
@@ -0,0 +1,42 @@

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

public class pull {
public static void main(String[] args) {
try {

// By default connect to the local AMQP broker
String hostName = (args.length > 0) ? args[0] : "localhost";
int portNumber = (args.length > 1) ?
Integer.parseInt(args[1]) : AMQP.PROTOCOL.PORT;

// Connect to the AMQP broker
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(hostName);
factory.setPort(portNumber);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

// Establish the PUB/SUB wiring
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, "PIPELINE", null);
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(queueName, true, consumer);

for (;;) {

// Get next request
String msg = consumer.nextDelivery().getBody().toString();
System.out.println (msg);
}
} catch (Exception e) {
System.err.println("Main thread caught exception: " + e);
e.printStackTrace();
System.exit(1);
}
}
}
1 change: 0 additions & 1 deletion examples/pipeline/pull.py
@@ -1,6 +1,5 @@

import zmq
import time

context = zmq.Context()
socket = context.socket(zmq.PULL)
Expand Down
39 changes: 39 additions & 0 deletions examples/pipeline/push.java
@@ -0,0 +1,39 @@

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

public class push {
public static void main(String[] args) {
try {

// By default connect to the local AMQP broker
String hostName = (args.length > 0) ? args[0] : "localhost";
int portNumber = (args.length > 1) ?
Integer.parseInt(args[1]) : AMQP.PROTOCOL.PORT;

// Connect to the AMQP broker
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(hostName);
factory.setPort(portNumber);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

for (;;) {

// Send a message.
channel.basicPublish("PIPELINE", null, null,
"Hello, World???".getBytes());

// Sleep for one second.
Thread.sleep (1000);
}
} catch (Exception e) {
System.err.println("Main thread caught exception: " + e);
e.printStackTrace();
System.exit(1);
}
}
}
4 changes: 2 additions & 2 deletions examples/pipeline/rabbitmq.config
@@ -1,4 +1,4 @@
[{r0mq,
[{services,
[{push, "tcp://127.0.0.1:5557", <<"pipeline">>},
{pull, "tcp://127.0.0.1:5558", <<"pipeline">>}]}]}].
[{push, "tcp://127.0.0.1:5557", <<"PIPELINE">>},
{pull, "tcp://127.0.0.1:5558", <<"PIPELINE">>}]}]}].
5 changes: 1 addition & 4 deletions examples/pubsub/pub.java
Expand Up @@ -21,13 +21,10 @@ public static void main(String[] args) {
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

// Establish the PUB/SUB wiring.
channel.exchangeDeclare("HELLO_FEED", "fanout");

for (;;) {

// Send a message.
channel.basicPublish("HELLO_FEED", null, null,
channel.basicPublish("PUBSUB", null, null,
"Hello, World???".getBytes());

// Sleep for one second.
Expand Down
4 changes: 2 additions & 2 deletions examples/pubsub/rabbitmq.config
@@ -1,4 +1,4 @@
[{r0mq,
[{services,
[{pub, "tcp://127.0.0.1:5555", <<"HELLO_FEED">>},
{sub, "tcp://127.0.0.1:5556", <<"HELLO_FEED">>}]}]}].
[{pub, "tcp://127.0.0.1:5555", <<"PUBSUB">>},
{sub, "tcp://127.0.0.1:5556", <<"PUBSUB">>}]}]}].
5 changes: 2 additions & 3 deletions examples/pubsub/sub.java
Expand Up @@ -21,10 +21,9 @@ public static void main(String[] args) {
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

// Establish the PUB/SUB wiring.
channel.exchangeDeclare("HELLO_FEED", "fanout");
// Establish the PUB/SUB wiring
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, "HELLO_FEED", null);
channel.queueBind(queueName, "PUBSUB", null);
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(queueName, true, consumer);

Expand Down
8 changes: 2 additions & 6 deletions examples/reqrep/rabbitmq.config
@@ -1,8 +1,4 @@
[{r0mq,
[{services,
[{req,
"tcp://127.0.0.1:5559",
<<"HELLO_WORLD">>},
{rep,
"tcp://127.0.0.1:5560",
<<"HELLO_WORLD">>}]}]}].
[{req, "tcp://127.0.0.1:5559", <<"REQREP">>},
{rep, "tcp://127.0.0.1:5560", <<"REQREP">>}]}]}].
6 changes: 4 additions & 2 deletions examples/reqrep/rep.java
Expand Up @@ -22,16 +22,18 @@ public static void main(String[] args) {
Channel channel = connection.createChannel();

// Establish the REQ/REP wiring.
channel.queueDeclare("HELLO_WORLD", true, false, false, null);
channel.queueDeclare("REQREP", true, false, false, null);
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume("HELLO_WORLD", true, consumer);
channel.basicConsume("REQREP", true, consumer);

for (;;) {

// Get next request
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String replyTo = delivery.getProperties().getReplyTo();

System.err.println("processing request");

// Send the reply
channel.basicPublish(null, replyTo, null, "World!".getBytes());
}
Expand Down
1 change: 1 addition & 0 deletions examples/reqrep/rep.py
Expand Up @@ -7,4 +7,5 @@

while True:
message = socket.recv()
print "processing request"
socket.send("World!")
26 changes: 13 additions & 13 deletions examples/reqrep/req.java
Expand Up @@ -26,19 +26,19 @@ public static void main(String[] args) {
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(queueName, true, consumer);

for (;;) {

// Send the request
AMQP.BasicProperties properties = new AMQP.BasicProperties();
properties.setReplyTo(queueName);
channel.basicPublish("", "HELLO_WORLD", properties,
"Hello!".getBytes());

// Get and print the reply
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String reply = new String(delivery.getBody());
System.out.println(reply);
}
// Send the request
AMQP.BasicProperties properties = new AMQP.BasicProperties();
properties.setReplyTo(queueName);
channel.basicPublish("", "REQREP", properties,
"Hello!".getBytes());

// Get and print the reply
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String reply = new String(delivery.getBody());
System.out.println(reply);

connection.close();

} catch (Exception e) {
System.err.println("Main thread caught exception: " + e);
e.printStackTrace();
Expand Down
4 changes: 2 additions & 2 deletions examples/reqrep/req.py
Expand Up @@ -6,5 +6,5 @@
socket.connect("tcp://127.0.0.1:5559")

socket.send("Hello!")
rep = socket.recv()
print rep
print socket.recv()

0 comments on commit e7bff68

Please sign in to comment.