Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

import org.springframework.beans.factory.BeanNameAware;
import org.springframework.integration.IntegrationPattern;
Expand All @@ -35,13 +38,16 @@

/**
* A channel implementation that essentially behaves like "/dev/null".
* All receive() calls will return <em>null</em>, and all send() calls
* will return <em>true</em> although no action is performed.
* All {@link #receive()} calls will return {@code null},
* and all {@link #send} calls will return {@code true} although no action is performed.
* Unless the payload of a sent message is a {@link Publisher} implementation, in
* which case the {@link Publisher#subscribe(Subscriber)} is called to initiate
* the reactive stream, although the data is discarded by this channel.
* Note however that the invocations are logged at debug-level.
*
* @author Mark Fisher
* @author Gary Russell
* @author Artyem Bilan
* @author Artem Bilan
*/
@IntegrationManagedResource
public class NullChannel implements PollableChannel,
Expand Down Expand Up @@ -119,6 +125,31 @@ public boolean send(Message<?> message) {
if (this.loggingEnabled && this.logger.isDebugEnabled()) {
this.logger.debug("message sent to null channel: " + message);
}

Object payload = message.getPayload();
if (payload instanceof Publisher<?>) {
((Publisher<?>) payload).subscribe(
new Subscriber<Object>() {

@Override public void onSubscribe(Subscription subscription) {
subscription.request(Long.MAX_VALUE);
}

@Override public void onNext(Object o) {

}

@Override public void onError(Throwable t) {

}

@Override public void onComplete() {

}

});
}

if (this.metricsCaptor != null) {
sendTimer().record(0, TimeUnit.MILLISECONDS);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import static org.assertj.core.api.Assertions.assertThat;

import java.time.Duration;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.junit.jupiter.api.Test;
Expand All @@ -28,6 +30,7 @@

import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.test.StepVerifier;
import reactor.util.concurrent.Queues;
Expand Down Expand Up @@ -104,4 +107,13 @@ void testOverproducingWithSubscribableChannel() {
.isLessThanOrEqualTo(Queues.SMALL_BUFFER_SIZE);
}

@Test
void testPublisherPayloadWithNullChannel() throws InterruptedException {
NullChannel nullChannel = new NullChannel();
CountDownLatch publisherSubscribed = new CountDownLatch(1);
Mono<Object> mono = Mono.empty().doOnSubscribe((s) -> publisherSubscribed.countDown());
nullChannel.send(new GenericMessage<>(mono));
assertThat(publisherSubscribed.await(10, TimeUnit.SECONDS)).isTrue();
}

}
6 changes: 4 additions & 2 deletions src/reference/asciidoc/channel.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -1142,9 +1142,11 @@ For example, you can use this technique to configure a test case to verify messa
[[channel-special-channels]]
==== Special Channels

If namespace support is enabled, two special channels are defined within the application context by default: `errorChannel` and `nullChannel`.
The 'nullChannel' acts like `/dev/null`, logging any message sent to it at the `DEBUG` level and returning immediately.
Two special channels are defined within the application context by default: `errorChannel` and `nullChannel`.
The 'nullChannel' (an instance of `NullChannel`) acts like `/dev/null`, logging any message sent to it at the `DEBUG` level and returning immediately.
The special treatment is applied for an `org.reactivestreams.Publisher` payload of a sent message: it is subscribed to in this channel immediately, to initiate reactive stream processing, although the data is discarded.
Any time you face channel resolution errors for a reply that you do not care about, you can set the affected component's `output-channel` attribute to 'nullChannel' (the name, 'nullChannel', is reserved within the application context).

The 'errorChannel' is used internally for sending error messages and may be overridden with a custom configuration.
This is discussed in greater detail in <<./error-handling.adoc#error-handling,Error Handling>>.

Expand Down