Skip to content

Commit

Permalink
Reuse Reactive Streams 1.0.3 FlowAdapters for Flow.Publisher bridge
Browse files Browse the repository at this point in the history
  • Loading branch information
jhoeller committed Aug 14, 2023
1 parent 2c89597 commit fd5b0e1
Showing 1 changed file with 24 additions and 150 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,15 @@
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Flow;
import java.util.function.Function;

import org.reactivestreams.FlowAdapters;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.adapter.JdkFlowAdapter;
import reactor.blockhound.BlockHound;
import reactor.blockhound.integration.BlockHoundIntegration;
Expand Down Expand Up @@ -114,7 +114,7 @@ public ReactiveAdapterRegistry() {

// Simple Flow.Publisher bridge if Reactor is not present
if (!reactorPresent) {
new FlowBridgeRegistrar().registerAdapter(this);
new FlowAdaptersRegistrar().registerAdapters(this);
}
}

Expand Down Expand Up @@ -349,14 +349,19 @@ void registerAdapters(ReactiveAdapterRegistry registry) {

if (Flow.Publisher.class.isAssignableFrom(uniToPublisher.getReturnType())) {
// Mutiny 2 based on Flow.Publisher
Method uniPublisher = ClassUtils.getMethod(io.smallrye.mutiny.groups.UniCreate.class, "publisher", Flow.Publisher.class);
Method multiPublisher = ClassUtils.getMethod(io.smallrye.mutiny.groups.MultiCreate.class, "publisher", Flow.Publisher.class);
Method uniPublisher = ClassUtils.getMethod(
io.smallrye.mutiny.groups.UniCreate.class, "publisher", Flow.Publisher.class);
Method multiPublisher = ClassUtils.getMethod(
io.smallrye.mutiny.groups.MultiCreate.class, "publisher", Flow.Publisher.class);
registry.registerReactiveType(uniDesc,
uni -> new PublisherToRS<>((Flow.Publisher<Object>) ReflectionUtils.invokeMethod(uniToPublisher, ((io.smallrye.mutiny.Uni<?>) uni).convert())),
publisher -> ReflectionUtils.invokeMethod(uniPublisher, io.smallrye.mutiny.Uni.createFrom(), new PublisherToFlow<>(publisher)));
uni -> FlowAdapters.toPublisher((Flow.Publisher<Object>) Objects.requireNonNull(
ReflectionUtils.invokeMethod(uniToPublisher, ((io.smallrye.mutiny.Uni<?>) uni).convert()))),
publisher -> ReflectionUtils.invokeMethod(uniPublisher, io.smallrye.mutiny.Uni.createFrom(),
FlowAdapters.toFlowPublisher(publisher)));
registry.registerReactiveType(multiDesc,
multi -> new PublisherToRS<>((Flow.Publisher<Object>) multi),
publisher -> ReflectionUtils.invokeMethod(multiPublisher, io.smallrye.mutiny.Multi.createFrom(), new PublisherToFlow<>(publisher)));
multi -> FlowAdapters.toPublisher((Flow.Publisher<Object>) multi),
publisher -> ReflectionUtils.invokeMethod(multiPublisher, io.smallrye.mutiny.Multi.createFrom(),
FlowAdapters.toFlowPublisher(publisher)));
}
else {
// Mutiny 1 based on Reactive Streams
Expand All @@ -371,19 +376,7 @@ void registerAdapters(ReactiveAdapterRegistry registry) {
}


private static class FlowBridgeRegistrar {

@SuppressWarnings("unchecked")
void registerAdapter(ReactiveAdapterRegistry registry) {
registry.registerReactiveType(
ReactiveTypeDescriptor.multiValue(Flow.Publisher.class, () -> PublisherToRS.EMPTY_FLOW),
source -> new PublisherToRS<>((Flow.Publisher<Object>) source),
source -> new PublisherToFlow<>((Publisher<Object>) source));
}
}


private static class PublisherToFlow<T> implements Flow.Publisher<T> {
private static class FlowAdaptersRegistrar {

private static final Flow.Subscription EMPTY_SUBSCRIPTION = new Flow.Subscription() {
@Override
Expand All @@ -394,136 +387,17 @@ public void cancel() {
}
};

@Nullable
private final Publisher<T> publisher;

public PublisherToFlow(@Nullable Publisher<T> publisher) {
this.publisher = publisher;
}

@Override
public void subscribe(Flow.Subscriber<? super T> subscriber) {
if (this.publisher != null) {
this.publisher.subscribe(new SubscriberToFlow<>(subscriber));
}
else {
subscriber.onSubscribe(EMPTY_SUBSCRIPTION);
subscriber.onComplete();
}
}
}


private static class PublisherToRS<T> implements Publisher<T> {

private static final Flow.Publisher<Object> EMPTY_FLOW = new PublisherToFlow<>(null);

private final Flow.Publisher<T> publisher;
private static final Flow.Publisher<Object> EMPTY_PUBLISHER = subscriber -> {
subscriber.onSubscribe(EMPTY_SUBSCRIPTION);
subscriber.onComplete();
};

@SuppressWarnings("unchecked")
public PublisherToRS(@Nullable Flow.Publisher<T> publisher) {
this.publisher = (publisher != null ? publisher : (Flow.Publisher<T>) EMPTY_FLOW);
}

@Override
public void subscribe(Subscriber<? super T> subscriber) {
this.publisher.subscribe(new SubscriberToRS<>(subscriber));
}
}


private static class SubscriberToFlow<T> implements Subscriber<T>, Flow.Subscription {

private final Flow.Subscriber<? super T> subscriber;

@Nullable
private Subscription subscription;

public SubscriberToFlow(Flow.Subscriber<? super T> subscriber) {
this.subscriber = subscriber;
}

@Override
public void onSubscribe(Subscription subscription) {
this.subscription = subscription;
this.subscriber.onSubscribe(this);
}

@Override
public void onNext(T o) {
this.subscriber.onNext(o);
}

@Override
public void onError(Throwable t) {
this.subscriber.onError(t);
}

@Override
public void onComplete() {
this.subscriber.onComplete();
}

@Override
public void request(long n) {
if (this.subscription != null) {
this.subscription.request(n);
}
}

@Override
public void cancel() {
if (this.subscription != null) {
this.subscription.cancel();
}
}
}


private static class SubscriberToRS<T> implements Flow.Subscriber<T>, Subscription {

private final Subscriber<? super T> subscriber;

@Nullable
private Flow.Subscription subscription;

public SubscriberToRS(Subscriber<? super T> subscriber) {
this.subscriber = subscriber;
}

@Override
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
this.subscriber.onSubscribe(this);
}

@Override
public void onNext(T o) {
this.subscriber.onNext(o);
}

@Override
public void onError(Throwable throwable) {
this.subscriber.onError(throwable);
}

@Override
public void onComplete() {
this.subscriber.onComplete();
}

@Override
public void request(long n) {
if (this.subscription != null) {
this.subscription.request(n);
}
}

@Override
public void cancel() {
if (this.subscription != null) {
this.subscription.cancel();
}
void registerAdapters(ReactiveAdapterRegistry registry) {
registry.registerReactiveType(
ReactiveTypeDescriptor.multiValue(Flow.Publisher.class, () -> EMPTY_PUBLISHER),
source -> FlowAdapters.toPublisher((Flow.Publisher<Object>) source),
source -> FlowAdapters.toFlowPublisher((Publisher<Object>) source));
}
}

Expand Down

0 comments on commit fd5b0e1

Please sign in to comment.