Skip to content

Commit

Permalink
Merge PR #55: Add more extensive tracing for actor constructs
Browse files Browse the repository at this point in the history
  • Loading branch information
smarr committed Nov 30, 2016
2 parents 4114a39 + 392a021 commit c342b48
Show file tree
Hide file tree
Showing 24 changed files with 1,009 additions and 318 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,4 @@ newspeak
*.ns3
tools/kompos/node_modules
tools/kompos/out
traces
8 changes: 8 additions & 0 deletions som
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ tools.add_argument('-dm', '--dynamic-metrics', help='Capture Dynamic Metrics',
dest='dynamic_metrics', action='store_true', default=False)
tools.add_argument('-at', '--actor-tracing', help='enable tracing of actor operations',
dest='actor_tracing', action='store_true', default=False)
tools.add_argument('-mt', '--memory-tracing', help='enable tracing of memory usage and GC',
dest='memory_tracing', action='store_true', default=False)
parser.add_argument('-tf', '--trace-file', help='Trace file destination, default: traces/trace',
dest='trace_file', default=BASE_DIR + '/traces/tracea')
tools.add_argument('--coveralls', nargs=1, help='determine code coverage and report to Coveralls with',
dest='coveralls_repo_token', default=False, metavar='coveralls-repo-token')

Expand Down Expand Up @@ -226,6 +230,10 @@ if args.dynamic_metrics:

if args.actor_tracing:
flags += ['-Dsom.actorTracing=true']
if args.trace_file:
flags += ['-Dsom.traceFile=%s' % args.trace_file ]
if args.memory_tracing:
flags += ['-Dsom.memoryTracing=true']

if (args.truffle_profile or args.truffle_debugger or args.web_debugger or
args.dynamic_metrics or args.coveralls_repo_token):
Expand Down
12 changes: 12 additions & 0 deletions src/som/VM.java
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,7 @@ public static void exit(final int errorCode) {
* To be called from {@link ObjectSystem}.
*/
public void realExit(final int errorCode) {
Actor.shutDownActorPool();
engine.dispose();
System.exit(errorCode);
}
Expand Down Expand Up @@ -240,6 +241,17 @@ public static void println(final String msg) {
// Checkstyle: resume
}

@TruffleBoundary
/**
* This method is used to print reports about the number of created artifacts.
* For example actors, messages and promises.
*/
public static void printConcurrencyEntitiesReport(final String msg) {
// Checkstyle: stop
System.out.println(msg);
// Checkstyle: resume
}

public static boolean isAvoidingExit() {
return vm.avoidExitForTesting;
}
Expand Down
4 changes: 4 additions & 0 deletions src/som/VmSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ public class VmSettings {
public static final boolean FAIL_ON_MISSING_OPTIMIZATIONS;
public static final boolean DEBUG_MODE;
public static final boolean ACTOR_TRACING;
public static final boolean MEMORY_TRACING;
public static final String TRACE_FILE;
public static final boolean INSTRUMENTATION;
public static final boolean DYNAMIC_METRICS;
public static final boolean DNU_PRINT_STACK_TRACE;
Expand All @@ -26,6 +28,8 @@ public class VmSettings {
FAIL_ON_MISSING_OPTIMIZATIONS = getBool("som.failOnMissingOptimization", false);
DEBUG_MODE = getBool("som.debugMode", false);
ACTOR_TRACING = getBool("som.actorTracing", false);
TRACE_FILE = System.getProperty("som.traceFile", System.getProperty("user.dir") + "/traces/trace");
MEMORY_TRACING = getBool("som.memoryTracing", false);

boolean dm = getBool("som.dynamicMetrics", false);
DYNAMIC_METRICS = dm;
Expand Down
154 changes: 141 additions & 13 deletions src/som/interpreter/actors/Actor.java
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
package som.interpreter.actors;

import java.lang.Thread.UncaughtExceptionHandler;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Map;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinPool.ForkJoinWorkerThreadFactory;
import java.util.concurrent.ForkJoinWorkerThread;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import com.oracle.truffle.api.CompilerDirectives.TruffleBoundary;

Expand All @@ -16,7 +19,6 @@
import som.vmobjects.SArray.STransferArray;
import som.vmobjects.SObject;
import som.vmobjects.SObjectWithClass.SObjectWithoutFields;
import tools.ObjectBuffer;
import tools.actors.ActorExecutionTrace;
import tools.debugger.WebDebugger;

Expand All @@ -43,6 +45,8 @@ public class Actor {
public static Actor createActor() {
if (VmSettings.DEBUG_MODE) {
return new DebugActor();
} else if (VmSettings.ACTOR_TRACING) {
return new TracingActor();
} else {
return new Actor();
}
Expand All @@ -52,19 +56,29 @@ public static void traceActorsExceptMainOne(final SFarReference actorFarRef) {
Thread current = Thread.currentThread();
if (current instanceof ActorProcessingThread) {
ActorProcessingThread t = (ActorProcessingThread) current;
t.createdActors.append(actorFarRef);
}
}

/** Used to shift the thread id to the 8 most significant bits. */
private static final int THREAD_ID_SHIFT = 56;

/** Buffer for incoming messages. */
private ObjectBuffer<EventualMessage> mailbox = new ObjectBuffer<>(16);
private Mailbox mailbox = Mailbox.createNewMailbox(16);

/** Flag to indicate whether there is currently a F/J task executing. */
private boolean isExecuting;

/** Is scheduled on the pool, and executes messages to this actor. */
private final ExecAllMessages executor;

//used to collect absolute numbers from the threads
private static long numCreatedMessages = 0;
private static long numCreatedActors = 0;
private static long numCreatedPromises = 0;
private static long numResolvedPromises = 0;

private static ArrayList<ActorProcessingThread> threads = new ArrayList<>();

/**
* Possible roles for an actor.
*/
Expand Down Expand Up @@ -122,6 +136,7 @@ public final Object wrapForUse(final Object o, final Actor owner,
protected void logMessageAddedToMailbox(final EventualMessage msg) { }
protected void logMessageBeingExecuted(final EventualMessage msg) { }
protected void logNoTaskForActor() { }
public long getActorId() { return 0; }

/**
* Send the give message to the actor.
Expand All @@ -131,6 +146,9 @@ protected void logNoTaskForActor() { }
@TruffleBoundary
public synchronized void send(final EventualMessage msg) {
assert msg.getTarget() == this;
if (VmSettings.ACTOR_TRACING) {
mailbox.addMessageSendTime();
}
mailbox.append(msg);
logMessageAddedToMailbox(msg);

Expand All @@ -140,13 +158,18 @@ public synchronized void send(final EventualMessage msg) {
}
}

public synchronized long sendAndGetId(final EventualMessage msg) {
send(msg);
return mailbox.getBasemessageId() + mailbox.size() - 1;
}

/**
* Is scheduled on the fork/join pool and executes messages for a specific
* actor.
*/
private static final class ExecAllMessages implements Runnable {
private final Actor actor;
private ObjectBuffer<EventualMessage> current;
private Mailbox current;

ExecAllMessages(final Actor actor) {
this.actor = actor;
Expand All @@ -171,6 +194,11 @@ public void run() {
}

private void processCurrentMessages(final ActorProcessingThread currentThread, final WebDebugger dbg) {
if (VmSettings.ACTOR_TRACING) {
current.setExecutionStart(System.nanoTime());
current.setBaseMessageId(currentThread.generateMessageBaseId(current.size()));
currentThread.currentMessageId = current.getBasemessageId();
}
for (EventualMessage msg : current) {
actor.logMessageBeingExecuted(msg);
currentThread.currentMessage = msg;
Expand All @@ -180,11 +208,16 @@ private void processCurrentMessages(final ActorProcessingThread currentThread, f
dbg.prepareSteppingUntilNextRootNode();
}
}

if (VmSettings.ACTOR_TRACING) {
current.addMessageExecutionStart();
}
msg.execute();
if (VmSettings.ACTOR_TRACING) {
currentThread.currentMessageId += 1;
}
}
if (VmSettings.ACTOR_TRACING) {
currentThread.processedMessages.append(current);
ActorExecutionTrace.mailboxExecuted(current, actor);
}
}

Expand All @@ -195,10 +228,13 @@ private boolean getCurrentMessagesOrCompleteExecution() {
if (current.isEmpty()) {
// complete execution after all messages are processed
actor.isExecuting = false;

return false;
}
actor.mailbox = new ObjectBuffer<>(actor.mailbox.size());

actor.mailbox = Mailbox.createNewMailbox(actor.mailbox.size());
}

return true;
}
}
Expand All @@ -223,21 +259,75 @@ public static boolean isPoolIdle() {
private static final class ActorProcessingThreadFactor implements ForkJoinWorkerThreadFactory {
@Override
public ForkJoinWorkerThread newThread(final ForkJoinPool pool) {
return new ActorProcessingThread(pool);
ActorProcessingThread t = new ActorProcessingThread(pool);
threads.add(t);
return t;
}
}

public static final class ActorProcessingThread extends ForkJoinWorkerThread {
public EventualMessage currentMessage;
private static AtomicInteger threadIdGen = new AtomicInteger(0);
protected Actor currentlyExecutingActor;
protected final ObjectBuffer<SFarReference> createdActors;
protected final ObjectBuffer<ObjectBuffer<EventualMessage>> processedMessages;
protected final long threadId;
protected long nextActorId = 1;
protected long nextMessageId;
protected long nextPromiseId;
protected long currentMessageId;
protected ByteBuffer tracingDataBuffer;
public long resolvedPromises;

protected ActorProcessingThread(final ForkJoinPool pool) {
super(pool);
threadId = threadIdGen.getAndIncrement();
if (VmSettings.ACTOR_TRACING) {
ActorExecutionTrace.swapBuffer(this);
}
}

protected long generateActorId() {
long result = (threadId << THREAD_ID_SHIFT) | nextActorId;
nextActorId++;
return result;
}

protected long generateMessageBaseId(final int numMessages) {
long result = (threadId << THREAD_ID_SHIFT) | nextMessageId;
nextMessageId += numMessages;
return result;
}

protected long generatePromiseId() {
long result = (threadId << THREAD_ID_SHIFT) | nextPromiseId;
nextPromiseId++;
return result;
}

public ByteBuffer getThreadLocalBuffer() {
return tracingDataBuffer;
}

createdActors = ActorExecutionTrace.createActorBuffer();
processedMessages = ActorExecutionTrace.createProcessedMessagesBuffer();
public void setThreadLocalBuffer(final ByteBuffer threadLocalBuffer) {
this.tracingDataBuffer = threadLocalBuffer;
}

public long getCurrentMessageId() {
return currentMessageId;
}

@Override
protected void onTermination(final Throwable exception) {
if (VmSettings.ACTOR_TRACING) {
ActorExecutionTrace.returnBuffer(this.tracingDataBuffer);
this.tracingDataBuffer = null;
VM.printConcurrencyEntitiesReport("[Thread " + threadId + "]\tA#" + (nextActorId - 1) + "\t\tM#" + nextMessageId + "\t\tP#" + nextPromiseId);
numCreatedActors += nextActorId - 1;
numCreatedMessages += nextMessageId;
numCreatedPromises += nextPromiseId;
numResolvedPromises += resolvedPromises;
}
threads.remove(this);
super.onTermination(exception);
}
}

Expand All @@ -262,12 +352,31 @@ public void uncaughtException(final Thread t, final Throwable e) {
VmSettings.NUM_THREADS, new ActorProcessingThreadFactor(),
new UncaughtExceptions(), true);

public static final void shutDownActorPool() {
actorPool.shutdown();
try {
actorPool.awaitTermination(10, TimeUnit.SECONDS);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
if (VmSettings.ACTOR_TRACING) {
VM.printConcurrencyEntitiesReport("[Total]\t\tA#" + numCreatedActors + "\t\tM#" + numCreatedMessages + "\t\tP#" + numCreatedPromises);
VM.printConcurrencyEntitiesReport("[Unresolved] " + (numCreatedPromises - numResolvedPromises));
}
}

public static final void forceSwapBuffers() {
for (ActorProcessingThread t: threads) {
ActorExecutionTrace.swapBuffer(t);
}
}

@Override
public String toString() {
return "Actor";
}

public ObjectBuffer<EventualMessage> getMailbox() {
public Mailbox getMailbox() {
return mailbox;
}

Expand Down Expand Up @@ -307,4 +416,23 @@ public String toString() {
return "Actor[" + (isMain ? "main" : id) + "]";
}
}

public static final class TracingActor extends Actor {
protected final long actorId;

public TracingActor() {
super();
if (Thread.currentThread() instanceof ActorProcessingThread) {
ActorProcessingThread t = (ActorProcessingThread) Thread.currentThread();
this.actorId = t.generateActorId();
} else {
actorId = 0; //main actor
}
}

@Override
public long getActorId() {
return actorId;
}
}
}
Loading

0 comments on commit c342b48

Please sign in to comment.