diff --git a/project/MimaFilters.scala b/project/MimaFilters.scala index ad847e7b0a31..8088df181f9d 100644 --- a/project/MimaFilters.scala +++ b/project/MimaFilters.scala @@ -33,6 +33,9 @@ object MimaFilters extends AutoPlugin { // #8835 ProblemFilters.exclude[ReversedMissingMethodProblem]("scala.reflect.runtime.SynchronizedOps#SynchronizedBaseTypeSeq.scala$reflect$runtime$SynchronizedOps$SynchronizedBaseTypeSeq$$super$maxDepthOfElems"), + + // this is an internal class and adding a final override here should not be a problem + ProblemFilters.exclude[FinalMethodProblem]("scala.concurrent.impl.Promise#DefaultPromise.zipWith"), ) override val buildSettings = Seq( diff --git a/src/library/scala/concurrent/Future.scala b/src/library/scala/concurrent/Future.scala index 4439b6507f7d..3bcedc53a84a 100644 --- a/src/library/scala/concurrent/Future.scala +++ b/src/library/scala/concurrent/Future.scala @@ -383,10 +383,11 @@ trait Future[+T] extends Awaitable[T] { /** Zips the values of `this` and `that` future, and creates * a new future holding the tuple of their results. * - * If `this` future fails, the resulting future is failed - * with the throwable stored in `this`. - * Otherwise, if `that` future fails, the resulting future is failed - * with the throwable stored in `that`. + * If either input future fails, the resulting future is failed with the same + * throwable, without waiting for the other input future to complete. + * + * If the application of `f` throws a non-fatal throwable, the resulting future + * is failed with that throwable. * * @tparam U the type of the other `Future` * @param that the other `Future` @@ -399,12 +400,11 @@ trait Future[+T] extends Awaitable[T] { /** Zips the values of `this` and `that` future using a function `f`, * and creates a new future holding the result. * - * If `this` future fails, the resulting future is failed - * with the throwable stored in `this`. - * Otherwise, if `that` future fails, the resulting future is failed - * with the throwable stored in `that`. - * If the application of `f` throws a throwable, the resulting future - * is failed with that throwable if it is non-fatal. + * If either input future fails, the resulting future is failed with the same + * throwable, without waiting for the other input future to complete. + * + * If the application of `f` throws a non-fatal throwable, the resulting future + * is failed with that throwable. * * @tparam U the type of the other `Future` * @tparam R the type of the resulting `Future` @@ -413,8 +413,14 @@ trait Future[+T] extends Awaitable[T] { * @return a `Future` with the result of the application of `f` to the results of `this` and `that` * @group Transformations */ - def zipWith[U, R](that: Future[U])(f: (T, U) => R)(implicit executor: ExecutionContext): Future[R] = + def zipWith[U, R](that: Future[U])(f: (T, U) => R)(implicit executor: ExecutionContext): Future[R] = { + // This is typically overriden by the implementation in DefaultPromise, which provides + // symmetric fail-fast behavior regardless of which future fails first. + // + // TODO: remove this implementation and make Future#zipWith abstract + // when we're next willing to make a binary incompatible change flatMap(r1 => that.map(r2 => f(r1, r2)))(if (executor.isInstanceOf[BatchingExecutor]) executor else parasitic) + } /** Creates a new future which holds the result of this future if it was completed successfully, or, if not, * the result of the `that` future if `that` is completed successfully. diff --git a/src/library/scala/concurrent/impl/Promise.scala b/src/library/scala/concurrent/impl/Promise.scala index 2ec0ebe9a24e..e031e51bd011 100644 --- a/src/library/scala/concurrent/impl/Promise.scala +++ b/src/library/scala/concurrent/impl/Promise.scala @@ -130,6 +130,42 @@ private[concurrent] object Promise { override final def transformWith[S](f: Try[T] => Future[S])(implicit executor: ExecutionContext): Future[S] = dispatchOrAddCallbacks(get(), new Transformation[T, S](Xform_transformWith, f, executor)) + override final def zipWith[U, R](that: Future[U])(f: (T, U) => R)(implicit executor: ExecutionContext): Future[R] = { + val state = get() + if (state.isInstanceOf[Try[T]]) { + if (state.asInstanceOf[Try[T]].isFailure) this.asInstanceOf[Future[R]] + else { + val l = state.asInstanceOf[Success[T]].get + that.map(r => f(l, r)) + } + } else { + val buffer = new AtomicReference[Success[Any]]() + val zipped = new DefaultPromise[R]() + + val thisF: Try[T] => Unit = { + case left: Success[T] => + val right = buffer.getAndSet(left).asInstanceOf[Success[U]] + if (right ne null) + zipped.tryComplete(try Success(f(left.get, right.get)) catch { case e if NonFatal(e) => Failure(e) }) + case f => // Can only be Failure + zipped.tryComplete(f.asInstanceOf[Failure[R]]) + } + + val thatF: Try[U] => Unit = { + case right: Success[U] => + val left = buffer.getAndSet(right).asInstanceOf[Success[T]] + if (left ne null) + zipped.tryComplete(try Success(f(left.get, right.get)) catch { case e if NonFatal(e) => Failure(e) }) + case f => // Can only be Failure + zipped.tryComplete(f.asInstanceOf[Failure[R]]) + } + // Cheaper than this.onComplete since we already polled the state + this.dispatchOrAddCallbacks(state, new Transformation[T, Unit](Xform_onComplete, thisF, executor)) + that.onComplete(thatF) + zipped.future + } + } + override final def foreach[U](f: T => U)(implicit executor: ExecutionContext): Unit = { val state = get() if (!state.isInstanceOf[Failure[T]]) dispatchOrAddCallbacks(state, new Transformation[T, Unit](Xform_foreach, f, executor)) diff --git a/test/files/jvm/future-spec/FutureTests.scala b/test/files/jvm/future-spec/FutureTests.scala index f2c83a64aeeb..7181abd144c7 100644 --- a/test/files/jvm/future-spec/FutureTests.scala +++ b/test/files/jvm/future-spec/FutureTests.scala @@ -147,6 +147,7 @@ class FutureTests extends MinimalScalaTest { assert( ECNotUsed(ec => f.filter(_ => fail("filter should not have been called"))(ec)) eq f) assert( ECNotUsed(ec => f.collect({ case _ => fail("collect should not have been called")})(ec)) eq f) assert( ECNotUsed(ec => f.zipWith(f)({ (_,_) => fail("zipWith should not have been called")})(ec)) eq f) + } } diff --git a/test/junit/scala/concurrent/FutureTest.scala b/test/junit/scala/concurrent/FutureTest.scala index 45069e274170..8c3e3310f687 100644 --- a/test/junit/scala/concurrent/FutureTest.scala +++ b/test/junit/scala/concurrent/FutureTest.scala @@ -6,8 +6,46 @@ import org.junit.Test import scala.tools.testkit.AssertUtil._ import scala.util.Try +import duration.Duration.Inf class FutureTest { + @Test + def testZipWithFailFastBothWays(): Unit = { + import ExecutionContext.Implicits.global + + val p1 = Promise[Int]() + val p2 = Promise[Int]() + + // Make sure that the combined future fails early, after the earlier failure occurs, and does not + // wait for the later failure regardless of which one is on the left and which is on the right + p1.failure(new Exception("Boom Early")) + val f1 = p1.future + val f2 = p2.future + + val scala.util.Failure(fa) = Try(Await.result(f1.zip(f2), Inf)) + val scala.util.Failure(fb) = Try(Await.result(f2.zip(f1), Inf)) + + val scala.util.Failure(fc) = Try(Await.result(f1.zipWith(f2)((_, _)), Inf)) + val scala.util.Failure(fd) = Try(Await.result(f2.zipWith(f1)((_, _)), Inf)) + + val scala.util.Failure(fe) = Try(Await.result(Future.sequence(Seq(f1, f2)), Inf)) + val scala.util.Failure(ff) = Try(Await.result(Future.sequence(Seq(f2, f1)), Inf)) + + val scala.util.Failure(fg) = Try(Await.result(Future.traverse(Seq(0, 1))(Seq(f1, f2)(_)), Inf)) + val scala.util.Failure(fh) = Try(Await.result(Future.traverse(Seq(0, 1))(Seq(f1, f2)(_)), Inf)) + + // Make sure the early failure is always reported, regardless of whether it's on + // the left or right of the zip/zipWith/sequence/traverse + assert(fa.getMessage == "Boom Early") + assert(fb.getMessage == "Boom Early") + assert(fc.getMessage == "Boom Early") + assert(fd.getMessage == "Boom Early") + assert(fe.getMessage == "Boom Early") + assert(ff.getMessage == "Boom Early") + assert(fg.getMessage == "Boom Early") + assert(fh.getMessage == "Boom Early") + } + @Test def `bug/issues#10513 firstCompletedOf must not leak references`(): Unit = { val unfulfilled = Promise[AnyRef]()