From 8da02ceb773084f143a6d275a013827a1be6a54a Mon Sep 17 00:00:00 2001 From: Som Snytt Date: Mon, 8 Apr 2024 09:49:17 -0700 Subject: [PATCH] Move test to jdk 9 --- test/files/jvm/scala-concurrent-tck-b.check | 2 + test/files/jvm/scala-concurrent-tck-b.scala | 142 ++++++++++++++++++++ test/files/jvm/scala-concurrent-tck.check | 2 - test/files/jvm/scala-concurrent-tck.scala | 106 +-------------- 4 files changed, 145 insertions(+), 107 deletions(-) create mode 100644 test/files/jvm/scala-concurrent-tck-b.check create mode 100644 test/files/jvm/scala-concurrent-tck-b.scala diff --git a/test/files/jvm/scala-concurrent-tck-b.check b/test/files/jvm/scala-concurrent-tck-b.check new file mode 100644 index 00000000000..9c0aad3aa89 --- /dev/null +++ b/test/files/jvm/scala-concurrent-tck-b.check @@ -0,0 +1,2 @@ +starting testUncaughtExceptionReporting +finished testUncaughtExceptionReporting diff --git a/test/files/jvm/scala-concurrent-tck-b.scala b/test/files/jvm/scala-concurrent-tck-b.scala new file mode 100644 index 00000000000..7e0d8fae489 --- /dev/null +++ b/test/files/jvm/scala-concurrent-tck-b.scala @@ -0,0 +1,142 @@ +// javaVersion: 9+ +import scala.concurrent.{ + TimeoutException, + ExecutionContextExecutorService, + Await, + Awaitable, +} +import scala.annotation.tailrec +import scala.concurrent.duration._ +import scala.tools.testkit.AssertUtil.{Fast, Slow, waitFor, waitForIt} +import scala.util.{Try, Success, Failure} +import scala.util.chaining._ +import java.util.concurrent.CountDownLatch +import java.util.concurrent.TimeUnit.{MILLISECONDS => Milliseconds, SECONDS => Seconds} + +trait TestBase { + + trait Done { def apply(proof: => Boolean): Unit } + + def once(body: Done => Unit): Unit = { + import java.util.concurrent.LinkedBlockingQueue + val q = new LinkedBlockingQueue[Try[Boolean]] + body(new Done { + def apply(proof: => Boolean): Unit = q offer Try(proof) + }) + var tried: Try[Boolean] = null + def check = q.poll(5000L, Milliseconds).tap(tried = _) != null + waitForIt(check, progress = Slow, label = "concurrent-tck") + assert(tried.isSuccess) + assert(tried.get) + // Check that we don't get more than one completion + assert(q.poll(50, Milliseconds) eq null) + } + + def test[T](name: String)(body: => T): T = { + println(s"starting $name") + body.tap(_ => println(s"finished $name")) + } + + def await[A](value: Awaitable[A]): A = { + def check: Option[A] = + Try(Await.result(value, Duration(500, "ms"))) match { + case Success(x) => Some(x) + case Failure(_: TimeoutException) => None + case Failure(t) => throw t + } + waitFor(check, progress = Fast, label = "concurrent-tck test result") + } +} + +class ReportingExecutionContext extends TestBase { + val progress = Fast + @volatile var thread: Thread = null + @volatile var reportedOn: Thread = null + @volatile var reported: Throwable = null + val latch = new CountDownLatch(1) + + def report(t: Thread, e: Throwable): Unit = { + reportedOn = t + reported = e + latch.countDown() + } + + def ecesUsingDefaultFactory = { + import java.util.concurrent.{ForkJoinPool} + import java.util.function.Predicate + import scala.reflect.internal.util.RichClassLoader._ + + val path = "java.util.concurrent.ForkJoinPool" + val n = 2 // parallelism + val factory = scala.concurrent.TestUtil.threadFactory(report) + val ueh: Thread.UncaughtExceptionHandler = report(_, _) + val async = true + val coreSize = 4 + val maxSize = 4 + val minRun = 1 // minimumRunnable for liveness + val saturate: Predicate[ForkJoinPool] = (fjp: ForkJoinPool) => false // whether to continue after blocking at maxSize + val keepAlive = 2000L + val fjp = this.getClass.getClassLoader.create[ForkJoinPool](path, _ => ())(n, factory, ueh, async, coreSize, maxSize, minRun, saturate, keepAlive, Milliseconds) + //ForkJoinPool(int parallelism, ForkJoinPool.ForkJoinWorkerThreadFactory factory, Thread.UncaughtExceptionHandler handler, boolean asyncMode, int corePoolSize, int maximumPoolSize, int minimumRunnable, Predicate saturate, long keepAliveTime, TimeUnit unit) + new ExecutionContextExecutorService { + // Members declared in scala.concurrent.ExecutionContext + def reportFailure(cause: Throwable): Unit = report(null, cause) + + // Members declared in java.util.concurrent.Executor + def execute(r: Runnable): Unit = fjp.execute(r) + + // Members declared in java.util.concurrent.ExecutorService + def awaitTermination(x$1: Long, x$2: java.util.concurrent.TimeUnit): Boolean = ??? + def invokeAll[T](x$1: java.util.Collection[_ <: java.util.concurrent.Callable[T]], x$2: Long, x$3: java.util.concurrent.TimeUnit): java.util.List[java.util.concurrent.Future[T]] = ??? + def invokeAll[T](x$1: java.util.Collection[_ <: java.util.concurrent.Callable[T]]): java.util.List[java.util.concurrent.Future[T]] = ??? + def invokeAny[T](x$1: java.util.Collection[_ <: java.util.concurrent.Callable[T]], x$2: Long, x$3: java.util.concurrent.TimeUnit): T = ??? + def invokeAny[T](x$1: java.util.Collection[_ <: java.util.concurrent.Callable[T]]): T = ??? + def isShutdown(): Boolean = fjp.isShutdown + def isTerminated(): Boolean = fjp.isTerminated + def shutdown(): Unit = fjp.shutdown() + def shutdownNow(): java.util.List[Runnable] = fjp.shutdownNow() + def submit(r: Runnable): java.util.concurrent.Future[_] = fjp.submit(r) + def submit[T](task: Runnable, res: T): java.util.concurrent.Future[T] = fjp.submit(task, res) + def submit[T](task: java.util.concurrent.Callable[T]): java.util.concurrent.Future[T] = fjp.submit(task) + } + } + + def testUncaughtExceptionReporting(ec: ExecutionContextExecutorService): Unit = once { + done => + val example = new InterruptedException + + @tailrec def spinForThreadDeath(turns: Int): Boolean = + turns > 0 && (thread != null && !thread.isAlive || { Thread.sleep(100L); spinForThreadDeath(turns - 1) }) + + def truthfully(b: Boolean): Option[Boolean] = if (b) Some(true) else None + + // jdk17 thread receives pool exception handler, so wait for thread to die slow and painful expired keepalive + def threadIsDead = waitFor(truthfully(spinForThreadDeath(turns = 10)), progress = progress, label = "concurrent-tck-thread-death") + + try { + ec.execute(() => { + thread = Thread.currentThread + throw example + }) + latch.await(2, Seconds) + done(threadIsDead && (example.eq(reported) || example.eq(reported.getCause))) + } + finally ec.shutdown() + } + + test("testUncaughtExceptionReporting")(testUncaughtExceptionReporting { + ecesUsingDefaultFactory + }) +} + +object Test extends App { + new ReportingExecutionContext + + System.exit(0) +} + +package scala.concurrent { + object TestUtil { + def threadFactory(uncaughtExceptionHandler: Thread.UncaughtExceptionHandler) = new impl.ExecutionContextImpl.DefaultThreadFactory(daemonic=true, maxBlockers=256, prefix="test-thread", uncaughtExceptionHandler) + } +} diff --git a/test/files/jvm/scala-concurrent-tck.check b/test/files/jvm/scala-concurrent-tck.check index 530ab05f92f..dbe69425548 100644 --- a/test/files/jvm/scala-concurrent-tck.check +++ b/test/files/jvm/scala-concurrent-tck.check @@ -134,5 +134,3 @@ starting testOnComplete finished testOnComplete starting testMap finished testMap -starting testUncaughtExceptionReporting -finished testUncaughtExceptionReporting diff --git a/test/files/jvm/scala-concurrent-tck.scala b/test/files/jvm/scala-concurrent-tck.scala index 0af68565bcb..4bb2c3298c5 100644 --- a/test/files/jvm/scala-concurrent-tck.scala +++ b/test/files/jvm/scala-concurrent-tck.scala @@ -1,3 +1,4 @@ + import scala.concurrent.{ Future, Promise, @@ -1045,104 +1046,6 @@ class ExecutionContextPrepare extends TestBase { test("testMap")(testMap()) } -class ReportingExecutionContext extends TestBase { - final val slowly = false // true for using default FJP with long keepAlive (60 secs) - val progress = if (slowly) Slow else Fast - @volatile var thread: Thread = null - @volatile var reportedOn: Thread = null - @volatile var reported: Throwable = null - val latch = new CountDownLatch(1) - - def report(t: Thread, e: Throwable): Unit = { - reportedOn = t - reported = e - latch.countDown() - } - def underlyingPool = { - import java.util.concurrent.{LinkedBlockingQueue, RejectedExecutionHandler, ThreadFactory, ThreadPoolExecutor} - val coreSize = 4 - val maxSize = 4 - val keepAlive = 2000L - val q = new LinkedBlockingQueue[Runnable] - val factory: ThreadFactory = (r: Runnable) => new Thread(r).tap(_.setUncaughtExceptionHandler(report)) - val handler: RejectedExecutionHandler = (r: Runnable, x: ThreadPoolExecutor) => ??? - new ThreadPoolExecutor(coreSize, maxSize, keepAlive, Milliseconds, q, factory, handler) - } - def ecesFromUnderlyingPool = ExecutionContext.fromExecutorService(underlyingPool, report(null, _)) - - def ecesUsingDefaultFactory = { - import java.util.concurrent.{ForkJoinPool, RejectedExecutionHandler, ThreadPoolExecutor} - import java.util.function.Predicate - import scala.reflect.internal.util.RichClassLoader._ - - val path = "java.util.concurrent.ForkJoinPool" - val n = 2 // parallelism - val factory = scala.concurrent.TestUtil.threadFactory(report) - val ueh: Thread.UncaughtExceptionHandler = report(_, _) - val async = true - val coreSize = 4 - val maxSize = 4 - val minRun = 1 // minimumRunnable for liveness - val saturate: Predicate[ForkJoinPool] = (fjp: ForkJoinPool) => false // whether to continue after blocking at maxSize - val keepAlive = 2000L - val fjp = this.getClass.getClassLoader.create[ForkJoinPool](path, _ => ())(n, factory, ueh, async, coreSize, maxSize, minRun, saturate, keepAlive, Milliseconds) - //ForkJoinPool(int parallelism, ForkJoinPool.ForkJoinWorkerThreadFactory factory, Thread.UncaughtExceptionHandler handler, boolean asyncMode, int corePoolSize, int maximumPoolSize, int minimumRunnable, Predicate saturate, long keepAliveTime, TimeUnit unit) - new ExecutionContextExecutorService { - // Members declared in scala.concurrent.ExecutionContext - def reportFailure(cause: Throwable): Unit = report(null, cause) - - // Members declared in java.util.concurrent.Executor - def execute(r: Runnable): Unit = fjp.execute(r) - - // Members declared in java.util.concurrent.ExecutorService - def awaitTermination(x$1: Long, x$2: java.util.concurrent.TimeUnit): Boolean = ??? - def invokeAll[T](x$1: java.util.Collection[_ <: java.util.concurrent.Callable[T]], x$2: Long, x$3: java.util.concurrent.TimeUnit): java.util.List[java.util.concurrent.Future[T]] = ??? - def invokeAll[T](x$1: java.util.Collection[_ <: java.util.concurrent.Callable[T]]): java.util.List[java.util.concurrent.Future[T]] = ??? - def invokeAny[T](x$1: java.util.Collection[_ <: java.util.concurrent.Callable[T]], x$2: Long, x$3: java.util.concurrent.TimeUnit): T = ??? - def invokeAny[T](x$1: java.util.Collection[_ <: java.util.concurrent.Callable[T]]): T = ??? - def isShutdown(): Boolean = fjp.isShutdown - def isTerminated(): Boolean = fjp.isTerminated - def shutdown(): Unit = fjp.shutdown() - def shutdownNow(): java.util.List[Runnable] = fjp.shutdownNow() - def submit(r: Runnable): java.util.concurrent.Future[_] = fjp.submit(r) - def submit[T](task: Runnable, res: T): java.util.concurrent.Future[T] = fjp.submit(task, res) - def submit[T](task: java.util.concurrent.Callable[T]): java.util.concurrent.Future[T] = fjp.submit(task) - } - } - - def ecesUsingDefaultFJP = ExecutionContext.fromExecutorService(null, report(null, _)) - - def testUncaughtExceptionReporting(ec: ExecutionContextExecutorService): Unit = once { - done => - val example = new InterruptedException - - @tailrec def spinForThreadDeath(turns: Int): Boolean = - turns > 0 && (thread != null && !thread.isAlive || { Thread.sleep(100L); spinForThreadDeath(turns - 1) }) - - def truthfully(b: Boolean): Option[Boolean] = if (b) Some(true) else None - - // jdk17 thread receives pool exception handler, so wait for thread to die slow and painful expired keepalive - def threadIsDead = waitFor(truthfully(spinForThreadDeath(turns = 10)), progress = progress, label = "concurrent-tck-thread-death") - - try { - ec.execute(() => { - thread = Thread.currentThread - throw example - }) - latch.await(2, Seconds) - done(threadIsDead && (example.eq(reported) || example.eq(reported.getCause))) - } - finally ec.shutdown() - } - - test("testUncaughtExceptionReporting")(testUncaughtExceptionReporting { - import scala.util.Properties.isJavaAtLeast - if (slowly) ecesUsingDefaultFJP - else if (isJavaAtLeast(9)) ecesUsingDefaultFactory - else ecesFromUnderlyingPool - }) -} - object Test extends App { new FutureCallbacks @@ -1155,13 +1058,6 @@ extends App { new GlobalExecutionContext new CustomExecutionContext new ExecutionContextPrepare - new ReportingExecutionContext System.exit(0) } - -package scala.concurrent { - object TestUtil { - def threadFactory(uncaughtExceptionHandler: Thread.UncaughtExceptionHandler) = new impl.ExecutionContextImpl.DefaultThreadFactory(daemonic=true, maxBlockers=256, prefix="test-thread", uncaughtExceptionHandler) - } -}