Skip to content

Commit

Permalink
Add JavaDSL for ReactiveMongoDbMessageSource
Browse files Browse the repository at this point in the history
* Implement `ReactiveMongoDbMessageSourceSpec` and factories for it
* Rework `ReactiveMongoDbMessageSourceTests` to use new Java DSL for `ReactiveMongoDbMessageSource`
* Document changes
  • Loading branch information
artembilan authored and garyrussell committed Jan 22, 2020
1 parent c7c7422 commit 0606a6f
Show file tree
Hide file tree
Showing 6 changed files with 221 additions and 27 deletions.
Expand Up @@ -21,11 +21,15 @@
import org.springframework.data.mongodb.core.MongoOperations;
import org.springframework.data.mongodb.core.ReactiveMongoOperations;
import org.springframework.data.mongodb.core.convert.MongoConverter;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.expression.common.LiteralExpression;
import org.springframework.integration.expression.ValueExpression;

/**
* Factory class for building MongoDb components
*
* @author Xavier Padro
* @author Artem Bilan
*
* @since 5.0
*/
Expand Down Expand Up @@ -74,10 +78,69 @@ public static ReactiveMongoDbMessageHandlerSpec reactiveOutboundChannelAdapter(
* @return the {@link ReactiveMongoDbMessageHandlerSpec} instance
* @since 5.3
*/
public static ReactiveMongoDbMessageHandlerSpec reactiveOutboundChannelAdapter(ReactiveMongoOperations mongoTemplate) {
public static ReactiveMongoDbMessageHandlerSpec reactiveOutboundChannelAdapter(
ReactiveMongoOperations mongoTemplate) {

return new ReactiveMongoDbMessageHandlerSpec(mongoTemplate);
}

/**
* Create a {@link ReactiveMongoDbMessageSourceSpec} builder instance
* based on the provided {@link ReactiveMongoDatabaseFactory}.
* @param mongoDbFactory the {@link ReactiveMongoDatabaseFactory} to use.
* @param query the MongoDb query
* @return the {@link ReactiveMongoDbMessageSourceSpec} instance
* @since 5.3
*/
public static ReactiveMongoDbMessageSourceSpec reactiveInboundChannelAdapter(
ReactiveMongoDatabaseFactory mongoDbFactory, String query) {

return new ReactiveMongoDbMessageSourceSpec(mongoDbFactory, new LiteralExpression(query));
}

/**
* Create a {@link ReactiveMongoDbMessageSourceSpec} builder instance
* based on the provided {@link ReactiveMongoDatabaseFactory}.
* @param mongoDbFactory the {@link ReactiveMongoDatabaseFactory} to use.
* @param query the MongoDb query DSL object
* @return the {@link ReactiveMongoDbMessageSourceSpec} instance
* @since 5.3
*/
public static ReactiveMongoDbMessageSourceSpec reactiveInboundChannelAdapter(
ReactiveMongoDatabaseFactory mongoDbFactory, Query query) {

return new ReactiveMongoDbMessageSourceSpec(mongoDbFactory, new ValueExpression<>(query));
}

/**
* Create a {@link ReactiveMongoDbMessageSourceSpec} builder instance
* based on the provided {@link ReactiveMongoOperations}.
* @param mongoTemplate the {@link ReactiveMongoOperations} to use.
* @param query the MongoDb query
* @return the {@link ReactiveMongoDbMessageSourceSpec} instance
* @since 5.3
*/
public static ReactiveMongoDbMessageSourceSpec reactiveInboundChannelAdapter(ReactiveMongoOperations mongoTemplate,
String query) {

return new ReactiveMongoDbMessageSourceSpec(mongoTemplate, new LiteralExpression(query));
}

/**
* Create a {@link ReactiveMongoDbMessageSourceSpec} builder instance
* based on the provided {@link ReactiveMongoOperations}.
* @param mongoTemplate the {@link ReactiveMongoOperations} to use.
* @param query the MongoDb query DSL object
* @return the {@link ReactiveMongoDbMessageSourceSpec} instance
* @since 5.3
*/
public static ReactiveMongoDbMessageSourceSpec reactiveInboundChannelAdapter(ReactiveMongoOperations mongoTemplate,
Query query) {

return new ReactiveMongoDbMessageSourceSpec(mongoTemplate, new ValueExpression<>(query));
}


private MongoDb() {
}

Expand Down
@@ -0,0 +1,126 @@
/*
* Copyright 2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.integration.mongodb.dsl;

import java.util.function.Supplier;

import org.springframework.data.mongodb.ReactiveMongoDatabaseFactory;
import org.springframework.data.mongodb.core.ReactiveMongoOperations;
import org.springframework.data.mongodb.core.convert.MongoConverter;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.expression.Expression;
import org.springframework.expression.common.LiteralExpression;
import org.springframework.integration.dsl.MessageSourceSpec;
import org.springframework.integration.expression.SupplierExpression;
import org.springframework.integration.mongodb.inbound.ReactiveMongoDbMessageSource;

/**
* A {@link MessageSourceSpec} implementation for a {@link ReactiveMongoDbMessageSource}.
*
* @author Artem Bilan
*
* @since 5.3
*/
public class ReactiveMongoDbMessageSourceSpec
extends MessageSourceSpec<ReactiveMongoDbMessageSourceSpec, ReactiveMongoDbMessageSource> {

ReactiveMongoDbMessageSourceSpec(ReactiveMongoDatabaseFactory reactiveMongoDatabaseFactory,
Expression queryExpression) {
this.target = new ReactiveMongoDbMessageSource(reactiveMongoDatabaseFactory, queryExpression);
}

ReactiveMongoDbMessageSourceSpec(ReactiveMongoOperations reactiveMongoTemplate, Expression queryExpression) {
this.target = new ReactiveMongoDbMessageSource(reactiveMongoTemplate, queryExpression);
}

/**
* Allow you to set the type of the entityClass that will be passed to the
* {@link ReactiveMongoOperations#find(Query, Class)} or {@link ReactiveMongoOperations#findOne(Query, Class)}
* method.
* Default is {@link com.mongodb.DBObject}.
* @param entityClass The entity class.
* @return the spec
* @see ReactiveMongoDbMessageSource#setEntityClass(Class)
*/
public ReactiveMongoDbMessageSourceSpec entityClass(Class<?> entityClass) {
this.target.setEntityClass(entityClass);
return this;
}

/**
* Allow you to manage which find* method to invoke on {@link ReactiveMongoOperations}.
* @param expectSingleResult true if a single result is expected.
* @return the spec
* @see ReactiveMongoDbMessageSource#setExpectSingleResult(boolean)
*/
public ReactiveMongoDbMessageSourceSpec expectSingleResult(boolean expectSingleResult) {
this.target.setExpectSingleResult(expectSingleResult);
return this;
}

/**
* Configure a collection name to query against.
* @param collectionName the name of the MongoDb collection
* @return the spec
*/
public ReactiveMongoDbMessageSourceSpec collectionName(String collectionName) {
return collectionNameExpression(new LiteralExpression(collectionName));
}

/**
* Configure a SpEL expression to evaluation a collection name on each {@code receive()} call.
* @param collectionNameExpression the SpEL expression for name of the MongoDb collection
* @return the spec
*/
public ReactiveMongoDbMessageSourceSpec collectionNameExpression(String collectionNameExpression) {
return collectionNameExpression(PARSER.parseExpression(collectionNameExpression));
}

/**
* Configure a {@link Supplier} to obtain a collection name on each {@code receive()} call.
* @param collectionNameSupplier the {@link Supplier} for name of the MongoDb collection
* @return the spec
*/
public ReactiveMongoDbMessageSourceSpec collectionNameSupplier(Supplier<String> collectionNameSupplier) {
return collectionNameExpression(new SupplierExpression<>(collectionNameSupplier));
}

/**
* Configure a SpEL expression to evaluation a collection name on each {@code receive()} call.
* @param collectionNameExpression the SpEL expression for name of the MongoDb collection
* @return the spec
* @see ReactiveMongoDbMessageSource#setCollectionNameExpression(Expression)
*/
public ReactiveMongoDbMessageSourceSpec collectionNameExpression(Expression collectionNameExpression) {
this.target.setCollectionNameExpression(collectionNameExpression);
return this;
}

/**
* Configure a custom {@link MongoConverter} used to assist in deserialization
* data read from MongoDb. Only allowed if this instance was constructed with a
* {@link ReactiveMongoDatabaseFactory}.
* @param mongoConverter The mongo converter.
* @return the spec
* @see ReactiveMongoDbMessageSource#setMongoConverter(MongoConverter)
*/
public ReactiveMongoDbMessageSourceSpec mongoConverter(MongoConverter mongoConverter) {
this.target.setMongoConverter(mongoConverter);
return this;
}

}
Expand Up @@ -32,7 +32,6 @@
import org.bson.conversions.Bson;
import org.junit.Test;
import org.mockito.Mockito;
import org.reactivestreams.Publisher;

import org.springframework.beans.factory.BeanFactory;
import org.springframework.context.ConfigurableApplicationContext;
Expand All @@ -47,10 +46,10 @@
import org.springframework.expression.common.LiteralExpression;
import org.springframework.integration.channel.FluxMessageChannel;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.core.MessageSource;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.Pollers;
import org.springframework.integration.mongodb.dsl.MongoDb;
import org.springframework.integration.mongodb.rules.MongoDbAvailable;
import org.springframework.integration.mongodb.rules.MongoDbAvailableTests;

Expand Down Expand Up @@ -245,30 +244,14 @@ private static <T> T waitFor(Mono<T> mono) {
static class TestContext {

@Bean
FluxMessageChannel output() {
return new FluxMessageChannel();
}

@Bean
public MessageSource<Publisher<?>> mongodbMessageSource(ReactiveMongoDatabaseFactory mongoDatabaseFactory) {
Expression queryExpression = new LiteralExpression("{'name' : 'Oleg'}");
ReactiveMongoDbMessageSource reactiveMongoDbMessageSource =
new ReactiveMongoDbMessageSource(mongoDatabaseFactory, queryExpression);
reactiveMongoDbMessageSource.setEntityClass(Person.class);
return reactiveMongoDbMessageSource;
}

@Bean
ReactiveMongoDatabaseFactory mongoDatabaseFactory() {
return MongoDbAvailableTests.REACTIVE_MONGO_DATABASE_FACTORY;
}

@Bean
public IntegrationFlow pollingFlow(MessageSource<Publisher<?>> mongodbMessageSource) {
public IntegrationFlow pollingFlow() {
return IntegrationFlows
.from(mongodbMessageSource, c -> c.poller(Pollers.fixedDelay(100).maxMessagesPerPoll(1)))
.from(MongoDb.reactiveInboundChannelAdapter(
MongoDbAvailableTests.REACTIVE_MONGO_DATABASE_FACTORY, "{'name' : 'Oleg'}")
.entityClass(Person.class),
c -> c.poller(Pollers.fixedDelay(100).maxMessagesPerPoll(1)))
.split()
.channel(output())
.channel(c -> c.flux("output"))
.get();
}

Expand Down
23 changes: 23 additions & 0 deletions src/reference/asciidoc/mongodb.adoc
Expand Up @@ -544,3 +544,26 @@ public IntegrationFlow reactiveMongoDbFlow(ReactiveMongoDatabaseFactory mongoDbF

In this sample we are going to connect to the MongoDb via provided `ReactiveMongoDatabaseFactory` and store a data from request message into a default collection with the `data` name.
The real operation is going to be performed on-demand from the reactive stream composition in the internally created `ReactiveStreamsConsumer`.

The `ReactiveMongoDbMessageSource` is an `AbstractMessageSource` implementation based on the provided `ReactiveMongoDatabaseFactory` or `ReactiveMongoOperations` and MongoDb query (or expression), calls `find()` or `findOne()` operation according an `expectSingleResult` option with an expected `entityClass` type to convert a query result.
A query execution and result evaluation is performed on demand when `Publisher` (`Flux` or `Mono` according `expectSingleResult` option) in the payload of produced message is subscribed.
The framework can subscribe to such a payload automatically (essentially `flatMap`) when splitter and `FluxMessageChannel` are used downstream.
Otherwise it is target application responsibility to subscribe into a polled publishers in downstream endpoints.

With Java DSL such a channel adapter could be configured like:

====
[source, java]
----
@Bean
public IntegrationFlow reactiveMongoDbFlow(ReactiveMongoDatabaseFactory mongoDbFactory) {
return IntegrationFlows
.from(MongoDb.reactiveInboundChannelAdapter(mongoDbFactory, "{'name' : 'Name'}")
.entityClass(Person.class),
c -> c.poller(Pollers.fixedDelay(1000)))
.split()
.channel(c -> c.flux("output"))
.get();
}
----
====
2 changes: 1 addition & 1 deletion src/reference/asciidoc/reactive-streams.adoc
Expand Up @@ -134,5 +134,5 @@ A returned reactive type can be subscribed immediately if we are in one-way, fir

Currently Spring Integration provides channel adapter (or gateway) implementations for <<./webflux.adoc#webflux,WebFlux>>, <<./rsocket.adoc#rsocket,RSocket>> and <<./mongodb.adoc#mongodb,MongoDb>>.
Also an https://github.com/spring-projects/spring-integration-extensions/tree/master/spring-integration-cassandra[Apache Cassandra Extension] provides a `MessageHandler` implementation for the Cassandra reactive driver.
More reactive channel adapters are coming, for example for https://r2dbc.io/[R2DBC], https://mongodb.github.io/mongo-java-driver-reactivestreams/[MongoDB], for Apache Kafka in https://github.com/spring-projects/spring-integration-kafka[Spring Integration Kafka] based on the `ReactiveKafkaProducerTemplate` and `ReactiveKafkaConsumerTemplate` from https://spring.io/projects/spring-kafka[Spring for Apache Kafka] etc.
More reactive channel adapters are coming, for example for https://r2dbc.io/[R2DBC], for Apache Kafka in https://github.com/spring-projects/spring-integration-kafka[Spring Integration Kafka] based on the `ReactiveKafkaProducerTemplate` and `ReactiveKafkaConsumerTemplate` from https://spring.io/projects/spring-kafka[Spring for Apache Kafka] etc.
For many other non-reactive channel adapters thread pools are recommended to avoid blocking during reactive stream processing.
1 change: 0 additions & 1 deletion src/reference/asciidoc/whats-new.adoc
Expand Up @@ -39,6 +39,5 @@ See <<./mongodb.adoc#mongodb-reactive-channel-adapters,MongoDB Reactive Channel
The gateway proxy now doesn't proxy `default` methods by default.
See <<./gateway.adoc/gateway-calling-default-methods,Invoking `default` Methods>> for more information.


Internal components (such as `_org.springframework.integration.errorLogger`) now have a shortened name when they are represented in the integration graph.
See <<./graph.adoc#integration-graph,Integration Graph>> for more information.

0 comments on commit 0606a6f

Please sign in to comment.