-
Notifications
You must be signed in to change notification settings - Fork 1.1k
Add ReactiveMessageSourceProducer
#3254
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add ReactiveMessageSourceProducer
#3254
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just a few doc polishes.
|
||
Starting with version 5.3, a `ReactiveMessageSourceProducer` is provided. | ||
It is a combination of a provided `MessageSource` and event-driven production into the configured `outputChannel`. | ||
Internally it wraps a `MessageSource` into the repeatedly resubscribed `Mono` producing a `Flux<Message<?>>` to be subscribed in the `subscribeToPublisher(Publisher<? extends Message<?>>)` mention above. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
mentioned
Internally it wraps a `MessageSource` into the repeatedly resubscribed `Mono` producing a `Flux<Message<?>>` to be subscribed in the `subscribeToPublisher(Publisher<? extends Message<?>>)` mention above. | ||
The subscription for this `Mono` is done using `Schedulers.boundedElastic()` to avoid possible blocking in the target `MessageSource`. | ||
When the message source returns `null` (no data to pull), the `Mono` is turned into a `repeatWhenEmpty()` state with a `delay` for a subsequent re-subscription based on a `IntegrationReactiveUtils.DELAY_WHEN_EMPTY_KEY` `Duration` entry from the subscriber context. | ||
By default it is a 1 second. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it is 1 second.
The subscription for this `Mono` is done using `Schedulers.boundedElastic()` to avoid possible blocking in the target `MessageSource`. | ||
When the message source returns `null` (no data to pull), the `Mono` is turned into a `repeatWhenEmpty()` state with a `delay` for a subsequent re-subscription based on a `IntegrationReactiveUtils.DELAY_WHEN_EMPTY_KEY` `Duration` entry from the subscriber context. | ||
By default it is a 1 second. | ||
If the `MessageSource` produces messages with a `IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK` information in headers, it is acknowledged (if necessary) in the `doOnSuccess()` of the original `Mono` and rejected in the `doOnError()` if downstream flow throws a `MessagingException` with failed message to reject. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
in the headers
if the downstream flow throws
with the failed messagee
When the message source returns `null` (no data to pull), the `Mono` is turned into a `repeatWhenEmpty()` state with a `delay` for a subsequent re-subscription based on a `IntegrationReactiveUtils.DELAY_WHEN_EMPTY_KEY` `Duration` entry from the subscriber context. | ||
By default it is a 1 second. | ||
If the `MessageSource` produces messages with a `IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK` information in headers, it is acknowledged (if necessary) in the `doOnSuccess()` of the original `Mono` and rejected in the `doOnError()` if downstream flow throws a `MessagingException` with failed message to reject. | ||
This `ReactiveMessageSourceProducer` could be used for any use-case when a polling channel adapter features should be turned into a reactive, on demand solution for any existing `MessageSource<?>` implementation. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
a polling channel adapter's features
[[x5.3-reactive-message-source-producer]] | ||
==== `ReactiveMessageSourceProducer` | ||
|
||
The `ReactiveMessageSourceProducer` is a reactive implementation of the `MessageProducerSupport` to wrap a provided `MessageSource` into the `Flux` for on demand `receive()` calls. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
into a Flux
7ff705c
to
559474f
Compare
See travis (checkstyle). |
|
The `ReactiveMessageSourceProducer` wraps a provided `MessageSource` into a `Flux` for subscription in the `subscribeToPublisher(Publisher<? extends Message<?>>)` to make a source polling feature fully based on a reactive, on demand solution * Introduce a `IntegrationReactiveUtils` replacing existing `MessageChannelReactiveUtils` with more functionality * Replace a deprecated `MessageChannelReactiveUtils` with a new `IntegrationReactiveUtils` * Test and document the feature
c4644a6
to
9b1fabf
Compare
Pushed the fix as well. |
* Add `ReactiveMessageSourceProducer` The `ReactiveMessageSourceProducer` wraps a provided `MessageSource` into a `Flux` for subscription in the `subscribeToPublisher(Publisher<? extends Message<?>>)` to make a source polling feature fully based on a reactive, on demand solution * Introduce a `IntegrationReactiveUtils` replacing existing `MessageChannelReactiveUtils` with more functionality * Replace a deprecated `MessageChannelReactiveUtils` with a new `IntegrationReactiveUtils` * Test and document the feature * * Fix Docs typos * * Remove unused imports from `MessageChannelReactiveUtils` * * Fix JavaDoc copy/paste artifact
The
ReactiveMessageSourceProducer
wraps a providedMessageSource
into a
Flux
for subscription in thesubscribeToPublisher(Publisher<? extends Message<?>>)
to make a source polling feature fully based on a reactive, on demand solution
IntegrationReactiveUtils
replacing existingMessageChannelReactiveUtils
with more functionality
MessageChannelReactiveUtils
with a newIntegrationReactiveUtils