Skip to content

Commit

Permalink
Fix JRUBY-5946: IO.select is returning write array with nil elements
Browse files Browse the repository at this point in the history
* Patch by Javier Alcazar
  • Loading branch information
headius committed Aug 16, 2011
1 parent f8e10de commit 2c8e310
Showing 1 changed file with 41 additions and 21 deletions.
62 changes: 41 additions & 21 deletions src/org/jruby/util/io/SelectBlob.java
Expand Up @@ -33,7 +33,10 @@
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.spi.SelectorProvider;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

import org.jruby.Ruby;
import org.jruby.RubyArray;
import org.jruby.RubyFixnum;
Expand Down Expand Up @@ -114,11 +117,14 @@ private void processReads(Ruby runtime, IRubyObject[] args, ThreadContext contex
readArray = null;
} else {
readIOs = new RubyIO[readSize];
Map<Character,Integer> attachment = new HashMap<Character,Integer>(1);
for (int i = 0; i < readSize; i++) {
RubyIO ioObj = saveReadIO(i, context);
saveReadBlocking(ioObj, i);
saveBufferedRead(ioObj, i);
trySelectRead(context, i, ioObj);
saveBufferedRead(ioObj, i);
attachment.clear();
attachment.put('r', i);
trySelectRead(context, attachment, ioObj);
}
}
}
Expand All @@ -145,15 +151,15 @@ private void saveBufferedRead(RubyIO ioObj, int i) throws BadDescriptorException
}
}

private void trySelectRead(ThreadContext context, int i, RubyIO ioObj) throws IOException {
if (ioObj.getChannel() instanceof SelectableChannel && registerSelect(context, getSelector(context), i, ioObj, SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) {
private void trySelectRead(ThreadContext context, Map<Character,Integer> attachment, RubyIO ioObj) throws IOException {
if (ioObj.getChannel() instanceof SelectableChannel && registerSelect(context, getSelector(context), attachment, ioObj, SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) {
selectedReads++;
if (ioObj.writeDataBuffered()) {
getPendingReads()[i] = true;
getPendingReads()[(Integer)attachment.get('r')] = true;
}
} else {
if ((ioObj.getOpenFile().getMode() & OpenFile.READABLE) != 0) {
getUnselectableReads()[i] = true;
getUnselectableReads()[(Integer)attachment.get('r')] = true;
}
}
}
Expand All @@ -169,10 +175,13 @@ private void processWrites(Ruby runtime, IRubyObject[] args, ThreadContext conte
writeArray = null;
} else {
writeIOs = new RubyIO[writeSize];
Map<Character,Integer> attachment = new HashMap<Character,Integer>(1);
for (int i = 0; i < writeSize; i++) {
RubyIO ioObj = saveWriteIO(i, context);
saveWriteBlocking(ioObj, i);
trySelectWrite(context, i, ioObj);
attachment.clear();
attachment.put('w', i);
trySelectWrite(context, attachment, ioObj);
}
}
}
Expand Down Expand Up @@ -202,11 +211,11 @@ private void saveWriteBlocking(RubyIO ioObj, int i) {
}
}

private void trySelectWrite(ThreadContext context, int i, RubyIO ioObj) throws IOException {
if (!registerSelect(context, getSelector(context), i, ioObj, SelectionKey.OP_WRITE)) {
private void trySelectWrite(ThreadContext context, Map<Character,Integer> attachment, RubyIO ioObj) throws IOException {
if (!registerSelect(context, getSelector(context), attachment, ioObj, SelectionKey.OP_WRITE)) {
selectedReads++;
if ((ioObj.getOpenFile().getMode() & OpenFile.WRITABLE) != 0) {
getUnselectableWrites()[i] = true;
getUnselectableWrites()[(Integer)attachment.get('w')] = true;
}
}
}
Expand Down Expand Up @@ -245,41 +254,45 @@ private void doSelect(final boolean has_timeout, long timeout) throws IOExceptio
}
}

@SuppressWarnings("unchecked")
private void processSelectedKeys(Ruby runtime) {
if (selector != null) {
for (Iterator i = selector.selectedKeys().iterator(); i.hasNext();) {
SelectionKey key = (SelectionKey) i.next();
int ioIndex = (Integer) key.attachment();
int readIoIndex = 0;
int writeIoIndex = 0;
try {
int interestAndReady = key.interestOps() & key.readyOps();
if (readArray != null && (interestAndReady & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT | SelectionKey.OP_CONNECT)) != 0) {
getReadResults().append(readArray.eltOk(ioIndex));
readIoIndex = ((Map<Character,Integer>)key.attachment()).get('r');
getReadResults().append(readArray.eltOk(readIoIndex));
if (pendingReads != null) {
pendingReads[ioIndex] = false;
pendingReads[readIoIndex] = false;
}
}
if (writeArray != null && (interestAndReady & (SelectionKey.OP_WRITE)) != 0) {
getWriteResults().append(writeArray.eltOk(ioIndex));
writeIoIndex = ((Map<Character,Integer>)key.attachment()).get('w');
getWriteResults().append(writeArray.eltOk(writeIoIndex));
}
} catch (CancelledKeyException cke) {
// TODO: is this the right thing to do?
int interest = key.interestOps();
if (readArray != null && (interest & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT | SelectionKey.OP_CONNECT)) != 0) {
if (pendingReads != null) {
pendingReads[ioIndex] = false;
pendingReads[readIoIndex] = false;
}
if (errorResults != null) {
errorResults = RubyArray.newArray(runtime, readArray.size() + writeArray.size());
}
if (fastSearch(errorResults.toJavaArrayUnsafe(), readIOs[ioIndex]) == -1) {
if (fastSearch(errorResults.toJavaArrayUnsafe(), readIOs[readIoIndex]) == -1) {
// only add to error if not there
getErrorResults().append(readArray.eltOk(ioIndex));
getErrorResults().append(readArray.eltOk(readIoIndex));
}
}
if (writeArray != null && (interest & (SelectionKey.OP_WRITE)) != 0) {
if (fastSearch(errorResults.toJavaArrayUnsafe(), writeIOs[ioIndex]) == -1) {
if (fastSearch(errorResults.toJavaArrayUnsafe(), writeIOs[writeIoIndex]) == -1) {
// only add to error if not there
errorResults.append(writeArray.eltOk(ioIndex));
errorResults.append(writeArray.eltOk(writeIoIndex));
}
}
}
Expand Down Expand Up @@ -419,7 +432,8 @@ private static void checkArrayType(Ruby runtime, IRubyObject obj) {
}
}

private static boolean registerSelect(ThreadContext context, Selector selector, Object obj, RubyIO ioObj, int ops) throws IOException {
@SuppressWarnings("unchecked")
private static boolean registerSelect(ThreadContext context, Selector selector, Map<Character,Integer> obj, RubyIO ioObj, int ops) throws IOException {
Channel channel = ioObj.getChannel();
if (channel == null || !(channel instanceof SelectableChannel)) {
return false;
Expand All @@ -430,13 +444,19 @@ private static boolean registerSelect(ThreadContext context, Selector selector,
SelectionKey key = ((SelectableChannel) channel).keyFor(selector);

if (key == null) {
((SelectableChannel) channel).register(selector, real_ops, obj);
Map<Character,Integer> attachment = new HashMap<Character,Integer> (1);
attachment.putAll(obj);
((SelectableChannel) channel).register(selector, real_ops, attachment );
} else {
key.interestOps(key.interestOps() | real_ops);
Map<Character,Integer> att = (Map<Character,Integer>)key.attachment();
att.putAll(obj);
key.attach(att);
}

return true;
}

Ruby runtime;
RubyArray readArray = null;
int readSize = 0;
Expand Down

0 comments on commit 2c8e310

Please sign in to comment.