Skip to content

Commit

Permalink
Add toReactivePublisher(autoStartOnSubscribe) (#3657)
Browse files Browse the repository at this point in the history
* Add `toReactivePublisher(autoStartOnSubscribe)`

The `IntegrationFlowBuilder.toReactivePublisher()` returns a `Publisher<Message<?>>`
which may be subscribed somewhere late in the application logic, e.g.
when WebSocket (or RSocket) subscription is initiated by the external client.
In between application context startup and that subscription moment, the `IntegrationFlow`
must not try to produce messages since there is nothing to consumer them from the
`Publisher<Message<?>>` side.
One of the way is to have a source endpoint not started automatically and control its
lifecycle from the point fo reactive subscription

* Introduce an `IntegrationFlowBuilder.toReactivePublisher(boolean autoStartOnSubscribe)`
to let the framework do a job for an `IntegrationFlow` lifecycle control.
This way end-user doesn't need to know autowire a starting endpoint and use `doOnSubscribe()`
and similar callbacks
* Change `ConsumerEndpointFactoryBean` log message about a `FixedSubscriberChannel`
to `INFO` since an `autoStartup = false` really does not have any effect and there is nothing
for end-user to worry about.
The `IntegrationFlow` knows nothing about each endpoint internals and cannot control which
mark as `autoStartup = false` and which not

* Fix languge in JavaDocs

Co-authored-by: Gary Russell <grussell@vmware.com>

Co-authored-by: Gary Russell <grussell@vmware.com>
  • Loading branch information
artembilan and garyrussell committed Nov 1, 2021
1 parent fc7d338 commit eb4d583
Show file tree
Hide file tree
Showing 6 changed files with 60 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,7 @@ private void eventDrivenConsumer(MessageChannel channel) {
+ "', since '" + channel + "' is a SubscribableChannel (not pollable).");
this.endpoint = new EventDrivenConsumer((SubscribableChannel) channel, this.handler);
if (Boolean.FALSE.equals(this.autoStartup) && channel instanceof FixedSubscriberChannel) {
LOGGER.warn("'autoStartup=\"false\"' has no effect when using a FixedSubscriberChannel");
LOGGER.info("'autoStartup=\"false\"' has no effect when using a FixedSubscriberChannel");
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2934,8 +2934,21 @@ public IntegrationFlow to(IntegrationFlow other) {
* @param <T> the expected {@code payload} type
* @return the Reactive Streams {@link Publisher}
*/
@SuppressWarnings(UNCHECKED)
protected <T> Publisher<Message<T>> toReactivePublisher() {
return toReactivePublisher(false);
}

/**
* Represent an Integration Flow as a Reactive Streams {@link Publisher} bean.
* @param autoStartOnSubscribe start message production and consumption in the flow,
* when a subscription to the publisher is initiated.
* If this set to true, the flow is marked to not start automatically by the application context.
* @param <T> the expected {@code payload} type
* @return the Reactive Streams {@link Publisher}
* @since 5.5.6
*/
@SuppressWarnings(UNCHECKED)
protected <T> Publisher<Message<T>> toReactivePublisher(boolean autoStartOnSubscribe) {
MessageChannel channelForPublisher = getCurrentMessageChannel();
Publisher<Message<T>> publisher;
Map<Object, String> components = getIntegrationComponents();
Expand All @@ -2959,7 +2972,7 @@ protected <T> Publisher<Message<T>> toReactivePublisher() {

get();

return new PublisherIntegrationFlow<>(components, publisher);
return new PublisherIntegrationFlow<>(components, publisher, autoStartOnSubscribe);
}

protected <S extends ConsumerEndpointSpec<? super S, ? extends MessageHandler>> B register(S endpointSpec,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2019 the original author or authors.
* Copyright 2016-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -40,4 +40,9 @@ public <T> Publisher<Message<T>> toReactivePublisher() { // NOSONAR - not useles
return super.toReactivePublisher();
}

@Override
public <T> Publisher<Message<T>> toReactivePublisher(boolean autoStartOnSubscribe) { // NOSONAR
return super.toReactivePublisher(autoStartOnSubscribe);
}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2019 the original author or authors.
* Copyright 2016-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -23,6 +23,8 @@

import org.springframework.messaging.Message;

import reactor.core.publisher.Flux;

/**
*
* @param <T> the message payload type.
Expand All @@ -35,9 +37,26 @@ class PublisherIntegrationFlow<T> extends StandardIntegrationFlow implements Pub

private final Publisher<Message<T>> delegate;

PublisherIntegrationFlow(Map<Object, String> integrationComponents, Publisher<Message<T>> publisher) {
PublisherIntegrationFlow(Map<Object, String> integrationComponents, Publisher<Message<T>> publisher,
boolean autoStartOnSubscribe) {

super(integrationComponents);
this.delegate = publisher;
Flux<Message<T>> flux =
Flux.from(publisher)
.doOnCancel(this::stop)
.doOnTerminate(this::stop);

if (autoStartOnSubscribe) {
flux = flux.doOnSubscribe((sub) -> start());
for (Object component : integrationComponents.keySet()) {
if (component instanceof EndpointSpec) {
((EndpointSpec<?, ?, ?>) component).autoStartup(false);
}
}
}

this.delegate = flux;

}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;

import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

Expand Down Expand Up @@ -104,19 +105,22 @@ public class ReactiveStreamsTests {

@Test
void testReactiveFlow() throws Exception {
assertThat(this.messageSource.isRunning()).isFalse();
List<String> results = new ArrayList<>();
CountDownLatch latch = new CountDownLatch(6);
Flux.from(this.publisher)
Disposable disposable =
Flux.from(this.publisher)
.map(m -> m.getPayload().toUpperCase())
.subscribe(p -> {
results.add(p);
latch.countDown();
});
this.messageSource.start();
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
String[] strings = results.toArray(new String[0]);
assertThat(strings).isEqualTo(new String[]{ "A", "B", "C", "D", "E", "F" });
this.messageSource.stop();

disposable.dispose();
assertThat(this.messageSource.isRunning()).isFalse();
}

@Test
Expand Down Expand Up @@ -249,11 +253,10 @@ public Publisher<Message<String>> reactiveFlow() {
return IntegrationFlows
.from(() -> new GenericMessage<>("a,b,c,d,e,f"),
e -> e.poller(p -> p.trigger(ctx -> this.invoked.getAndSet(true) ? null : new Date()))
.autoStartup(false)
.id("reactiveStreamsMessageSource"))
.split(String.class, p -> p.split(","))
.log()
.toReactivePublisher();
.toReactivePublisher(true);
}

@Bean
Expand Down
9 changes: 8 additions & 1 deletion src/reference/asciidoc/reactive-streams.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,14 @@ See <<./splitter.adoc#split-stream-and-flux,Stream and Flux Splitting>> and <<./
An `IntegrationFlow` in Java DSL can start from any `Publisher` instance (see `IntegrationFlows.from(Publisher<Message<T>>)`).
Also, with an `IntegrationFlowBuilder.toReactivePublisher()` operator, the `IntegrationFlow` can be turned into a reactive hot source.
A `FluxMessageChannel` is used internally in both cases; it can subscribe to an inbound `Publisher` according to its `ReactiveStreamsSubscribableChannel` contract and it is a `Publisher<Message<?>>` by itself for downstream subscribers.
With a dynamic `IntegrationFlow` registration we can implement a powerful logic combining Reactive Streams with this integration flow bringing to/from `Publisher`.
With a dynamic `IntegrationFlow` registration we can implement a powerful logic combining Reactive Streams with this integration flow bridging to/from `Publisher`.

Starting with version 5.5.6, a `toReactivePublisher(boolean autoStartOnSubscribe)` operator variant is present to control a lifecycle of the whole `IntegrationFlow` behind the returned `Publisher<Message<?>>`.
Typically, the subscription and consumption from the reactive publisher happens in the later runtime phase, not during reactive stream composition, or even `ApplicationContext` startup.
To avoid boilerplate code for lifecycle management of the `IntegrationFlow` at the `Publisher<Message<?>>` subscription point and for better end-user experience, this new operator with the `autoStartOnSubscribe` flag has been introduced.
It marks (if `true`) the `IntegrationFlow` and its components for `autoStartup = false`, so an `ApplicationContext` won't initiate production and consumption of messages in the flow automatically.
Instead the `start()` for the `IntegrationFlow` is initiated from the internal `Flux.doOnSubscribe()`.
Independently of the `autoStartOnSubscribe` value, the flow is stopped from a `Flux.doOnCancel()` and `Flux.doOnTerminate()` - it does not make sense to produce messages if there is nothing to consume them.

For the exact opposite use-case, when `IntegrationFlow` should call a reactive stream and continue after completion, a `fluxTransform()` operator is provided in the `IntegrationFlowDefinition`.
The flow at this point is turned into a `FluxMessageChannel` which is propagated into a provided `fluxFunction`, performed in the `Flux.transform()` operator.
Expand Down

0 comments on commit eb4d583

Please sign in to comment.