Subject: [PATCH] app errors --- Index: core/src/test/scala/ox/AppErrorTest.scala IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== diff --git a/core/src/test/scala/ox/AppErrorTest.scala b/core/src/test/scala/ox/AppErrorTest.scala new file mode 100644 --- /dev/null (date 1709221858442) +++ b/core/src/test/scala/ox/AppErrorTest.scala (date 1709221858442) @@ -0,0 +1,40 @@ +package ox + +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers + +import java.util.concurrent.Semaphore + +class AppErrorTest extends AnyFlatSpec with Matchers: + "supervisedError" should "return the app error from the main body" in { + supervisedError(EitherMode[Int])(Left(10)) shouldBe Left(10) + } + + it should "return success from the main body" in { + supervisedError(EitherMode[Int])(Right("ok")) shouldBe Right("ok") + } + + it should "return the app error returned by a failing fork" in { + supervisedError(EitherMode[Int]) { forkUserError(Left(10)).join(); Right(()) } shouldBe Left(10) + } + + it should "return success from the main body if a fork is successful" in { + supervisedError(EitherMode[Int]) { forkUserError(Right("ok")).join(); Right(()) } shouldBe Right(()) + } + + it should "interrupt other forks if one fails" in { + val s = Semaphore(0) + supervisedError(EitherMode[Int]) { + forkUser { + s.acquire() // will never complete + } + forkUser { + s.acquire() // will never complete + } + forkUserError { + Thread.sleep(100) + Left(-1) + } + Right(()) + } shouldBe Left(-1) + } Index: core/src/main/scala/ox/fork.scala IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== diff --git a/core/src/main/scala/ox/fork.scala b/core/src/main/scala/ox/fork.scala --- a/core/src/main/scala/ox/fork.scala (revision a5c442507ce16b3e5cd489a39133c1b9e3c3b71b) +++ b/core/src/main/scala/ox/fork.scala (date 1709220011750) @@ -1,7 +1,7 @@ package ox import java.util.concurrent.{CompletableFuture, Semaphore} -import java.util.concurrent.atomic.AtomicBoolean +import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference} import scala.concurrent.ExecutionException import scala.util.control.NonFatal @@ -23,15 +23,27 @@ * - in case an exception is thrown while evaluating `t`, it will be thrown when calling the returned [[Fork]]'s `.join()` method. * - if the main body of the scope completes successfully, while this fork is still running, the fork will be cancelled */ -def fork[T](f: => T)(using Ox): Fork[T] = +def fork[T](f: => T)(using Ox): Fork[T] = forkError(using summon[Ox].asExceptionModeOxError)(f) + +def forkError[E, F[_], T](using OxError[E, F])(f: => F[T]): Fork[T] = + val oxError = summon[OxError[E, F]] + // the separate result future is needed to wait for the result, as there's no .join on individual tasks (only whole scopes can be joined) val result = new CompletableFuture[T]() - summon[Ox].scope.fork { () => - val supervisor = summon[Ox].supervisor - try result.complete(f) + oxError.scope.fork { () => + val supervisor = oxError.supervisor + try + val resultOrError = f + val errorMode = oxError.errorMode + if errorMode.isError(resultOrError) then + // result is never completed, the supervisor should end the scope + supervisor.forkAppError(errorMode.getError(resultOrError)) + else result.complete(errorMode.getT(resultOrError)) catch case e: Throwable => + // we notify the supervisor first, so that if this is the first failing fork in the scope, the supervisor will + // get first notified of the exception by the "original" (this) fork result.completeExceptionally(e) - supervisor.forkError(e) + supervisor.forkException(e) } newForkUsingResult(result) @@ -52,21 +64,26 @@ * - in case an exception is thrown while evaluating `t`, it will be thrown when calling the returned [[Fork]]'s `.join()` method. * - if the main body of the scope completes successfully, while this fork is still running, the fork will be cancelled */ -def forkUser[T](f: => T)(using Ox): Fork[T] = - // the separate result future is needed to wait for the result, as there's no .join on individual tasks (only whole scopes can be joined) +def forkUser[T](f: => T)(using Ox): Fork[T] = forkUserError(using summon[Ox].asExceptionModeOxError)(f) + +def forkUserError[E, F[_], T](using OxError[E, F])(f: => F[T]): Fork[T] = + val oxError = summon[OxError[E, F]] val result = new CompletableFuture[T]() - val ox = summon[Ox] - ox.supervisor.forkStarts() - ox.scope.fork { () => - val supervisor = summon[Ox].supervisor + oxError.supervisor.forkStarts() + oxError.scope.fork { () => + val supervisor = oxError.supervisor.asInstanceOf[DefaultSupervisor[E]] try - result.complete(f) - supervisor.forkSuccess() + val resultOrError = f + val errorMode = oxError.errorMode + if errorMode.isError(resultOrError) then + // result is never completed, the supervisor should end the scope + supervisor.forkAppError(errorMode.getError(resultOrError)) + else + result.complete(errorMode.getT(resultOrError)) + supervisor.forkSuccess() catch case e: Throwable => - // we notify the supervisor first, so that if this is the first failing fork in the scope, the supervisor will - // get first notified of the exception by the "original" (this) fork - supervisor.forkError(e) + supervisor.forkException(e) result.completeExceptionally(e) } newForkUsingResult(result) @@ -119,20 +136,18 @@ val done = new Semaphore(0) val ox = summon[Ox] ox.scope.fork { () => - scoped { - supervisor(ox.supervisor) { - val nestedOx = summon[Ox] - nestedOx.scope.fork { () => - // "else" means that the fork is already cancelled, so doing nothing in that case - if !started.getAndSet(true) then - try result.complete(f) - catch case e: Throwable => result.completeExceptionally(e) + scopedWithCapability(OxError(DoNothingScope[Any], new AtomicReference(Nil), ox.supervisor, ExceptionMode)) { + val nestedOx = summon[Ox] + nestedOx.scope.fork { () => + // "else" means that the fork is already cancelled, so doing nothing in that case + if !started.getAndSet(true) then + try result.complete(f) + catch case e: Throwable => result.completeExceptionally(e) - done.release() // the nested scope can now finish - } + done.release() // the nested scope can now finish + } - done.acquire() - } + done.acquire() } } new CancellableFork[T]: @@ -157,7 +172,7 @@ private def newForkUsingResult[T](result: CompletableFuture[T]): Fork[T] = new Fork[T]: override def join(): T = unwrapExecutionException(result.get()) -private[ox] def unwrapExecutionException[T](f: => T): T = +private[ox] inline def unwrapExecutionException[T](f: => T): T = try f catch case e: ExecutionException => throw e.getCause Index: core/src/main/scala/ox/scoped.scala IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== diff --git a/core/src/main/scala/ox/scoped.scala b/core/src/main/scala/ox/scoped.scala --- a/core/src/main/scala/ox/scoped.scala (revision a5c442507ce16b3e5cd489a39133c1b9e3c3b71b) +++ b/core/src/main/scala/ox/scoped.scala (date 1709219886468) @@ -25,12 +25,16 @@ * [[supervised]] Starts a scope in supervised mode */ def scoped[T](f: Ox ?=> T): T = + scopedWithCapability(OxError(DoNothingScope[Any](), new AtomicReference(Nil), NoOpSupervisor, ExceptionMode))(f) + +private[ox] def scopedWithCapability[T](capability: Ox)(f: Ox ?=> T): T = def throwWithSuppressed(es: List[Throwable]): Nothing = val e = es.head es.tail.foreach(e.addSuppressed) throw e - val finalizers = new AtomicReference(List.empty[() => Unit]) + val scope = capability.scope + val finalizers = capability.finalizers def runFinalizers(result: Either[Throwable, T]): T = val fs = finalizers.get if fs.isEmpty then result.fold(throw _, identity) @@ -47,16 +51,15 @@ case Right(t) if es.isEmpty => t case _ => throwWithSuppressed(es) - val scope = new DoNothingScope[Any]() try val t = - try - try f(using Ox(scope, finalizers, NoOpSupervisor)) + try { + try f(using capability) finally scope.shutdown() scope.join() - // join might have been interrupted - finally scope.close() + // join might have been interrupted + } finally scope.close() // running the finalizers only once we are sure that all child threads have been terminated, so that no new // finalizers are added, and none are lost Index: core/src/main/scala/ox/supervised.scala IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== diff --git a/core/src/main/scala/ox/supervised.scala b/core/src/main/scala/ox/supervised.scala --- a/core/src/main/scala/ox/supervised.scala (revision a5c442507ce16b3e5cd489a39133c1b9e3c3b71b) +++ b/core/src/main/scala/ox/supervised.scala (date 1709220249575) @@ -1,7 +1,8 @@ package ox -import java.util.concurrent.atomic.AtomicInteger +import java.util.concurrent.atomic.{AtomicInteger, AtomicReference} import java.util.concurrent.{CompletableFuture, ConcurrentHashMap} +import scala.reflect.ClassTag /** Starts a new scope, which allows starting forks in the given code block `f`. Forks can be started using [[fork]], [[forkUser]], * [[forkCancellable]] and [[forkUnsupervised]]. All forks are guaranteed to complete before this scope completes. @@ -21,13 +22,21 @@ * @see * [[scoped]] Starts a scope in unsupervised mode */ -def supervised[T](f: Ox ?=> T): T = - val s = DefaultSupervisor() +def supervised[T](f: Ox ?=> T): T = supervisedError(ExceptionMode)(f) + +def supervisedError[E, F[_], T](em: ErrorMode[E, F])(f: OxError[E, F] ?=> F[T]): F[T] = + val s = DefaultSupervisor[E]() + val capability = OxError(DoNothingScope[Any], new AtomicReference(Nil), s, em) try - scoped { - val r = supervisor(s)(forkUser(f)) - s.join() // might throw if any supervised fork threw - r.join() // if no exceptions, the main f-fork must be done by now + scopedWithCapability(capability) { + val mainBodyFork = forkUserError(using capability)(f(using capability)) + val supervisorResult = s.join() // might throw if any supervised fork threw + if supervisorResult == ErrorModeSupervisorResult.Success then + // if no exceptions, the main f-fork must be done by now + em.pure(mainBodyFork.join()) + else + // an app error was reported to the supervisor + em.pureError(supervisorResult.asInstanceOf[E]) } catch case e: Throwable => @@ -36,46 +45,46 @@ s.addOtherExceptionsAsSuppressedTo(e) throw e -trait Supervisor: +sealed trait Supervisor[-E]: def forkStarts(): Unit def forkSuccess(): Unit - def forkError(e: Throwable): Unit + def forkException(e: Throwable): Unit + def forkAppError(e: E): Unit - /** Wait until the count of all supervised, user forks that are running reaches 0, or until any supervised fork fails with an exception. - * - * The completion of this method is typically followed by ending the scope, which cancels any forks that are still running. - * - * Note that (daemon) forks can still start supervised user forks after this method returns. - */ - def join(): Unit - -object NoOpSupervisor extends Supervisor: +object NoOpSupervisor extends Supervisor[Nothing]: override def forkStarts(): Unit = () override def forkSuccess(): Unit = () - override def forkError(e: Throwable): Unit = () - override def join(): Unit = () + override def forkException(e: Throwable): Unit = () + override def forkAppError(e: Nothing): Unit = () -class DefaultSupervisor() extends Supervisor: +class DefaultSupervisor[E]() extends Supervisor[E]: private val running: AtomicInteger = AtomicInteger(0) - private val result: CompletableFuture[Unit] = new CompletableFuture() + private val result: CompletableFuture[ErrorModeSupervisorResult | E] = new CompletableFuture() private val otherExceptions: java.util.Set[Throwable] = ConcurrentHashMap.newKeySet() override def forkStarts(): Unit = running.incrementAndGet() override def forkSuccess(): Unit = val v = running.decrementAndGet() - if v == 0 then result.complete(()) + if v == 0 then result.complete(ErrorModeSupervisorResult.Success) - override def forkError(e: Throwable): Unit = if !result.completeExceptionally(e) then otherExceptions.add(e) + override def forkException(e: Throwable): Unit = if !result.completeExceptionally(e) then otherExceptions.add(e) - override def join(): Unit = unwrapExecutionException(result.get()) + override def forkAppError(e: E): Unit = if !result.complete(e) then otherExceptions.add(SecondaryAppError(e)) + + /** Wait until the count of all supervised, user forks that are running reaches 0, or until any supervised fork fails with an exception. + * + * The completion of this method is typically followed by ending the scope, which cancels any forks that are still running. + * + * Note that (daemon) forks can still start supervised user forks after this method returns. + */ + def join(): ErrorModeSupervisorResult | E = unwrapExecutionException(result.get()) def addOtherExceptionsAsSuppressedTo(e: Throwable): Throwable = otherExceptions.forEach(e2 => if e != e2 then e.addSuppressed(e2)) e -/** Change the supervisor that is being used when running `f`. Doesn't affect existing usages of the current supervisor, or forks ran - * outside of `f`. - */ -def supervisor[T](supervisor: Supervisor)(f: Ox ?=> T)(using Ox): T = - f(using summon[Ox].copy(supervisor = supervisor)) +private[ox] enum ErrorModeSupervisorResult: + case Success + +case class SecondaryAppError[E](e: E) extends Throwable("Secondary app error reported to the supervisor") Index: core/src/main/scala/ox/PoC.scala IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== diff --git a/core/src/main/scala/ox/PoC.scala b/core/src/main/scala/ox/PoC.scala new file mode 100644 --- /dev/null (date 1709213928291) +++ b/core/src/main/scala/ox/PoC.scala (date 1709213928291) @@ -0,0 +1,117 @@ +package ox + +import scala.reflect.ClassTag + +object PoC: + // def findInDb: Either[NotFound, User] = ??? + // def findInCache: Either[Miss, User] = ??? + // race(findInDb, findInCache)(EitherMode): Either[NotFound | Miss | User] + + /* + Most Ox-requiring methods will require only the basic capability, of throwing exceptions, that is a + fork(f: => T): Fork[T] method. Should not inspect the return type of the method and conditionally fail. + + Additionally, we might want to fork & inspect the result for potential "business errors" + + The scope might use a non-trivial error mode, but we should be able to call methods with a simpler (trivial) mode. + + The "generic methods" depend on the trivial ox, not on a generic form. + + Computation combinator: takes user-provided functions & error mode, wants to operate on any mode, it depends on the + user functions to report errors (par, race) + + Two types of forks: + * not unwrapping the returned type + * unwrapping the returned type + Both handle exceptions + + forkError should only be used in supervised scopes - in case of an app error the forks never complete + */ + + trait OxBase {} + + trait OxError[E, F[_]] extends OxBase { + def errorMode: ErrorMode[E, F] + } + + // + + // error is propagated to the scope; add default override which takes => T? + def forkError[E, F[_], T](using OxError[E, F])(f: => F[T]): Fork[T] = { + val oe = summon[OxError[E, F]] + fork2 { + val r = f + oe.errorMode.getT(r) + } + } + + def fork2[T](f: => T)(using OxBase): Fork[T] = ??? + + // + + // throws if forks throw; returns error if there's an error + def supervisedError[E, F[_], U](em: ErrorMode[E, F])(f: OxError[E, F] ?=> F[U]): F[U] = { + f(using + new OxError[E, F] { + override def errorMode: ErrorMode[E, F] = em + } + ) + } + + def supervisedBase[U](f: OxBase ?=> U): U = { + f(using new OxBase {}) // wrap ox + } + + def useOx(x: Int)(using OxBase): Int = 42 + + val result1: Either[String, Int] = supervisedError(new EitherMode[String]) { // (oxx: Oxx[String, [T] =>> Either[String, T]]) => + useOx(10) // (using summon[OxError[String, [T] =>> Either[String, T]]].convert) + + val x: Int = forkError { + Left(""): Either[String, Int] + } + .join() + + Right(x) + } + + val result2: String | Int = supervisedError(new UnionMode[String]) { + useOx(10) + + val r: Int = forkError { // (using summon[OxError[String, [T] =>> String | T]]) + "": String | Int + } + .join() + r + } + + val result3: Int = supervisedBase { + useOx(10) + + val r = fork2 { + (throw new RuntimeException("")): Int + } + .join() + r + } + + val result4: Int = supervisedBase { + useOx(10) + + val x: Fork[Int] = fork2 { 1 } + x.join() + } + + // + + def par2[E, F[_], T, U](errorMode: ErrorMode[E, F])(f1: => F[T], f2: => F[U]): F[(T, U)] = { + supervisedError(errorMode) { + val r1 = forkError(f1) + val r2 = forkError(f2) + val t1 = r1.join() + val t2 = r2.join() + errorMode.pure((t1, t2)) + } + } + + def par2[T, U](f1: => T, f2: => U)(using OxBase): (T, U) = par2(ExceptionMode)(f1, f2) Index: core/src/main/scala/ox/ErrorMode.scala IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== diff --git a/core/src/main/scala/ox/ErrorMode.scala b/core/src/main/scala/ox/ErrorMode.scala new file mode 100644 --- /dev/null (date 1709215940268) +++ b/core/src/main/scala/ox/ErrorMode.scala (date 1709215940268) @@ -0,0 +1,32 @@ +package ox + +import scala.reflect.ClassTag + +trait ErrorMode[E, F[_]] { + def isError[T](f: F[T]): Boolean + def getError[T](f: F[T]): E + def getT[T](f: F[T]): T + def pure[T](t: T): F[T] + def pureError[T](e: E): F[T] +} +object ExceptionMode extends ErrorMode[Nothing, [T] =>> T] { + override def isError[T](f: T): Boolean = false + override def getError[T](f: T): Nothing = throw new IllegalStateException() + override def getT[T](f: T): T = f + override def pure[T](t: T): T = t + override def pureError[T](e: Nothing): T = e +} +class EitherMode[E] extends ErrorMode[E, [T] =>> Either[E, T]] { + override def isError[T](f: Either[E, T]): Boolean = f.isLeft + override def getError[T](f: Either[E, T]): E = f.left.get + override def getT[T](f: Either[E, T]): T = f.right.get + override def pure[T](t: T): Either[E, T] = Right(t) + override def pureError[T](e: E): Either[E, T] = Left(e) +} +class UnionMode[E: ClassTag] extends ErrorMode[E, [T] =>> E | T] { + override def isError[T](f: E | T): Boolean = summon[ClassTag[E]].runtimeClass.isInstance(f) + override def getError[T](f: E | T): E = f.asInstanceOf[E] + override def getT[T](f: E | T): T = f.asInstanceOf[T] + override def pure[T](t: T): E | T = t + override def pureError[T](e: E): E | T = e +} Index: core/src/main/scala/ox/Ox.scala IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== diff --git a/core/src/main/scala/ox/Ox.scala b/core/src/main/scala/ox/Ox.scala --- a/core/src/main/scala/ox/Ox.scala (revision a5c442507ce16b3e5cd489a39133c1b9e3c3b71b) +++ b/core/src/main/scala/ox/Ox.scala (date 1709221986129) @@ -7,9 +7,20 @@ @implicitNotFound( "This operation must be run within a `supervised` or `scoped` block. Alternatively, you must require that the enclosing method is run within a scope, by adding a `using Ox` parameter list." ) -case class Ox( - scope: StructuredTaskScope[Any], - finalizers: AtomicReference[List[() => Unit]], - supervisor: Supervisor -): +trait Ox: + private[ox] def scope: StructuredTaskScope[Any] + private[ox] def finalizers: AtomicReference[List[() => Unit]] + private[ox] def supervisor: Supervisor[Nothing] + private[ox] def addFinalizer(f: () => Unit): Unit = finalizers.updateAndGet(f :: _) + private[ox] def asExceptionModeOxError: OxError[Nothing, [T] =>> T] = OxError(scope, finalizers, supervisor, ExceptionMode) // TODO check if not already in exception mode + +@implicitNotFound( + "This operation must be run within a `supervisedError` block. Alternatively, you must require that the enclosing method is run within a scope, by adding a `using OxError[E, F]` parameter list, using the desired error mode type parameters." +) +case class OxError[E, F[_]]( + private[ox] val scope: StructuredTaskScope[Any], + private[ox] val finalizers: AtomicReference[List[() => Unit]], + private[ox] val supervisor: Supervisor[E], + private[ox] val errorMode: ErrorMode[E, F] +) extends Ox