Skip to content

Commit

Permalink
Enforce use of unpooled data buffers in 5.0.x
Browse files Browse the repository at this point in the history
Issue: SPR-17501
  • Loading branch information
rstoyanchev committed Nov 16, 2018
1 parent 2405161 commit 18da718
Show file tree
Hide file tree
Showing 5 changed files with 26 additions and 15 deletions.
Expand Up @@ -20,13 +20,15 @@
import java.util.function.Consumer;
import java.util.function.Function;

import io.netty.buffer.UnpooledByteBufAllocator;
import reactor.core.publisher.Mono;
import reactor.ipc.netty.http.client.HttpClient;
import reactor.ipc.netty.http.client.HttpClientOptions;
import reactor.ipc.netty.http.client.HttpClientRequest;
import reactor.ipc.netty.http.client.HttpClientResponse;
import reactor.ipc.netty.options.ClientOptions;

import org.springframework.core.io.buffer.NettyDataBufferFactory;
import org.springframework.http.HttpMethod;

/**
Expand All @@ -38,6 +40,11 @@
*/
public class ReactorClientHttpConnector implements ClientHttpConnector {

// 5.0.x only: no buffer pooling
static final NettyDataBufferFactory BUFFER_FACTORY =
new NettyDataBufferFactory(new UnpooledByteBufAllocator(false));


private final HttpClient httpClient;


Expand Down
Expand Up @@ -48,21 +48,18 @@ class ReactorClientHttpRequest extends AbstractClientHttpRequest implements Zero

private final HttpClientRequest httpRequest;

private final NettyDataBufferFactory bufferFactory;


public ReactorClientHttpRequest(HttpMethod httpMethod, URI uri,
HttpClientRequest httpRequest) {
this.httpMethod = httpMethod;
this.uri = uri;
this.httpRequest = httpRequest.failOnClientError(false).failOnServerError(false);
this.bufferFactory = new NettyDataBufferFactory(httpRequest.alloc());
}


@Override
public DataBufferFactory bufferFactory() {
return this.bufferFactory;
return ReactorClientHttpConnector.BUFFER_FACTORY;
}

@Override
Expand Down
Expand Up @@ -23,7 +23,6 @@
import reactor.ipc.netty.http.client.HttpClientResponse;

import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.NettyDataBufferFactory;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseCookie;
Expand All @@ -41,16 +40,13 @@
*/
class ReactorClientHttpResponse implements ClientHttpResponse {

private final NettyDataBufferFactory dataBufferFactory;

private final HttpClientResponse response;

private final AtomicBoolean bodyConsumed = new AtomicBoolean();


public ReactorClientHttpResponse(HttpClientResponse response) {
this.response = response;
this.dataBufferFactory = new NettyDataBufferFactory(response.channel().alloc());
}


Expand All @@ -64,9 +60,11 @@ public Flux<DataBuffer> getBody() {
// isn't consistent in doing so and may hang without completion.
Assert.state(this.bodyConsumed.compareAndSet(false, true),
"The client response body can only be consumed once."))
.map(buf -> {
buf.retain();
return dataBufferFactory.wrap(buf);
.map(byteBuf -> {
// 5.0.x only: do not retain, make a copy..
byte[] data = new byte[byteBuf.readableBytes()];
byteBuf.readBytes(data);
return ReactorClientHttpConnector.BUFFER_FACTORY.wrap(data);
});
}

Expand Down
Expand Up @@ -19,6 +19,7 @@
import java.net.URISyntaxException;
import java.util.function.BiFunction;

import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.handler.codec.http.HttpResponseStatus;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
Expand All @@ -39,6 +40,10 @@
*/
public class ReactorHttpHandlerAdapter implements BiFunction<HttpServerRequest, HttpServerResponse, Mono<Void>> {

// 5.0.x only: no buffer pooling
private static final NettyDataBufferFactory BUFFER_FACTORY =
new NettyDataBufferFactory(new UnpooledByteBufAllocator(false));

private static final Log logger = LogFactory.getLog(ReactorHttpHandlerAdapter.class);


Expand All @@ -53,12 +58,11 @@ public ReactorHttpHandlerAdapter(HttpHandler httpHandler) {

@Override
public Mono<Void> apply(HttpServerRequest request, HttpServerResponse response) {
NettyDataBufferFactory bufferFactory = new NettyDataBufferFactory(response.alloc());
ServerHttpRequest adaptedRequest;
ServerHttpResponse adaptedResponse;
try {
adaptedRequest = new ReactorServerHttpRequest(request, bufferFactory);
adaptedResponse = new ReactorServerHttpResponse(response, bufferFactory);
adaptedRequest = new ReactorServerHttpRequest(request, BUFFER_FACTORY);
adaptedResponse = new ReactorServerHttpResponse(response, BUFFER_FACTORY);
}
catch (URISyntaxException ex) {
if (logger.isWarnEnabled()) {
Expand Down
Expand Up @@ -167,7 +167,12 @@ protected SslInfo initSslInfo() {

@Override
public Flux<DataBuffer> getBody() {
return this.request.receive().retain().map(this.bufferFactory::wrap);
// 5.0.x only: do not retain, make a copy..
return this.request.receive().map(byteBuf -> {
byte[] data = new byte[byteBuf.readableBytes()];
byteBuf.readBytes(data);
return bufferFactory.wrap(data);
});
}

@SuppressWarnings("unchecked")
Expand Down

0 comments on commit 18da718

Please sign in to comment.