Skip to content

Commit

Permalink
Move test to jdk 9
Browse files Browse the repository at this point in the history
  • Loading branch information
som-snytt committed Apr 11, 2024
1 parent 0bb170c commit 8da02ce
Show file tree
Hide file tree
Showing 4 changed files with 145 additions and 107 deletions.
2 changes: 2 additions & 0 deletions test/files/jvm/scala-concurrent-tck-b.check
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
starting testUncaughtExceptionReporting
finished testUncaughtExceptionReporting
142 changes: 142 additions & 0 deletions test/files/jvm/scala-concurrent-tck-b.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
// javaVersion: 9+
import scala.concurrent.{
TimeoutException,
ExecutionContextExecutorService,
Await,
Awaitable,
}
import scala.annotation.tailrec
import scala.concurrent.duration._
import scala.tools.testkit.AssertUtil.{Fast, Slow, waitFor, waitForIt}
import scala.util.{Try, Success, Failure}
import scala.util.chaining._
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit.{MILLISECONDS => Milliseconds, SECONDS => Seconds}

trait TestBase {

trait Done { def apply(proof: => Boolean): Unit }

def once(body: Done => Unit): Unit = {
import java.util.concurrent.LinkedBlockingQueue
val q = new LinkedBlockingQueue[Try[Boolean]]
body(new Done {
def apply(proof: => Boolean): Unit = q offer Try(proof)
})
var tried: Try[Boolean] = null
def check = q.poll(5000L, Milliseconds).tap(tried = _) != null
waitForIt(check, progress = Slow, label = "concurrent-tck")
assert(tried.isSuccess)
assert(tried.get)
// Check that we don't get more than one completion
assert(q.poll(50, Milliseconds) eq null)
}

def test[T](name: String)(body: => T): T = {
println(s"starting $name")
body.tap(_ => println(s"finished $name"))
}

def await[A](value: Awaitable[A]): A = {
def check: Option[A] =
Try(Await.result(value, Duration(500, "ms"))) match {
case Success(x) => Some(x)
case Failure(_: TimeoutException) => None
case Failure(t) => throw t
}
waitFor(check, progress = Fast, label = "concurrent-tck test result")
}
}

class ReportingExecutionContext extends TestBase {
val progress = Fast
@volatile var thread: Thread = null
@volatile var reportedOn: Thread = null
@volatile var reported: Throwable = null
val latch = new CountDownLatch(1)

def report(t: Thread, e: Throwable): Unit = {
reportedOn = t
reported = e
latch.countDown()
}

def ecesUsingDefaultFactory = {
import java.util.concurrent.{ForkJoinPool}
import java.util.function.Predicate
import scala.reflect.internal.util.RichClassLoader._

val path = "java.util.concurrent.ForkJoinPool"
val n = 2 // parallelism
val factory = scala.concurrent.TestUtil.threadFactory(report)
val ueh: Thread.UncaughtExceptionHandler = report(_, _)
val async = true
val coreSize = 4
val maxSize = 4
val minRun = 1 // minimumRunnable for liveness
val saturate: Predicate[ForkJoinPool] = (fjp: ForkJoinPool) => false // whether to continue after blocking at maxSize
val keepAlive = 2000L
val fjp = this.getClass.getClassLoader.create[ForkJoinPool](path, _ => ())(n, factory, ueh, async, coreSize, maxSize, minRun, saturate, keepAlive, Milliseconds)
//ForkJoinPool(int parallelism, ForkJoinPool.ForkJoinWorkerThreadFactory factory, Thread.UncaughtExceptionHandler handler, boolean asyncMode, int corePoolSize, int maximumPoolSize, int minimumRunnable, Predicate<? super ForkJoinPool> saturate, long keepAliveTime, TimeUnit unit)
new ExecutionContextExecutorService {
// Members declared in scala.concurrent.ExecutionContext
def reportFailure(cause: Throwable): Unit = report(null, cause)

// Members declared in java.util.concurrent.Executor
def execute(r: Runnable): Unit = fjp.execute(r)

// Members declared in java.util.concurrent.ExecutorService
def awaitTermination(x$1: Long, x$2: java.util.concurrent.TimeUnit): Boolean = ???
def invokeAll[T](x$1: java.util.Collection[_ <: java.util.concurrent.Callable[T]], x$2: Long, x$3: java.util.concurrent.TimeUnit): java.util.List[java.util.concurrent.Future[T]] = ???
def invokeAll[T](x$1: java.util.Collection[_ <: java.util.concurrent.Callable[T]]): java.util.List[java.util.concurrent.Future[T]] = ???
def invokeAny[T](x$1: java.util.Collection[_ <: java.util.concurrent.Callable[T]], x$2: Long, x$3: java.util.concurrent.TimeUnit): T = ???
def invokeAny[T](x$1: java.util.Collection[_ <: java.util.concurrent.Callable[T]]): T = ???
def isShutdown(): Boolean = fjp.isShutdown
def isTerminated(): Boolean = fjp.isTerminated
def shutdown(): Unit = fjp.shutdown()
def shutdownNow(): java.util.List[Runnable] = fjp.shutdownNow()
def submit(r: Runnable): java.util.concurrent.Future[_] = fjp.submit(r)
def submit[T](task: Runnable, res: T): java.util.concurrent.Future[T] = fjp.submit(task, res)
def submit[T](task: java.util.concurrent.Callable[T]): java.util.concurrent.Future[T] = fjp.submit(task)
}
}

def testUncaughtExceptionReporting(ec: ExecutionContextExecutorService): Unit = once {
done =>
val example = new InterruptedException

@tailrec def spinForThreadDeath(turns: Int): Boolean =
turns > 0 && (thread != null && !thread.isAlive || { Thread.sleep(100L); spinForThreadDeath(turns - 1) })

def truthfully(b: Boolean): Option[Boolean] = if (b) Some(true) else None

// jdk17 thread receives pool exception handler, so wait for thread to die slow and painful expired keepalive
def threadIsDead = waitFor(truthfully(spinForThreadDeath(turns = 10)), progress = progress, label = "concurrent-tck-thread-death")

try {
ec.execute(() => {
thread = Thread.currentThread
throw example
})
latch.await(2, Seconds)
done(threadIsDead && (example.eq(reported) || example.eq(reported.getCause)))
}
finally ec.shutdown()
}

test("testUncaughtExceptionReporting")(testUncaughtExceptionReporting {
ecesUsingDefaultFactory
})
}

object Test extends App {
new ReportingExecutionContext

System.exit(0)
}

package scala.concurrent {
object TestUtil {
def threadFactory(uncaughtExceptionHandler: Thread.UncaughtExceptionHandler) = new impl.ExecutionContextImpl.DefaultThreadFactory(daemonic=true, maxBlockers=256, prefix="test-thread", uncaughtExceptionHandler)
}
}
2 changes: 0 additions & 2 deletions test/files/jvm/scala-concurrent-tck.check
Original file line number Diff line number Diff line change
Expand Up @@ -134,5 +134,3 @@ starting testOnComplete
finished testOnComplete
starting testMap
finished testMap
starting testUncaughtExceptionReporting
finished testUncaughtExceptionReporting
106 changes: 1 addition & 105 deletions test/files/jvm/scala-concurrent-tck.scala
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@

import scala.concurrent.{
Future,
Promise,
Expand Down Expand Up @@ -1045,104 +1046,6 @@ class ExecutionContextPrepare extends TestBase {
test("testMap")(testMap())
}

class ReportingExecutionContext extends TestBase {
final val slowly = false // true for using default FJP with long keepAlive (60 secs)
val progress = if (slowly) Slow else Fast
@volatile var thread: Thread = null
@volatile var reportedOn: Thread = null
@volatile var reported: Throwable = null
val latch = new CountDownLatch(1)

def report(t: Thread, e: Throwable): Unit = {
reportedOn = t
reported = e
latch.countDown()
}
def underlyingPool = {
import java.util.concurrent.{LinkedBlockingQueue, RejectedExecutionHandler, ThreadFactory, ThreadPoolExecutor}
val coreSize = 4
val maxSize = 4
val keepAlive = 2000L
val q = new LinkedBlockingQueue[Runnable]
val factory: ThreadFactory = (r: Runnable) => new Thread(r).tap(_.setUncaughtExceptionHandler(report))
val handler: RejectedExecutionHandler = (r: Runnable, x: ThreadPoolExecutor) => ???
new ThreadPoolExecutor(coreSize, maxSize, keepAlive, Milliseconds, q, factory, handler)
}
def ecesFromUnderlyingPool = ExecutionContext.fromExecutorService(underlyingPool, report(null, _))

def ecesUsingDefaultFactory = {
import java.util.concurrent.{ForkJoinPool, RejectedExecutionHandler, ThreadPoolExecutor}
import java.util.function.Predicate
import scala.reflect.internal.util.RichClassLoader._

val path = "java.util.concurrent.ForkJoinPool"
val n = 2 // parallelism
val factory = scala.concurrent.TestUtil.threadFactory(report)
val ueh: Thread.UncaughtExceptionHandler = report(_, _)
val async = true
val coreSize = 4
val maxSize = 4
val minRun = 1 // minimumRunnable for liveness
val saturate: Predicate[ForkJoinPool] = (fjp: ForkJoinPool) => false // whether to continue after blocking at maxSize
val keepAlive = 2000L
val fjp = this.getClass.getClassLoader.create[ForkJoinPool](path, _ => ())(n, factory, ueh, async, coreSize, maxSize, minRun, saturate, keepAlive, Milliseconds)
//ForkJoinPool(int parallelism, ForkJoinPool.ForkJoinWorkerThreadFactory factory, Thread.UncaughtExceptionHandler handler, boolean asyncMode, int corePoolSize, int maximumPoolSize, int minimumRunnable, Predicate<? super ForkJoinPool> saturate, long keepAliveTime, TimeUnit unit)
new ExecutionContextExecutorService {
// Members declared in scala.concurrent.ExecutionContext
def reportFailure(cause: Throwable): Unit = report(null, cause)

// Members declared in java.util.concurrent.Executor
def execute(r: Runnable): Unit = fjp.execute(r)

// Members declared in java.util.concurrent.ExecutorService
def awaitTermination(x$1: Long, x$2: java.util.concurrent.TimeUnit): Boolean = ???
def invokeAll[T](x$1: java.util.Collection[_ <: java.util.concurrent.Callable[T]], x$2: Long, x$3: java.util.concurrent.TimeUnit): java.util.List[java.util.concurrent.Future[T]] = ???
def invokeAll[T](x$1: java.util.Collection[_ <: java.util.concurrent.Callable[T]]): java.util.List[java.util.concurrent.Future[T]] = ???
def invokeAny[T](x$1: java.util.Collection[_ <: java.util.concurrent.Callable[T]], x$2: Long, x$3: java.util.concurrent.TimeUnit): T = ???
def invokeAny[T](x$1: java.util.Collection[_ <: java.util.concurrent.Callable[T]]): T = ???
def isShutdown(): Boolean = fjp.isShutdown
def isTerminated(): Boolean = fjp.isTerminated
def shutdown(): Unit = fjp.shutdown()
def shutdownNow(): java.util.List[Runnable] = fjp.shutdownNow()
def submit(r: Runnable): java.util.concurrent.Future[_] = fjp.submit(r)
def submit[T](task: Runnable, res: T): java.util.concurrent.Future[T] = fjp.submit(task, res)
def submit[T](task: java.util.concurrent.Callable[T]): java.util.concurrent.Future[T] = fjp.submit(task)
}
}

def ecesUsingDefaultFJP = ExecutionContext.fromExecutorService(null, report(null, _))

def testUncaughtExceptionReporting(ec: ExecutionContextExecutorService): Unit = once {
done =>
val example = new InterruptedException

@tailrec def spinForThreadDeath(turns: Int): Boolean =
turns > 0 && (thread != null && !thread.isAlive || { Thread.sleep(100L); spinForThreadDeath(turns - 1) })

def truthfully(b: Boolean): Option[Boolean] = if (b) Some(true) else None

// jdk17 thread receives pool exception handler, so wait for thread to die slow and painful expired keepalive
def threadIsDead = waitFor(truthfully(spinForThreadDeath(turns = 10)), progress = progress, label = "concurrent-tck-thread-death")

try {
ec.execute(() => {
thread = Thread.currentThread
throw example
})
latch.await(2, Seconds)
done(threadIsDead && (example.eq(reported) || example.eq(reported.getCause)))
}
finally ec.shutdown()
}

test("testUncaughtExceptionReporting")(testUncaughtExceptionReporting {
import scala.util.Properties.isJavaAtLeast
if (slowly) ecesUsingDefaultFJP
else if (isJavaAtLeast(9)) ecesUsingDefaultFactory
else ecesFromUnderlyingPool
})
}

object Test
extends App {
new FutureCallbacks
Expand All @@ -1155,13 +1058,6 @@ extends App {
new GlobalExecutionContext
new CustomExecutionContext
new ExecutionContextPrepare
new ReportingExecutionContext

System.exit(0)
}

package scala.concurrent {
object TestUtil {
def threadFactory(uncaughtExceptionHandler: Thread.UncaughtExceptionHandler) = new impl.ExecutionContextImpl.DefaultThreadFactory(daemonic=true, maxBlockers=256, prefix="test-thread", uncaughtExceptionHandler)
}
}

0 comments on commit 8da02ce

Please sign in to comment.