diff --git a/spring-test/src/main/java/org/springframework/test/web/reactive/server/WiretapConnector.java b/spring-test/src/main/java/org/springframework/test/web/reactive/server/WiretapConnector.java index 7fa8208671ab..3dc5de7b46a9 100644 --- a/spring-test/src/main/java/org/springframework/test/web/reactive/server/WiretapConnector.java +++ b/spring-test/src/main/java/org/springframework/test/web/reactive/server/WiretapConnector.java @@ -50,9 +50,6 @@ */ class WiretapConnector implements ClientHttpConnector { - private static final DataBufferFactory bufferFactory = new DefaultDataBufferFactory(); - - private final ClientHttpConnector delegate; private final Map exchanges = new ConcurrentHashMap<>(); @@ -117,119 +114,157 @@ public Info(WiretapClientHttpRequest request, WiretapClientHttpResponse response public ExchangeResult createExchangeResult(@Nullable String uriTemplate) { return new ExchangeResult(this.request, this.response, - this.request.getContent(), this.response.getContent(), uriTemplate); + this.request.getRecorder().getContent(), this.response.getRecorder().getContent(), uriTemplate); } } /** - * ClientHttpRequestDecorator that intercepts and saves the request body. + * Tap into a Publisher of data buffers to save the content. */ - private static class WiretapClientHttpRequest extends ClientHttpRequestDecorator { + final static class WiretapRecorder { - private final DataBuffer buffer; + private static final DataBufferFactory bufferFactory = new DefaultDataBufferFactory(); - private final MonoProcessor body = MonoProcessor.create(); + public static final byte[] EMPTY_CONTENT = new byte[0]; - public WiretapClientHttpRequest(ClientHttpRequest delegate) { - super(delegate); - this.buffer = bufferFactory.allocateBuffer(); - } + @Nullable + private final Publisher publisher; + @Nullable + private final Publisher> publisherNested; - /** - * Return a "promise" with the request body content written to the server. - */ - public MonoProcessor getContent() { - return this.body; - } + private final DataBuffer buffer; + private final MonoProcessor content; - @Override - public Mono writeWith(Publisher publisher) { - return super.writeWith( + + private WiretapRecorder(@Nullable Publisher publisher, + @Nullable Publisher> publisherNested) { + + if (publisher != null && publisherNested != null) { + throw new IllegalArgumentException("At most one publisher expected"); + } + + this.publisher = publisher != null ? Flux.from(publisher) .doOnNext(this::handleOnNext) - .doOnError(this::handleError) + .doOnError(this::handleOnError) .doOnCancel(this::handleOnComplete) - .doOnComplete(this::handleOnComplete)); - } + .doOnComplete(this::handleOnComplete) : null; - @Override - public Mono writeAndFlushWith(Publisher> publisher) { - return super.writeAndFlushWith( - Flux.from(publisher) - .map(p -> Flux.from(p).doOnNext(this::handleOnNext).doOnError(this::handleError)) - .doOnError(this::handleError) + this.publisherNested = publisherNested != null ? + Flux.from(publisherNested) + .map(p -> Flux.from(p).doOnNext(this::handleOnNext).doOnError(this::handleOnError)) + .doOnError(this::handleOnError) .doOnCancel(this::handleOnComplete) - .doOnComplete(this::handleOnComplete)); + .doOnComplete(this::handleOnComplete) : null; + + this.buffer = bufferFactory.allocateBuffer(); + this.content = MonoProcessor.create(); + + if (this.publisher == null && this.publisherNested == null) { + this.content.onNext(EMPTY_CONTENT); + } } - @Override - public Mono setComplete() { - handleOnComplete(); - return super.setComplete(); + + public Publisher getPublisherToUse() { + Assert.notNull(this.publisher, "Publisher not in use."); + return this.publisher; } - private void handleOnNext(DataBuffer buffer) { - this.buffer.write(buffer); + public Publisher> getNestedPublisherToUse() { + Assert.notNull(this.publisherNested, "Nested publisher not in use."); + return this.publisherNested; } - private void handleError(Throwable ex) { - if (!this.body.isTerminated()) { - this.body.onError(ex); + public MonoProcessor getContent() { + return this.content; + } + + + private void handleOnNext(DataBuffer nextBuffer) { + this.buffer.write(nextBuffer); + } + + private void handleOnError(Throwable ex) { + if (!this.content.isTerminated()) { + this.content.onError(ex); } } private void handleOnComplete() { - if (!this.body.isTerminated()) { + if (!this.content.isTerminated()) { byte[] bytes = new byte[this.buffer.readableByteCount()]; this.buffer.read(bytes); - this.body.onNext(bytes); + this.content.onNext(bytes); } } } + /** + * ClientHttpRequestDecorator that intercepts and saves the request body. + */ + private static class WiretapClientHttpRequest extends ClientHttpRequestDecorator { + + @Nullable + private WiretapRecorder recorder; + + + public WiretapClientHttpRequest(ClientHttpRequest delegate) { + super(delegate); + } + + public WiretapRecorder getRecorder() { + Assert.notNull(this.recorder, "No WiretapRecorder: was the client request written?"); + return this.recorder; + } + + @Override + public Mono writeWith(Publisher publisher) { + this.recorder = new WiretapRecorder(publisher, null); + return super.writeWith(this.recorder.getPublisherToUse()); + } + + @Override + public Mono writeAndFlushWith(Publisher> publisher) { + this.recorder = new WiretapRecorder(null, publisher); + return super.writeAndFlushWith(this.recorder.getNestedPublisherToUse()); + } + + @Override + public Mono setComplete() { + this.recorder = new WiretapRecorder(null, null); + return super.setComplete(); + } + } + + /** * ClientHttpResponseDecorator that intercepts and saves the response body. */ private static class WiretapClientHttpResponse extends ClientHttpResponseDecorator { - private final DataBuffer buffer; - - private final MonoProcessor body = MonoProcessor.create(); + private final WiretapRecorder recorder; public WiretapClientHttpResponse(ClientHttpResponse delegate) { super(delegate); - this.buffer = bufferFactory.allocateBuffer(); + this.recorder = new WiretapRecorder(super.getBody(), null); } - /** - * Return a "promise" with the response body content read from the server. - */ - public MonoProcessor getContent() { - return this.body; + public WiretapRecorder getRecorder() { + return this.recorder; } @Override + @SuppressWarnings("ConstantConditions") public Flux getBody() { - return super.getBody() - .doOnNext(this.buffer::write) - .doOnError(this.body::onError) - .doOnCancel(this::handleOnComplete) - .doOnComplete(this::handleOnComplete); - } - - private void handleOnComplete() { - if (!this.body.isTerminated()) { - byte[] bytes = new byte[this.buffer.readableByteCount()]; - this.buffer.read(bytes); - this.body.onNext(bytes); - } + return Flux.from(this.recorder.getPublisherToUse()); } }