You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
importjava.util.concurrent._importscala.concurrent.{Await, ExecutionContext, Future}
valthreadPoolExecutor=newThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS, newArrayBlockingQueue(1))
valthreadPool:ExecutionContext=ExecutionContext.fromExecutor(threadPoolExecutor,
t => { throw(t) })
valfuture1=Future({ Thread.sleep(9999999)})(threadPool)
valfuture2=Future({ Thread.sleep(9999999)})(threadPool)
valfuture3=Future({ Thread.sleep(9999999)})(threadPool) // Future(Failure) due to RejectedRequestimplicitvalimplicitThreadpool:ExecutionContext= threadPool
Future.sequence(Seq(future1)) // Future(<not completed>) -- fineFuture.sequence(Seq(future3)) // Future(Failure(java.util.concurrent.RejectedExecutionException: Task Future(<not completed>) rejected from java.util.concurrent.ThreadPoolExecutor@416a4275[Running, pool size = 1, active threads = 1, queued tasks = 1, completed tasks = 0])) -- fineFuture.sequence(Seq(future3, future1)) // val res2: scala.concurrent.Future[Seq[Unit]] = Future(Failure(java.util.concurrent.RejectedExecutionException: Task Future(<not completed>) rejected from java.util.concurrent.ThreadPoolExecutor@416a4275[Running, pool size = 1, active threads = 1, queued tasks = 1, completed tasks = 0])) -- fineFuture.sequence(Seq(future1, future3))
/*java.util.concurrent.RejectedExecutionException: Task Future(<not completed>) rejected from java.util.concurrent.ThreadPoolExecutor@416a4275[Running, pool size = 1, active threads = 1, queued tasks = 1, completed tasks = 0] at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063) at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830) at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379) at scala.concurrent.impl.ExecutionContextImpl.execute(ExecutionContextImpl.scala:21) at scala.concurrent.impl.Promise$Transformation.submitWithValue(Promise.scala:429) at scala.concurrent.impl.Promise$DefaultPromise.submitWithValue(Promise.scala:338) at scala.concurrent.impl.Promise$DefaultPromise.dispatchOrAddCallbacks(Promise.scala:312) at scala.concurrent.impl.Promise$DefaultPromise.onComplete(Promise.scala:216) at scala.concurrent.impl.Promise$DefaultPromise.zipWith(Promise.scala:164) at scala.concurrent.Future$.$anonfun$sequence$1(Future.scala:720) at scala.collection.IterableOnceOps.foldLeft(IterableOnce.scala:676) at scala.collection.IterableOnceOps.foldLeft$(IterableOnce.scala:670) at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1300) at scala.concurrent.Future$.sequence(Future.scala:720)*/
Problem
I would've expected Future.sequence(Seq(future3, future1)) to match Future.sequence(Seq(future1, future3)) in either both returning an exception or both returning a failed future.
Also, with Scala 2.12, we would get the RejectedExecutionException when we submitted the 3rd future that exceeded the thread pool's queue limit. With Scala 2.13, we now get back a Future(Failure(ex)) instead of the exception. If that's the case, I would've expected the Future.sequence to also not throw an exception.
The text was updated successfully, but these errors were encountered:
achyan
changed the title
inconsistent RejectedRequestException behavior with Future.sequence
inconsistent RejectedExecutionException behavior with Future.sequence
Nov 8, 2023
We have a "scala/collections" team of volunteers we can summon on collections stuff. This ticket is a good example of why we ought to assemble a similar "scala/concurrency" team, as I bet there are people out there who might have some insight into this, but they apparently aren't watching scala/bug.
The behavior is because sequence must execute on the execution context. The weird failure is due to the poorly configured EC.
Here is a better look:
// make it easier to see when the error happens
val threadPool: ExecutionContext = ExecutionContext.fromExecutor(threadPoolExecutor, t => println(s"ERROR ${t.getMessage}"))
// helper to pass in a specific EC
def seqtest[A, CC[X] <: IterableOnce[X], To](in: CC[Future[A]])(ec: ExecutionContext)(implicit bf: BuildFrom[CC[Future[A]], A, To]): Future[To] = Future.sequence(in)(bf, ec)
val fs = Try(seqtest(Seq(future1, future3))(global))
val gs = Try(seqtest(Seq(future3, future1))(global))
val hs = Try(seqtest(Seq(future1, future3))(threadPool))
val is = Try(seqtest(Seq(future3, future1))(threadPool))
You'll see ERROR Task Future(<not completed>) due to the failed sequence.
The sequence on global will print the expected failure twice:
for the first one, "1 then 3". The reason is that "3 then 1" is an error right away, but "1 then 3" has to submit an extra job to combine the results, and that job fails.
The other workaround is to use ExecutionContext.parasitic for the sequence.
The question is whether this is the same problem as #9071.
After all that verbiage, it seems to be a regression in scala/scala#9655. It works on 2.13.6 or stubbing Haoyi's override.
Reproduction steps
Scala version: 2.13.12
Problem
I would've expected Future.sequence(Seq(future3, future1)) to match Future.sequence(Seq(future1, future3)) in either both returning an exception or both returning a failed future.
Also, with Scala 2.12, we would get the RejectedExecutionException when we submitted the 3rd future that exceeded the thread pool's queue limit. With Scala 2.13, we now get back a Future(Failure(ex)) instead of the exception. If that's the case, I would've expected the Future.sequence to also not throw an exception.
The text was updated successfully, but these errors were encountered: