From 0e0ca96cfd4c4c6fcdc993352d7d6c1343547f50 Mon Sep 17 00:00:00 2001 From: zainab-ali Date: Mon, 13 May 2024 14:07:15 +0100 Subject: [PATCH] Use a LinkedBlockingQueue instead of polling. --- modules/framework/jvm/src/main/scala/RunnerCompat.scala | 6 +++--- modules/framework/jvm/src/main/scala/SbtTask.scala | 5 +++-- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/modules/framework/jvm/src/main/scala/RunnerCompat.scala b/modules/framework/jvm/src/main/scala/RunnerCompat.scala index d4834c5..653d0ec 100644 --- a/modules/framework/jvm/src/main/scala/RunnerCompat.scala +++ b/modules/framework/jvm/src/main/scala/RunnerCompat.scala @@ -3,7 +3,6 @@ package framework import org.typelevel.scalaccompat.annotation.unused import java.io.PrintStream -import java.util.concurrent.ConcurrentLinkedQueue import java.util.concurrent.atomic.{ AtomicBoolean, AtomicInteger } import scala.concurrent.duration._ @@ -16,6 +15,7 @@ import cats.effect.{ Ref, Sync, _ } import cats.syntax.all._ import sbt.testing.{ Task, TaskDef } +import java.util.concurrent.LinkedBlockingQueue trait RunnerCompat[F[_]] { self: sbt.testing.Runner => @@ -73,7 +73,7 @@ trait RunnerCompat[F[_]] { self: sbt.testing.Runner => // dispatching logs through a single logger at a time. val loggerPermit = new java.util.concurrent.Semaphore(1, true) - val queue = new ConcurrentLinkedQueue[SuiteEvent]() + val queue = new LinkedBlockingQueue[SuiteEvent]() val broker = new ConcurrentQueueEventBroker(queue) val startingBlock = unsafeRun.fromFuture { promise.future.map(_ => ())(ExecutionContext.global) @@ -252,7 +252,7 @@ trait RunnerCompat[F[_]] { self: sbt.testing.Runner => } class ConcurrentQueueEventBroker( - concurrentQueue: ConcurrentLinkedQueue[SuiteEvent]) + concurrentQueue: LinkedBlockingQueue[SuiteEvent]) extends SuiteEventBroker { def send(suiteEvent: SuiteEvent): F[Unit] = { Sync[F].delay(concurrentQueue.add(suiteEvent)).void diff --git a/modules/framework/jvm/src/main/scala/SbtTask.scala b/modules/framework/jvm/src/main/scala/SbtTask.scala index e36479b..3e0d3e6 100644 --- a/modules/framework/jvm/src/main/scala/SbtTask.scala +++ b/modules/framework/jvm/src/main/scala/SbtTask.scala @@ -6,6 +6,7 @@ import java.util.concurrent.atomic.{ AtomicBoolean, AtomicInteger } import cats.data.Chain import sbt.testing.{ Event, EventHandler, Logger, Task, TaskDef } +import java.util.concurrent.TimeUnit private[framework] class SbtTask( val taskDef: TaskDef, @@ -13,7 +14,7 @@ private[framework] class SbtTask( stillRunning: AtomicInteger, waitForResourcesShutdown: java.util.concurrent.Semaphore, start: scala.concurrent.Promise[Unit], - queue: java.util.concurrent.ConcurrentLinkedQueue[SuiteEvent], + queue: java.util.concurrent.LinkedBlockingQueue[SuiteEvent], loggerPermit: java.util.concurrent.Semaphore, readFailed: () => Chain[(SuiteName, TestOutcome)] ) extends Task { @@ -30,7 +31,7 @@ private[framework] class SbtTask( loggerPermit.acquire() try { while (!finished && !isDone.get()) { - val nextEvent = Option(queue.poll()) + val nextEvent = Option(queue.poll(1, TimeUnit.SECONDS)) nextEvent.foreach { case s @ SuiteStarted(_) => log(s)