Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Opened uncancelable region to the right #1800

Merged
merged 2 commits into from
Mar 21, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 16 additions & 23 deletions core/shared/src/main/scala/cats/effect/IO.scala
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,14 @@ sealed abstract class IO[+A] private () extends IOPlatform[A] {
* between termination conditions
*/
def guarantee(finalizer: IO[Unit]): IO[A] =
guaranteeCase(_ => finalizer)
// this is a little faster than the default implementation, which helps Resource
IO uncancelable { poll =>
val handled = finalizer handleErrorWith { t =>
IO.executionContext.flatMap(ec => IO(ec.reportFailure(t)))
}

poll(this).onCancel(finalizer).onError(_ => handled).flatTap(_ => finalizer)
}

/**
* Executes the given `finalizer` when the source is finished,
Expand All @@ -387,7 +394,7 @@ sealed abstract class IO[+A] private () extends IOPlatform[A] {
* @see [[guarantee]] for the simpler version
*/
def guaranteeCase(finalizer: OutcomeIO[A @uncheckedVariance] => IO[Unit]): IO[A] =
onCase { case oc => finalizer(oc) }
IO.unit.bracketCase(_ => this)((_, oc) => finalizer(oc))

/**
* Handle any error, potentially recovering from it, by mapping it to another
Expand Down Expand Up @@ -416,26 +423,6 @@ sealed abstract class IO[+A] private () extends IOPlatform[A] {
def onCancel(fin: IO[Unit]): IO[A] =
IO.OnCancel(this, fin)

def onCase(pf: PartialFunction[OutcomeIO[A @uncheckedVariance], IO[Unit]]): IO[A] = {
def doOutcome(outcome: OutcomeIO[A]): IO[Unit] =
pf.lift(outcome)
.fold(IO.unit)(_.handleErrorWith { t =>
IO.executionContext.flatMap(ec => IO(ec.reportFailure(t)))
})

IO uncancelable { poll =>
val base = poll(this)
val finalized = pf.lift(Outcome.Canceled()).map(base.onCancel).getOrElse(base)

finalized.attempt flatMap {
case Left(e) =>
doOutcome(Outcome.Errored(e)) *> IO.raiseError(e)
case Right(a) =>
doOutcome(Outcome.Succeeded(IO.pure(a))).as(a)
}
}
}

djspiewak marked this conversation as resolved.
Show resolved Hide resolved
def onError(f: Throwable => IO[Unit]): IO[A] =
handleErrorWith(t => f(t).attempt *> IO.raiseError(t))

Expand Down Expand Up @@ -1260,6 +1247,12 @@ object IO extends IOCompanionPlatform with IOLowPriorityImplicits {
def pure[A](x: A): IO[A] =
IO.pure(x)

override def guarantee[A](fa: IO[A], fin: IO[Unit]): IO[A] =
fa.guarantee(fin)

override def guaranteeCase[A](fa: IO[A])(fin: OutcomeIO[A] => IO[Unit]): IO[A] =
fa.guaranteeCase(fin)

def handleErrorWith[A](fa: IO[A])(f: Throwable => IO[A]): IO[A] =
fa.handleErrorWith(f)

Expand All @@ -1278,7 +1271,7 @@ object IO extends IOCompanionPlatform with IOLowPriorityImplicits {
ioa.onCancel(fin)

override def bracketFull[A, B](acquire: Poll[IO] => IO[A])(use: A => IO[B])(
release: (A, Outcome[IO, Throwable, B]) => IO[Unit]): IO[B] =
release: (A, OutcomeIO[B]) => IO[Unit]): IO[B] =
IO.bracketFull(acquire)(use)(release)

val monotonic: IO[FiniteDuration] = IO.monotonic
Expand Down
22 changes: 6 additions & 16 deletions core/shared/src/main/scala/cats/effect/IOFiber.scala
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ private final class IOFiber[A](
// iteration 0.
val nextIteration = iteration + 1

// println(s"<$name> looping on $cur0")
// System.out.println(s"looping on $cur0")
/*
* The cases have to use continuous constants to generate a `tableswitch`.
* Do not name or reorder them.
Expand Down Expand Up @@ -860,7 +860,7 @@ private final class IOFiber[A](
}

private[this] def asyncCancel(cb: Either[Throwable, Unit] => Unit): Unit = {
// println(s"<$name> running cancelation (finalizers.length = ${finalizers.unsafeIndex()})")
// System.out.println(s"running cancelation (finalizers.length = ${finalizers.unsafeIndex()})")
finalizing = true

if (!finalizers.isEmpty()) {
Expand Down Expand Up @@ -1238,24 +1238,14 @@ private final class IOFiber[A](

private[this] def uncancelableSuccessK(result: Any, depth: Int): IO[Any] = {
masks -= 1

if (shouldFinalize()) {
asyncCancel(null)
IOEndFiber
} else {
succeeded(result, depth + 1)
}
// System.out.println(s"unmasking after uncancelable (isUnmasked = ${isUnmasked()})")
succeeded(result, depth + 1)
}

private[this] def uncancelableFailureK(t: Throwable, depth: Int): IO[Any] = {
masks -= 1

if (shouldFinalize()) {
asyncCancel(null)
IOEndFiber
} else {
failed(t, depth + 1)
}
// System.out.println(s"unmasking after uncancelable (isUnmasked = ${isUnmasked()})")
failed(t, depth + 1)
}

private[this] def unmaskSuccessK(result: Any, depth: Int): IO[Any] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,12 @@ trait MonadCancelGenerators[F[_], E] extends MonadErrorGenerators[F, E] {

// TODO we can't really use poll :-( since we can't Cogen FunctionK
private def genUncancelable[A: Arbitrary: Cogen](deeper: GenK[F]): Gen[F[A]] =
deeper[A].map(pc => F.uncancelable(_ => pc))
deeper[A].map(pc =>
F.uncancelable(_ => pc)
.flatMap(F.pure(_))
.handleErrorWith(
F.raiseError(_)
)) // this is a bit of a hack to get around functor law breakage

private def genOnCancel[A: Arbitrary: Cogen](deeper: GenK[F]): Gen[F[A]] =
for {
Expand Down Expand Up @@ -291,7 +296,10 @@ trait AsyncGenerators[F[_]] extends GenTemporalGenerators[F, Throwable] with Syn
fo <- deeper[Option[F[Unit]]](
Arbitrary(Gen.option[F[Unit]](deeper[Unit])),
Cogen.cogenOption(cogenFU))
} yield F.async[A](k => F.delay(k(result)) >> fo)
} yield F
.async[A](k => F.delay(k(result)) >> fo)
.flatMap(F.pure(_))
.handleErrorWith(F.raiseError(_))

private def genEvalOn[A: Arbitrary: Cogen](deeper: GenK[F]) =
for {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,7 @@ trait MonadCancel[F[_], E] extends MonadError[F, E] {
val finalized = onCancel(poll(F.unit >> use(a)), safeRelease(a, Outcome.Canceled()))
val handled = finalized.onError {
case e =>
safeRelease(a, Outcome.Errored(e)).attempt.void
safeRelease(a, Outcome.Errored(e)).handleError(_ => ())
}
handled.flatMap { b => safeRelease(a, Outcome.Succeeded(b.pure)).as(b) }
}
Expand Down
39 changes: 19 additions & 20 deletions kernel/shared/src/main/scala/cats/effect/kernel/Resource.scala
Original file line number Diff line number Diff line change
Expand Up @@ -407,32 +407,34 @@ sealed abstract class Resource[F[_], +A] {
release: F[Unit]): F[(B, F[Unit])] =
current match {
case Allocate(resource) =>
F.bracketFull(resource) {
case (b, rel) =>
stack match {
case Nil =>
(
b: B,
rel(ExitCase.Succeeded).guarantee(release)
).pure[F]
case Frame(head, tail) =>
continue(head(b), tail, rel(ExitCase.Succeeded).guarantee(release))
}
} {
case (_, Outcome.Succeeded(_)) =>
F.unit
case ((_, release), outcome) =>
release(ExitCase.fromOutcome(outcome))
F uncancelable { poll =>
resource(poll) flatMap {
case (b, rel) =>
val rel2 = rel(ExitCase.Succeeded).guarantee(release)

stack match {
case Nil =>
F.pure((b, rel2))

case Frame(head, tail) =>
poll(continue(head(b), tail, rel2))
.onCancel(rel(ExitCase.Canceled).handleError(_ => ()))
.onError { case e => rel(ExitCase.Errored(e)).handleError(_ => ()) }
}
}
}

case Bind(source, fs) =>
loop(source, Frame(fs, stack), release)

case Pure(v) =>
stack match {
case Nil =>
(v: B, release).pure[F]
case Frame(head, tail) =>
loop(head(v), tail, release)
}

case Eval(fa) =>
fa.flatMap(a => continue(Resource.pure(a), stack, release))
}
Expand Down Expand Up @@ -1215,10 +1217,7 @@ abstract private[effect] class ResourceSemigroupK[F[_]] extends SemigroupK[Resou
def combineK[A](ra: Resource[F, A], rb: Resource[F, A]): Resource[F, A] =
Resource.make(Ref[F].of(F.unit))(_.get.flatten).evalMap { finalizers =>
def allocate(r: Resource[F, A]): F[A] =
r.fold(
_.pure[F],
(release: F[Unit]) =>
finalizers.update(MonadCancel[F, Throwable].guarantee(_, release)))
r.fold(_.pure[F], (release: F[Unit]) => finalizers.update(_.guarantee(release)))

K.combineK(allocate(ra), allocate(rb))
}
Expand Down
4 changes: 2 additions & 2 deletions laws/shared/src/main/scala/cats/effect/laws/AsyncLaws.scala
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,12 @@ trait AsyncLaws[F[_]] extends GenTemporalLaws[F, Throwable] with SyncLaws[F] {

// format: off
def asyncRightIsUncancelableSequencedPure[A](a: A, fu: F[Unit]) =
F.async[A](k => F.delay(k(Right(a))) >> fu.as(None)) <-> (F.uncancelable(_ => fu) >> F.pure(a))
(F.async[A](k => F.delay(k(Right(a))) >> fu.as(None)) <* F.unit) <-> (F.uncancelable(_ => fu) >> F.pure(a))
// format: on

// format: off
def asyncLeftIsUncancelableSequencedRaiseError[A](e: Throwable, fu: F[Unit]) =
F.async[A](k => F.delay(k(Left(e))) >> fu.as(None)) <-> (F.uncancelable(_ => fu) >> F.raiseError(e))
(F.async[A](k => F.delay(k(Left(e))) >> fu.as(None)) <* F.unit) <-> (F.uncancelable(_ => fu) >> F.raiseError(e))
// format: on

def asyncRepeatedCallbackIgnored[A](a: A) =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@ trait AsyncTests[F[_]] extends GenTemporalTests[F, Throwable] with SyncTests[F]
iso: Isomorphisms[F],
faPP: F[A] => Pretty,
fuPP: F[Unit] => Pretty,
aFUPP: (A => F[Unit]) => Pretty,
ePP: Throwable => Pretty,
foaPP: F[Outcome[F, Throwable, A]] => Pretty,
feauPP: F[Either[A, Unit]] => Pretty,
Expand Down
16 changes: 12 additions & 4 deletions laws/shared/src/main/scala/cats/effect/laws/GenSpawnLaws.scala
Original file line number Diff line number Diff line change
Expand Up @@ -88,16 +88,24 @@ trait GenSpawnLaws[F[_], E] extends MonadCancelLaws[F, E] with UniqueLaws[F] {
}

def raceCanceledIdentityLeft[A](fa: F[A]) =
F.race(F.canceled, fa) <-> fa.map(_.asRight[Unit])
F.race(F.canceled, fa.flatMap(F.pure(_)).handleErrorWith(F.raiseError(_))) <-> fa.map(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moving surfacing of cancelation into the argument might make the intent of the laws clearer. Something like

final class CancelationSurfaced[F[_], A](fa: F[A]) // where the generator for CancellationSurfaced applies the flatMap -> handleError transformatiuon

def raceCanceledIdentifyLeft[A](fa: CancelationSurfaced[F, A]) = ...

especially as the particular sequence used to ensure cancelation is visible isn't unique in that function.

Arguably, if you're reasoning over the laws then you're also likely aware of these cancelation semantics and why the "identity" is applied, so maybe it's a feature, but you have to learn to look through it anyhow to see the intent of the law.

The new type would also give a place to document the effect on the laws.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh this is a really really good idea. It breaks bincompat so we have to do it now if we're going to do it.

_.asRight[Unit])

def raceCanceledIdentityRight[A](fa: F[A]) =
F.race(fa, F.canceled) <-> fa.map(_.asLeft[Unit])
F.race(fa.flatMap(F.pure(_)).handleErrorWith(F.raiseError(_)), F.canceled) <-> fa.map(
_.asLeft[Unit])

def raceNeverNoncanceledIdentityLeft[A](fa: F[A]) =
F.race(F.never[Unit], fa) <-> F.onCancel(fa.map(_.asRight[Unit]), F.never)
F.race(F.never[Unit], fa.flatMap(F.pure(_)).handleErrorWith(F.raiseError(_))) <-> F
.onCancel(
fa.flatMap(r => F.pure(r.asRight[Unit])).handleErrorWith(F.raiseError(_)),
F.never)

def raceNeverNoncanceledIdentityRight[A](fa: F[A]) =
F.race(fa, F.never[Unit]) <-> F.onCancel(fa.map(_.asLeft[Unit]), F.never)
F.race(fa.flatMap(F.pure(_)).handleErrorWith(F.raiseError(_)), F.never[Unit]) <-> F
.onCancel(
fa.flatMap(r => F.pure(r.asLeft[Unit])).handleErrorWith(F.raiseError(_)),
F.never)

// I really like these laws, since they relate cede to timing, but they're definitely nondeterministic
/*def raceLeftCedeYields[A](a: A) =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ trait GenSpawnTests[F[_], E] extends MonadCancelTests[F, E] with UniqueTests[F]
iso: Isomorphisms[F],
faPP: F[A] => Pretty,
fuPP: F[Unit] => Pretty,
aFUPP: (A => F[Unit]) => Pretty,
ePP: E => Pretty,
foaPP: F[Outcome[F, E, A]] => Pretty,
feauPP: F[Either[A, Unit]] => Pretty,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@ trait GenTemporalTests[F[_], E] extends GenSpawnTests[F, E] with ClockTests[F] {
iso: Isomorphisms[F],
faPP: F[A] => Pretty,
fuPP: F[Unit] => Pretty,
aFUPP: (A => F[Unit]) => Pretty,
ePP: E => Pretty,
foaPP: F[Outcome[F, E, A]] => Pretty,
feauPP: F[Either[A, Unit]] => Pretty,
Expand Down
23 changes: 20 additions & 3 deletions laws/shared/src/main/scala/cats/effect/laws/MonadCancelLaws.scala
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,23 @@ trait MonadCancelLaws[F[_], E] extends MonadErrorLaws[F, E] {
def uncancelableEliminatesOnCancel[A](fa: F[A], fin: F[Unit]) =
F.uncancelable(_ => F.onCancel(fa, fin)) <-> F.uncancelable(_ => fa)

/*
* NB: This is effectively in violation of the monad laws, since
* we consider finalizers to associate over the boundary here, but
* we do NOT consider them to right-associate over map or flatMap.
* This simply stems from the fact that cancellation is fundamentally
* uncomposable, and it's better to pick a semantic for uncancelable
* which allows regional composition, since this avoids "gaps" in
* otherwise-safe code.
*
* The argument is that cancellation is a *hint* not a mandate. This
* holds for self-cancellation just as much as external-cancellation.
* Thus, laws about where the cancellation is visible are always going
* to be a bit off.
*/
def onCancelAssociatesOverUncancelableBoundary[A](fa: F[A], fin: F[Unit]) =
F.uncancelable(_ => F.onCancel(fa, fin)) <-> F.onCancel(F.uncancelable(_ => fa), fin)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does this hold?

uncancelable(poll => fa(poll).onCancel(fin)) <-> uncancelable(poll => fa(poll)).onCancel(fin)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. If you see cancelation on the inside, you'll see it on the outside. If you don't see it on the inside, you won't see it on the outside.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

right, as expected. Worth documenting it in its full form then, somewhere.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I want to do a full deep-dive discussion in the docs about cancelation in general


def forceRDiscardsPure[A, B](a: A, fa: F[B]) =
F.forceR(F.pure(a))(fa) <-> fa

Expand All @@ -59,9 +76,9 @@ trait MonadCancelLaws[F[_], E] extends MonadErrorLaws[F, E] {
F.onCancel(F.onCancel(F.canceled, fin1), fin2) <->
F.forceR(F.uncancelable(_ => F.forceR(fin1)(fin2)))(F.canceled)

def uncancelableCanceledAssociatesRightOverFlatMap[A](a: A, f: A => F[Unit]) =
F.uncancelable(_ => F.canceled.as(a).flatMap(f)) <->
F.forceR(F.uncancelable(_ => f(a)))(F.canceled)
def uncancelableCanceledAssociatesRightOverFlatMapAttempt[A](fa: F[A]) =
(F.uncancelable(_ => F.canceled >> fa).attempt >> F.unit) <->
F.forceR(F.uncancelable(_ => fa))(F.canceled)

def canceledAssociatesLeftOverFlatMap[A](fa: F[A]) =
F.canceled >> fa.void <-> F.canceled
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ trait MonadCancelTests[F[_], E] extends MonadErrorTests[F, E] {
iso: Isomorphisms[F],
faPP: F[A] => Pretty,
fuPP: F[Unit] => Pretty,
aFUPP: (A => F[Unit]) => Pretty,
ePP: E => Pretty): RuleSet = {

new RuleSet {
Expand All @@ -70,6 +69,8 @@ trait MonadCancelTests[F[_], E] extends MonadErrorTests[F, E] {
laws.uncancelablePollInverseNestIsUncancelable[A] _),
"uncancelable eliminates onCancel" -> forAll(
laws.uncancelableEliminatesOnCancel[A] _),
"onCancel associates over uncancelable boundary" -> forAll(
laws.onCancelAssociatesOverUncancelableBoundary[A] _),
"forceR discards pure" -> forAll(laws.forceRDiscardsPure[A, B] _),
"forceR discards error" -> forAll(laws.forceRDiscardsError[A] _),
"forceR canceled short-circuits" -> forAll(laws.forceRCanceledShortCircuits[A] _),
Expand All @@ -81,8 +82,8 @@ trait MonadCancelTests[F[_], E] extends MonadErrorTests[F, E] {
Seq(
"canceled sequences onCancel in order" -> forAll(
laws.canceledSequencesOnCancelInOrder _),
"uncancelable canceled associates right over flatMap" -> forAll(
laws.uncancelableCanceledAssociatesRightOverFlatMap[A] _),
"uncancelable canceled associates right over flatMap attempt" -> forAll(
laws.uncancelableCanceledAssociatesRightOverFlatMapAttempt[A] _),
"canceled associates left over flatMap" -> forAll(
laws.canceledAssociatesLeftOverFlatMap[A] _)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,8 +221,11 @@ trait TestInstances extends ParallelFGenerators with OutcomeGenerators with Sync
try {
var results: Outcome[Option, Throwable, A] = Outcome.Succeeded(None)

ioa.unsafeRunAsyncOutcome { oc => results = oc.mapK(someK) }(
unsafe.IORuntime(ticker.ctx, ticker.ctx, scheduler, () => (), unsafe.IORuntimeConfig()))
ioa
.flatMap(IO.pure(_))
.handleErrorWith(IO.raiseError(_))
.unsafeRunAsyncOutcome { oc => results = oc.mapK(someK) }(unsafe
.IORuntime(ticker.ctx, ticker.ctx, scheduler, () => (), unsafe.IORuntimeConfig()))
djspiewak marked this conversation as resolved.
Show resolved Hide resolved

ticker.ctx.tickAll(1.second)

Expand Down
Loading