Skip to content

Commit

Permalink
Fix buffer leak in AbstractServerHttpResponse
Browse files Browse the repository at this point in the history
  • Loading branch information
rstoyanchev committed Dec 7, 2020
1 parent ad42010 commit 7418c4b
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -211,9 +211,16 @@ public final Mono<Void> writeWith(Publisher<? extends DataBuffer> body) {
// We must resolve value first however, for a chance to handle potential error.
if (body instanceof Mono) {
return ((Mono<? extends DataBuffer>) body)
.flatMap(buffer -> doCommit(() ->
writeWithInternal(Mono.fromCallable(() -> buffer)
.doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release))))
.flatMap(buffer ->
doCommit(() -> {
try {
return writeWithInternal(Mono.fromCallable(() -> buffer)
.doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release));
}
catch (Throwable ex) {
return Mono.error(ex);
}
}).doOnError(ex -> DataBufferUtils.release(buffer)))
.doOnError(t -> getHeaders().clearContentHeaders());
}
else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,23 +19,33 @@
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Supplier;

import org.junit.jupiter.api.Test;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.netty.channel.AbortedException;
import reactor.test.StepVerifier;

import org.springframework.core.ResolvableType;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DefaultDataBuffer;
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
import org.springframework.core.testfixture.io.buffer.LeakAwareDataBufferFactory;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseCookie;
import org.springframework.http.codec.EncoderHttpMessageWriter;
import org.springframework.http.codec.HttpMessageWriter;
import org.springframework.http.codec.json.Jackson2JsonEncoder;
import org.springframework.web.testfixture.http.server.reactive.MockServerHttpRequest;
import org.springframework.web.testfixture.http.server.reactive.MockServerHttpResponse;

import static org.assertj.core.api.Assertions.assertThat;

Expand Down Expand Up @@ -186,6 +196,25 @@ void beforeCommitErrorShouldLeaveResponseNotCommitted() {
});
}

@Test // gh-26232
void monoResponseShouldNotLeakIfCancelled() {
LeakAwareDataBufferFactory bufferFactory = new LeakAwareDataBufferFactory();
MockServerHttpRequest request = MockServerHttpRequest.get("/").build();
MockServerHttpResponse response = new MockServerHttpResponse(bufferFactory);
response.setWriteHandler(flux -> {
throw AbortedException.beforeSend();
});

HttpMessageWriter<Object> messageWriter = new EncoderHttpMessageWriter<>(new Jackson2JsonEncoder());
Mono<Void> result = messageWriter.write(Mono.just(Collections.singletonMap("foo", "bar")),
ResolvableType.forClass(Mono.class), ResolvableType.forClass(Map.class), null,
request, response, Collections.emptyMap());

StepVerifier.create(result).expectError(AbortedException.class).verify();

bufferFactory.checkForLeaks();
}


private DefaultDataBuffer wrap(String a) {
return DefaultDataBufferFactory.sharedInstance.wrap(ByteBuffer.wrap(a.getBytes(StandardCharsets.UTF_8)));
Expand Down

0 comments on commit 7418c4b

Please sign in to comment.