diff --git a/spring-integration-core/src/main/java/org/springframework/integration/config/ConsumerEndpointFactoryBean.java b/spring-integration-core/src/main/java/org/springframework/integration/config/ConsumerEndpointFactoryBean.java index b1922c1980c..b4600aa2c1e 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/config/ConsumerEndpointFactoryBean.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/config/ConsumerEndpointFactoryBean.java @@ -374,7 +374,7 @@ private void eventDrivenConsumer(MessageChannel channel) { + "', since '" + channel + "' is a SubscribableChannel (not pollable)."); this.endpoint = new EventDrivenConsumer((SubscribableChannel) channel, this.handler); if (Boolean.FALSE.equals(this.autoStartup) && channel instanceof FixedSubscriberChannel) { - LOGGER.warn("'autoStartup=\"false\"' has no effect when using a FixedSubscriberChannel"); + LOGGER.info("'autoStartup=\"false\"' has no effect when using a FixedSubscriberChannel"); } } diff --git a/spring-integration-core/src/main/java/org/springframework/integration/dsl/BaseIntegrationFlowDefinition.java b/spring-integration-core/src/main/java/org/springframework/integration/dsl/BaseIntegrationFlowDefinition.java index 01003c9d67a..7ac6af1e898 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/dsl/BaseIntegrationFlowDefinition.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/dsl/BaseIntegrationFlowDefinition.java @@ -2934,8 +2934,21 @@ public IntegrationFlow to(IntegrationFlow other) { * @param the expected {@code payload} type * @return the Reactive Streams {@link Publisher} */ - @SuppressWarnings(UNCHECKED) protected Publisher> toReactivePublisher() { + return toReactivePublisher(false); + } + + /** + * Represent an Integration Flow as a Reactive Streams {@link Publisher} bean. + * @param autoStartOnSubscribe start message production and consumption in the flow, + * when a subscription to the publisher is initiated. + * If this set to true, the flow is marked to not start automatically by the application context. + * @param the expected {@code payload} type + * @return the Reactive Streams {@link Publisher} + * @since 5.5.6 + */ + @SuppressWarnings(UNCHECKED) + protected Publisher> toReactivePublisher(boolean autoStartOnSubscribe) { MessageChannel channelForPublisher = getCurrentMessageChannel(); Publisher> publisher; Map components = getIntegrationComponents(); @@ -2959,7 +2972,7 @@ protected Publisher> toReactivePublisher() { get(); - return new PublisherIntegrationFlow<>(components, publisher); + return new PublisherIntegrationFlow<>(components, publisher, autoStartOnSubscribe); } protected > B register(S endpointSpec, diff --git a/spring-integration-core/src/main/java/org/springframework/integration/dsl/IntegrationFlowBuilder.java b/spring-integration-core/src/main/java/org/springframework/integration/dsl/IntegrationFlowBuilder.java index e7de9b9bcce..aea2bc2121c 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/dsl/IntegrationFlowBuilder.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/dsl/IntegrationFlowBuilder.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2019 the original author or authors. + * Copyright 2016-2021 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -40,4 +40,9 @@ public Publisher> toReactivePublisher() { // NOSONAR - not useles return super.toReactivePublisher(); } + @Override + public Publisher> toReactivePublisher(boolean autoStartOnSubscribe) { // NOSONAR + return super.toReactivePublisher(autoStartOnSubscribe); + } + } diff --git a/spring-integration-core/src/main/java/org/springframework/integration/dsl/PublisherIntegrationFlow.java b/spring-integration-core/src/main/java/org/springframework/integration/dsl/PublisherIntegrationFlow.java index f0de569df4b..d8dcb670a6f 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/dsl/PublisherIntegrationFlow.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/dsl/PublisherIntegrationFlow.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2019 the original author or authors. + * Copyright 2016-2021 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -23,6 +23,8 @@ import org.springframework.messaging.Message; +import reactor.core.publisher.Flux; + /** * * @param the message payload type. @@ -35,9 +37,26 @@ class PublisherIntegrationFlow extends StandardIntegrationFlow implements Pub private final Publisher> delegate; - PublisherIntegrationFlow(Map integrationComponents, Publisher> publisher) { + PublisherIntegrationFlow(Map integrationComponents, Publisher> publisher, + boolean autoStartOnSubscribe) { + super(integrationComponents); - this.delegate = publisher; + Flux> flux = + Flux.from(publisher) + .doOnCancel(this::stop) + .doOnTerminate(this::stop); + + if (autoStartOnSubscribe) { + flux = flux.doOnSubscribe((sub) -> start()); + for (Object component : integrationComponents.keySet()) { + if (component instanceof EndpointSpec) { + ((EndpointSpec) component).autoStartup(false); + } + } + } + + this.delegate = flux; + } @Override diff --git a/spring-integration-core/src/test/java/org/springframework/integration/dsl/reactivestreams/ReactiveStreamsTests.java b/spring-integration-core/src/test/java/org/springframework/integration/dsl/reactivestreams/ReactiveStreamsTests.java index 7fd62aa144e..72f2d80b4aa 100644 --- a/spring-integration-core/src/test/java/org/springframework/integration/dsl/reactivestreams/ReactiveStreamsTests.java +++ b/spring-integration-core/src/test/java/org/springframework/integration/dsl/reactivestreams/ReactiveStreamsTests.java @@ -54,6 +54,7 @@ import org.springframework.test.annotation.DirtiesContext; import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; +import reactor.core.Disposable; import reactor.core.publisher.Flux; import reactor.core.scheduler.Schedulers; @@ -104,19 +105,22 @@ public class ReactiveStreamsTests { @Test void testReactiveFlow() throws Exception { + assertThat(this.messageSource.isRunning()).isFalse(); List results = new ArrayList<>(); CountDownLatch latch = new CountDownLatch(6); - Flux.from(this.publisher) + Disposable disposable = + Flux.from(this.publisher) .map(m -> m.getPayload().toUpperCase()) .subscribe(p -> { results.add(p); latch.countDown(); }); - this.messageSource.start(); assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue(); String[] strings = results.toArray(new String[0]); assertThat(strings).isEqualTo(new String[]{ "A", "B", "C", "D", "E", "F" }); - this.messageSource.stop(); + + disposable.dispose(); + assertThat(this.messageSource.isRunning()).isFalse(); } @Test @@ -249,11 +253,10 @@ public Publisher> reactiveFlow() { return IntegrationFlows .from(() -> new GenericMessage<>("a,b,c,d,e,f"), e -> e.poller(p -> p.trigger(ctx -> this.invoked.getAndSet(true) ? null : new Date())) - .autoStartup(false) .id("reactiveStreamsMessageSource")) .split(String.class, p -> p.split(",")) .log() - .toReactivePublisher(); + .toReactivePublisher(true); } @Bean diff --git a/src/reference/asciidoc/reactive-streams.adoc b/src/reference/asciidoc/reactive-streams.adoc index c337586ac76..b09514b419e 100644 --- a/src/reference/asciidoc/reactive-streams.adoc +++ b/src/reference/asciidoc/reactive-streams.adoc @@ -125,7 +125,14 @@ See <<./splitter.adoc#split-stream-and-flux,Stream and Flux Splitting>> and <<./ An `IntegrationFlow` in Java DSL can start from any `Publisher` instance (see `IntegrationFlows.from(Publisher>)`). Also, with an `IntegrationFlowBuilder.toReactivePublisher()` operator, the `IntegrationFlow` can be turned into a reactive hot source. A `FluxMessageChannel` is used internally in both cases; it can subscribe to an inbound `Publisher` according to its `ReactiveStreamsSubscribableChannel` contract and it is a `Publisher>` by itself for downstream subscribers. -With a dynamic `IntegrationFlow` registration we can implement a powerful logic combining Reactive Streams with this integration flow bringing to/from `Publisher`. +With a dynamic `IntegrationFlow` registration we can implement a powerful logic combining Reactive Streams with this integration flow bridging to/from `Publisher`. + +Starting with version 5.5.6, a `toReactivePublisher(boolean autoStartOnSubscribe)` operator variant is present to control a lifecycle of the whole `IntegrationFlow` behind the returned `Publisher>`. +Typically, the subscription and consumption from the reactive publisher happens in the later runtime phase, not during reactive stream composition, or even `ApplicationContext` startup. +To avoid boilerplate code for lifecycle management of the `IntegrationFlow` at the `Publisher>` subscription point and for better end-user experience, this new operator with the `autoStartOnSubscribe` flag has been introduced. +It marks (if `true`) the `IntegrationFlow` and its components for `autoStartup = false`, so an `ApplicationContext` won't initiate production and consumption of messages in the flow automatically. +Instead the `start()` for the `IntegrationFlow` is initiated from the internal `Flux.doOnSubscribe()`. +Independently of the `autoStartOnSubscribe` value, the flow is stopped from a `Flux.doOnCancel()` and `Flux.doOnTerminate()` - it does not make sense to produce messages if there is nothing to consume them. For the exact opposite use-case, when `IntegrationFlow` should call a reactive stream and continue after completion, a `fluxTransform()` operator is provided in the `IntegrationFlowDefinition`. The flow at this point is turned into a `FluxMessageChannel` which is propagated into a provided `fluxFunction`, performed in the `Flux.transform()` operator.