Skip to content

Commit

Permalink
Defensively register Flow bridge in delegate class
Browse files Browse the repository at this point in the history
  • Loading branch information
jhoeller committed Aug 8, 2023
1 parent 3b09375 commit c4896ac
Showing 1 changed file with 13 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,6 @@ public class ReactiveAdapterRegistry {
* Create a registry and auto-register default adapters.
* @see #getSharedInstance()
*/
@SuppressWarnings("unchecked")
public ReactiveAdapterRegistry() {
// Defensive guard for the Reactive Streams API itself
if (!reactiveStreamsPresent) {
Expand Down Expand Up @@ -115,10 +114,7 @@ public ReactiveAdapterRegistry() {

// Simple Flow.Publisher bridge if Reactor is not present
if (!reactorPresent) {
this.adapters.add(new ReactiveAdapter(
ReactiveTypeDescriptor.multiValue(Flow.Publisher.class, () -> PublisherToRS.EMPTY_FLOW),
source -> new PublisherToRS<>((Flow.Publisher<Object>) source),
source -> new PublisherToFlow<>((Publisher<Object>) source)));
new FlowBridgeRegistrar().registerAdapter(this);
}
}

Expand Down Expand Up @@ -375,6 +371,18 @@ void registerAdapters(ReactiveAdapterRegistry registry) {
}


private static class FlowBridgeRegistrar {

@SuppressWarnings("unchecked")
void registerAdapter(ReactiveAdapterRegistry registry) {
registry.registerReactiveType(
ReactiveTypeDescriptor.multiValue(Flow.Publisher.class, () -> PublisherToRS.EMPTY_FLOW),
source -> new PublisherToRS<>((Flow.Publisher<Object>) source),
source -> new PublisherToFlow<>((Publisher<Object>) source));
}
}


private static class PublisherToFlow<T> implements Flow.Publisher<T> {

private static final Flow.Subscription EMPTY_SUBSCRIPTION = new Flow.Subscription() {
Expand Down

0 comments on commit c4896ac

Please sign in to comment.