Description
This behavior occurs intermittently and halts our entire system at runtime. I managed to create a reproducer.
Scenario:
stage1 - run 600 parallel tasks on the global execution context, each with a blocking closure inside
stage2 - run 100 parallel tasks on the global execution context, without a blocking closure inside
Observed behaviors (I observe one or the other; the second is a special case of the first and is the one observed most often):
- stage1 is completed by 'scala.concurrent.context.maxExtraThreads + scala.concurrent.context.maxThreads' threads in parallel, stage2 is completed by 1 thread (i.e. all tasks are executed by a single thread, and every other thread of the global context is in WAIT state)
- stage1 is completed by 'scala.concurrent.context.maxExtraThreads + scala.concurrent.context.maxThreads' threads in parallel, stage2 hangs without completing any tasks (all threads of the global context are in WAIT state)
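To check the thread states described above without attaching a debugger, something like the following could be called while stage2 is running. This is only a sketch: the object name `GlobalPoolThreadDump` is hypothetical, and the name prefix `scala-execution-context-global` is an assumption based on the thread names the reproducer prints on Scala 2.12; pass a different prefix if your threads are named differently.

```scala
import scala.collection.JavaConverters._

object GlobalPoolThreadDump {
  // Assumption: worker threads of the global context carry this name prefix.
  val DefaultPrefix = "scala-execution-context-global"

  /** Returns (thread name, state) pairs for live threads whose name starts with `prefix`. */
  def states(prefix: String = DefaultPrefix): Seq[(String, Thread.State)] =
    Thread.getAllStackTraces.keySet.asScala.toSeq
      .filter(_.getName.startsWith(prefix))
      .map(t => t.getName -> t.getState)
      .sortBy(_._1)

  def main(args: Array[String]): Unit =
    states().foreach { case (name, state) => println(s"$name -> $state") }
}
```

In the hanging case I would expect every listed thread to report a waiting state (WAITING or TIMED_WAITING) while no stage2 task makes progress.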
Expected behavior:
stage1 is completed by 'scala.concurrent.context.maxExtraThreads + scala.concurrent.context.maxThreads' threads in parallel, stage2 is completed by 'scala.concurrent.context.maxThreads' (4 in my case) threads in parallel
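The expectation can be turned into a number by counting how many distinct threads actually ran the stage2 tasks. The helper below is a minimal sketch, not part of the original reproducer: `DistinctThreadCount` and `measure` are hypothetical names, and it assumes thread names are unique within the pool.

```scala
import java.util.concurrent.ConcurrentHashMap
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object DistinctThreadCount {
  /** Runs `tasks` sleeping futures on `ec` and returns how many distinct threads executed them. */
  def measure(tasks: Int, sleepMillis: Long)(implicit ec: ExecutionContext): Int = {
    // Thread-safe set of the names of all threads that ran at least one task.
    val seen = ConcurrentHashMap.newKeySet[String]()
    val futures = Vector.range(0, tasks).map { _ =>
      Future {
        Thread.sleep(sleepMillis) // same non-blocking-marked sleep as stage2
        seen.add(Thread.currentThread().getName)
      }
    }
    Await.result(Future.sequence(futures), 1000.seconds)
    seen.size
  }
}
```

Called as `DistinctThreadCount.measure(100, 2000)(ExecutionContext.global)` right after stage1, the expected result would be the value of 'scala.concurrent.context.maxThreads' (4 in my case); with the behavior reported here it would return 1 or never return.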
Environment
Scala version: 2.12.1
Java version "1.8.0_144"
Java(TM) SE Runtime Environment (build 1.8.0_144-b01)
Java HotSpot(TM) 64-Bit Server VM (build 25.144-b01, mixed mode)
Reproducer
import java.util.Calendar
import java.util.concurrent.{Executors, ForkJoinPool}
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, Future}

object AsyncTest extends App {
  start()

  def start(): Unit = {
    //System.setProperty("scala.concurrent.context.maxThreads", "3")
    implicit val ec = scala.concurrent.ExecutionContext.Implicits.global
    //implicit val ec = scala.concurrent.ExecutionContext.fromExecutor(null)
    //implicit val ec = scala.concurrent.ExecutionContext.fromExecutorService(Executors.newSingleThreadExecutor)
    //implicit val ec = scala.concurrent.ExecutionContext.fromExecutor(Executors.newWorkStealingPool(2))
    val blockingTasks = 600
    val paralleleTasks = 100

    def stage1 = {
      val futures = Vector.range(1, blockingTasks + 1).map { i =>
        Future {
          val sleepTime = 2000
          scala.concurrent.blocking {
            Thread.sleep(sleepTime)
          }
          val today = Calendar.getInstance().getTime()
          println(Thread.currentThread().getName + " Future: " + i + " - sleep was: " + sleepTime + " - " + today)
          1
        }
      }
      val all = Future.sequence(futures)
      println("waiting on " + Thread.currentThread().getName)
      Await.result(all, Duration("100s"))
      println("finished waiting on " + Thread.currentThread().getName)
    }

    def stage2 = {
      val futures2 = Vector.range(1, paralleleTasks + 1).map { i =>
        Future {
          val sleepTime = 2000
          Thread.sleep(sleepTime)
          val today = Calendar.getInstance().getTime()
          println(Thread.currentThread().getName + " Future 2: " + i + " - sleep was: " + sleepTime + " - " + today)
          1
        }
      }
      val all2 = Future.sequence(futures2)
      println("waiting on " + Thread.currentThread().getName)
      Await.result(all2, Duration("1000s"))
      println("finished waiting on " + Thread.currentThread().getName)
    }

    stage1
    stage2
  }
}