From 7653550f5d753c229d956628936d57fe8e9ae183 Mon Sep 17 00:00:00 2001 From: Artem Bilan Date: Mon, 1 Nov 2021 12:13:02 -0400 Subject: [PATCH 1/2] Add `toReactivePublisher(autoStartOnSubscribe)` The `IntegrationFlowBuilder.toReactivePublisher()` returns a `Publisher>` which may be subscribed somewhere late in the application logic, e.g. when WebSocket (or RSocket) subscription is initiated by the external client. In between application context startup and that subscription moment, the `IntegrationFlow` must not try to produce messages since there is nothing to consumer them from the `Publisher>` side. One of the way is to have a source endpoint not started automatically and control its lifecycle from the point fo reactive subscription * Introduce an `IntegrationFlowBuilder.toReactivePublisher(boolean autoStartOnSubscribe)` to let the framework do a job for an `IntegrationFlow` lifecycle control. This way end-user doesn't need to know autowire a starting endpoint and use `doOnSubscribe()` and similar callbacks * Change `ConsumerEndpointFactoryBean` log message about a `FixedSubscriberChannel` to `INFO` since an `autoStartup = false` really does not have any effect and there is nothing for end-user to worry about. The `IntegrationFlow` knows nothing about each endpoint internals and cannot control which mark as `autoStartup = false` and which not --- .../config/ConsumerEndpointFactoryBean.java | 2 +- .../dsl/BaseIntegrationFlowDefinition.java | 17 +++++++++++-- .../dsl/IntegrationFlowBuilder.java | 7 +++++- .../dsl/PublisherIntegrationFlow.java | 25 ++++++++++++++++--- .../reactivestreams/ReactiveStreamsTests.java | 13 ++++++---- src/reference/asciidoc/reactive-streams.adoc | 9 ++++++- 6 files changed, 60 insertions(+), 13 deletions(-) diff --git a/spring-integration-core/src/main/java/org/springframework/integration/config/ConsumerEndpointFactoryBean.java b/spring-integration-core/src/main/java/org/springframework/integration/config/ConsumerEndpointFactoryBean.java index b1922c1980c..b4600aa2c1e 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/config/ConsumerEndpointFactoryBean.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/config/ConsumerEndpointFactoryBean.java @@ -374,7 +374,7 @@ private void eventDrivenConsumer(MessageChannel channel) { + "', since '" + channel + "' is a SubscribableChannel (not pollable)."); this.endpoint = new EventDrivenConsumer((SubscribableChannel) channel, this.handler); if (Boolean.FALSE.equals(this.autoStartup) && channel instanceof FixedSubscriberChannel) { - LOGGER.warn("'autoStartup=\"false\"' has no effect when using a FixedSubscriberChannel"); + LOGGER.info("'autoStartup=\"false\"' has no effect when using a FixedSubscriberChannel"); } } diff --git a/spring-integration-core/src/main/java/org/springframework/integration/dsl/BaseIntegrationFlowDefinition.java b/spring-integration-core/src/main/java/org/springframework/integration/dsl/BaseIntegrationFlowDefinition.java index 01003c9d67a..b385c9fafa2 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/dsl/BaseIntegrationFlowDefinition.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/dsl/BaseIntegrationFlowDefinition.java @@ -2934,8 +2934,21 @@ public IntegrationFlow to(IntegrationFlow other) { * @param the expected {@code payload} type * @return the Reactive Streams {@link Publisher} */ - @SuppressWarnings(UNCHECKED) protected Publisher> toReactivePublisher() { + return toReactivePublisher(false); + } + + /** + * Represent an Integration Flow as a Reactive Streams {@link Publisher} bean. + * @param autoStartOnSubscribe start a production and consumption in the flow, + * when subscription to the publisher is initiated. + * If this set to true, the flow is marked to not start automatically by the application context. + * @param the expected {@code payload} type + * @return the Reactive Streams {@link Publisher} + * @since 5.5.6 + */ + @SuppressWarnings(UNCHECKED) + protected Publisher> toReactivePublisher(boolean autoStartOnSubscribe) { MessageChannel channelForPublisher = getCurrentMessageChannel(); Publisher> publisher; Map components = getIntegrationComponents(); @@ -2959,7 +2972,7 @@ protected Publisher> toReactivePublisher() { get(); - return new PublisherIntegrationFlow<>(components, publisher); + return new PublisherIntegrationFlow<>(components, publisher, autoStartOnSubscribe); } protected > B register(S endpointSpec, diff --git a/spring-integration-core/src/main/java/org/springframework/integration/dsl/IntegrationFlowBuilder.java b/spring-integration-core/src/main/java/org/springframework/integration/dsl/IntegrationFlowBuilder.java index e7de9b9bcce..aea2bc2121c 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/dsl/IntegrationFlowBuilder.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/dsl/IntegrationFlowBuilder.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2019 the original author or authors. + * Copyright 2016-2021 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -40,4 +40,9 @@ public Publisher> toReactivePublisher() { // NOSONAR - not useles return super.toReactivePublisher(); } + @Override + public Publisher> toReactivePublisher(boolean autoStartOnSubscribe) { // NOSONAR + return super.toReactivePublisher(autoStartOnSubscribe); + } + } diff --git a/spring-integration-core/src/main/java/org/springframework/integration/dsl/PublisherIntegrationFlow.java b/spring-integration-core/src/main/java/org/springframework/integration/dsl/PublisherIntegrationFlow.java index f0de569df4b..d8dcb670a6f 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/dsl/PublisherIntegrationFlow.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/dsl/PublisherIntegrationFlow.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2019 the original author or authors. + * Copyright 2016-2021 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -23,6 +23,8 @@ import org.springframework.messaging.Message; +import reactor.core.publisher.Flux; + /** * * @param the message payload type. @@ -35,9 +37,26 @@ class PublisherIntegrationFlow extends StandardIntegrationFlow implements Pub private final Publisher> delegate; - PublisherIntegrationFlow(Map integrationComponents, Publisher> publisher) { + PublisherIntegrationFlow(Map integrationComponents, Publisher> publisher, + boolean autoStartOnSubscribe) { + super(integrationComponents); - this.delegate = publisher; + Flux> flux = + Flux.from(publisher) + .doOnCancel(this::stop) + .doOnTerminate(this::stop); + + if (autoStartOnSubscribe) { + flux = flux.doOnSubscribe((sub) -> start()); + for (Object component : integrationComponents.keySet()) { + if (component instanceof EndpointSpec) { + ((EndpointSpec) component).autoStartup(false); + } + } + } + + this.delegate = flux; + } @Override diff --git a/spring-integration-core/src/test/java/org/springframework/integration/dsl/reactivestreams/ReactiveStreamsTests.java b/spring-integration-core/src/test/java/org/springframework/integration/dsl/reactivestreams/ReactiveStreamsTests.java index 7fd62aa144e..72f2d80b4aa 100644 --- a/spring-integration-core/src/test/java/org/springframework/integration/dsl/reactivestreams/ReactiveStreamsTests.java +++ b/spring-integration-core/src/test/java/org/springframework/integration/dsl/reactivestreams/ReactiveStreamsTests.java @@ -54,6 +54,7 @@ import org.springframework.test.annotation.DirtiesContext; import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; +import reactor.core.Disposable; import reactor.core.publisher.Flux; import reactor.core.scheduler.Schedulers; @@ -104,19 +105,22 @@ public class ReactiveStreamsTests { @Test void testReactiveFlow() throws Exception { + assertThat(this.messageSource.isRunning()).isFalse(); List results = new ArrayList<>(); CountDownLatch latch = new CountDownLatch(6); - Flux.from(this.publisher) + Disposable disposable = + Flux.from(this.publisher) .map(m -> m.getPayload().toUpperCase()) .subscribe(p -> { results.add(p); latch.countDown(); }); - this.messageSource.start(); assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue(); String[] strings = results.toArray(new String[0]); assertThat(strings).isEqualTo(new String[]{ "A", "B", "C", "D", "E", "F" }); - this.messageSource.stop(); + + disposable.dispose(); + assertThat(this.messageSource.isRunning()).isFalse(); } @Test @@ -249,11 +253,10 @@ public Publisher> reactiveFlow() { return IntegrationFlows .from(() -> new GenericMessage<>("a,b,c,d,e,f"), e -> e.poller(p -> p.trigger(ctx -> this.invoked.getAndSet(true) ? null : new Date())) - .autoStartup(false) .id("reactiveStreamsMessageSource")) .split(String.class, p -> p.split(",")) .log() - .toReactivePublisher(); + .toReactivePublisher(true); } @Bean diff --git a/src/reference/asciidoc/reactive-streams.adoc b/src/reference/asciidoc/reactive-streams.adoc index c337586ac76..b09514b419e 100644 --- a/src/reference/asciidoc/reactive-streams.adoc +++ b/src/reference/asciidoc/reactive-streams.adoc @@ -125,7 +125,14 @@ See <<./splitter.adoc#split-stream-and-flux,Stream and Flux Splitting>> and <<./ An `IntegrationFlow` in Java DSL can start from any `Publisher` instance (see `IntegrationFlows.from(Publisher>)`). Also, with an `IntegrationFlowBuilder.toReactivePublisher()` operator, the `IntegrationFlow` can be turned into a reactive hot source. A `FluxMessageChannel` is used internally in both cases; it can subscribe to an inbound `Publisher` according to its `ReactiveStreamsSubscribableChannel` contract and it is a `Publisher>` by itself for downstream subscribers. -With a dynamic `IntegrationFlow` registration we can implement a powerful logic combining Reactive Streams with this integration flow bringing to/from `Publisher`. +With a dynamic `IntegrationFlow` registration we can implement a powerful logic combining Reactive Streams with this integration flow bridging to/from `Publisher`. + +Starting with version 5.5.6, a `toReactivePublisher(boolean autoStartOnSubscribe)` operator variant is present to control a lifecycle of the whole `IntegrationFlow` behind the returned `Publisher>`. +Typically, the subscription and consumption from the reactive publisher happens in the later runtime phase, not during reactive stream composition, or even `ApplicationContext` startup. +To avoid boilerplate code for lifecycle management of the `IntegrationFlow` at the `Publisher>` subscription point and for better end-user experience, this new operator with the `autoStartOnSubscribe` flag has been introduced. +It marks (if `true`) the `IntegrationFlow` and its components for `autoStartup = false`, so an `ApplicationContext` won't initiate production and consumption of messages in the flow automatically. +Instead the `start()` for the `IntegrationFlow` is initiated from the internal `Flux.doOnSubscribe()`. +Independently of the `autoStartOnSubscribe` value, the flow is stopped from a `Flux.doOnCancel()` and `Flux.doOnTerminate()` - it does not make sense to produce messages if there is nothing to consume them. For the exact opposite use-case, when `IntegrationFlow` should call a reactive stream and continue after completion, a `fluxTransform()` operator is provided in the `IntegrationFlowDefinition`. The flow at this point is turned into a `FluxMessageChannel` which is propagated into a provided `fluxFunction`, performed in the `Flux.transform()` operator. From 1db884baab45156f689e3f2d9dc8084cdd2ff278 Mon Sep 17 00:00:00 2001 From: Artem Bilan Date: Mon, 1 Nov 2021 16:46:14 -0400 Subject: [PATCH 2/2] Fix languge in JavaDocs Co-authored-by: Gary Russell --- .../integration/dsl/BaseIntegrationFlowDefinition.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/spring-integration-core/src/main/java/org/springframework/integration/dsl/BaseIntegrationFlowDefinition.java b/spring-integration-core/src/main/java/org/springframework/integration/dsl/BaseIntegrationFlowDefinition.java index b385c9fafa2..7ac6af1e898 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/dsl/BaseIntegrationFlowDefinition.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/dsl/BaseIntegrationFlowDefinition.java @@ -2940,8 +2940,8 @@ protected Publisher> toReactivePublisher() { /** * Represent an Integration Flow as a Reactive Streams {@link Publisher} bean. - * @param autoStartOnSubscribe start a production and consumption in the flow, - * when subscription to the publisher is initiated. + * @param autoStartOnSubscribe start message production and consumption in the flow, + * when a subscription to the publisher is initiated. * If this set to true, the flow is marked to not start automatically by the application context. * @param the expected {@code payload} type * @return the Reactive Streams {@link Publisher}