Skip to content

Commit

Permalink
scheduler - prepin tasks running on pinless schedulers by first migra…
Browse files Browse the repository at this point in the history
…ting
  • Loading branch information
lytles@takashi committed Oct 24, 2018
1 parent 17bb748 commit 3fdd734
Show file tree
Hide file tree
Showing 5 changed files with 50 additions and 10 deletions.
4 changes: 3 additions & 1 deletion src/kilim/ForkJoinScheduler.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,11 @@ public void publish(TimerService.WatchdogTask dog) {
public boolean isEmpty() {
return count.get()==0;
}

public boolean isPinnable() { return false; }

public void schedule(int index,Task task) {
assert(index < 0);
assert index < 0 : "attempt to pin task to FJS";
ForkJoinPool current = ForkJoinTask.getPool();
ForkedTask fajita = new ForkedTask(task);
count.incrementAndGet();
Expand Down
4 changes: 4 additions & 0 deletions src/kilim/ReentrantLock.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ public Thread getOwner() {

Thread locker = null; // /***************************

public void preLock() throws Pausable {
Scheduler.getCurrentTask().prePin();
}

@Override
public void lock() {
super.lock();
Expand Down
8 changes: 8 additions & 0 deletions src/kilim/Scheduler.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
public abstract class Scheduler {
private static final int defaultQueueSize_ = Integer.MAX_VALUE;
public static volatile Scheduler defaultScheduler = null;
public static volatile Scheduler pinnableScheduler = null;
public static int defaultNumberThreads;
private static final ThreadLocal<Task> taskMgr_ = new ThreadLocal<Task>();
public static Logger defaultLogger = new BasicLogger();
Expand Down Expand Up @@ -62,6 +63,8 @@ protected static void setCurrentTask(Task t) {
public abstract boolean isEmptyish();

public abstract int numThreads();

public boolean isPinnable() { return true; }

/**
* Schedule a task to run.
Expand Down Expand Up @@ -139,6 +142,11 @@ public synchronized static Scheduler getDefaultScheduler() {
: new ForkJoinScheduler(defaultNumberThreads);
return defaultScheduler;
}
public synchronized static Scheduler getPinnableScheduler() {
if (pinnableScheduler==null)
pinnableScheduler = Scheduler.make(defaultNumberThreads);
return pinnableScheduler;
}

public static void setDefaultScheduler(Scheduler s) {
defaultScheduler = s;
Expand Down
6 changes: 6 additions & 0 deletions src/kilim/Task.java
Original file line number Diff line number Diff line change
Expand Up @@ -494,6 +494,12 @@ public String dump() {
}
}

public void prePin() throws Pausable {
if (scheduler.isPinnable()) return;
scheduler = Scheduler.getPinnableScheduler();
yield();
}

public void pinToThread() {
numActivePins++;
}
Expand Down
38 changes: 29 additions & 9 deletions test/kilim/test/TestLock.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,49 +2,69 @@

import junit.framework.TestCase;
import kilim.ExitMsg;
import kilim.ForkJoinScheduler;
import kilim.Mailbox;
import kilim.Pausable;
import kilim.ReentrantLock;
import kilim.Scheduler;
import kilim.Task;

public class TestLock extends TestCase{
static int numThreads = 4;
static int maxDelay = 30;
static int numTasks = 20;
static int numIters = 20;
static boolean affine = true;
static boolean preLock = true;
static int ratio = 10;
static int timeout = maxDelay/ratio*numIters*numTasks/numThreads + maxDelay*numIters;

public void testLocks() {
Scheduler scheduler = Scheduler.make(4);
Scheduler scheduler = affine ? Scheduler.make(numThreads) : new ForkJoinScheduler(numThreads);
Mailbox<ExitMsg> mb = new Mailbox<ExitMsg>();
for (int i = 0; i < 100; i++) {
for (int i = 0; i < numTasks; i++) {
Task t = new LockTask();
t.informOnExit(mb);
t.setScheduler(scheduler);
t.start();
}
boolean ok = true;
for (int i = 0; i < 100; i++) {
ExitMsg em = mb.getb(5000);
assertNotNull("Timed out. #tasks finished = " + i + "/100", em);
for (int i = 0; i < numTasks; i++) {
ExitMsg em = mb.getb(timeout);
assertNotNull("Timed out. #tasks finished = " + i + " of " + numTasks, em);
if (em.result instanceof Exception) {
ok = false; break;
}
}
scheduler.shutdown();
assertTrue(ok);
}

static void sleep(int delay) {
try { Thread.sleep(delay); }
catch (InterruptedException ex) {}
}

static int delay() {
return (int) (maxDelay*Math.random());
}

static class LockTask extends Task {
ReentrantLock syncLock = new ReentrantLock();
@Override
public void execute() throws Pausable, Exception {
// System.out.println("Start #" + id);
Task.sleep(delay());
if (preLock) syncLock.preLock();
try {
for (int i = 0; i < 1000; i++) {
for (int i = 0; i < numIters; i++) {
syncLock.lock();
Task.yield();
Task.sleep(delay());
syncLock.unlock();
TestLock.sleep(delay()/ratio);
}
} catch (Exception e) {
e.printStackTrace();
}
// System.out.println("Done #" + id);
}
}
}

0 comments on commit 3fdd734

Please sign in to comment.