Skip to content

Commit

Permalink
Merge pull request #1800 from djspiewak/feature/uncancelable-gap
Browse files Browse the repository at this point in the history
Opened uncancelable region to the right
  • Loading branch information
djspiewak committed Mar 21, 2021
2 parents 24d6334 + 16f971c commit 4082e6f
Show file tree
Hide file tree
Showing 16 changed files with 155 additions and 107 deletions.
39 changes: 16 additions & 23 deletions core/shared/src/main/scala/cats/effect/IO.scala
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,14 @@ sealed abstract class IO[+A] private () extends IOPlatform[A] {
* between termination conditions
*/
def guarantee(finalizer: IO[Unit]): IO[A] =
guaranteeCase(_ => finalizer)
// this is a little faster than the default implementation, which helps Resource
IO uncancelable { poll =>
val handled = finalizer handleErrorWith { t =>
IO.executionContext.flatMap(ec => IO(ec.reportFailure(t)))
}

poll(this).onCancel(finalizer).onError(_ => handled).flatTap(_ => finalizer)
}

/**
* Executes the given `finalizer` when the source is finished,
Expand All @@ -387,7 +394,7 @@ sealed abstract class IO[+A] private () extends IOPlatform[A] {
* @see [[guarantee]] for the simpler version
*/
def guaranteeCase(finalizer: OutcomeIO[A @uncheckedVariance] => IO[Unit]): IO[A] =
onCase { case oc => finalizer(oc) }
IO.unit.bracketCase(_ => this)((_, oc) => finalizer(oc))

/**
* Handle any error, potentially recovering from it, by mapping it to another
Expand Down Expand Up @@ -416,26 +423,6 @@ sealed abstract class IO[+A] private () extends IOPlatform[A] {
def onCancel(fin: IO[Unit]): IO[A] =
IO.OnCancel(this, fin)

def onCase(pf: PartialFunction[OutcomeIO[A @uncheckedVariance], IO[Unit]]): IO[A] = {
def doOutcome(outcome: OutcomeIO[A]): IO[Unit] =
pf.lift(outcome)
.fold(IO.unit)(_.handleErrorWith { t =>
IO.executionContext.flatMap(ec => IO(ec.reportFailure(t)))
})

IO uncancelable { poll =>
val base = poll(this)
val finalized = pf.lift(Outcome.Canceled()).map(base.onCancel).getOrElse(base)

finalized.attempt flatMap {
case Left(e) =>
doOutcome(Outcome.Errored(e)) *> IO.raiseError(e)
case Right(a) =>
doOutcome(Outcome.Succeeded(IO.pure(a))).as(a)
}
}
}

def onError(f: Throwable => IO[Unit]): IO[A] =
handleErrorWith(t => f(t).attempt *> IO.raiseError(t))

Expand Down Expand Up @@ -1260,6 +1247,12 @@ object IO extends IOCompanionPlatform with IOLowPriorityImplicits {
def pure[A](x: A): IO[A] =
IO.pure(x)

override def guarantee[A](fa: IO[A], fin: IO[Unit]): IO[A] =
fa.guarantee(fin)

override def guaranteeCase[A](fa: IO[A])(fin: OutcomeIO[A] => IO[Unit]): IO[A] =
fa.guaranteeCase(fin)

def handleErrorWith[A](fa: IO[A])(f: Throwable => IO[A]): IO[A] =
fa.handleErrorWith(f)

Expand All @@ -1278,7 +1271,7 @@ object IO extends IOCompanionPlatform with IOLowPriorityImplicits {
ioa.onCancel(fin)

override def bracketFull[A, B](acquire: Poll[IO] => IO[A])(use: A => IO[B])(
release: (A, Outcome[IO, Throwable, B]) => IO[Unit]): IO[B] =
release: (A, OutcomeIO[B]) => IO[Unit]): IO[B] =
IO.bracketFull(acquire)(use)(release)

val monotonic: IO[FiniteDuration] = IO.monotonic
Expand Down
22 changes: 6 additions & 16 deletions core/shared/src/main/scala/cats/effect/IOFiber.scala
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ private final class IOFiber[A](
// iteration 0.
val nextIteration = iteration + 1

// println(s"<$name> looping on $cur0")
// System.out.println(s"looping on $cur0")
/*
* The cases have to use continuous constants to generate a `tableswitch`.
* Do not name or reorder them.
Expand Down Expand Up @@ -860,7 +860,7 @@ private final class IOFiber[A](
}

private[this] def asyncCancel(cb: Either[Throwable, Unit] => Unit): Unit = {
// println(s"<$name> running cancelation (finalizers.length = ${finalizers.unsafeIndex()})")
// System.out.println(s"running cancelation (finalizers.length = ${finalizers.unsafeIndex()})")
finalizing = true

if (!finalizers.isEmpty()) {
Expand Down Expand Up @@ -1238,24 +1238,14 @@ private final class IOFiber[A](

private[this] def uncancelableSuccessK(result: Any, depth: Int): IO[Any] = {
masks -= 1

if (shouldFinalize()) {
asyncCancel(null)
IOEndFiber
} else {
succeeded(result, depth + 1)
}
// System.out.println(s"unmasking after uncancelable (isUnmasked = ${isUnmasked()})")
succeeded(result, depth + 1)
}

private[this] def uncancelableFailureK(t: Throwable, depth: Int): IO[Any] = {
masks -= 1

if (shouldFinalize()) {
asyncCancel(null)
IOEndFiber
} else {
failed(t, depth + 1)
}
// System.out.println(s"unmasking after uncancelable (isUnmasked = ${isUnmasked()})")
failed(t, depth + 1)
}

private[this] def unmaskSuccessK(result: Any, depth: Int): IO[Any] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,12 @@ trait MonadCancelGenerators[F[_], E] extends MonadErrorGenerators[F, E] {

// TODO we can't really use poll :-( since we can't Cogen FunctionK
private def genUncancelable[A: Arbitrary: Cogen](deeper: GenK[F]): Gen[F[A]] =
deeper[A].map(pc => F.uncancelable(_ => pc))
deeper[A].map(pc =>
F.uncancelable(_ => pc)
.flatMap(F.pure(_))
.handleErrorWith(
F.raiseError(_)
)) // this is a bit of a hack to get around functor law breakage

private def genOnCancel[A: Arbitrary: Cogen](deeper: GenK[F]): Gen[F[A]] =
for {
Expand Down Expand Up @@ -291,7 +296,10 @@ trait AsyncGenerators[F[_]] extends GenTemporalGenerators[F, Throwable] with Syn
fo <- deeper[Option[F[Unit]]](
Arbitrary(Gen.option[F[Unit]](deeper[Unit])),
Cogen.cogenOption(cogenFU))
} yield F.async[A](k => F.delay(k(result)) >> fo)
} yield F
.async[A](k => F.delay(k(result)) >> fo)
.flatMap(F.pure(_))
.handleErrorWith(F.raiseError(_))

private def genEvalOn[A: Arbitrary: Cogen](deeper: GenK[F]) =
for {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,7 @@ trait MonadCancel[F[_], E] extends MonadError[F, E] {
val finalized = onCancel(poll(F.unit >> use(a)), safeRelease(a, Outcome.Canceled()))
val handled = finalized.onError {
case e =>
safeRelease(a, Outcome.Errored(e)).attempt.void
safeRelease(a, Outcome.Errored(e)).handleError(_ => ())
}
handled.flatMap { b => safeRelease(a, Outcome.Succeeded(b.pure)).as(b) }
}
Expand Down
39 changes: 19 additions & 20 deletions kernel/shared/src/main/scala/cats/effect/kernel/Resource.scala
Original file line number Diff line number Diff line change
Expand Up @@ -407,32 +407,34 @@ sealed abstract class Resource[F[_], +A] {
release: F[Unit]): F[(B, F[Unit])] =
current match {
case Allocate(resource) =>
F.bracketFull(resource) {
case (b, rel) =>
stack match {
case Nil =>
(
b: B,
rel(ExitCase.Succeeded).guarantee(release)
).pure[F]
case Frame(head, tail) =>
continue(head(b), tail, rel(ExitCase.Succeeded).guarantee(release))
}
} {
case (_, Outcome.Succeeded(_)) =>
F.unit
case ((_, release), outcome) =>
release(ExitCase.fromOutcome(outcome))
F uncancelable { poll =>
resource(poll) flatMap {
case (b, rel) =>
val rel2 = rel(ExitCase.Succeeded).guarantee(release)

stack match {
case Nil =>
F.pure((b, rel2))

case Frame(head, tail) =>
poll(continue(head(b), tail, rel2))
.onCancel(rel(ExitCase.Canceled).handleError(_ => ()))
.onError { case e => rel(ExitCase.Errored(e)).handleError(_ => ()) }
}
}
}

case Bind(source, fs) =>
loop(source, Frame(fs, stack), release)

case Pure(v) =>
stack match {
case Nil =>
(v: B, release).pure[F]
case Frame(head, tail) =>
loop(head(v), tail, release)
}

case Eval(fa) =>
fa.flatMap(a => continue(Resource.pure(a), stack, release))
}
Expand Down Expand Up @@ -1215,10 +1217,7 @@ abstract private[effect] class ResourceSemigroupK[F[_]] extends SemigroupK[Resou
def combineK[A](ra: Resource[F, A], rb: Resource[F, A]): Resource[F, A] =
Resource.make(Ref[F].of(F.unit))(_.get.flatten).evalMap { finalizers =>
def allocate(r: Resource[F, A]): F[A] =
r.fold(
_.pure[F],
(release: F[Unit]) =>
finalizers.update(MonadCancel[F, Throwable].guarantee(_, release)))
r.fold(_.pure[F], (release: F[Unit]) => finalizers.update(_.guarantee(release)))

K.combineK(allocate(ra), allocate(rb))
}
Expand Down
4 changes: 2 additions & 2 deletions laws/shared/src/main/scala/cats/effect/laws/AsyncLaws.scala
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,12 @@ trait AsyncLaws[F[_]] extends GenTemporalLaws[F, Throwable] with SyncLaws[F] {

// format: off
def asyncRightIsUncancelableSequencedPure[A](a: A, fu: F[Unit]) =
F.async[A](k => F.delay(k(Right(a))) >> fu.as(None)) <-> (F.uncancelable(_ => fu) >> F.pure(a))
(F.async[A](k => F.delay(k(Right(a))) >> fu.as(None)) <* F.unit) <-> (F.uncancelable(_ => fu) >> F.pure(a))
// format: on

// format: off
def asyncLeftIsUncancelableSequencedRaiseError[A](e: Throwable, fu: F[Unit]) =
F.async[A](k => F.delay(k(Left(e))) >> fu.as(None)) <-> (F.uncancelable(_ => fu) >> F.raiseError(e))
(F.async[A](k => F.delay(k(Left(e))) >> fu.as(None)) <* F.unit) <-> (F.uncancelable(_ => fu) >> F.raiseError(e))
// format: on

def asyncRepeatedCallbackIgnored[A](a: A) =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@ trait AsyncTests[F[_]] extends GenTemporalTests[F, Throwable] with SyncTests[F]
iso: Isomorphisms[F],
faPP: F[A] => Pretty,
fuPP: F[Unit] => Pretty,
aFUPP: (A => F[Unit]) => Pretty,
ePP: Throwable => Pretty,
foaPP: F[Outcome[F, Throwable, A]] => Pretty,
feauPP: F[Either[A, Unit]] => Pretty,
Expand Down
16 changes: 12 additions & 4 deletions laws/shared/src/main/scala/cats/effect/laws/GenSpawnLaws.scala
Original file line number Diff line number Diff line change
Expand Up @@ -88,16 +88,24 @@ trait GenSpawnLaws[F[_], E] extends MonadCancelLaws[F, E] with UniqueLaws[F] {
}

def raceCanceledIdentityLeft[A](fa: F[A]) =
F.race(F.canceled, fa) <-> fa.map(_.asRight[Unit])
F.race(F.canceled, fa.flatMap(F.pure(_)).handleErrorWith(F.raiseError(_))) <-> fa.map(
_.asRight[Unit])

def raceCanceledIdentityRight[A](fa: F[A]) =
F.race(fa, F.canceled) <-> fa.map(_.asLeft[Unit])
F.race(fa.flatMap(F.pure(_)).handleErrorWith(F.raiseError(_)), F.canceled) <-> fa.map(
_.asLeft[Unit])

def raceNeverNoncanceledIdentityLeft[A](fa: F[A]) =
F.race(F.never[Unit], fa) <-> F.onCancel(fa.map(_.asRight[Unit]), F.never)
F.race(F.never[Unit], fa.flatMap(F.pure(_)).handleErrorWith(F.raiseError(_))) <-> F
.onCancel(
fa.flatMap(r => F.pure(r.asRight[Unit])).handleErrorWith(F.raiseError(_)),
F.never)

def raceNeverNoncanceledIdentityRight[A](fa: F[A]) =
F.race(fa, F.never[Unit]) <-> F.onCancel(fa.map(_.asLeft[Unit]), F.never)
F.race(fa.flatMap(F.pure(_)).handleErrorWith(F.raiseError(_)), F.never[Unit]) <-> F
.onCancel(
fa.flatMap(r => F.pure(r.asLeft[Unit])).handleErrorWith(F.raiseError(_)),
F.never)

// I really like these laws, since they relate cede to timing, but they're definitely nondeterministic
/*def raceLeftCedeYields[A](a: A) =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ trait GenSpawnTests[F[_], E] extends MonadCancelTests[F, E] with UniqueTests[F]
iso: Isomorphisms[F],
faPP: F[A] => Pretty,
fuPP: F[Unit] => Pretty,
aFUPP: (A => F[Unit]) => Pretty,
ePP: E => Pretty,
foaPP: F[Outcome[F, E, A]] => Pretty,
feauPP: F[Either[A, Unit]] => Pretty,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@ trait GenTemporalTests[F[_], E] extends GenSpawnTests[F, E] with ClockTests[F] {
iso: Isomorphisms[F],
faPP: F[A] => Pretty,
fuPP: F[Unit] => Pretty,
aFUPP: (A => F[Unit]) => Pretty,
ePP: E => Pretty,
foaPP: F[Outcome[F, E, A]] => Pretty,
feauPP: F[Either[A, Unit]] => Pretty,
Expand Down
23 changes: 20 additions & 3 deletions laws/shared/src/main/scala/cats/effect/laws/MonadCancelLaws.scala
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,23 @@ trait MonadCancelLaws[F[_], E] extends MonadErrorLaws[F, E] {
def uncancelableEliminatesOnCancel[A](fa: F[A], fin: F[Unit]) =
F.uncancelable(_ => F.onCancel(fa, fin)) <-> F.uncancelable(_ => fa)

/*
* NB: This is effectively in violation of the monad laws, since
* we consider finalizers to associate over the boundary here, but
* we do NOT consider them to right-associate over map or flatMap.
* This simply stems from the fact that cancellation is fundamentally
* uncomposable, and it's better to pick a semantic for uncancelable
* which allows regional composition, since this avoids "gaps" in
* otherwise-safe code.
*
* The argument is that cancellation is a *hint* not a mandate. This
* holds for self-cancellation just as much as external-cancellation.
* Thus, laws about where the cancellation is visible are always going
* to be a bit off.
*/
def onCancelAssociatesOverUncancelableBoundary[A](fa: F[A], fin: F[Unit]) =
F.uncancelable(_ => F.onCancel(fa, fin)) <-> F.onCancel(F.uncancelable(_ => fa), fin)

def forceRDiscardsPure[A, B](a: A, fa: F[B]) =
F.forceR(F.pure(a))(fa) <-> fa

Expand All @@ -59,9 +76,9 @@ trait MonadCancelLaws[F[_], E] extends MonadErrorLaws[F, E] {
F.onCancel(F.onCancel(F.canceled, fin1), fin2) <->
F.forceR(F.uncancelable(_ => F.forceR(fin1)(fin2)))(F.canceled)

def uncancelableCanceledAssociatesRightOverFlatMap[A](a: A, f: A => F[Unit]) =
F.uncancelable(_ => F.canceled.as(a).flatMap(f)) <->
F.forceR(F.uncancelable(_ => f(a)))(F.canceled)
def uncancelableCanceledAssociatesRightOverFlatMapAttempt[A](fa: F[A]) =
(F.uncancelable(_ => F.canceled >> fa).attempt >> F.unit) <->
F.forceR(F.uncancelable(_ => fa))(F.canceled)

def canceledAssociatesLeftOverFlatMap[A](fa: F[A]) =
F.canceled >> fa.void <-> F.canceled
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ trait MonadCancelTests[F[_], E] extends MonadErrorTests[F, E] {
iso: Isomorphisms[F],
faPP: F[A] => Pretty,
fuPP: F[Unit] => Pretty,
aFUPP: (A => F[Unit]) => Pretty,
ePP: E => Pretty): RuleSet = {

new RuleSet {
Expand All @@ -70,6 +69,8 @@ trait MonadCancelTests[F[_], E] extends MonadErrorTests[F, E] {
laws.uncancelablePollInverseNestIsUncancelable[A] _),
"uncancelable eliminates onCancel" -> forAll(
laws.uncancelableEliminatesOnCancel[A] _),
"onCancel associates over uncancelable boundary" -> forAll(
laws.onCancelAssociatesOverUncancelableBoundary[A] _),
"forceR discards pure" -> forAll(laws.forceRDiscardsPure[A, B] _),
"forceR discards error" -> forAll(laws.forceRDiscardsError[A] _),
"forceR canceled short-circuits" -> forAll(laws.forceRCanceledShortCircuits[A] _),
Expand All @@ -81,8 +82,8 @@ trait MonadCancelTests[F[_], E] extends MonadErrorTests[F, E] {
Seq(
"canceled sequences onCancel in order" -> forAll(
laws.canceledSequencesOnCancelInOrder _),
"uncancelable canceled associates right over flatMap" -> forAll(
laws.uncancelableCanceledAssociatesRightOverFlatMap[A] _),
"uncancelable canceled associates right over flatMap attempt" -> forAll(
laws.uncancelableCanceledAssociatesRightOverFlatMapAttempt[A] _),
"canceled associates left over flatMap" -> forAll(
laws.canceledAssociatesLeftOverFlatMap[A] _)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,8 +221,11 @@ trait TestInstances extends ParallelFGenerators with OutcomeGenerators with Sync
try {
var results: Outcome[Option, Throwable, A] = Outcome.Succeeded(None)

ioa.unsafeRunAsyncOutcome { oc => results = oc.mapK(someK) }(
unsafe.IORuntime(ticker.ctx, ticker.ctx, scheduler, () => (), unsafe.IORuntimeConfig()))
ioa
.flatMap(IO.pure(_))
.handleErrorWith(IO.raiseError(_))
.unsafeRunAsyncOutcome { oc => results = oc.mapK(someK) }(unsafe
.IORuntime(ticker.ctx, ticker.ctx, scheduler, () => (), unsafe.IORuntimeConfig()))

ticker.ctx.tickAll(1.second)

Expand Down
Loading

0 comments on commit 4082e6f

Please sign in to comment.