Skip to content

Commit

Permalink
Simplified RPC examples
Browse files Browse the repository at this point in the history
  • Loading branch information
michellammertink committed Sep 20, 2022
1 parent e4fee05 commit a04cfcc
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 55 deletions.
14 changes: 6 additions & 8 deletions java/RPCClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,7 @@

import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.*;

public class RPCClient implements AutoCloseable {

Expand All @@ -31,12 +29,12 @@ public static void main(String[] argv) {
String response = fibonacciRpc.call(i_str);
System.out.println(" [.] Got '" + response + "'");
}
} catch (IOException | TimeoutException | InterruptedException e) {
} catch (IOException | TimeoutException | InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}

public String call(String message) throws IOException, InterruptedException {
public String call(String message) throws IOException, InterruptedException, ExecutionException {
final String corrId = UUID.randomUUID().toString();

String replyQueueName = channel.queueDeclare().getQueue();
Expand All @@ -48,16 +46,16 @@ public String call(String message) throws IOException, InterruptedException {

channel.basicPublish("", requestQueueName, props, message.getBytes("UTF-8"));

final BlockingQueue<String> response = new ArrayBlockingQueue<>(1);
final CompletableFuture<String> response = new CompletableFuture<>();

String ctag = channel.basicConsume(replyQueueName, true, (consumerTag, delivery) -> {
if (delivery.getProperties().getCorrelationId().equals(corrId)) {
response.offer(new String(delivery.getBody(), "UTF-8"));
response.complete(new String(delivery.getBody(), "UTF-8"));
}
}, consumerTag -> {
});

String result = response.take();
String result = response.get();
channel.basicCancel(ctag);
return result;
}
Expand Down
77 changes: 30 additions & 47 deletions java/RPCServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,53 +14,36 @@ public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");

try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
channel.queuePurge(RPC_QUEUE_NAME);

channel.basicQos(1);

System.out.println(" [x] Awaiting RPC requests");

Object monitor = new Object();
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
AMQP.BasicProperties replyProps = new AMQP.BasicProperties
.Builder()
.correlationId(delivery.getProperties().getCorrelationId())
.build();

String response = "";

try {
String message = new String(delivery.getBody(), "UTF-8");
int n = Integer.parseInt(message);

System.out.println(" [.] fib(" + message + ")");
response += fib(n);
} catch (RuntimeException e) {
System.out.println(" [.] " + e.toString());
} finally {
channel.basicPublish("", delivery.getProperties().getReplyTo(), replyProps, response.getBytes("UTF-8"));
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
// RabbitMq consumer worker thread notifies the RPC server owner thread
synchronized (monitor) {
monitor.notify();
}
}
};

channel.basicConsume(RPC_QUEUE_NAME, false, deliverCallback, (consumerTag -> { }));
// Wait and be prepared to consume the message from RPC client.
while (true) {
synchronized (monitor) {
try {
monitor.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
channel.queuePurge(RPC_QUEUE_NAME);

channel.basicQos(1);

System.out.println(" [x] Awaiting RPC requests");

DeliverCallback deliverCallback = (consumerTag, delivery) -> {
AMQP.BasicProperties replyProps = new AMQP.BasicProperties
.Builder()
.correlationId(delivery.getProperties().getCorrelationId())
.build();

String response = "";
try {
String message = new String(delivery.getBody(), "UTF-8");
int n = Integer.parseInt(message);

System.out.println(" [.] fib(" + message + ")");
response += fib(n);
} catch (RuntimeException e) {
System.out.println(" [.] " + e);
} finally {
channel.basicPublish("", delivery.getProperties().getReplyTo(), replyProps, response.getBytes("UTF-8"));
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
};

channel.basicConsume(RPC_QUEUE_NAME, false, deliverCallback, (consumerTag -> {}));
}
}

0 comments on commit a04cfcc

Please sign in to comment.