Merge remote-tracking branch 'upstream/master' into pyspark-inputformats
MLnick committed Apr 23, 2014
2 parents e001b94 + 39f85e0 commit bef3afb
Showing 202 changed files with 4,389 additions and 2,124 deletions.
2 changes: 2 additions & 0 deletions conf/spark-env.sh.template
@@ -5,6 +5,7 @@

# Options read when launching programs locally with
# ./bin/run-example or ./bin/spark-submit
# - HADOOP_CONF_DIR, to point Spark towards Hadoop configuration files
# - SPARK_LOCAL_IP, to set the IP address Spark binds to on this node
# - SPARK_PUBLIC_DNS, to set the public dns name of the driver program
# - SPARK_CLASSPATH, default classpath entries to append
@@ -17,6 +18,7 @@
# - MESOS_NATIVE_LIBRARY, to point to your libmesos.so if you use Mesos

# Options read in YARN client mode
# - HADOOP_CONF_DIR, to point Spark towards Hadoop configuration files
# - SPARK_EXECUTOR_INSTANCES, Number of workers to start (Default: 2)
# - SPARK_EXECUTOR_CORES, Number of cores for the workers (Default: 1).
# - SPARK_EXECUTOR_MEMORY, Memory per Worker (e.g. 1000M, 2G) (Default: 1G)
2 changes: 1 addition & 1 deletion core/pom.xml
@@ -269,7 +269,7 @@
<dependency>
<groupId>org.spark-project</groupId>
<artifactId>pyrolite</artifactId>
<version>2.0</version>
<version>2.0.1</version>
</dependency>
</dependencies>
<build>
23 changes: 23 additions & 0 deletions core/src/main/java/org/apache/spark/package-info.java
@@ -0,0 +1,23 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

/**
* Core Spark classes in Scala. A few classes here, such as {@link org.apache.spark.Accumulator}
* and {@link org.apache.spark.storage.StorageLevel}, are also used in Java, but the
* {@link org.apache.spark.api.java} package contains the main Java API.
*/
package org.apache.spark;
11 changes: 6 additions & 5 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1110,6 +1110,7 @@ class SparkContext(config: SparkConf) extends Logging {
}

/**
* :: Experimental ::
* Submit a job for execution and return a FutureJob holding the result.
*/
@Experimental
@@ -1345,19 +1346,19 @@ object SparkContext extends Logging {
* Find the JAR from which a given class was loaded, to make it easy for users to pass
* their JARs to SparkContext.
*/
def jarOfClass(cls: Class[_]): Seq[String] = {
def jarOfClass(cls: Class[_]): Option[String] = {
val uri = cls.getResource("/" + cls.getName.replace('.', '/') + ".class")
if (uri != null) {
val uriStr = uri.toString
if (uriStr.startsWith("jar:file:")) {
// URI will be of the form "jar:file:/path/foo.jar!/package/cls.class",
// so pull out the /path/foo.jar
List(uriStr.substring("jar:file:".length, uriStr.indexOf('!')))
Some(uriStr.substring("jar:file:".length, uriStr.indexOf('!')))
} else {
Nil
None
}
} else {
Nil
None
}
}

@@ -1366,7 +1367,7 @@
* to pass their JARs to SparkContext. In most cases you can call jarOfObject(this) in
* your driver program.
*/
def jarOfObject(obj: AnyRef): Seq[String] = jarOfClass(obj.getClass)
def jarOfObject(obj: AnyRef): Option[String] = jarOfClass(obj.getClass)

/**
* Creates a modified version of a SparkConf with the parameters that can be passed separately
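The SparkContext change above narrows jarOfClass and jarOfObject from Seq[String] to Option[String]. Below is a minimal sketch of how a driver program might consume the new return type; the object name MyApp, the app name, and the local[2] master URL are illustrative only and not part of this commit.

import org.apache.spark.{SparkConf, SparkContext}

object MyApp {
  def main(args: Array[String]): Unit = {
    // jarOfObject(this) now yields Option[String] rather than Seq[String];
    // toSeq gives an empty list when the class was not loaded from a jar.
    val jars = SparkContext.jarOfObject(this).toSeq
    val conf = new SparkConf()
      .setAppName("MyApp")
      .setMaster("local[2]")
      .setJars(jars)
    val sc = new SparkContext(conf)
    // ... build and run RDDs here ...
    sc.stop()
  }
}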
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/TaskContext.scala
@@ -33,7 +33,7 @@ class TaskContext(
val attemptId: Long,
val runningLocally: Boolean = false,
@volatile var interrupted: Boolean = false,
private[spark] val taskMetrics: TaskMetrics = TaskMetrics.empty()
private[spark] val taskMetrics: TaskMetrics = TaskMetrics.empty
) extends Serializable {

@deprecated("use partitionId", "0.8.1")
@@ -19,7 +19,14 @@

import java.lang.annotation.*;

/** A new component of Spark which may have unstable API's. */
/**
* A new component of Spark which may have unstable API's.
*
* NOTE: If there exists a Scaladoc comment that immediately precedes this annotation, the first
* line of the comment must be ":: AlphaComponent ::" with no trailing blank line. This is because
* of the known issue that Scaladoc displays only either the annotation or the comment, whichever
* comes first.
*/
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE, ElementType.FIELD, ElementType.METHOD, ElementType.PARAMETER,
ElementType.CONSTRUCTOR, ElementType.LOCAL_VARIABLE, ElementType.PACKAGE})
@@ -23,6 +23,11 @@
* A lower-level, unstable API intended for developers.
*
* Developer API's might change or be removed in minor versions of Spark.
*
* NOTE: If there exists a Scaladoc comment that immediately precedes this annotation, the first
* line of the comment must be ":: DeveloperApi ::" with no trailing blank line. This is because
* of the known issue that Scaladoc displays only either the annotation or the comment, whichever
* comes first.
*/
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE, ElementType.FIELD, ElementType.METHOD, ElementType.PARAMETER,
@@ -24,6 +24,11 @@
*
* Experimental API's might change or be removed in minor versions of Spark, or be adopted as
* first-class Spark API's.
*
* NOTE: If there exists a Scaladoc comment that immediately precedes this annotation, the first
* line of the comment must be ":: Experimental ::" with no trailing blank line. This is because
* of the known issue that Scaladoc displays only either the annotation or the comment, whichever
* comes first.
*/
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE, ElementType.FIELD, ElementType.METHOD, ElementType.PARAMETER,
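The NOTE added to the AlphaComponent, DeveloperApi and Experimental annotations above describes a documentation convention rather than an API change. The sketch below shows the intended layout on a hypothetical annotated class (WordCounter is made up purely for illustration); the same pattern appears in the SparkContext.submitJob doc comment touched earlier in this diff.

import org.apache.spark.annotation.Experimental

/**
 * :: Experimental ::
 * Counts the words on a single line. The first Scaladoc line repeats the
 * annotation name, with no trailing blank line, so the generated docs still
 * surface it despite the Scaladoc limitation described in the NOTE above.
 */
@Experimental
class WordCounter {
  def count(line: String): Int = line.split("\\s+").count(_.nonEmpty)
}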
@@ -196,7 +196,7 @@ object SparkSubmit {
childArgs ++= appArgs.childArgs
} else if (clusterManager == YARN) {
for (arg <- appArgs.childArgs) {
childArgs += ("--args", arg)
childArgs += ("--arg", arg)
}
}
}
@@ -116,6 +116,15 @@ private[spark] class SparkSubmitArguments(args: Array[String]) {
if (args.length == 0) printUsageAndExit(-1)
if (primaryResource == null) SparkSubmit.printErrorAndExit("Must specify a primary resource")
if (mainClass == null) SparkSubmit.printErrorAndExit("Must specify a main class with --class")

if (master.startsWith("yarn")) {
val hasHadoopEnv = sys.env.contains("HADOOP_CONF_DIR") || sys.env.contains("YARN_CONF_DIR")
val testing = sys.env.contains("SPARK_TESTING")
if (!hasHadoopEnv && !testing) {
throw new Exception(s"When running with master '$master' " +
"either HADOOP_CONF_DIR or YARN_CONF_DIR must be set in the environment.")
}
}
}

override def toString = {
@@ -98,6 +98,11 @@ class HistoryServer(
def initialize() {
attachPage(new HistoryPage(this))
attachHandler(createStaticHandler(STATIC_RESOURCE_DIR, "/static"))
}

/** Bind to the HTTP server behind this web interface. */
override def bind() {
super.bind()
logCheckingThread.start()
}

@@ -84,7 +84,7 @@ class TaskMetrics extends Serializable {
}

private[spark] object TaskMetrics {
def empty(): TaskMetrics = new TaskMetrics
def empty: TaskMetrics = new TaskMetrics
}


2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/package.scala
@@ -30,7 +30,7 @@ package org.apache
* type (e.g. RDD[(Int, Int)] through implicit conversions when you
* `import org.apache.spark.SparkContext._`.
*
* Java programmers should reference the [[spark.api.java]] package
* Java programmers should reference the [[org.apache.spark.api.java]] package
* for Spark programming APIs in Java.
*
* Classes and methods marked with <span class="experimental badge" style="float: none;">
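As the corrected package doc above notes, pair-RDD methods become available through implicit conversions once org.apache.spark.SparkContext._ is imported. A small sketch follows, assuming a local[2] master and an illustrative PairExample object that are not part of this commit.

import org.apache.spark.{SparkConf, SparkContext}
// Brings rddToPairRDDFunctions and related implicits into scope, so an
// RDD[(Int, Int)] gains methods such as reduceByKey.
import org.apache.spark.SparkContext._

object PairExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("PairExample").setMaster("local[2]"))
    val pairs = sc.parallelize(Seq((1, 2), (1, 3), (2, 5)))
    val sums = pairs.reduceByKey(_ + _).collect()   // contains (1,5) and (2,5)
    sums.foreach(println)
    sc.stop()
  }
}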
31 changes: 25 additions & 6 deletions core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -20,6 +20,7 @@ package org.apache.spark.rdd
import java.util.Random

import scala.collection.Map
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import scala.reflect.{classTag, ClassTag}

@@ -94,26 +95,20 @@ abstract class RDD[T: ClassTag](
def compute(split: Partition, context: TaskContext): Iterator[T]

/**
* :: DeveloperApi ::
* Implemented by subclasses to return the set of partitions in this RDD. This method will only
* be called once, so it is safe to implement a time-consuming computation in it.
*/
@DeveloperApi
protected def getPartitions: Array[Partition]

/**
* :: DeveloperApi ::
* Implemented by subclasses to return how this RDD depends on parent RDDs. This method will only
* be called once, so it is safe to implement a time-consuming computation in it.
*/
@DeveloperApi
protected def getDependencies: Seq[Dependency[_]] = deps

/**
* :: DeveloperApi ::
* Optionally overridden by subclasses to specify placement preferences.
*/
@DeveloperApi
protected def getPreferredLocations(split: Partition): Seq[String] = Nil

/** Optionally overridden by subclasses to specify how they are partitioned. */
@@ -235,6 +230,30 @@ abstract class RDD[T: ClassTag](
}
}

/**
* Return the ancestors of the given RDD that are related to it only through a sequence of
* narrow dependencies. This traverses the given RDD's dependency tree using DFS, but maintains
* no ordering on the RDDs returned.
*/
private[spark] def getNarrowAncestors: Seq[RDD[_]] = {
val ancestors = new mutable.HashSet[RDD[_]]

def visit(rdd: RDD[_]) {
val narrowDependencies = rdd.dependencies.filter(_.isInstanceOf[NarrowDependency[_]])
val narrowParents = narrowDependencies.map(_.rdd)
val narrowParentsNotVisited = narrowParents.filterNot(ancestors.contains)
narrowParentsNotVisited.foreach { parent =>
ancestors.add(parent)
visit(parent)
}
}

visit(this)

// In case there is a cycle, do not include the root itself
ancestors.filterNot(_ == this).toSeq
}

/**
* Compute an RDD partition or read it from a checkpoint if the RDD is checkpointing.
*/
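The new getNarrowAncestors above walks the dependency tree by DFS but stops at shuffle boundaries. The sketch below builds a small lineage to make that concrete; it is illustrative only (NarrowAncestorsSketch and the local[2] master are made up), and since the method is private[spark] the expected results are stated in comments rather than called directly.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._   // for groupByKey on pair RDDs

object NarrowAncestorsSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("NarrowAncestorsSketch").setMaster("local[2]"))

    val base     = sc.parallelize(1 to 100)      // no parents
    val mapped   = base.map(i => (i % 10, i))    // narrow (one-to-one) dependency on base
    val filtered = mapped.filter(_._2 > 5)       // narrow (one-to-one) dependency on mapped
    val grouped  = filtered.groupByKey()         // introduces a shuffle dependency

    // filtered.getNarrowAncestors would return mapped and base, since both are
    // reachable through narrow dependencies only. RDDs on the far side of the
    // shuffle created by groupByKey are not narrow ancestors of grouped.
    println(filtered.dependencies.map(_.rdd == mapped))   // List(true): the immediate narrow edge
    sc.stop()
  }
}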
@@ -207,7 +207,7 @@ class JobLogger(val user: String, val logDirName: String) extends SparkListener
override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
val taskInfo = taskEnd.taskInfo
var taskStatus = "TASK_TYPE=%s".format(taskEnd.taskType)
val taskMetrics = if (taskEnd.taskMetrics != null) taskEnd.taskMetrics else TaskMetrics.empty()
val taskMetrics = if (taskEnd.taskMetrics != null) taskEnd.taskMetrics else TaskMetrics.empty
taskEnd.reason match {
case Success => taskStatus += " STATUS=SUCCESS"
recordTaskMetrics(taskEnd.stageId, taskStatus, taskInfo, taskMetrics)
@@ -24,8 +24,12 @@ import org.apache.spark.annotation.DeveloperApi
// information about a specific split instance : handles both split instances.
// So that we do not need to worry about the differences.
@DeveloperApi
class SplitInfo(val inputFormatClazz: Class[_], val hostLocation: String, val path: String,
val length: Long, val underlyingSplit: Any) {
class SplitInfo(
val inputFormatClazz: Class[_],
val hostLocation: String,
val path: String,
val length: Long,
val underlyingSplit: Any) {
override def toString(): String = {
"SplitInfo " + super.toString + " .. inputFormatClazz " + inputFormatClazz +
", hostLocation : " + hostLocation + ", path : " + path +
19 changes: 12 additions & 7 deletions core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala
@@ -25,7 +25,7 @@ import org.apache.spark.storage.RDDInfo
* Stores information about a stage to pass from the scheduler to SparkListeners.
*/
@DeveloperApi
class StageInfo(val stageId: Int, val name: String, val numTasks: Int, val rddInfo: RDDInfo) {
class StageInfo(val stageId: Int, val name: String, val numTasks: Int, val rddInfos: Seq[RDDInfo]) {
/** When this stage was submitted from the DAGScheduler to a TaskScheduler. */
var submissionTime: Option[Long] = None
/** Time when all tasks in the stage completed or when the stage was cancelled. */
@@ -41,12 +41,17 @@ class StageInfo(val stageId: Int, val name: String, val numTasks: Int, val rddIn
}
}

private[spark]
object StageInfo {
private[spark] object StageInfo {
/**
* Construct a StageInfo from a Stage.
*
* Each Stage is associated with one or many RDDs, with the boundary of a Stage marked by
* shuffle dependencies. Therefore, all ancestor RDDs related to this Stage's RDD through a
* sequence of narrow dependencies should also be associated with this Stage.
*/
def fromStage(stage: Stage): StageInfo = {
val rdd = stage.rdd
val rddName = Option(rdd.name).getOrElse(rdd.id.toString)
val rddInfo = new RDDInfo(rdd.id, rddName, rdd.partitions.size, rdd.getStorageLevel)
new StageInfo(stage.id, stage.name, stage.numTasks, rddInfo)
val ancestorRddInfos = stage.rdd.getNarrowAncestors.map(RDDInfo.fromRdd)
val rddInfos = Seq(RDDInfo.fromRdd(stage.rdd)) ++ ancestorRddInfos
new StageInfo(stage.id, stage.name, stage.numTasks, rddInfos)
}
}
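With the change above, a StageInfo carries RDDInfo entries for the stage's own RDD and all of its narrow ancestors. A hedged sketch of observing this through the listener API follows; StageRddsSketch, the app name, and the local[2] master are illustrative, and the listener usage is assumed to match the SparkListener API in this codebase.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._
import org.apache.spark.scheduler.{SparkListener, SparkListenerStageCompleted}

object StageRddsSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("StageRddsSketch").setMaster("local[2]"))
    sc.addSparkListener(new SparkListener {
      override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = {
        val info = stageCompleted.stageInfo
        // rddInfos now lists the stage's RDD plus its narrow ancestors.
        val names = info.rddInfos.map(r => s"${r.name} (${r.numPartitions} partitions)")
        println(s"Stage ${info.stageId}: ${names.mkString(", ")}")
      }
    })
    // Two stages: the map side of the shuffle and the reduce side.
    sc.parallelize(1 to 100).map(i => (i % 10, i)).groupByKey().count()
    sc.stop()
  }
}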
55 changes: 55 additions & 0 deletions core/src/main/scala/org/apache/spark/storage/RDDInfo.scala
@@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.storage

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.rdd.RDD
import org.apache.spark.util.Utils

@DeveloperApi
class RDDInfo(
val id: Int,
val name: String,
val numPartitions: Int,
val storageLevel: StorageLevel)
extends Ordered[RDDInfo] {

var numCachedPartitions = 0
var memSize = 0L
var diskSize = 0L
var tachyonSize = 0L

override def toString = {
import Utils.bytesToString
("RDD \"%s\" (%d) Storage: %s; CachedPartitions: %d; TotalPartitions: %d; MemorySize: %s; " +
"TachyonSize: %s; DiskSize: %s").format(
name, id, storageLevel.toString, numCachedPartitions, numPartitions,
bytesToString(memSize), bytesToString(tachyonSize), bytesToString(diskSize))
}

override def compare(that: RDDInfo) = {
this.id - that.id
}
}

private[spark] object RDDInfo {
def fromRdd(rdd: RDD[_]): RDDInfo = {
val rddName = Option(rdd.name).getOrElse(rdd.id.toString)
new RDDInfo(rdd.id, rddName, rdd.partitions.size, rdd.getStorageLevel)
}
}
