Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We鈥檒l occasionally send you account related emails.

Already on GitHub? Sign in to your account

cats-effect 3.1.x for Rerunnable -> Sync + Clock + MonadCancel #267

Merged
merged 7 commits into from
Jun 14, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
24 changes: 22 additions & 2 deletions build.sbt
@@ -1,5 +1,9 @@
val catsVersion = "2.6.1"

// For the transition period, we publish artifacts for both cats-effect 2.x and 3.x
val catsEffectVersion = "2.5.1"
val catsEffect3Version = "3.1.1"

val utilVersion = "21.5.0"
val finagleVersion = "21.5.0"

Expand Down Expand Up @@ -64,7 +68,7 @@ lazy val root = project
.enablePlugins(GhpagesPlugin, ScalaUnidocPlugin)
.settings(allSettings ++ noPublishSettings)
.settings(
(ScalaUnidoc / unidoc / unidocProjectFilter) := inAnyProject -- inProjects(benchmark),
(ScalaUnidoc / unidoc / unidocProjectFilter) := inAnyProject -- inProjects(benchmark, effect3),
addMappingsToSiteDir((ScalaUnidoc / packageDoc / mappings), docMappingsApiDir),
git.remoteRepo := "git@github.com:travisbrown/catbird.git"
)
Expand All @@ -77,7 +81,7 @@ lazy val root = project
|import io.catbird.util._
""".stripMargin
)
.aggregate(util, effect, finagle, benchmark)
.aggregate(util, effect, effect3, finagle, benchmark)
.dependsOn(util, effect, finagle)

lazy val util = project
Expand All @@ -104,6 +108,22 @@ lazy val effect = project
)
.dependsOn(util, util % "test->test")

lazy val effect3 = project
.in(file("effect3"))
.settings(moduleName := "catbird-effect3")
.settings(allSettings)
.settings(
libraryDependencies ++= Seq(
"org.typelevel" %% "cats-effect" % catsEffect3Version,
"org.typelevel" %% "cats-effect-laws" % catsEffect3Version % Test,
"org.typelevel" %% "cats-effect-testkit" % catsEffect3Version % Test
),
(Test / scalacOptions) ~= {
_.filterNot(Set("-Yno-imports", "-Yno-predef"))
}
)
.dependsOn(util, util % "test->test")

lazy val finagle = project
.settings(moduleName := "catbird-finagle")
.settings(allSettings)
Expand Down
@@ -0,0 +1,35 @@
package io.catbird.util.effect

import cats.effect.kernel.{ MonadCancel, Outcome }
import com.twitter.util.{ Future, Monitor }
import io.catbird.util.FutureMonadError

import java.lang.Throwable

import scala.Unit

trait FutureInstances {
implicit final val futureMonadCancelInstance
: MonadCancel[Future, Throwable] with MonadCancel.Uncancelable[Future, Throwable] =
new FutureMonadError with MonadCancel[Future, Throwable] with MonadCancel.Uncancelable[Future, Throwable] {

final override def forceR[A, B](fa: Future[A])(fb: Future[B]): Future[B] =
fa.liftToTry.flatMap { resultA =>
resultA.handle(Monitor.catcher)
fb
}

/**
* Special implementation so exceptions in release are cought by the `Monitor`.
*/
final override def bracketCase[A, B](acquire: Future[A])(use: A => Future[B])(
release: (A, Outcome[Future, Throwable, B]) => Future[Unit]
): Future[B] =
acquire
.flatMap(a =>
use(a).liftToTry
.flatMap(result => release(a, tryToFutureOutcome(result)).handle(Monitor.catcher).map(_ => result))
)
.lowerFromTry
}
}
@@ -0,0 +1,53 @@
package io.catbird.util.effect

import cats.effect.Clock
import cats.effect.kernel.{ MonadCancel, Outcome, Sync }
import com.twitter.util.{ Future, Monitor }
import io.catbird.util.{ Rerunnable, RerunnableMonadError }

import java.lang.Throwable
import java.util.concurrent.TimeUnit
import java.lang.System

import scala.Unit
import scala.concurrent.duration.FiniteDuration

trait RerunnableInstances {
implicit final val rerunnableInstance
: Sync[Rerunnable] with Clock[Rerunnable] with MonadCancel.Uncancelable[Rerunnable, Throwable] =
new RerunnableMonadError
with Sync[Rerunnable]
with Clock[Rerunnable]
with MonadCancel.Uncancelable[Rerunnable, Throwable] {

final override def suspend[A](hint: Sync.Type)(thunk: => A): Rerunnable[A] =
Rerunnable(thunk)

final override def realTime: Rerunnable[FiniteDuration] =
Rerunnable(FiniteDuration(System.currentTimeMillis(), TimeUnit.MILLISECONDS))

final override def monotonic: Rerunnable[FiniteDuration] =
Rerunnable(FiniteDuration(System.nanoTime(), TimeUnit.NANOSECONDS))

final override def forceR[A, B](fa: Rerunnable[A])(fb: Rerunnable[B]): Rerunnable[B] =
fa.liftToTry.flatMap { resultA =>
resultA.handle(Monitor.catcher)
fb
}

/**
* Special implementation so exceptions in release are cought by the `Monitor`.
felixbr marked this conversation as resolved.
Show resolved Hide resolved
*/
final override def bracketCase[A, B](acquire: Rerunnable[A])(use: A => Rerunnable[B])(
release: (A, Outcome[Rerunnable, Throwable, B]) => Rerunnable[Unit]
): Rerunnable[B] = new Rerunnable[B] {
final def run: Future[B] =
acquire.run.flatMap { a =>
val future = use(a).run
future.transform(result =>
release(a, tryToRerunnableOutcome(result)).run.handle(Monitor.catcher).flatMap(_ => future)
)
}
}
}
}
57 changes: 57 additions & 0 deletions effect3/src/main/scala/io/catbird/util/effect/package.scala
@@ -0,0 +1,57 @@
package io.catbird.util

import cats.effect.{ Async, IO }
import com.twitter.util.{ Future, Return, Throw, Try }

import java.lang.Throwable

import cats.effect.Outcome
import cats.effect.kernel.Resource.ExitCase

import scala.util.{ Left, Right }

package object effect extends FutureInstances with RerunnableInstances {

/**
* Converts the `Future` to `F`.
*/
def futureToAsync[F[_], A](fa: => Future[A])(implicit F: Async[F]): F[A] = F.async_ { k =>
fa.respond {
case Return(a) => k(Right(a))
case Throw(err) => k(Left(err))
}
}

/**
* Converts the `Rerunnable` to `IO`.
*/
final def rerunnableToIO[A](fa: Rerunnable[A]): IO[A] =
futureToAsync[IO, A](fa.run)

/**
* Convert a twitter-util Try to cats-effect ExitCase
*/
final def tryToExitCase[A](ta: Try[A]): ExitCase =
ta match {
case Return(_) => ExitCase.Succeeded
case Throw(e) => ExitCase.Errored(e)
}

/**
* Convert a twitter-util Try to cats-effect Outcome for Rerunnable
*/
final def tryToRerunnableOutcome[A](ta: Try[A]): Outcome[Rerunnable, Throwable, A] =
ta match {
case Return(a) => Outcome.Succeeded(Rerunnable.const(a))
case Throw(e) => Outcome.Errored(e)
}

/**
* Convert a twitter-util Try to cats-effect Outcome for Future
*/
final def tryToFutureOutcome[A](ta: Try[A]): Outcome[Future, Throwable, A] =
ta match {
case Return(a) => Outcome.Succeeded(Future.value(a))
case Throw(e) => Outcome.Errored(e)
}
}
27 changes: 27 additions & 0 deletions effect3/src/test/scala/io/catbird/util/effect/FutureSuite.scala
@@ -0,0 +1,27 @@
package io.catbird.util.effect

import cats.Eq
import cats.effect.laws.MonadCancelTests
import cats.instances.all._
import cats.laws.discipline.MonadErrorTests
import cats.laws.discipline.arbitrary._
import com.twitter.conversions.DurationOps._
import com.twitter.util.Future
import io.catbird.util.{ ArbitraryInstances, EqInstances, futureEqWithFailure }
import org.scalatest.funsuite.AnyFunSuite
import org.scalatest.prop.Configuration
import org.typelevel.discipline.scalatest.FunSuiteDiscipline

class FutureSuite
extends AnyFunSuite
with FunSuiteDiscipline
with Configuration
with ArbitraryInstances
with EqInstances {

implicit def futureEq[A](implicit A: Eq[A]): Eq[Future[A]] =
futureEqWithFailure(1.seconds)

checkAll("Future[Int]", MonadErrorTests[Future, Throwable].monadError[Int, Int, Int])
checkAll("Future[Int]", MonadCancelTests[Future, Throwable].monadCancel[Int, Int, Int])
}
@@ -0,0 +1,48 @@
package io.catbird.util.effect

import java.time.Instant
import java.util.concurrent.TimeUnit

import cats.effect.Clock
import com.twitter.util.Await
import io.catbird.util.Rerunnable
import org.scalatest.{ Outcome }
import org.scalatest.concurrent.{ Eventually, IntegrationPatience }
import org.scalatest.funsuite.FixtureAnyFunSuite

/**
* We'll use `eventually` and a reasonably big tolerance here to prevent CI from failing if it is a bit slow.
*
* Technically the implementation is just an extremely thin wrapper around `System.currentTimeMillis()`
* and `System.nanoTime()` so as long as the result is the same order of magnitude (and therefore the
* unit-conversion is correct) we should be fine.
*/
class RerunnableClockSuite extends FixtureAnyFunSuite with Eventually with IntegrationPatience {

protected final class FixtureParam {
def now: Instant = Instant.now()
}

test("Retrieval of real time") { f =>
eventually {
val result = Await.result(
Clock[Rerunnable].realTime.map(duration => Instant.ofEpochMilli(duration.toMillis)).run
)

assert(java.time.Duration.between(result, f.now).abs().toMillis < 50)
}
}

test("Retrieval of monotonic time") { f =>
eventually {
val result = Await.result(
Clock[Rerunnable].monotonic.map(duration => duration.toNanos).run
)

val durationBetween = Math.abs(System.nanoTime() - result)
assert(TimeUnit.MILLISECONDS.convert(durationBetween, TimeUnit.NANOSECONDS) < 5)
}
}

override protected def withFixture(test: OneArgTest): Outcome = withFixture(test.toNoArgTest(new FixtureParam))
}
@@ -0,0 +1,45 @@
package io.catbird.util.effect

import cats.effect.MonadCancel
import cats.effect.kernel.testkit.SyncTypeGenerators
import cats.effect.laws.SyncTests
import cats.instances.either._
import cats.instances.int._
import cats.instances.tuple._
import cats.instances.unit._
import cats.laws.discipline.arbitrary._
import com.twitter.util.{ Await, Monitor, Throw }
import io.catbird.util.{ ArbitraryInstances, EqInstances, Rerunnable }
import org.scalatest.funsuite.AnyFunSuite
import org.scalatest.prop.Configuration
import org.typelevel.discipline.scalatest.FunSuiteDiscipline

class RerunnableSuite
extends AnyFunSuite
with FunSuiteDiscipline
with Configuration
with ArbitraryInstances
with SyncTypeGenerators
with EqInstances
with Runners {

// This includes tests for Clock, MonadCancel, and MonadError
checkAll("Rerunnable[Int]", SyncTests[Rerunnable].sync[Int, Int, Int])
felixbr marked this conversation as resolved.
Show resolved Hide resolved

test("Exceptions thrown by release are handled by Monitor") {
val useException = new Exception("thrown by use")
val releaseException = new Exception("thrown by release")

var monitoredException: Throwable = null
val monitor = Monitor.mk { case e => monitoredException = e; true; }

val rerunnable = MonadCancel[Rerunnable, Throwable]
.bracket(Rerunnable.Unit)(_ => Rerunnable.raiseError(useException))(_ => Rerunnable.raiseError(releaseException))
.liftToTry

val result = Await.result(Monitor.using(monitor)(rerunnable.run))

assert(result == Throw(useException))
assert(monitoredException == releaseException)
}
}
64 changes: 64 additions & 0 deletions effect3/src/test/scala/io/catbird/util/effect/Runners.scala
@@ -0,0 +1,64 @@
package io.catbird.util.effect

import cats.Eq
import cats.effect.{ IO, Outcome, unsafe }
import cats.effect.testkit.TestContext
import cats.effect.unsafe.IORuntimeConfig
import io.catbird.util.{ EqInstances, Rerunnable }
import org.scalacheck.Prop

import scala.annotation.implicitNotFound
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.duration._
import scala.language.implicitConversions

/**
* Test helpers mostly taken from the cats-effect IOSpec.
*/
trait Runners { self: EqInstances =>

implicit val ticker: Ticker = Ticker(TestContext())

implicit def eqIOA[A: Eq](implicit ticker: Ticker): Eq[IO[A]] =
Eq.by(unsafeRun(_))

implicit def rerunnableEq[A](implicit A: Eq[A]): Eq[Rerunnable[A]] =
Eq.by[Rerunnable[A], IO[A]](rerunnableToIO)

implicit def boolRunnings(rerunnableB: Rerunnable[Boolean])(implicit ticker: Ticker): Prop =
Prop(unsafeRun(rerunnableToIO(rerunnableB)).fold(false, _ => false, _.getOrElse(false)))

def unsafeRun[A](ioa: IO[A])(implicit ticker: Ticker): Outcome[Option, Throwable, A] =
try {
var results: Outcome[Option, Throwable, A] = Outcome.Succeeded(None)

ioa.unsafeRunAsync {
case Left(t) => results = Outcome.Errored(t)
case Right(a) => results = Outcome.Succeeded(Some(a))
}(unsafe.IORuntime(ticker.ctx, ticker.ctx, scheduler, () => (), IORuntimeConfig()))

ticker.ctx.tickAll(1.days)

results
} catch {
case t: Throwable =>
t.printStackTrace()
throw t
}

def scheduler(implicit ticker: Ticker): unsafe.Scheduler =
new unsafe.Scheduler {
import ticker.ctx

def sleep(delay: FiniteDuration, action: Runnable): Runnable = {
val cancel = ctx.schedule(delay, action)
new Runnable { def run() = cancel() }
}

def nowMillis() = ctx.now().toMillis
def monotonicNanos() = ctx.now().toNanos
}

@implicitNotFound("could not find an instance of Ticker; try using `in ticked { implicit ticker =>`")
case class Ticker(ctx: TestContext)
}