Reactive socket communication for microservices


Spring Cloud Sockets [We need a better name]

This repo is a proof of concept (POC) showing how to expose Spring services via the RSocket protocol.

This is a very early draft; I’ve cut a lot of corners to get something that at least runs.

It’s meant to attract interest and feedback on the programming model.

If you are wondering why we need another remote protocol when gRPC and REST already exist, please take a moment to read the RSocket motivations to understand how it is different.

Programming Model

RSocket is not a traditional RPC protocol where one can directly map a method with several parameters to a remote endpoint.

Instead, it embraces the reactive, asynchronous nature of message passing. Methods are therefore always expected to receive one parameter and return zero or one results.

The protocol defines 4 exchange modes:

  • Fire and Forget

  • Request Response

  • Request Stream

  • Channel (bidirectional channels)
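
To make the difference between the four modes concrete, here is a minimal sketch that models them as a plain Java enum. The names `ExchangeMode` and `Cardinality` are made up for this illustration and are not part of this project or of rsocket-java; they only record how many payloads travel in each direction.

```java
/** How many payloads travel in one direction of an exchange. */
enum Cardinality { NONE, ONE, MANY }

/** Illustrative model of the four RSocket exchange modes (not a project type). */
enum ExchangeMode {
    FIRE_AND_FORGET(Cardinality.ONE, Cardinality.NONE),
    REQUEST_RESPONSE(Cardinality.ONE, Cardinality.ONE),
    REQUEST_STREAM(Cardinality.ONE, Cardinality.MANY),
    CHANNEL(Cardinality.MANY, Cardinality.MANY);

    final Cardinality request;
    final Cardinality response;

    ExchangeMode(Cardinality request, Cardinality response) {
        this.request = request;
        this.response = response;
    }

    /** True when the responder may emit more than one payload. */
    boolean streamsResponses() {
        return response == Cardinality.MANY;
    }
}

class ExchangeModeDemo {
    public static void main(String[] args) {
        // Print each mode with its request and response cardinality.
        for (ExchangeMode mode : ExchangeMode.values()) {
            System.out.println(mode + ": " + mode.request + " -> " + mode.response);
        }
    }
}
```

Only the last two modes stream responses, which is where the reactive return types described later come in.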

Following Spring conventions, this project aims to let developers annotate their classes with a set of annotations that expose methods for invocation over RSocket.

Common Reactive Annotations Properties

  • path: The path where the service method is exposed

  • mimeType: The mimeType used for encoding the payload (currently only application/java-serialized-object and application/json are supported)
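
As an illustration of what the application/java-serialized-object option implies, the sketch below round-trips an object through standard Java serialization. It is not this project's codec: `Greeting` and `JavaSerializationCodec` are hypothetical names, and the only point is that payload types must implement `Serializable` for this MIME type to work.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

/** Hypothetical payload type; it must be Serializable to be encoded this way. */
class Greeting implements Serializable {
    private static final long serialVersionUID = 1L;
    final String text;

    Greeting(String text) {
        this.text = text;
    }
}

/** Illustrative encoder/decoder built on java.io serialization. */
class JavaSerializationCodec {
    /** Turns a value into the bytes that would travel as the payload. */
    static byte[] encode(Serializable value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** Rebuilds the value on the receiving side. */
    static Object decode(byte[] data) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException("Payload class is not on the classpath", e);
        }
    }
}
```

Both sides need the payload class on their classpath for this format, which is one reason application/json is often the more portable choice.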

About method handling

Reactive sockets are about message passing, so any method annotated with one of the exchange modes explained below must follow these conventions:

  • At least one argument is needed

  • In case of a single argument, that is mapped to the Payload from RSocket

  • In case of multiple arguments, exactly one must be annotated with @Payload

  • @RequestManyMapping and @RequestStreamMapping methods must return a Flux

  • @RequestStreamMapping methods must receive a Flux as the payload argument (resolved as described above)
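
The argument rules above can be sketched with plain reflection. This is only an illustration of the convention, not the project's resolver: the `@Payload` annotation is redeclared locally as a stand-in, `PayloadResolver` and `SampleHandlers` are hypothetical, and treating a multi-argument method with no `@Payload` as an error is an assumption of this sketch.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.lang.reflect.Parameter;

/** Local stand-in for the project's @Payload annotation (assumption). */
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.PARAMETER)
@interface Payload {}

class PayloadResolver {
    /** Returns the index of the argument that carries the payload. */
    static int payloadIndex(Method method) {
        Parameter[] params = method.getParameters();
        if (params.length == 0) {
            throw new IllegalArgumentException(method.getName() + ": at least one argument is needed");
        }
        if (params.length == 1) {
            return 0; // a single argument is always the payload
        }
        int found = -1;
        for (int i = 0; i < params.length; i++) {
            if (params[i].isAnnotationPresent(Payload.class)) {
                if (found >= 0) {
                    throw new IllegalArgumentException(method.getName() + ": more than one @Payload argument");
                }
                found = i;
            }
        }
        if (found < 0) {
            // Assumption: with several arguments, one of them has to be marked.
            throw new IllegalArgumentException(method.getName() + ": no @Payload argument");
        }
        return found;
    }

    /** Convenience lookup by method name, to keep the usage short. */
    static int payloadIndex(Class<?> type, String methodName) {
        for (Method m : type.getDeclaredMethods()) {
            if (m.getName().equals(methodName)) {
                return payloadIndex(m);
            }
        }
        throw new IllegalArgumentException("No method named " + methodName);
    }
}

/** Hypothetical handlers used to exercise the rules. */
class SampleHandlers {
    public void single(String body) {}
    public void multiple(String header, @Payload String body) {}
    public void none() {}
}
```

Calling `PayloadResolver.payloadIndex(SampleHandlers.class, "multiple")` picks the second argument, while `none` is rejected because it has no argument to map.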


One-way methods map to fire-and-forget in RSocket, so any result from your service is ignored and never sent over the wire.

@OneWayMapping(path="/receive", mimeType="application/json")
public void receive(Foo foo) {
	// process foo; nothing is sent back to the caller
}



Request/Response is very similar to a traditional RPC scenario: one request produces one result.

@RequestOneMapping(path="/convert", mimeType="application/json")
public Bar convert(Foo foo) {
	return toBar(foo); // toBar is a placeholder for your own conversion logic
}



RequestMany, or Request/Stream, is where reactive streams start to show their value in this protocol.

The server can open a channel and push data to the client as it arrives. The client controls backpressure on the server using Reactor's backpressure support.

In the example below, a service emits Temperature data to the client over a live connection.

@RequestManyMapping(path="/temperature", mimeType="application/json")
public Flux<Temperature> temperature(Location location) {
	return readings(location); // readings is a placeholder for any Flux<Temperature> source
}
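
To show what client-controlled backpressure means in practice, here is a small sketch using the JDK's `java.util.concurrent.Flow` interfaces, which mirror the Reactive Streams contract that Reactor's Flux follows. It does not use Reactor or RSocket: `RangePublisher` and `BatchingSubscriber` are hypothetical classes, and the publisher is synchronous and simplified (for instance, it ignores non-positive requests instead of signalling an error).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

/** Emits the integers [0, count), but never more than the subscriber asked for. */
class RangePublisher implements Flow.Publisher<Integer> {
    private final int count;

    RangePublisher(int count) {
        this.count = count;
    }

    @Override
    public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
        subscriber.onSubscribe(new Flow.Subscription() {
            private int next = 0;
            private long demand = 0;
            private boolean emitting = false;
            private boolean done = false;

            @Override
            public void request(long n) {
                if (done || n <= 0) {
                    return; // simplification: a full implementation signals onError for n <= 0
                }
                demand += n;
                if (emitting) {
                    return; // re-entrant call from onNext; the loop below picks up the new demand
                }
                emitting = true;
                while (demand > 0 && next < count && !done) {
                    demand--;
                    subscriber.onNext(next++);
                }
                emitting = false;
                if (next == count && !done) {
                    done = true;
                    subscriber.onComplete();
                }
            }

            @Override
            public void cancel() {
                done = true;
            }
        });
    }
}

/** Requests items in fixed-size batches instead of an unbounded amount. */
class BatchingSubscriber implements Flow.Subscriber<Integer> {
    final List<Integer> received = new ArrayList<>();
    final List<Long> requests = new ArrayList<>();
    boolean completed = false;
    private final long batch;
    private long outstanding = 0;
    private Flow.Subscription subscription;

    BatchingSubscriber(long batch) {
        this.batch = batch;
    }

    private void requestBatch() {
        outstanding = batch;
        requests.add(batch);
        subscription.request(batch);
    }

    @Override
    public void onSubscribe(Flow.Subscription s) {
        subscription = s;
        requestBatch();
    }

    @Override
    public void onNext(Integer item) {
        received.add(item);
        if (--outstanding == 0) {
            requestBatch(); // ask for more only once the current batch is consumed
        }
    }

    @Override
    public void onError(Throwable t) {
        completed = true;
    }

    @Override
    public void onComplete() {
        completed = true;
    }
}
```

Subscribing a `BatchingSubscriber` with a batch size of 2 to a `RangePublisher` of 5 items delivers all five values, but only ever two at a time; over RSocket the same demand signal travels across the network to the server.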


In channel mode, client and server keep a full-duplex communication open.

An example could be a client sending a Stream of Strings that the server hashes and returns to the client

@RequestStreamMapping(path="/hash", mimeType="application/json")
public Flux<Integer> hash(Flux<String> flux) {
	return flux.map(s -> s.hashCode());
}

Configuring Server Side

On the server side, add the @EnableReactiveSocket annotation to a Spring configuration class:

@EnableReactiveSocket
public class MyReactiveSocketsApplication {
}

Any class picked up by Boot’s classpath scanning that contains any of the annotations above will be registered as a remote endpoint.
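
One way to picture this registration step is a registry that inspects a bean's methods and stores a handler per path. The sketch below is an assumption about the general technique, not this project's implementation: `@Exposed` is a hypothetical stand-in for the mapping annotations, and `EndpointRegistry` and `HashService` exist only for this example.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical stand-in for the mapping annotations and their path property. */
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface Exposed {
    String path();
}

/** Illustrative registry mapping a path to the annotated method that handles it. */
class EndpointRegistry {
    private final Map<String, Function<Object, Object>> endpoints = new HashMap<>();

    /** Registers every @Exposed method declared on the bean's class. */
    void register(Object bean) {
        for (Method method : bean.getClass().getDeclaredMethods()) {
            Exposed exposed = method.getAnnotation(Exposed.class);
            if (exposed == null) {
                continue; // not a remote endpoint
            }
            method.setAccessible(true);
            Function<Object, Object> handler = payload -> {
                try {
                    return method.invoke(bean, payload);
                } catch (ReflectiveOperationException e) {
                    throw new IllegalStateException("Invocation failed for " + exposed.path(), e);
                }
            };
            if (endpoints.putIfAbsent(exposed.path(), handler) != null) {
                throw new IllegalStateException("Duplicate path: " + exposed.path());
            }
        }
    }

    /** Routes a decoded payload to the handler registered for the path. */
    Object dispatch(String path, Object payload) {
        Function<Object, Object> handler = endpoints.get(path);
        if (handler == null) {
            throw new IllegalArgumentException("No endpoint registered at " + path);
        }
        return handler.apply(payload);
    }
}

/** Hypothetical service bean with one exposed method. */
class HashService {
    @Exposed(path = "/hash")
    public Integer hash(String s) {
        return s.hashCode();
    }
}
```

After `registry.register(new HashService())`, a call to `registry.dispatch("/hash", "A")` invokes the annotated method with the payload. In a Spring application the beans would come from the application context rather than being registered by hand.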

Configuring Transport and Host/Port

As in many Boot applications, if no transport is defined, a default TCPTransport is used, bound to localhost on port 5000.

To override the default properties, pass them in a file:


To use a different transport, provide a ServerTransport as a bean in your application. Refer to rsocket-java to see the available implementations.

Configuring the Client

To use the client, just pass an interface of the service annotated with the same annotations.

public interface MyService {
	@RequestStreamMapping(value = "/hash", mimeType = "application/json")
	public Flux<Integer> hash(Flux<String> flux);
}

ReactiveSocketClient client = new ReactiveSocketClient("localhost", 5000);
Flux<String> flux = Flux.just("A","B");
MyService service = client.create(MyService.class);
Flux<Integer> hashes = service.hash(flux); // invokes the remote /hash endpoint
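
Creating a client from nothing but an interface is commonly done with a JDK dynamic proxy, the same general approach Feign takes. The sketch below shows that technique in isolation; it is an assumption about how such a client could work, not the actual ReactiveSocketClient code. `EchoService` and `ProxyClientFactory` are hypothetical, and the `transport` function stands in for encoding the argument and sending it over an RSocket connection.

```java
import java.lang.reflect.Proxy;
import java.util.function.BiFunction;

/** Hypothetical service interface used only in this sketch. */
interface EchoService {
    String echo(String message);
}

class ProxyClientFactory {
    /**
     * Builds an implementation of serviceType that forwards every call to
     * transport as (method name, single argument). A real client would route
     * by the path declared on the method's annotation instead of its name.
     */
    static <T> T create(Class<T> serviceType, BiFunction<String, Object, Object> transport) {
        Object proxy = Proxy.newProxyInstance(
                serviceType.getClassLoader(),
                new Class<?>[] { serviceType },
                (self, method, args) -> {
                    if (method.getDeclaringClass() == Object.class) {
                        // equals, hashCode and toString are answered locally.
                        switch (method.getName()) {
                            case "equals":
                                return self == args[0];
                            case "hashCode":
                                return System.identityHashCode(self);
                            default:
                                return serviceType.getSimpleName() + " proxy";
                        }
                    }
                    if (args == null || args.length != 1) {
                        throw new IllegalArgumentException("Exactly one argument is expected in this sketch");
                    }
                    return transport.apply(method.getName(), args[0]);
                });
        return serviceType.cast(proxy);
    }
}
```

With a fake transport such as `(route, payload) -> route + ":" + payload`, calling `echo("hi")` on the created proxy returns `echo:hi`, which shows that the call is intercepted and handed to the transport rather than to a local implementation.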

Short term goals

  • Provide a functional model for both server and client, not only the annotation style

  • Create a Starter

  • @EnableReactiveSocketClient to allow injection of a client into the application context, as well as scanning for service interfaces, as is done in Feign

  • Tests, Tests, Tests

  • Significantly improve the boilerplate code and revisit serialization options

  • Explore resume operations and backpressure