Skip to content

Commit 782ffda

Browse files
author
Ann Witbrock
committed
exceptions
1 parent 6244ae4 commit 782ffda

File tree

2 files changed

+122
-83
lines changed

2 files changed

+122
-83
lines changed

java/RPCClient.java

Lines changed: 62 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -4,66 +4,82 @@
44
import com.rabbitmq.client.QueueingConsumer;
55
import com.rabbitmq.client.AMQP.BasicProperties;
66
import java.util.UUID;
7-
7+
88
public class RPCClient {
9-
10-
private static final String RPC_QUEUE_NAME = "rpc_queue";
11-
12-
static class FibonacciClient {
9+
10+
static class FibonacciRpcClient {
11+
private Connection connection;
1312
private Channel channel;
14-
private String queue;
13+
private String requestQueueName = "rpc_queue";
14+
private String replyQueueName;
15+
private QueueingConsumer consumer;
1516

16-
public FibonacciClient(Channel chann, String queueName){
17-
channel = chann;
18-
queue = queueName;
17+
public FibonacciRpcClient() throws Exception {
18+
ConnectionFactory factory = new ConnectionFactory();
19+
factory.setHost("localhost");
20+
connection = factory.newConnection();
21+
channel = connection.createChannel();
22+
23+
replyQueueName = channel.queueDeclare().getQueue();
24+
consumer = new QueueingConsumer(channel);
25+
channel.basicConsume(replyQueueName, true, consumer);
1926
}
20-
21-
public String call(String message) throws Exception {
22-
String replyQueueName = channel.queueDeclare().getQueue();
27+
28+
public String call(String message) throws Exception {
29+
String response = null;
30+
boolean replied = false;
2331
String corrId = UUID.randomUUID().toString();
32+
2433
BasicProperties props = new BasicProperties();
2534
props.setReplyTo(replyQueueName);
2635
props.setCorrelationId(corrId);
27-
28-
QueueingConsumer consumer = new QueueingConsumer(channel);
29-
channel.basicConsume(replyQueueName, true, consumer);
30-
31-
channel.basicPublish("", queue, props, message.getBytes());
32-
33-
String response = "";
34-
35-
while (!validResponse(response)){
36-
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
37-
response = new String(delivery.getBody());
36+
37+
channel.basicPublish("", requestQueueName, props, message.getBytes());
38+
39+
while (replied == false) {
40+
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
41+
if (delivery.getProperties().getCorrelationId().compareTo(corrId) == 0) {
42+
response = new String(delivery.getBody());
43+
replied = true;
44+
}
3845
}
39-
return response;
46+
47+
return response;
4048
}
4149

42-
private Boolean validResponse(String response){
43-
return (response.length() > 0);
50+
public void close() throws Exception {
51+
channel.close();
52+
connection.close();
4453
}
4554
}
4655

47-
private static Connection defaultConnection() throws Exception {
48-
ConnectionFactory factory = new ConnectionFactory();
49-
factory.setHost("localhost");
50-
return factory.newConnection();
51-
}
56+
public static void main(String[] argv) {
57+
RPCClient.FibonacciRpcClient fibonacciRpc = null;
58+
String response = null;
59+
try {
60+
fibonacciRpc = new RPCClient.FibonacciRpcClient();
5261

53-
public static void main(String[] argv) throws Exception {
54-
55-
Connection connection = defaultConnection();
56-
Channel channel = connection.createChannel();
57-
58-
RPCClient.FibonacciClient rpc = new RPCClient.FibonacciClient(channel, RPC_QUEUE_NAME);
59-
60-
System.out.println(" [x] Requesting fib(30)");
61-
String response = rpc.call("30");
62-
63-
System.out.println(" [.] Got '" + response + "'");
64-
65-
channel.close();
66-
connection.close();
67-
}
62+
System.out.println(" [x] Requesting fib(30)");
63+
response = fibonacciRpc.call("30");
64+
System.out.println(" [.] Got '" + response + "'");
65+
System.out.println(" [x] Requesting fib(-1)");
66+
response = fibonacciRpc.call("-1");
67+
System.out.println(" [.] Got '" + response + "'");
68+
System.out.println(" [x] Requesting fib(a)");
69+
response = fibonacciRpc.call("a");
70+
System.out.println(" [.] Got '" + response + "'");
71+
}
72+
catch (Exception e) {
73+
e.printStackTrace();
74+
}
75+
finally {
76+
if (fibonacciRpc!= null) {
77+
try {
78+
fibonacciRpc.close();
79+
}
80+
catch (Exception ignore) {}
81+
}
82+
}
83+
}
6884
}
6985

java/RPCServer.java

Lines changed: 60 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -5,47 +5,70 @@
55
import com.rabbitmq.client.AMQP.BasicProperties;
66

77
public class RPCServer {
8-
8+
99
private static final String RPC_QUEUE_NAME = "rpc_queue";
10-
11-
public static void main(String[] argv) throws Exception {
12-
13-
ConnectionFactory factory = new ConnectionFactory();
14-
factory.setHost("localhost");
15-
Connection connection = factory.newConnection();
16-
Channel channel = connection.createChannel();
17-
18-
channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
19-
20-
channel.basicQos(1);
21-
22-
QueueingConsumer consumer = new QueueingConsumer(channel);
23-
channel.basicConsume(RPC_QUEUE_NAME, false, consumer);
10+
11+
private static int fib(int n) throws Exception {
12+
if (n > 1) return fib(n-1) + fib(n-2);
13+
else return n;
14+
}
2415

25-
System.out.println(" [x] Awaiting RPC requests");
26-
27-
while (true) {
28-
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
29-
String message = new String(delivery.getBody());
30-
int n = Integer.parseInt(message);
31-
32-
System.out.println(" [.] fib(" + message + ")");
33-
String response = "" + fib(n);
16+
public static void main(String[] argv) {
17+
Connection connection = null;
18+
try {
19+
ConnectionFactory factory = new ConnectionFactory();
20+
factory.setHost("localhost");
21+
22+
connection = factory.newConnection();
23+
Channel channel = connection.createChannel();
3424

35-
BasicProperties props = delivery.getProperties();
36-
37-
channel.basicPublish( "", props.getReplyTo(), props, response.getBytes());
38-
39-
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
40-
}
41-
}
25+
channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
4226

43-
private static int fib(int n) throws Exception {
44-
if (n == 0)
45-
return 0;
46-
else if (n == 1)
47-
return 1;
48-
else return fib(n-1) + fib(n-2);
27+
channel.basicQos(1);
28+
29+
QueueingConsumer consumer = new QueueingConsumer(channel);
30+
channel.basicConsume(RPC_QUEUE_NAME, false, consumer);
31+
32+
System.out.println(" [x] Awaiting RPC requests");
33+
34+
while (true) {
35+
String response = null;
36+
37+
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
38+
39+
BasicProperties props = delivery.getProperties();
40+
BasicProperties replyProps = new BasicProperties();
41+
replyProps.setCorrelationId(props.getCorrelationId());
42+
43+
try {
44+
String message = new String(delivery.getBody());
45+
int n = Integer.parseInt(message);
46+
47+
System.out.println(" [.] fib(" + message + ")");
48+
response = "" + fib(n);
49+
}
50+
catch (Exception e){
51+
System.out.println(" [.] " + e.toString());
52+
response = "";
53+
}
54+
finally {
55+
channel.basicPublish( "", props.getReplyTo(), replyProps, response.getBytes());
56+
57+
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
58+
}
59+
}
60+
}
61+
catch (Exception e) {
62+
e.printStackTrace();
63+
}
64+
finally {
65+
if (connection != null) {
66+
try {
67+
connection.close();
68+
}
69+
catch (Exception ignore) {}
70+
}
71+
}
4972
}
5073
}
5174

0 commit comments

Comments
 (0)