Daniel Fernández opened SPR-14952 and commented
Scenario
Currently, the writeAndFlushWith(...) method in the org.springframework.http.ReactiveHttpOutputMessage interface looks like this:
/**
* Use the given {@link Publisher} of {@code Publishers} to write the body of the
* message to the underlying HTTP layer, flushing after each
* {@code Publisher<DataBuffer>}.
* @param body the body content publisher
* @return a {@link Mono} that indicates completion or error
*/
Mono<Void> writeAndFlushWith(Publisher<Publisher<DataBuffer>> body);
The signature makes sense in itself, but because Java generics are invariant it is not very flexible: the argument must be exactly of type Publisher<Publisher<DataBuffer>>.
This can be inconvenient for code calling this method, e.g. reactive views (implementations of org.springframework.web.reactive.result.view.View), which will normally work with Flux<DataBuffer> objects and may have code like:
final Flux<Flux<DataBuffer>> responseFlux = ...;
// Error: Publisher<Publisher<DataBuffer>> is not assignable from Flux<Flux<DataBuffer>>
exchange.getResponse().writeAndFlushWith(responseFlux);
The above code would not compile: Publisher<Publisher<DataBuffer>> is not assignable from Flux<Flux<DataBuffer>>.
Ending up with a Flux<Flux<DataBuffer>> could be relatively common. For example, that is what the Flux#window(...) methods return. Imagine we configure our data stream as a Flux<DataBuffer> and want to make sure that output is flushed every 3 data items:
final Flux<DataBuffer> dataStream = ...;
// Flux<T>#window(...) returns Flux<Flux<T>>
exchange.getResponse().writeAndFlushWith(dataStream.window(3));
This, again, would not compile without some (ugly) fiddling with the generic types.
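One form such fiddling could take is sketched below with JDK types only, so that it runs without Reactor on the classpath. This is an analogy rather than Spring code: Iterable stands in for Publisher, List for Flux and String for DataBuffer, and the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Analogy only: Iterable ~ Publisher, List ~ Flux, String ~ DataBuffer.
class InvarianceDemo {

    // Mirrors the shape of the current signature: the nested type must match exactly.
    static int countExact(Iterable<Iterable<String>> groups) {
        int count = 0;
        for (Iterable<String> group : groups) {
            for (String ignored : group) {
                count++;
            }
        }
        return count;
    }

    // Workaround: copy the outer container so its element type is exactly Iterable<String>.
    static List<Iterable<String>> widen(List<List<String>> nested) {
        return new ArrayList<>(nested);
    }

    public static void main(String[] args) {
        List<List<String>> nested = List.of(List.of("a", "b"), List.of("c"));

        // countExact(nested); // rejected by the compiler: generics are invariant

        System.out.println(countExact(widen(nested))); // prints 3
    }
}
```

In the real API the same idea means re-typing each inner Flux as a Publisher<DataBuffer> before the call, which adds a step that exists only to satisfy the compiler.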
Suggested solution
Modify the signature of ReactiveHttpOutputMessage#writeAndFlushWith to:
Mono<Void> writeAndFlushWith(Publisher<? extends Publisher<DataBuffer>> body);
Given that Publisher<? extends Publisher<DataBuffer>> is assignable from Flux<Flux<DataBuffer>>, this signature would be more flexible and directly usable.
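The effect of the wildcard can be checked with a small JDK-only sketch. As an assumption made purely for illustration, Iterable stands in for Publisher, List for Flux and String for DataBuffer; the class and method names are hypothetical and not part of Spring or Reactor.

```java
import java.util.List;

// Analogy only: Iterable ~ Publisher, List ~ Flux, String ~ DataBuffer.
class WildcardDemo {

    // Mirrors the shape of the proposed signature: any subtype of Iterable<String>
    // is accepted as the inner type.
    static int countFlexible(Iterable<? extends Iterable<String>> groups) {
        int count = 0;
        for (Iterable<String> group : groups) {
            for (String ignored : group) {
                count++;
            }
        }
        return count;
    }

    public static void main(String[] args) {
        List<List<String>> nested = List.of(List.of("a", "b"), List.of("c"));

        // Compiles directly: no cast or copy of the outer container is needed.
        System.out.println(countFlexible(nested)); // prints 3
    }
}
```

The method body is unchanged by the wildcard because it only reads inner values, which is the typical case where an upper-bounded wildcard costs the implementation nothing.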
Also, note that this way the signature of this method would better match the one used for an equivalent purpose in reactor.ipc.netty.NettyOutbound:
Mono<Void> sendGroups(Publisher<? extends Publisher<? extends ByteBuf>> dataStreams);
Affects: 5.0 M3