Skip to content

Commit

Permalink
Release 0.0.22
Browse files Browse the repository at this point in the history
  • Loading branch information
adamw committed Mar 6, 2024
1 parent a38f975 commit 7d7e0ff
Show file tree
Hide file tree
Showing 13 changed files with 308 additions and 53 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ Safe direct-style concurrency and resiliency for Scala on the JVM. Requires JDK
[sbt](https://www.scala-sbt.org) dependency:

```scala
"com.softwaremill.ox" %% "core" % "0.0.21"
"com.softwaremill.ox" %% "core" % "0.0.22"
```

Documentation is available at [https://ox.softwaremill.com](https://ox.softwaremill.com).
Expand Down
39 changes: 39 additions & 0 deletions generated-doc/out/adr/0004-channels-safe-unsafe-operations.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
# 4. Channels: safe/unsafe Operations

Date: 2024-02-28

## Context

Channel operations such as `send`, `receive`, `select`, `close` etc. might fail because a channel is closed. How should
this be signalled to the user?

## Decision

We decided to have two variants of the methods:

* default: `send`, `receive` etc., which throw an exception, when the channel is closed
* safe: `sendSafe`, `receiveSafe` etc., which return a `ChannelClosed` value, when the channel is closed

The "safe" variants are more performant: no stack trace is created, when the channel is closed. They are used by all
channel combinators (such as `map`, `filter` etc.), to detect and propagate the errors downstream.

### Why not `Either` or `Try`?

To avoid allocations on each operation (e.g. receive). Channels might be on the "hot path" and they might be important
for performance. Union types provide a nice alternative here.

Even with `Either`, though, if e.g. `send` had a signature `Either[ChannelClosed, Unit]`, discarding the result would
at most be a warning (not in all cases), so potentially an error might go unnoticed.

### Why is the default to throw?

Let's consider `send`. If the default would be `send(t: T): ChannelClosed | Unit`, with an additional exception-throwing
variant `sendUnsafe(t: T): Unit`, then the API would be quite surprising.

Coming to the library as a new user, they could just call send / receive. The compiler might warn them in some cases
that they discard the non-unit result of `send`, but (a) would they pay attention to those warnings, and (b) would they
get them in the first place (this type of compiler warning isn't detected in 100% o fcases).

In other words - it would be quite easy to mistakenly discard the results of `send`, so a default which guards against
that (by throwing exceptions) is better, and the "safe" can always be used intentionally version if that's what's
needed.
28 changes: 28 additions & 0 deletions generated-doc/out/adr/0005-application-errors.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
# 5. Application errors

Date: 2024-03-05

## Context

In some cases, it's useful to treat some return values as errors, which should cause the enclosing scope to end.

## Decision

For computation combinators, which include `par`, `race` and `supervised`, we decided to introduce the concept of
application errors. These are values of a shape defined by an `ErrorMode`, which are specially treated by ox - if
such a value represents an error, the enclosing scope ends.

Some design limitations include:

* we want normal scopes to remain unchanged
* methods requiring a concurrency scope (that is, `using Ox`) should be callable from the new scope
* all forks that might report application errors, must be constrained to return the same type of application errors
* computation combinators, such as `par`, should have a single implmentation both when using application errors and
exceptions only

Taking this into account, we separate the `Ox` capability, which allows starting forks, and `OxError`, which
additionally allows reporting application errors. An inheritance hierarchy, `OxError <: Ox` ensures that we can call
methods requiring the `Ox` capability if `OxError` is available, but not the other way round.

Finally, introducing a special `forkError` method allows us to require that it is run within a `supervisedError` scope
and that it must return a value of the correct shape.
35 changes: 26 additions & 9 deletions generated-doc/out/dictionary.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,30 @@

How we use various terms throughout the codebase and the documentation (or at least try to):

* **concurrency scope**: either `supervised` (default) or `scoped` ("advanced")
* a scope **ends**: when unsupervised, the main body is entirely evaluated; when supervised, all user (non-daemon),
supervised forks complete successfully, or at least one supervised fork fails. When the scope ends, all running
forks are interrupted
* scope **completes**, once all forks complete and finalizers are run. In other words, the `supervised` or `scoped`
Scopes:
* **concurrency scope**: either `supervised` (default), `supervisedError` (permitting application errors),
or `scoped` ("advanced")
* scope **body**: the code block passed to a concurrency scope (the `supervised` or `scoped` method)

Fork lifecycle:
* within scopes, asynchronously running **forks** can be **started**
* after being started a fork is **running**
* then, forks **complete**: either a fork **succeeds** with a value, or a fork **fails** with an exception
* external **cancellation** (`Fork.cancel()`) interrupts the fork and waits until it completes; interruption uses
JVM's mechanism of injecting an `InterruptedException`

Scope lifecycle:
* a scope **ends**: when unsupervised, the scope's body is entirely evaluated; when supervised, all user (non-daemon) &
supervised forks complete successfully, or at least one user/daemon supervised fork fails, or an application error
is reported. When the scope ends, all forks that are still running are cancelled
* scope **completes**, once all forks complete and finalizers are run; then, the `supervised` or `scoped`
method returns.
* forks are **started**, and then they are **running**
* forks **complete**: either a fork **succeeds**, or a fork **fails** with an exception
* **cancellation** (`Fork.cancel()`) interrupts the fork and waits until it completes
* scope **body**: the code block passed to a `supervised` or `scoped` method

Errors:
* fork **failure**: when a fork fails with an exception
* **application error**: forks might successfully complete with values which are considered application-level errors;
such values are reported to the enclosing scope and cause the scope to end

Other:
* **computation combinator**: a method which takes user-provided functions and manages their execution, e.g. using
concurrency, interruption, and appropriately handling errors; examples include `par`, `race`, `retry`, `timeout`
58 changes: 58 additions & 0 deletions generated-doc/out/error-handling-scopes.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
# Error handling in scopes

How errors are handled depends on the type of concurrency scope that is used.

## Supervised scope

The "default" and recommended scope is created using `supervised`. When this scope is used, any fork created using
`fork` or `forkUser` that fails with an exception, will cause the enclosing scope to end:

```scala
import ox.{forkUser, supervised}

supervised {
forkUser {
Thread.sleep(100)
throw new RuntimeException("boom!")
}
forkUser {
// other forks will be interrupted
}
}
// will re-throw the "boom!' exception
```

If an unsupervised fork fails (created using `forkUnsupervised` / `forkCancellable`), that exception will be thrown
when invoking `Fork.join`.

## Supervised scope with application errors

Additionally, supervised scopes can be created with an error mode, which allows ending the scope when a fork returns
a value that is an [application error](error-handling.md). This can be done by using `supervisedError` and `forkError`,
for example:

```scala
import ox.{EitherMode, forkUserError, supervisedError}

supervisedError(EitherMode[Int]) {
forkUserError { Left(10) }
Right(())
}
// returns Left(10)
```

Even though the body of the scope returns success (a `Right`), the scope ends with an application error (a `Left`),
which is reported by a user fork. Note that if we used a daemon fork, the scope might have ended before the error
was reported.

Only forks created with `forkError` and `forkUserError` can report application errors, and they **must** return a value
of the shape as described by the error mode (in the example above, all `forkError`, `forkUserError` and the scope body
must return an `Either[Int, T]` for arbitrary `T`s).

The behavior of `fork` and `forkUser` in `supervisedError` scopes is unchanged, that is, their return values are not
inspected.

## Unsupervised scopes

In an unsupervised scope (created using `scoped`), failures of the forks won't be reported in any way, unless they
are explicitly joined. Hence, if there's no `Fork.join`, the exception might go unnoticed.
39 changes: 39 additions & 0 deletions generated-doc/out/error-handling.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
# General approach to error handling

The primary error signalling mechanism in ox are exceptions. They are appropriately handled by computation combinators,
such as [`par`](par.md), [`race`](race.md), as well as by [scopes](fork-join.md) and [channels](channels/index.md).

The general rule for computation combinators is that using them should throw exactly the same exceptions, as if the
provided code was executed directly. That is, no additional exceptions might be thrown, and no exceptions are swallowed.
The only difference is that some exceptions might be added as suppressed (e.g. interrupted exceptions).

Some examples of exception handling in ox include:

* short-circuiting in `par` and `race` when one of the computations fails
* retrying computations in `retry` when they fail
* ending a `supervised` concurrency scope when a supervised fork fails

## Application errors

Some of the functionalities provided by ox also support application-level errors. Such errors are represented as values,
e.g. the left side of an `Either[MyError, MyResult]`. They are not thrown, but returned from the computations which
are orchestrated by ox.

Ox must be made aware of how such application errors are represented. This is done through an `ErrorMode`. Provided
implementations include `EitherMode[E]` (where left sides of `Either`s are used to represent errors), and
`UnionMode[E]`, where a union type of `E` and a successful value is used. Arbitrary user-provided implementations
are possible as well.

Error modes can be used in [`supervisedError`](error-handling-scopes.md) scopes, as well as in variants of the `par`
and `race` methods.

```eval_rst
.. note::
Using application errors allows specifying the possible errors in the type signatures of the methods, and is hence
more type-safe. If used consistently, exceptions might be avoided altogether, except for signalling bugs in the code.
However, representing errors as values might incur a syntax overhead, and might be less convenient in some cases.
Moreover, all I/O libraries typically throw exceptions - to use them with errors-as-values, one would need to provide
a wrapper which would convert such exceptions to values. Hence, while application errors provide a lot of benefits,
they are not a universal solution to error handling.
```
2 changes: 1 addition & 1 deletion generated-doc/out/extension.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Extension methods

Extension-method syntax can be imported using `import ox.syntax.*`. This allows calling methods such as
`.fork`, `.raceSuccessWith`, `.parWith`, `.forever`, `.useInScope` directly on code blocks / values.
`.fork`, `.raceWith`, `.parWith`, `.forever`, `.useInScope` directly on code blocks / values.
29 changes: 11 additions & 18 deletions generated-doc/out/fork-join.md
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
# Fork & join threads

It's safest to use higher-level methods, such as `par` or `raceSuccess`, however this isn't always sufficient. For
It's safest to use higher-level methods, such as `par` or `race`, however this isn't always sufficient. For
these cases, threads can be started using the structured concurrency APIs described below.

Forks (new threads) can only be started with a **scope**. Such a scope is defined using the `supervised` or `scoped`
methods.
Forks (new threads) can only be started within a **concurrency scope**. Such a scope is defined using the `supervised`,
`supervisedError` or `scoped` methods.

The lifetime of the forks is defined by the structure of the code, and corresponds to the enclosing `supervised` or
`scoped` block. Once the code block passed to the scope completes, any forks that are still running are interrupted.
Expand All @@ -13,10 +13,11 @@ The whole block will complete only once all forks have completed (successfully,
Hence, it is guaranteed that all forks started within `supervised` or `scoped` will finish successfully, with an
exception, or due to an interrupt.

For example, the code below is equivalent to `par`:

```scala
import ox.{fork, supervised}

// same as `par`
supervised {
val f1 = fork {
Thread.sleep(2000)
Expand Down Expand Up @@ -59,8 +60,8 @@ The default scope, created with `supervised`, watches over the forks that are st

This means that the scope will end only when either:

* all (user, supervised) forks, including the main body passed to `supervised`, succeed
* or any (supervised) fork, including the main body passed to `supervised`, fails
* all (user, supervised) forks, including the body passed to `supervised`, succeed
* or any (supervised) fork, including the body passed to `supervised`, fails

Hence an exception in any of the forks will cause the whole scope to end. Ending the scope means that all running forks
are cancelled (using interruption). Once all forks complete, the exception is propagated further, that is re-thrown by
Expand All @@ -85,14 +86,14 @@ supervised {

## User, daemon and unsupervised forks

In supervised scoped, forks created using `fork` behave as daemon threads. That is, their failure ends the scope, but
the scope will also end once the main body and all user forks succeed, regardless if the (daemon) fork is still running.
In supervised scopes, forks created using `fork` behave as daemon threads. That is, their failure ends the scope, but
the scope will also end once the body and all user forks succeed, regardless if the (daemon) fork is still running.

Alternatively, a user fork can be created using `forkUser`. Such a fork is required to complete successfully, in order
for the scope to end successfully. Hence when the main body of the scope completes, the scope will wait until all user
for the scope to end successfully. Hence, when the body of the scope completes, the scope will wait until all user
forks have completed as well.

Finally, entirely unsupervised forks can be ran using `forkUnsupervised`.
Finally, entirely unsupervised forks can be started using `forkUnsupervised`.

## Unsupervised scopes

Expand All @@ -115,11 +116,3 @@ it involves creating a nested scope and two virtual threads, instead of one.
The `CancellableFork` trait exposes the `.cancel` method, which interrupts the fork and awaits its completion.
Alternatively, `.cancelNow` returns immediately. In any case, the enclosing scope will only complete once all forks have
completed.

## Error handling

In supervised mode, if a fork fails with an exception, the enclosing scope will end.

Moreover, if a fork fails with an exception, the `Fork.join` method will throw that exception.

In unsupervised mode, if there's no join and the fork fails, the exception might go unnoticed.
4 changes: 3 additions & 1 deletion generated-doc/out/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ In addition to this documentation, ScalaDocs can be browsed at [https://javadoc.
## sbt dependency

```scala
"com.softwaremill.ox" %% "core" % "0.0.21"
"com.softwaremill.ox" %% "core" % "0.0.22"
```

## Scope of the project
Expand Down Expand Up @@ -64,7 +64,9 @@ We offer commercial support for ox and related technologies, as well as developm
race
collections
timeout
error-handling
fork-join
error-handling-scopes
scoped-values
retries
interruptions
Expand Down
2 changes: 1 addition & 1 deletion generated-doc/out/kafka.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
Dependency:

```scala
"com.softwaremill.ox" %% "kafka" % "0.0.21"
"com.softwaremill.ox" %% "kafka" % "0.0.22"
```

`Source`s which read from a Kafka topic, mapping stages and drains which publish to Kafka topics are available through
Expand Down
47 changes: 45 additions & 2 deletions generated-doc/out/par.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
# Running computations in parallel

A number of computations can be ran in parallel using the `par` method, for example:

```scala
import ox.par

Expand All @@ -11,8 +13,49 @@ def computation2: String =
Thread.sleep(1000)
"2"

val result: (Int, String) = par(computation1)(computation2)
val result: (Int, String) = par(computation1, computation2)
// (1, "2")
```

If one of the computations fails, the other is interrupted, and `par` waits until both branches complete.
If any of the computations fails, the other is interrupted. In such case, `par` waits until both branches complete
and then re-throws the exception.

It's also possible to run a sequence of computations given as a `Seq[() => T]` in parallel, optionally limiting the
parallelism using `parLimit`:

```scala
import ox.parLimit

def computation(n: Int): Int =
Thread.sleep(1000)
println(s"Running $n")
n*2

val computations = (1 to 20).map(n => () => computation(n))
val result: Seq[Int] = parLimit(5)(computations)
// (1, "2")
```

## Using application errors

Some values might be considered as application errors. In a computation returns such an error, other computations are
interrupted, same as when an exception is thrown. The error is then returned by the `par` method.

It's possible to use an arbitrary [error mode](error-handling.md) by providing it as the initial argument to `par`.
Alternatively, a built-in version using `Either` is available as `parEither`:

```scala
import ox.parEither

val result = parEither(
{
Thread.sleep(200)
Right("ok")
}, {
Thread.sleep(100)
Left(-1)
}
)

// result is Left(-1), the other branch is interrupted
```

0 comments on commit 7d7e0ff

Please sign in to comment.