From 0fc323b399e8ca57846481ccff8c3e4dbeaac824 Mon Sep 17 00:00:00 2001 From: Li Haoyi Date: Fri, 4 Jun 2021 18:25:28 +0800 Subject: [PATCH] Fix asymmetric failure behavior of Future#{zip,zipWith,traverse,sequence} by making them fail fast regardless of ordering Currently, given the following setup: ```scala val f1 = Future{Thread.sleep(10000)} val f2 = Future{Thread.sleep(2000); throw new Exception("Boom")} ``` The following two snippets exhibit different failure behavior: ```scala val fa = Await.result(f1.zip(f2). Duration.Inf) ``` ```scala val fb = Await.result(f2.zip(f1). Duration.Inf) ``` `fa` fails after 10000ms, while `fb` fails after 2000ms. Both fail with `java.lang.Exception: boom`. When zipping two `Future`s together, if the left `Future` fails early, the zipped `Future` fails early. But if the right `Future` fails early, the zipped `Future` waits until the right `Future` completes before failing. `traverse` and `sequence` are similarly implemented with `zipWith` and should exhibit the same behavior. This all arises because `zipWith` is implemented using `flatMap`, which by definition asymmetric due to waiting fo the left `Future` to complete before even considering the right `Future`. The current behavior makes the failure behavior of `Future`s most unpredictable; in general nobody pays attention to the order of `Future`s when zipping them together, and thus whether a `zipWith`ed/`zip`ed/`traverse`d/`sequence`d `Future` fails early or not is entirely arbitrary. This PR replaces the implementation of `zipWith`, turning it from `flatMap`-based to `Promise`-based, so that when a `Future` fails early, regardless of whether it's the left or right `Future`, the resultant `Future` will fail immediately. Implementation-wise I'm using an `AtomicReference` and `compareAndSet`, which should give us the behavior we want without any locking. It may well be possible to achieve with even less overhead, e.g. using only `volatile`s or even using no concurrency controls at all, but I couldn't come up with anything better. If anyone has a better solution I'll happily include it. This fix would apply to all of `zip`/`zipWith`/`traverse`/`sequence`, since they all are implemented on top of `zipWith` While it is possible that someone could be relying on the left-biased nature of current `zip`/`zipWith`/`traverse`/`sequence` implementation, but it seems like something that's unlikely to be reliable enough to depend upon. In my experience people generally aren't aware that `zipWith`/`zip`/`traverse`/`sequence`, and they don't generally know the total ordering of how long their Futures take to run. That means status quo behavior would just result in some `Future` fails mysterious taking longer to report for no clear reason. Notably, the biased nature of these operators is not documented in any of their scaladoc comments. While there is a non-zero chance that somebody could be intentionally or unintentionally depending on the biased nature of these combinators, there is a much greater chance that someone unaware of the current bias would be puzzled why their highly-concurrent system seems to be taking longer than expected in certain scenarios. It seems likely that this PR would fix more bugs than it would introduce Note that this does not fix the left-biased fail-fast behavior of `flatMap` chains, or their equivalent `for`-comprehensions, as `flatMap`'s API is inherently left-biased. But anyone who wants fail-fast behavior can convert sections of their `flatMap` chains into `.zip`s where possible, and where not possible that's generally because there is some true data dependency between the `flatMap`s --- project/MimaFilters.scala | 3 ++ src/library/scala/concurrent/Future.scala | 28 ++++++++------ .../scala/concurrent/impl/Promise.scala | 36 ++++++++++++++++++ test/files/jvm/future-spec/FutureTests.scala | 1 + test/junit/scala/concurrent/FutureTest.scala | 38 +++++++++++++++++++ 5 files changed, 95 insertions(+), 11 deletions(-) diff --git a/project/MimaFilters.scala b/project/MimaFilters.scala index ad847e7b0a31..8088df181f9d 100644 --- a/project/MimaFilters.scala +++ b/project/MimaFilters.scala @@ -33,6 +33,9 @@ object MimaFilters extends AutoPlugin { // #8835 ProblemFilters.exclude[ReversedMissingMethodProblem]("scala.reflect.runtime.SynchronizedOps#SynchronizedBaseTypeSeq.scala$reflect$runtime$SynchronizedOps$SynchronizedBaseTypeSeq$$super$maxDepthOfElems"), + + // this is an internal class and adding a final override here should not be a problem + ProblemFilters.exclude[FinalMethodProblem]("scala.concurrent.impl.Promise#DefaultPromise.zipWith"), ) override val buildSettings = Seq( diff --git a/src/library/scala/concurrent/Future.scala b/src/library/scala/concurrent/Future.scala index 4439b6507f7d..3bcedc53a84a 100644 --- a/src/library/scala/concurrent/Future.scala +++ b/src/library/scala/concurrent/Future.scala @@ -383,10 +383,11 @@ trait Future[+T] extends Awaitable[T] { /** Zips the values of `this` and `that` future, and creates * a new future holding the tuple of their results. * - * If `this` future fails, the resulting future is failed - * with the throwable stored in `this`. - * Otherwise, if `that` future fails, the resulting future is failed - * with the throwable stored in `that`. + * If either input future fails, the resulting future is failed with the same + * throwable, without waiting for the other input future to complete. + * + * If the application of `f` throws a non-fatal throwable, the resulting future + * is failed with that throwable. * * @tparam U the type of the other `Future` * @param that the other `Future` @@ -399,12 +400,11 @@ trait Future[+T] extends Awaitable[T] { /** Zips the values of `this` and `that` future using a function `f`, * and creates a new future holding the result. * - * If `this` future fails, the resulting future is failed - * with the throwable stored in `this`. - * Otherwise, if `that` future fails, the resulting future is failed - * with the throwable stored in `that`. - * If the application of `f` throws a throwable, the resulting future - * is failed with that throwable if it is non-fatal. + * If either input future fails, the resulting future is failed with the same + * throwable, without waiting for the other input future to complete. + * + * If the application of `f` throws a non-fatal throwable, the resulting future + * is failed with that throwable. * * @tparam U the type of the other `Future` * @tparam R the type of the resulting `Future` @@ -413,8 +413,14 @@ trait Future[+T] extends Awaitable[T] { * @return a `Future` with the result of the application of `f` to the results of `this` and `that` * @group Transformations */ - def zipWith[U, R](that: Future[U])(f: (T, U) => R)(implicit executor: ExecutionContext): Future[R] = + def zipWith[U, R](that: Future[U])(f: (T, U) => R)(implicit executor: ExecutionContext): Future[R] = { + // This is typically overriden by the implementation in DefaultPromise, which provides + // symmetric fail-fast behavior regardless of which future fails first. + // + // TODO: remove this implementation and make Future#zipWith abstract + // when we're next willing to make a binary incompatible change flatMap(r1 => that.map(r2 => f(r1, r2)))(if (executor.isInstanceOf[BatchingExecutor]) executor else parasitic) + } /** Creates a new future which holds the result of this future if it was completed successfully, or, if not, * the result of the `that` future if `that` is completed successfully. diff --git a/src/library/scala/concurrent/impl/Promise.scala b/src/library/scala/concurrent/impl/Promise.scala index 2ec0ebe9a24e..e031e51bd011 100644 --- a/src/library/scala/concurrent/impl/Promise.scala +++ b/src/library/scala/concurrent/impl/Promise.scala @@ -130,6 +130,42 @@ private[concurrent] object Promise { override final def transformWith[S](f: Try[T] => Future[S])(implicit executor: ExecutionContext): Future[S] = dispatchOrAddCallbacks(get(), new Transformation[T, S](Xform_transformWith, f, executor)) + override final def zipWith[U, R](that: Future[U])(f: (T, U) => R)(implicit executor: ExecutionContext): Future[R] = { + val state = get() + if (state.isInstanceOf[Try[T]]) { + if (state.asInstanceOf[Try[T]].isFailure) this.asInstanceOf[Future[R]] + else { + val l = state.asInstanceOf[Success[T]].get + that.map(r => f(l, r)) + } + } else { + val buffer = new AtomicReference[Success[Any]]() + val zipped = new DefaultPromise[R]() + + val thisF: Try[T] => Unit = { + case left: Success[T] => + val right = buffer.getAndSet(left).asInstanceOf[Success[U]] + if (right ne null) + zipped.tryComplete(try Success(f(left.get, right.get)) catch { case e if NonFatal(e) => Failure(e) }) + case f => // Can only be Failure + zipped.tryComplete(f.asInstanceOf[Failure[R]]) + } + + val thatF: Try[U] => Unit = { + case right: Success[U] => + val left = buffer.getAndSet(right).asInstanceOf[Success[T]] + if (left ne null) + zipped.tryComplete(try Success(f(left.get, right.get)) catch { case e if NonFatal(e) => Failure(e) }) + case f => // Can only be Failure + zipped.tryComplete(f.asInstanceOf[Failure[R]]) + } + // Cheaper than this.onComplete since we already polled the state + this.dispatchOrAddCallbacks(state, new Transformation[T, Unit](Xform_onComplete, thisF, executor)) + that.onComplete(thatF) + zipped.future + } + } + override final def foreach[U](f: T => U)(implicit executor: ExecutionContext): Unit = { val state = get() if (!state.isInstanceOf[Failure[T]]) dispatchOrAddCallbacks(state, new Transformation[T, Unit](Xform_foreach, f, executor)) diff --git a/test/files/jvm/future-spec/FutureTests.scala b/test/files/jvm/future-spec/FutureTests.scala index f2c83a64aeeb..7181abd144c7 100644 --- a/test/files/jvm/future-spec/FutureTests.scala +++ b/test/files/jvm/future-spec/FutureTests.scala @@ -147,6 +147,7 @@ class FutureTests extends MinimalScalaTest { assert( ECNotUsed(ec => f.filter(_ => fail("filter should not have been called"))(ec)) eq f) assert( ECNotUsed(ec => f.collect({ case _ => fail("collect should not have been called")})(ec)) eq f) assert( ECNotUsed(ec => f.zipWith(f)({ (_,_) => fail("zipWith should not have been called")})(ec)) eq f) + } } diff --git a/test/junit/scala/concurrent/FutureTest.scala b/test/junit/scala/concurrent/FutureTest.scala index 45069e274170..8c3e3310f687 100644 --- a/test/junit/scala/concurrent/FutureTest.scala +++ b/test/junit/scala/concurrent/FutureTest.scala @@ -6,8 +6,46 @@ import org.junit.Test import scala.tools.testkit.AssertUtil._ import scala.util.Try +import duration.Duration.Inf class FutureTest { + @Test + def testZipWithFailFastBothWays(): Unit = { + import ExecutionContext.Implicits.global + + val p1 = Promise[Int]() + val p2 = Promise[Int]() + + // Make sure that the combined future fails early, after the earlier failure occurs, and does not + // wait for the later failure regardless of which one is on the left and which is on the right + p1.failure(new Exception("Boom Early")) + val f1 = p1.future + val f2 = p2.future + + val scala.util.Failure(fa) = Try(Await.result(f1.zip(f2), Inf)) + val scala.util.Failure(fb) = Try(Await.result(f2.zip(f1), Inf)) + + val scala.util.Failure(fc) = Try(Await.result(f1.zipWith(f2)((_, _)), Inf)) + val scala.util.Failure(fd) = Try(Await.result(f2.zipWith(f1)((_, _)), Inf)) + + val scala.util.Failure(fe) = Try(Await.result(Future.sequence(Seq(f1, f2)), Inf)) + val scala.util.Failure(ff) = Try(Await.result(Future.sequence(Seq(f2, f1)), Inf)) + + val scala.util.Failure(fg) = Try(Await.result(Future.traverse(Seq(0, 1))(Seq(f1, f2)(_)), Inf)) + val scala.util.Failure(fh) = Try(Await.result(Future.traverse(Seq(0, 1))(Seq(f1, f2)(_)), Inf)) + + // Make sure the early failure is always reported, regardless of whether it's on + // the left or right of the zip/zipWith/sequence/traverse + assert(fa.getMessage == "Boom Early") + assert(fb.getMessage == "Boom Early") + assert(fc.getMessage == "Boom Early") + assert(fd.getMessage == "Boom Early") + assert(fe.getMessage == "Boom Early") + assert(ff.getMessage == "Boom Early") + assert(fg.getMessage == "Boom Early") + assert(fh.getMessage == "Boom Early") + } + @Test def `bug/issues#10513 firstCompletedOf must not leak references`(): Unit = { val unfulfilled = Promise[AnyRef]()