Skip to content

Commit

Permalink
unit test just to make sure we fail fast on concurrent attempts
Browse files Browse the repository at this point in the history
  • Loading branch information
squito committed Jun 10, 2015
1 parent 06a0af6 commit 6e14683
Showing 1 changed file with 16 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -128,4 +128,20 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with L
assert(taskDescriptions.map(_.executorId) === Seq("executor0"))
}

test("refuse to schedule concurrent attempts for the same stage (SPARK-8103)") {
sc = new SparkContext("local", "TaskSchedulerImplSuite")
val taskScheduler = new TaskSchedulerImpl(sc)
taskScheduler.initialize(new FakeSchedulerBackend)
// Need to initialize a DAGScheduler for the taskScheduler to use for callbacks.
val dagScheduler = new DAGScheduler(sc, taskScheduler) {
override def taskStarted(task: Task[_], taskInfo: TaskInfo) {}
override def executorAdded(execId: String, host: String) {}
}
taskScheduler.setDAGScheduler(dagScheduler)
val attempt1 = new TaskSet(Array(new FakeTask(0)), 0, 0, 0, null)
val attempt2 = new TaskSet(Array(new FakeTask(0)), 0, 1, 0, null)
taskScheduler.submitTasks(attempt1)
intercept[SparkIllegalStateException] { taskScheduler.submitTasks(attempt2)}
}

}

0 comments on commit 6e14683

Please sign in to comment.