From f6de6b8cc18304d3cfcd9cdfcbffeb85248c4737 Mon Sep 17 00:00:00 2001 From: "yi.wu" Date: Tue, 31 Mar 2020 17:26:58 -0700 Subject: [PATCH] [SPARK-31248][CORE][TEST] Fix flaky ExecutorAllocationManagerSuite.interleaving add and remove ### What changes were proposed in this pull request? This PR (SPARK-31248) uses `ManualClock` to disable `ExecutorAllocationManager.schedule()` in order to avoid unexpected update of target executors. ### Why are the changes needed? `ExecutorAllocationManager` will call `schedule` periodically, which may update target executors before we check them at https://github.com/apache/spark/blob/496f6ac86001d284cbfb7488a63dd3a168919c0f/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala#L864 and fail the check: ``` sbt.ForkMain$ForkError: org.scalatest.exceptions.TestFailedException: 12 did not equal 8 at org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:530) at org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:529) at org.scalatest.FunSuite.newAssertionFailedException(FunSuite.scala:1560) at org.scalatest.Assertions$AssertionsHelper.macroAssert(Assertions.scala:503) at org.apache.spark.ExecutorAllocationManagerSuite.$anonfun$new$51(ExecutorAllocationManagerSuite.scala:864) at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85) at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83) at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104) at org.scalatest.Transformer.apply(Transformer.scala:22) at org.scalatest.Transformer.apply(Transformer.scala:20) at org.scalatest.FunSuiteLike$$anon$1.apply(FunSuiteLike.scala:186) at org.apache.spark.SparkFunSuite.withFixture(SparkFunSuite.scala:151) at org.scalatest.FunSuiteLike.invokeWithFixture$1(FunSuiteLike.scala:184) ``` ### Does this PR introduce any user-facing change? No. ### How was this patch tested? Update test. Closes #28084 from Ngone51/spark_31248. 
Authored-by: yi.wu Signed-off-by: Dongjoon Hyun --- .../scala/org/apache/spark/ExecutorAllocationManager.scala | 2 +- .../org/apache/spark/ExecutorAllocationManagerSuite.scala | 5 ++++- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala index 5cb3160711a90..0b09f1fa2db2b 100644 --- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala +++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala @@ -437,7 +437,7 @@ private[spark] class ExecutorAllocationManager( } else { logDebug(s"Lowering target number of executors to" + s" ${numExecutorsTargetPerResourceProfileId(rpId)} (previously " + - s"$targetNum.oldNumExecutorsTarget for resource profile id: ${rpId}) " + + s"${targetNum.oldNumExecutorsTarget} for resource profile id: ${rpId}) " + "because not all requested executors " + "are actually needed") } diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala index 8fa33f4915ea4..9fac1199a80be 100644 --- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala @@ -841,7 +841,10 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite { } test ("interleaving add and remove") { - val manager = createManager(createConf(5, 12, 5)) + // use ManualClock to disable ExecutorAllocationManager.schedule() + // in order to avoid unexpected update of target executors + val clock = new ManualClock() + val manager = createManager(createConf(5, 12, 5), clock) post(SparkListenerStageSubmitted(createStageInfo(0, 1000))) val updatesNeeded =