
CoffeeSocket Reactive Architecture demo


This demo project combines a number of technologies to show Reactive Streams in use throughout an entire architecture.


To run the demo you need:

  • A running RabbitMQ server (preferably on localhost)
  • A running MongoDB database server (preferably on localhost)


Simply start up the PaymentService, CoffeeSocketService and CoffeeSocketClient applications.

If your RabbitMQ server or MongoDB database server is not running on localhost, you will need to adjust the connection settings in the configuration files of the different applications.

What happens?

There is one single flow that moves through the different applications. This flow forms one big Reactive Stream, applying back pressure all the way.

On a functional level, the flow starts with the CoffeeSocketClient requesting paid CoffeeOrders from the CoffeeSocketService. These start out as "mock orders": CoffeeOrders that we pretend are entering the system. For each order we retrieve the price by getting the Coffee information from a MongoDB database. Once we have the price, we create some payment information and send it to the PaymentService. After the PaymentService has taken care of the payment, the CoffeeSocketService finally passes the CoffeeOrder on to the CoffeeSocketClient.

Architectural drawing


On a technical level, the flow starts at the CoffeeSocketClient application. This application runs on Spring Boot along with Project Reactor to facilitate its internal Reactive Streams. It also uses the Spring RSocket integration to enable Reactive Streams over TCP (or another transport such as WebSocket or Aeron).

After the CoffeeSocketClient application starts up, it makes a call to the CoffeeSocketService using an RSocketRequester. This call sets up a Reactive Stream between the CoffeeSocketClient and the CoffeeSocketService. The CoffeeSocketClient requests a number of messages (e.g. 256) from the CoffeeSocketService, which then sends up to that number of results to the CoffeeSocketClient. Neither application has to block a thread at any point; each simply reacts to new information flowing in.

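private final RSocketRequester requester;
public Flux<CoffeeOrder> receiveCoffeeOrders() {
	return this.requester
		.route("coffeeOrders") // route name assumed from the "coffeeOrders" request
		.data(new CoffeeServerSubscription("WaiterName"))
		.retrieveFlux(CoffeeOrder.class); // open the stream of CoffeeOrders
}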

The RSocket connection applies the same backpressure mechanism we see in "normal" Reactive Streams. The CoffeeSocketClient asks the CoffeeSocketService for a number of elements, which are then delivered as the CoffeeSocketService receives them from upstream. Once the requested number of elements has been delivered, more are requested. This continues until the CoffeeSocketService signals that the stream has completed or that an error has occurred, either of which ends the stream.
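The request/deliver cycle described above can be sketched without any of the project's dependencies. The example below uses the JDK's java.util.concurrent.Flow interfaces, which mirror the Reactive Streams interfaces, rather than Project Reactor or RSocket. All class names are hypothetical, and the publisher is deliberately simplified: it is single-threaded and does not validate the requested amount.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

/** Minimal demand-driven publisher/subscriber pair built on the JDK Flow interfaces. */
final class BackpressureSketch {

    /** Emits the integers 1..total, but never more than the subscriber has requested. */
    static final class RangePublisher implements Flow.Publisher<Integer> {
        private final int total;

        RangePublisher(int total) { this.total = total; }

        @Override
        public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
            subscriber.onSubscribe(new Flow.Subscription() {
                private int next = 1;
                private long demand = 0;
                private boolean emitting = false;
                private boolean done = false;

                @Override
                public void request(long n) {
                    if (done) return;
                    demand += n;
                    if (emitting) return; // re-entrant call from onNext: the loop below picks it up
                    emitting = true;
                    while (demand > 0 && next <= total && !done) {
                        demand--;
                        subscriber.onNext(next++);
                    }
                    emitting = false;
                    if (next > total && !done) {
                        done = true;
                        subscriber.onComplete();
                    }
                }

                @Override
                public void cancel() { done = true; }
            });
        }
    }

    /** Requests elements in fixed-size batches and counts how many requests it made. */
    static final class BatchingSubscriber implements Flow.Subscriber<Integer> {
        final List<Integer> received = new ArrayList<>();
        final int batchSize;
        int requestCount = 0;
        boolean completed = false;
        Throwable error;
        private Flow.Subscription subscription;
        private int remainingInBatch;

        BatchingSubscriber(int batchSize) { this.batchSize = batchSize; }

        private void requestBatch() {
            requestCount++;
            remainingInBatch = batchSize;
            subscription.request(batchSize);
        }

        @Override public void onSubscribe(Flow.Subscription s) { subscription = s; requestBatch(); }

        @Override public void onNext(Integer item) {
            received.add(item);
            if (--remainingInBatch == 0) requestBatch(); // batch used up: signal new demand
        }

        @Override public void onError(Throwable t) { error = t; }

        @Override public void onComplete() { completed = true; }
    }

    /** Wires a publisher of `total` items to a subscriber that asks for `batchSize` at a time. */
    static BatchingSubscriber run(int total, int batchSize) {
        BatchingSubscriber subscriber = new BatchingSubscriber(batchSize);
        new RangePublisher(total).subscribe(subscriber);
        return subscriber;
    }
}
```

Calling BackpressureSketch.run(10, 4) delivers all ten items in batches of four, and the publisher only completes the stream after the last item. A real implementation would also have to handle concurrent request calls and invalid demand.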

It's interesting to note that by using RSocket, the CoffeeSocketClient and CoffeeSocketService move from a client-server architecture to a peer-to-peer one. It's even possible to have a two-way connection of Fluxes, where both parties apply backpressure and flow data to each other!


The CoffeeSocketService also runs Spring Boot along with Project Reactor and the Spring RSocket integration. It offers an RSocket message handler to receive the coffeeOrders request from the CoffeeSocketClient.

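@MessageMapping("coffeeOrders") // annotation and route name assumed; not shown in the source
public Flux<CoffeeOrder> coffeeOrdersStream(CoffeeServerSubscription request) {
    return orders()
        .flatMap( // operator assumed; this line is elided in the source
            coffeeOrder -> coffeeRepository.findOneByCoffeeType(coffeeOrder.getCoffeeType())
                .flatMap(ct -> paymentSender.sendMessage(new PaymentInformation(coffeeOrder.getTableNumber(), ct.getPrice())))
                .map(paymentResult -> coffeeOrder)
        );
}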

When the coffeeOrdersStream method gets called through the message handler, a Flux of coffeeOrder objects is created, forming the start of our Reactive Pipeline.

These coffeeOrder objects are randomly generated by the CoffeeSocketService through the orders method but we can pretend they come from somewhere else, like through REST calls, a Kafka Topic, or an AMQP queue.

Each of these orders will go through a sequence of different steps, increasing the length of our Reactive Pipeline.

The first step is a call to a MongoDB database using the spring-boot-starter-data-mongodb-reactive library, which enables Reactive Streams from MongoDB. We retrieve a Coffee from the database, which contains the price of the coffee. The database call returns a Mono<Coffee>, so the pipeline continues once the value has been retrieved. This means we don't need to block a thread while waiting for the database result.

After we receive the Coffee object, our next goal is to send the cost of the order to the PaymentService so it can be processed further there. The processing will simply be printing the cost to the command line, but we can pretend it does some more interesting things. After the message has been printed, we want to continue our stream by sending the CoffeeOrder to the client.
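The shape of these per-order steps (look up the price, pay, then pass the original order on) can be illustrated with the JDK alone. The sketch below uses CompletableFuture as a stand-in for Reactor's Mono, which is a different abstraction: a CompletableFuture starts eagerly and has no backpressure. The Order class, the price table, and the findPrice and pay methods are all hypothetical placeholders for the repository and payment calls.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;

/** Non-blocking composition of the per-order steps, with CompletableFuture standing in for Mono. */
final class OrderPipelineSketch {

    /** Hypothetical, simplified order: just a table number and a coffee type. */
    static final class Order {
        final int tableNumber;
        final String coffeeType;

        Order(int tableNumber, String coffeeType) {
            this.tableNumber = tableNumber;
            this.coffeeType = coffeeType;
        }
    }

    // Stand-in for the MongoDB collection; the prices are made up for this example.
    private static final Map<String, Double> PRICES = Map.of("ESPRESSO", 2.0, "LATTE", 3.5);

    /** Stand-in for the reactive repository lookup: completes asynchronously with the price. */
    static CompletableFuture<Double> findPrice(String coffeeType) {
        return CompletableFuture.supplyAsync(() -> PRICES.get(coffeeType));
    }

    /** Stand-in for the payment round trip: completes asynchronously with a confirmation. */
    static CompletableFuture<String> pay(int tableNumber, double price) {
        return CompletableFuture.supplyAsync(() -> "table " + tableNumber + " paid " + price);
    }

    /** Looks up the price, pays, then hands the original order downstream. */
    static CompletableFuture<Order> process(Order order) {
        return findPrice(order.coffeeType)
                .thenCompose(price -> pay(order.tableNumber, price)) // runs when the price arrives
                .thenApply(paymentResult -> order);                  // payment done: emit the order
    }
}
```

The calling thread returns from process immediately. Each stage is scheduled only when the previous result is available, which is the same property the Mono-based pipeline relies on.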

The PaymentService is a stateless worker application, which makes it an ideal use case for AMQP with a RabbitMQ server. By using AMQP queues, we can easily scale our worker applications horizontally: when we need more processing capacity we spin up more workers, and when we need less we shut some down.
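Horizontal scaling behind a queue works because each message is handed to exactly one of the competing workers. The following is a minimal in-memory sketch of that property, with a ConcurrentLinkedQueue standing in for the RabbitMQ queue and plain threads standing in for PaymentService instances. The class and method names are hypothetical, and a real broker adds acknowledgements and redelivery that are not modelled here.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

/** Several workers compete for jobs on one shared queue; each job is handled exactly once. */
final class CompetingWorkersSketch {

    /** Returns, per worker, how many of the jobCount jobs that worker handled. */
    static int[] drain(int jobCount, int workerCount) {
        Queue<Integer> queue = new ConcurrentLinkedQueue<>();
        for (int job = 0; job < jobCount; job++) {
            queue.add(job);
        }

        int[] handled = new int[workerCount];
        Thread[] workers = new Thread[workerCount];
        for (int w = 0; w < workerCount; w++) {
            final int index = w;
            workers[w] = new Thread(() -> {
                // poll() removes the head atomically, so no two workers receive the same job.
                while (queue.poll() != null) {
                    handled[index]++; // each worker only writes to its own slot
                }
            });
            workers[w].start();
        }

        for (Thread worker : workers) {
            try {
                worker.join(); // join also makes the worker's writes visible to this thread
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for workers", e);
            }
        }
        return handled;
    }
}
```

However the jobs end up distributed, the per-worker counts always add up to the number of jobs. Adding workers changes only how the work is split, not the code that produces it.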

Because we want this to be part of our Reactive Stream, we use the Reactor RabbitMQ library. It provides an RpcClient that we can use to make an asynchronous request to the PaymentService and receive its reply.

The CoffeeSocketService does not need to actively wait for a reply by blocking a thread or applying busy waiting, but can just react further to new incoming events until a response is received on the reply queue.
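RPC over a message queue is commonly implemented by tagging each request with a correlation id and matching replies against the requests still outstanding. The sketch below shows that idea in memory, with CompletableFuture in place of Mono and a BiConsumer in place of the request queue. It is an illustration under those assumptions, not the Reactor RabbitMQ implementation, and every name in it is hypothetical.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiConsumer;

/** In-memory sketch of asynchronous request/reply matched by correlation id. */
final class RpcCorrelationSketch {

    // Requests that were sent and are still waiting for a reply, keyed by correlation id.
    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    // Stand-in for publishing to the request queue: receives (correlationId, body).
    private final BiConsumer<String, String> requestQueue;

    RpcCorrelationSketch(BiConsumer<String, String> requestQueue) {
        this.requestQueue = requestQueue;
    }

    /** Publishes the request and returns at once; the future completes when the reply arrives. */
    CompletableFuture<String> call(String body) {
        String correlationId = UUID.randomUUID().toString();
        CompletableFuture<String> reply = new CompletableFuture<>();
        pending.put(correlationId, reply); // register before sending, so a fast reply is not lost
        requestQueue.accept(correlationId, body);
        return reply;
    }

    /** Invoked by the reply-queue consumer; completes the matching caller, ignores unknown ids. */
    void onReply(String correlationId, String body) {
        CompletableFuture<String> reply = pending.remove(correlationId);
        if (reply != null) {
            reply.complete(body);
        }
    }

    /** Number of requests still waiting for a reply. */
    int pendingCount() {
        return pending.size();
    }
}
```

No thread waits between call and onReply: the caller holds a future, and the reply consumer completes it whenever the response shows up, in any order.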

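@PostConstruct // lifecycle annotations assumed from the method names
public void onInit() {
    String queue = "rpc.server.queue";
    Sender sender = RabbitFlux.createSender();
    // Empty exchange name: publish via the default exchange, routed by queue name.
    rpcClient = sender.rpcClient("", queue);
}

@PreDestroy
public void preDestroy() {
    // Body not shown in the source; closing the RPC client here is an assumption.
    rpcClient.close();
}

public Mono<PaymentResult> sendMessage(PaymentInformation paymentInformation) {
    // Serialize the request, send it, and map the reply body back to a PaymentResult.
    return rpcClient.rpc(Mono.just(
            new RpcClient.RpcRequest(toBinary(paymentInformation))
    )).map(delivery -> fromBinary(delivery.getBody(), PaymentResult.class));
}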


The PaymentService itself runs an RpcServer to process each incoming message. When processing is done, the reply is placed on the reply queue so that processing can continue in the CoffeeSocketService.

RpcServer rpcServer = new RpcServer(ch, "rpc.server.queue") { // ch: an open RabbitMQ Channel
    @Override
    public byte[] handleCall(byte[] requestBody, AMQP.BasicProperties replyProperties) {
        return toBinary(handlePayment(fromBinary(requestBody, PaymentInformation.class)));
    }
};
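The toBinary and fromBinary helpers used on both sides of the queue are not shown in this README. Purely as an illustration of the round trip they need to perform, here is a dependency-free sketch based on Java serialization. This choice is an assumption made to keep the example self-contained; the project may well use JSON or another format, and Java serialization should not be used to read data from untrusted sources.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

/** Hypothetical message codec: object to byte[] and back, using Java serialization. */
final class BinaryCodecSketch {

    /** Serializes a value into the byte array that would become the message body. */
    static byte[] toBinary(Serializable value) {
        try (ByteArrayOutputStream bytes = new ByteArrayOutputStream();
             ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
            out.flush(); // push buffered object data into the byte array before reading it
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Deserializes a message body and checks that it has the expected type. */
    static <T> T fromBinary(byte[] data, Class<T> type) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return type.cast(in.readObject());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException("unknown class in message body", e);
        }
    }
}
```

With helpers of this shape, the payload classes (here PaymentInformation and PaymentResult) would have to implement Serializable and be available on the classpath of both applications.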


After we receive the reply of the PaymentService in the CoffeeSocketService we finally pass the CoffeeOrder object along the Reactive Stream again, this time sending it over our original RSocket call as a result. When the CoffeeSocketClient receives it, its Reactive Stream continues and prints it to the command line, forming the end of our stream.


In this project we looked at two interesting ways to handle communication through Reactive Streams in a reactive architecture. By building one big Reactive Stream with Project Reactor across multiple JVMs, connected by RSocket and a RabbitMQ server, we were able to apply backpressure from front to back. New CoffeeOrders streaming in pass through all of these moving parts, and no blocking needs to happen at any point.

RSocket gives us a two-way communication stream between two applications. It carries the advantages of Reactive Streams, such as backpressure, across the network, and even lets both sides stream data to each other.

We also took a look at using RabbitMQ to make RPC calls to worker applications. These stateless workers scale easily because they sit behind a queue, and we can be sure we are not hurting our application's performance by blocking threads while waiting for a reply.
