Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

First rework of thread events, with ThreadService acting as arbitrato…

…r and a substantial reduction of complexity. Still needs more work, especially wrt blocking IO.
  • Loading branch information...
commit c4c035bee9ca79f33cfe78e121cf535cb489308d 1 parent 97963ec
@headius headius authored
View
2  rubyspecs.revision
@@ -3,4 +3,4 @@
mspec.revision=cfe394e3b0e995733819235bdea693ea50d164a9
-rubyspecs.revision=cf58ae7a004ef14466419ccedaa3099abc07b27f
+rubyspecs.revision=7d14aa9e75e0781c4f1b25fd4460ff522a3e5722
View
1  spec/tags/ruby/core/thread/stop_tags.txt
@@ -1,2 +1,3 @@
fails(JRUBY-3459):Thread#stop? describes a killed thread
fails(JRUBY-3459):Thread#stop? describes a thread with an uncaught exception
+fails(JRUBY-3587):Thread#stop? reports aborting on a killed thread
View
280 src/org/jruby/RubyThread.java
@@ -55,7 +55,6 @@
import org.jruby.runtime.builtin.IRubyObject;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.locks.ReentrantLock;
import org.jruby.anno.JRubyMethod;
import org.jruby.anno.JRubyClass;
import org.jruby.runtime.ObjectMarshal;
@@ -81,25 +80,20 @@
private boolean abortOnException;
private IRubyObject finalResult;
private RaiseException exitingException;
- private IRubyObject receivedException;
private RubyThreadGroup threadGroup;
private final ThreadService threadService;
- private volatile boolean isStopped = false;
- private volatile boolean isDead = false;
- private volatile boolean isWoken = false;
- public final Object stopLock = new Object();
-
- private volatile boolean killed = false;
- public Object killLock = new Object();
-
- public final ReentrantLock lock = new ReentrantLock();
// Error info is per-thread
private IRubyObject errorInfo;
private static final boolean DEBUG = false;
+ public static enum Status { RUN, SLEEP, ABORTING, DEAD }
+
+ private volatile ThreadService.Event mail;
+ private volatile Status status = Status.RUN;
+
protected RubyThread(Ruby runtime, RubyClass type) {
super(runtime, type);
this.threadService = runtime.getThreadService();
@@ -109,6 +103,45 @@ protected RubyThread(Ruby runtime, RubyClass type) {
errorInfo = runtime.getNil();
}
+ public synchronized void receiveMail(ThreadService.Event event) {
+ // if we're already aborting, we can receive no further mail
+ if (status == Status.ABORTING) return;
+
+ mail = event;
+ switch (event.type) {
+ case KILL:
+ status = Status.ABORTING;
+ }
+
+ // If this thread is sleeping or stopped, wake it
+ notify();
+
+ // interrupt the target thread in case it's blocking or waiting
+ // WARNING: We no longer interrupt the target thread, since this usually means
+ // interrupting IO and with NIO that means the channel is no longer usable.
+ // We either need a new way to handle waking a target thread that's waiting
+ // on IO, or we need to accept that we can't wake such threads and must wait
+ // for them to complete their operation.
+ //threadImpl.interrupt();
+
+ // new interrupt, to hopefully wake it out of any blocking IO
+ this.interrupt();
+
+ }
+
+ public synchronized void checkMail(ThreadContext context) {
+ ThreadService.Event myEvent = mail;
+ mail = null;
+ if (myEvent != null) {
+ switch (myEvent.type) {
+ case RAISE:
+ receivedAnException(context, myEvent.exception);
+ case KILL:
+ throwThreadKill();
+ }
+ }
+ }
+
public IRubyObject getErrorInfo() {
return errorInfo;
}
@@ -207,7 +240,6 @@ public IRubyObject initialize(IRubyObject[] args, Block block) {
FutureThread futureThread = new FutureThread(this, runnable);
threadImpl = futureThread;
- // set to default thread group
runtime.getDefaultThreadGroup().addDirectly(this);
threadImpl.start();
@@ -254,8 +286,10 @@ private static RubyThread startThread(final IRubyObject recv, final IRubyObject[
public synchronized void cleanTerminate(IRubyObject result) {
finalResult = result;
- isStopped = true;
- isDead = true;
+ }
+
+ public synchronized void beDead() {
+ status = status.DEAD;
}
public void pollThreadEvents() {
@@ -263,11 +297,10 @@ public void pollThreadEvents() {
}
public void pollThreadEvents(ThreadContext context) {
- if (killed) throwThreadKill();
- if (receivedException != null) receivedAnException(context);
+ if (mail != null) checkMail(context);
}
- private void throwThreadKill() {
+ private static void throwThreadKill() {
throw new ThreadKill();
}
@@ -371,7 +404,7 @@ public IRubyObject abort_on_exception_set(IRubyObject val) {
@JRubyMethod(name = "alive?")
public RubyBoolean alive_p() {
- return !isDead && threadImpl.isAlive() ? getRuntime().getTrue() : getRuntime().getFalse();
+ return threadImpl.isAlive() ? getRuntime().getTrue() : getRuntime().getFalse();
}
@JRubyMethod(name = "join", optional = 1, backtrace = true)
@@ -400,8 +433,8 @@ public IRubyObject join(IRubyObject[] args) {
try {
if (threadService.getCritical()) {
// If the target thread is sleeping or stopped, wake it
- synchronized (stopLock) {
- stopLock.notify();
+ synchronized (this) {
+ notify();
}
// interrupt the target thread in case it's blocking or waiting
@@ -471,25 +504,13 @@ void setThreadGroup(RubyThreadGroup rubyThreadGroup) {
@JRubyMethod(name = "inspect")
@Override
- public IRubyObject inspect() {
+ public synchronized IRubyObject inspect() {
// FIXME: There's some code duplication here with RubyObject#inspect
StringBuilder part = new StringBuilder();
String cname = getMetaClass().getRealClass().getName();
part.append("#<").append(cname).append(":0x");
part.append(Integer.toHexString(System.identityHashCode(this)));
-
- if (threadImpl.isAlive()) {
- if (isStopped) {
- part.append(getRuntime().newString(" sleep"));
- } else if (killed) {
- part.append(getRuntime().newString(" aborting"));
- } else {
- part.append(getRuntime().newString(" run"));
- }
- } else {
- part.append(" dead");
- }
-
+ part.append(status.toString().toLowerCase());
part.append(">");
return getRuntime().newString(part.toString());
}
@@ -521,23 +542,20 @@ public static IRubyObject critical(IRubyObject receiver) {
}
@JRubyMethod(name = "stop", meta = true)
- public static IRubyObject stop(IRubyObject receiver) {
- RubyThread rubyThread = receiver.getRuntime().getThreadService().getCurrentContext().getThread();
- Object stopLock = rubyThread.stopLock;
+ public static IRubyObject stop(ThreadContext context, IRubyObject receiver) {
+ RubyThread rubyThread = context.getThread();
- synchronized (stopLock) {
- rubyThread.pollThreadEvents();
+ synchronized (rubyThread) {
+ rubyThread.checkMail(context);
try {
// attempt to decriticalize all if we're the critical thread
receiver.getRuntime().getThreadService().setCritical(false);
-
- rubyThread.isStopped = true;
-
- stopLock.wait();
+
+ rubyThread.status = Status.SLEEP;
+ rubyThread.wait();
} catch (InterruptedException ie) {
- rubyThread.pollThreadEvents();
- } finally {
- rubyThread.isStopped = false;
+ rubyThread.checkMail(context);
+ rubyThread.status = Status.RUN;
}
}
@@ -553,26 +571,24 @@ public static IRubyObject kill(IRubyObject receiver, IRubyObject rubyThread, Blo
@JRubyMethod(name = "exit", frame = true, meta = true)
public static IRubyObject s_exit(IRubyObject receiver, Block block) {
RubyThread rubyThread = receiver.getRuntime().getThreadService().getCurrentContext().getThread();
-
- rubyThread.killed = true;
- // attempt to decriticalize all if we're the critical thread
- receiver.getRuntime().getThreadService().setCritical(false);
-
- throw new ThreadKill();
+
+ synchronized (rubyThread) {
+ rubyThread.status = Status.ABORTING;
+ rubyThread.mail = null;
+ receiver.getRuntime().getThreadService().setCritical(false);
+ throw new ThreadKill();
+ }
}
@JRubyMethod(name = "stop?")
public RubyBoolean stop_p() {
// not valid for "dead" state
- return getRuntime().newBoolean(isStopped);
+ return getRuntime().newBoolean(status == Status.SLEEP || status == Status.DEAD);
}
@JRubyMethod(name = "wakeup")
- public RubyThread wakeup() {
- synchronized (stopLock) {
- isWoken = true;
- stopLock.notifyAll();
- }
+ public synchronized RubyThread wakeup() {
+ notifyAll();
return this;
}
@@ -611,34 +627,11 @@ public IRubyObject raise(IRubyObject[] args, Block block) {
if (DEBUG) System.out.println("thread " + Thread.currentThread() + " before raising");
RubyThread currentThread = getRuntime().getCurrentContext().getThread();
- try {
- while (!(currentThread.lock.tryLock() && this.lock.tryLock())) {
- if (currentThread.lock.isHeldByCurrentThread()) currentThread.lock.unlock();
- }
- currentThread.pollThreadEvents();
- if (DEBUG) System.out.println("thread " + Thread.currentThread() + " raising");
- receivedException = prepareRaiseException(runtime, args, block);
-
- // If the target thread is sleeping or stopped, wake it
- synchronized (stopLock) {
- stopLock.notify();
- }
+ if (DEBUG) System.out.println("thread " + Thread.currentThread() + " raising");
+ IRubyObject exception = prepareRaiseException(runtime, args, block);
- // interrupt the target thread in case it's blocking or waiting
- // WARNING: We no longer interrupt the target thread, since this usually means
- // interrupting IO and with NIO that means the channel is no longer usable.
- // We either need a new way to handle waking a target thread that's waiting
- // on IO, or we need to accept that we can't wake such threads and must wait
- // for them to complete their operation.
- //threadImpl.interrupt();
-
- // new interrupt, to hopefully wake it out of any blocking IO
- this.interrupt();
- } finally {
- if (currentThread.lock.isHeldByCurrentThread()) currentThread.lock.unlock();
- if (this.lock.isHeldByCurrentThread()) this.lock.unlock();
- }
+ runtime.getThreadService().deliverEvent(new ThreadService.Event(currentThread, this, ThreadService.Event.Type.RAISE, exception));
return this;
}
@@ -684,14 +677,8 @@ private IRubyObject prepareRaiseException(Ruby runtime, IRubyObject[] args, Bloc
}
@JRubyMethod(name = "run")
- public IRubyObject run() {
- // if stopped, unstop
- synchronized (stopLock) {
- if (isStopped) {
- isStopped = false;
- stopLock.notifyAll();
- }
- }
+ public synchronized IRubyObject run() {
+ notifyAll();
return this;
}
@@ -701,37 +688,29 @@ public IRubyObject run() {
* explicitly wakeup and we wait less than requested amount we will return false. We will
* return true if we sleep right amount or less than right amount via spurious wakeup.
*/
- public boolean sleep(long millis) throws InterruptedException {
+ public synchronized boolean sleep(long millis) throws InterruptedException {
assert this == getRuntime().getCurrentContext().getThread();
+ boolean result = true;
- synchronized (stopLock) {
+ synchronized (this) {
pollThreadEvents();
try {
- isStopped = true;
- stopLock.wait(millis);
+ status = Status.SLEEP;
+ wait(millis);
} finally {
- isStopped = false;
pollThreadEvents();
- if (isWoken) {
- isWoken = false;
- return false;
- }
+ status = Status.RUN;
}
}
- return true;
+ return result;
}
@JRubyMethod(name = "status")
- public IRubyObject status() {
+ public synchronized IRubyObject status() {
if (threadImpl.isAlive()) {
- if (isSleeping()) {
- return getRuntime().newString("sleep");
- } else if (killed) {
- return getRuntime().newString("aborting");
- }
-
- return getRuntime().newString("run");
+ // TODO: no java stringity
+ return getRuntime().newString(status.toString().toLowerCase());
} else if (exitingException != null) {
return getRuntime().getNil();
} else {
@@ -739,16 +718,12 @@ public IRubyObject status() {
}
}
- private boolean isSleeping() {
- return isStopped || (currentSelector != null && currentSelector.isOpen()) || blockingIO != null || currentWaitObject != null;
- }
-
public void enterSleep() {
- isStopped = true;
+ status = Status.SLEEP;
}
public void exitSleep() {
- isStopped = false;
+ status = Status.RUN;
}
@JRubyMethod(name = {"kill", "exit", "terminate"})
@@ -759,46 +734,24 @@ public IRubyObject kill() {
// If the killee thread is the same as the killer thread, just die
if (currentThread == this) throwThreadKill();
- try {
- if (DEBUG) System.out.println("thread " + Thread.currentThread() + " trying to kill");
- while (!(currentThread.lock.tryLock() && this.lock.tryLock())) {
- if (currentThread.lock.isHeldByCurrentThread()) currentThread.lock.unlock();
- }
-
- currentThread.pollThreadEvents();
+ if (DEBUG) System.out.println("thread " + Thread.currentThread() + " trying to kill");
- if (DEBUG) System.out.println("thread " + Thread.currentThread() + " succeeded with kill");
- killed = true;
-
- // If the target thread is sleeping or stopped, wake it
- synchronized (stopLock) {
- stopLock.notify();
- }
+ currentThread.pollThreadEvents();
- // interrupt the target thread in case it's blocking or waiting
- // WARNING: We no longer interrupt the target thread, since this usually means
- // interrupting IO and with NIO that means the channel is no longer usable.
- // We either need a new way to handle waking a target thread that's waiting
- // on IO, or we need to accept that we can't wake such threads and must wait
- // for them to complete their operation.
- //threadImpl.interrupt();
-
- // new interrupt, to hopefully wake it out of any blocking IO
- this.interrupt();
- } finally {
- if (currentThread.lock.isHeldByCurrentThread()) currentThread.lock.unlock();
- if (this.lock.isHeldByCurrentThread()) this.lock.unlock();
- }
+ getRuntime().getThreadService().deliverEvent(new ThreadService.Event(currentThread, this, ThreadService.Event.Type.KILL));
- try {
- threadImpl.join();
- } catch (InterruptedException ie) {
- // we were interrupted, check thread events again
- currentThread.pollThreadEvents();
- } catch (ExecutionException ie) {
- // we were interrupted, check thread events again
- currentThread.pollThreadEvents();
- }
+ if (DEBUG) System.out.println("thread " + Thread.currentThread() + " succeeded with kill");
+
+ // FIXME: is this still necessary?
+// try {
+// threadImpl.join();
+// } catch (InterruptedException ie) {
+// // we were interrupted, check thread events again
+// currentThread.pollThreadEvents();
+// } catch (ExecutionException ie) {
+// // we were interrupted, check thread events again
+// currentThread.pollThreadEvents();
+// }
return this;
}
@@ -870,6 +823,7 @@ public boolean select(RubyIO io, int ops) {
SelectionKey key = selectable.register(currentSelector, ops);
+ beforeBlockingCall();
int result = currentSelector.select();
// check for thread events, in case we've been woken up to die
@@ -887,6 +841,7 @@ public boolean select(RubyIO io, int ops) {
} catch (IOException ioe) {
throw io.getRuntime().newRuntimeError("Error with selector: " + ioe);
} finally {
+ afterBlockingCall();
if (currentSelector != null) {
try {
currentSelector.close();
@@ -955,22 +910,19 @@ public boolean waitForIO(ThreadContext context, RubyIO io, int ops) {
}
}
public void beforeBlockingCall() {
- isStopped = true;
+ enterSleep();
}
public void afterBlockingCall() {
- isStopped = false;
+ exitSleep();
}
- private void receivedAnException(ThreadContext context) {
- // clear this so we don't keep re-throwing
- IRubyObject raiseException = receivedException;
- receivedException = null;
+ private void receivedAnException(ThreadContext context, IRubyObject exception) {
RubyModule kernelModule = getRuntime().getKernel();
if (DEBUG) {
- System.out.println("thread " + Thread.currentThread() + " before propagating exception: " + killed);
+ System.out.println("thread " + Thread.currentThread() + " before propagating exception: " + status);
}
- kernelModule.callMethod(context, "raise", raiseException);
+ kernelModule.callMethod(context, "raise", exception);
}
public boolean wait_timeout(IRubyObject o, Double timeout) throws InterruptedException {
@@ -982,8 +934,11 @@ public boolean wait_timeout(IRubyObject o, Double timeout) throws InterruptedExc
int delay_ns_remainder = (int)( delay_ns % 1000000 );
try {
currentWaitObject = o;
+ status = Status.SLEEP;
o.wait(delay_ms, delay_ns_remainder);
} finally {
+ pollThreadEvents();
+ status = Status.RUN;
currentWaitObject = null;
}
}
@@ -992,8 +947,11 @@ public boolean wait_timeout(IRubyObject o, Double timeout) throws InterruptedExc
} else {
try {
currentWaitObject = o;
+ status = Status.SLEEP;
o.wait();
} finally {
+ pollThreadEvents();
+ status = Status.RUN;
currentWaitObject = null;
}
return true;
View
6 src/org/jruby/RubyThreadGroup.java
@@ -103,8 +103,10 @@ void addDirectly(RubyThread rubyThread) {
}
public synchronized void remove(RubyThread rubyThread) {
- rubyThread.setThreadGroup(null);
- rubyThreadList.remove(rubyThread);
+ synchronized (rubyThread) {
+ rubyThread.setThreadGroup(null);
+ rubyThreadList.remove(rubyThread);
+ }
}
@JRubyMethod(name = "enclose", frame = true)
View
7 src/org/jruby/internal/runtime/RubyNativeThread.java
@@ -48,7 +48,6 @@
private RubyProc proc;
private IRubyObject[] arguments;
private RubyThread rubyThread;
- private boolean setContextCC;
public RubyNativeThread(RubyThread rubyThread, IRubyObject[] args, Block currentBlock, boolean setContextCC) {
throw new RuntimeException("RubyNativeThread is deprecated; do not use it");
@@ -81,9 +80,9 @@ public void run() {
rubyThread.cleanTerminate(result);
} catch (ThreadKill tk) {
// notify any killer waiting on our thread that we're going bye-bye
- synchronized (rubyThread.killLock) {
- rubyThread.killLock.notifyAll();
- }
+// synchronized (rubyThread.killLock) {
+// rubyThread.killLock.notifyAll();
+// }
} catch (JumpException.ReturnJump rj) {
rubyThread.exceptionRaised(runtime.newThreadError("return can't jump across threads"));
} catch (RaiseException e) {
View
12 src/org/jruby/internal/runtime/RubyRunnable.java
@@ -91,9 +91,9 @@ public void run() {
rubyThread.cleanTerminate(result);
} catch (ThreadKill tk) {
// notify any killer waiting on our thread that we're going bye-bye
- synchronized (rubyThread.killLock) {
- rubyThread.killLock.notifyAll();
- }
+// synchronized (rubyThread.killLock) {
+// rubyThread.killLock.notifyAll();
+// }
} catch (JumpException.ReturnJump rj) {
rubyThread.exceptionRaised(runtime.newThreadError("return can't jump across threads"));
} catch (RaiseException e) {
@@ -102,13 +102,11 @@ public void run() {
// Someone called exit!, so we need to kill the main thread
runtime.getThreadService().getMainThread().kill();
} finally {
+ rubyThread.beDead();
runtime.getThreadService().setCritical(false);
runtime.getThreadService().unregisterThread(rubyThread);
- // synchronize on the RubyThread object for threadgroup updates
- synchronized (rubyThread) {
- ((RubyThreadGroup)rubyThread.group()).remove(rubyThread);
- }
+ ((RubyThreadGroup)rubyThread.group()).remove(rubyThread);
// restore context classloader, in case we're using a thread pool
try {
View
28 src/org/jruby/internal/runtime/ThreadService.java
@@ -42,6 +42,7 @@
import java.util.concurrent.Future;
import org.jruby.Ruby;
import org.jruby.RubyThread;
+import org.jruby.runtime.builtin.IRubyObject;
import org.jruby.runtime.ThreadContext;
public class ThreadService {
@@ -212,4 +213,31 @@ public void setCritical(boolean critical) {
public boolean getCritical() {
return criticalLock.isHeldByCurrentThread();
}
+
+ public static class Event {
+ public enum Type { KILL, RAISE, WAKEUP }
+ public final RubyThread sender;
+ public final RubyThread target;
+ public final Type type;
+ public final IRubyObject exception;
+
+ public Event(RubyThread sender, RubyThread target, Type type) {
+ this(sender, target, type, null);
+ }
+
+ public Event(RubyThread sender, RubyThread target, Type type, IRubyObject exception) {
+ this.sender = sender;
+ this.target = target;
+ this.type = type;
+ this.exception = exception;
+ }
+ }
+
+ public synchronized void deliverEvent(Event event) {
+ // first, check if the sender has unreceived mail
+ event.sender.checkMail(getCurrentContext());
+
+ // then deliver mail to the target
+ event.target.receiveMail(event);
+ }
}
View
92 src/org/jruby/util/ShellLauncher.java
@@ -546,20 +546,24 @@ public int exitValue() {
@Override
public void destroy() {
try {
- child.destroy();
if (input != null) input.close();
if (inerr != null) inerr.close();
if (output != null) output.close();
if (inputChannel != null) inputChannel.close();
if (inerrChannel != null) inerrChannel.close();
if (outputChannel != null) outputChannel.close();
- if (inputPumper != null) inputPumper.quit();
- if (inerrPumper != null) inerrPumper.quit();
- if (outputPumper != null) outputPumper.quit();
+
+ // processes seem to have some peculiar locking sequences, so we
+ // need to ensure nobody is trying to close/destroy while we are
+ synchronized (this) {
+ if (inputPumper != null) synchronized(inputPumper) {inputPumper.quit();}
+ if (inerrPumper != null) synchronized(inerrPumper) {inerrPumper.quit();}
+ if (outputPumper != null) synchronized(outputPumper) {outputPumper.quit();}
+ child.destroy();
+ }
} catch (IOException ioe) {
throw new RuntimeException(ioe);
}
- child.destroy();
}
private void prepareInput(Process child) {
@@ -608,9 +612,9 @@ private void pumpInput(Process child, Ruby runtime) {
parentOutChannel = ((FileOutputStream) parentOut).getChannel();
}
if (childInChannel != null && parentOutChannel != null) {
- inputPumper = new ChannelPumper(childInChannel, parentOutChannel, Pumper.Slave.IN);
+ inputPumper = new ChannelPumper(childInChannel, parentOutChannel, Pumper.Slave.IN, this);
} else {
- inputPumper = new StreamPumper(childIn, parentOut, false, Pumper.Slave.IN);
+ inputPumper = new StreamPumper(childIn, parentOut, false, Pumper.Slave.IN, this);
}
inputPumper.start();
input = null;
@@ -630,36 +634,14 @@ private void pumpInerr(Process child, Ruby runtime) {
parentOutChannel = ((FileOutputStream) parentOut).getChannel();
}
if (childInChannel != null && parentOutChannel != null) {
- inerrPumper = new ChannelPumper(childInChannel, parentOutChannel, Pumper.Slave.IN);
+ inerrPumper = new ChannelPumper(childInChannel, parentOutChannel, Pumper.Slave.IN, this);
} else {
- inerrPumper = new StreamPumper(childIn, parentOut, false, Pumper.Slave.IN);
+ inerrPumper = new StreamPumper(childIn, parentOut, false, Pumper.Slave.IN, this);
}
inerrPumper.start();
inerr = null;
inerrChannel = null;
}
-
- private void pumpOutput(Process child, Ruby runtime) {
- // no write requested, hook up write to parent runtime's input
- OutputStream childOut = unwrapBufferedStream(child.getOutputStream());
- FileChannel childOutChannel = null;
- if (childOut instanceof FileOutputStream) {
- childOutChannel = ((FileOutputStream) childOut).getChannel();
- }
- InputStream parentIn = unwrapBufferedStream(runtime.getIn());
- FileChannel parentInChannel = null;
- if (parentIn instanceof FileInputStream) {
- parentInChannel = ((FileInputStream) parentIn).getChannel();
- }
- if (parentInChannel != null && childOutChannel != null) {
- outputPumper = new ChannelPumper(parentInChannel, childOutChannel, Pumper.Slave.OUT);
- } else {
- outputPumper = new StreamPumper(parentIn, childOut, false, Pumper.Slave.OUT);
- }
- outputPumper.start();
- output = null;
- outputChannel = null;
- }
}
public static Process run(Ruby runtime, IRubyObject[] rawArgs) throws IOException {
@@ -708,14 +690,16 @@ public static Process run(Ruby runtime, IRubyObject[] rawArgs) throws IOExceptio
private final OutputStream out;
private final boolean onlyIfAvailable;
private final Object waitLock = new Object();
+ private final Object sync;
private final Slave slave;
private volatile boolean quit;
- StreamPumper(InputStream in, OutputStream out, boolean avail, Slave slave) {
+ StreamPumper(InputStream in, OutputStream out, boolean avail, Slave slave, Object sync) {
this.in = in;
this.out = out;
this.onlyIfAvailable = avail;
this.slave = slave;
+ this.sync = sync;
setDaemon(true);
}
@Override
@@ -754,14 +738,16 @@ public void run() {
} catch (Exception e) {
} finally {
if (onlyIfAvailable) {
- // We need to close the out, since some
- // processes would just wait for the stream
- // to be closed before they process its content,
- // and produce the output. E.g.: "cat".
- if (slave == Slave.OUT) {
- // we only close out if it's the slave stream, to avoid
- // closing a directly-mapped stream from parent process
- try { out.close(); } catch (IOException ioe) {}
+ synchronized (sync) {
+ // We need to close the out, since some
+ // processes would just wait for the stream
+ // to be closed before they process its content,
+ // and produce the output. E.g.: "cat".
+ if (slave == Slave.OUT) {
+ // we only close out if it's the slave stream, to avoid
+ // closing a directly-mapped stream from parent process
+ try { out.close(); } catch (IOException ioe) {}
+ }
}
}
}
@@ -778,13 +764,15 @@ public void quit() {
private final FileChannel inChannel;
private final FileChannel outChannel;
private final Slave slave;
+ private final Object sync;
private volatile boolean quit;
- ChannelPumper(FileChannel inChannel, FileChannel outChannel, Slave slave) {
+ ChannelPumper(FileChannel inChannel, FileChannel outChannel, Slave slave, Object sync) {
if (DEBUG) out.println("using channel pumper");
this.inChannel = inChannel;
this.outChannel = outChannel;
this.slave = slave;
+ this.sync = sync;
setDaemon(true);
}
@Override
@@ -801,12 +789,16 @@ public void run() {
}
} catch (Exception e) {
} finally {
- switch (slave) {
- case OUT:
- try { outChannel.close(); } catch (IOException ioe) {}
- break;
- case IN:
- try { inChannel.close(); } catch (IOException ioe) {}
+ // processes seem to have some peculiar locking sequences, so we
+ // need to ensure nobody is trying to close/destroy while we are
+ synchronized (sync) {
+ switch (slave) {
+ case OUT:
+ try { outChannel.close(); } catch (IOException ioe) {}
+ break;
+ case IN:
+ try { inChannel.close(); } catch (IOException ioe) {}
+ }
}
}
}
@@ -821,13 +813,13 @@ private static void handleStreams(Process p, InputStream in, OutputStream out, O
InputStream pErr = p.getErrorStream();
OutputStream pIn = p.getOutputStream();
- StreamPumper t1 = new StreamPumper(pOut, out, false, Pumper.Slave.IN);
- StreamPumper t2 = new StreamPumper(pErr, err, false, Pumper.Slave.IN);
+ StreamPumper t1 = new StreamPumper(pOut, out, false, Pumper.Slave.IN, p);
+ StreamPumper t2 = new StreamPumper(pErr, err, false, Pumper.Slave.IN, p);
// The assumption here is that the 'in' stream provides
// proper available() support. If available() always
// returns 0, we'll hang!
- StreamPumper t3 = new StreamPumper(in, pIn, true, Pumper.Slave.OUT);
+ StreamPumper t3 = new StreamPumper(in, pIn, true, Pumper.Slave.OUT, p);
t1.start();
t2.start();
Please sign in to comment.
Something went wrong with that request. Please try again.