Skip to content

Commit afc5062

Browse files
committed
Backport Queue, SizedQueue, ThreadFiber and support code.
1 parent b110106 commit afc5062

File tree

5 files changed

+320
-192
lines changed

5 files changed

+320
-192
lines changed

core/src/main/java/org/jruby/RubyThread.java

Lines changed: 71 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,8 @@
6262
import org.jruby.runtime.builtin.IRubyObject;
6363

6464
import java.util.concurrent.ExecutionException;
65+
import java.util.concurrent.Semaphore;
66+
import java.util.concurrent.TimeUnit;
6567
import java.util.concurrent.atomic.AtomicReference;
6668
import java.util.concurrent.locks.Lock;
6769
import org.jruby.anno.JRubyMethod;
@@ -154,6 +156,12 @@ public static enum Status {
154156
/** The current task blocking a thread, to allow interrupting it in an appropriate way */
155157
private volatile BlockingTask currentBlockingTask;
156158

159+
/** A function to use to unblock this thread, if possible */
160+
private Unblocker unblockFunc;
161+
162+
/** Argument to pass to the unblocker */
163+
private Object unblockArg;
164+
157165
/** The list of locks this thread currently holds, so they can be released on exit */
158166
private final List<Lock> heldLocks = new Vector<Lock>();
159167

@@ -927,7 +935,6 @@ public IRubyObject raise(IRubyObject[] args, Block block) {
927935
* Ruby threads like Timeout's thread.
928936
*
929937
* @param args Same args as for Thread#raise
930-
* @param block Same as for Thread#raise
931938
*/
932939
public void internalRaise(IRubyObject[] args) {
933940
Ruby runtime = getRuntime();
@@ -1033,6 +1040,15 @@ public static interface BlockingTask {
10331040
public void wakeup();
10341041
}
10351042

1043+
public interface Unblocker<Data> {
1044+
public void wakeup(RubyThread thread, Data self);
1045+
}
1046+
1047+
public interface Task<Data, Return> extends Unblocker<Data> {
1048+
public Return run(ThreadContext context, Data data) throws InterruptedException;
1049+
public void wakeup(RubyThread thread, Data data);
1050+
}
1051+
10361052
public static final class SleepTask implements BlockingTask {
10371053
private final Object object;
10381054
private final long millis;
@@ -1057,6 +1073,30 @@ public void wakeup() {
10571073
}
10581074
}
10591075

1076+
private static final class SleepTask2 implements Task<Object[], Long> {
1077+
@Override
1078+
public Long run(ThreadContext context, Object[] data) throws InterruptedException {
1079+
long millis = (Long)data[1];
1080+
int nanos = (Integer)data[2];
1081+
1082+
long start = System.currentTimeMillis();
1083+
// TODO: nano handling?
1084+
if (millis == 0) {
1085+
((Semaphore) data[0]).acquire();
1086+
} else {
1087+
((Semaphore) data[0]).tryAcquire(millis, TimeUnit.MILLISECONDS);
1088+
}
1089+
return System.currentTimeMillis() - start;
1090+
}
1091+
1092+
@Override
1093+
public void wakeup(RubyThread thread, Object[] data) {
1094+
((Semaphore)data[0]).release();
1095+
}
1096+
}
1097+
1098+
private static final Task<Object[], Long> SLEEP_TASK2 = new SleepTask2();
1099+
10601100
public void executeBlockingTask(BlockingTask task) throws InterruptedException {
10611101
enterSleep();
10621102
try {
@@ -1070,6 +1110,25 @@ public void executeBlockingTask(BlockingTask task) throws InterruptedException {
10701110
}
10711111
}
10721112

1113+
public <Data, Return> Return executeTask(ThreadContext context, Data data, Task<Data, Return> task) throws InterruptedException {
1114+
try {
1115+
this.unblockFunc = task;
1116+
this.unblockArg = data;
1117+
1118+
// check for interrupt before going into blocking call
1119+
pollThreadEvents(context);
1120+
1121+
enterSleep();
1122+
1123+
return task.run(context, data);
1124+
} finally {
1125+
exitSleep();
1126+
this.unblockFunc = null;
1127+
this.unblockArg = null;
1128+
pollThreadEvents(context);
1129+
}
1130+
}
1131+
10731132
public void enterSleep() {
10741133
status.set(Status.SLEEP);
10751134
}
@@ -1351,10 +1410,18 @@ public void interrupt() {
13511410
if (iowait != null) {
13521411
iowait.cancel();
13531412
}
1354-
1355-
BlockingTask task = currentBlockingTask;
1413+
1414+
Unblocker task = this.unblockFunc;
13561415
if (task != null) {
1357-
task.wakeup();
1416+
task.wakeup(this, unblockArg);
1417+
}
1418+
1419+
// deprecated
1420+
{
1421+
BlockingTask t = currentBlockingTask;
1422+
if (t != null) {
1423+
t.wakeup();
1424+
}
13581425
}
13591426
}
13601427
private volatile BlockingIO.Condition blockingIO = null;

0 commit comments

Comments
 (0)