Skip to content

Commit

Permalink
faster check for conflicting task sets
Browse files Browse the repository at this point in the history
  • Loading branch information
squito committed Jul 7, 2015
1 parent 6542b42 commit b2faef5
Showing 1 changed file with 13 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ private[spark] class TaskSchedulerImpl(
// TaskSetManagers are not thread safe, so any access to one should be synchronized
// on this class.
val activeTaskSets = new HashMap[String, TaskSetManager]
val taskSetsByStage = new HashMap[Int, HashMap[Int, TaskSetManager]]

val taskIdToTaskSetId = new HashMap[Long, String]
val taskIdToExecutorId = new HashMap[Long, String]
Expand Down Expand Up @@ -164,13 +165,14 @@ private[spark] class TaskSchedulerImpl(
val manager = createTaskSetManager(taskSet, maxTaskFailures)
activeTaskSets(taskSet.id) = manager
val stage = taskSet.stageId
val conflictingTaskSet = activeTaskSets.exists { case (id, ts) =>
// if the id matches, it really should be the same taskSet, but in some unit tests
// we add new taskSets with the same id
id != taskSet.id && !ts.isZombie && ts.stageId == stage
val stageTaskSets = taskSetsByStage.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])
stageTaskSets(taskSet.attempt) = manager
val conflictingTaskSet = stageTaskSets.exists { case (_, ts) =>
ts.taskSet != taskSet && !ts.isZombie
}
if (conflictingTaskSet) {
throw new SparkIllegalStateException(s"more than one active taskSet for stage $stage")
throw new SparkIllegalStateException(s"more than one active taskSet for stage $stage:" +
s" ${stageTaskSets.toSeq.map{_._2.taskSet.id}.mkString(",")}")
}
schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)

Expand Down Expand Up @@ -224,6 +226,12 @@ private[spark] class TaskSchedulerImpl(
*/
def taskSetFinished(manager: TaskSetManager): Unit = synchronized {
activeTaskSets -= manager.taskSet.id
taskSetsByStage.get(manager.taskSet.stageId).foreach { taskSetsForStage =>
taskSetsForStage -= manager.taskSet.attempt
if (taskSetsForStage.isEmpty) {
taskSetsByStage -= manager.taskSet.stageId
}
}
manager.parent.removeSchedulable(manager)
logInfo("Removed TaskSet %s, whose tasks have all completed, from pool %s"
.format(manager.taskSet.id, manager.parent.name))
Expand Down

0 comments on commit b2faef5

Please sign in to comment.