Skip to content

Commit

Permalink
Compatibility with Flow-based SmallRye Mutiny 2 at runtime
Browse files Browse the repository at this point in the history
Includes simple Flow.Publisher bridge without Reactor.

Closes gh-31000
  • Loading branch information
jhoeller committed Aug 6, 2023
1 parent 3253d2d commit 40b33bc
Show file tree
Hide file tree
Showing 2 changed files with 210 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package org.springframework.core;

import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
Expand All @@ -24,9 +25,9 @@
import java.util.concurrent.Flow;
import java.util.function.Function;

import kotlinx.coroutines.CompletableDeferredKt;
import kotlinx.coroutines.Deferred;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.adapter.JdkFlowAdapter;
import reactor.blockhound.BlockHound;
import reactor.blockhound.integration.BlockHoundIntegration;
Expand All @@ -36,15 +37,18 @@
import org.springframework.lang.Nullable;
import org.springframework.util.ClassUtils;
import org.springframework.util.ConcurrentReferenceHashMap;
import org.springframework.util.ReflectionUtils;

/**
* A registry of adapters to adapt Reactive Streams {@link Publisher} to/from
* various async/reactive types such as {@code CompletableFuture}, RxJava
* {@code Flowable}, and others.
* A registry of adapters to adapt Reactive Streams {@link Publisher} to/from various
* async/reactive types such as {@code CompletableFuture}, RxJava {@code Flowable}, etc.
* This is designed to complement Spring's Reactor {@code Mono}/{@code Flux} support while
* also being usable without Reactor, e.g. just for {@code org.reactivestreams} bridging.
*
* <p>By default, depending on classpath availability, adapters are registered
* for Reactor, RxJava 3, {@link CompletableFuture}, {@code Flow.Publisher},
* and Kotlin Coroutines' {@code Deferred} and {@code Flow}.
* <p>By default, depending on classpath availability, adapters are registered for Reactor
* (including {@code CompletableFuture} and {@code Flow.Publisher} adapters), RxJava 3,
* Kotlin Coroutines' {@code Deferred} (bridged via Reactor) and SmallRye Mutiny 1.x/2.x.
* If Reactor is not present, a simple {@code Flow.Publisher} bridge will be registered.
*
* @author Rossen Stoyanchev
* @author Sebastien Deleuze
Expand Down Expand Up @@ -79,6 +83,7 @@ public class ReactiveAdapterRegistry {
* Create a registry and auto-register default adapters.
* @see #getSharedInstance()
*/
@SuppressWarnings("unchecked")
public ReactiveAdapterRegistry() {
// Reactor
if (reactorPresent) {
Expand All @@ -99,6 +104,14 @@ public ReactiveAdapterRegistry() {
if (mutinyPresent) {
new MutinyRegistrar().registerAdapters(this);
}

// Simple Flow.Publisher bridge if Reactor is not present
if (!reactorPresent) {
registerReactiveType(
ReactiveTypeDescriptor.multiValue(Flow.Publisher.class, () -> PublisherToRS.EMPTY_FLOW),
source -> new PublisherToRS<>((Flow.Publisher<Object>) source),
source -> new PublisherToFlow<>((Publisher<Object>) source));

This comment has been minimized.

Copy link
@making

making Aug 8, 2023

Member

Updating to the latest snapshot started giving the following error:

2023-08-08T15:59:45.697+09:00  INFO 30904 --- [blog-api] [           main] [,] .s.b.a.l.ConditionEvaluationReportLogger : 

Error starting ApplicationContext. To display the condition evaluation report re-run your application with 'debug' enabled.
2023-08-08T15:59:45.706+09:00 ERROR 30904 --- [blog-api] [           main] [,] o.s.boot.SpringApplication               : Application run failed

org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'requestMappingHandlerAdapter' defined in class path resource [org/springframework/boot/autoconfigure/web/servlet/WebMvcAutoConfiguration$EnableWebMvcConfiguration.class]: Failed to instantiate [org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter]: Factory method 'requestMappingHandlerAdapter' threw exception with message: org/reactivestreams/Publisher
        at org.springframework.beans.factory.support.ConstructorResolver.instantiate(ConstructorResolver.java:654)
        at org.springframework.beans.factory.support.ConstructorResolver.instantiateUsingFactoryMethod(ConstructorResolver.java:642)
        at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.instantiateUsingFactoryMethod(AbstractAutowireCapableBeanFactory.java:1336)
        at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBeanInstance(AbstractAutowireCapableBeanFactory.java:1166)
        at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:563)
        at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:523)
        at org.springframework.beans.factory.support.AbstractBeanFactory.lambda$doGetBean$0(AbstractBeanFactory.java:325)
        at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:234)
        at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:323)
        at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:199)
        at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:973)
        at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:939)
        at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:608)
        at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:146)
        at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:734)
        at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:436)
        at org.springframework.boot.SpringApplication.run(SpringApplication.java:312)
        at org.springframework.boot.SpringApplication.run(SpringApplication.java:1306)
        at org.springframework.boot.SpringApplication.run(SpringApplication.java:1295)
        at am.ik.blog.BlogApiApplication.main(BlogApiApplication.java:13)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:568)
        at org.springframework.boot.loader.MainMethodRunner.run(MainMethodRunner.java:49)
        at org.springframework.boot.loader.Launcher.launch(Launcher.java:95)
        at org.springframework.boot.loader.Launcher.launch(Launcher.java:58)
        at org.springframework.boot.loader.JarLauncher.main(JarLauncher.java:65)
Caused by: org.springframework.beans.BeanInstantiationException: Failed to instantiate [org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter]: Factory method 'requestMappingHandlerAdapter' threw exception with message: org/reactivestreams/Publisher
        at org.springframework.beans.factory.support.SimpleInstantiationStrategy.instantiate(SimpleInstantiationStrategy.java:171)
        at org.springframework.beans.factory.support.ConstructorResolver.instantiate(ConstructorResolver.java:650)
        ... 27 common frames omitted
Caused by: java.lang.NoClassDefFoundError: org/reactivestreams/Publisher
        at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.<init>(RequestMappingHandlerAdapter.java:181)
        at org.springframework.web.servlet.config.annotation.WebMvcConfigurationSupport.createRequestMappingHandlerAdapter(WebMvcConfigurationSupport.java:681)
        at org.springframework.boot.autoconfigure.web.servlet.WebMvcAutoConfiguration$EnableWebMvcConfiguration.createRequestMappingHandlerAdapter(WebMvcAutoConfiguration.java:413)
        at org.springframework.web.servlet.config.annotation.WebMvcConfigurationSupport.requestMappingHandlerAdapter(WebMvcConfigurationSupport.java:650)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:568)
        at org.springframework.beans.factory.support.SimpleInstantiationStrategy.instantiate(SimpleInstantiationStrategy.java:139)
        ... 28 common frames omitted
Caused by: java.lang.ClassNotFoundException: org.reactivestreams.Publisher
        at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:445)
        at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:587)
        at org.springframework.boot.loader.LaunchedURLClassLoader.loadClass(LaunchedURLClassLoader.java:149)
        at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:520)
        ... 37 common frames omitted

I suspect this line is relevant.

This comment has been minimized.

Copy link
@bclozel

bclozel Aug 8, 2023

Member

This should be addressed in the latest snapshots already with dedb58f

This comment has been minimized.

Copy link
@making

making Aug 8, 2023

Member

with c4896ac commit, the above error disappeared. Thanks!

}
}


Expand Down Expand Up @@ -304,9 +317,9 @@ private static class CoroutinesRegistrar {
@SuppressWarnings("KotlinInternalInJava")
void registerAdapters(ReactiveAdapterRegistry registry) {
registry.registerReactiveType(
ReactiveTypeDescriptor.singleOptionalValue(Deferred.class,
() -> CompletableDeferredKt.CompletableDeferred(null)),
source -> CoroutinesUtils.deferredToMono((Deferred<?>) source),
ReactiveTypeDescriptor.singleOptionalValue(kotlinx.coroutines.Deferred.class,
() -> kotlinx.coroutines.CompletableDeferredKt.CompletableDeferred(null)),
source -> CoroutinesUtils.deferredToMono((kotlinx.coroutines.Deferred<?>) source),
source -> CoroutinesUtils.monoToDeferred(Mono.from(source)));

registry.registerReactiveType(
Expand All @@ -319,20 +332,182 @@ void registerAdapters(ReactiveAdapterRegistry registry) {

private static class MutinyRegistrar {

private static final Method uniToPublisher = ClassUtils.getMethod(io.smallrye.mutiny.groups.UniConvert.class, "toPublisher");

@SuppressWarnings("unchecked")
void registerAdapters(ReactiveAdapterRegistry registry) {
registry.registerReactiveType(
ReactiveTypeDescriptor.singleOptionalValue(
io.smallrye.mutiny.Uni.class,
() -> io.smallrye.mutiny.Uni.createFrom().nothing()),
uni -> ((io.smallrye.mutiny.Uni<?>) uni).convert().toPublisher(),
publisher -> io.smallrye.mutiny.Uni.createFrom().publisher(publisher));
ReactiveTypeDescriptor uniDesc = ReactiveTypeDescriptor.singleOptionalValue(
io.smallrye.mutiny.Uni.class,
() -> io.smallrye.mutiny.Uni.createFrom().nothing());
ReactiveTypeDescriptor multiDesc = ReactiveTypeDescriptor.multiValue(
io.smallrye.mutiny.Multi.class,
() -> io.smallrye.mutiny.Multi.createFrom().empty());

if (Flow.Publisher.class.isAssignableFrom(uniToPublisher.getReturnType())) {
// Mutiny 2 based on Flow.Publisher
Method uniPublisher = ClassUtils.getMethod(io.smallrye.mutiny.groups.UniCreate.class, "publisher", Flow.Publisher.class);
Method multiPublisher = ClassUtils.getMethod(io.smallrye.mutiny.groups.MultiCreate.class, "publisher", Flow.Publisher.class);
registry.registerReactiveType(uniDesc,
uni -> new PublisherToRS<>((Flow.Publisher<Object>) ReflectionUtils.invokeMethod(uniToPublisher, ((io.smallrye.mutiny.Uni<?>) uni).convert())),
publisher -> ReflectionUtils.invokeMethod(uniPublisher, io.smallrye.mutiny.Uni.createFrom(), new PublisherToFlow<>(publisher)));
registry.registerReactiveType(multiDesc,
multi -> new PublisherToRS<>((Flow.Publisher<Object>) multi),
publisher -> ReflectionUtils.invokeMethod(multiPublisher, io.smallrye.mutiny.Multi.createFrom(), new PublisherToFlow<>(publisher)));
}
else {
// Mutiny 1 based on Reactive Streams
registry.registerReactiveType(uniDesc,
uni -> ((io.smallrye.mutiny.Uni<?>) uni).convert().toPublisher(),
publisher -> io.smallrye.mutiny.Uni.createFrom().publisher(publisher));
registry.registerReactiveType(multiDesc,
multi -> (io.smallrye.mutiny.Multi<?>) multi,
publisher -> io.smallrye.mutiny.Multi.createFrom().publisher(publisher));
}
}
}

registry.registerReactiveType(
ReactiveTypeDescriptor.multiValue(
io.smallrye.mutiny.Multi.class,
() -> io.smallrye.mutiny.Multi.createFrom().empty()),
multi -> (io.smallrye.mutiny.Multi<?>) multi,
publisher -> io.smallrye.mutiny.Multi.createFrom().publisher(publisher));

private static class PublisherToFlow<T> implements Flow.Publisher<T> {

private static final Flow.Subscription EMPTY_SUBSCRIPTION = new Flow.Subscription() {
@Override
public void request(long n) {
}
@Override
public void cancel() {
}
};

@Nullable
private final Publisher<T> publisher;

public PublisherToFlow(@Nullable Publisher<T> publisher) {
this.publisher = publisher;
}

@Override
public void subscribe(Flow.Subscriber<? super T> subscriber) {
if (this.publisher != null) {
this.publisher.subscribe(new SubscriberToFlow<>(subscriber));
}
else {
subscriber.onSubscribe(EMPTY_SUBSCRIPTION);
subscriber.onComplete();
}
}
}


private static class PublisherToRS<T> implements Publisher<T> {

private static final Flow.Publisher<Object> EMPTY_FLOW = new PublisherToFlow<>(null);

private final Flow.Publisher<T> publisher;

@SuppressWarnings("unchecked")
public PublisherToRS(@Nullable Flow.Publisher<T> publisher) {
this.publisher = (publisher != null ? publisher : (Flow.Publisher<T>) EMPTY_FLOW);
}

@Override
public void subscribe(Subscriber<? super T> subscriber) {
this.publisher.subscribe(new SubscriberToRS<>(subscriber));
}
}


private static class SubscriberToFlow<T> implements Subscriber<T>, Flow.Subscription {

private final Flow.Subscriber<? super T> subscriber;

@Nullable
private Subscription subscription;

public SubscriberToFlow(Flow.Subscriber<? super T> subscriber) {
this.subscriber = subscriber;
}

@Override
public void onSubscribe(Subscription subscription) {
this.subscription = subscription;
this.subscriber.onSubscribe(this);
}

@Override
public void onNext(T o) {
this.subscriber.onNext(o);
}

@Override
public void onError(Throwable t) {
this.subscriber.onError(t);
}

@Override
public void onComplete() {
this.subscriber.onComplete();
}

@Override
public void request(long n) {
if (this.subscription != null) {
this.subscription.request(n);
}
}

@Override
public void cancel() {
if (this.subscription != null) {
this.subscription.cancel();
}
}
}


private static class SubscriberToRS<T> implements Flow.Subscriber<T>, Subscription {

private final Subscriber<? super T> subscriber;

@Nullable
private Flow.Subscription subscription;

public SubscriberToRS(Subscriber<? super T> subscriber) {
this.subscriber = subscriber;
}

@Override
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
this.subscriber.onSubscribe(this);
}

@Override
public void onNext(T o) {
this.subscriber.onNext(o);
}

@Override
public void onError(Throwable throwable) {
this.subscriber.onError(throwable);
}

@Override
public void onComplete() {
this.subscriber.onComplete();
}

@Override
public void request(long n) {
if (this.subscription != null) {
this.subscription.request(n);
}
}

@Override
public void cancel() {
if (this.subscription != null) {
this.subscription.cancel();
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,15 @@
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;

import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import kotlinx.coroutines.Deferred;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import org.reactivestreams.Publisher;
import reactor.adapter.JdkFlowAdapter;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
Expand Down Expand Up @@ -112,6 +114,16 @@ void toMono() {
assertThat(((Mono<Integer>) target).block(ONE_SECOND)).isEqualTo(Integer.valueOf(1));
}

@Test
void toFlowPublisher() {
List<Integer> sequence = Arrays.asList(1, 2, 3);
Publisher<Integer> source = io.reactivex.rxjava3.core.Flowable.fromIterable(sequence);
Object target = getAdapter(Flow.Publisher.class).fromPublisher(source);
assertThat(target).isInstanceOf(Flow.Publisher.class);
assertThat(JdkFlowAdapter.flowPublisherToFlux((Flow.Publisher<Integer>) target)
.collectList().block(ONE_SECOND)).isEqualTo(sequence);
}

@Test
void toCompletableFuture() throws Exception {
Publisher<Integer> source = Flux.fromArray(new Integer[] {1, 2, 3});
Expand Down

0 comments on commit 40b33bc

Please sign in to comment.