Spring Integration Java DSL Reference

Artem Bilan edited this page Nov 7, 2017 · 12 revisions

This project has been absorbed by Spring Integration Core starting with version 5.0. Please consult its Reference Manual for the actual documentation. This project is only in a maintenance, bug fixing state.

Spring Integration Java DSL

The Spring Integration JavaConfig and DSL extension provides a set of convenient Builders and a fluent API to configure Spring Integration message flows from Spring @Configuration classes.

Example Configurations

@Configuration
@EnableIntegration
public class MyConfiguration {
    @Bean
    public MessageSource<?> integerMessageSource() {
        MethodInvokingMessageSource source = new MethodInvokingMessageSource();
        source.setObject(new AtomicInteger());
        source.setMethodName("getAndIncrement");
        return source;
    }
    @Bean
    public DirectChannel inputChannel() {
        return new DirectChannel();
    }
    @Bean
    public IntegrationFlow myFlow() {
        return IntegrationFlows.from(this.integerMessageSource(), c -> 
                                                   c.poller(Pollers.fixedRate(100)))
                    .channel(this.inputChannel())
                    .filter((Integer p) -> p > 0)
                    .transform(Object::toString)
                    .channel(MessageChannels.queue())
                    .get();
    }
}

As a result, once the ApplicationContext starts up, Spring Integration endpoints and Message Channels are created just as they would be after XML parsing. Such a configuration can replace XML configuration or be used alongside it.

Introduction

The Java DSL for Spring Integration is essentially a facade for Spring Integration. This guide assumes you are familiar with Spring Integration. If this is not the case, the Spring Integration Reference is a good place to begin.

The DSL provides a simple way to embed Spring Integration Message Flows into your application using the fluent Builder pattern together with existing Java and Annotation configurations from Spring Framework and Spring Integration as well. Another useful tool to simplify configuration is Java 8 Lambdas.

The cafe is a good example of using the DSL.

In this Reference we provide an overview of the crucial principles and components. More detail is available in the project's source code.

How it Works

The DSL is exposed through the IntegrationFlows factory for the IntegrationFlowBuilder. This produces the IntegrationFlow component, which should be registered as a Spring bean (@Bean). The builder pattern is used to express arbitrarily complex structures as a hierarchy of methods that may accept Lambdas as arguments.

The IntegrationFlowBuilder just collects integration components (MessageChannels, AbstractEndpoints etc.) in the IntegrationFlow bean for further parsing and registration of concrete beans in the application context by IntegrationFlowBeanPostProcessor.

The Java DSL uses Spring Integration classes directly and bypasses any XML generation and parsing.

However, the DSL offers more than syntactic sugar on top of XML. One of its most compelling features is the ability to define inline Lambdas to implement endpoint logic, eliminating the need for external classes to implement custom logic. In some sense, Spring Integration's support for the Spring Expression Language (SpEL) and inline scripting addresses this, but Java Lambdas are easier and much more powerful.

DSL Basics

The core DSL package contains the IntegrationFlowBuilder API mentioned above and a bunch of IntegrationComponentSpec implementations which are builders too and provide the fluent API to configure concrete endpoints.

The IntegrationFlowBuilder infrastructure provides the common Enterprise Integration Patterns (EIP) abstractions for message-based applications, such as channels, endpoints, pollers and channel interceptors.

Endpoints are expressed as verbs in the DSL to improve readability. The following list includes the common DSL method names and the associated EIP endpoint:

  • transform -> Transformer
  • filter -> Filter
  • handle -> ServiceActivator
  • split -> Splitter
  • aggregate -> Aggregator
  • route -> Router
  • bridge -> Bridge

Conceptually, integration processes are constructed by composing these endpoints into one or more message flows. Note that EIP does not formally define the term 'message flow', but it is useful to think of it as a unit of work that uses well-known messaging patterns. The DSL provides an IntegrationFlow component to define a composition of channels and the endpoints between them. However, the IntegrationFlow plays only a configuration role: it populates real beans in the application context and isn't used at runtime. Let's start with a simple example:

@Bean
public IntegrationFlow integerFlow() {
	return IntegrationFlows.from("input")
    			.<String, Integer>transform(Integer::parseInt)
    			.get();
}

Here we used the IntegrationFlows factory to define an IntegrationFlow using EIP-methods from IntegrationFlowBuilder.

The transform method accepts a Lambda as an endpoint argument to operate on the message payload. The real argument of this method is GenericTransformer<S, T>, hence any out-of-the-box transformers (ObjectToJsonTransformer, FileToStringTransformer etc.) can be used here.

Under the covers, IntegrationFlowBuilder recognizes the appropriate MessageHandler and endpoint for it: MessageTransformingHandler and ConsumerEndpointFactoryBean, respectively. Let's look at another example:

@Bean
public IntegrationFlow myFlow() {
	return IntegrationFlows.from("input")
	            .filter("World"::equals)
    			.transform("Hello "::concat)
    			.handle(System.out::println)
    			.get();
}

The above example composes a sequence of Filter->Transformer->Service Activator. The flow is 'one way', that is, it does not provide a reply message but simply prints the payload to STDOUT. The endpoints are automatically wired together using direct channels.
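To make the message path through such a chain concrete, here is a minimal plain-Java sketch of the same three steps. It is only a model built on java.util.function types: the OneWayFlowSketch class and its send() method are hypothetical names, not part of the DSL, and the assumption that a rejected payload is simply dropped reflects the default filter behavior as we understand it.

```java
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;

// Plain-Java model of the flow above; none of these types belong to Spring Integration.
class OneWayFlowSketch {

    // Stands in for .filter("World"::equals)
    private final Predicate<String> filter = "World"::equals;

    // Stands in for .transform("Hello "::concat)
    private final Function<String, String> transformer = "Hello "::concat;

    // Stands in for .handle(...)
    private final Consumer<String> handler;

    OneWayFlowSketch(Consumer<String> handler) {
        this.handler = handler;
    }

    // Pushes one payload through the chain; nothing is returned, mirroring the 'one way' flow.
    void send(String payload) {
        if (!filter.test(payload)) {
            return; // assumption: a rejected payload is dropped without an error
        }
        handler.accept(transformer.apply(payload));
    }

    public static void main(String[] args) {
        OneWayFlowSketch flow = new OneWayFlowSketch(System.out::println);
        flow.send("World");  // passes the filter, is prefixed and handed to the handler
        flow.send("Spring"); // rejected by the filter, never reaches the handler
    }
}
```

In the real flow each step is a separate endpoint connected by a direct channel, so the calls happen on the sender's thread in the same order as in send().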

Message Channels

In addition to the IntegrationFlowBuilder with EIP-methods the Java DSL provides a fluent API to configure MessageChannels. For this purpose the MessageChannels builder factory is provided:

@Bean
public MessageChannel priorityChannel() {
   return MessageChannels.priority(this.mongoDbChannelMessageStore, "priorityGroup")
                        .interceptor(wireTap())
                        .get();
}

The same MessageChannels builder factory can be used in the channel() EIP-method from IntegrationFlowBuilder to wire endpoints, similar to an input-channel/output-channel pair in the XML configuration. By default, endpoints are wired via DirectChannel instances whose bean names follow the pattern [IntegrationFlow.beanName].channel#[channelNameIndex]. This rule also applies to unnamed channels produced by inline MessageChannels builder factory usage. However, all MessageChannels methods have a channelId-aware variant to create the bean names for MessageChannels. MessageChannel references can be used as well, either as a beanName or as bean-method invocations. Here is a sample with possible variants of channel() EIP-method usage:

@Bean
public MessageChannel queueChannel() {
    return MessageChannels.queue().get();
}

@Bean
public MessageChannel publishSubscribe() {
     return MessageChannels.publishSubscribe().get();
}

@Bean
public IntegrationFlow channelFlow() {
	return IntegrationFlows.from("input")
	            .fixedSubscriberChannel()
	            .channel("queueChannel")
	            .channel(publishSubscribe())
	            .channel(MessageChannels.executor("executorChannel", this.taskExecutor))
	            .channel("output")
    		    .get();
}

  • from("input") means: 'find and use the MessageChannel with the "input" id, or create one';
  • fixedSubscriberChannel() produces an instance of FixedSubscriberChannel and registers it with name channelFlow.channel#0;
  • channel("queueChannel") works the same way but, of course, uses an existing "queueChannel" bean;
  • channel(publishSubscribe()) - the bean-method reference;
  • channel(MessageChannels.executor("executorChannel", this.taskExecutor)) - the IntegrationFlowBuilder unwraps IntegrationComponentSpec to the ExecutorChannel and registers it as "executorChannel";
  • channel("output") - registers the DirectChannel bean with "output" name as long as there are no beans with this name.

Note: the IntegrationFlow definition shown above is valid and all of its channels are applied to endpoints with BridgeHandlers.

Important: Be careful when using the same inline channel definition via the MessageChannels factory from different IntegrationFlows. Even though the DSL parser registers non-existing objects as beans, it can't recognize the same object (MessageChannel) across different IntegrationFlow containers. This is wrong:

@Bean
public IntegrationFlow startFlow() {
	return IntegrationFlows.from("input")
                           .transform(...)
                           .channel(MessageChannels.queue("queueChannel"))
                           .get();
}

@Bean
public IntegrationFlow endFlow() {
	return IntegrationFlows.from(MessageChannels.queue("queueChannel"))
                           .handle(...)
                           .get();
}

You end up with:

Caused by: java.lang.IllegalStateException: Could not register object [queueChannel] under bean name 'queueChannel': there is already object [queueChannel] bound
	at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.registerSingleton(DefaultSingletonBeanRegistry.java:129)

To make it work, declare a @Bean for that channel and use its bean-method from the different IntegrationFlows.

Pollers

A similar fluent API is provided to configure PollerMetadata for AbstractPollingEndpoint implementations. The Pollers builder factory can be used to configure common bean definitions or those created from IntegrationFlowBuilder EIP-methods:

@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerMetadata poller() {
	return Pollers.fixedRate(500).get();
}

DSL and Endpoint Configuration

All IntegrationFlowBuilder EIP-methods have a variant to apply the Lambda parameter to provide options for AbstractEndpoints: SmartLifecycle, PollerMetadata, request-handler-advice-chain etc. Each of them has generic arguments, so it allows you to simply configure an endpoint and even its MessageHandler in the context:

@Bean
public IntegrationFlow flow2() {
	return IntegrationFlows.from(this.inputChannel)
				.transform(new PayloadSerializingTransformer(),
						c -> c.autoStartup(false).id("payloadSerializingTransformer"))
				.transform((Integer p) -> p * 2, c -> c.advice(this.expressionAdvice()))
				.get();
}

In addition the EndpointSpec provides the id() method to allow you to register an endpoint bean with a given bean name, rather than a generated one.

Transformers

The DSL API provides a convenient, fluent Transformers factory to be used as inline target object definition within .transform() EIP-method:

@Bean
public IntegrationFlow transformFlow() {
    return IntegrationFlows.from("input")
            .transform(Transformers.xpath("/root/myJson", XPathEvaluationType.STRING_RESULT))
            .transform(Transformers.fromJson(MyPojo.class))
            .transform(Transformers.serializer())
            .get();
}

It avoids inconvenient coding using setters and makes the flow definition more straightforward. Note that Transformers can also be used to declare target Transformers as @Beans and, again, be used from an IntegrationFlow definition as bean-methods. Nevertheless, the DSL parser takes care of bean declarations for inline objects if they aren't defined as beans yet.

Inbound Channel Adapters

Typically, message flows start from some Inbound Channel Adapter (e.g. <int-jdbc:inbound-channel-adapter>). The adapter is configured with a <poller> and periodically asks a MessageSource<?> for messages. The Java DSL allows an IntegrationFlow to start from a MessageSource<?>, too. For this purpose, the IntegrationFlows builder factory provides an overloaded from(MessageSource<?> messageSource) method. The MessageSource<?> should be configured as a bean and provided as the argument to that method. The second parameter of from() is a Consumer<SourcePollingChannelAdapterSpec> Lambda that provides options for the SourcePollingChannelAdapter, e.g. PollerMetadata or SmartLifecycle:

@Bean
public MessageSource<Object> jdbcMessageSource() {
   return new JdbcPollingChannelAdapter(this.dataSource, "SELECT * FROM foo");
}

@Bean
public IntegrationFlow pollingFlow() {
	return IntegrationFlows.from(jdbcMessageSource(), 
                c -> c.poller(Pollers.fixedRate(100).maxMessagesPerPoll(1)))
			.transform(new ObjectToJsonTransformer())
			.channel("furtherProcessChannel")
			.get();
}

The next sections discuss selected endpoints which require further explanation.

Message Routers

Spring Integration natively provides specialized router types including:

  • HeaderValueRouter
  • PayloadTypeRouter
  • ExceptionTypeRouter
  • RecipientListRouter
  • XPathRouter

As with many other DSL IntegrationFlowBuilder EIP-methods the route() method can apply any out-of-the-box AbstractMessageRouter implementation, or for convenience a String as a SpEL expression, or a ref/method pair. In addition route() can be configured with a Lambda - the inline method invocation case, and with a Lambda for Consumer<RouterSpec<MethodInvokingRouter>>. The fluent API also provides AbstractMappingMessageRouter options like channelMapping(String key, String channelName) pairs:

@Bean
public IntegrationFlow routeFlow() {
	return IntegrationFlows.from("routerInput")
			.<Integer, Boolean>route(p -> p % 2 == 0,
					m -> m.suffix("Channel")
					      .channelMapping("true", "even")
					      .channelMapping("false", "odd")
			)
			.get();
}
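How the routing key, the channel mappings and the suffix combine can be sketched in plain Java. This is only a model, not the router implementation: MappingRouterSketch and resolveChannelName() are hypothetical names, and the sketch assumes that the key is first looked up in the mappings, that an unmapped key is used as the channel name itself, and that the suffix is appended afterwards.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Plain-Java model of a mapping router; not the Spring Integration implementation.
class MappingRouterSketch<P> {

    private final Function<P, ?> keyFunction;
    private final Map<String, String> channelMappings = new HashMap<>();
    private String suffix = "";

    MappingRouterSketch(Function<P, ?> keyFunction) {
        this.keyFunction = keyFunction;
    }

    MappingRouterSketch<P> suffix(String suffix) {
        this.suffix = suffix;
        return this;
    }

    MappingRouterSketch<P> channelMapping(String key, String channelName) {
        this.channelMappings.put(key, channelName);
        return this;
    }

    // Resolves the name of the target channel for a payload.
    String resolveChannelName(P payload) {
        String key = String.valueOf(keyFunction.apply(payload));
        // Assumption: an unmapped key is used as the channel name itself.
        String name = channelMappings.getOrDefault(key, key);
        // Assumption: the suffix is appended after the mapping lookup.
        return name + suffix;
    }

    public static void main(String[] args) {
        MappingRouterSketch<Integer> router = new MappingRouterSketch<Integer>(p -> p % 2 == 0)
                .suffix("Channel")
                .channelMapping("true", "even")
                .channelMapping("false", "odd");
        System.out.println(router.resolveChannelName(4));
        System.out.println(router.resolveChannelName(7));
    }
}
```

Under these assumptions the Boolean result of the Lambda is turned into the String key "true" or "false", which is why the mappings in the flow above use String keys.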

A simple expression-based router:

@Bean
public IntegrationFlow routeFlow() {
	return IntegrationFlows.from("routerInput")
			.route("headers['destChannel']")
			.get();
}

The routeToRecipients() method takes a Consumer<RecipientListRouterSpec>:

@Bean
public IntegrationFlow recipientListFlow() {
	return IntegrationFlows.from("recipientListInput")
			.<String, String>transform(p -> p.replaceFirst("Payload", ""))
			.routeToRecipients(r -> r
				.recipient("foo-channel", "'foo' == payload")
				.recipient("bar-channel", m ->
					m.getHeaders().containsKey("recipient")
						&& (boolean) m.getHeaders().get("recipient"))
				.recipientFlow("'foo' == payload or 'bar' == payload or 'baz' == payload",
					f -> f.<String, String>transform(String::toUpperCase)
						.channel(c -> c.queue("recipientListSubFlow1Result")))
				.recipientFlow((String p) -> p.startsWith("baz"),
					f -> f.transform("Hello "::concat)
						.channel(c -> c.queue("recipientListSubFlow2Result")))
				.recipientFlow(new FunctionExpression<Message<?>>(m ->
                                		   "bax".equals(m.getPayload())),
					f -> f.channel(c -> c.queue("recipientListSubFlow3Result")))
				.defaultOutputToParentFlow())
			.get();
}

The .defaultOutputToParentFlow() of .routeToRecipients() makes the router's defaultOutput a gateway, so that processing of unmatched messages continues in the main flow.

Splitters

A splitter is created using the split() EIP-method. By default, if the payload is a Collection or Array, this will output each item as an individual message. This takes a Lambda, SpEL expression, any AbstractMessageSplitter implementation, or can be used without parameters to provide the DefaultMessageSplitter. For example:

@Bean
public IntegrationFlow splitFlow() {
		return IntegrationFlows.from("splitInput")
					.split(s ->
							s.applySequence(false).get().getT2().setDelimiters(","))
					.channel(MessageChannels.executor(this.taskExecutor()))
					.get();
}

This creates a splitter that splits a message containing a comma delimited String. Note: the getT2() method comes from Tuple Collection which is the result of EndpointSpec.get() and represents a pair of ConsumerEndpointFactoryBean and DefaultMessageSplitter for the example above.
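What splitting a delimited String payload amounts to can be shown with a small plain-Java sketch. DelimiterSplitterSketch is a hypothetical class used for illustration only; it uses java.util.StringTokenizer and does not model message headers such as the sequence details that applySequence(false) switches off.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.StringTokenizer;

// Plain-Java model of splitting one String payload into several payloads.
class DelimiterSplitterSketch {

    // Returns one item per token; each item would become its own message in a real flow.
    static List<String> split(String payload, String delimiters) {
        List<String> items = new ArrayList<>();
        StringTokenizer tokenizer = new StringTokenizer(payload, delimiters);
        while (tokenizer.hasMoreTokens()) {
            items.add(tokenizer.nextToken());
        }
        return items;
    }

    public static void main(String[] args) {
        // Each printed line corresponds to one message sent downstream.
        split("a,b,c", ",").forEach(System.out::println);
    }
}
```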

Aggregators and Resequencers

An Aggregator is conceptually the converse of a splitter. It aggregates a sequence of individual messages into a single message and is necessarily more complex. By default, an aggregator will return a message containing a collection of payloads from incoming messages. The same rules are applied for Resequencer:

@Bean
public IntegrationFlow splitAggregateFlow() {
	return IntegrationFlows.from("splitAggregateInput")
			.split(null)
			.channel(MessageChannels.executor(this.taskExecutor()))
			.resequence()
			.aggregate()
			.get();
}

The above is a canonical example of splitter/aggregator pattern. The split() method splits the list into individual messages and sends them to the ExecutorChannel. The resequence() method reorders messages by sequence details from message headers. The aggregate() method just collects those messages to the result list.
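The interplay of the three steps can be modelled in plain Java. The sketch below is an illustration under assumptions, not the DSL itself: SplitAggregateSketch and its Item type are hypothetical, sequence numbers are assumed to start at 1, and the out-of-order arrival that an ExecutorChannel may cause is simulated by reversing the list.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

// Plain-Java model of split -> resequence -> aggregate.
class SplitAggregateSketch {

    // Minimal stand-in for a message that carries a sequence-number header.
    static final class Item {
        final int sequenceNumber;
        final String payload;

        Item(int sequenceNumber, String payload) {
            this.sequenceNumber = sequenceNumber;
            this.payload = payload;
        }
    }

    // split(): one item per list element, numbered in the original order.
    static List<Item> split(List<String> payloads) {
        List<Item> items = new ArrayList<>();
        for (int i = 0; i < payloads.size(); i++) {
            items.add(new Item(i + 1, payloads.get(i))); // assumption: numbering starts at 1
        }
        return items;
    }

    // resequence(): restores the original order using the sequence number.
    static List<Item> resequence(List<Item> items) {
        List<Item> ordered = new ArrayList<>(items);
        ordered.sort(Comparator.comparingInt(item -> item.sequenceNumber));
        return ordered;
    }

    // aggregate(): collects the payloads into the result list.
    static List<String> aggregate(List<Item> items) {
        List<String> payloads = new ArrayList<>();
        for (Item item : items) {
            payloads.add(item.payload);
        }
        return payloads;
    }

    public static void main(String[] args) {
        List<Item> items = split(List.of("a", "b", "c"));
        Collections.reverse(items); // simulate messages arriving out of order
        System.out.println(aggregate(resequence(items)));
    }
}
```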

However, you may change the default behavior by specifying a release strategy and correlation strategy, among other things. Consider the following:

.aggregate(a ->
						a.correlationStrategy(m -> m.getHeaders().get("myCorrelationKey"))
						 .releaseStrategy(g -> g.size() > 10)
						 .messageStore(messageStore()), null)

Similar Lambda configurations are provided for the resequence() EIP-method.
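To show what a correlation strategy and a release strategy do together, here is a minimal plain-Java model of an aggregator. AggregatorSketch and accept() are hypothetical names, the HashMap merely stands in for a message store, and details such as group timeouts or late-arriving messages are deliberately left out.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import java.util.function.Predicate;

// Plain-Java model of an aggregator; not the Spring Integration implementation.
class AggregatorSketch<M> {

    private final Function<M, Object> correlationStrategy;
    private final Predicate<List<M>> releaseStrategy;

    // Stands in for the message store: one group of messages per correlation key.
    private final Map<Object, List<M>> store = new HashMap<>();

    AggregatorSketch(Function<M, Object> correlationStrategy, Predicate<List<M>> releaseStrategy) {
        this.correlationStrategy = correlationStrategy;
        this.releaseStrategy = releaseStrategy;
    }

    // Adds the message to its group and returns the group once the release strategy accepts it.
    Optional<List<M>> accept(M message) {
        Object key = correlationStrategy.apply(message);
        List<M> group = store.computeIfAbsent(key, k -> new ArrayList<>());
        group.add(message);
        if (releaseStrategy.test(group)) {
            store.remove(key); // a released group is no longer kept in the store
            return Optional.of(group);
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        // Same release rule as in the snippet above: release once the group has more than 10 messages.
        AggregatorSketch<String> aggregator =
                new AggregatorSketch<String>(m -> "sameKey", g -> g.size() > 10);
        for (int i = 1; i <= 11; i++) {
            aggregator.accept("message-" + i)
                    .ifPresent(group -> System.out.println("released " + group.size() + " messages"));
        }
    }
}
```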

ServiceActivators (.handle())

The goal of the .handle() EIP-method is to invoke any MessageHandler implementation or any method on some POJO. Another option is to define the "activity" via a Lambda expression; for this purpose the generic GenericHandler<P> functional interface has been introduced. Its handle method requires two arguments: P payload and Map<String, Object> headers. With that, we can define a flow like this:

@Bean
public IntegrationFlow myFlow() {
	return IntegrationFlows.from("flow3Input")
		.<Integer>handle((p, h) -> p * 2)
		.get();
}

However, one main goal of Spring Integration is to achieve loose coupling via runtime type conversion from the message payload to the target arguments of the message handler. Since Java doesn't support generic type resolution for Lambda classes, we introduced a workaround: an additional payloadType argument for most EIP-methods, together with LambdaMessageProcessor, which delegates the hard conversion work to Spring's ConversionService using the provided type and the requested message to target method arguments. The IntegrationFlow might look like this:

@Bean
public IntegrationFlow integerFlow() {
    return IntegrationFlows.from("input")
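            // StandardCharsets (java.nio.charset) avoids the checked exception of new String(p, "UTF-8")
            .<byte[], String>transform(p -> new String(p, StandardCharsets.UTF_8))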
            .handle(Integer.class, (p, h) -> p * 2)
            .get();
}

Of course, we could instead register a custom BytesToIntegerConverter within the ConversionService and get rid of that additional .transform().
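The idea of converting the payload to the declared payloadType before the Lambda runs can be sketched in plain Java. Everything here is hypothetical and only models the principle: PayloadConversionSketch is not the ConversionService or LambdaMessageProcessor, the converter registry is a simple map, and the headers argument of the handler is omitted for brevity.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Plain-Java model of "convert the payload to the requested type, then call the Lambda".
class PayloadConversionSketch {

    // Converters keyed by "sourceType->targetType"; a stand-in for a conversion service.
    private final Map<String, Function<Object, Object>> converters = new HashMap<>();

    <S, T> void addConverter(Class<S> source, Class<T> target, Function<S, T> converter) {
        converters.put(key(source, target), o -> converter.apply(source.cast(o)));
    }

    // Invokes the handler with a payload of the requested type, converting first if necessary.
    <P, R> R handle(Object payload, Class<P> payloadType, Function<P, R> handler) {
        if (payloadType.isInstance(payload)) {
            return handler.apply(payloadType.cast(payload));
        }
        Function<Object, Object> converter = converters.get(key(payload.getClass(), payloadType));
        if (converter == null) {
            throw new IllegalArgumentException(
                    "No converter from " + payload.getClass().getName() + " to " + payloadType.getName());
        }
        return handler.apply(payloadType.cast(converter.apply(payload)));
    }

    private static String key(Class<?> source, Class<?> target) {
        return source.getName() + "->" + target.getName();
    }

    public static void main(String[] args) {
        PayloadConversionSketch sketch = new PayloadConversionSketch();
        // Plays the role of the custom bytes-to-Integer converter mentioned above.
        sketch.addConverter(byte[].class, Integer.class,
                bytes -> Integer.valueOf(new String(bytes, StandardCharsets.UTF_8)));
        Integer doubled = sketch.handle("21".getBytes(StandardCharsets.UTF_8), Integer.class, p -> p * 2);
        System.out.println(doubled);
    }
}
```

The explicit Class argument is what makes the conversion possible: the Lambda alone does not reveal at runtime which payload type it expects.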

Operator log()

To conveniently log the message journey in the Spring Integration manner (à la <logging-channel-adapter>), a log() operator is provided. Underneath, it is represented just by a WireTap ChannelInterceptor with a LoggingHandler as the subscriber. It is responsible for logging the message entering the next endpoint:

.filter(...)
.log(LoggingHandler.Level.ERROR, "test.category", m -> m.getHeaders().getId())
.route(...)

In this example, the id header is logged at ERROR level to the "test.category" category, only for messages that passed the filter and before routing.

MessageChannelSpec.wireTap()

A .wireTap() fluent API exists for MessageChannelSpec builders. A target configuration gains much more from Java DSL usage:

@Bean
public QueueChannelSpec myChannel() {
    return MessageChannels
            .queue()
            .wireTap("loggingFlow.input");
}

@Bean
public IntegrationFlow loggingFlow() {
    return f -> f.log();
}

Working With Message Flows

As we have seen, IntegrationFlowBuilder provides a top level API to produce Integration components wired to message flows. This is convenient if your integration may be accomplished with a single flow (which is often the case). Alternately IntegrationFlows can be joined via MessageChannels.

By default, the MessageFlow behaves as a Chain in Spring Integration parlance. That is, the endpoints are automatically wired implicitly via DirectChannels. The message flow is not actually constructed as a chain, affording much more flexibility. For example, you may send a message to any component within the flow, if you know its inputChannel name, i.e., explicitly define it. You may also reference externally defined channels within a flow to allow the use of channel adapters to enable remote transport protocols, file I/O, and the like, instead of direct channels. As such, the DSL does not support the Spring Integration chain element since it doesn't add much value.

Since the Spring Integration Java DSL produces the same bean definition model as any other configuration options and is based on the existing Spring Framework @Configuration infrastructure it can be used together with Integration XML definitions and wired with Spring Integration Messaging Annotations configuration.

Another alternative to define direct IntegrationFlows is based on a fact that IntegrationFlow can be declared as Lambda too:

@Bean
public IntegrationFlow lambdaFlow() {
	return f -> f.filter("World"::equals)
                     .transform("Hello "::concat)
                     .handle(System.out::println);
}

The result of this definition is the same bunch of Integration components wired with implicit direct channels. The only limitation is that this flow starts with a named direct channel: lambdaFlow.input. Also, a Lambda flow can't start from a MessageSource or MessageProducer.

FunctionExpression

The FunctionExpression (an implementation of the SpEL Expression) has been introduced to take advantage of Java and Lambdas for the method and its generics context. The Function<T, R> option is provided for the DSL components alongside the expression option when there is an implicit Strategy variant in Core Spring Integration. The usage may look like:

.enrich(e -> e.requestChannel("enrichChannel")
            .requestPayload(Message::getPayload)
            .propertyFunction("date", m -> new Date()))

The FunctionExpression also supports runtime type conversion as it is done in the standard SpelExpression.

Sub Flows support

Some of if...else and publish-subscribe components provide the support to specify their logic or mapping using Sub Flows. The simplest sample is .publishSubscribeChannel():

@Bean
public IntegrationFlow subscribersFlow() {
    return flow -> flow
            .publishSubscribeChannel(Executors.newCachedThreadPool(), s -> s
                    .subscribe(f -> f
                            .<Integer>handle((p, h) -> p / 2)
                            .channel(c -> c.queue("subscriber1Results")))
                    .subscribe(f -> f
                            .<Integer>handle((p, h) -> p * 2)
                            .channel(c -> c.queue("subscriber2Results"))))
            .<Integer>handle((p, h) -> p * 3)
            .channel(c -> c.queue("subscriber3Results"));
}

Of course, we can achieve the same result with separate IntegrationFlow @Bean definitions, but we hope you'll find the subflow style of logic composition useful.
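The fan-out in this example can be modelled in plain Java. PublishSubscribeSketch is a hypothetical class, delivery is synchronous here although the flow above uses an executor, and the sketch treats the remainder of the main flow as a third subscriber, which is what the queue name subscriber3Results suggests.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Plain-Java model of a publish-subscribe channel with subflows.
class PublishSubscribeSketch {

    // Each subscriber is a handler plus the name of the queue that collects its results.
    private final Map<String, Function<Integer, Integer>> subscribers = new LinkedHashMap<>();
    final Map<String, List<Integer>> results = new LinkedHashMap<>();

    PublishSubscribeSketch subscribe(String resultQueue, Function<Integer, Integer> handler) {
        subscribers.put(resultQueue, handler);
        results.put(resultQueue, new ArrayList<>());
        return this;
    }

    // Every subscriber receives the same payload and stores its own result.
    void publish(Integer payload) {
        subscribers.forEach((queue, handler) -> results.get(queue).add(handler.apply(payload)));
    }

    // Mirrors the handlers and queue names of subscribersFlow() above.
    static PublishSubscribeSketch subscribersFlow() {
        return new PublishSubscribeSketch()
                .subscribe("subscriber1Results", p -> p / 2)
                .subscribe("subscriber2Results", p -> p * 2)
                .subscribe("subscriber3Results", p -> p * 3); // assumption: the main flow is a subscriber too
    }

    public static void main(String[] args) {
        PublishSubscribeSketch flow = subscribersFlow();
        flow.publish(4);
        System.out.println(flow.results);
    }
}
```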

The .routeToRecipients() method provides a similar publish-subscribe subflow composition.

Another sample is .discardFlow() on the .filter() instead of .discardChannel().

The .route() deserves special attention. As a sample:

@Bean
public IntegrationFlow routeFlow() {
    return f -> f
            .<Integer, Boolean>route(p -> p % 2 == 0,
                    m -> m.channelMapping("true", "evenChannel")
                            .subFlowMapping("false", sf ->
                                    sf.<Integer>handle((p, h) -> p * 3)))
            .transform(Object::toString)
            .channel(c -> c.queue("oddChannel"));
}

The .channelMapping() continues to work as in regular Router mapping, but the .subFlowMapping() ties that subflow to the main flow. In other words, any router's subflow returns to the main flow after .route().
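The difference between the two mapping styles can be traced with a plain-Java model of the flow above. RouterSubFlowSketch is a hypothetical class, and the two queues merely stand in for the evenChannel and oddChannel of the example.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Plain-Java model of a router with one channel mapping and one subflow mapping.
class RouterSubFlowSketch {

    final Queue<Integer> evenChannel = new ArrayDeque<>();
    final Queue<String> oddChannel = new ArrayDeque<>();

    void send(Integer payload) {
        if (payload % 2 == 0) {
            // channelMapping("true", "evenChannel"): the message leaves the flow here.
            evenChannel.add(payload);
            return;
        }
        // subFlowMapping("false", ...): the subflow runs first ...
        Integer subFlowResult = payload * 3;
        // ... and its result returns to the main flow: transform(Object::toString), then the queue.
        oddChannel.add(subFlowResult.toString());
    }

    public static void main(String[] args) {
        RouterSubFlowSketch flow = new RouterSubFlowSketch();
        flow.send(4);
        flow.send(5);
        System.out.println("evenChannel: " + flow.evenChannel);
        System.out.println("oddChannel: " + flow.oddChannel);
    }
}
```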

Of course, subflows can be nested to any depth, but we don't recommend it: even in the router case, complex subflows within a flow quickly begin to look like a plate of spaghetti and become difficult for a human to parse.

Using Protocol Adapters

All of the examples so far illustrate how the DSL supports a messaging architecture using the Spring Integration programming model, but we haven't done any real integration yet. This requires access to remote resources via http, jms, amqp, tcp, jdbc, ftp, smtp, and the like, or access to the local file system. Spring Integration supports all of these and more. Ideally, the DSL should offer first class support for all of these but it is a daunting task to implement all of these and keep up as new adapters are added to Spring Integration. So the expectation is that the DSL will continually be catching up with Spring Integration.

Nevertheless, we provide a high-level API to define protocol-specific components seamlessly. This is achieved with Factory and Builder patterns and, of course, with Lambdas. The factory classes can be considered "Namespace Factories", because they play the same role as XML namespace for components from the concrete protocol-specific Spring Integration modules. Currently, the Spring Integration Java DSL supports the Amqp, Jms, Files, (S)Ftp, Http and Mail namespace factories:

@Bean
public IntegrationFlow amqpFlow() {
    return IntegrationFlows.from(Amqp.inboundGateway(this.rabbitConnectionFactory, queue()))
            .transform("hello "::concat)
            .transform(String.class, String::toUpperCase)
            .get();
}

@Bean
public IntegrationFlow amqpOutboundFlow() {
    return IntegrationFlows.from("amqpOutboundInput")
            .handle(Amqp.outboundAdapter(this.amqpTemplate).routingKeyExpression("headers.routingKey"))
            .get();
}

@Bean
public IntegrationFlow jmsInboundFlow() {
    return IntegrationFlows
            .from(Jms.inboundAdapter(this.jmsConnectionFactory)
                    .configureJmsTemplate(t ->
                            t.deliveryPersistent(true)
                                    .jmsMessageConverter(myMessageConverter()))
                    .destination("jmsInbound"))
            .transform(...)
            .channel(...)
            .get();
}

@Bean
public IntegrationFlow jmsOutboundGatewayFlow() {
    return IntegrationFlows.from("jmsOutboundGatewayChannel")
            .handle(Jms.outboundGateway(this.jmsConnectionFactory)
                        .replyContainer(c ->
                                    c.concurrentConsumers(3)
                                            .sessionTransacted(true))
                        .requestDestination("jmsPipelineTest"))
            .get();
}

@Autowired
private DefaultFtpSessionFactory ftpSessionFactory;

@Bean
public IntegrationFlow ftpInboundFlow() {
     return IntegrationFlows
               .from(Ftp.inboundAdapter(this.ftpSessionFactory)
                        .preserveTimestamp(true)
                        .remoteDirectory("ftpSource")
                        .regexFilter(".*\\.txt$")
                        .localFilenameExpression("#this.toUpperCase() + '.a'")
                        .localDirectory(new File("tmp")),
                      e -> e.id("ftpInboundAdapter"))
               .channel(MessageChannels.queue("ftpInboundResultChannel"))
               .get();
}

@Bean
public IntegrationFlow httpInboundGatewayFlow() {
    return IntegrationFlows.from(Http.inboundGateway("/hello/{country}")
                    .requestMapping(r -> r
                            .methods(HttpMethod.GET)
                            .params("msg"))
                    .headerExpression("country", "#pathVariables.country")
                    .payloadExpression("#requestParams.msg[0]"))
            .handle((payload, headers) ->
                    "de".equals(headers.get("country")) 
                        ? "Hallo " + payload 
                        : "Hello " + payload)
            .get();
}

@Bean
public IntegrationFlow sendMailFlow() {
    return IntegrationFlows.from("sendMailChannel")
            .handle(Mail.outboundAdapter("localhost")
                            .port(smtpPort)
                            .credentials("user", "pw")
                            .protocol("smtp")
                            .javaMailProperties(p -> p.put("mail.debug", "true")),
                    e -> e.id("sendMailEndpoint"))
            .get();
}

@Bean
public IntegrationFlow pop3MailFlow() {
    return IntegrationFlows
            .from(Mail.pop3InboundAdapter("localhost", pop3Port, "user", "pw")
                            .javaMailProperties(p -> p.put("mail.debug", "true")),
                    e -> e.autoStartup(true)
                            .poller(Pollers.fixedDelay(60000)))
            .channel(MessageChannels.queue("pop3Channel"))
            .get();
}

@Bean
public IntegrationFlow imapMailFlow() {
    return IntegrationFlows
            .from(Mail.imapInboundAdapter("imap://user:pw@localhost:" + imapPort + "/INBOX")
                            .searchTermStrategy((f,l) -> new FromTerm(fromAddress()))
                            .javaMailProperties(p -> p.put("mail.debug", "true")),
                    e -> e.autoStartup(true)
                            .poller(Pollers.fixedDelay(60000)))
            .channel(MessageChannels.queue("imapChannel"))
            .get();
}

@Bean
public IntegrationFlow imapIdleFlow() {
    return IntegrationFlows
            .from(Mail.imapIdleAdapter("imap://user:pw@localhost:" + imapIdlePort + "/INBOX")
                    .searchTermStrategy((f, l) -> fromAndNotSeenTerm())
                    .javaMailProperties(p -> p
                            .put("mail.debug", "true")
                            .put("mail.imap.connectionpoolsize", "5"))
                    .shouldReconnectAutomatically(true))
            .enrichHeaders(s -> s.headerExpressions(h -> h
                            .put(MailHeaders.SUBJECT, "payload.subject")
                            .put(MailHeaders.FROM, "payload.from[0].toString()")))
            .channel(MessageChannels.queue("imapIdleChannel"))
            .get();
}

private SearchTerm fromAndNotSeenTerm() {
    try {
        FromTerm fromTerm = new FromTerm(new InternetAddress("bar@baz"));
        return new AndTerm(fromTerm, new FlagTerm(new Flags(Flags.Flag.SEEN), false));
    }
    catch (AddressException e) {
        throw new RuntimeException(e);
    }
}

We show here the usage of namespace factories as inline adapter declarations; however, they can also be used from @Bean definitions to make the IntegrationFlow method chain more readable.

We are soliciting community feedback on these namespace factories before we spend effort on others; we'd also appreciate some prioritization for which adapters/gateways we should support next.

Be sure to have concrete spring-integration-[PROTOCOL].jar and its required dependencies on the classpath, because the spring-integration-java-dsl declares them as optional to avoid unnecessary end-application overhead.

All other protocol-specific adapters should be configured as generic beans and wired to the IntegrationFlow:

@Bean
public HttpRequestHandlingMessagingGateway httpGate() {
	HttpRequestHandlingMessagingGateway gateway = new HttpRequestHandlingMessagingGateway(true);
	RequestMapping requestMapping = new RequestMapping();
	requestMapping.setMethods(HttpMethod.POST);
	requestMapping.setPathPatterns("/foo");
	gateway.setRequestMapping(requestMapping);
	gateway.setRequestChannel(requestChannel());
	return gateway;
}

@Bean
public DirectChannel requestChannel() {
	return MessageChannels.direct().get();
}

@Bean
public IntegrationFlow flow() {
    return IntegrationFlows.from(requestChannel())
    		.handle(...)
    		.transform(...)
    		.get();
}

Maven

Repository

<repository>
    <id>repository.springframework.maven.release</id>
    <name>Spring Framework Maven Release Repository</name>
    <url>http://repo.spring.io/release</url>
</repository>

Artifact

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-java-dsl</artifactId>
    <version>1.0.2.RELEASE</version>
</dependency>

Support

Check out the spring-integration tag on Stack Overflow. Commercial support is available, too.

Related GitHub projects

For more information, please also don't forget to visit the Spring Integration website.
