diff --git a/spring-integration-core/src/main/java/org/springframework/integration/dsl/StandardIntegrationFlow.java b/spring-integration-core/src/main/java/org/springframework/integration/dsl/StandardIntegrationFlow.java index fc1839d34d6..c3af2f9e619 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/dsl/StandardIntegrationFlow.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/dsl/StandardIntegrationFlow.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2019 the original author or authors. + * Copyright 2016-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -63,13 +63,12 @@ * @see IntegrationFlows * @see org.springframework.integration.dsl.context.IntegrationFlowBeanPostProcessor * @see org.springframework.integration.dsl.context.IntegrationFlowContext + * @see SmartLifecycle */ public class StandardIntegrationFlow implements IntegrationFlow, SmartLifecycle { private final Map integrationComponents; - private final List lifecycles = new LinkedList<>(); - private MessageChannel inputChannel; private boolean running; @@ -113,11 +112,9 @@ public void start() { if (!this.running) { List components = new LinkedList<>(this.integrationComponents.keySet()); ListIterator iterator = components.listIterator(this.integrationComponents.size()); - this.lifecycles.clear(); while (iterator.hasPrevious()) { Object component = iterator.previous(); if (component instanceof SmartLifecycle) { - this.lifecycles.add((SmartLifecycle) component); ((SmartLifecycle) component).start(); } } @@ -127,30 +124,29 @@ public void start() { @Override public void stop(Runnable callback) { - if (this.lifecycles.size() > 0) { - AggregatingCallback aggregatingCallback = new AggregatingCallback(this.lifecycles.size(), callback); - ListIterator iterator = this.lifecycles.listIterator(this.lifecycles.size()); - while (iterator.hasPrevious()) { - SmartLifecycle lifecycle = iterator.previous(); + AggregatingCallback aggregatingCallback = new AggregatingCallback(this.integrationComponents.size(), callback); + for (Object component : this.integrationComponents.keySet()) { + if (component instanceof SmartLifecycle) { + SmartLifecycle lifecycle = (SmartLifecycle) component; if (lifecycle.isRunning()) { lifecycle.stop(aggregatingCallback); - } - else { - aggregatingCallback.run(); + continue; } } - } - else { - callback.run(); + aggregatingCallback.run(); } this.running = false; } @Override public void stop() { - ListIterator iterator = this.lifecycles.listIterator(this.lifecycles.size()); - while (iterator.hasPrevious()) { - iterator.previous().stop(); + for (Object component : this.integrationComponents.keySet()) { + if (component instanceof SmartLifecycle) { + SmartLifecycle lifecycle = (SmartLifecycle) component; + if (lifecycle.isRunning()) { + lifecycle.stop(); + } + } } this.running = false; } diff --git a/spring-integration-core/src/test/java/org/springframework/integration/dsl/flows/IntegrationFlowTests.java b/spring-integration-core/src/test/java/org/springframework/integration/dsl/flows/IntegrationFlowTests.java index a412cd94981..c7bf9ff0fbd 100644 --- a/spring-integration-core/src/test/java/org/springframework/integration/dsl/flows/IntegrationFlowTests.java +++ b/spring-integration-core/src/test/java/org/springframework/integration/dsl/flows/IntegrationFlowTests.java @@ -42,6 +42,7 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.config.ConfigurableBeanFactory; +import org.springframework.context.Lifecycle; import org.springframework.context.annotation.AnnotationConfigApplicationContext; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.ComponentScan; @@ -64,6 +65,7 @@ import org.springframework.integration.dsl.Pollers; import org.springframework.integration.dsl.Transformers; import org.springframework.integration.endpoint.EventDrivenConsumer; +import org.springframework.integration.gateway.GatewayProxyFactoryBean; import org.springframework.integration.handler.AbstractReplyProducingMessageHandler; import org.springframework.integration.handler.LoggingHandler; import org.springframework.integration.handler.ReactiveMessageHandlerAdapter; @@ -511,6 +513,27 @@ public void testInterceptorFlow() { ); } + @Autowired + @Qualifier("controlBusFlow") + Lifecycle controlBusFlow; + + @Test + public void testStandardIntegrationFlowLifecycle() { + this.controlBusFlow.stop(); + + GatewayProxyFactoryBean controlBusGateway = + this.beanFactory.getBean("&controlBusGateway", GatewayProxyFactoryBean.class); + assertThat(controlBusGateway.isRunning()).isFalse(); + Lifecycle controlBus = this.beanFactory.getBean("controlBus", Lifecycle.class); + assertThat(controlBus.isRunning()).isFalse(); + + this.controlBusFlow.start(); + + assertThat(controlBusGateway.isRunning()).isTrue(); + assertThat(controlBus.isRunning()).isTrue(); + } + + @After public void cleanUpList() { outputStringList.clear(); @@ -592,8 +615,8 @@ public static class ContextConfiguration { @Bean public IntegrationFlow controlBusFlow() { - return IntegrationFlows.from(ControlBusGateway.class) - .controlBus() + return IntegrationFlows.from(ControlBusGateway.class, (gateway) -> gateway.beanName("controlBusGateway")) + .controlBus((endpoint) -> endpoint.id("controlBus")) .get(); } @@ -941,6 +964,7 @@ public List outputStringList() { public IntegrationFlow interceptorFlow(List outputStringList) { return IntegrationFlows.from("interceptorChannelIn") .intercept(new ChannelInterceptor() { + @Override public Message preSend(Message message, MessageChannel channel) { outputStringList.add("Pre send transform: " + message.getPayload()); @@ -954,6 +978,7 @@ public void postSend(Message message, MessageChannel channel, boolean sent) { }) .transform((String s) -> s.toUpperCase()) .intercept(new ChannelInterceptor() { + @Override public Message preSend(Message message, MessageChannel channel) { outputStringList.add("Pre send handle: " + message.getPayload()); @@ -967,6 +992,7 @@ public void postSend(Message message, MessageChannel channel, boolean sent) { }) .handle(m -> outputStringList.add("Handle: " + m.getPayload())).get(); } + } @Service diff --git a/src/reference/asciidoc/dsl.adoc b/src/reference/asciidoc/dsl.adoc index 4caa5375f05..c1642a2ac4f 100644 --- a/src/reference/asciidoc/dsl.adoc +++ b/src/reference/asciidoc/dsl.adoc @@ -75,6 +75,7 @@ The following list includes the common DSL method names and the associated EIP e Conceptually, integration processes are constructed by composing these endpoints into one or more message flows. Note that EIP does not formally define the term 'message flow', but it is useful to think of it as a unit of work that uses well known messaging patterns. The DSL provides an `IntegrationFlow` component to define a composition of channels and endpoints between them, but now `IntegrationFlow` plays only the configuration role to populate real beans in the application context and is not used at runtime. +However the bean for `IntegrationFlow` can be autowired as a `Lifecycle` to control `start()` and `stop()` for the whole flow which is delegated to all the Spring Integration components associated with this `IntegrationFlow`. The following example uses the `IntegrationFlows` factory to define an `IntegrationFlow` bean by using EIP-methods from `IntegrationFlowBuilder`: ====