diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala index c92b6dc96c8eb..6f1fd25764544 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala @@ -141,6 +141,22 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging { assert(sched.finishedManagers.contains(manager)) } + test("skip unsatisfiable locality levels") { + sc = new SparkContext("local", "test") + val sched = new FakeTaskScheduler(sc, ("execA", "host1"), ("execC", "host2")) + val taskSet = FakeTask.createTaskSet(1, Seq(TaskLocation("host1", "execB"))) + val clock = new FakeClock + val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock) + + // An executor that is not NODE_LOCAL should be rejected. + assert(manager.resourceOffer("execC", "host2", ANY) === None) + + // Because there are no alive PROCESS_LOCAL executors, the base locality level should be + // NODE_LOCAL. So, we should schedule the task on this offered NODE_LOCAL executor before + // any of the locality wait timers expire. + assert(manager.resourceOffer("execA", "host1", ANY).get.index === 0) + } + test("basic delay scheduling") { sc = new SparkContext("local", "test") val sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"))