Permalink
Browse files

Use pooling mechanism for buffer pipe input stream

  • Loading branch information...
1 parent d37ac54 commit 4846d69773da1c155f569e221a5d2f5f34881488 @dmlloyd dmlloyd committed Jan 13, 2011
@@ -1594,4 +1594,34 @@ public static boolean readLine(final ByteBuffer src, final StringBuilder builder
oneChar.clear();
}
}
+
+ /**
+ * Create a pooled wrapper around a buffer. The buffer is unreferenced for garbage collection when
+ * freed or discarded.
+ *
+ * @param buffer the buffer to wrap
+ * @param <B> the buffer type
+ * @return the pooled wrapper
+ */
+ public static <B extends Buffer> Pooled<B> pooledWrapper(final B buffer) {
+ return new Pooled<B>() {
+ private volatile B buf = buffer;
+
+ public void discard() {
+ buf = null;
+ }
+
+ public void free() {
+ buf = null;
+ }
+
+ public B getResource() throws IllegalStateException {
+ final B buffer = buf;
+ if (buffer == null) {
+ throw new IllegalStateException();
+ }
+ return buffer;
+ }
+ };
+ }
}
@@ -30,28 +30,20 @@
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.Semaphore;
+import org.xnio.Buffers;
+import org.xnio.Pooled;
/**
* An {@code InputStream} implementation which is populated asynchronously with {@link ByteBuffer} instances.
*/
public class BufferPipeInputStream extends InputStream {
- private final Queue<Entry> queue;
+ private final Queue<Pooled<ByteBuffer>> queue;
private final InputHandler inputHandler;
- // protected by "queue"
+ // protected by "this"
private boolean eof;
private IOException failure;
- private static final class Entry {
- private final ByteBuffer buffer;
- private final BufferReturn bufferReturn;
-
- private Entry(final ByteBuffer buffer, final BufferReturn bufferReturn) {
- this.buffer = buffer;
- this.bufferReturn = bufferReturn;
- }
- }
-
/**
* Construct a new instance. The given {@code inputHandler} will
* be invoked after each buffer is fully read and when the stream is closed.
@@ -60,7 +52,7 @@ private Entry(final ByteBuffer buffer, final BufferReturn bufferReturn) {
*/
public BufferPipeInputStream(final InputHandler inputHandler) {
this.inputHandler = inputHandler;
- queue = new ArrayDeque<Entry>();
+ queue = new ArrayDeque<Pooled<ByteBuffer>>();
}
/**
@@ -72,7 +64,7 @@ public BufferPipeInputStream(final InputHandler inputHandler) {
public void push(final ByteBuffer buffer) {
synchronized (this) {
if (!eof && failure == null) {
- queue.add(new Entry(buffer, null));
+ queue.add(Buffers.pooledWrapper(buffer));
notifyAll();
}
}
@@ -82,16 +74,15 @@ public void push(final ByteBuffer buffer) {
* Push a buffer into the queue. There is no mechanism to limit the number of pushed buffers; if such a mechanism
* is desired, it must be implemented externally, for example maybe using a {@link Semaphore}.
*
- * @param buffer the buffer from which more data should be read
- * @param bufferReturn the buffer return to send this buffer to when it is exhausted
+ * @param pooledBuffer the buffer from which more data should be read
*/
- public void push(final ByteBuffer buffer, final BufferReturn bufferReturn) {
+ public void push(final Pooled<ByteBuffer> pooledBuffer) {
synchronized (this) {
if (!eof && failure == null) {
- queue.add(new Entry(buffer, bufferReturn));
+ queue.add(pooledBuffer);
notifyAll();
} else {
- bufferReturn.returnBuffer(buffer);
+ pooledBuffer.free();
}
}
}
@@ -124,7 +115,7 @@ public void pushException(IOException e) {
/** {@inheritDoc} */
public int read() throws IOException {
- final Queue<Entry> queue = this.queue;
+ final Queue<Pooled<ByteBuffer>> queue = this.queue;
synchronized (this) {
while (queue.isEmpty()) {
if (eof) {
@@ -138,14 +129,11 @@ public int read() throws IOException {
throw new InterruptedIOException("Interrupted on read()");
}
}
- final Entry entry = queue.peek();
- final ByteBuffer buf = entry.buffer;
- final BufferReturn bufferReturn = entry.bufferReturn;
+ final Pooled<ByteBuffer> entry = queue.peek();
+ final ByteBuffer buf = entry.getResource();
final int v = buf.get() & 0xff;
if (buf.remaining() == 0) {
- if (bufferReturn != null) {
- bufferReturn.returnBuffer(buf);
- }
+ entry.free();
queue.poll();
try {
inputHandler.acknowledge();
@@ -159,13 +147,9 @@ public int read() throws IOException {
private void clearQueue() {
synchronized (this) {
- Entry entry;
+ Pooled<ByteBuffer> entry;
while ((entry = queue.poll()) != null) {
- final ByteBuffer buffer = entry.buffer;
- final BufferReturn ret = entry.bufferReturn;
- if (ret != null) {
- ret.returnBuffer(buffer);
- }
+ entry.free();
}
}
}
@@ -175,7 +159,7 @@ public int read(final byte[] b, int off, int len) throws IOException {
if (len == 0) {
return 0;
}
- final Queue<Entry> queue = this.queue;
+ final Queue<Pooled<ByteBuffer>> queue = this.queue;
synchronized (this) {
while (queue.isEmpty()) {
if (eof) {
@@ -191,21 +175,18 @@ public int read(final byte[] b, int off, int len) throws IOException {
}
int total = 0;
while (len > 0) {
- final Entry entry = queue.peek();
+ final Pooled<ByteBuffer> entry = queue.peek();
if (entry == null) {
break;
}
- final ByteBuffer buffer = entry.buffer;
- final BufferReturn bufferReturn = entry.bufferReturn;
+ final ByteBuffer buffer = entry.getResource();
final int byteCnt = Math.min(buffer.remaining(), len);
buffer.get(b, off, byteCnt);
off += byteCnt;
total += byteCnt;
len -= byteCnt;
if (buffer.remaining() == 0) {
- if (bufferReturn != null) {
- bufferReturn.returnBuffer(buffer);
- }
+ entry.free();
queue.poll();
try {
inputHandler.acknowledge();
@@ -222,8 +203,8 @@ public int read(final byte[] b, int off, int len) throws IOException {
public int available() throws IOException {
synchronized (this) {
int total = 0;
- for (Entry entry : queue) {
- total += entry.buffer.remaining();
+ for (Pooled<ByteBuffer> entry : queue) {
+ total += entry.getResource().remaining();
if (total < 0) {
return Integer.MAX_VALUE;
}
@@ -233,7 +214,7 @@ public int available() throws IOException {
}
public long skip(long qty) throws IOException {
- final Queue<Entry> queue = this.queue;
+ final Queue<Pooled<ByteBuffer>> queue = this.queue;
synchronized (this) {
while (queue.isEmpty()) {
if (eof) {
@@ -249,21 +230,18 @@ public long skip(long qty) throws IOException {
}
long skipped = 0L;
while (qty > 0L) {
- final Entry entry = queue.peek();
+ final Pooled<ByteBuffer> entry = queue.peek();
if (entry == null) {
break;
}
- final ByteBuffer buffer = entry.buffer;
- final BufferReturn bufferReturn = entry.bufferReturn;
+ final ByteBuffer buffer = entry.getResource();
final int byteCnt = Math.min(buffer.remaining(), (int) Math.max((long)Integer.MAX_VALUE, qty));
buffer.position(buffer.position() + byteCnt);
skipped += byteCnt;
qty -= byteCnt;
if (buffer.remaining() == 0) {
queue.poll();
- if (bufferReturn != null) {
- bufferReturn.returnBuffer(buffer);
- }
+ entry.free();
try {
inputHandler.acknowledge();
} catch (IOException e) {
@@ -323,17 +301,4 @@ private void checkFailure() throws IOException {
*/
void close() throws IOException;
}
-
- /**
- * A handler for returning buffers which are have been exhausted.
- */
- public interface BufferReturn {
-
- /**
- * Accept a returned buffer.
- *
- * @param buffer the buffer
- */
- void returnBuffer(ByteBuffer buffer);
- }
}

0 comments on commit 4846d69

Please sign in to comment.