Skip to content

Commit

Permalink
Merge pull request alteryx#14 from kayousterhout/untangle_scheduler
Browse files Browse the repository at this point in the history
Improved organization of scheduling packages.

This commit does not change any code -- only file organization.
Please let me know if there was some masterminded strategy behind
the existing organization that I failed to understand!

There are two components of this change:
(1) Moving files out of the cluster package, and down
a level to the scheduling package. These files are all used by
the local scheduler in addition to the cluster scheduler(s), so
should not be in the cluster package. As a result of this change,
none of the files in the local package reference files in the
cluster package.

(2) Moving the mesos package to within the cluster package.
The mesos scheduling code is for a cluster, and represents a
specific case of cluster scheduling (the Mesos-related classes
often subclass cluster scheduling classes). Thus, the most logical
place for it seems to be within the cluster package.

The one thing about the scheduling code that seems a little funny to me
is the naming of the SchedulerBackends.  The StandaloneSchedulerBackend
is not just for Standalone mode, but instead is used by Mesos coarse grained
mode and Yarn, and the backend that *is* just for Standalone mode is instead called SparkDeploySchedulerBackend. I didn't change this because I wasn't sure if there
was a reason for this naming that I'm just not aware of.

(cherry picked from commit 70a0b99)
Signed-off-by: Reynold Xin <rxin@apache.org>
  • Loading branch information
rxin committed Sep 26, 2013
1 parent 5a62844 commit 976fe60
Show file tree
Hide file tree
Showing 34 changed files with 62 additions and 71 deletions.
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,9 @@ import org.apache.spark.partial.{ApproximateEvaluator, PartialResult}
import org.apache.spark.rdd._
import org.apache.spark.scheduler._
import org.apache.spark.scheduler.cluster.{StandaloneSchedulerBackend, SparkDeploySchedulerBackend,
ClusterScheduler, Schedulable, SchedulingMode}
ClusterScheduler}
import org.apache.spark.scheduler.local.LocalScheduler
import org.apache.spark.scheduler.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend}
import org.apache.spark.scheduler.cluster.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend}
import org.apache.spark.storage.{StorageUtils, BlockManagerSource}
import org.apache.spark.ui.SparkUI
import org.apache.spark.util.{ClosureCleaner, Utils, MetadataCleaner, TimeStampedHashMap}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import org.apache.spark._
import org.apache.spark.rdd.RDD
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.partial.{ApproximateActionListener, ApproximateEvaluator, PartialResult}
import org.apache.spark.scheduler.cluster.TaskInfo
import org.apache.spark.storage.{BlockManager, BlockManagerMaster}
import org.apache.spark.util.{MetadataCleaner, TimeStampedHashMap}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package org.apache.spark.scheduler

import java.util.Properties

import org.apache.spark.scheduler.cluster.TaskInfo
import scala.collection.mutable.Map

import org.apache.spark._
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import scala.io.Source
import org.apache.spark._
import org.apache.spark.rdd.RDD
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.scheduler.cluster.TaskInfo

// Used to record runtime information for each job, including RDD graph
// tasks' start/stop shuffle information and information from outside
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,13 @@
* limitations under the License.
*/

package org.apache.spark.scheduler.cluster
package org.apache.spark.scheduler

import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.HashMap

import org.apache.spark.Logging
import org.apache.spark.scheduler.cluster.SchedulingMode.SchedulingMode
import org.apache.spark.scheduler.SchedulingMode.SchedulingMode

/**
* An Schedulable entity that represent collection of Pools or TaskSetManagers
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@
* limitations under the License.
*/

package org.apache.spark.scheduler.cluster
package org.apache.spark.scheduler

import org.apache.spark.scheduler.cluster.SchedulingMode.SchedulingMode
import org.apache.spark.scheduler.SchedulingMode.SchedulingMode

import scala.collection.mutable.ArrayBuffer
/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.spark.scheduler.cluster
package org.apache.spark.scheduler

import java.io.{FileInputStream, InputStream}
import java.util.{NoSuchElementException, Properties}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.spark.scheduler.cluster
package org.apache.spark.scheduler

/**
* An interface for sort algorithm
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.spark.scheduler.cluster
package org.apache.spark.scheduler

/**
* "FAIR" and "FIFO" determines which policy is used
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package org.apache.spark.scheduler

import java.util.Properties
import org.apache.spark.scheduler.cluster.TaskInfo
import org.apache.spark.util.{Utils, Distribution}
import org.apache.spark.{Logging, SparkContext, TaskEndReason}
import org.apache.spark.executor.TaskMetrics
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@

package org.apache.spark.scheduler

import org.apache.spark.scheduler.cluster.TaskInfo
import scala.collection._

import org.apache.spark.executor.TaskMetrics

case class StageInfo(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.spark.scheduler.cluster
package org.apache.spark.scheduler

import java.nio.ByteBuffer
import org.apache.spark.util.SerializableBuffer
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.spark.scheduler.cluster
package org.apache.spark.scheduler

import org.apache.spark.util.Utils

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.spark.scheduler.cluster
package org.apache.spark.scheduler


private[spark] object TaskLocality
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,11 @@

package org.apache.spark.scheduler

import org.apache.spark.scheduler.cluster.Pool
import org.apache.spark.scheduler.cluster.SchedulingMode.SchedulingMode
import org.apache.spark.scheduler.SchedulingMode.SchedulingMode

/**
* Low-level task scheduler interface, implemented by both ClusterScheduler and LocalScheduler.
* Each TaskScheduler schedulers task for a single SparkContext.
* These schedulers get sets of tasks submitted to them from the DAGScheduler for each stage,
* and are responsible for sending the tasks to the cluster, running them, retrying if there
* are failures, and mitigating stragglers. They return events to the DAGScheduler through
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package org.apache.spark.scheduler

import org.apache.spark.scheduler.cluster.TaskInfo
import scala.collection.mutable.Map

import org.apache.spark.TaskEndReason
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,11 @@
* limitations under the License.
*/

package org.apache.spark.scheduler.cluster
package org.apache.spark.scheduler

import java.nio.ByteBuffer

import org.apache.spark.TaskState.TaskState
import org.apache.spark.scheduler.TaskSet

/**
* Tracks and schedules the tasks within a single TaskSet. This class keeps track of the status of
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import scala.collection.mutable.HashSet
import org.apache.spark._
import org.apache.spark.TaskState.TaskState
import org.apache.spark.scheduler._
import org.apache.spark.scheduler.cluster.SchedulingMode.SchedulingMode
import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
import java.nio.ByteBuffer
import java.util.concurrent.atomic.AtomicLong
import java.util.{TimerTask, Timer}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,12 @@ import scala.collection.mutable.HashMap
import scala.collection.mutable.HashSet
import scala.math.max
import scala.math.min
import scala.Some

import org.apache.spark.{FetchFailed, Logging, Resubmitted, SparkEnv, Success, TaskEndReason, TaskState}
import org.apache.spark.{ExceptionFailure, SparkException, TaskResultTooBigFailure}
import org.apache.spark.{ExceptionFailure, FetchFailed, Logging, Resubmitted, SparkEnv,
SparkException, Success, TaskEndReason, TaskResultTooBigFailure, TaskState}
import org.apache.spark.TaskState.TaskState
import org.apache.spark.scheduler._
import scala.Some
import org.apache.spark.FetchFailed
import org.apache.spark.ExceptionFailure
import org.apache.spark.TaskResultTooBigFailure
import org.apache.spark.util.{SystemClock, Clock}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package org.apache.spark.scheduler.cluster
import java.nio.ByteBuffer

import org.apache.spark.TaskState.TaskState
import org.apache.spark.scheduler.TaskDescription
import org.apache.spark.util.{Utils, SerializableBuffer}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import akka.util.Duration
import akka.util.duration._

import org.apache.spark.{SparkException, Logging, TaskState}
import org.apache.spark.scheduler.TaskDescription
import org.apache.spark.scheduler.cluster.StandaloneClusterMessages._
import org.apache.spark.util.Utils

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,22 +15,22 @@
* limitations under the License.
*/

package org.apache.spark.scheduler.mesos
package org.apache.spark.scheduler.cluster.mesos

import com.google.protobuf.ByteString
import java.io.File
import java.util.{ArrayList => JArrayList, List => JList}
import java.util.Collections

import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
import scala.collection.JavaConversions._

import com.google.protobuf.ByteString
import org.apache.mesos.{Scheduler => MScheduler}
import org.apache.mesos._
import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, TaskState => MesosTaskState, _}

import org.apache.spark.{SparkException, Logging, SparkContext}
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
import scala.collection.JavaConversions._
import java.io.File
import org.apache.spark.scheduler.cluster._
import java.util.{ArrayList => JArrayList, List => JList}
import java.util.Collections
import org.apache.spark.TaskState
import org.apache.spark.{SparkException, Logging, SparkContext, TaskState}
import org.apache.spark.scheduler.cluster.{ClusterScheduler, StandaloneSchedulerBackend}

/**
* A SchedulerBackend that runs tasks on Mesos, but uses "coarse-grained" tasks, where it holds
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,22 +15,24 @@
* limitations under the License.
*/

package org.apache.spark.scheduler.mesos
package org.apache.spark.scheduler.cluster.mesos

import com.google.protobuf.ByteString
import java.io.File
import java.util.{ArrayList => JArrayList, List => JList}
import java.util.Collections

import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
import scala.collection.JavaConversions._

import com.google.protobuf.ByteString
import org.apache.mesos.{Scheduler => MScheduler}
import org.apache.mesos._
import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, TaskState => MesosTaskState, _}

import org.apache.spark.{SparkException, Logging, SparkContext}
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
import scala.collection.JavaConversions._
import java.io.File
import org.apache.spark.scheduler.cluster._
import java.util.{ArrayList => JArrayList, List => JList}
import java.util.Collections
import org.apache.spark.TaskState
import org.apache.spark.{Logging, SparkException, SparkContext, TaskState}
import org.apache.spark.scheduler.TaskDescription
import org.apache.spark.scheduler.cluster.{ClusterScheduler, ExecutorExited, ExecutorLossReason}
import org.apache.spark.scheduler.cluster.{SchedulerBackend, SlaveLost, WorkerOffer}
import org.apache.spark.util.Utils

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,7 @@ import org.apache.spark._
import org.apache.spark.TaskState.TaskState
import org.apache.spark.executor.ExecutorURLClassLoader
import org.apache.spark.scheduler._
import org.apache.spark.scheduler.cluster._
import org.apache.spark.scheduler.cluster.SchedulingMode.SchedulingMode
import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
import akka.actor._
import org.apache.spark.util.Utils

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ import scala.collection.mutable.HashMap

import org.apache.spark.{ExceptionFailure, Logging, SparkEnv, Success, TaskState}
import org.apache.spark.TaskState.TaskState
import org.apache.spark.scheduler.{Task, TaskResult, TaskSet}
import org.apache.spark.scheduler.cluster.{Schedulable, TaskDescription, TaskInfo, TaskLocality, TaskSetManager}
import org.apache.spark.scheduler.{Schedulable, Task, TaskDescription, TaskInfo, TaskLocality,
TaskResult, TaskSet, TaskSetManager}


private[spark] class LocalTaskSetManager(sched: LocalScheduler, val taskSet: TaskSet)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import scala.util.Random

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.scheduler.cluster.SchedulingMode
import org.apache.spark.scheduler.SchedulingMode


/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ import org.eclipse.jetty.server.Handler

import org.apache.spark.{ExceptionFailure, Logging, SparkContext}
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.scheduler.cluster.TaskInfo
import org.apache.spark.scheduler.{SparkListenerTaskStart, SparkListenerTaskEnd, SparkListener}
import org.apache.spark.scheduler.TaskInfo
import org.apache.spark.ui.JettyUtils._
import org.apache.spark.ui.Page.Executors
import org.apache.spark.ui.UIUtils
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import javax.servlet.http.HttpServletRequest

import scala.xml.{NodeSeq, Node}

import org.apache.spark.scheduler.cluster.SchedulingMode
import org.apache.spark.scheduler.SchedulingMode
import org.apache.spark.ui.Page._
import org.apache.spark.ui.UIUtils._

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,8 @@ import scala.Seq
import scala.collection.mutable.{ListBuffer, HashMap, HashSet}

import org.apache.spark.{ExceptionFailure, SparkContext, Success}
import org.apache.spark.scheduler._
import org.apache.spark.scheduler.cluster.TaskInfo
import org.apache.spark.executor.TaskMetrics
import collection.mutable
import org.apache.spark.scheduler._

/**
* Tracks task-level information to be displayed in the UI.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ import org.apache.spark.ui.JettyUtils._
import org.apache.spark.{ExceptionFailure, SparkContext, Success}
import org.apache.spark.scheduler._
import collection.mutable
import org.apache.spark.scheduler.cluster.SchedulingMode
import org.apache.spark.scheduler.cluster.SchedulingMode.SchedulingMode
import org.apache.spark.scheduler.SchedulingMode
import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
import org.apache.spark.util.Utils

/** Web UI showing progress status of all jobs in the given SparkContext. */
Expand Down
3 changes: 1 addition & 2 deletions core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,7 @@ import scala.collection.mutable.HashMap
import scala.collection.mutable.HashSet
import scala.xml.Node

import org.apache.spark.scheduler.Stage
import org.apache.spark.scheduler.cluster.Schedulable
import org.apache.spark.scheduler.{Schedulable, Stage}
import org.apache.spark.ui.UIUtils

/** Table showing list of pools */
Expand Down
6 changes: 3 additions & 3 deletions core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,12 @@ import javax.servlet.http.HttpServletRequest

import scala.xml.Node

import org.apache.spark.{ExceptionFailure}
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.ui.UIUtils._
import org.apache.spark.ui.Page._
import org.apache.spark.util.{Utils, Distribution}
import org.apache.spark.{ExceptionFailure}
import org.apache.spark.scheduler.cluster.TaskInfo
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.scheduler.TaskInfo

/** Page showing statistics and task list for a given stage */
private[spark] class StagePage(parent: JobProgressUI) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,7 @@ import java.util.Date
import scala.xml.Node
import scala.collection.mutable.HashSet

import org.apache.spark.scheduler.cluster.{SchedulingMode, TaskInfo}
import org.apache.spark.scheduler.Stage
import org.apache.spark.scheduler.{SchedulingMode, Stage, TaskInfo}
import org.apache.spark.ui.UIUtils
import org.apache.spark.util.Utils

Expand Down
Loading

0 comments on commit 976fe60

Please sign in to comment.