DataBufferUtils.read(ReadableByteChannel,...) may corrupt data when used with NettyDataBuffers [SPR-16070] #20619

Closed
spring-issuemaster opened this issue Oct 13, 2017 · 1 comment

commented Oct 13, 2017

Oleg Alexeyev opened SPR-16070 and commented

If a published buffer is not consumed immediately, read may corrupt the data: the same ByteBuffer is reused for every read, and NettyDataBuffer.write(ByteBuffer) just wraps it in a ByteBuf and adds that to a CompositeByteBuf, so every published buffer ends up backed by the same byte array.

Here is a test reproducing the problem:

public class DataBufferUtilsTest extends FTestBase {

  private final ByteBufAllocator allocator = DEFAULT;
  // The factory must be declared before any field initializer that uses it.
  private final DataBufferFactory bufferFactory = new NettyDataBufferFactory(allocator);
  private final DataBuffer dataBuffer = bufferFactory.allocateBuffer();
  @Mock
  private ReadableByteChannel channel;

  @Test
  public void read_shouldPublishData() throws Exception {
    when(channel.read(any()))
        .thenAnswer(putByte(1))
        .thenAnswer(putByte(2))
        .thenAnswer(putByte(3))
        .thenReturn(-1);

    Flux<DataBuffer> read = read(channel, bufferFactory, 1);

    StepVerifier.create(
        read.reduce(DataBuffer::write)
            .map(this::dataBufferToBytes)
            .map(this::encodeHexString)
    )
        .expectNext("010203")
        .verifyComplete();
  }

  private Answer<Integer> putByte(int b) {
    return invocation -> {
      invocation.getArgumentAt(0, ByteBuffer.class).put((byte) b);
      return 1;
    };
  }

  private byte[] dataBufferToBytes(DataBuffer buffer) {
    try {
      int byteCount = buffer.readableByteCount();
      byte[] bytes = new byte[byteCount];
      buffer.read(bytes);
      return bytes;
    } finally {
      release(buffer);
    }
  }

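  private String encodeHexString(byte[] data) {
    StringBuilder builder = new StringBuilder();
    for (byte b : data) {
      // Append each nibble as a hex digit rather than as a decimal int.
      builder.append(Character.forDigit((0xF0 & b) >>> 4, 16));
      builder.append(Character.forDigit(0x0F & b, 16));
    }
    return builder.toString();
  }
}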

Output:

java.lang.AssertionError: expectation "expectNext(010203)" failed (expected value: 010203; actual value: 030303)
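
The 030303 result can be illustrated without Netty or Spring. The following is only a sketch in plain java.nio, under the assumption that ByteBuffer.duplicate() is a fair stand-in for "wrap without copying"; the class and method names are hypothetical and not part of any library. It contrasts publishing views of one reused buffer with publishing copies.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

public class SharedBufferDemo {

    /** Publishes views that alias the single reused buffer (stand-in for wrap-without-copy). */
    static String publishByWrapping(byte[] input) {
        ByteBuffer reused = ByteBuffer.allocate(1);
        List<ByteBuffer> published = new ArrayList<>();
        for (byte b : input) {
            reused.clear();
            reused.put(b);
            reused.flip();
            // duplicate() has its own position/limit but shares the backing array
            published.add(reused.duplicate());
        }
        return hex(published);
    }

    /** Publishes an independent copy of the reused buffer after every read. */
    static String publishByCopying(byte[] input) {
        ByteBuffer reused = ByteBuffer.allocate(1);
        List<ByteBuffer> published = new ArrayList<>();
        for (byte b : input) {
            reused.clear();
            reused.put(b);
            reused.flip();
            ByteBuffer copy = ByteBuffer.allocate(reused.remaining());
            copy.put(reused);
            copy.flip();
            published.add(copy);
        }
        return hex(published);
    }

    /** Hex-encodes the readable bytes of each buffer without consuming them. */
    static String hex(List<ByteBuffer> buffers) {
        StringBuilder sb = new StringBuilder();
        for (ByteBuffer buf : buffers) {
            ByteBuffer view = buf.duplicate();
            while (view.hasRemaining()) {
                sb.append(String.format("%02x", view.get() & 0xFF));
            }
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        byte[] input = {1, 2, 3};
        System.out.println(publishByWrapping(input)); // prints 030303
        System.out.println(publishByCopying(input));  // prints 010203
    }
}
```

Only the last value written survives in the wrapped case, because all three published views read from the same one-byte array.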

Another problem with this method is that reusing the same ByteBuffer for reads forces an unnecessary copy of the data. The copy does happen with DefaultDataBuffer and does not with NettyDataBuffer (hence the corruption), which seems contrary to the idea of more efficient hardware usage in reactive programs.

For now we're interested in Netty only, so we fixed it in the following way:

public static Flux<DataBuffer> read(ReadableByteChannel channel) {
  return Flux.generate(
      () -> channel,
      (ch, sink) -> {
        boolean release = true;
        NettyDataBuffer dataBuffer = DEFAULT_DATA_BUFFER_FACTORY.allocateBuffer(BUFFER_SIZE);
        try {
          int read;
          ByteBuf byteBuf = dataBuffer.getNativeBuffer();
          // Cannot use asByteBuffer() as it returns one with zero capacity: #20617
          ByteBuffer byteBuffer = byteBuf.nioBuffer(0, byteBuf.capacity());
          if ((read = ch.read(byteBuffer)) >= 0) {
            byteBuf.writerIndex(read);
            release = false;
            sink.next(dataBuffer);
          } else {
            sink.complete();
          }
        } catch (Throwable ex) {
          sink.error(ex);
        } finally {
          if (release) {
            release(dataBuffer);
          }
        }
        return channel;
      },
      IOUtils::closeQuietly
  );
}

This both fixes the corruption and gets rid of the unnecessary copy.

Unfortunately, the idiom doesn't work well for DefaultDataBuffer. For it, with the current API, perhaps a byte array could be allocated for each read, the data read into it, and the result then passed to DefaultDataBufferFactory.wrap(ByteBuffer), which would properly set the read and write positions.
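
The "fresh array per read" idea can be sketched with plain java.nio. This is an illustration only: it uses a bare ByteBuffer where the real code would hand the buffer to DefaultDataBufferFactory.wrap(ByteBuffer), and the class and method names are hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.util.ArrayList;
import java.util.List;

public class FreshBufferRead {

    /** Reads the channel to the end, allocating a new backing array for every read. */
    static List<ByteBuffer> readAll(ReadableByteChannel channel, int bufferSize) throws IOException {
        List<ByteBuffer> chunks = new ArrayList<>();
        while (true) {
            // A new array per read: a published chunk is never overwritten later.
            ByteBuffer target = ByteBuffer.wrap(new byte[bufferSize]);
            int read = channel.read(target);
            if (read < 0) {
                break; // end of stream
            }
            if (read > 0) {
                target.flip(); // position = 0, limit = number of bytes read
                chunks.add(target);
            }
        }
        return chunks;
    }

    /** Convenience wrapper for the demo: reads an in-memory input and hex-encodes each chunk. */
    static String readAllHex(byte[] input, int bufferSize) {
        try (ReadableByteChannel channel = Channels.newChannel(new ByteArrayInputStream(input))) {
            StringBuilder sb = new StringBuilder();
            for (ByteBuffer chunk : readAll(channel, bufferSize)) {
                if (sb.length() > 0) {
                    sb.append('|');
                }
                while (chunk.hasRemaining()) {
                    sb.append(String.format("%02x", chunk.get() & 0xFF));
                }
            }
            return sb.toString();
        } catch (IOException ex) {
            throw new UncheckedIOException(ex);
        }
    }

    public static void main(String[] args) {
        System.out.println(readAllHex(new byte[] {1, 2, 3}, 1)); // prints 01|02|03
    }
}
```

The cost is one allocation per read, but no copy is needed afterwards, and buffers that were published earlier keep their contents.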


Affects: 5.0 GA

Issue Links:

  • #20617 Empty NettyByteBuffer.asByteBuffer() returns ByteBuffer with zero capacity

Referenced from: commits c7a1526

1 votes, 3 watchers

commented Oct 19, 2017

Arjen Poutsma commented

Fixed in c7a1526, by using the newly introduced DataBuffer.asByteBuffer(int, int), and writing directly to the returned ByteBuffer rather than copying into a DataBuffer.
