Skip to content

Commit

Permalink
Minor refactoring in StringDecoder + polish
Browse files Browse the repository at this point in the history
1. Avoid re-creating the List with delimited byte arrays on every
request if using the default delimiters which don't vary by charset.

2. Replace flatMap with flatMapIterable for splitOnDelimiter.

3. Avoid going through DataBufferUtils#join, and unnecessarily creating
Flux from the List, since the join method needs a list anyway.
  • Loading branch information
rstoyanchev committed Oct 26, 2018
1 parent fc957e9 commit fa096dc
Show file tree
Hide file tree
Showing 4 changed files with 63 additions and 57 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,15 @@

/**
* Simple pass-through decoder for {@link DataBuffer DataBuffers}.
* <p><strong>Note</strong> that the "decoded" buffers returned by instances of this class should
* be released after usage by calling
* {@link org.springframework.core.io.buffer.DataBufferUtils#release(DataBuffer)}.
*
* <p><strong>Note:</strong> The data buffers should be released via
* {@link org.springframework.core.io.buffer.DataBufferUtils#release(DataBuffer)}
* after they have been consumed. In addition, if using {@code Flux} or
* {@code Mono} operators such as flatMap, reduce, and others that prefetch,
* cache, and skip or filter out data items internally, please add
* {@code doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release)} to the
* composition chain to ensure cached data buffers are released prior to an
* error or cancellation signal.
*
* @author Arjen Poutsma
* @author Rossen Stoyanchev
Expand All @@ -48,7 +54,8 @@ public DataBufferDecoder() {
@Override
public boolean canDecode(ResolvableType elementType, @Nullable MimeType mimeType) {
Class<?> clazz = elementType.getRawClass();
return (super.canDecode(elementType, mimeType) && clazz != null && DataBuffer.class.isAssignableFrom(clazz));
return (super.canDecode(elementType, mimeType) &&
clazz != null && DataBuffer.class.isAssignableFrom(clazz));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@

import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import org.springframework.core.ResolvableType;
import org.springframework.core.io.buffer.DataBuffer;
Expand All @@ -36,7 +35,6 @@
import org.springframework.core.io.buffer.PooledDataBuffer;
import org.springframework.core.log.LogFormatUtils;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import org.springframework.util.MimeType;
import org.springframework.util.MimeTypeUtils;

Expand All @@ -59,25 +57,26 @@ public final class StringDecoder extends AbstractDataBufferDecoder<String> {

private static final DataBuffer END_FRAME = new DefaultDataBufferFactory().wrap(new byte[0]);

/**
* The default charset to use, i.e. "UTF-8".
*/
/** The default charset to use, i.e. "UTF-8". */
public static final Charset DEFAULT_CHARSET = StandardCharsets.UTF_8;

/**
* The default delimiter strings to use, i.e. {@code \n} and {@code \r\n}.
*/
/** The default delimiter strings to use, i.e. {@code \r\n} and {@code \n}. */
public static final List<String> DEFAULT_DELIMITERS = Arrays.asList("\r\n", "\n");

private static final List<byte[]> DEFAULT_DELIMITER_BYTES = DEFAULT_DELIMITERS.stream()
.map(s -> s.getBytes(StandardCharsets.UTF_8))
.collect(Collectors.toList());


@Nullable
private final List<String> delimiters;

private final boolean stripDelimiter;

private StringDecoder(List<String> delimiters, boolean stripDelimiter, MimeType... mimeTypes) {

private StringDecoder(@Nullable List<String> delimiters, boolean stripDelimiter, MimeType... mimeTypes) {
super(mimeTypes);
Assert.notEmpty(delimiters, "'delimiters' must not be empty");
this.delimiters = new ArrayList<>(delimiters);
this.delimiters = delimiters != null ? new ArrayList<>(delimiters) : null;
this.stripDelimiter = stripDelimiter;
}

Expand All @@ -92,36 +91,32 @@ public boolean canDecode(ResolvableType elementType, @Nullable MimeType mimeType
public Flux<String> decode(Publisher<DataBuffer> inputStream, ResolvableType elementType,
@Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {

List<byte[]> delimiterBytes = getDelimiterBytes(mimeType);
List<byte[]> delimiterBytes = this.delimiters != null ?
this.delimiters.stream().map(s -> s.getBytes(getCharset(mimeType))).collect(Collectors.toList()) :
DEFAULT_DELIMITER_BYTES;

Flux<DataBuffer> inputFlux = Flux.from(inputStream)
.flatMap(dataBuffer -> splitOnDelimiter(dataBuffer, delimiterBytes))
.flatMapIterable(dataBuffer -> splitOnDelimiter(dataBuffer, delimiterBytes))
.bufferUntil(StringDecoder::isEndFrame)
.flatMap(StringDecoder::joinUntilEndFrame)
.map(StringDecoder::joinUntilEndFrame)
.doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release);
return super.decode(inputFlux, elementType, mimeType, hints);
}

private List<byte[]> getDelimiterBytes(@Nullable MimeType mimeType) {
Charset charset = getCharset(mimeType);
return this.delimiters.stream()
.map(s -> s.getBytes(charset))
.collect(Collectors.toList());
return super.decode(inputFlux, elementType, mimeType, hints);
}

/**
* Splits the given data buffer on delimiter boundaries. The returned Flux contains a
* {@link #END_FRAME} buffer after each delimiter.
*/
private Flux<DataBuffer> splitOnDelimiter(DataBuffer dataBuffer, List<byte[]> delimiterBytes) {
private List<DataBuffer> splitOnDelimiter(DataBuffer dataBuffer, List<byte[]> delimiterBytes) {
List<DataBuffer> frames = new ArrayList<>();
do {
int length = Integer.MAX_VALUE;
byte[] matchingDelimiter = null;
for (byte[] delimiter : delimiterBytes) {
int idx = indexOf(dataBuffer, delimiter);
if (idx >= 0 && idx < length) {
length = idx;
int index = indexOf(dataBuffer, delimiter);
if (index >= 0 && index < length) {
length = index;
matchingDelimiter = delimiter;
}
}
Expand All @@ -148,12 +143,12 @@ private Flux<DataBuffer> splitOnDelimiter(DataBuffer dataBuffer, List<byte[]> de
while (dataBuffer.readableByteCount() > 0);

DataBufferUtils.release(dataBuffer);
return Flux.fromIterable(frames);
return frames;
}

/**
* Finds the given delimiter in the given data buffer. Return the index of the delimiter, or
* -1 if not found.
* Find the given delimiter in the given data buffer.
* @return the index of the delimiter, or -1 if not found.
*/
private static int indexOf(DataBuffer dataBuffer, byte[] delimiter) {
for (int i = dataBuffer.readPosition(); i < dataBuffer.writePosition(); i++) {
Expand All @@ -172,7 +167,6 @@ private static int indexOf(DataBuffer dataBuffer, byte[] delimiter) {
}
delimiterPos++;
}

if (delimiterPos == delimiter.length) {
return i - dataBuffer.readPosition();
}
Expand All @@ -188,17 +182,17 @@ private static boolean isEndFrame(DataBuffer dataBuffer) {
}

/**
* Joins the given list of buffers into a single buffer.
* Joins the given list of buffers into a single buffer, also removing
* the (inserted) {@link #END_FRAME}.
*/
private static Mono<DataBuffer> joinUntilEndFrame(List<DataBuffer> dataBuffers) {
private static DataBuffer joinUntilEndFrame(List<DataBuffer> dataBuffers) {
if (!dataBuffers.isEmpty()) {
int lastIdx = dataBuffers.size() - 1;
if (isEndFrame(dataBuffers.get(lastIdx))) {
dataBuffers.remove(lastIdx);
}
}
Flux<DataBuffer> flux = Flux.fromIterable(dataBuffers);
return DataBufferUtils.join(flux);
return dataBuffers.get(0).factory().join(dataBuffers);
}

@Override
Expand Down Expand Up @@ -241,15 +235,18 @@ public static StringDecoder textPlainOnly(boolean ignored) {
* Create a {@code StringDecoder} for {@code "text/plain"}.
*/
public static StringDecoder textPlainOnly() {
return textPlainOnly(DEFAULT_DELIMITERS, true);
return textPlainOnly(null, true);
}

/**
* Create a {@code StringDecoder} for {@code "text/plain"}.
* @param delimiters delimiter strings to use to split the input stream, if
* {@code null} by default {@link #DEFAULT_DELIMITERS} is used.
* @param stripDelimiter whether to remove delimiters from the resulting
* input strings.
*/
public static StringDecoder textPlainOnly(List<String> delimiters, boolean stripDelimiter) {
return new StringDecoder(delimiters, stripDelimiter,
new MimeType("text", "plain", DEFAULT_CHARSET));
public static StringDecoder textPlainOnly(@Nullable List<String> delimiters, boolean stripDelimiter) {
return new StringDecoder(delimiters, stripDelimiter, new MimeType("text", "plain", DEFAULT_CHARSET));
}

/**
Expand All @@ -267,16 +264,19 @@ public static StringDecoder allMimeTypes(boolean ignored) {
* Create a {@code StringDecoder} that supports all MIME types.
*/
public static StringDecoder allMimeTypes() {
return allMimeTypes(DEFAULT_DELIMITERS, true);
return allMimeTypes(null, true);
}

/**
* Create a {@code StringDecoder} that supports all MIME types.
* @param delimiters delimiter strings to use to split the input stream, if
* {@code null} by default {@link #DEFAULT_DELIMITERS} is used.
* @param stripDelimiter whether to remove delimiters from the resulting
* input strings.
*/
public static StringDecoder allMimeTypes(List<String> delimiters, boolean stripDelimiter) {
public static StringDecoder allMimeTypes(@Nullable List<String> delimiters, boolean stripDelimiter) {
return new StringDecoder(delimiters, stripDelimiter,
new MimeType("text", "plain", DEFAULT_CHARSET), MimeTypeUtils.ALL);
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -422,13 +422,16 @@ public static Consumer<DataBuffer> releaseConsumer() {
}

/**
* Return a new {@code DataBuffer} composed of the {@code dataBuffers} elements joined together.
* Depending on the {@link DataBuffer} implementation, the returned buffer may be a single
* buffer containing all data of the provided buffers, or it may be a true composite that
* contains references to the buffers.
* <p>If {@code dataBuffers} contains an error signal, then all buffers that preceded the error
* will be {@linkplain #release(DataBuffer) released}, and the error is stored in the
* returned {@code Mono}.
* Return a new {@code DataBuffer} composed from joining together the given
* {@code dataBuffers} elements. Depending on the {@link DataBuffer} type,
* the returned buffer may be a single buffer containing all data of the
* provided buffers, or it may be a zero-copy, composite with references to
* the given buffers.
* <p>If {@code dataBuffers} produces an error or if there is a cancel
* signal, then all accumulated buffers will be
* {@linkplain #release(DataBuffer) released}.
* <p>Note that the given data buffers do <strong>not</strong> have to be
* released. They will be released as part of the returned composite.
* @param dataBuffers the data buffers that are to be composed
* @return a buffer that is composed from the {@code dataBuffers} argument
* @since 5.0.3
Expand All @@ -439,10 +442,7 @@ public static Mono<DataBuffer> join(Publisher<DataBuffer> dataBuffers) {
return Flux.from(dataBuffers)
.collectList()
.filter(list -> !list.isEmpty())
.map(list -> {
DataBufferFactory bufferFactory = list.get(0).factory();
return bufferFactory.join(list);
})
.map(list -> list.get(0).factory().join(list))
.doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release);

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,7 @@ public DataBuffer join(List<? extends DataBuffer> dataBuffers) {
CompositeByteBuf composite = this.byteBufAllocator.compositeBuffer(dataBuffers.size());
for (DataBuffer dataBuffer : dataBuffers) {
Assert.isInstanceOf(NettyDataBuffer.class, dataBuffer);
NettyDataBuffer nettyDataBuffer = (NettyDataBuffer) dataBuffer;
composite.addComponent(true, nettyDataBuffer.getNativeBuffer());
composite.addComponent(true, ((NettyDataBuffer) dataBuffer).getNativeBuffer());
}
return new NettyDataBuffer(composite, this);
}
Expand Down

0 comments on commit fa096dc

Please sign in to comment.