[SPARK-9851] Support submitting map stages individually in DAGScheduler
This patch adds support for submitting map stages in a DAG individually so that we can make downstream decisions after seeing statistics about their output, as part of SPARK-9850. I also added more comments to many of the key classes in DAGScheduler. By itself, the patch is not super useful except maybe to switch between a shuffle and a broadcast join, but with the other subtasks of SPARK-9850 we'll be able to make more interesting decisions.

The main entry point is SparkContext.submitMapStage, which lets you run a map stage and see stats about the map output sizes. Other stats could also be collected through accumulators. See AdaptiveSchedulingSuite for a short example.
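As a rough illustration of that entry point, here is a minimal sketch loosely modeled on AdaptiveSchedulingSuite (it is not the suite's exact code, and the object and app names are made up). Because submitMapStage is private[spark], the sketch assumes it is compiled inside the org.apache.spark package, and it waits on the result with Await.result since SimpleFutureAction is a Scala Future.

package org.apache.spark

import scala.concurrent.Await
import scala.concurrent.duration.Duration

// Hypothetical example, not part of the patch: run only the map side of a
// shuffle and inspect the resulting MapOutputStatistics.
object SubmitMapStageExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local", "submitMapStage-example")
    try {
      // A pair RDD and a shuffle dependency that hash-partitions it into 4 buckets.
      val pairs = sc.parallelize(1 to 100, 10).map(x => (x, x))
      val dep = new ShuffleDependency[Int, Int, Int](pairs, new HashPartitioner(4))

      // Submit just the map stage and block until its statistics are available.
      val stats: MapOutputStatistics = Await.result(sc.submitMapStage(dep), Duration.Inf)

      // Approximate output bytes for each of the 4 reduce partitions.
      println(stats.bytesByPartitionId.mkString(", "))
    } finally {
      sc.stop()
    }
  }
}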

Author: Matei Zaharia <matei@databricks.com>

Closes apache#8180 from mateiz/spark-9851.
mateiz authored and markhamstra committed Oct 27, 2015
1 parent 432ae9e commit 1fe0e2c
Showing 12 changed files with 708 additions and 90 deletions.
27 changes: 27 additions & 0 deletions core/src/main/scala/org/apache/spark/MapOutputStatistics.scala
@@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark

/**
* Holds statistics about the output sizes in a map stage. May become a DeveloperApi in the future.
*
* @param shuffleId ID of the shuffle
* @param bytesByPartitionId approximate number of output bytes for each map output partition
* (may be inexact due to use of compressed map statuses)
*/
private[spark] class MapOutputStatistics(val shuffleId: Int, val bytesByPartitionId: Array[Long])
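To make the purpose of this class concrete, the kind of decision the commit message mentions (shuffle vs. broadcast join) could be driven by these statistics roughly as follows. This is a hypothetical sketch, not part of the patch; the helper name and threshold parameter are made up.

// Hypothetical helper, not part of this patch: pick a join strategy for the
// smaller side of a join from its map output statistics.
private[spark] object JoinStrategyChooser {
  def choose(stats: MapOutputStatistics, broadcastThresholdBytes: Long): String = {
    // Total bytes written by the map stage across all reduce partitions.
    val totalBytes = stats.bytesByPartitionId.sum
    if (totalBytes <= broadcastThresholdBytes) "broadcast join" else "shuffle join"
  }
}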
49 changes: 38 additions & 11 deletions core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -18,6 +18,7 @@
package org.apache.spark

import java.io._
import java.util.Arrays
import java.util.concurrent.ConcurrentHashMap
import java.util.zip.{GZIPInputStream, GZIPOutputStream}

@@ -132,13 +133,43 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging
* describing the shuffle blocks that are stored at that block manager.
*/
def getMapSizesByExecutorId(shuffleId: Int, reduceId: Int)
: Seq[(BlockManagerId, Seq[(BlockId, Long)])] = {
logDebug(s"Fetching outputs for shuffle $shuffleId, reduce $reduceId")
val startTime = System.currentTimeMillis
val statuses = getStatuses(shuffleId)
// Synchronize on the returned array because, on the driver, it gets mutated in place
statuses.synchronized {
return MapOutputTracker.convertMapStatuses(shuffleId, reduceId, statuses)
}
}

/**
* Return statistics about all of the outputs for a given shuffle.
*/
def getStatistics(dep: ShuffleDependency[_, _, _]): MapOutputStatistics = {
val statuses = getStatuses(dep.shuffleId)
// Synchronize on the returned array because, on the driver, it gets mutated in place
statuses.synchronized {
val totalSizes = new Array[Long](dep.partitioner.numPartitions)
for (s <- statuses) {
for (i <- 0 until totalSizes.length) {
totalSizes(i) += s.getSizeForBlock(i)
}
}
new MapOutputStatistics(dep.shuffleId, totalSizes)
}
}

/**
* Get or fetch the array of MapStatuses for a given shuffle ID. NOTE: clients MUST synchronize
* on this array when reading it, because on the driver, we may be changing it in place.
*
* (It would be nice to remove this restriction in the future.)
*/
private def getStatuses(shuffleId: Int): Array[MapStatus] = {
val statuses = mapStatuses.get(shuffleId).orNull
if (statuses == null) {
logInfo("Don't have map outputs for shuffle " + shuffleId + ", fetching them")
val startTime = System.currentTimeMillis
var fetchedStatuses: Array[MapStatus] = null
fetching.synchronized {
// Someone else is fetching it; wait for them to be done
@@ -160,7 +191,7 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging
}

if (fetchedStatuses == null) {
- // We won the race to fetch the output locs; do so
+ // We won the race to fetch the statuses; do so
logInfo("Doing the fetch; tracker endpoint = " + trackerEndpoint)
// This try-finally prevents hangs due to timeouts:
try {
@@ -175,22 +206,18 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging
}
}
}
logDebug(s"Fetching map output location for shuffle $shuffleId, reduce $reduceId took " +
logDebug(s"Fetching map output statuses for shuffle $shuffleId took " +
s"${System.currentTimeMillis - startTime} ms")

if (fetchedStatuses != null) {
- fetchedStatuses.synchronized {
- return MapOutputTracker.convertMapStatuses(shuffleId, reduceId, fetchedStatuses)
- }
+ return fetchedStatuses
} else {
logError("Missing all output locations for shuffle " + shuffleId)
throw new MetadataFetchFailedException(
- shuffleId, reduceId, "Missing all output locations for shuffle " + shuffleId)
+ shuffleId, -1, "Missing all output locations for shuffle " + shuffleId)
}
} else {
- statuses.synchronized {
- return MapOutputTracker.convertMapStatuses(shuffleId, reduceId, statuses)
- }
+ return statuses
}
}

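The per-partition totals that getStatistics computes above are the raw material for the later SPARK-9850 subtasks. Purely as an illustration of what a downstream consumer could do with them, here is a hypothetical sketch (not code from this patch; the object name and target size are made up) that packs adjacent reduce partitions into groups of roughly equal output size.

import scala.collection.mutable.ArrayBuffer

// Hypothetical sketch, not part of this patch: group adjacent reduce partitions
// until each group holds roughly targetSize bytes of map output.
object PartitionGrouper {
  def group(bytesByPartitionId: Array[Long], targetSize: Long): Seq[Seq[Int]] = {
    val groups = ArrayBuffer(ArrayBuffer.empty[Int])
    var currentBytes = 0L
    for ((size, partition) <- bytesByPartitionId.zipWithIndex) {
      if (groups.last.nonEmpty && currentBytes + size > targetSize) {
        groups += ArrayBuffer.empty[Int] // start a new group
        currentBytes = 0L
      }
      groups.last += partition
      currentBytes += size
    }
    groups.map(_.toSeq)
  }
}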
16 changes: 16 additions & 0 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -2017,6 +2017,22 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli

new SimpleFutureAction(waiter, resultFunc)
}
/**
* Submit a map stage for execution. This is currently an internal API only, but might be
* promoted to DeveloperApi in the future.
*/
private[spark] def submitMapStage[K, V, C](dependency: ShuffleDependency[K, V, C])
: SimpleFutureAction[MapOutputStatistics] = {
assertNotStopped()
val callSite = getCallSite()
var result: MapOutputStatistics = null
val waiter = dagScheduler.submitMapStage(
dependency,
(r: MapOutputStatistics) => { result = r },
callSite,
localProperties.get)
new SimpleFutureAction[MapOutputStatistics](waiter, result)
}

/**
* Cancel active jobs for the specified group. See [[org.apache.spark.SparkContext.setJobGroup]]
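Since submitMapStage returns a SimpleFutureAction, which is a Scala Future, a caller (inside org.apache.spark, given the private[spark] visibility) could also consume the result asynchronously rather than blocking. A minimal sketch, assuming sc and dep as in the earlier example:

import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success}

// Minimal sketch, assuming `sc: SparkContext` and `dep: ShuffleDependency[Int, Int, Int]`
// from the earlier example: react when the map stage completes instead of blocking.
sc.submitMapStage(dep).onComplete {
  case Success(stats) =>
    println(s"Map stage for shuffle ${stats.shuffleId} wrote " +
      s"${stats.bytesByPartitionId.sum} bytes in total")
  case Failure(e) =>
    println(s"Map stage failed: $e")
}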
34 changes: 29 additions & 5 deletions core/src/main/scala/org/apache/spark/scheduler/ActiveJob.scala
@@ -23,18 +23,42 @@ import org.apache.spark.TaskContext
import org.apache.spark.util.CallSite

/**
- * Tracks information about an active job in the DAGScheduler.
+ * A running job in the DAGScheduler. Jobs can be of two types: a result job, which computes a
* ResultStage to execute an action, or a map-stage job, which computes the map outputs for a
* ShuffleMapStage before any downstream stages are submitted. The latter is used for adaptive
* query planning, to look at map output statistics before submitting later stages. We distinguish
* between these two types of jobs using the finalStage field of this class.
*
* Jobs are only tracked for "leaf" stages that clients directly submitted, through DAGScheduler's
* submitJob or submitMapStage methods. However, either type of job may cause the execution of
* other earlier stages (for RDDs in the DAG it depends on), and multiple jobs may share some of
* these previous stages. These dependencies are managed inside DAGScheduler.
*
* @param jobId A unique ID for this job.
* @param finalStage The stage that this job computes (either a ResultStage for an action or a
* ShuffleMapStage for submitMapStage).
* @param callSite Where this job was initiated in the user's program (shown on UI).
* @param listener A listener to notify if tasks in this job finish or the job fails.
* @param properties Scheduling properties attached to the job, such as fair scheduler pool name.
*/
private[spark] class ActiveJob(
val jobId: Int,
- val finalStage: ResultStage,
- val func: (TaskContext, Iterator[_]) => _,
- val partitions: Array[Int],
+ val finalStage: Stage,
val callSite: CallSite,
val listener: JobListener,
val properties: Properties) {

- val numPartitions = partitions.length
/**
* Number of partitions we need to compute for this job. Note that result stages may not need
* to compute all partitions in their target RDD, for actions like first() and lookup().
*/
val numPartitions = finalStage match {
case r: ResultStage => r.partitions.length
case m: ShuffleMapStage => m.rdd.partitions.length
}

/** Which partitions of the stage have finished */
val finished = Array.fill[Boolean](numPartitions)(false)

var numFinished = 0
}
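To show how the fields above fit together, here is a hypothetical sketch (not code from this patch; the object and method names are made up) of the bookkeeping a scheduler loop might do when a partition of either kind of job completes.

package org.apache.spark.scheduler

// Hypothetical sketch, not part of this patch: record one finished partition and
// report whether the job (result or map-stage) has now computed everything it needs.
private[spark] object ActiveJobBookkeeping {
  def markPartitionFinished(job: ActiveJob, partitionId: Int): Boolean = {
    if (!job.finished(partitionId)) {
      job.finished(partitionId) = true
      job.numFinished += 1
    }
    job.numFinished == job.numPartitions
  }
}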
