diff --git a/src/library/scala/concurrent/Future.scala b/src/library/scala/concurrent/Future.scala index 4439b6507f7d..e220b95ca887 100644 --- a/src/library/scala/concurrent/Future.scala +++ b/src/library/scala/concurrent/Future.scala @@ -413,8 +413,31 @@ trait Future[+T] extends Awaitable[T] { * @return a `Future` with the result of the application of `f` to the results of `this` and `that` * @group Transformations */ - def zipWith[U, R](that: Future[U])(f: (T, U) => R)(implicit executor: ExecutionContext): Future[R] = - flatMap(r1 => that.map(r2 => f(r1, r2)))(if (executor.isInstanceOf[BatchingExecutor]) executor else parasitic) + def zipWith[U, R](that: Future[U])(f: (T, U) => R)(implicit executor: ExecutionContext): Future[R] = { + val buffer = new AtomicReference[Either[T, U]]() + val outPromise = Promise[R]() + this.onComplete{ + case scala.util.Success(v) => + val isFirst = buffer.compareAndSet(null, Left(v)) + if (!isFirst) { + val oldValue = buffer.get.getOrElse(throw new Exception("CANT HAPPEN")) + outPromise.complete(Try(f(v, oldValue))) + } + + case scala.util.Failure(e) => outPromise.tryFailure(e) + } + that.onComplete{ + case scala.util.Success(v) => + val isFirst = buffer.compareAndSet(null, Right(v)) + if (!isFirst) { + val oldValue = buffer.get.swap.getOrElse(throw new Exception("CANT HAPPEN")) + outPromise.complete(Try(f(oldValue, v))) + } + + case scala.util.Failure(e) => outPromise.tryFailure(e) + } + outPromise.future + } /** Creates a new future which holds the result of this future if it was completed successfully, or, if not, * the result of the `that` future if `that` is completed successfully.