Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

First rework of thread events, with ThreadService acting as arbitrato…

…r and a substantial reduction of complexity. Still needs more work, especially wrt blocking IO.
  • Loading branch information...
commit c4c035bee9ca79f33cfe78e121cf535cb489308d 1 parent 97963ec
@headius headius authored
View
2  rubyspecs.revision
@@ -3,4 +3,4 @@
mspec.revision=cfe394e3b0e995733819235bdea693ea50d164a9
-rubyspecs.revision=cf58ae7a004ef14466419ccedaa3099abc07b27f
+rubyspecs.revision=7d14aa9e75e0781c4f1b25fd4460ff522a3e5722
View
1  spec/tags/ruby/core/thread/stop_tags.txt
@@ -1,2 +1,3 @@
fails(JRUBY-3459):Thread#stop? describes a killed thread
fails(JRUBY-3459):Thread#stop? describes a thread with an uncaught exception
+fails(JRUBY-3587):Thread#stop? reports aborting on a killed thread
View
280 src/org/jruby/RubyThread.java
@@ -55,7 +55,6 @@
import org.jruby.runtime.builtin.IRubyObject;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.locks.ReentrantLock;
import org.jruby.anno.JRubyMethod;
import org.jruby.anno.JRubyClass;
import org.jruby.runtime.ObjectMarshal;
@@ -81,25 +80,20 @@
private boolean abortOnException;
private IRubyObject finalResult;
private RaiseException exitingException;
- private IRubyObject receivedException;
private RubyThreadGroup threadGroup;
private final ThreadService threadService;
- private volatile boolean isStopped = false;
- private volatile boolean isDead = false;
- private volatile boolean isWoken = false;
- public final Object stopLock = new Object();
-
- private volatile boolean killed = false;
- public Object killLock = new Object();
-
- public final ReentrantLock lock = new ReentrantLock();
// Error info is per-thread
private IRubyObject errorInfo;
private static final boolean DEBUG = false;
+ public static enum Status { RUN, SLEEP, ABORTING, DEAD }
+
+ private volatile ThreadService.Event mail;
+ private volatile Status status = Status.RUN;
+
protected RubyThread(Ruby runtime, RubyClass type) {
super(runtime, type);
this.threadService = runtime.getThreadService();
@@ -109,6 +103,45 @@ protected RubyThread(Ruby runtime, RubyClass type) {
errorInfo = runtime.getNil();
}
+ public synchronized void receiveMail(ThreadService.Event event) {
+ // if we're already aborting, we can receive no further mail
+ if (status == Status.ABORTING) return;
+
+ mail = event;
+ switch (event.type) {
+ case KILL:
+ status = Status.ABORTING;
+ }
+
+ // If this thread is sleeping or stopped, wake it
+ notify();
+
+ // interrupt the target thread in case it's blocking or waiting
+ // WARNING: We no longer interrupt the target thread, since this usually means
+ // interrupting IO and with NIO that means the channel is no longer usable.
+ // We either need a new way to handle waking a target thread that's waiting
+ // on IO, or we need to accept that we can't wake such threads and must wait
+ // for them to complete their operation.
+ //threadImpl.interrupt();
+
+ // new interrupt, to hopefully wake it out of any blocking IO
+ this.interrupt();
+
+ }
+
+ public synchronized void checkMail(ThreadContext context) {
+ ThreadService.Event myEvent = mail;
+ mail = null;
+ if (myEvent != null) {
+ switch (myEvent.type) {
+ case RAISE:
+ receivedAnException(context, myEvent.exception);
+ case KILL:
+ throwThreadKill();
+ }
+ }
+ }
+
public IRubyObject getErrorInfo() {
return errorInfo;
}
@@ -207,7 +240,6 @@ public IRubyObject initialize(IRubyObject[] args, Block block) {
FutureThread futureThread = new FutureThread(this, runnable);
threadImpl = futureThread;
- // set to default thread group
runtime.getDefaultThreadGroup().addDirectly(this);
threadImpl.start();
@@ -254,8 +286,10 @@ private static RubyThread startThread(final IRubyObject recv, final IRubyObject[
public synchronized void cleanTerminate(IRubyObject result) {
finalResult = result;
- isStopped = true;
- isDead = true;
+ }
+
+ public synchronized void beDead() {
+ status = status.DEAD;
}
public void pollThreadEvents() {
@@ -263,11 +297,10 @@ public void pollThreadEvents() {
}
public void pollThreadEvents(ThreadContext context) {
- if (killed) throwThreadKill();
- if (receivedException != null) receivedAnException(context);
+ if (mail != null) checkMail(context);
}
- private void throwThreadKill() {
+ private static void throwThreadKill() {
throw new ThreadKill();
}
@@ -371,7 +404,7 @@ public IRubyObject abort_on_exception_set(IRubyObject val) {
@JRubyMethod(name = "alive?")
public RubyBoolean alive_p() {
- return !isDead && threadImpl.isAlive() ? getRuntime().getTrue() : getRuntime().getFalse();
+ return threadImpl.isAlive() ? getRuntime().getTrue() : getRuntime().getFalse();
}
@JRubyMethod(name = "join", optional = 1, backtrace = true)
@@ -400,8 +433,8 @@ public IRubyObject join(IRubyObject[] args) {
try {
if (threadService.getCritical()) {
// If the target thread is sleeping or stopped, wake it
- synchronized (stopLock) {
- stopLock.notify();
+ synchronized (this) {
+ notify();
}
// interrupt the target thread in case it's blocking or waiting
@@ -471,25 +504,13 @@ void setThreadGroup(RubyThreadGroup rubyThreadGroup) {
@JRubyMethod(name = "inspect")
@Override
- public IRubyObject inspect() {
+ public synchronized IRubyObject inspect() {
// FIXME: There's some code duplication here with RubyObject#inspect
StringBuilder part = new StringBuilder();
String cname = getMetaClass().getRealClass().getName();
part.append("#<").append(cname).append(":0x");
part.append(Integer.toHexString(System.identityHashCode(this)));
-
- if (threadImpl.isAlive()) {
- if (isStopped) {
- part.append(getRuntime().newString(" sleep"));
- } else if (killed) {
- part.append(getRuntime().newString(" aborting"));
- } else {
- part.append(getRuntime().newString(" run"));
- }
- } else {
- part.append(" dead");
- }
-
+ part.append(status.toString().toLowerCase());
part.append(">");
return getRuntime().newString(part.toString());
}
@@ -521,23 +542,20 @@ public static IRubyObject critical(IRubyObject receiver) {
}
@JRubyMethod(name = "stop", meta = true)
- public static IRubyObject stop(IRubyObject receiver) {
- RubyThread rubyThread = receiver.getRuntime().getThreadService().getCurrentContext().getThread();
- Object stopLock = rubyThread.stopLock;
+ public static IRubyObject stop(ThreadContext context, IRubyObject receiver) {
+ RubyThread rubyThread = context.getThread();
- synchronized (stopLock) {
- rubyThread.pollThreadEvents();
+ synchronized (rubyThread) {
+ rubyThread.checkMail(context);
try {
// attempt to decriticalize all if we're the critical thread
receiver.getRuntime().getThreadService().setCritical(false);
-
- rubyThread.isStopped = true;
-
- stopLock.wait();
+
+ rubyThread.status = Status.SLEEP;
+ rubyThread.wait();
} catch (InterruptedException ie) {
- rubyThread.pollThreadEvents();
- } finally {
- rubyThread.isStopped = false;
+ rubyThread.checkMail(context);
+ rubyThread.status = Status.RUN;
}
}
@@ -553,26 +571,24 @@ public static IRubyObject kill(IRubyObject receiver, IRubyObject rubyThread, Blo
@JRubyMethod(name = "exit", frame = true, meta = true)
public static IRubyObject s_exit(IRubyObject receiver, Block block) {
RubyThread rubyThread = receiver.getRuntime().getThreadService().getCurrentContext().getThread();
-
- rubyThread.killed = true;
- // attempt to decriticalize all if we're the critical thread
- receiver.getRuntime().getThreadService().setCritical(false);
-
- throw new ThreadKill();
+
+ synchronized (rubyThread) {
+ rubyThread.status = Status.ABORTING;
+ rubyThread.mail = null;
+ receiver.getRuntime().getThreadService().setCritical(false);
+ throw new ThreadKill();
+ }
}
@JRubyMethod(name = "stop?")
public RubyBoolean stop_p() {
// not valid for "dead" state
- return getRuntime().newBoolean(isStopped);
+ return getRuntime().newBoolean(status == Status.SLEEP || status == Status.DEAD);
}
@JRubyMethod(name = "wakeup")
- public RubyThread wakeup() {
- synchronized (stopLock) {
- isWoken = true;
- stopLock.notifyAll();
- }
+ public synchronized RubyThread wakeup() {
+ notifyAll();
return this;
}
@@ -611,34 +627,11 @@ public IRubyObject raise(IRubyObject[] args, Block block) {
if (DEBUG) System.out.println("thread " + Thread.currentThread() + " before raising");
RubyThread currentThread = getRuntime().getCurrentContext().getThread();
- try {
- while (!(currentThread.lock.tryLock() && this.lock.tryLock())) {
- if (currentThread.lock.isHeldByCurrentThread()) currentThread.lock.unlock();
- }
- currentThread.pollThreadEvents();
- if (DEBUG) System.out.println("thread " + Thread.currentThread() + " raising");
- receivedException = prepareRaiseException(runtime, args, block);
-
- // If the target thread is sleeping or stopped, wake it
- synchronized (stopLock) {
- stopLock.notify();
- }
+ if (DEBUG) System.out.println("thread " + Thread.currentThread() + " raising");
+ IRubyObject exception = prepareRaiseException(runtime, args, block);
- // interrupt the target thread in case it's blocking or waiting
- // WARNING: We no longer interrupt the target thread, since this usually means
- // interrupting IO and with NIO that means the channel is no longer usable.
- // We either need a new way to handle waking a target thread that's waiting
- // on IO, or we need to accept that we can't wake such threads and must wait
- // for them to complete their operation.
- //threadImpl.interrupt();
-
- // new interrupt, to hopefully wake it out of any blocking IO
- this.interrupt();
- } finally {
- if (currentThread.lock.isHeldByCurrentThread()) currentThread.lock.unlock();
- if (this.lock.isHeldByCurrentThread()) this.lock.unlock();
- }
+ runtime.getThreadService().deliverEvent(new ThreadService.Event(currentThread, this, ThreadService.Event.Type.RAISE, exception));
return this;
}
@@ -684,14 +677,8 @@ private IRubyObject prepareRaiseException(Ruby runtime, IRubyObject[] args, Bloc
}
@JRubyMethod(name = "run")
- public IRubyObject run() {
- // if stopped, unstop
- synchronized (stopLock) {
- if (isStopped) {
- isStopped = false;
- stopLock.notifyAll();
- }
- }
+ public synchronized IRubyObject run() {
+ notifyAll();
return this;
}
@@ -701,37 +688,29 @@ public IRubyObject run() {
* explicitly wakeup and we wait less than requested amount we will return false. We will
* return true if we sleep right amount or less than right amount via spurious wakeup.
*/
- public boolean sleep(long millis) throws InterruptedException {
+ public synchronized boolean sleep(long millis) throws InterruptedException {
assert this == getRuntime().getCurrentContext().getThread();
+ boolean result = true;
- synchronized (stopLock) {
+ synchronized (this) {
pollThreadEvents();
try {
- isStopped = true;
- stopLock.wait(millis);
+ status = Status.SLEEP;
+ wait(millis);
} finally {
- isStopped = false;
pollThreadEvents();
- if (isWoken) {
- isWoken = false;
- return false;
- }
+ status = Status.RUN;
}
}
- return true;
+ return result;
}
@JRubyMethod(name = "status")
- public IRubyObject status() {
+ public synchronized IRubyObject status() {
if (threadImpl.isAlive()) {
- if (isSleeping()) {
- return getRuntime().newString("sleep");
- } else if (killed) {
- return getRuntime().newString("aborting");
- }
-
- return getRuntime().newString("run");
+ // TODO: no java stringity
+ return getRuntime().newString(status.toString().toLowerCase());
} else if (exitingException != null) {
return getRuntime().getNil();
} else {
@@ -739,16 +718,12 @@ public IRubyObject status() {
}
}
- private boolean isSleeping() {
- return isStopped || (currentSelector != null && currentSelector.isOpen()) || blockingIO != null || currentWaitObject != null;
- }
-
public void enterSleep() {
- isStopped = true;
+ status = Status.SLEEP;
}
public void exitSleep() {
- isStopped = false;
+ status = Status.RUN;
}
@JRubyMethod(name = {"kill", "exit", "terminate"})
@@ -759,46 +734,24 @@ public IRubyObject kill() {
// If the killee thread is the same as the killer thread, just die
if (currentThread == this) throwThreadKill();
- try {
- if (DEBUG) System.out.println("thread " + Thread.currentThread() + " trying to kill");
- while (!(currentThread.lock.tryLock() && this.lock.tryLock())) {
- if (currentThread.lock.isHeldByCurrentThread()) currentThread.lock.unlock();
- }
-
- currentThread.pollThreadEvents();
+ if (DEBUG) System.out.println("thread " + Thread.currentThread() + " trying to kill");
- if (DEBUG) System.out.println("thread " + Thread.currentThread() + " succeeded with kill");
- killed = true;
-
- // If the target thread is sleeping or stopped, wake it
- synchronized (stopLock) {
- stopLock.notify();
- }
+ currentThread.pollThreadEvents();
- // interrupt the target thread in case it's blocking or waiting
- // WARNING: We no longer interrupt the target thread, since this usually means
- // interrupting IO and with NIO that means the channel is no longer usable.
- // We either need a new way to handle waking a target thread that's waiting
- // on IO, or we need to accept that we can't wake such threads and must wait
- // for them to complete their operation.
- //threadImpl.interrupt();
-
- // new interrupt, to hopefully wake it out of any blocking IO
- this.interrupt();
- } finally {
- if (currentThread.lock.isHeldByCurrentThread()) currentThread.lock.unlock();
- if (this.lock.isHeldByCurrentThread()) this.lock.unlock();
- }
+ getRuntime().getThreadService().deliverEvent(new ThreadService.Event(currentThread, this, ThreadService.Event.Type.KILL));
- try {
- threadImpl.join();
- } catch (InterruptedException ie) {
- // we were interrupted, check thread events again
- currentThread.pollThreadEvents();
- } catch (ExecutionException ie) {
- // we were interrupted, check thread events again
- currentThread.pollThreadEvents();
- }
+ if (DEBUG) System.out.println("thread " + Thread.currentThread() + " succeeded with kill");
+
+ // FIXME: is this still necessary?
+// try {
+// threadImpl.join();
+// } catch (InterruptedException ie) {
+// // we were interrupted, check thread events again
+// currentThread.pollThreadEvents();
+// } catch (ExecutionException ie) {
+// // we were interrupted, check thread events again
+// currentThread.pollThreadEvents();
+// }
return this;
}
@@ -870,6 +823,7 @@ public boolean select(RubyIO io, int ops) {
SelectionKey key = selectable.register(currentSelector, ops);
+ beforeBlockingCall();
int result = currentSelector.select();
// check for thread events, in case we've been woken up to die
@@ -887,6 +841,7 @@ public boolean select(RubyIO io, int ops) {
} catch (IOException ioe) {
throw io.getRuntime().newRuntimeError("Error with selector: " + ioe);
} finally {
+ afterBlockingCall();
if (currentSelector != null) {
try {
currentSelector.close();
@@ -955,22 +910,19 @@ public boolean waitForIO(ThreadContext context, RubyIO io, int ops) {
}
}
public void beforeBlockingCall() {
- isStopped = true;
+ enterSleep();
}
public void afterBlockingCall() {
- isStopped = false;
+ exitSleep();
}
- private void receivedAnException(ThreadContext context) {
- // clear this so we don't keep re-throwing
- IRubyObject raiseException = receivedException;
- receivedException = null;
+ private void receivedAnException(ThreadContext context, IRubyObject exception) {
RubyModule kernelModule = getRuntime().getKernel();
if (DEBUG) {
- System.out.println("thread " + Thread.currentThread() + " before propagating exception: " + killed);
+ System.out.println("thread " + Thread.currentThread() + " before propagating exception: " + status);
}
- kernelModule.callMethod(context, "raise", raiseException);
+ kernelModule.callMethod(context, "raise", exception);
}
public boolean wait_timeout(IRubyObject o, Double timeout) throws InterruptedException {
@@ -982,8 +934,11 @@ public boolean wait_timeout(IRubyObject o, Double timeout) throws InterruptedExc
int delay_ns_remainder = (int)( delay_ns % 1000000 );
try {
currentWaitObject = o;
+ status = Status.SLEEP;
o.wait(delay_ms, delay_ns_remainder);
} finally {
+ pollThreadEvents();
+ status = Status.RUN;
currentWaitObject = null;
}
}
@@ -992,8 +947,11 @@ public boolean wait_timeout(IRubyObject o, Double timeout) throws InterruptedExc
} else {
try {
currentWaitObject = o;
+ status = Status.SLEEP;
o.wait();
} finally {
+ pollThreadEvents();
+ status = Status.RUN;
currentWaitObject = null;
}
return true;
View
6 src/org/jruby/RubyThreadGroup.java
@@ -103,8 +103,10 @@ void addDirectly(RubyThread rubyThread) {
}
public synchronized void remove(RubyThread rubyThread) {
- rubyThread.setThreadGroup(null);
- rubyThreadList.remove(rubyThread);
+ synchronized (rubyThread) {
+ rubyThread.setThreadGroup(null);
+ rubyThreadList.remove(rubyThread);
+ }
}
@JRubyMethod(name = "enclose", frame = true)
View
7 src/org/jruby/internal/runtime/RubyNativeThread.java
@@ -48,7 +48,6 @@
private RubyProc proc;
private IRubyObject[] arguments;
private RubyThread rubyThread;
- private boolean setContextCC;
public RubyNativeThread(RubyThread rubyThread, IRubyObject[] args, Block currentBlock, boolean setContextCC) {
throw new RuntimeException("RubyNativeThread is deprecated; do not use it");
@@ -81,9 +80,9 @@ public void run() {
rubyThread.cleanTerminate(result);
} catch (ThreadKill tk) {
// notify any killer waiting on our thread that we're going bye-bye
- synchronized (rubyThread.killLock) {
- rubyThread.killLock.notifyAll();
- }
+// synchronized (rubyThread.killLock) {
+// rubyThread.killLock.notifyAll();
+// }
} catch (JumpException.ReturnJump rj) {
rubyThread.exceptionRaised(runtime.newThreadError("return can't jump across threads"));
} catch (RaiseException e) {
View
12 src/org/jruby/internal/runtime/RubyRunnable.java
@@ -91,9 +91,9 @@ public void run() {
rubyThread.cleanTerminate(result);
} catch (ThreadKill tk) {
// notify any killer waiting on our thread that we're going bye-bye
- synchronized (rubyThread.killLock) {
- rubyThread.killLock.notifyAll();
- }
+// synchronized (rubyThread.killLock) {
+// rubyThread.killLock.notifyAll();
+// }
} catch (JumpException.ReturnJump rj) {
rubyThread.exceptionRaised(runtime.newThreadError("return can't jump across threads"));
} catch (RaiseException e) {
@@ -102,13 +102,11 @@ public void run() {
// Someone called exit!, so we need to kill the main thread
runtime.getThreadService().getMainThread().kill();
} finally {
+ rubyThread.beDead();
runtime.getThreadService().setCritical(false);
runtime.getThreadService().unregisterThread(rubyThread);
- // synchronize on the RubyThread object for threadgroup updates
- synchronized (rubyThread) {
- ((RubyThreadGroup)rubyThread.group()).remove(rubyThread);
- }
+ ((RubyThreadGroup)rubyThread.group()).remove(rubyThread);
// restore context classloader, in case we're using a thread pool
try {
View
28 src/org/jruby/internal/runtime/ThreadService.java
@@ -42,6 +42,7 @@
import java.util.concurrent.Future;
import org.jruby.Ruby;
import org.jruby.RubyThread;
+import org.jruby.runtime.builtin.IRubyObject;
import org.jruby.runtime.ThreadContext;
public class ThreadService {
@@ -212,4 +213,31 @@ public void setCritical(boolean critical) {
public boolean getCritical() {
return criticalLock.isHeldByCurrentThread();
}
+
+ public static class Event {
+ public enum Type { KILL, RAISE, WAKEUP }
+ public final RubyThread sender;
+ public final RubyThread target;
+ public final Type type;
+ public final IRubyObject exception;
+
+ public Event(RubyThread sender, RubyThread target, Type type) {
+ this(sender, target, type, null);
+ }
+
+ public Event(RubyThread sender, RubyThread target, Type type, IRubyObject exception) {
+ this.sender = sender;
+ this.target = target;
+ this.type = type;
+ this.exception = exception;
+ }
+ }
+
+ public synchronized void deliverEvent(Event event) {
+ // first, check if the sender has unreceived mail
+ event.sender.checkMail(getCurrentContext());
+
+ // then deliver mail to the target
+ event.target.receiveMail(event);
+ }
}
View
92 src/org/jruby/util/ShellLauncher.java
@@ -546,20 +546,24 @@ public int exitValue() {
@Override
public void destroy() {
try {
- child.destroy();
if (input != null) input.close();
if (inerr != null) inerr.close();
if (output != null) output.close();
if (inputChannel != null) inputChannel.close();
if (inerrChannel != null) inerrChannel.close();
if (outputChannel != null) outputChannel.close();
- if (inputPumper != null) inputPumper.quit();
- if (inerrPumper != null) inerrPumper.quit();
- if (outputPumper != null) outputPumper.quit();
+
+ // processes seem to have some peculiar locking sequences, so we
+ // need to ensure nobody is trying to close/destroy while we are
+ synchronized (this) {
+ if (inputPumper != null) synchronized(inputPumper) {inputPumper.quit();}
+ if (inerrPumper != null) synchronized(inerrPumper) {inerrPumper.quit();}
+ if (outputPumper != null) synchronized(outputPumper) {outputPumper.quit();}
+ child.destroy();
+ }
} catch (IOException ioe) {
throw new RuntimeException(ioe);
}
- child.destroy();
}
private void prepareInput(Process child) {
@@ -608,9 +612,9 @@ private void pumpInput(Process child, Ruby runtime) {
parentOutChannel = ((FileOutputStream) parentOut).getChannel();
}
if (childInChannel != null && parentOutChannel != null) {
- inputPumper = new ChannelPumper(childInChannel, parentOutChannel, Pumper.Slave.IN);
+ inputPumper = new ChannelPumper(childInChannel, parentOutChannel, Pumper.Slave.IN, this);
} else {
- inputPumper = new StreamPumper(childIn, parentOut, false, Pumper.Slave.IN);
+ inputPumper = new StreamPumper(childIn, parentOut, false, Pumper.Slave.IN, this);
}
inputPumper.start();
input = null;
@@ -630,36 +634,14 @@ private void pumpInerr(Process child, Ruby runtime) {
parentOutChannel = ((FileOutputStream) parentOut).getChannel();
}
if (childInChannel != null && parentOutChannel != null) {
- inerrPumper = new ChannelPumper(childInChannel, parentOutChannel, Pumper.Slave.IN);
+ inerrPumper = new ChannelPumper(childInChannel, parentOutChannel, Pumper.Slave.IN, this);
} else {
- inerrPumper = new StreamPumper(childIn, parentOut, false, Pumper.Slave.IN);
+ inerrPumper = new StreamPumper(childIn, parentOut, false, Pumper.Slave.IN, this);
}
inerrPumper.start();
inerr = null;
inerrChannel = null;
}
-
- private void pumpOutput(Process child, Ruby runtime) {
- // no write requested, hook up write to parent runtime's input
- OutputStream childOut = unwrapBufferedStream(child.getOutputStream());
- FileChannel childOutChannel = null;
- if (childOut instanceof FileOutputStream) {
- childOutChannel = ((FileOutputStream) childOut).getChannel();
- }
- InputStream parentIn = unwrapBufferedStream(runtime.getIn());
- FileChannel parentInChannel = null;
- if (parentIn instanceof FileInputStream) {
- parentInChannel = ((FileInputStream) parentIn).getChannel();
- }
- if (parentInChannel != null && childOutChannel != null) {
- outputPumper = new ChannelPumper(parentInChannel, childOutChannel, Pumper.Slave.OUT);
- } else {
- outputPumper = new StreamPumper(parentIn, childOut, false, Pumper.Slave.OUT);
- }
- outputPumper.start();
- output = null;
- outputChannel = null;
- }
}
public static Process run(Ruby runtime, IRubyObject[] rawArgs) throws IOException {
@@ -708,14 +690,16 @@ public static Process run(Ruby runtime, IRubyObject[] rawArgs) throws IOExceptio
private final OutputStream out;
private final boolean onlyIfAvailable;
private final Object waitLock = new Object();
+ private final Object sync;
private final Slave slave;
private volatile boolean quit;
- StreamPumper(InputStream in, OutputStream out, boolean avail, Slave slave) {
+ StreamPumper(InputStream in, OutputStream out, boolean avail, Slave slave, Object sync) {
this.in = in;
this.out = out;
this.onlyIfAvailable = avail;
this.slave = slave;
+ this.sync = sync;
setDaemon(true);
}
@Override
@@ -754,14 +738,16 @@ public void run() {
} catch (Exception e) {
} finally {
if (onlyIfAvailable) {
- // We need to close the out, since some
- // processes would just wait for the stream
- // to be closed before they process its content,
- // and produce the output. E.g.: "cat".
- if (slave == Slave.OUT) {
- // we only close out if it's the slave stream, to avoid
- // closing a directly-mapped stream from parent process
- try { out.close(); } catch (IOException ioe) {}
+ synchronized (sync) {
+ // We need to close the out, since some
+ // processes would just wait for the stream
+ // to be closed before they process its content,
+ // and produce the output. E.g.: "cat".
+ if (slave == Slave.OUT) {
+ // we only close out if it's the slave stream, to avoid
+ // closing a directly-mapped stream from parent process
+ try { out.close(); } catch (IOException ioe) {}
+ }
}
}
}
@@ -778,13 +764,15 @@ public void quit() {
private final FileChannel inChannel;
private final FileChannel outChannel;
private final Slave slave;
+ private final Object sync;
private volatile boolean quit;
- ChannelPumper(FileChannel inChannel, FileChannel outChannel, Slave slave) {
+ ChannelPumper(FileChannel inChannel, FileChannel outChannel, Slave slave, Object sync) {
if (DEBUG) out.println("using channel pumper");
this.inChannel = inChannel;
this.outChannel = outChannel;
this.slave = slave;
+ this.sync = sync;
setDaemon(true);
}
@Override
@@ -801,12 +789,16 @@ public void run() {
}
} catch (Exception e) {
} finally {
- switch (slave) {
- case OUT:
- try { outChannel.close(); } catch (IOException ioe) {}
- break;
- case IN:
- try { inChannel.close(); } catch (IOException ioe) {}
+ // processes seem to have some peculiar locking sequences, so we
+ // need to ensure nobody is trying to close/destroy while we are
+ synchronized (sync) {
+ switch (slave) {
+ case OUT:
+ try { outChannel.close(); } catch (IOException ioe) {}
+ break;
+ case IN:
+ try { inChannel.close(); } catch (IOException ioe) {}
+ }
}
}
}
@@ -821,13 +813,13 @@ private static void handleStreams(Process p, InputStream in, OutputStream out, O
InputStream pErr = p.getErrorStream();
OutputStream pIn = p.getOutputStream();
- StreamPumper t1 = new StreamPumper(pOut, out, false, Pumper.Slave.IN);
- StreamPumper t2 = new StreamPumper(pErr, err, false, Pumper.Slave.IN);
+ StreamPumper t1 = new StreamPumper(pOut, out, false, Pumper.Slave.IN, p);
+ StreamPumper t2 = new StreamPumper(pErr, err, false, Pumper.Slave.IN, p);
// The assumption here is that the 'in' stream provides
// proper available() support. If available() always
// returns 0, we'll hang!
- StreamPumper t3 = new StreamPumper(in, pIn, true, Pumper.Slave.OUT);
+ StreamPumper t3 = new StreamPumper(in, pIn, true, Pumper.Slave.OUT, p);
t1.start();
t2.start();
Please sign in to comment.
Something went wrong with that request. Please try again.