Skip to content

Commit

Permalink
REFACTORING: supporting a list of bytes, which will help fix issues w…
Browse files Browse the repository at this point in the history
…ith ungect19
  • Loading branch information
josedonizetti committed May 1, 2013
1 parent 639752e commit 5fe7db0
Showing 1 changed file with 53 additions and 36 deletions.
89 changes: 53 additions & 36 deletions src/org/jruby/util/io/ChannelStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import java.nio.channels.IllegalBlockingModeException;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.SelectableChannel;
import java.util.LinkedList;

import org.jruby.Finalizable;
import org.jruby.Ruby;
Expand All @@ -55,8 +56,6 @@
import org.jruby.util.log.Logger;
import org.jruby.util.log.LoggerFactory;

import java.nio.channels.spi.SelectorProvider;

/**
* This file implements a seekable IO file.
*/
Expand Down Expand Up @@ -105,7 +104,7 @@ public class ChannelStream implements Stream, Finalizable {
protected boolean reading; // are we reading or writing?
private ChannelDescriptor descriptor;
private boolean blocking = true;
protected int ungotc = -1;
private LinkedList<Byte> ungotChars = new LinkedList<Byte>();
private volatile boolean closedExplicitly = false;

private volatile boolean eof = false;
Expand Down Expand Up @@ -184,7 +183,11 @@ public void waitUntilReady() throws IOException, InterruptedException {
}

public boolean readDataBuffered() {
return reading && (ungotc != -1 || buffer.hasRemaining());
return reading && (hasUngotChars() || buffer.hasRemaining());
}

private boolean hasUngotChars() {
return ungotChars.size() > 0;
}

public boolean writeDataBuffered() {
Expand Down Expand Up @@ -287,12 +290,16 @@ public synchronized int getline(ByteList dst, byte terminator) throws IOExceptio

int totalRead = 0;
boolean found = false;
if (ungotc != -1) {
dst.append((byte) ungotc);
found = ungotc == terminator;
ungotc = -1;
++totalRead;

if (hasUngotChars()) {
Byte ungotc;
while((ungotc = ungotChars.poll()) != null){
dst.append(ungotc);
found = ungotc == terminator;
++totalRead;
}
}

while (!found) {
final byte[] bytes = buffer.array();
final int begin = buffer.arrayOffset() + buffer.position();
Expand Down Expand Up @@ -326,13 +333,17 @@ public synchronized int getline(ByteList dst, byte terminator, long limit) throw

int totalRead = 0;
boolean found = false;
if (ungotc != -1) {
dst.append((byte) ungotc);
found = ungotc == terminator;
ungotc = -1;
limit--;
++totalRead;

if (hasUngotChars()) {
Byte ungotc;
while((ungotc = ungotChars.poll()) != null){
dst.append(ungotc);
found = ungotc == terminator;
limit--;
++totalRead;
}
}

while (!found) {
final byte[] bytes = buffer.array();
final int begin = buffer.arrayOffset() + buffer.position();
Expand Down Expand Up @@ -455,9 +466,11 @@ public synchronized ByteList readall() throws IOException, BadDescriptorExceptio
private final int copyBufferedBytes(ByteBuffer dst) {
final int bytesToCopy = dst.remaining();

if (ungotc != -1 && dst.hasRemaining()) {
dst.put((byte) ungotc);
ungotc = -1;
if (hasUngotChars() && dst.hasRemaining()) {
Byte ungotc;
while((ungotc = ungotChars.poll()) != null){
dst.put(ungotc);
}
}

if (buffer.hasRemaining() && dst.hasRemaining()) {
Expand Down Expand Up @@ -490,11 +503,13 @@ private final int copyBufferedBytes(ByteBuffer dst) {
*/
private final int copyBufferedBytes(byte[] dst, int off, int len) {
int bytesCopied = 0;

if (ungotc != -1 && len > 0) {
dst[off++] = (byte) ungotc;
ungotc = -1;
++bytesCopied;

if (hasUngotChars() && len > 0) {
Byte ungotc;
while((ungotc = ungotChars.poll()) != null){
dst[off++] = ungotc;
++bytesCopied;
}
}

final int n = Math.min(len - bytesCopied, buffer.remaining());
Expand All @@ -515,11 +530,13 @@ private final int copyBufferedBytes(ByteList dst, int len) {
int bytesCopied = 0;

dst.ensure(Math.min(len, bufferedInputBytesRemaining()));

if (bytesCopied < len && ungotc != -1) {
++bytesCopied;
dst.append((byte) ungotc);
ungotc = -1;

if (bytesCopied < len && hasUngotChars()) {
Byte ungotc;
while((ungotc = ungotChars.poll()) != null){
++bytesCopied;
dst.append(ungotc);
}
}

//
Expand All @@ -540,7 +557,7 @@ private final int copyBufferedBytes(ByteList dst, int len) {
* @return The number of bytes that can be read without reading the underlying stream.
*/
private final int bufferedInputBytesRemaining() {
return reading ? (buffer.remaining() + (ungotc != -1 ? 1 : 0)) : 0;
return reading ? (buffer.remaining() + (ungotChars.size())) : 0;
}

/**
Expand All @@ -549,7 +566,7 @@ private final int bufferedInputBytesRemaining() {
* @return <tt>true</tt> if there are bytes available in the read buffer.
*/
private final boolean hasBufferedInputBytes() {
return reading && (buffer.hasRemaining() || ungotc != -1);
return reading && (buffer.hasRemaining() || hasUngotChars());
}

/**
Expand Down Expand Up @@ -728,7 +745,7 @@ public synchronized long fgetpos() throws IOException, PipeException, InvalidVal
// Adjust for buffered data
if (reading) {
pos -= buffer.remaining();
return pos - (pos > 0 && ungotc != -1 ? 1 : 0);
return pos - (pos > 0 && hasUngotChars() ? 1 : 0);
} else {
return pos + buffer.position();
}
Expand All @@ -750,7 +767,7 @@ public synchronized long fgetpos() throws IOException, PipeException, InvalidVal
public synchronized void lseek(long offset, int type) throws IOException, InvalidValueException, PipeException, BadDescriptorException {
if (descriptor.isSeekable()) {
FileChannel fileChannel = (FileChannel)descriptor.getChannel();
ungotc = -1;
ungotChars.clear();
int adj = 0;
if (reading) {
// for SEEK_CUR, need to adjust for buffered data
Expand Down Expand Up @@ -1215,7 +1232,7 @@ public int ungetc(int c) {
eof = false;

// save the ungot
ungotc = c;
ungotChars.push((byte) c);

return c;
}
Expand Down Expand Up @@ -1350,9 +1367,9 @@ public synchronized int read() throws IOException, BadDescriptorException {
try {
descriptor.checkOpen();

if (ungotc >= 0) {
int c = ungotc;
ungotc = -1;
if (hasUngotChars()) {
int c = ungotChars.poll();
ungotChars.clear();
return c;
}

Expand Down

0 comments on commit 5fe7db0

Please sign in to comment.