Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2019 the original author or authors.
* Copyright 2016-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -63,13 +63,12 @@
* @see IntegrationFlows
* @see org.springframework.integration.dsl.context.IntegrationFlowBeanPostProcessor
* @see org.springframework.integration.dsl.context.IntegrationFlowContext
* @see SmartLifecycle
*/
public class StandardIntegrationFlow implements IntegrationFlow, SmartLifecycle {

private final Map<Object, String> integrationComponents;

private final List<SmartLifecycle> lifecycles = new LinkedList<>();

private MessageChannel inputChannel;

private boolean running;
Expand Down Expand Up @@ -113,11 +112,9 @@ public void start() {
if (!this.running) {
List<Object> components = new LinkedList<>(this.integrationComponents.keySet());
ListIterator<Object> iterator = components.listIterator(this.integrationComponents.size());
this.lifecycles.clear();
while (iterator.hasPrevious()) {
Object component = iterator.previous();
if (component instanceof SmartLifecycle) {
this.lifecycles.add((SmartLifecycle) component);
((SmartLifecycle) component).start();
}
}
Expand All @@ -127,30 +124,29 @@ public void start() {

@Override
public void stop(Runnable callback) {
if (this.lifecycles.size() > 0) {
AggregatingCallback aggregatingCallback = new AggregatingCallback(this.lifecycles.size(), callback);
ListIterator<SmartLifecycle> iterator = this.lifecycles.listIterator(this.lifecycles.size());
while (iterator.hasPrevious()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't notice this, but stopping right to left was wrong; corrected in the new code.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, Gary, it was right. Because we started before from right to left and filled out the local lifecycles property.
Then we used this lifecycles in the stop() and therefore the first one in the flow was in the end of that list, so we again should iterate it backwards to stop in proper order.
Right now indeed we don't have an intermediate list therefore the logic is changed around a normal integrationComponents state.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah... right.

SmartLifecycle lifecycle = iterator.previous();
AggregatingCallback aggregatingCallback = new AggregatingCallback(this.integrationComponents.size(), callback);
for (Object component : this.integrationComponents.keySet()) {
if (component instanceof SmartLifecycle) {
SmartLifecycle lifecycle = (SmartLifecycle) component;
if (lifecycle.isRunning()) {
lifecycle.stop(aggregatingCallback);
}
else {
aggregatingCallback.run();
continue;
}
}
}
else {
callback.run();
aggregatingCallback.run();
}
this.running = false;
}

@Override
public void stop() {
ListIterator<SmartLifecycle> iterator = this.lifecycles.listIterator(this.lifecycles.size());
while (iterator.hasPrevious()) {
iterator.previous().stop();
for (Object component : this.integrationComponents.keySet()) {
if (component instanceof SmartLifecycle) {
SmartLifecycle lifecycle = (SmartLifecycle) component;
if (lifecycle.isRunning()) {
lifecycle.stop();
}
}
}
this.running = false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.Lifecycle;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
Expand All @@ -64,6 +65,7 @@
import org.springframework.integration.dsl.Pollers;
import org.springframework.integration.dsl.Transformers;
import org.springframework.integration.endpoint.EventDrivenConsumer;
import org.springframework.integration.gateway.GatewayProxyFactoryBean;
import org.springframework.integration.handler.AbstractReplyProducingMessageHandler;
import org.springframework.integration.handler.LoggingHandler;
import org.springframework.integration.handler.ReactiveMessageHandlerAdapter;
Expand Down Expand Up @@ -511,6 +513,27 @@ public void testInterceptorFlow() {
);
}

@Autowired
@Qualifier("controlBusFlow")
Lifecycle controlBusFlow;

@Test
public void testStandardIntegrationFlowLifecycle() {
this.controlBusFlow.stop();

GatewayProxyFactoryBean controlBusGateway =
this.beanFactory.getBean("&controlBusGateway", GatewayProxyFactoryBean.class);
assertThat(controlBusGateway.isRunning()).isFalse();
Lifecycle controlBus = this.beanFactory.getBean("controlBus", Lifecycle.class);
assertThat(controlBus.isRunning()).isFalse();

this.controlBusFlow.start();

assertThat(controlBusGateway.isRunning()).isTrue();
assertThat(controlBus.isRunning()).isTrue();
}


@After
public void cleanUpList() {
outputStringList.clear();
Expand Down Expand Up @@ -592,8 +615,8 @@ public static class ContextConfiguration {

@Bean
public IntegrationFlow controlBusFlow() {
return IntegrationFlows.from(ControlBusGateway.class)
.controlBus()
return IntegrationFlows.from(ControlBusGateway.class, (gateway) -> gateway.beanName("controlBusGateway"))
.controlBus((endpoint) -> endpoint.id("controlBus"))
.get();
}

Expand Down Expand Up @@ -941,6 +964,7 @@ public List<String> outputStringList() {
public IntegrationFlow interceptorFlow(List<String> outputStringList) {
return IntegrationFlows.from("interceptorChannelIn")
.intercept(new ChannelInterceptor() {

@Override
public Message<?> preSend(Message<?> message, MessageChannel channel) {
outputStringList.add("Pre send transform: " + message.getPayload());
Expand All @@ -954,6 +978,7 @@ public void postSend(Message<?> message, MessageChannel channel, boolean sent) {
})
.transform((String s) -> s.toUpperCase())
.intercept(new ChannelInterceptor() {

@Override
public Message<?> preSend(Message<?> message, MessageChannel channel) {
outputStringList.add("Pre send handle: " + message.getPayload());
Expand All @@ -967,6 +992,7 @@ public void postSend(Message<?> message, MessageChannel channel, boolean sent) {
})
.handle(m -> outputStringList.add("Handle: " + m.getPayload())).get();
}

}

@Service
Expand Down
1 change: 1 addition & 0 deletions src/reference/asciidoc/dsl.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ The following list includes the common DSL method names and the associated EIP e
Conceptually, integration processes are constructed by composing these endpoints into one or more message flows.
Note that EIP does not formally define the term 'message flow', but it is useful to think of it as a unit of work that uses well known messaging patterns.
The DSL provides an `IntegrationFlow` component to define a composition of channels and endpoints between them, but now `IntegrationFlow` plays only the configuration role to populate real beans in the application context and is not used at runtime.
However the bean for `IntegrationFlow` can be autowired as a `Lifecycle` to control `start()` and `stop()` for the whole flow which is delegated to all the Spring Integration components associated with this `IntegrationFlow`.
The following example uses the `IntegrationFlows` factory to define an `IntegrationFlow` bean by using EIP-methods from `IntegrationFlowBuilder`:

====
Expand Down