Skip to content

Commit

Permalink
[Truffle] Rewrite a good part of RubyFiber and RubyThread.
Browse files Browse the repository at this point in the history
* Now a Thread has multipe Fibers and always has a root Fiber.
* Properly kill Fibers when a thread exits.
* Share all the logic.
  • Loading branch information
eregon committed Apr 22, 2015
1 parent d5f5320 commit 5ccc678
Show file tree
Hide file tree
Showing 10 changed files with 155 additions and 89 deletions.
Expand Up @@ -23,6 +23,7 @@
import org.jruby.truffle.runtime.core.RubyFiber;
import org.jruby.truffle.runtime.core.RubyNilClass;
import org.jruby.truffle.runtime.core.RubyProc;
import org.jruby.truffle.runtime.core.RubyThread;

@CoreClass(name = "Fiber")
public abstract class FiberNodes {
Expand Down Expand Up @@ -57,12 +58,13 @@ protected Object transfer(RubyFiber fiber, boolean isYield, Object[] args) {
throw new RaiseException(getContext().getCoreLibrary().deadFiberCalledError(this));
}

if (fiber.getRubyThread() != getContext().getThreadManager().getCurrentThread()) {
RubyThread currentThread = getContext().getThreadManager().getCurrentThread();
if (fiber.getRubyThread() != currentThread) {
CompilerDirectives.transferToInterpreter();
throw new RaiseException(getContext().getCoreLibrary().fiberError("fiber called across threads", this));
}

final RubyFiber sendingFiber = getContext().getFiberManager().getCurrentFiber();
final RubyFiber sendingFiber = currentThread.getFiberManager().getCurrentFiber();

return singleValue(sendingFiber.transferControlTo(fiber, isYield, args));
}
Expand Down Expand Up @@ -129,10 +131,11 @@ public YieldNode(YieldNode prev) {

@Specialization
public Object yield(Object[] args) {
final RubyFiber yieldingFiber = getContext().getFiberManager().getCurrentFiber();
RubyThread currentThread = getContext().getThreadManager().getCurrentThread();
final RubyFiber yieldingFiber = currentThread.getFiberManager().getCurrentFiber();
final RubyFiber fiberYieldedTo = yieldingFiber.getLastResumedByFiber();

if (yieldingFiber.isTopLevel() || fiberYieldedTo == null) {
if (yieldingFiber.isRootFiber() || fiberYieldedTo == null) {
throw new RaiseException(getContext().getCoreLibrary().yieldFromRootFiberError(this));
}

Expand Down
Expand Up @@ -80,7 +80,7 @@ public ExitModuleNode(ExitModuleNode prev) {

@Specialization
public RubyNilClass exit() {
getContext().getThreadManager().getCurrentThread().exit();
getContext().getThreadManager().getCurrentThread().shutdown();
return nil();
}

Expand All @@ -100,14 +100,12 @@ public KillNode(KillNode prev) {
@Specialization
public RubyThread kill(final RubyThread thread) {
getContext().getSafepointManager().pauseAllThreadsAndExecute(this, new SafepointAction() {

@Override
public void run(RubyThread currentThread, Node currentNode) {
if (currentThread == thread) {
currentThread.exit();
if (currentThread == thread && thread.isCurrentJavaThreadRootFiber()) {
thread.shutdown();
}
}

});

return thread;
Expand Down Expand Up @@ -266,7 +264,7 @@ public RubyNilClass raise(VirtualFrame frame, final RubyThread thread, RubyClass

@Override
public void run(RubyThread currentThread, Node currentNode) {
if (currentThread == thread) {
if (currentThread == thread && thread.isCurrentJavaThreadCurrentFiber()) {
throw exceptionWrapper;
}
}
Expand Down
12 changes: 2 additions & 10 deletions truffle/src/main/java/org/jruby/truffle/runtime/RubyContext.java
Expand Up @@ -20,6 +20,7 @@

import jnr.posix.POSIX;
import jnr.posix.POSIXFactory;

import org.jcodings.Encoding;
import org.jcodings.specific.ASCIIEncoding;
import org.jcodings.specific.UTF8Encoding;
Expand Down Expand Up @@ -64,7 +65,6 @@ public class RubyContext extends ExecutionContext {
private final TraceManager traceManager;
private final ObjectSpaceManager objectSpaceManager;
private final ThreadManager threadManager;
private final FiberManager fiberManager;
private final AtExitManager atExitManager;
private final RubySymbol.SymbolTable symbolTable = new RubySymbol.SymbolTable(this);
private final Shape emptyShape;
Expand Down Expand Up @@ -118,10 +118,8 @@ public RubyContext(Ruby runtime) {
traceManager = new TraceManager();
atExitManager = new AtExitManager();

// Must initialize threads before fibers

threadManager = new ThreadManager(this);
fiberManager = new FiberManager(this);
threadManager.initialize();

rubiniusPrimitiveManager = RubiniusPrimitiveManager.create();

Expand Down Expand Up @@ -280,8 +278,6 @@ public void shutdown() {
instrumentationServerManager.shutdown();
}

fiberManager.shutdown();

threadManager.shutdown();
}

Expand Down Expand Up @@ -469,10 +465,6 @@ public ObjectSpaceManager getObjectSpaceManager() {
return objectSpaceManager;
}

public FiberManager getFiberManager() {
return fiberManager;
}

public ThreadManager getThreadManager() {
return threadManager;
}
Expand Down
105 changes: 71 additions & 34 deletions truffle/src/main/java/org/jruby/truffle/runtime/core/RubyFiber.java
Expand Up @@ -25,6 +25,8 @@

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.LinkedBlockingQueue;

/**
* Represents the Ruby {@code Fiber} class. The current implementation uses Java threads and message
Expand Down Expand Up @@ -80,67 +82,100 @@ public RubyException getException() {
}

public static class FiberExitException extends ControlFlowException {

private static final long serialVersionUID = 1522270454305076317L;

}

private final FiberManager fiberManager;
private final ThreadManager threadManager;
private final RubyThread rubyThread;

private String name;
private final boolean topLevel;
private final BlockingQueue<FiberMessage> messageQueue = new ArrayBlockingQueue<>(1);
private final boolean isRootFiber;
// we need 2 slots when the safepoint manager sends the kill message and there is another message unprocessed
private final BlockingQueue<FiberMessage> messageQueue = new LinkedBlockingQueue<>(2);
private RubyFiber lastResumedByFiber = null;
private boolean alive = true;

public RubyFiber(RubyClass rubyClass, FiberManager fiberManager, ThreadManager threadManager, String name, boolean topLevel) {
protected volatile Thread thread;

public RubyFiber(RubyThread parent, RubyClass rubyClass, String name) {
this(parent, parent.getFiberManager(), parent.getThreadManager(), rubyClass, name, false);
}

public static RubyFiber newRootFiber(RubyThread thread, FiberManager fiberManager, ThreadManager threadManager) {
RubyContext context = thread.getContext();
return new RubyFiber(thread, fiberManager, threadManager, context.getCoreLibrary().getFiberClass(), "root Fiber for Thread", true);
}

private RubyFiber(RubyThread parent, FiberManager fiberManager, ThreadManager threadManager, RubyClass rubyClass, String name, boolean isRootFiber) {
super(rubyClass);
this.rubyThread = parent;
this.fiberManager = fiberManager;
this.threadManager = threadManager;
this.name = name;
this.topLevel = topLevel;
this.rubyThread = threadManager.getCurrentThread();
this.isRootFiber = isRootFiber;
}

public void initialize(RubyProc block) {
public void initialize(final RubyProc block) {
RubyNode.notDesignedForCompilation();

name = "Ruby Fiber@" + block.getSharedMethodInfo().getSourceSection().getShortDescription();

final RubyFiber finalFiber = this;
final RubyProc finalBlock = block;

final Thread thread = new Thread(new Runnable() {

@Override
public void run() {
fiberManager.registerFiber(finalFiber);
finalFiber.getContext().getSafepointManager().enterThread();
threadManager.enterGlobalLock(rubyThread);
handleFiberExceptions(block);
}
});
thread.setName(name);
thread.start();
}

private void handleFiberExceptions(final RubyProc block) {
run(new Runnable() {
@Override
public void run() {
try {
final Object[] args = finalFiber.waitForResume();
final Object result = finalBlock.rootCall(args);
finalFiber.resume(finalFiber.lastResumedByFiber, true, result);
} catch (FiberExitException | ThreadExitException e) { // TODO (eregon, 21 Apr. 2015): The thread should cleanly kill its fibers when dying.
// Naturally exit the thread on catching this
final Object[] args = waitForResume();
final Object result = block.rootCall(args);
resume(lastResumedByFiber, true, result);
} catch (FiberExitException e) {
assert !isRootFiber;
// Naturally exit the Java thread on catching this
} catch (ReturnException e) {
sendMessageTo(finalFiber.lastResumedByFiber, new FiberExceptionMessage(finalFiber.getContext().getCoreLibrary().unexpectedReturn(null)));
sendMessageTo(lastResumedByFiber, new FiberExceptionMessage(getContext().getCoreLibrary().unexpectedReturn(null)));
} catch (RaiseException e) {
sendMessageTo(finalFiber.lastResumedByFiber, new FiberExceptionMessage(e.getRubyException()));
} finally {
alive = false;
threadManager.leaveGlobalLock();
finalFiber.getContext().getSafepointManager().leaveThread();
fiberManager.unregisterFiber(finalFiber);
sendMessageTo(lastResumedByFiber, new FiberExceptionMessage(e.getRubyException()));
}
}

});
thread.setName(name);
thread.start();
}

protected void run(final Runnable task) {
RubyNode.notDesignedForCompilation();

start();
try {
task.run();
} finally {
cleanup();
}
}

// Only used by the main thread which cannot easily wrap everything inside a try/finally.
public void start() {
thread = Thread.currentThread();
fiberManager.registerFiber(this);
getContext().getSafepointManager().enterThread();
threadManager.enterGlobalLock(rubyThread);
}

// Only used by the main thread which cannot easily wrap everything inside a try/finally.
public void cleanup() {
alive = false;
threadManager.leaveGlobalLock();
getContext().getSafepointManager().leaveThread();
fiberManager.unregisterFiber(this);
thread = null;
}

public RubyThread getRubyThread() {
Expand Down Expand Up @@ -201,6 +236,7 @@ public Object[] transferControlTo(RubyFiber fiber, boolean yield, Object[] args)
}

public void shutdown() {
assert !isRootFiber;
RubyNode.notDesignedForCompilation();

sendMessageTo(this, new FiberExitMessage());
Expand All @@ -216,8 +252,8 @@ public RubyFiber getLastResumedByFiber() {
return lastResumedByFiber;
}

public boolean isTopLevel() {
return topLevel;
public boolean isRootFiber() {
return isRootFiber;
}

public String getName() {
Expand All @@ -228,7 +264,8 @@ public static class FiberAllocator implements Allocator {

@Override
public RubyBasicObject allocate(RubyContext context, RubyClass rubyClass, Node currentNode) {
return new RubyFiber(rubyClass, context.getFiberManager(), context.getThreadManager(), null, false);
RubyThread parent = context.getThreadManager().getCurrentThread();
return new RubyFiber(parent, rubyClass, null);
}

}
Expand Down

0 comments on commit 5ccc678

Please sign in to comment.