Skip to content

Commit

Permalink
ByteBuf Input Stream Reference Count Ownership
Browse files Browse the repository at this point in the history
Motivation:
Netty provides a adaptor from ByteBuf to Java's InputStream interface. The JDK Stream interfaces have an explicit lifetime because they implement the Closable interface. This lifetime may be differnt than the ByteBuf which is wrapped, and controlled by the interface which accepts the JDK Stream. However Netty's ByteBufInputStream currently does not take reference count ownership of the underlying ByteBuf. There may be no way for existing classes which only accept the InputStream interface to communicate when they are done with the stream, other than calling close(). This means that when the stream is closed it may be appropriate to release the underlying ByteBuf, as the ownership of the underlying ByteBuf resource may be transferred to the Java Stream.

Motivation:
- ByteBufInputStream.close() supports taking reference count ownership of the underyling ByteBuf

Result:
ByteBufInputStream can assume reference count ownership so the underlying ByteBuf can be cleaned up when the stream is closed.
  • Loading branch information
Scottmitch committed Nov 15, 2016
1 parent d479e93 commit c1932a8
Show file tree
Hide file tree
Showing 16 changed files with 357 additions and 212 deletions.
63 changes: 61 additions & 2 deletions buffer/src/main/java/io/netty/buffer/ByteBufInputStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
*/
package io.netty.buffer;

import io.netty.util.ReferenceCounted;

import java.io.DataInput;
import java.io.DataInputStream;
import java.io.EOFException;
Expand All @@ -38,15 +40,23 @@
* @see ByteBufOutputStream
*/
public class ByteBufInputStream extends InputStream implements DataInput {

private final ByteBuf buffer;
private final int startIndex;
private final int endIndex;
private boolean closed;
/**
* To preserve backwards compatibility (which didn't transfer ownership) we support a conditional flag which
* indicates if {@link #buffer} should be released when this {@link InputStream} is closed.
* However in future releases ownership should always be transferred and callers of this class should call
* {@link ReferenceCounted#retain()} if necessary.
*/
private boolean releaseOnClose;

/**
* Creates a new stream which reads data from the specified {@code buffer}
* starting at the current {@code readerIndex} and ending at the current
* {@code writerIndex}.
* @param buffer The buffer which provides the content for this {@link InputStream}.
*/
public ByteBufInputStream(ByteBuf buffer) {
this(buffer, buffer.readableBytes());
Expand All @@ -56,23 +66,59 @@ public ByteBufInputStream(ByteBuf buffer) {
* Creates a new stream which reads data from the specified {@code buffer}
* starting at the current {@code readerIndex} and ending at
* {@code readerIndex + length}.
*
* @param buffer The buffer which provides the content for this {@link InputStream}.
* @param length The length of the buffer to use for this {@link InputStream}.
* @throws IndexOutOfBoundsException
* if {@code readerIndex + length} is greater than
* {@code writerIndex}
*/
public ByteBufInputStream(ByteBuf buffer, int length) {
this(buffer, length, false);
}

/**
* Creates a new stream which reads data from the specified {@code buffer}
* starting at the current {@code readerIndex} and ending at the current
* {@code writerIndex}.
* @param buffer The buffer which provides the content for this {@link InputStream}.
* @param releaseOnClose {@code true} means that when {@link #close()} is called then {@link ByteBuf#release()} will
* be called on {@code buffer}.
*/
public ByteBufInputStream(ByteBuf buffer, boolean releaseOnClose) {
this(buffer, buffer.readableBytes(), releaseOnClose);
}

/**
* Creates a new stream which reads data from the specified {@code buffer}
* starting at the current {@code readerIndex} and ending at
* {@code readerIndex + length}.
* @param buffer The buffer which provides the content for this {@Link InputStream}.
* @param length The length of the buffer to use for this {@link InputStream}.
* @param releaseOnClose {@code true} means that when {@link #close()} is called then {@link ByteBuf#release()} will
* be called on {@code buffer}.
* @throws IndexOutOfBoundsException
* if {@code readerIndex + length} is greater than
* {@code writerIndex}
*/
public ByteBufInputStream(ByteBuf buffer, int length, boolean releaseOnClose) {
if (buffer == null) {
throw new NullPointerException("buffer");
}
if (length < 0) {
if (releaseOnClose) {
buffer.release();
}
throw new IllegalArgumentException("length: " + length);
}
if (length > buffer.readableBytes()) {
if (releaseOnClose) {
buffer.release();
}
throw new IndexOutOfBoundsException("Too many bytes to be read - Needs "
+ length + ", maximum is " + buffer.readableBytes());
}

this.releaseOnClose = releaseOnClose;
this.buffer = buffer;
startIndex = buffer.readerIndex();
endIndex = startIndex + length;
Expand All @@ -86,6 +132,19 @@ public int readBytes() {
return buffer.readerIndex() - startIndex;
}

@Override
public void close() throws IOException {
try {
super.close();
} finally {
// The Closable interface says "If the stream is already closed then invoking this method has no effect."
if (releaseOnClose && !closed) {
closed = true;
buffer.release();
}
}
}

@Override
public int available() throws IOException {
return endIndex - buffer.readerIndex();
Expand Down
205 changes: 107 additions & 98 deletions buffer/src/test/java/io/netty/buffer/ByteBufStreamTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
*/
package io.netty.buffer;

import io.netty.util.ReferenceCountUtil;
import io.netty.util.ReferenceCounted;
import org.junit.Test;

import java.io.EOFException;
Expand All @@ -30,7 +32,7 @@ public class ByteBufStreamTest {

@Test
public void testAll() throws Exception {
ByteBuf buf = Unpooled.buffer(0, 65536);
ByteBuf buf = ReferenceCountUtil.releaseLater(Unpooled.buffer(0, 65536));

try {
new ByteBufOutputStream(null);
Expand All @@ -40,142 +42,149 @@ public void testAll() throws Exception {
}

ByteBufOutputStream out = new ByteBufOutputStream(buf);
assertSame(buf, out.buffer());
out.writeBoolean(true);
out.writeBoolean(false);
out.writeByte(42);
out.writeByte(224);
out.writeBytes("Hello, World!");
out.writeChars("Hello, World");
out.writeChar('!');
out.writeDouble(42.0);
out.writeFloat(42.0f);
out.writeInt(42);
out.writeLong(42);
out.writeShort(42);
out.writeShort(49152);
out.writeUTF("Hello, World!");
out.writeBytes("The first line\r\r\n");
out.write(EMPTY_BYTES);
out.write(new byte[] { 1, 2, 3, 4 });
out.write(new byte[] { 1, 3, 3, 4 }, 0, 0);
out.close();
try {
assertSame(buf, out.buffer());
out.writeBoolean(true);
out.writeBoolean(false);
out.writeByte(42);
out.writeByte(224);
out.writeBytes("Hello, World!");
out.writeChars("Hello, World");
out.writeChar('!');
out.writeDouble(42.0);
out.writeFloat(42.0f);
out.writeInt(42);
out.writeLong(42);
out.writeShort(42);
out.writeShort(49152);
out.writeUTF("Hello, World!");
out.writeBytes("The first line\r\r\n");
out.write(EMPTY_BYTES);
out.write(new byte[]{1, 2, 3, 4});
out.write(new byte[]{1, 3, 3, 4}, 0, 0);
} finally {
out.close();
}

try {
new ByteBufInputStream(null);
new ByteBufInputStream(null, true);
fail();
} catch (NullPointerException e) {
// Expected
}

try {
new ByteBufInputStream(null, 0);
new ByteBufInputStream(null, 0, true);
fail();
} catch (NullPointerException e) {
// Expected
}

try {
new ByteBufInputStream(buf, -1);
new ByteBufInputStream(buf.retainedSlice(), -1, true);
} catch (IllegalArgumentException e) {
// Expected
}

try {
new ByteBufInputStream(buf, buf.capacity() + 1);
new ByteBufInputStream(buf.retainedSlice(), buf.capacity() + 1, true);
} catch (IndexOutOfBoundsException e) {
// Expected
}

ByteBufInputStream in = new ByteBufInputStream(buf);
ByteBufInputStream in = new ByteBufInputStream(buf, true);
try {
assertTrue(in.markSupported());
in.mark(Integer.MAX_VALUE);

assertTrue(in.markSupported());
in.mark(Integer.MAX_VALUE);
assertEquals(buf.writerIndex(), in.skip(Long.MAX_VALUE));
assertFalse(buf.isReadable());

assertEquals(buf.writerIndex(), in.skip(Long.MAX_VALUE));
assertFalse(buf.isReadable());
in.reset();
assertEquals(0, buf.readerIndex());

in.reset();
assertEquals(0, buf.readerIndex());
assertEquals(4, in.skip(4));
assertEquals(4, buf.readerIndex());
in.reset();

assertEquals(4, in.skip(4));
assertEquals(4, buf.readerIndex());
in.reset();

assertTrue(in.readBoolean());
assertFalse(in.readBoolean());
assertEquals(42, in.readByte());
assertEquals(224, in.readUnsignedByte());

byte[] tmp = new byte[13];
in.readFully(tmp);
assertEquals("Hello, World!", new String(tmp, "ISO-8859-1"));

assertEquals('H', in.readChar());
assertEquals('e', in.readChar());
assertEquals('l', in.readChar());
assertEquals('l', in.readChar());
assertEquals('o', in.readChar());
assertEquals(',', in.readChar());
assertEquals(' ', in.readChar());
assertEquals('W', in.readChar());
assertEquals('o', in.readChar());
assertEquals('r', in.readChar());
assertEquals('l', in.readChar());
assertEquals('d', in.readChar());
assertEquals('!', in.readChar());

assertEquals(42.0, in.readDouble(), 0.0);
assertEquals(42.0f, in.readFloat(), 0.0);
assertEquals(42, in.readInt());
assertEquals(42, in.readLong());
assertEquals(42, in.readShort());
assertEquals(49152, in.readUnsignedShort());

assertEquals("Hello, World!", in.readUTF());
assertEquals("The first line", in.readLine());
assertEquals("", in.readLine());

assertEquals(4, in.read(tmp));
assertEquals(1, tmp[0]);
assertEquals(2, tmp[1]);
assertEquals(3, tmp[2]);
assertEquals(4, tmp[3]);

assertEquals(-1, in.read());
assertEquals(-1, in.read(tmp));
assertTrue(in.readBoolean());
assertFalse(in.readBoolean());
assertEquals(42, in.readByte());
assertEquals(224, in.readUnsignedByte());

try {
in.readByte();
fail();
} catch (EOFException e) {
// Expected
}
byte[] tmp = new byte[13];
in.readFully(tmp);
assertEquals("Hello, World!", new String(tmp, "ISO-8859-1"));

assertEquals('H', in.readChar());
assertEquals('e', in.readChar());
assertEquals('l', in.readChar());
assertEquals('l', in.readChar());
assertEquals('o', in.readChar());
assertEquals(',', in.readChar());
assertEquals(' ', in.readChar());
assertEquals('W', in.readChar());
assertEquals('o', in.readChar());
assertEquals('r', in.readChar());
assertEquals('l', in.readChar());
assertEquals('d', in.readChar());
assertEquals('!', in.readChar());

assertEquals(42.0, in.readDouble(), 0.0);
assertEquals(42.0f, in.readFloat(), 0.0);
assertEquals(42, in.readInt());
assertEquals(42, in.readLong());
assertEquals(42, in.readShort());
assertEquals(49152, in.readUnsignedShort());

assertEquals("Hello, World!", in.readUTF());
assertEquals("The first line", in.readLine());
assertEquals("", in.readLine());

assertEquals(4, in.read(tmp));
assertEquals(1, tmp[0]);
assertEquals(2, tmp[1]);
assertEquals(3, tmp[2]);
assertEquals(4, tmp[3]);

assertEquals(-1, in.read());
assertEquals(-1, in.read(tmp));

try {
in.readByte();
fail();
} catch (EOFException e) {
// Expected
}

try {
in.readFully(tmp, 0, -1);
fail();
} catch (IndexOutOfBoundsException e) {
// Expected
}
try {
in.readFully(tmp, 0, -1);
fail();
} catch (IndexOutOfBoundsException e) {
// Expected
}

try {
in.readFully(tmp);
fail();
} catch (EOFException e) {
// Expected
try {
in.readFully(tmp);
fail();
} catch (EOFException e) {
// Expected
}
} finally {
// Ownership was transferred to the ByteBufOutputStream, before we close we must retain the underlying
// buffer.
buf.retain();
in.close();
}

in.close();

assertEquals(buf.readerIndex(), in.readBytes());
}

@Test
public void testReadLine() throws Exception {
Charset utf8 = Charset.forName("UTF-8");
ByteBuf buf = Unpooled.buffer();
ByteBufInputStream in = new ByteBufInputStream(buf);
ByteBufInputStream in = new ByteBufInputStream(buf, true);

String s = in.readLine();
assertNull(s);
Expand Down
Loading

0 comments on commit c1932a8

Please sign in to comment.