Skip to content

Commit

Permalink
GH-3623: Deprecarte an IntegrationFlows
Browse files Browse the repository at this point in the history
Fixes #3623

* `IntegrationFlow` refactoring
* Apply several code style improvements and good practices
* Code style: no empty lines for methods javadocs
* make deprecated implementation reuse actual one instead of the copy-paste approach
* add whats-new comments
* Fix whats-new page according to standards
  • Loading branch information
oxcafedead committed Jul 5, 2022
1 parent 63759d7 commit 53dd050
Show file tree
Hide file tree
Showing 90 changed files with 796 additions and 570 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2014-2020 the original author or authors.
* Copyright 2014-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -59,7 +59,6 @@
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlowBuilder;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.Transformers;
import org.springframework.integration.dsl.context.IntegrationFlowContext;
import org.springframework.integration.support.MessageBuilder;
Expand Down Expand Up @@ -161,7 +160,7 @@ public void testAmqpOutboundFlow() {

@Test
public void testTemplateChannelTransacted() {
IntegrationFlowBuilder flow = IntegrationFlows.from(Amqp.channel("testTemplateChannelTransacted",
IntegrationFlowBuilder flow = IntegrationFlow.from(Amqp.channel("testTemplateChannelTransacted",
this.rabbitConnectionFactory)
.autoStartup(false)
.templateChannelTransacted(true));
Expand Down Expand Up @@ -235,7 +234,7 @@ void unitTestChannel() {
@Test
void testContentTypeOverrideWithReplyHeadersMappedLast() {
IntegrationFlow testFlow =
IntegrationFlows
IntegrationFlow
.from(Amqp.inboundGateway(this.rabbitConnectionFactory, this.amqpQueue2)
.replyHeadersMappedLast(true))
.transform(Transformers.fromJson())
Expand Down Expand Up @@ -300,7 +299,7 @@ public Queue defaultReplyTo() {

@Bean
public IntegrationFlow amqpFlow(ConnectionFactory rabbitConnectionFactory, AmqpTemplate amqpTemplate) {
return IntegrationFlows
return IntegrationFlow
.from(Amqp.inboundGateway(rabbitConnectionFactory, amqpTemplate, queue())
.id("amqpInboundGateway")
.configureContainer(c -> c
Expand All @@ -315,7 +314,7 @@ public IntegrationFlow amqpFlow(ConnectionFactory rabbitConnectionFactory, AmqpT

// syntax only
public IntegrationFlow amqpDMLCFlow(ConnectionFactory rabbitConnectionFactory, AmqpTemplate amqpTemplate) {
return IntegrationFlows
return IntegrationFlow
.from(Amqp.inboundGateway(new DirectMessageListenerContainer())
.id("amqpInboundGateway")
.configureContainer(c -> c
Expand All @@ -329,7 +328,7 @@ public IntegrationFlow amqpDMLCFlow(ConnectionFactory rabbitConnectionFactory, A

@Bean
public IntegrationFlow amqpOutboundFlow(ConnectionFactory rabbitConnectionFactory, AmqpTemplate amqpTemplate) {
return IntegrationFlows.from(Amqp.channel("amqpOutboundInput", rabbitConnectionFactory))
return IntegrationFlow.from(Amqp.channel("amqpOutboundInput", rabbitConnectionFactory))
.handle(Amqp.outboundAdapter(amqpTemplate).routingKeyExpression("headers.routingKey"))
.get();
}
Expand All @@ -346,7 +345,7 @@ public Queue amqpReplyChannel() {

@Bean
public IntegrationFlow amqpInboundFlow(ConnectionFactory rabbitConnectionFactory) {
return IntegrationFlows.from(Amqp.inboundAdapter(rabbitConnectionFactory, fooQueue())
return IntegrationFlow.from(Amqp.inboundAdapter(rabbitConnectionFactory, fooQueue())
.configureContainer(container -> container.consumerBatchEnabled(true)
.batchSize(2))
.batchMode(BatchMode.EXTRACT_PAYLOADS)
Expand Down Expand Up @@ -384,7 +383,7 @@ public Queue exDLQ() {

@Bean
public IntegrationFlow inboundWithExceptionFlow(ConnectionFactory cf) {
return IntegrationFlows.from(Amqp.inboundAdapter(cf, exQueue())
return IntegrationFlow.from(Amqp.inboundAdapter(cf, exQueue())
.configureContainer(c -> c.defaultRequeueRejected(false))
.errorChannel("errors.input"))
.handle(m -> {
Expand Down Expand Up @@ -421,7 +420,7 @@ public Queue exConvDLQ() {

@Bean
public IntegrationFlow inboundWithConvExceptionFlow(ConnectionFactory cf) {
return IntegrationFlows.from(Amqp.inboundAdapter(cf, exConvQueue())
return IntegrationFlow.from(Amqp.inboundAdapter(cf, exConvQueue())
.configureContainer(c -> c.defaultRequeueRejected(false))
.messageConverter(new SimpleMessageConverter() {

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018-2019 the original author or authors.
* Copyright 2018-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -54,7 +54,6 @@
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.core.MessageSource;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.Pollers;
import org.springframework.integration.support.AbstractIntegrationMessageBuilder;
import org.springframework.messaging.handler.annotation.Header;
Expand Down Expand Up @@ -198,7 +197,7 @@ public void ack(@Header(IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK

@Bean
public IntegrationFlow flow() {
return IntegrationFlows.from(Amqp.inboundPolledAdapter(connectionFactory(), DSL_QUEUE),
return IntegrationFlow.from(Amqp.inboundPolledAdapter(connectionFactory(), DSL_QUEUE),
e -> e.poller(Pollers.fixedDelay(100)).autoStartup(false))
.handle(p -> {
this.fromDsl = p.getPayload();
Expand All @@ -209,7 +208,7 @@ public IntegrationFlow flow() {

@Bean
public IntegrationFlow messageSourceChannelFlow() {
return IntegrationFlows.from(interceptedSource(),
return IntegrationFlow.from(interceptedSource(),
e -> e.poller(Pollers.fixedDelay(100)).autoStartup(false))
.handle(p -> {
this.fromInterceptedSource = p.getPayload();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018-2019 the original author or authors.
* Copyright 2018-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -41,7 +41,6 @@
import org.springframework.integration.amqp.dsl.Amqp;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.test.util.TestUtils;
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;

Expand Down Expand Up @@ -108,7 +107,7 @@ public BoundRabbitChannelAdvice advice(RabbitTemplate template) {

@Bean
public IntegrationFlow flow(RabbitTemplate template, BoundRabbitChannelAdvice advice) {
return IntegrationFlows.from(Gate.class)
return IntegrationFlow.from(Gate.class)
.split(s -> s.delimiters(",")
.advice(advice))
.<String, String>transform(String::toUpperCase)
Expand All @@ -118,7 +117,7 @@ public IntegrationFlow flow(RabbitTemplate template, BoundRabbitChannelAdvice ad

@Bean
public IntegrationFlow listener(CachingConnectionFactory ccf) {
return IntegrationFlows.from(Amqp.inboundAdapter(ccf, QUEUE))
return IntegrationFlow.from(Amqp.inboundAdapter(ccf, QUEUE))
.handle(m -> {
received.add((String) m.getPayload());
this.latch.countDown();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018-2019 the original author or authors.
* Copyright 2018-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -43,7 +43,6 @@
import org.springframework.integration.amqp.dsl.Amqp;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;

import com.rabbitmq.client.Channel;
Expand Down Expand Up @@ -121,7 +120,7 @@ public RabbitTemplate template() throws Exception {

@Bean
public IntegrationFlow flow(RabbitTemplate template) {
return IntegrationFlows.from(Gate.class)
return IntegrationFlow.from(Gate.class)
.split(s -> s.delimiters(",")
.advice(new BoundRabbitChannelAdvice(template, Duration.ofSeconds(10))))
.<String, String>transform(String::toUpperCase)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -426,7 +426,7 @@ protected MessageChannel obtainInputChannelFromFlow(IntegrationFlow flow) {
MessageChannel messageChannel = flow.getInputChannel();
if (messageChannel == null) {
messageChannel = new DirectChannel();
IntegrationFlowDefinition<?> flowBuilder = IntegrationFlows.from(messageChannel);
IntegrationFlowDefinition<?> flowBuilder = IntegrationFlow.from(messageChannel);
flow.configure(flowBuilder);
addComponent(flowBuilder.get());
}
Expand Down Expand Up @@ -457,7 +457,7 @@ protected MessageChannel obtainInputChannelFromFlow(IntegrationFlow flow) {
*/
public B wireTap(String wireTapChannel, Consumer<WireTapSpec> wireTapConfigurer) {
DirectChannel internalWireTapChannel = new DirectChannel();
addComponent(IntegrationFlows.from(internalWireTapChannel).channel(wireTapChannel).get());
addComponent(IntegrationFlow.from(internalWireTapChannel).channel(wireTapChannel).get());
return wireTap(internalWireTapChannel, wireTapConfigurer);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020 the original author or authors.
* Copyright 2020-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -58,7 +58,7 @@ public BroadcastPublishSubscribeSpec subscribe(IntegrationFlow subFlow) {
Assert.notNull(subFlow, "'subFlow' must not be null");

IntegrationFlowBuilder flowBuilder =
IntegrationFlows.from(this.target)
IntegrationFlow.from(this.target)
.bridge(consumer -> consumer.order(this.order++));

MessageChannel subFlowInput = subFlow.getInputChannel();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2021 the original author or authors.
* Copyright 2016-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -163,7 +163,7 @@ protected MessageChannel obtainInputChannelFromFlow(IntegrationFlow subFlow, boo
MessageChannel messageChannel = subFlow.getInputChannel();
if (messageChannel == null) {
messageChannel = new DirectChannel();
IntegrationFlowDefinition<?> flowBuilder = IntegrationFlows.from(messageChannel);
IntegrationFlowDefinition<?> flowBuilder = IntegrationFlow.from(messageChannel);
subFlow.configure(flowBuilder);
this.componentsToRegister.put(evaluateInternalBuilder ? flowBuilder.get() : flowBuilder, null);
}
Expand Down

0 comments on commit 53dd050

Please sign in to comment.