Skip to content

Commit

Permalink
Backport fixes for discarding data buffers
Browse files Browse the repository at this point in the history
  • Loading branch information
rstoyanchev authored and zx20110729 committed Feb 18, 2022
1 parent d03a50c commit f2f33b0
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 13 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2019 the original author or authors.
* Copyright 2002-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -29,8 +29,8 @@
import org.springframework.core.codec.Encoder;
import org.springframework.core.codec.Hints;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.core.io.buffer.PooledDataBuffer;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpLogging;
import org.springframework.http.MediaType;
import org.springframework.http.ReactiveHttpOutputMessage;
Expand Down Expand Up @@ -108,7 +108,6 @@ public boolean canWrite(ResolvableType elementType, @Nullable MediaType mediaTyp
return this.encoder.canEncode(elementType, mediaType);
}

@SuppressWarnings("unchecked")
@Override
public Mono<Void> write(Publisher<? extends T> inputStream, ResolvableType elementType,
@Nullable MediaType mediaType, ReactiveHttpOutputMessage message, Map<String, Object> hints) {
Expand All @@ -119,23 +118,23 @@ public Mono<Void> write(Publisher<? extends T> inputStream, ResolvableType eleme
inputStream, message.bufferFactory(), elementType, contentType, hints);

if (inputStream instanceof Mono) {
HttpHeaders headers = message.getHeaders();
return body
.singleOrEmpty()
.switchIfEmpty(Mono.defer(() -> {
headers.setContentLength(0);
message.getHeaders().setContentLength(0);
return message.setComplete().then(Mono.empty());
}))
.flatMap(buffer -> {
headers.setContentLength(buffer.readableByteCount());
message.getHeaders().setContentLength(buffer.readableByteCount());
return message.writeWith(Mono.just(buffer)
.doOnDiscard(PooledDataBuffer.class, PooledDataBuffer::release));
});
.doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release));
})
.doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release);
}

if (isStreamingMediaType(contentType)) {
return message.writeAndFlushWith(body.map(buffer ->
Mono.just(buffer).doOnDiscard(PooledDataBuffer.class, PooledDataBuffer::release)));
Mono.just(buffer).doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release)));
}

return message.writeWith(body);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2019 the original author or authors.
* Copyright 2002-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -180,9 +180,29 @@ public final Mono<Void> writeWith(Publisher<? extends DataBuffer> body) {
// Write as Mono if possible as an optimization hint to Reactor Netty
// ChannelSendOperator not necessary for Mono
if (body instanceof Mono) {
return ((Mono<? extends DataBuffer>) body).flatMap(buffer ->
doCommit(() -> writeWithInternal(Mono.just(buffer)))
.doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release));
return ((Mono<? extends DataBuffer>) body)
.flatMap(buffer -> {
AtomicReference<Boolean> subscribed = new AtomicReference<>(false);
return doCommit(
() -> {
try {
return writeWithInternal(Mono.fromCallable(() -> buffer)
.doOnSubscribe(s -> subscribed.set(true))
.doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release));
}
catch (Throwable ex) {
return Mono.error(ex);
}
})
.doOnError(ex -> DataBufferUtils.release(buffer))
.doOnCancel(() -> {
if (!subscribed.get()) {
DataBufferUtils.release(buffer);
}
});
})
.doOnError(t -> removeContentLength())
.doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release);
}
return new ChannelSendOperator<>(body, inner -> doCommit(() -> writeWithInternal(inner)))
.doOnError(t -> removeContentLength());
Expand Down

0 comments on commit f2f33b0

Please sign in to comment.