Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
245 changes: 230 additions & 15 deletions _overviews/core/futures.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ permalink: /overviews/core/:title.html
## Introduction

Futures provide a way to reason about performing many operations
in parallel-- in an efficient and non-blocking way.
in parallel -- in an efficient and non-blocking way.

A [`Future`](https://www.scala-lang.org/api/current/scala/concurrent/Future.html)
is a placeholder object for a value that may not yet exist.
Generally, the value of the Future is supplied concurrently and can subsequently be used.
Expand Down Expand Up @@ -283,7 +284,7 @@ Completion can take one of two forms:
A `Future` has an important property that it may only be assigned
once.
Once a `Future` object is given a value or an exception, it becomes
in effect immutable-- it can never be overwritten.
in effect immutable -- it can never be overwritten.

The simplest way to create a future object is to invoke the `Future.apply`
method which starts an asynchronous computation and returns a
Expand Down Expand Up @@ -335,8 +336,8 @@ To obtain the list of friends of a user, a request
has to be sent over a network, which can take a long time.
This is illustrated with the call to the method `getFriends` that returns `List[Friend]`.
To better utilize the CPU until the response arrives, we should not
block the rest of the program-- this computation should be scheduled
asynchronously. The `Future.apply` method does exactly that-- it performs
block the rest of the program -- this computation should be scheduled
asynchronously. The `Future.apply` method does exactly that -- it performs
the specified computation block concurrently, in this case sending
a request to the server and waiting for a response.

Expand Down Expand Up @@ -396,7 +397,7 @@ We are often interested in the result of the computation, not just its
side-effects.

In many future implementations, once the client of the future becomes interested
in its result, it has to block its own computation and wait until the future is completed--
in its result, it has to block its own computation and wait until the future is completed --
only then can it use the value of the future to continue its own computation.
Although this is allowed by the Scala `Future` API as we will show later,
from a performance point of view a better way to do it is in a completely
Expand Down Expand Up @@ -428,7 +429,7 @@ value is a `Throwable`.
Coming back to our social network example, let's assume we want to
fetch a list of our own recent posts and render them to the screen.
We do so by calling a method `getRecentPosts` which returns
a `List[String]`-- a list of recent textual posts:
a `List[String]` -- a list of recent textual posts:

{% tabs futures-05 class=tabs-scala-version %}
{% tab 'Scala 2' for=futures-05 %}
Expand Down Expand Up @@ -650,7 +651,7 @@ some other currency. We would have to repeat this pattern within the
to reason about.

Second, the `purchase` future is not in the scope with the rest of
the code-- it can only be acted upon from within the `foreach`
the code -- it can only be acted upon from within the `foreach`
callback. This means that other parts of the application do not
see the `purchase` future and cannot register another `foreach`
callback to it, for example, to sell some other currency.
Expand Down Expand Up @@ -760,7 +761,7 @@ Here is an example of `flatMap` and `withFilter` usage within for-comprehensions
{% endtabs %}

The `purchase` future is completed only once both `usdQuote`
and `chfQuote` are completed-- it depends on the values
and `chfQuote` are completed -- it depends on the values
of both these futures so its own computation cannot begin
earlier.

Expand Down Expand Up @@ -1086,7 +1087,7 @@ Here is an example of how to block on the result of a future:

In the case that the future fails, the caller is forwarded the
exception that the future is failed with. This includes the `failed`
projection-- blocking on it results in a `NoSuchElementException`
projection -- blocking on it results in a `NoSuchElementException`
being thrown if the original future is completed successfully.

Alternatively, calling `Await.ready` waits until the future becomes
Expand All @@ -1095,7 +1096,7 @@ that method will not throw an exception if the future is failed.

The `Future` trait implements the `Awaitable` trait with methods
`ready()` and `result()`. These methods cannot be called directly
by the clients-- they can only be called by the execution context.
by the clients -- they can only be called by the execution context.



Expand All @@ -1105,8 +1106,8 @@ When asynchronous computations throw unhandled exceptions, futures
associated with those computations fail. Failed futures store an
instance of `Throwable` instead of the result value. `Future`s provide
the `failed` projection method, which allows this `Throwable` to be
treated as the success value of another `Future`. The following special
exceptions are treated differently:
treated as the success value of another `Future`.
The following exceptions receive special treatment:

1. `scala.runtime.NonLocalReturnControl[_]` -- this exception holds a value
associated with the return. Typically, `return` constructs in method
Expand All @@ -1121,11 +1122,225 @@ behind this is to prevent propagation of critical and control-flow related
exceptions normally not handled by the client code and at the same time inform
the client in which future the computation failed.

Fatal exceptions (as determined by `NonFatal`) are rethrown in the thread executing
Fatal exceptions (as determined by `NonFatal`) are rethrown from the thread executing
the failed asynchronous computation. This informs the code managing the executing
threads of the problem and allows it to fail fast, if necessary. See
[`NonFatal`](https://www.scala-lang.org/api/current/scala/util/control/NonFatal$.html)
for a more precise description of the semantics.
for a more precise description of which exceptions are considered fatal.

`ExecutionContext.global` handles fatal exceptions by printing a stack trace, by default.

A fatal exception means that the `Future` associated with the computation will never complete.
That is, "fatal" means that the error is not recoverable for the `ExecutionContext`
and is also not intended to be handled by user code. By contrast, application code may
attempt recovery from a "failed" `Future`, which has completed but with an exception.

An execution context can be customized with a reporter that handles fatal exceptions.
See the factory methods [`fromExecutor`](https://www.scala-lang.org/api/current/scala/concurrent/ExecutionContext$.html#fromExecutor(e:java.util.concurrent.Executor,reporter:Throwable=%3EUnit):scala.concurrent.ExecutionContextExecutor)
and [`fromExecutorService`](https://www.scala-lang.org/api/current/scala/concurrent/ExecutionContext$.html#fromExecutorService(e:java.util.concurrent.ExecutorService,reporter:Throwable=%3EUnit):scala.concurrent.ExecutionContextExecutorService).

Since it is necessary to set the [`UncaughtExceptionHandler`](https://docs.oracle.com/en/java/javase/20/docs/api/java.base/java/lang/Thread.UncaughtExceptionHandler.html)
for executing threads, as a convenience, when passed a `null` executor,
`fromExecutor` will create a context that is configured the same as `global`,
but with the supplied reporter for handling exceptions.

The following example demonstrates how to obtain an `ExecutionContext` with custom error handling
and also shows the result of different exceptions, as described above:

{% tabs exceptions class=tabs-scala-version %}
{% tab 'Scala 2' for=exceptions %}
~~~ scala
import java.util.concurrent.{ForkJoinPool, TimeoutException}
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.DurationInt
import scala.util.{Failure, Success}

object Test extends App {
def crashing(): Int = throw new NoSuchMethodError("test")
def failing(): Int = throw new NumberFormatException("test")
def interrupt(): Int = throw new InterruptedException("test")
def erroring(): Int = throw new AssertionError("test")

// computations can fail in the middle of a chain of combinators, after the initial Future job has completed
def testCrashes()(implicit ec: ExecutionContext): Future[Int] =
Future.unit.map(_ => crashing())
def testFails()(implicit ec: ExecutionContext): Future[Int] =
Future.unit.map(_ => failing())
def testInterrupted()(implicit ec: ExecutionContext): Future[Int] =
Future.unit.map(_ => interrupt())
def testError()(implicit ec: ExecutionContext): Future[Int] =
Future.unit.map(_ => erroring())

// Wait for 1 second for the the completion of the passed `future` value and print it
def check(future: Future[Int]): Unit =
try {
Await.ready(future, 1.second)
for (completion <- future.value) {
println(s"completed $completion")
// In case of failure, also print the cause of the exception, when defined
completion match {
case Failure(exception) if exception.getCause != null =>
println(s" caused by ${exception.getCause}")
_ => ()
}
}
} catch {
// If the future value did not complete within 1 second, the call
// to `Await.ready` throws a TimeoutException
case _: TimeoutException => println(s"did not complete")
}

def reporter(t: Throwable) = println(s"reported $t")

locally {
// using the `global` implicit context
import ExecutionContext.Implicits._
// a successful Future
check(Future(42)) // completed Success(42)
// a Future that completes with an application exception
check(Future(failing())) // completed Failure(java.lang.NumberFormatException: test)
// same, but the exception is thrown somewhere in the chain of combinators
check(testFails()) // completed Failure(java.lang.NumberFormatException: test)
// a Future that does not complete because of a linkage error;
// the trace is printed to stderr by default
check(testCrashes()) // did not complete
// a Future that completes with an operational exception that is wrapped
check(testInterrupted()) // completed Failure(java.util.concurrent.ExecutionException: Boxed Exception)
// caused by java.lang.InterruptedException: test
// a Future that completes due to a failed assert, which is bad for the app,
// but is handled the same as interruption
check(testError()) // completed Failure(java.util.concurrent.ExecutionException: Boxed Exception)
// caused by java.lang.AssertionError: test
}
locally {
// same as `global`, but adds a custom reporter that will handle uncaught
// exceptions and errors reported to the context
implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(null, reporter)
check(testCrashes()) // reported java.lang.NoSuchMethodError: test
// did not complete
}
locally {
// does not handle uncaught exceptions; the executor would have to be
// configured separately
val executor = ForkJoinPool.commonPool()
implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(executor, reporter)
// the reporter is not invoked and the Future does not complete
check(testCrashes()) // did not complete
}
locally {
// sample minimal configuration for a context and underlying pool that
// use the reporter
val handler: Thread.UncaughtExceptionHandler =
(_: Thread, t: Throwable) => reporter(t)
val executor = new ForkJoinPool(
Runtime.getRuntime.availableProcessors,
ForkJoinPool.defaultForkJoinWorkerThreadFactory, // threads use the pool's handler
handler,
/*asyncMode=*/ false
)
implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(executor, reporter)
check(testCrashes()) // reported java.lang.NoSuchMethodError: test
// did not complete
}
}
~~~
{% endtab %}

{% tab 'Scala 3' for=exceptions %}
~~~ scala
import java.util.concurrent.{ForkJoinPool, TimeoutException}
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.DurationInt
import scala.util.{Failure, Success}

def crashing(): Int = throw new NoSuchMethodError("test")
def failing(): Int = throw new NumberFormatException("test")
def interrupt(): Int = throw new InterruptedException("test")
def erroring(): Int = throw new AssertionError("test")

// computations can fail in the middle of a chain of combinators,
// after the initial Future job has completed
def testCrashes()(using ExecutionContext): Future[Int] =
Future.unit.map(_ => crashing())
def testFails()(using ExecutionContext): Future[Int] =
Future.unit.map(_ => failing())
def testInterrupted()(using ExecutionContext): Future[Int] =
Future.unit.map(_ => interrupt())
def testError()(using ExecutionContext): Future[Int] =
Future.unit.map(_ => erroring())

// Wait for 1 second for the the completion of the passed `future` value and print it
def check(future: Future[Int]): Unit =
try
Await.ready(future, 1.second)
for completion <- future.value do
println(s"completed $completion")
// In case of failure, also print the cause of the exception, when defined
completion match
case Failure(exception) if exception.getCause != null =>
println(s" caused by ${exception.getCause}")
case _ => ()
catch
// If the future value did not complete within 1 second, the call
// to `Await.ready` throws a TimeoutException
case _: TimeoutException => println(s"did not complete")

def reporter(t: Throwable) = println(s"reported $t")

@main def test(): Unit =
locally:
// using the `global` implicit context
import ExecutionContext.Implicits.given
// a successful Future
check(Future(42)) // completed Success(42)
// a Future that completes with an application exception
check(Future(failing())) // completed Failure(java.lang.NumberFormatException: test)
// same, but the exception is thrown somewhere in the chain of combinators
check(testFails()) // completed Failure(java.lang.NumberFormatException: test)
// a Future that does not complete because of a linkage error;
// the trace is printed to stderr by default
check(testCrashes()) // did not complete
// a Future that completes with an operational exception that is wrapped
check(testInterrupted()) // completed Failure(java.util.concurrent.ExecutionException: Boxed Exception)
// caused by java.lang.InterruptedException: test
// a Future that completes due to a failed assert, which is bad for the app,
// but is handled the same as interruption
check(testError()) // completed Failure(java.util.concurrent.ExecutionException: Boxed Exception)
// caused by java.lang.AssertionError: test

locally:
// same as `global`, but adds a custom reporter that will handle uncaught
// exceptions and errors reported to the context
given ExecutionContext = ExecutionContext.fromExecutor(null, reporter)
check(testCrashes()) // reported java.lang.NoSuchMethodError: test
// did not complete

locally:
// does not handle uncaught exceptions; the executor would have to be
// configured separately
val executor = ForkJoinPool.commonPool()
given ExecutionContext = ExecutionContext.fromExecutor(executor, reporter)
// the reporter is not invoked and the Future does not complete
check(testCrashes()) // did not complete

locally:
// sample minimal configuration for a context and underlying pool that
// use the reporter
val handler: Thread.UncaughtExceptionHandler =
(_: Thread, t: Throwable) => reporter(t)
val executor = new ForkJoinPool(
Runtime.getRuntime.availableProcessors,
ForkJoinPool.defaultForkJoinWorkerThreadFactory, // threads use the pool's handler
handler,
/*asyncMode=*/ false
)
given ExecutionContext = ExecutionContext.fromExecutor(executor, reporter)
check(testCrashes()) // reported java.lang.NoSuchMethodError: test
// did not complete
end test
~~~
{% endtab %}
{% endtabs %}

## Promises

Expand Down Expand Up @@ -1226,7 +1441,7 @@ continues its computation, and finally completes the future `f` with a
valid result, by completing promise `p`.

Promises can also be completed with a `complete` method which takes
a potential value `Try[T]`-- either a failed result of type `Failure[Throwable]` or a
a potential value `Try[T]` -- either a failed result of type `Failure[Throwable]` or a
successful result of type `Success[T]`.

Analogous to `success`, calling `failure` and `complete` on a promise that has already
Expand Down