diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 01d8cae..f93acaa 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -75,11 +75,11 @@ jobs: - name: Make target directories if: github.event_name != 'pull_request' && (startsWith(github.ref, 'refs/tags/v') || github.ref == 'refs/heads/main') - run: mkdir -p util/target target .rootFinagle/target effect/target scalafix/input/target effect3/target finagle/target .finagle-core/target scalafix/rules/target scalafix/tests/target benchmark/target project/target + run: mkdir -p util/target target .rootFinagle/target scalafix/input/target effect3/target finagle/target .finagle-core/target scalafix/rules/target scalafix/tests/target benchmark/target project/target - name: Compress target directories if: github.event_name != 'pull_request' && (startsWith(github.ref, 'refs/tags/v') || github.ref == 'refs/heads/main') - run: tar cf targets.tar util/target target .rootFinagle/target effect/target scalafix/input/target effect3/target finagle/target .finagle-core/target scalafix/rules/target scalafix/tests/target benchmark/target project/target + run: tar cf targets.tar util/target target .rootFinagle/target scalafix/input/target effect3/target finagle/target .finagle-core/target scalafix/rules/target scalafix/tests/target benchmark/target project/target - name: Upload target directories if: github.event_name != 'pull_request' && (startsWith(github.ref, 'refs/tags/v') || github.ref == 'refs/heads/main') diff --git a/build.sbt b/build.sbt index ee2a834..760a470 100644 --- a/build.sbt +++ b/build.sbt @@ -72,8 +72,8 @@ lazy val root = project |import org.typelevel.catbird.util._ """.stripMargin ) - .aggregate(util, effect, effect3, finagle, benchmark, `scalafix-rules`, `scalafix-tests`, FinaglePlugin.rootFinagle) - .dependsOn(util, effect, finagle) + .aggregate(util, effect3, finagle, benchmark, `scalafix-rules`, `scalafix-tests`, FinaglePlugin.rootFinagle) + .dependsOn(util, finagle) lazy val util = project .settings(moduleName := "catbird-util") @@ -85,20 +85,6 @@ lazy val util = project } ) -lazy val effect = project - .settings(moduleName := "catbird-effect") - .settings(allSettings) - .settings( - libraryDependencies ++= Seq( - "org.typelevel" %% "cats-effect" % catsEffectVersion, - "org.typelevel" %% "cats-effect-laws" % catsEffectVersion % Test - ), - Test / scalacOptions ~= { - _.filterNot(Set("-Yno-imports", "-Yno-predef")) - } - ) - .dependsOn(util, util % "test->test") - lazy val effect3 = project .in(file("effect3")) .settings(moduleName := "catbird-effect3") diff --git a/effect/src/main/scala/org/typelevel/catbird/util/effect/FutureInstances.scala b/effect/src/main/scala/org/typelevel/catbird/util/effect/FutureInstances.scala deleted file mode 100644 index 4463a77..0000000 --- a/effect/src/main/scala/org/typelevel/catbird/util/effect/FutureInstances.scala +++ /dev/null @@ -1,22 +0,0 @@ -package org.typelevel.catbird.util.effect - -import cats.effect.{ Bracket, ExitCase } -import com.twitter.util.{ Future, Monitor } -import org.typelevel.catbird.util.FutureMonadError -import java.lang.Throwable -import scala.Unit - -trait FutureInstances { - implicit final val futureBracketInstance: Bracket[Future, Throwable] = - new FutureMonadError with Bracket[Future, Throwable] { - final def bracketCase[A, B](acquire: Future[A])(use: A => Future[B])( - release: (A, ExitCase[Throwable]) => Future[Unit] - ): Future[B] = - acquire - .flatMap(a => - use(a).liftToTry - .flatMap(result => release(a, tryToExitCase(result)).handle(Monitor.catcher).map(_ => result)) - ) - .lowerFromTry - } -} diff --git a/effect/src/main/scala/org/typelevel/catbird/util/effect/RerunnableClock.scala b/effect/src/main/scala/org/typelevel/catbird/util/effect/RerunnableClock.scala deleted file mode 100644 index b8e96f8..0000000 --- a/effect/src/main/scala/org/typelevel/catbird/util/effect/RerunnableClock.scala +++ /dev/null @@ -1,24 +0,0 @@ -package org.typelevel.catbird.util.effect - -import java.util.concurrent.TimeUnit - -import cats.effect.Clock -import org.typelevel.catbird.util.Rerunnable -import scala.Long -import java.lang.System - -import scala.concurrent.duration.TimeUnit - -object RerunnableClock { - - def apply(): RerunnableClock = new RerunnableClock -} - -final private[effect] class RerunnableClock extends Clock[Rerunnable] { - - override def realTime(unit: TimeUnit): Rerunnable[Long] = - Rerunnable(unit.convert(System.currentTimeMillis(), TimeUnit.MILLISECONDS)) - - override def monotonic(unit: TimeUnit): Rerunnable[Long] = - Rerunnable(unit.convert(System.nanoTime(), TimeUnit.NANOSECONDS)) -} diff --git a/effect/src/main/scala/org/typelevel/catbird/util/effect/RerunnableContextShift.scala b/effect/src/main/scala/org/typelevel/catbird/util/effect/RerunnableContextShift.scala deleted file mode 100644 index 800a305..0000000 --- a/effect/src/main/scala/org/typelevel/catbird/util/effect/RerunnableContextShift.scala +++ /dev/null @@ -1,73 +0,0 @@ -package org.typelevel.catbird.util.effect - -import cats.effect.ContextShift -import com.twitter.util.{ Future, FuturePool, Promise } -import org.typelevel.catbird.util.Rerunnable - -import scala.Unit -import java.lang.Runnable -import java.util.concurrent.ExecutorService - -import scala.concurrent.{ ExecutionContext, ExecutionContextExecutorService } - -/** - * The goal here is to provide an implicit instance for `ContextShift[Rerunnable]`, so you can use libraries like `fs2` - * in a finagle-based application without converting between `Future` and `IO` everywhere. - * - * Usage: - * {{{ - * implicit val rerunnableCS: ContextShift[Rerunnable] = RerunnableContextShift.global - * }}} - */ -object RerunnableContextShift { - - final def fromExecutionContext(ec: ExecutionContext): ContextShift[Rerunnable] = - new RerunnableContextShift(ec) - - final def fromExecutorService(es: ExecutorService): ContextShift[Rerunnable] = - fromExecutionContext(ExecutionContext.fromExecutorService(es)) - - final def fromExecutionContextExecutorService(eces: ExecutionContextExecutorService): ContextShift[Rerunnable] = - fromExecutorService(eces) - - final lazy val global: ContextShift[Rerunnable] = - fromExecutionContext(scala.concurrent.ExecutionContext.global) - - /** - * Mirrors the api of `scala.concurrent.ExecutionContext.Implicit.global`. - * - * Usage: - * {{{ - * import org.typelevel.catbird.util.effect.RerunnableContextShift.Implicits.global - * }}} - */ - object Implicits { - final implicit def global: ContextShift[Rerunnable] = RerunnableContextShift.global - } -} - -final private[effect] class RerunnableContextShift private (ec: ExecutionContext) extends ContextShift[Rerunnable] { - private final lazy val futurePool = FuturePool.interruptible(ec.asInstanceOf[ExecutionContextExecutorService]) - - override def shift: Rerunnable[Unit] = - Rerunnable.withFuturePool(futurePool)(()) // This is a bit of a hack, but it will have to do - - override def evalOn[A](targetEc: ExecutionContext)(fa: Rerunnable[A]): Rerunnable[A] = - for { - r <- executeOn(targetEc)(fa).liftToTry - _ <- shift - a <- Rerunnable.fromFuture(Future.value(r).lowerFromTry) - } yield a - - private def executeOn[A](targetEc: ExecutionContext)(fa: Rerunnable[A]): Rerunnable[A] = - Rerunnable.fromFuture { - val p = Promise[A]() - - targetEc.execute(new Runnable { - override def run(): Unit = - fa.run.proxyTo[A](p) - }) - - p - } -} diff --git a/effect/src/main/scala/org/typelevel/catbird/util/effect/RerunnableInstances.scala b/effect/src/main/scala/org/typelevel/catbird/util/effect/RerunnableInstances.scala deleted file mode 100644 index 935798c..0000000 --- a/effect/src/main/scala/org/typelevel/catbird/util/effect/RerunnableInstances.scala +++ /dev/null @@ -1,68 +0,0 @@ -package org.typelevel.catbird.util.effect - -import cats.effect.{ Effect, ExitCase, IO, SyncIO } -import com.twitter.util.{ Future, Monitor, Promise } -import org.typelevel.catbird.util.{ Rerunnable, RerunnableMonadError } -import java.lang.Throwable -import scala.Unit -import scala.util.{ Either, Left, Right } - -trait RerunnableInstances { - implicit final val rerunnableEffectInstance: Effect[Rerunnable] = - new RerunnableMonadError with Effect[Rerunnable] { - final def suspend[A](thunk: => Rerunnable[A]): Rerunnable[A] = Rerunnable.suspend[A](thunk) - - override final def delay[A](thunk: => A): Rerunnable[A] = Rerunnable[A](thunk) - - final def async[A](k: (Either[Throwable, A] => Unit) => Unit): Rerunnable[A] = - new Rerunnable[A] { - final def run: Future[A] = { - val promise = new Promise[A] - - k { e => - if (promise.isDefined) () - else - e match { - case Right(a) => promise.setValue(a) - case Left(err) => promise.setException(err) - } - } - - promise - } - } - - final def asyncF[A](k: (Either[Throwable, A] => Unit) => Rerunnable[Unit]): Rerunnable[A] = - new Rerunnable[A] { - final def run: Future[A] = { - val promise = new Promise[A] - - val rerunnable = k { e => - if (promise.isDefined) () - else - e match { - case Right(a) => promise.setValue(a) - case Left(err) => promise.setException(err) - } - } - - rerunnable.run.flatMap(_ => promise) - } - } - - final def runAsync[A](fa: Rerunnable[A])(cb: Either[Throwable, A] => IO[Unit]): SyncIO[Unit] = - rerunnableToIO[A](fa).runAsync(cb) - - final def bracketCase[A, B](acquire: Rerunnable[A])(use: A => Rerunnable[B])( - release: (A, ExitCase[Throwable]) => Rerunnable[Unit] - ): Rerunnable[B] = new Rerunnable[B] { - final def run: Future[B] = - acquire.run.flatMap { a => - val future = use(a).run - future.transform(result => - release(a, tryToExitCase(result)).run.handle(Monitor.catcher).flatMap(_ => future) - ) - } - } - } -} diff --git a/effect/src/main/scala/org/typelevel/catbird/util/effect/RerunnableTimer.scala b/effect/src/main/scala/org/typelevel/catbird/util/effect/RerunnableTimer.scala deleted file mode 100644 index d7a2a9b..0000000 --- a/effect/src/main/scala/org/typelevel/catbird/util/effect/RerunnableTimer.scala +++ /dev/null @@ -1,46 +0,0 @@ -package org.typelevel.catbird.util.effect - -import cats.effect.{ Clock, Timer } -import org.typelevel.catbird.util.Rerunnable -import com.twitter.util.Future -import com.twitter.util -import scala.Unit - -import scala.concurrent.duration.FiniteDuration - -/** - * Can be used to construct a `cats.effect.Timer` instance for `Rerunnable` which let's you delay execution or retrieve - * the current time via `RerunnableClock`. - * - * Usage: - * {{{ - * // In a Finagle application - * implicit val timer: Timer[Rerunnable] = RerunnableTimer(com.twitter.finagle.util.DefaultTimer) - * - * // In tests (for instant execution of delays) - * implicit val timer: Timer[Rerunnable] = RerunnableTimer(com.twitter.util.Timer.Nil) - * - * // A dedicated `JavaTimer` - * implicit val timer: Timer[Rerunnable] = RerunnableTimer() - * }}} - */ -object RerunnableTimer { - - def apply(implicit twitterTimer: util.Timer): RerunnableTimer = new RerunnableTimer - - def apply(): RerunnableTimer = { - implicit val twitterTimer: util.Timer = new util.JavaTimer - - new RerunnableTimer - } -} - -final private[effect] class RerunnableTimer private (implicit underlyingTimer: util.Timer) extends Timer[Rerunnable] { - - override val clock: Clock[Rerunnable] = RerunnableClock() - - override def sleep(duration: FiniteDuration): Rerunnable[Unit] = - Rerunnable.fromFuture( - Future.Unit.delayed(util.Duration.fromNanoseconds(duration.toNanos)) - ) -} diff --git a/effect/src/main/scala/org/typelevel/catbird/util/effect/package.scala b/effect/src/main/scala/org/typelevel/catbird/util/effect/package.scala deleted file mode 100644 index f17ccd3..0000000 --- a/effect/src/main/scala/org/typelevel/catbird/util/effect/package.scala +++ /dev/null @@ -1,55 +0,0 @@ -package org.typelevel.catbird.util - -import cats.effect.{ Async, ContextShift, ExitCase, IO } -import com.twitter.util.{ Future, Return, Throw, Try } -import java.lang.Throwable - -import scala.util.{ Left, Right } - -package object effect extends FutureInstances with RerunnableInstances { - - /** - * Converts the `Future` to `F` without changing the underlying execution (same thread pool!). - */ - def futureToAsync[F[_], A](fa: => Future[A])(implicit F: Async[F]): F[A] = F.async { k => - fa.respond { - case Return(a) => k(Right[Throwable, A](a)) - case Throw(err) => k(Left[Throwable, A](err)) - } - - () - } - - /** - * The same as `futureToAsync` but doesn't stay on the thread pool of the `Future` and instead shifts execution back - * to the one provided by `ContextShift[F]` (which is usually the default one). - * - * This is likely what you want when you interact with libraries that return a `Future` like `finagle-http` where the - * `Future` is running on a thread pool controlled by the library (e.g. the underlying Netty pool). It also is closer - * to the behavior of `IO.fromFuture` for Scala futures which also shifts back. - */ - def futureToAsyncAndShift[F[_], A](fa: => Future[A])(implicit F: Async[F], CS: ContextShift[F]): F[A] = - F.guarantee(futureToAsync[F, A](fa))(CS.shift) - - /** - * Converts the `Rerunnable` to `F` without changing the underlying execution (same thread pool!). - */ - final def rerunnableToIO[A](fa: Rerunnable[A]): IO[A] = - futureToAsync[IO, A](fa.run) - - /** - * The same as `rerunnableToIO` but doesn't stay on the thread pool of the `Rerunnable` and instead shifts execution - * back to the one provided by `ContextShift[F]` (which is usually the default one). - */ - final def rerunnableToIOAndShift[A](fa: Rerunnable[A])(implicit CS: ContextShift[IO]): IO[A] = - futureToAsyncAndShift[IO, A](fa.run) - - /** - * Convert a twitter-util Try to cats-effect ExitCase - */ - final def tryToExitCase[A](ta: Try[A]): ExitCase[Throwable] = - ta match { - case Return(_) => ExitCase.complete - case Throw(e) => ExitCase.error(e) - } -} diff --git a/effect/src/test/scala/org/typelevel/catbird/util/effect/ContextShiftingSuite.scala b/effect/src/test/scala/org/typelevel/catbird/util/effect/ContextShiftingSuite.scala deleted file mode 100644 index 5f40646..0000000 --- a/effect/src/test/scala/org/typelevel/catbird/util/effect/ContextShiftingSuite.scala +++ /dev/null @@ -1,64 +0,0 @@ -package org.typelevel.catbird.util.effect - -import cats.effect.{ ContextShift, IO } -import com.twitter.util.{ ExecutorServiceFuturePool, Future, FuturePool } -import org.scalatest.Outcome -import org.scalatest.funsuite.FixtureAnyFunSuite - -import scala.concurrent.ExecutionContext - -class ContextShiftingSuite extends FixtureAnyFunSuite with ThreadPoolNamingSupport { - - protected final class FixtureParam { - val ioPoolName = "io-pool" - val futurePoolName = "future-pool" - - val ioPool = newNamedThreadPool(ioPoolName) - - val futurePool: ExecutorServiceFuturePool = // threadpool of Future (often managed by a library like finagle-http) - FuturePool(newNamedThreadPool(futurePoolName)) - - def newIO: IO[String] = IO(currentThreadName()) - - def newFuture: Future[String] = futurePool.apply { - // Not 100% sure why but this sleep is needed to reproduce the error. There might be an optimization if the - // Future is already resolved at some point - Thread.sleep(200) - currentThreadName() - } - } - - test("After resolving the Future with futureToAsync stay on the Future threadpool") { f => - implicit val contextShift: ContextShift[IO] = // threadpool of IO (often provided by IOApp) - IO.contextShift(ExecutionContext.fromExecutor(f.ioPool)) - - val (futurePoolName, ioPoolName) = (for { - futurePoolName <- futureToAsync[IO, String](f.newFuture) - - ioPoolName <- f.newIO - } yield (futurePoolName, ioPoolName)).start(contextShift).flatMap(_.join).unsafeRunSync() - - assert(futurePoolName == f.futurePoolName) - assert(ioPoolName == f.futurePoolName) // Uh oh, this is likely not what the user wants - } - - test("After resolving the Future with futureToAsyncAndShift shift back to the threadpool of ContextShift[F]") { f => - implicit val contextShift: ContextShift[IO] = // threadpool of IO (often provided by IOApp) - IO.contextShift(ExecutionContext.fromExecutor(f.ioPool)) - - // If you'd use `futureToAsync` here instead, this whole thing would sometimes stay on the future-pool - val (futurePoolName, ioPoolName) = (for { - futurePoolName <- futureToAsyncAndShift[IO, String](f.newFuture) - - ioPoolName <- f.newIO - } yield (futurePoolName, ioPoolName)) - .start(contextShift) // start the computation on the default threadpool... - .flatMap(_.join) // ...then block until we have the results - .unsafeRunSync() - - assert(futurePoolName == f.futurePoolName) - assert(ioPoolName == f.ioPoolName) - } - - override protected def withFixture(test: OneArgTest): Outcome = withFixture(test.toNoArgTest(new FixtureParam)) -} diff --git a/effect/src/test/scala/org/typelevel/catbird/util/effect/FutureSuite.scala b/effect/src/test/scala/org/typelevel/catbird/util/effect/FutureSuite.scala deleted file mode 100644 index c85296f..0000000 --- a/effect/src/test/scala/org/typelevel/catbird/util/effect/FutureSuite.scala +++ /dev/null @@ -1,29 +0,0 @@ -package org.typelevel.catbird.util.effect - -import cats.Eq -import cats.effect.laws.discipline.BracketTests -import cats.effect.laws.util.{ TestContext, TestInstances } -import cats.instances.either._ -import cats.instances.int._ -import cats.instances.tuple._ -import cats.instances.unit._ -import cats.laws.discipline.arbitrary._ -import com.twitter.conversions.DurationOps._ -import com.twitter.util.Future -import org.typelevel.catbird.util.{ ArbitraryInstances, futureEqWithFailure } -import org.scalatest.funsuite.AnyFunSuite -import org.scalatest.prop.Configuration -import org.typelevel.discipline.scalatest.FunSuiteDiscipline - -class FutureSuite - extends AnyFunSuite - with FunSuiteDiscipline - with Configuration - with ArbitraryInstances - with TestInstances { - implicit val context: TestContext = TestContext() - implicit def futureEq[A](implicit A: Eq[A]): Eq[Future[A]] = - futureEqWithFailure(1.seconds) - - checkAll("Future[Int]", BracketTests[Future, Throwable].bracket[Int, Int, Int]) -} diff --git a/effect/src/test/scala/org/typelevel/catbird/util/effect/RerunnableClockSuite.scala b/effect/src/test/scala/org/typelevel/catbird/util/effect/RerunnableClockSuite.scala deleted file mode 100644 index 9b20f75..0000000 --- a/effect/src/test/scala/org/typelevel/catbird/util/effect/RerunnableClockSuite.scala +++ /dev/null @@ -1,50 +0,0 @@ -package org.typelevel.catbird.util.effect - -import java.time.Instant -import java.util.concurrent.TimeUnit - -import cats.effect.Clock -import com.twitter.util.Await -import org.typelevel.catbird.util.Rerunnable -import org.scalatest.Outcome -import org.scalatest.concurrent.Eventually -import org.scalatest.funsuite.FixtureAnyFunSuite - -/** - * We'll use `eventually` and a reasonably big tolerance here to prevent CI from failing if it is a bit slow. - * - * Technically the implementation is just an extremely thin wrapper around `System.currentTimeMillis()` and - * `System.nanoTime()` so as long as the result is the same order of magnitude (and therefore the unit-conversion is - * correct) we should be fine. - */ -class RerunnableClockSuite extends FixtureAnyFunSuite with Eventually { - - protected final class FixtureParam { - def now: Instant = Instant.now() - - val clock: Clock[Rerunnable] = RerunnableClock() - } - - test("Retrieval of real time") { f => - eventually { - val result = Await.result( - f.clock.realTime(TimeUnit.MILLISECONDS).map(Instant.ofEpochMilli).run - ) - - assert(java.time.Duration.between(result, f.now).abs().toMillis < 50) - } - } - - test("Retrieval of monotonic time") { f => - eventually { - val result = Await.result( - f.clock.monotonic(TimeUnit.NANOSECONDS).run - ) - - val durationBetween = Math.abs(System.nanoTime() - result) - assert(TimeUnit.MILLISECONDS.convert(durationBetween, TimeUnit.NANOSECONDS) < 5) - } - } - - override protected def withFixture(test: OneArgTest): Outcome = withFixture(test.toNoArgTest(new FixtureParam)) -} diff --git a/effect/src/test/scala/org/typelevel/catbird/util/effect/RerunnableContextShiftSuite.scala b/effect/src/test/scala/org/typelevel/catbird/util/effect/RerunnableContextShiftSuite.scala deleted file mode 100644 index bf75f2a..0000000 --- a/effect/src/test/scala/org/typelevel/catbird/util/effect/RerunnableContextShiftSuite.scala +++ /dev/null @@ -1,89 +0,0 @@ -package org.typelevel.catbird.util.effect - -import cats.effect.{ ContextShift, IO, Sync } -import com.twitter.util.{ Await, Future, FuturePool } -import org.typelevel.catbird.util.Rerunnable -import org.scalatest.Outcome -import org.scalatest.funsuite.FixtureAnyFunSuite - -class RerunnableContextShiftSuite extends FixtureAnyFunSuite with ThreadPoolNamingSupport { - - protected final class FixtureParam { - val futurePoolName = "future-pool" - val otherPoolName = "other-pool" - val ioPoolName = "io-pool" - - val futurePool = FuturePool.interruptible(newNamedThreadPool(futurePoolName)) - val otherPool = newNamedThreadPool(otherPoolName) - val ioPool = newNamedThreadPool(ioPoolName) - - def newIO: IO[String] = IO(currentThreadName()) - - def newFuture: Future[String] = futurePool(currentThreadName()) - - def newRerunnable: Rerunnable[String] = Rerunnable(currentThreadName()) - } - - test("ContextShift[Rerunnable].shift shifts to the pool of the instance") { f => - implicit val cs: ContextShift[Rerunnable] = - RerunnableContextShift.fromExecutionContext(f.ioPool) - - val (poolName1, poolName2, poolName3) = - (for { - poolName1 <- Rerunnable.fromFuture(f.newFuture) - - _ <- ContextShift[Rerunnable](cs).shift - - poolName2 <- Sync[Rerunnable].delay(currentThreadName()) - - poolName3 <- Rerunnable.fromFuture(f.newFuture) - } yield (poolName1, poolName2, poolName3)).run.await - - assert(poolName1 == f.futurePoolName) - assert(poolName2 == f.ioPoolName) - assert(poolName2 == f.ioPoolName) - assert(poolName3 == f.futurePoolName) - } - - test("ContextShift[Rerunnable].evalOn executes on correct pool and shifts back to previous pool") { f => - implicit val cs: ContextShift[Rerunnable] = - RerunnableContextShift.fromExecutionContext(f.ioPool) - - val (poolName1, poolName2, poolName3) = - (for { - poolName1 <- f.newRerunnable - - poolName2 <- ContextShift[Rerunnable].evalOn(f.otherPool)(f.newRerunnable) - - poolName3 <- f.newRerunnable - } yield (poolName1, poolName2, poolName3)).run.await - - assert(poolName1 == currentThreadName()) // The first rerunnable is not explicitly evaluated on a dedicated pool - assert(poolName2 == f.otherPoolName) - assert(poolName3 == f.ioPoolName) - } - - test("ContextShift[Rerunnable].evalOn executes on correct pool and shifts back to future pool") { f => - implicit val cs: ContextShift[Rerunnable] = - RerunnableContextShift.fromExecutionContext(f.ioPool) - - val (poolName1, poolName2, poolName3) = - (for { - poolName1 <- Rerunnable.fromFuture(f.newFuture) // The future was started on a dedicated pool (e.g. netty) - - poolName2 <- ContextShift[Rerunnable].evalOn(f.otherPool)(f.newRerunnable) - - poolName3 <- Rerunnable.fromFuture(f.newFuture) - } yield (poolName1, poolName2, poolName3)).run.await - - assert(poolName1 == f.futurePoolName) - assert(poolName2 == f.otherPoolName) - assert(poolName3 == f.futurePoolName) - } - - implicit private class FutureAwaitOps[A](future: Future[A]) { - def await: A = Await.result(future) - } - - override protected def withFixture(test: OneArgTest): Outcome = withFixture(test.toNoArgTest(new FixtureParam)) -} diff --git a/effect/src/test/scala/org/typelevel/catbird/util/effect/RerunnableSuite.scala b/effect/src/test/scala/org/typelevel/catbird/util/effect/RerunnableSuite.scala deleted file mode 100644 index 8747057..0000000 --- a/effect/src/test/scala/org/typelevel/catbird/util/effect/RerunnableSuite.scala +++ /dev/null @@ -1,47 +0,0 @@ -package org.typelevel.catbird.util.effect - -import cats.effect.laws.discipline.EffectTests -import cats.effect.laws.discipline.arbitrary.catsEffectLawsArbitraryForIO -import cats.effect.laws.util.{ TestContext, TestInstances } -import cats.effect.{ Bracket, IO } -import cats.instances.either._ -import cats.instances.int._ -import cats.instances.tuple._ -import cats.instances.unit._ -import cats.kernel.Eq -import cats.laws.discipline.arbitrary._ -import com.twitter.util.{ Await, Monitor, Throw } -import org.typelevel.catbird.util.{ ArbitraryInstances, Rerunnable } -import org.scalatest.funsuite.AnyFunSuite -import org.scalatest.prop.Configuration -import org.typelevel.discipline.scalatest.FunSuiteDiscipline - -class RerunnableSuite - extends AnyFunSuite - with FunSuiteDiscipline - with Configuration - with ArbitraryInstances - with TestInstances { - implicit val context: TestContext = TestContext() - implicit def rerunnableEq[A](implicit A: Eq[A]): Eq[Rerunnable[A]] = - Eq.by[Rerunnable[A], IO[A]](rerunnableToIO) - - checkAll("Rerunnable[Int]", EffectTests[Rerunnable].effect[Int, Int, Int]) - - test("Exceptions thrown by release are handled by Monitor") { - val useException = new Exception("thrown by use") - val releaseException = new Exception("thrown by release") - - var monitoredException: Throwable = null - val monitor = Monitor.mk { case e => monitoredException = e; true; } - - val rerunnable = Bracket[Rerunnable, Throwable] - .bracket(Rerunnable.Unit)(_ => Rerunnable.raiseError(useException))(_ => Rerunnable.raiseError(releaseException)) - .liftToTry - - val result = Await.result(Monitor.using(monitor)(rerunnable.run)) - - assert(result == Throw(useException)) - assert(monitoredException == releaseException) - } -} diff --git a/effect/src/test/scala/org/typelevel/catbird/util/effect/RerunnableTimerSuite.scala b/effect/src/test/scala/org/typelevel/catbird/util/effect/RerunnableTimerSuite.scala deleted file mode 100644 index 433e618..0000000 --- a/effect/src/test/scala/org/typelevel/catbird/util/effect/RerunnableTimerSuite.scala +++ /dev/null @@ -1,37 +0,0 @@ -package org.typelevel.catbird.util.effect - -import cats.effect.Timer -import org.scalatest.Outcome -import org.scalatest.funsuite.FixtureAnyFunSuite -import com.twitter.util -import com.twitter.util.{ Await, Future } -import org.typelevel.catbird.util.Rerunnable - -import scala.concurrent.duration._ - -class RerunnableTimerSuite extends FixtureAnyFunSuite { - - protected final class FixtureParam { - val twitterTimer: util.Timer = new util.JavaTimer - } - - test("A timer can be used to delay execution") { f => - implicit val timer: Timer[Rerunnable] = RerunnableTimer(f.twitterTimer) - - val result = Await.result( - Future.selectIndex( - Vector( - for { - _ <- Timer[Rerunnable].sleep(100.milliseconds).run - r <- Future.value("slow") - } yield r, - Future.value("fast").delayed(util.Duration.fromMilliseconds(50))(f.twitterTimer) - ) - ) - ) - - assert(result == 1) // The first future is delayed for longer, so we expect the second one to win - } - - override protected def withFixture(test: OneArgTest): Outcome = withFixture(test.toNoArgTest(new FixtureParam)) -} diff --git a/effect/src/test/scala/org/typelevel/catbird/util/effect/ThreadPoolNamingSupport.scala b/effect/src/test/scala/org/typelevel/catbird/util/effect/ThreadPoolNamingSupport.scala deleted file mode 100644 index 11c30f5..0000000 --- a/effect/src/test/scala/org/typelevel/catbird/util/effect/ThreadPoolNamingSupport.scala +++ /dev/null @@ -1,23 +0,0 @@ -package org.typelevel.catbird.util.effect - -import java.lang.{ Runnable, Thread } -import java.util.concurrent.{ Executors, ThreadFactory } - -import scala.concurrent.{ ExecutionContext, ExecutionContextExecutorService } - -trait ThreadPoolNamingSupport { - - def newNamedThreadPool(name: String): ExecutionContextExecutorService = - ExecutionContext.fromExecutorService( - Executors.newSingleThreadExecutor(new ThreadFactory { - override def newThread(r: Runnable): Thread = { - val thread = Executors.defaultThreadFactory().newThread(r) - thread.setName(name) - thread.setDaemon(true) // Don't block shutdown of JVM - thread - } - }) - ) - - def currentThreadName(): String = Thread.currentThread().getName -}