Skip to content
This repository was archived by the owner on May 28, 2018. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -162,53 +162,16 @@ public int available() throws IOException {

@Override
public int read() throws IOException {
if (eof) {
checkThrowable();
checkNotClosed();
return -1;
}

final int c;
if (current != null && current.hasRemaining()) {
c = current.get();
} else {
try {
// let's block until next non-empty chunk or EOF
c = fetchChunk(true) ? current.get() : -1;
} catch (final InterruptedException e) {
Thread.currentThread().interrupt();
throw new IOException(e);
}
}

checkThrowable();
checkNotClosed();
return c;
return tryRead(true);
}

@Override
public int tryRead() throws IOException {
checkThrowable();
checkNotClosed();

if (eof) {
return -1;
}

if (current != null && current.hasRemaining()) {
return current.get();
}

try {
// try to fetch, but don't block && check if something has been fetched
if (fetchChunk(false) && current != null) {
return current.get();
}
} catch (final InterruptedException e) {
Thread.currentThread().interrupt();
}
public int read(byte[] b, int off, int len) throws IOException {
return tryRead(b, off, len, true);
}

return (eof) ? -1 : NOTHING;
public int tryRead() throws IOException {
return tryRead(false);
}

@Override
Expand All @@ -218,44 +181,7 @@ public int tryRead(final byte[] b) throws IOException {

@Override
public int tryRead(final byte[] b, final int off, final int len) throws IOException {
checkThrowable();
checkNotClosed();

if (b == null) {
throw new NullPointerException();
} else if (off < 0 || len < 0 || len > b.length - off) {
throw new IndexOutOfBoundsException();
} else if (len == 0) {
return 0;
}

if (eof) {
return -1;
}

int i = 0;
while (i < len) {
if (current != null && current.hasRemaining()) {
final int available = current.remaining();
if (available < len - i) {
current.get(b, off + i, available);
i += available;
} else {
current.get(b, off + i, len - i);
return len;
}
} else {
try {
if (!fetchChunk(false) || current == null) {
break; // eof or no data
}
} catch (final InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}

return i;
return tryRead(b, off, len, false);
}

@Override
Expand Down Expand Up @@ -335,4 +261,77 @@ public void closeQueue(final Throwable throwable) {
}
}
}

private int tryRead(final byte[] b, final int off, final int len, boolean block) throws IOException {
checkThrowable();
checkNotClosed();

if (b == null) {
throw new NullPointerException();
} else if (off < 0 || len < 0 || len > b.length - off) {
throw new IndexOutOfBoundsException();
} else if (len == 0) {
return 0;
}

if (eof) {
return -1;
}

int i = 0;
while (i < len) {
if (current != null && current.hasRemaining()) {
final int available = current.remaining();
if (available < len - i) {
current.get(b, off + i, available);
i += available;
} else {
current.get(b, off + i, len - i);
return len;
}
} else {
try {
if (!fetchChunk(block) || current == null) {
break; // eof or no data
}
} catch (final InterruptedException e) {
Thread.currentThread().interrupt();
if (block) {
throw new IOException(e);
}
}
}
}

return i == 0 && eof ? -1 : i;
}

private int tryRead(boolean block) throws IOException {
checkThrowable();
checkNotClosed();

if (eof) {
return -1;
}

if (current != null && current.hasRemaining()) {
return current.get() & 0xFF;
}

try {
// try to fetch, but don't block && check if something has been fetched
if (fetchChunk(block) && current != null) {
return current.get() & 0xFF;
} else if (block) {
return -1;
}
} catch (final InterruptedException e) {
Thread.currentThread().interrupt();
if (block) {
throw new IOException(e);
}
}

return (eof) ? -1 : NOTHING;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,12 @@

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import org.glassfish.jersey.internal.LocalizationMessages;

Expand All @@ -52,6 +55,7 @@
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

/**
Expand All @@ -61,6 +65,112 @@
*/
public class ByteBufferInputStreamTest {

@Test
public void testBlockingReadAByteEmptyStream() throws Exception {
final ByteBufferInputStream bbis = new ByteBufferInputStream();
bbis.closeQueue();
assertEquals(-1, bbis.read());
}

@Test
public void testNonBlockingReadAByteEmptyStream() throws Exception {
final ByteBufferInputStream bbis = new ByteBufferInputStream();
bbis.closeQueue();
assertEquals(-1, bbis.tryRead());
}

@Test
public void testBlockingReadByteArrayEmptyStream() throws Exception {
final ByteBufferInputStream bbis = new ByteBufferInputStream();
bbis.closeQueue();
byte[] buf = new byte[1024];
assertEquals(-1, bbis.read(buf));
}

@Test
public void testNonBlockingReadByteArrayEmptyStream() throws Exception {
final ByteBufferInputStream bbis = new ByteBufferInputStream();
bbis.closeQueue();
byte[] buf = new byte[1024];
assertEquals(-1, bbis.tryRead(buf));
}

@Test
public void testBlockingReadByteArrayFromFinishedExactLengthStream() throws Exception {
final ByteBufferInputStream bbis = new ByteBufferInputStream();
byte[] sourceData = new byte[1024];
new Random().nextBytes(sourceData);
ByteBuffer byteBuf = ByteBuffer.wrap(sourceData);
bbis.put(byteBuf);
bbis.closeQueue();
byte[] buf = new byte[1024];
assertEquals(1024, bbis.read(buf));
// no more data to read; so it should return -1
assertEquals(-1, bbis.read(buf));
}

@Test
public void testNonBlockingReadByteArrayFromFinishedExactLengthStream() throws Exception {
final ByteBufferInputStream bbis = new ByteBufferInputStream();
byte[] sourceData = new byte[1024];
new Random().nextBytes(sourceData);
ByteBuffer byteBuf = ByteBuffer.wrap(sourceData);
bbis.put(byteBuf);
byte[] buf = new byte[1024];
assertEquals(1024, bbis.tryRead(buf));
// the queue has not been close; so it should return 0
assertEquals(0, bbis.tryRead(buf));
bbis.closeQueue();
}

@Test
public void testBlockingReadByteArrayFromUnfinishedExactLengthStream() throws Exception {
final ByteBufferInputStream bbis = new ByteBufferInputStream();
byte[] sourceData = new byte[1024];
new Random().nextBytes(sourceData);
ByteBuffer byteBuf = ByteBuffer.wrap(sourceData);
bbis.put(byteBuf);
final byte[] buf = new byte[1024];
assertEquals(1024, bbis.read(buf));
final AtomicBoolean closed = new AtomicBoolean(false);
final Semaphore s = new Semaphore(1);
s.acquire();
Thread t = new Thread(new Runnable() {
@Override
public void run() {
try {
// it should return -1 since there is no more data
assertEquals(-1, bbis.read(buf));
// it should only reach here if the stream has been closed
assertTrue(closed.get());
} catch (IOException e) {
e.printStackTrace();
} finally {
s.release();
}
}
});
t.start();
Thread.sleep(500);
closed.set(true);
bbis.closeQueue();
// wait until the job is done
s.acquire();
}

@Test
public void testNonBlockingReadByteArrayFromUnfinishedExactLengthStream() throws Exception {
final ByteBufferInputStream bbis = new ByteBufferInputStream();
byte[] sourceData = new byte[1024];
new Random().nextBytes(sourceData);
ByteBuffer byteBuf = ByteBuffer.wrap(sourceData);
bbis.put(byteBuf);
bbis.closeQueue();
byte[] buf = new byte[1024];
assertEquals(1024, bbis.tryRead(buf));
assertEquals(-1, bbis.tryRead(buf));
}

/**
* Test for non blocking single-byte read of the stream.
*
Expand All @@ -85,7 +195,7 @@ public void run() {
}
data.clear();
for (int j = 0; j < data.capacity(); j++) {
data.put((byte) (i % 128));
data.put((byte) (i & 0xFF));
}
data.flip();
if (!bbis.put(data)) {
Expand Down Expand Up @@ -113,7 +223,7 @@ public void run() {
Thread.yield(); // Give the other thread a chance to run.
continue;
}
assertEquals("At position: " + j, (byte) (i % 128), c);
assertEquals("At position: " + j, (byte) (i & 0xFF), (byte) (c & 0xFF));
if (++j % BUFFER_SIZE == 0) {
i++;
Thread.yield(); // Give the other thread a chance to run.
Expand Down Expand Up @@ -155,7 +265,7 @@ public void run() {
}
data.clear();
for (int j = 0; j < data.capacity(); j++) {
data.put((byte) (i % 128));
data.put((byte) (i & 0xFF));
}
data.flip();
if (!bbis.put(data)) {
Expand Down Expand Up @@ -185,7 +295,7 @@ public void run() {
continue;
}
for (int p = 0; p < c; p++) {
assertEquals("At position: " + j, (byte) (i % 128), buffer[p]);
assertEquals("At position: " + j, (byte) (i & 0xFF), (byte) buffer[p]);
if (++j % BUFFER_SIZE == 0) {
i++;
Thread.yield(); // Give the other thread a chance to run.
Expand Down Expand Up @@ -228,7 +338,7 @@ public void run() {
}
data.clear();
for (int j = 0; j < data.capacity(); j++) {
data.put((byte) (i % 128));
data.put((byte) (i & 0xFF));
}
data.flip();
if (!bbis.put(data)) {
Expand All @@ -253,7 +363,7 @@ public void run() {
while ((c = bbis.read()) != -1) {
assertNotEquals("Should not read 'nothing' in blocking mode.", Integer.MIN_VALUE, c);

assertEquals("At position: " + j, (byte) (i % 128), c);
assertEquals("At position: " + j, (byte) (i & 0xFF), (byte) c);
if (++j % BUFFER_SIZE == 0) {
i++;
Thread.yield(); // Give the other thread a chance to run.
Expand Down Expand Up @@ -295,7 +405,7 @@ public void run() {
}
data.clear();
for (int j = 0; j < data.capacity(); j++) {
data.put((byte) (i % 128));
data.put((byte) (i & 0xFF));
}
data.flip();
if (!bbis.put(data)) {
Expand All @@ -322,7 +432,7 @@ public void run() {
assertNotEquals("Should not read 0 bytes in blocking mode.", 0, c);

for (int p = 0; p < c; p++) {
assertEquals("At position: " + j, (byte) (i % 128), buffer[p]);
assertEquals("At position: " + j, (byte) (i & 0xFF), buffer[p]);
if (++j % BUFFER_SIZE == 0) {
i++;
Thread.yield(); // Give the other thread a chance to run.
Expand Down