-
Notifications
You must be signed in to change notification settings - Fork 357
Description
Currently, the DuplexConnection looks like following:
Mono<Void> send(Publisher<Frame> frames);
Flux<Frame> receive();As discussed in #481, this interface leaks stream ids into the main implementation of reactor-core. This is required because the coordination of each logical stream (tying an outbound Publisher<Frame> to in an inbound Publisher<Frame>) is required to be above the DuplexConnection since the sending and receiving are split into two different method invocations. There is a desire to push the existence of a stream id down into transport layer abstractly because the specification treats them as optional, but more concretely because a forthcoming HTTP/2 transport would not need stream ids and this would lead to some serious work to start treating them as optional in the main reactor-core implementation.
To that end, I propose that the DuplexConnection interface be changed to:
/**
* Initiate an exchange as the requester.
*
* @param outbound the outbound {@link Frame}s
* @return the inbound {@link Frame}s
* @throws NullPointerException if {@code outbound} is {@code null}
*/
Flux<Frame> request(Publisher<Frame> outbound);
/**
* Respond to an exchange as the responder.
*
* @param responder a {@link Function} that transforms a {@link Flux} of inbound frames to a
* {@link Publisher} of outbound frames
* @throws NullPointerException if {@code responder} is {@code null}
*/
void respond(Function<Flux<Frame>, Publisher<Frame>> responder);This interface is quite similar to the original, but with the critical change that each method invocation of request() and each invocation of the Function passed into respond() constitute a “logical” stream (no stream id necessary).
An added benefit is that the Publishers passed around in this design have the proper Reactive Streams lifecycles. For example, an client implementation of RSocket#requestResponse() could make the call request(Mono.just(frame)).single() and ensure that only a single request frame was sent and a single response frame was received. The CANCEL signal sent up the Flux from the single() would be used to signal the transport implementation to cleanup any stored state for that logical stream.
On the server side, an implementation delegating to RSocket#requestResponse() could do so with respond(incoming -> Mono.from(incoming).flatMap(rsocket::requestResponse)) (this elides dispatching based on the first incoming Frame type, so more complex, but you get the idea). Again, the proper Reactive Streams lifecycle means that CANCEL and onComplete() can be used to trigger proper cleanup in the transport at the appropriate times.
Finally this change doesn’t make decorators more difficult and in some ways makes them less complex by isolating stream-specific state. For example, the FragmentingDuplexConnection is reduced to:
@Override
public Flux<Frame> request(Publisher<Frame> requests) {
Objects.requireNonNull(requests, "requests must not be null");
Flux<Frame> fragmented = Flux.from(requests).concatMap(frameFragmenter);
return getReassembled(delegate.request(fragmented));
}
@Override
public void respond(Function<Flux<Frame>, Publisher<Frame>> responder) {
Objects.requireNonNull(responder, "responder must not be null");
delegate.respond(
inbound -> {
Flux<Frame> reassembled = getReassembled(inbound);
return Flux.from(responder.apply(reassembled)).concatMap(frameFragmenter);
});
}
private Flux<Frame> getReassembled(Flux<Frame> flux) {
FrameReassembler frameReassembler = createFrameReassembler(byteBufAllocator);
return flux.handle(frameReassembler).doOnTerminate(frameReassembler::dispose);
}Note that the reassembler state is now isolated in the logical stream and the proper Reactive Streams lifecycle provides an opportunity to cleanup when the stream either ends organically or experiences an error. An example of a decorator that doesn’t need to maintain individual stream state can be found in the MicrometerDuplexConnection:
@Override
public Flux<Frame> request(Publisher<Frame> requests) {
Objects.requireNonNull(requests, "requests must not be null");
Flux<Frame> metricsOutbound = Flux.from(requests).doOnNext(outboundFrameCounters);
return delegate.request(metricsOutbound).doOnNext(inboundFrameCounters);
}
@Override
public void respond(Function<Flux<Frame>, Publisher<Frame>> responder) {
delegate.respond(
inbound ->
Flux.from(responder.apply(inbound.doOnNext(inboundFrameCounters)))
.doOnNext(outboundFrameCounters));
}In this example, each element is counted using doOnNext() invocations that decorate each inbound and outbound flow. They use a single set of counters for all streams, and all frames that cross those streams so no stream-scoped state.
With that, let’s being discussion.