Skip to content

Commit

Permalink
Port java.util.concurrent.ForkJoinPool from JSR-166 (#3136)
Browse files Browse the repository at this point in the history
Ports `ForkJoinPool` and other required types from `java.util.concurrent` types: 
* `AbstractExecutorService`
* `Callable` 
* `CompletionService`
* `CountedCompleter`
* `Executor`
* `ExecutorCompletionService`
* `ExecutorService`
* `Executors` (partially)
* `ForkJoinPool`
* `ForkJoinTask`
* `ForkJoinWorkerThread`
* `Future`
* `FutureTask`
* `RecursiveAction`
* `RecursiveTask`
* `RejectedExecutionHandler`
* `RunnableFuture`
* `ThreadFactory`

* Add missing `java.util.concurrent.TimeUnit` methods
* Don't report deprecation warning in `ThreadTest`
* Run scalafmt
* Disable JVM tests failing on JDK 8
  • Loading branch information
WojciechMazur committed Feb 5, 2023
1 parent 1cd433a commit 3580845
Show file tree
Hide file tree
Showing 37 changed files with 15,497 additions and 60 deletions.
7 changes: 7 additions & 0 deletions javalib/src/main/scala-2/java/util/concurrent/TimeUnit.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,13 @@ abstract class TimeUnit private (name: String, ordinal: Int)
def toMinutes(a: Long): Long
def toHours(a: Long): Long
def toDays(a: Long): Long

def sleep(timeout: Long): Unit =
if (timeout > 0) Thread.sleep(toMillis(timeout))
def timedJoin(thread: Thread, timeout: Long) =
if (timeout > 0) thread.join(toMillis(timeout))
def timedWait(obj: Object, timeout: Long) =
if (timeout > 0) obj.wait(toMillis(timeout))
}

object TimeUnit {
Expand Down
7 changes: 7 additions & 0 deletions javalib/src/main/scala-3/java/util/concurrent/TimeUnit.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,13 @@ enum TimeUnit extends Enum[TimeUnit] {
case HOURS extends TimeUnit
case DAYS extends TimeUnit

def sleep(timeout: Long): Unit =
if (timeout > 0) Thread.sleep(toMillis(timeout))
def timedJoin(thread: Thread, timeout: Long) =
if (timeout > 0) thread.join(toMillis(timeout))
def timedWait(obj: Object, timeout: Long) =
if (timeout > 0) obj.wait(toMillis(timeout))

def convert(a: Long, u: TimeUnit): Long = this match {
case TimeUnit.NANOSECONDS => u.toNanos(a)
case TimeUnit.MICROSECONDS => u.toMicros(a)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,248 @@
/*
* Written by Doug Lea with assistance from members of JCP JSR-166
* Expert Group and released to the public domain, as explained at
* http://creativecommons.org/publicdomain/zero/1.0/
*/

package java.util.concurrent

import java.util
import java.lang
import java.util.concurrent.TimeUnit._
import scala.annotation.tailrec

abstract class AbstractExecutorService() extends ExecutorService {

protected[concurrent] def newTaskFor[T <: AnyRef](
runnable: Runnable,
value: T
): RunnableFuture[T] =
new FutureTask[T](runnable, value)

protected[concurrent] def newTaskFor[T <: AnyRef](
callable: Callable[T]
): RunnableFuture[T] =
new FutureTask[T](callable)

@throws[NullPointerException]
@throws[java.lang.RejectedExecutionException]
override def submit(task: Runnable): Future[_] = {
if (task == null) throw new NullPointerException()
val ftask: RunnableFuture[Object] = newTaskFor(task, null)
execute(ftask)
ftask
}

@throws[NullPointerException]
@throws[java.lang.RejectedExecutionException]
override def submit[T <: AnyRef](task: Runnable, result: T): Future[T] = {
if (task == null) throw new NullPointerException()
val ftask: RunnableFuture[T] = newTaskFor(task, result)
execute(ftask)
ftask
}

@throws[NullPointerException]
@throws[java.lang.RejectedExecutionException]
override def submit[T <: AnyRef](task: Callable[T]): Future[T] = {
if (task == null) throw new NullPointerException()
val ftask: RunnableFuture[T] = newTaskFor(task)
execute(ftask)
ftask
}

@throws[InterruptedException]
@throws[TimeoutException]
@throws[ExecutionException]
private def doInvokeAny[T <: AnyRef](
tasks: util.Collection[_ <: Callable[T]],
timed: Boolean,
n: Long
): T = {
var nanos: Long = n
if (tasks == null)
throw new NullPointerException()

var ntasks: Int = tasks.size()
if (ntasks == 0)
throw new IllegalArgumentException()

val futures = new util.ArrayList[Future[T]](ntasks)
val ecs = new ExecutorCompletionService[T](this)

// For efficiency, especially in executors with limited
// parallelism, check to see if previously submitted tasks are
// done before submitting more of them. This interleaving
// plus the exception mechanics account for messiness of main
// loop.

try {
// Record exceptions so that if we fail to obtain any
// result, we can throw the last exception we got.
var ee: ExecutionException = null
val deadline = if (timed) System.nanoTime() + nanos else 0L
val it = tasks.iterator()

// Start one task for sure; the rest incrementally
futures.add(ecs.submit(it.next()))
ntasks -= 1
var active: Int = 1

var break: Boolean = false
while (!break) {
val f: Future[T] = ecs.poll() match {
case null =>
if (ntasks > 0) {
ntasks -= 1
futures.add(ecs.submit(it.next()))
active += 1
null
} else if (active == 0) {
break = true
null
} else if (timed)
ecs.poll(nanos, TimeUnit.NANOSECONDS) match {
case null => throw new TimeoutException()
case f =>
nanos = deadline - System.nanoTime()
f
}
else ecs.take()

case f => f
}
if (!break && f != null) {
active -= 1
try {
return f.get()
} catch {
case eex: ExecutionException => ee = eex
case rex: RuntimeException => ee = new ExecutionException(rex)
}
}
}
if (ee == null) ee = new ExecutionException(null: Throwable)
throw ee
} finally cancelAll(futures)
}

@throws[InterruptedException]
@throws[ExecutionException]
override def invokeAny[T <: AnyRef](
tasks: util.Collection[_ <: Callable[T]]
): T =
doInvokeAny(tasks, false, 0)

@throws[InterruptedException]
@throws[ExecutionException]
@throws[TimeoutException]
override def invokeAny[T <: AnyRef](
tasks: java.util.Collection[_ <: Callable[T]],
timeout: Long,
unit: TimeUnit
): T = doInvokeAny(tasks, true, unit.toNanos(timeout))

@throws[InterruptedException]
override def invokeAll[T <: AnyRef](
tasks: java.util.Collection[_ <: Callable[T]]
): java.util.List[Future[T]] = {
if (tasks == null) throw new NullPointerException()
val futures: util.List[Future[T]] =
new util.ArrayList[Future[T]](tasks.size())
var done: Boolean = false
try {
val it = tasks.iterator()
while (it.hasNext()) {
val f: RunnableFuture[T] = newTaskFor(it.next())
futures.add(f)
execute(f)
}

val it1 = futures.iterator()
while (it1.hasNext()) {
val f = it1.next()
if (!f.isDone()) {
try {
f.get()
} catch {
case ignore: CancellationException =>
case ignore: ExecutionException =>
}
}
}
done = true
futures
} finally if (!done) cancelAll(futures)
}

@throws[InterruptedException]
override def invokeAll[T <: AnyRef](
tasks: util.Collection[_ <: Callable[T]],
timeout: Long,
unit: TimeUnit
): util.List[Future[T]] = {
if (tasks == null || unit == null)
throw new NullPointerException()
val nanos: Long = unit.toNanos(timeout)
val deadline = System.nanoTime() + nanos
val futures = new util.ArrayList[Future[T]](tasks.size())
var lastIdx = 0

try {
val it = tasks.iterator()
while (it.hasNext()) {
futures.add(newTaskFor(it.next()))
}
val size = futures.size()

// Interleave time checks and calls to execute in case
// executor doesn't have any/much parallelism.
@tailrec def executeLoop(i: Int): Boolean =
if (i >= size) false
else {
val remainingTime =
if (i == 0) nanos
else deadline - System.nanoTime()
if (remainingTime <= 0) true // timeout
else {
execute(futures.get(i).asInstanceOf[Runnable])
executeLoop(i + 1)
}
}

@tailrec def awaitLoop(i: Int): Boolean =
if (i >= size) false
else {
val f = futures.get(i)
val timedOut =
if (f.isDone()) false
else
try {
f.get(deadline - System.nanoTime(), NANOSECONDS)
false
} catch {
case _: CancellationException | _: ExecutionException => false
case _: TimeoutException =>
lastIdx = i
true
}
if (timedOut) timedOut
else awaitLoop(i + 1)
}

val timedOut = executeLoop(0) || awaitLoop(0)
if (timedOut) cancelAll(futures, lastIdx)
} catch {
case t: Throwable =>
cancelAll(futures)
throw t
}
futures
}

private def cancelAll[T](futures: util.List[Future[T]], from: Int = 0) =
for (i <- from until futures.size()) {
futures.get(i).cancel(true)
}

}
6 changes: 6 additions & 0 deletions javalib/src/main/scala/java/util/concurrent/Callable.scala
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
/*
* Written by Doug Lea with assistance from members of JCP JSR-166
* Expert Group and released to the public domain, as explained at
* http://creativecommons.org/publicdomain/zero/1.0/
*/

package java.util.concurrent

trait Callable[V] {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Written by Doug Lea with assistance from members of JCP JSR-166
* Expert Group and released to the public domain, as explained at
* http://creativecommons.org/publicdomain/zero/1.0/
*/

package java.util.concurrent

trait CompletionService[V] {

def submit(task: Callable[V]): Future[V]

def submit(task: Runnable, result: V): Future[V]

def take(): Future[V]

def poll(): Future[V]

def poll(timeout: Long, unit: TimeUnit): Future[V]

}

0 comments on commit 3580845

Please sign in to comment.