From e08a7c85faffc4cc439ab42e94df21a59ca9e120 Mon Sep 17 00:00:00 2001 From: Nat Luengnaruemitchai Date: Wed, 8 Apr 2015 08:12:18 -0700 Subject: [PATCH 1/2] JERSEY-2837: Correctly handle negative byte values --- .../collection/ByteBufferInputStream.java | 161 +++++++++--------- .../collection/ByteBufferInputStreamTest.java | 16 +- 2 files changed, 88 insertions(+), 89 deletions(-) diff --git a/core-common/src/main/java/org/glassfish/jersey/internal/util/collection/ByteBufferInputStream.java b/core-common/src/main/java/org/glassfish/jersey/internal/util/collection/ByteBufferInputStream.java index 375b42e50d..7d2e9383cd 100644 --- a/core-common/src/main/java/org/glassfish/jersey/internal/util/collection/ByteBufferInputStream.java +++ b/core-common/src/main/java/org/glassfish/jersey/internal/util/collection/ByteBufferInputStream.java @@ -162,53 +162,16 @@ public int available() throws IOException { @Override public int read() throws IOException { - if (eof) { - checkThrowable(); - checkNotClosed(); - return -1; - } - - final int c; - if (current != null && current.hasRemaining()) { - c = current.get(); - } else { - try { - // let's block until next non-empty chunk or EOF - c = fetchChunk(true) ? current.get() : -1; - } catch (final InterruptedException e) { - Thread.currentThread().interrupt(); - throw new IOException(e); - } - } - - checkThrowable(); - checkNotClosed(); - return c; + return tryRead(true); } @Override - public int tryRead() throws IOException { - checkThrowable(); - checkNotClosed(); - - if (eof) { - return -1; - } - - if (current != null && current.hasRemaining()) { - return current.get(); - } - - try { - // try to fetch, but don't block && check if something has been fetched - if (fetchChunk(false) && current != null) { - return current.get(); - } - } catch (final InterruptedException e) { - Thread.currentThread().interrupt(); - } + public int read(byte[] b, int off, int len) throws IOException { + return tryRead(b, off, len, true); + } - return (eof) ? -1 : NOTHING; + public int tryRead() throws IOException { + return tryRead(false); } @Override @@ -218,44 +181,7 @@ public int tryRead(final byte[] b) throws IOException { @Override public int tryRead(final byte[] b, final int off, final int len) throws IOException { - checkThrowable(); - checkNotClosed(); - - if (b == null) { - throw new NullPointerException(); - } else if (off < 0 || len < 0 || len > b.length - off) { - throw new IndexOutOfBoundsException(); - } else if (len == 0) { - return 0; - } - - if (eof) { - return -1; - } - - int i = 0; - while (i < len) { - if (current != null && current.hasRemaining()) { - final int available = current.remaining(); - if (available < len - i) { - current.get(b, off + i, available); - i += available; - } else { - current.get(b, off + i, len - i); - return len; - } - } else { - try { - if (!fetchChunk(false) || current == null) { - break; // eof or no data - } - } catch (final InterruptedException e) { - Thread.currentThread().interrupt(); - } - } - } - - return i; + return tryRead(b, off, len, false); } @Override @@ -335,4 +261,77 @@ public void closeQueue(final Throwable throwable) { } } } + + private int tryRead(final byte[] b, final int off, final int len, boolean block) throws IOException { + checkThrowable(); + checkNotClosed(); + + if (b == null) { + throw new NullPointerException(); + } else if (off < 0 || len < 0 || len > b.length - off) { + throw new IndexOutOfBoundsException(); + } else if (len == 0) { + return 0; + } + + if (eof) { + return -1; + } + + int i = 0; + while (i < len) { + if (current != null && current.hasRemaining()) { + final int available = current.remaining(); + if (available < len - i) { + current.get(b, off + i, available); + i += available; + } else { + current.get(b, off + i, len - i); + return len; + } + } else { + try { + if (!fetchChunk(block) || current == null) { + break; // eof or no data + } + } catch (final InterruptedException e) { + Thread.currentThread().interrupt(); + if (block) { + throw new IOException(e); + } + } + } + } + + return i; + } + + private int tryRead(boolean block) throws IOException { + checkThrowable(); + checkNotClosed(); + + if (eof) { + return -1; + } + + if (current != null && current.hasRemaining()) { + return current.get() & 0xFF; + } + + try { + // try to fetch, but don't block && check if something has been fetched + if (fetchChunk(block) && current != null) { + return current.get() & 0xFF; + } else if (block) { + return -1; + } + } catch (final InterruptedException e) { + Thread.currentThread().interrupt(); + if (block) { + throw new IOException(e); + } + } + + return (eof) ? -1 : NOTHING; + } } diff --git a/core-common/src/test/java/org/glassfish/jersey/internal/util/collection/ByteBufferInputStreamTest.java b/core-common/src/test/java/org/glassfish/jersey/internal/util/collection/ByteBufferInputStreamTest.java index ad60935ed1..189486c92d 100644 --- a/core-common/src/test/java/org/glassfish/jersey/internal/util/collection/ByteBufferInputStreamTest.java +++ b/core-common/src/test/java/org/glassfish/jersey/internal/util/collection/ByteBufferInputStreamTest.java @@ -85,7 +85,7 @@ public void run() { } data.clear(); for (int j = 0; j < data.capacity(); j++) { - data.put((byte) (i % 128)); + data.put((byte) (i & 0xFF)); } data.flip(); if (!bbis.put(data)) { @@ -113,7 +113,7 @@ public void run() { Thread.yield(); // Give the other thread a chance to run. continue; } - assertEquals("At position: " + j, (byte) (i % 128), c); + assertEquals("At position: " + j, (byte) (i & 0xFF), (byte) (c & 0xFF)); if (++j % BUFFER_SIZE == 0) { i++; Thread.yield(); // Give the other thread a chance to run. @@ -155,7 +155,7 @@ public void run() { } data.clear(); for (int j = 0; j < data.capacity(); j++) { - data.put((byte) (i % 128)); + data.put((byte) (i & 0xFF)); } data.flip(); if (!bbis.put(data)) { @@ -185,7 +185,7 @@ public void run() { continue; } for (int p = 0; p < c; p++) { - assertEquals("At position: " + j, (byte) (i % 128), buffer[p]); + assertEquals("At position: " + j, (byte) (i & 0xFF), (byte) buffer[p]); if (++j % BUFFER_SIZE == 0) { i++; Thread.yield(); // Give the other thread a chance to run. @@ -228,7 +228,7 @@ public void run() { } data.clear(); for (int j = 0; j < data.capacity(); j++) { - data.put((byte) (i % 128)); + data.put((byte) (i & 0xFF)); } data.flip(); if (!bbis.put(data)) { @@ -253,7 +253,7 @@ public void run() { while ((c = bbis.read()) != -1) { assertNotEquals("Should not read 'nothing' in blocking mode.", Integer.MIN_VALUE, c); - assertEquals("At position: " + j, (byte) (i % 128), c); + assertEquals("At position: " + j, (byte) (i & 0xFF), (byte) c); if (++j % BUFFER_SIZE == 0) { i++; Thread.yield(); // Give the other thread a chance to run. @@ -295,7 +295,7 @@ public void run() { } data.clear(); for (int j = 0; j < data.capacity(); j++) { - data.put((byte) (i % 128)); + data.put((byte) (i & 0xFF)); } data.flip(); if (!bbis.put(data)) { @@ -322,7 +322,7 @@ public void run() { assertNotEquals("Should not read 0 bytes in blocking mode.", 0, c); for (int p = 0; p < c; p++) { - assertEquals("At position: " + j, (byte) (i % 128), buffer[p]); + assertEquals("At position: " + j, (byte) (i & 0xFF), buffer[p]); if (++j % BUFFER_SIZE == 0) { i++; Thread.yield(); // Give the other thread a chance to run. From cb34eea64b4a0bac5fa73c2d43a1f941ae41ee68 Mon Sep 17 00:00:00 2001 From: Nat Luengnaruemitchai Date: Mon, 13 Apr 2015 11:20:16 -0700 Subject: [PATCH 2/2] JERSEY-2837: For empty stream, it should return -1 when read --- .../collection/ByteBufferInputStream.java | 2 +- .../collection/ByteBufferInputStreamTest.java | 110 ++++++++++++++++++ 2 files changed, 111 insertions(+), 1 deletion(-) diff --git a/core-common/src/main/java/org/glassfish/jersey/internal/util/collection/ByteBufferInputStream.java b/core-common/src/main/java/org/glassfish/jersey/internal/util/collection/ByteBufferInputStream.java index 7d2e9383cd..50962008e5 100644 --- a/core-common/src/main/java/org/glassfish/jersey/internal/util/collection/ByteBufferInputStream.java +++ b/core-common/src/main/java/org/glassfish/jersey/internal/util/collection/ByteBufferInputStream.java @@ -303,7 +303,7 @@ private int tryRead(final byte[] b, final int off, final int len, boolean block) } } - return i; + return i == 0 && eof ? -1 : i; } private int tryRead(boolean block) throws IOException { diff --git a/core-common/src/test/java/org/glassfish/jersey/internal/util/collection/ByteBufferInputStreamTest.java b/core-common/src/test/java/org/glassfish/jersey/internal/util/collection/ByteBufferInputStreamTest.java index 189486c92d..6c20b8c5fc 100644 --- a/core-common/src/test/java/org/glassfish/jersey/internal/util/collection/ByteBufferInputStreamTest.java +++ b/core-common/src/test/java/org/glassfish/jersey/internal/util/collection/ByteBufferInputStreamTest.java @@ -41,9 +41,12 @@ import java.io.IOException; import java.nio.ByteBuffer; +import java.util.Random; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import org.glassfish.jersey.internal.LocalizationMessages; @@ -52,6 +55,7 @@ import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; /** @@ -61,6 +65,112 @@ */ public class ByteBufferInputStreamTest { + @Test + public void testBlockingReadAByteEmptyStream() throws Exception { + final ByteBufferInputStream bbis = new ByteBufferInputStream(); + bbis.closeQueue(); + assertEquals(-1, bbis.read()); + } + + @Test + public void testNonBlockingReadAByteEmptyStream() throws Exception { + final ByteBufferInputStream bbis = new ByteBufferInputStream(); + bbis.closeQueue(); + assertEquals(-1, bbis.tryRead()); + } + + @Test + public void testBlockingReadByteArrayEmptyStream() throws Exception { + final ByteBufferInputStream bbis = new ByteBufferInputStream(); + bbis.closeQueue(); + byte[] buf = new byte[1024]; + assertEquals(-1, bbis.read(buf)); + } + + @Test + public void testNonBlockingReadByteArrayEmptyStream() throws Exception { + final ByteBufferInputStream bbis = new ByteBufferInputStream(); + bbis.closeQueue(); + byte[] buf = new byte[1024]; + assertEquals(-1, bbis.tryRead(buf)); + } + + @Test + public void testBlockingReadByteArrayFromFinishedExactLengthStream() throws Exception { + final ByteBufferInputStream bbis = new ByteBufferInputStream(); + byte[] sourceData = new byte[1024]; + new Random().nextBytes(sourceData); + ByteBuffer byteBuf = ByteBuffer.wrap(sourceData); + bbis.put(byteBuf); + bbis.closeQueue(); + byte[] buf = new byte[1024]; + assertEquals(1024, bbis.read(buf)); + // no more data to read; so it should return -1 + assertEquals(-1, bbis.read(buf)); + } + + @Test + public void testNonBlockingReadByteArrayFromFinishedExactLengthStream() throws Exception { + final ByteBufferInputStream bbis = new ByteBufferInputStream(); + byte[] sourceData = new byte[1024]; + new Random().nextBytes(sourceData); + ByteBuffer byteBuf = ByteBuffer.wrap(sourceData); + bbis.put(byteBuf); + byte[] buf = new byte[1024]; + assertEquals(1024, bbis.tryRead(buf)); + // the queue has not been close; so it should return 0 + assertEquals(0, bbis.tryRead(buf)); + bbis.closeQueue(); + } + + @Test + public void testBlockingReadByteArrayFromUnfinishedExactLengthStream() throws Exception { + final ByteBufferInputStream bbis = new ByteBufferInputStream(); + byte[] sourceData = new byte[1024]; + new Random().nextBytes(sourceData); + ByteBuffer byteBuf = ByteBuffer.wrap(sourceData); + bbis.put(byteBuf); + final byte[] buf = new byte[1024]; + assertEquals(1024, bbis.read(buf)); + final AtomicBoolean closed = new AtomicBoolean(false); + final Semaphore s = new Semaphore(1); + s.acquire(); + Thread t = new Thread(new Runnable() { + @Override + public void run() { + try { + // it should return -1 since there is no more data + assertEquals(-1, bbis.read(buf)); + // it should only reach here if the stream has been closed + assertTrue(closed.get()); + } catch (IOException e) { + e.printStackTrace(); + } finally { + s.release(); + } + } + }); + t.start(); + Thread.sleep(500); + closed.set(true); + bbis.closeQueue(); + // wait until the job is done + s.acquire(); + } + + @Test + public void testNonBlockingReadByteArrayFromUnfinishedExactLengthStream() throws Exception { + final ByteBufferInputStream bbis = new ByteBufferInputStream(); + byte[] sourceData = new byte[1024]; + new Random().nextBytes(sourceData); + ByteBuffer byteBuf = ByteBuffer.wrap(sourceData); + bbis.put(byteBuf); + bbis.closeQueue(); + byte[] buf = new byte[1024]; + assertEquals(1024, bbis.tryRead(buf)); + assertEquals(-1, bbis.tryRead(buf)); + } + /** * Test for non blocking single-byte read of the stream. *