Skip to content

Commit

Permalink
WritetapConnector internal refactoring
Browse files Browse the repository at this point in the history
Extract a common delegate class to share between the request and the
to wiretap a Publisher and record and buffer its data.

Preparation for SPR-17363.
  • Loading branch information
rstoyanchev committed Oct 11, 2018
1 parent 050f44d commit c567e65
Showing 1 changed file with 99 additions and 64 deletions.
Expand Up @@ -50,9 +50,6 @@
*/
class WiretapConnector implements ClientHttpConnector {

private static final DataBufferFactory bufferFactory = new DefaultDataBufferFactory();


private final ClientHttpConnector delegate;

private final Map<String, Info> exchanges = new ConcurrentHashMap<>();
Expand Down Expand Up @@ -117,119 +114,157 @@ public Info(WiretapClientHttpRequest request, WiretapClientHttpResponse response

public ExchangeResult createExchangeResult(@Nullable String uriTemplate) {
return new ExchangeResult(this.request, this.response,
this.request.getContent(), this.response.getContent(), uriTemplate);
this.request.getRecorder().getContent(), this.response.getRecorder().getContent(), uriTemplate);
}
}


/**
* ClientHttpRequestDecorator that intercepts and saves the request body.
* Tap into a Publisher of data buffers to save the content.
*/
private static class WiretapClientHttpRequest extends ClientHttpRequestDecorator {
final static class WiretapRecorder {

private final DataBuffer buffer;
private static final DataBufferFactory bufferFactory = new DefaultDataBufferFactory();

private final MonoProcessor<byte[]> body = MonoProcessor.create();
public static final byte[] EMPTY_CONTENT = new byte[0];


public WiretapClientHttpRequest(ClientHttpRequest delegate) {
super(delegate);
this.buffer = bufferFactory.allocateBuffer();
}
@Nullable
private final Publisher<? extends DataBuffer> publisher;

@Nullable
private final Publisher<? extends Publisher<? extends DataBuffer>> publisherNested;

/**
* Return a "promise" with the request body content written to the server.
*/
public MonoProcessor<byte[]> getContent() {
return this.body;
}
private final DataBuffer buffer;

private final MonoProcessor<byte[]> content;

@Override
public Mono<Void> writeWith(Publisher<? extends DataBuffer> publisher) {
return super.writeWith(

private WiretapRecorder(@Nullable Publisher<? extends DataBuffer> publisher,
@Nullable Publisher<? extends Publisher<? extends DataBuffer>> publisherNested) {

if (publisher != null && publisherNested != null) {
throw new IllegalArgumentException("At most one publisher expected");
}

this.publisher = publisher != null ?
Flux.from(publisher)
.doOnNext(this::handleOnNext)
.doOnError(this::handleError)
.doOnError(this::handleOnError)
.doOnCancel(this::handleOnComplete)
.doOnComplete(this::handleOnComplete));
}
.doOnComplete(this::handleOnComplete) : null;

@Override
public Mono<Void> writeAndFlushWith(Publisher<? extends Publisher<? extends DataBuffer>> publisher) {
return super.writeAndFlushWith(
Flux.from(publisher)
.map(p -> Flux.from(p).doOnNext(this::handleOnNext).doOnError(this::handleError))
.doOnError(this::handleError)
this.publisherNested = publisherNested != null ?
Flux.from(publisherNested)
.map(p -> Flux.from(p).doOnNext(this::handleOnNext).doOnError(this::handleOnError))
.doOnError(this::handleOnError)
.doOnCancel(this::handleOnComplete)
.doOnComplete(this::handleOnComplete));
.doOnComplete(this::handleOnComplete) : null;

this.buffer = bufferFactory.allocateBuffer();
this.content = MonoProcessor.create();

if (this.publisher == null && this.publisherNested == null) {
this.content.onNext(EMPTY_CONTENT);
}
}

@Override
public Mono<Void> setComplete() {
handleOnComplete();
return super.setComplete();

public Publisher<? extends DataBuffer> getPublisherToUse() {
Assert.notNull(this.publisher, "Publisher not in use.");
return this.publisher;
}

private void handleOnNext(DataBuffer buffer) {
this.buffer.write(buffer);
public Publisher<? extends Publisher<? extends DataBuffer>> getNestedPublisherToUse() {
Assert.notNull(this.publisherNested, "Nested publisher not in use.");
return this.publisherNested;
}

private void handleError(Throwable ex) {
if (!this.body.isTerminated()) {
this.body.onError(ex);
public MonoProcessor<byte[]> getContent() {
return this.content;
}


private void handleOnNext(DataBuffer nextBuffer) {
this.buffer.write(nextBuffer);
}

private void handleOnError(Throwable ex) {
if (!this.content.isTerminated()) {
this.content.onError(ex);
}
}

private void handleOnComplete() {
if (!this.body.isTerminated()) {
if (!this.content.isTerminated()) {
byte[] bytes = new byte[this.buffer.readableByteCount()];
this.buffer.read(bytes);
this.body.onNext(bytes);
this.content.onNext(bytes);
}
}
}


/**
* ClientHttpRequestDecorator that intercepts and saves the request body.
*/
private static class WiretapClientHttpRequest extends ClientHttpRequestDecorator {

@Nullable
private WiretapRecorder recorder;


public WiretapClientHttpRequest(ClientHttpRequest delegate) {
super(delegate);
}

public WiretapRecorder getRecorder() {
Assert.notNull(this.recorder, "No WiretapRecorder: was the client request written?");
return this.recorder;
}

@Override
public Mono<Void> writeWith(Publisher<? extends DataBuffer> publisher) {
this.recorder = new WiretapRecorder(publisher, null);
return super.writeWith(this.recorder.getPublisherToUse());
}

@Override
public Mono<Void> writeAndFlushWith(Publisher<? extends Publisher<? extends DataBuffer>> publisher) {
this.recorder = new WiretapRecorder(null, publisher);
return super.writeAndFlushWith(this.recorder.getNestedPublisherToUse());
}

@Override
public Mono<Void> setComplete() {
this.recorder = new WiretapRecorder(null, null);
return super.setComplete();
}
}


/**
* ClientHttpResponseDecorator that intercepts and saves the response body.
*/
private static class WiretapClientHttpResponse extends ClientHttpResponseDecorator {

private final DataBuffer buffer;

private final MonoProcessor<byte[]> body = MonoProcessor.create();
private final WiretapRecorder recorder;


public WiretapClientHttpResponse(ClientHttpResponse delegate) {
super(delegate);
this.buffer = bufferFactory.allocateBuffer();
this.recorder = new WiretapRecorder(super.getBody(), null);
}


/**
* Return a "promise" with the response body content read from the server.
*/
public MonoProcessor<byte[]> getContent() {
return this.body;
public WiretapRecorder getRecorder() {
return this.recorder;
}

@Override
@SuppressWarnings("ConstantConditions")
public Flux<DataBuffer> getBody() {
return super.getBody()
.doOnNext(this.buffer::write)
.doOnError(this.body::onError)
.doOnCancel(this::handleOnComplete)
.doOnComplete(this::handleOnComplete);
}

private void handleOnComplete() {
if (!this.body.isTerminated()) {
byte[] bytes = new byte[this.buffer.readableByteCount()];
this.buffer.read(bytes);
this.body.onNext(bytes);
}
return Flux.from(this.recorder.getPublisherToUse());
}
}

Expand Down

0 comments on commit c567e65

Please sign in to comment.