Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,7 @@ private void eventDrivenConsumer(MessageChannel channel) {
+ "', since '" + channel + "' is a SubscribableChannel (not pollable).");
this.endpoint = new EventDrivenConsumer((SubscribableChannel) channel, this.handler);
if (Boolean.FALSE.equals(this.autoStartup) && channel instanceof FixedSubscriberChannel) {
LOGGER.warn("'autoStartup=\"false\"' has no effect when using a FixedSubscriberChannel");
LOGGER.info("'autoStartup=\"false\"' has no effect when using a FixedSubscriberChannel");
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2934,8 +2934,21 @@ public IntegrationFlow to(IntegrationFlow other) {
* @param <T> the expected {@code payload} type
* @return the Reactive Streams {@link Publisher}
*/
@SuppressWarnings(UNCHECKED)
protected <T> Publisher<Message<T>> toReactivePublisher() {
return toReactivePublisher(false);
}

/**
* Represent an Integration Flow as a Reactive Streams {@link Publisher} bean.
* @param autoStartOnSubscribe start message production and consumption in the flow,
* when a subscription to the publisher is initiated.
* If this set to true, the flow is marked to not start automatically by the application context.
* @param <T> the expected {@code payload} type
* @return the Reactive Streams {@link Publisher}
* @since 5.5.6
*/
@SuppressWarnings(UNCHECKED)
protected <T> Publisher<Message<T>> toReactivePublisher(boolean autoStartOnSubscribe) {
MessageChannel channelForPublisher = getCurrentMessageChannel();
Publisher<Message<T>> publisher;
Map<Object, String> components = getIntegrationComponents();
Expand All @@ -2959,7 +2972,7 @@ protected <T> Publisher<Message<T>> toReactivePublisher() {

get();

return new PublisherIntegrationFlow<>(components, publisher);
return new PublisherIntegrationFlow<>(components, publisher, autoStartOnSubscribe);
}

protected <S extends ConsumerEndpointSpec<? super S, ? extends MessageHandler>> B register(S endpointSpec,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2019 the original author or authors.
* Copyright 2016-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -40,4 +40,9 @@ public <T> Publisher<Message<T>> toReactivePublisher() { // NOSONAR - not useles
return super.toReactivePublisher();
}

@Override
public <T> Publisher<Message<T>> toReactivePublisher(boolean autoStartOnSubscribe) { // NOSONAR
return super.toReactivePublisher(autoStartOnSubscribe);
}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2019 the original author or authors.
* Copyright 2016-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -23,6 +23,8 @@

import org.springframework.messaging.Message;

import reactor.core.publisher.Flux;

/**
*
* @param <T> the message payload type.
Expand All @@ -35,9 +37,26 @@ class PublisherIntegrationFlow<T> extends StandardIntegrationFlow implements Pub

private final Publisher<Message<T>> delegate;

PublisherIntegrationFlow(Map<Object, String> integrationComponents, Publisher<Message<T>> publisher) {
PublisherIntegrationFlow(Map<Object, String> integrationComponents, Publisher<Message<T>> publisher,
boolean autoStartOnSubscribe) {

super(integrationComponents);
this.delegate = publisher;
Flux<Message<T>> flux =
Flux.from(publisher)
.doOnCancel(this::stop)
.doOnTerminate(this::stop);

if (autoStartOnSubscribe) {
flux = flux.doOnSubscribe((sub) -> start());
for (Object component : integrationComponents.keySet()) {
if (component instanceof EndpointSpec) {
((EndpointSpec<?, ?, ?>) component).autoStartup(false);
}
}
}

this.delegate = flux;

}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;

import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

Expand Down Expand Up @@ -104,19 +105,22 @@ public class ReactiveStreamsTests {

@Test
void testReactiveFlow() throws Exception {
assertThat(this.messageSource.isRunning()).isFalse();
List<String> results = new ArrayList<>();
CountDownLatch latch = new CountDownLatch(6);
Flux.from(this.publisher)
Disposable disposable =
Flux.from(this.publisher)
.map(m -> m.getPayload().toUpperCase())
.subscribe(p -> {
results.add(p);
latch.countDown();
});
this.messageSource.start();
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
String[] strings = results.toArray(new String[0]);
assertThat(strings).isEqualTo(new String[]{ "A", "B", "C", "D", "E", "F" });
this.messageSource.stop();

disposable.dispose();
assertThat(this.messageSource.isRunning()).isFalse();
}

@Test
Expand Down Expand Up @@ -249,11 +253,10 @@ public Publisher<Message<String>> reactiveFlow() {
return IntegrationFlows
.from(() -> new GenericMessage<>("a,b,c,d,e,f"),
e -> e.poller(p -> p.trigger(ctx -> this.invoked.getAndSet(true) ? null : new Date()))
.autoStartup(false)
.id("reactiveStreamsMessageSource"))
.split(String.class, p -> p.split(","))
.log()
.toReactivePublisher();
.toReactivePublisher(true);
}

@Bean
Expand Down
9 changes: 8 additions & 1 deletion src/reference/asciidoc/reactive-streams.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,14 @@ See <<./splitter.adoc#split-stream-and-flux,Stream and Flux Splitting>> and <<./
An `IntegrationFlow` in Java DSL can start from any `Publisher` instance (see `IntegrationFlows.from(Publisher<Message<T>>)`).
Also, with an `IntegrationFlowBuilder.toReactivePublisher()` operator, the `IntegrationFlow` can be turned into a reactive hot source.
A `FluxMessageChannel` is used internally in both cases; it can subscribe to an inbound `Publisher` according to its `ReactiveStreamsSubscribableChannel` contract and it is a `Publisher<Message<?>>` by itself for downstream subscribers.
With a dynamic `IntegrationFlow` registration we can implement a powerful logic combining Reactive Streams with this integration flow bringing to/from `Publisher`.
With a dynamic `IntegrationFlow` registration we can implement a powerful logic combining Reactive Streams with this integration flow bridging to/from `Publisher`.

Starting with version 5.5.6, a `toReactivePublisher(boolean autoStartOnSubscribe)` operator variant is present to control a lifecycle of the whole `IntegrationFlow` behind the returned `Publisher<Message<?>>`.
Typically, the subscription and consumption from the reactive publisher happens in the later runtime phase, not during reactive stream composition, or even `ApplicationContext` startup.
To avoid boilerplate code for lifecycle management of the `IntegrationFlow` at the `Publisher<Message<?>>` subscription point and for better end-user experience, this new operator with the `autoStartOnSubscribe` flag has been introduced.
It marks (if `true`) the `IntegrationFlow` and its components for `autoStartup = false`, so an `ApplicationContext` won't initiate production and consumption of messages in the flow automatically.
Instead the `start()` for the `IntegrationFlow` is initiated from the internal `Flux.doOnSubscribe()`.
Independently of the `autoStartOnSubscribe` value, the flow is stopped from a `Flux.doOnCancel()` and `Flux.doOnTerminate()` - it does not make sense to produce messages if there is nothing to consume them.

For the exact opposite use-case, when `IntegrationFlow` should call a reactive stream and continue after completion, a `fluxTransform()` operator is provided in the `IntegrationFlowDefinition`.
The flow at this point is turned into a `FluxMessageChannel` which is propagated into a provided `fluxFunction`, performed in the `Flux.transform()` operator.
Expand Down