Skip to content

Commit f66c4dd

Browse files
author
Ann Witbrock
committed
initial commit client for rpc java tutorial
1 parent a9f76e2 commit f66c4dd

File tree

1 file changed

+74
-0
lines changed

1 file changed

+74
-0
lines changed

java/RPCClient.java

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
import java.io.IOException;
2+
import com.rabbitmq.client.ConnectionFactory;
3+
import com.rabbitmq.client.Connection;
4+
import com.rabbitmq.client.Channel;
5+
import com.rabbitmq.client.QueueingConsumer;
6+
import com.rabbitmq.client.AMQP.BasicProperties;
7+
import java.util.UUID;
8+
9+
public class RPCClient {
10+
11+
private static final String RPC_QUEUE_NAME = "rpc_queue";
12+
13+
static class FibonacciClient {
14+
private Channel channel;
15+
private String queue;
16+
17+
public FibonacciClient(Channel chann, String queueName){
18+
channel = chann;
19+
queue = queueName;
20+
}
21+
22+
public String call(String message)
23+
throws java.io.IOException,
24+
java.lang.InterruptedException {
25+
String replyQueueName = channel.queueDeclare().getQueue();
26+
String corrId = UUID.randomUUID().toString();
27+
BasicProperties props = new BasicProperties();
28+
props.setReplyTo(replyQueueName);
29+
props.setCorrelationId(corrId);
30+
31+
QueueingConsumer consumer = new QueueingConsumer(channel);
32+
channel.basicConsume(replyQueueName, true, consumer);
33+
34+
channel.basicPublish("", queue, props, message.getBytes());
35+
36+
String response = "";
37+
38+
while (!validResponse(response)){
39+
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
40+
response = new String(delivery.getBody());
41+
}
42+
return response;
43+
}
44+
45+
private Boolean validResponse(String response){
46+
return (response.length() > 0);
47+
}
48+
}
49+
50+
private static Connection defaultConnection()throws java.io.IOException{
51+
ConnectionFactory factory = new ConnectionFactory();
52+
factory.setHost("localhost");
53+
return factory.newConnection();
54+
}
55+
56+
public static void main(String[] argv)
57+
throws java.io.IOException,
58+
java.lang.InterruptedException {
59+
60+
Connection connection = defaultConnection();
61+
Channel channel = connection.createChannel();
62+
63+
RPCClient.FibonacciClient rpc = new RPCClient.FibonacciClient(channel, RPC_QUEUE_NAME);
64+
65+
System.out.println(" [x] Requesting fib(30)");
66+
String response = rpc.call("30");
67+
68+
System.out.println(" [.] Got '" + response + "'");
69+
70+
channel.close();
71+
connection.close();
72+
}
73+
}
74+

0 commit comments

Comments
 (0)