-
I'm trying to come up with a way to cancel/interrupt a (potentially) long-running CPU-bound computation. By default, I would run the computation with
This is what I came up with: https://github.com/durban/choam/blob/97e3b751e77daf0641869efb08bbcbb8a021270f/core/shared/src/test/scala/dev/tauri/choam/IOSpec.scala#L33-L50 Here is the important part: def stoppable[F[_], A](task: AtomicBoolean => F[A])(implicit F: Spawn[F], S: Sync[F]): F[A] = {
F.flatMap(S.delay { new AtomicBoolean(false) }) { stopSignal =>
val tsk: F[A] = task(stopSignal)
F.uncancelable { poll =>
F.flatMap(F.start(tsk)) { fiber =>
F.onCancel(
fa = poll(fiber.joinWithNever),
fin = F.productR(S.delay { stopSignal.set(true) })(
// so that cancel backpressures until
// the task actually observes the signal
// and stops whatever it is doing:
F.void(fiber.joinWithNever)
)
)
}
}
}
} It can be used like this: stoppable { stop =>
@tailrec
def go(n: Long): Long = {
if (stop.get()) {
println(s"Stopping at ${n}")
n
} else {
go(n + 1)
}
}
F.delay { go(0L) }
} This seems to work fine, but I have the following questions:
|
Beta Was this translation helpful? Give feedback.
Replies: 2 comments 7 replies
-
Well, the idiomatic thing to do is to split it into multiple blocks, so that there is possibility for cancellation between them. F.delay { step1() } *> F.delay { step2() } *> F.delay { step3() } ... But, there are bigger concerns here than cancellation.
Yes, there is a downside: your
From https://typelevel.org/cats-effect/docs/thread-model#thread-blocking This is why |
Beta Was this translation helpful? Give feedback.
-
This might be simpler :-) def stoppable[F[_]: Async, A](task: AtomicBoolean => F[A])(implicit S: Sync[F]): F[A] =
Async[F] async { cb =>
Sync[F] defer {
val flag = new AtomicBoolean(false)
val runner = Sync[F].delay(task(flag)).attempt.flatMap(e => Sync[F].delay(cb(e)))
runner.start.as(Some(Sync[F].delay(flag.set(true))))
}
} |
Beta Was this translation helpful? Give feedback.
This might be simpler :-)