SPARK-1093: Annotate developer and experimental APIs
This patch marks some existing classes as private[spark] and adds two types of API annotations:
- `EXPERIMENTAL API` = experimental user-facing module
- `DEVELOPER API - UNSTABLE` = developer-facing API that might change
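
For orientation, here is a minimal sketch of the pattern in Scala. The real annotations added by this patch live in `org.apache.spark.annotation` (and end up named `DeveloperApi` and `Experimental` after the renames listed below); the definitions and the `InternalHook`/`Approx` names here are illustrative stand-ins, not code from the patch.

```scala
import scala.annotation.StaticAnnotation

// Illustrative stand-ins for org.apache.spark.annotation.{DeveloperApi, Experimental};
// the annotations added by the patch are not defined exactly like this.
class DeveloperApi extends StaticAnnotation
class Experimental extends StaticAnnotation

/**
 * :: DeveloperApi ::
 * A developer-facing extension point that may change between releases.
 * The ":: DeveloperApi ::" tag in the scaladoc is what the JavaScript
 * injected into the generated docs matches to render a badge.
 */
@DeveloperApi
class InternalHook {
  def onEvent(name: String): Unit = println(s"event: $name")
}

/**
 * :: Experimental ::
 * An experimental user-facing helper that may change or be removed.
 */
@Experimental
object Approx {
  def approxCount(xs: Seq[Int]): Long = xs.length.toLong
}
```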

There is some discussion of the different mechanisms for doing this here:
https://issues.apache.org/jira/browse/SPARK-1081

I was pretty aggressive with marking things private. Keep in mind that if we want to open something up in the future we can, but we can never reduce visibility.
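
As a rough illustration of that asymmetry (hypothetical class, not part of the patch): Scala's package-qualified modifier `private[spark]` keeps a class usable everywhere under `org.apache.spark` while hiding it from user applications, so it can be promoted to public later without a breaking change, whereas the reverse move would break callers.

```scala
package org.apache.spark.rdd

// Hypothetical example: visible to all code under org.apache.spark,
// invisible to user applications, and therefore safe to open up later.
private[spark] class InternalCoalescer(parallelism: Int) {
  def plan(numPartitions: Int): Int = math.min(numPartitions, parallelism)
}
```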

A few notes here:
- In the past we've been inconsistent with the visibility of the X-RDD classes. This patch marks them private whenever there is an existing function in RDD that can directly create them (e.g. CoalescedRDD and rdd.coalesce()). One trade-off is that users can't subclass them.
- Noted that compression and serialization formats don't have to be wire-compatible across versions.
- Compression codecs and serialization formats are semi-private, as users typically don't instantiate them directly.
- Metrics sources are made private - users only interact with them through Spark's reflection-based instantiation.

Author: Patrick Wendell <pwendell@gmail.com>
Author: Andrew Or <andrewor14@gmail.com>

Closes apache#274 from pwendell/private-apis and squashes the following commits:

44179e4 [Patrick Wendell] Merge remote-tracking branch 'apache-github/master' into private-apis
042c803 [Patrick Wendell] spark.annotations -> spark.annotation
bfe7b52 [Patrick Wendell] Adding experimental for approximate counts
8d0c873 [Patrick Wendell] Warning in SparkEnv
99b223a [Patrick Wendell] Cleaning up annotations
e849f64 [Patrick Wendell] Merge pull request #2 from andrewor14/annotations
982a473 [Andrew Or] Generalize jQuery matching for non Spark-core API docs
a01c076 [Patrick Wendell] Merge pull request #1 from andrewor14/annotations
c1bcb41 [Andrew Or] DeveloperAPI -> DeveloperApi
0d48908 [Andrew Or] Comments and new lines (minor)
f3954e0 [Andrew Or] Add identifier tags in comments to work around scaladocs bug
99192ef [Andrew Or] Dynamically add badges based on annotations
824011b [Andrew Or] Add support for injecting arbitrary JavaScript to API docs
037755c [Patrick Wendell] Some changes after working with andrew or
f7d124f [Patrick Wendell] Small fixes
c318b24 [Patrick Wendell] Use CSS styles
e4c76b9 [Patrick Wendell] Logging
f390b13 [Patrick Wendell] Better visibility for workaround constructors
d6b0afd [Patrick Wendell] Small chang to existing constructor
403ba52 [Patrick Wendell] Style fix
870a7ba [Patrick Wendell] Work around for SI-8479
7fb13b2 [Patrick Wendell] Changes to UnionRDD and EmptyRDD
4a9e90c [Patrick Wendell] EXPERIMENTAL API --> EXPERIMENTAL
c581dce [Patrick Wendell] Changes after building against Shark.
8452309 [Patrick Wendell] Style fixes
1ed27d2 [Patrick Wendell] Formatting and coloring of badges
cd7a465 [Patrick Wendell] Code review feedback
2f706f1 [Patrick Wendell] Don't use floats
542a736 [Patrick Wendell] Small fixes
cf23ec6 [Patrick Wendell] Marking GraphX as alpha
d86818e [Patrick Wendell] Another naming change
5a76ed6 [Patrick Wendell] More visiblity clean-up
42c1f09 [Patrick Wendell] Using better labels
9d48cbf [Patrick Wendell] Initial pass
pwendell committed Apr 9, 2014
1 parent 9689b66 commit 87bd1f9
Showing 84 changed files with 614 additions and 130 deletions.
3 changes: 3 additions & 0 deletions core/src/main/scala/org/apache/spark/Aggregator.scala
@@ -17,15 +17,18 @@

package org.apache.spark

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.util.collection.{AppendOnlyMap, ExternalAppendOnlyMap}

/**
* :: DeveloperApi ::
* A set of functions used to aggregate data.
*
* @param createCombiner function to create the initial value of the aggregation.
* @param mergeValue function to merge a new value into the aggregation result.
* @param mergeCombiners function to merge outputs from multiple mergeValue function.
*/
@DeveloperApi
case class Aggregator[K, V, C] (
createCombiner: V => C,
mergeValue: (C, V) => C,
11 changes: 11 additions & 0 deletions core/src/main/scala/org/apache/spark/Dependency.scala
@@ -17,19 +17,24 @@

package org.apache.spark

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.rdd.RDD
import org.apache.spark.serializer.Serializer

/**
* :: DeveloperApi ::
* Base class for dependencies.
*/
@DeveloperApi
abstract class Dependency[T](val rdd: RDD[T]) extends Serializable


/**
* :: DeveloperApi ::
* Base class for dependencies where each partition of the parent RDD is used by at most one
* partition of the child RDD. Narrow dependencies allow for pipelined execution.
*/
@DeveloperApi
abstract class NarrowDependency[T](rdd: RDD[T]) extends Dependency(rdd) {
/**
* Get the parent partitions for a child partition.
@@ -41,13 +46,15 @@ abstract class NarrowDependency[T](rdd: RDD[T]) extends Dependency(rdd) {


/**
* :: DeveloperApi ::
* Represents a dependency on the output of a shuffle stage.
* @param rdd the parent RDD
* @param partitioner partitioner used to partition the shuffle output
* @param serializer [[org.apache.spark.serializer.Serializer Serializer]] to use. If set to null,
* the default serializer, as specified by `spark.serializer` config option, will
* be used.
*/
@DeveloperApi
class ShuffleDependency[K, V](
@transient rdd: RDD[_ <: Product2[K, V]],
val partitioner: Partitioner,
@@ -61,20 +68,24 @@ class ShuffleDependency[K, V](


/**
* :: DeveloperApi ::
* Represents a one-to-one dependency between partitions of the parent and child RDDs.
*/
@DeveloperApi
class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) {
override def getParents(partitionId: Int) = List(partitionId)
}


/**
* :: DeveloperApi ::
* Represents a one-to-one dependency between ranges of partitions in the parent and child RDDs.
* @param rdd the parent RDD
* @param inStart the start of the range in the parent RDD
* @param outStart the start of the range in the child RDD
* @param length the length of the range
*/
@DeveloperApi
class RangeDependency[T](rdd: RDD[T], inStart: Int, outStart: Int, length: Int)
extends NarrowDependency[T](rdd) {

7 changes: 7 additions & 0 deletions core/src/main/scala/org/apache/spark/FutureAction.scala
@@ -21,13 +21,16 @@ import scala.concurrent._
import scala.concurrent.duration.Duration
import scala.util.Try

import org.apache.spark.annotation.Experimental
import org.apache.spark.rdd.RDD
import org.apache.spark.scheduler.{JobFailed, JobSucceeded, JobWaiter}

/**
* :: Experimental ::
* A future for the result of an action to support cancellation. This is an extension of the
* Scala Future interface to support cancellation.
*/
@Experimental
trait FutureAction[T] extends Future[T] {
// Note that we redefine methods of the Future trait here explicitly so we can specify a different
// documentation (with reference to the word "action").
@@ -84,9 +87,11 @@


/**
* :: Experimental ::
* A [[FutureAction]] holding the result of an action that triggers a single job. Examples include
* count, collect, reduce.
*/
@Experimental
class SimpleFutureAction[T] private[spark](jobWaiter: JobWaiter[_], resultFunc: => T)
extends FutureAction[T] {

@@ -148,10 +153,12 @@ class SimpleFutureAction[T] private[spark](jobWaiter: JobWaiter[_], resultFunc:


/**
* :: Experimental ::
* A [[FutureAction]] for actions that could trigger multiple Spark jobs. Examples include take,
* takeSample. Cancellation works by setting the cancelled flag to true and interrupting the
* action thread if it is being blocked by a job.
*/
@Experimental
class ComplexFutureAction[T] extends FutureAction[T] {

// Pointer to the thread that is executing the action. It is set when the action is run.
core/src/main/scala/org/apache/spark/InterruptibleIterator.scala
@@ -21,7 +21,7 @@ package org.apache.spark
* An iterator that wraps around an existing iterator to provide task killing functionality.
* It works by checking the interrupted flag in [[TaskContext]].
*/
class InterruptibleIterator[+T](val context: TaskContext, val delegate: Iterator[T])
private[spark] class InterruptibleIterator[+T](val context: TaskContext, val delegate: Iterator[T])
extends Iterator[T] {

def hasNext: Boolean = !context.interrupted && delegate.hasNext
7 changes: 7 additions & 0 deletions core/src/main/scala/org/apache/spark/Logging.scala
@@ -21,11 +21,18 @@ import org.apache.log4j.{LogManager, PropertyConfigurator}
import org.slf4j.{Logger, LoggerFactory}
import org.slf4j.impl.StaticLoggerBinder

import org.apache.spark.annotation.DeveloperApi

/**
* :: DeveloperApi ::
* Utility trait for classes that want to log data. Creates a SLF4J logger for the class and allows
* logging messages at different levels using methods that only evaluate parameters lazily if the
* log level is enabled.
*
* NOTE: DO NOT USE this class outside of Spark. It is intended as an internal utility.
* This will likely be changed or removed in future releases.
*/
@DeveloperApi
trait Logging {
// Make the log field transient so that objects with Logging can
// be serialized and used on another machine
core/src/main/scala/org/apache/spark/SerializableWritable.scala
@@ -23,6 +23,9 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.ObjectWritable
import org.apache.hadoop.io.Writable

import org.apache.spark.annotation.DeveloperApi

@DeveloperApi
class SerializableWritable[T <: Writable](@transient var t: T) extends Serializable {
def value = t
override def toString = t.toString
86 changes: 73 additions & 13 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -34,6 +34,7 @@ import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat, Job => NewHad
import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFormat}
import org.apache.mesos.MesosNativeLibrary

import org.apache.spark.annotation.{DeveloperApi, Experimental}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.deploy.{LocalSparkCluster, SparkHadoopUtil}
import org.apache.spark.input.WholeTextFileInputFormat
@@ -48,22 +49,35 @@ import org.apache.spark.ui.SparkUI
import org.apache.spark.util.{ClosureCleaner, MetadataCleaner, MetadataCleanerType, TimeStampedWeakValueHashMap, Utils}

/**
* :: DeveloperApi ::
* Main entry point for Spark functionality. A SparkContext represents the connection to a Spark
* cluster, and can be used to create RDDs, accumulators and broadcast variables on that cluster.
*
* @param config a Spark Config object describing the application configuration. Any settings in
* this config overrides the default configs as well as system properties.
* @param preferredNodeLocationData used in YARN mode to select nodes to launch containers on. Can
* be generated using [[org.apache.spark.scheduler.InputFormatInfo.computePreferredLocations]]
* from a list of input files or InputFormats for the application.
*/
class SparkContext(
config: SparkConf,
// This is used only by YARN for now, but should be relevant to other cluster types (Mesos,
// etc) too. This is typically generated from InputFormatInfo.computePreferredLocations. It
// contains a map from hostname to a list of input format splits on the host.
val preferredNodeLocationData: Map[String, Set[SplitInfo]] = Map())
extends Logging {

@DeveloperApi
class SparkContext(config: SparkConf) extends Logging {

// This is used only by YARN for now, but should be relevant to other cluster types (Mesos,
// etc) too. This is typically generated from InputFormatInfo.computePreferredLocations. It
// contains a map from hostname to a list of input format splits on the host.
private[spark] var preferredNodeLocationData: Map[String, Set[SplitInfo]] = Map()

/**
* :: DeveloperApi ::
* Alternative constructor for setting preferred locations where Spark will create executors.
*
* @param preferredNodeLocationData used in YARN mode to select nodes to launch containers on. Can
* be generated using [[org.apache.spark.scheduler.InputFormatInfo.computePreferredLocations]]
* from a list of input files or InputFormats for the application.
*/
@DeveloperApi
def this(config: SparkConf, preferredNodeLocationData: Map[String, Set[SplitInfo]]) = {
this(config)
this.preferredNodeLocationData = preferredNodeLocationData
}

/**
* Alternative constructor that allows setting common Spark properties directly
@@ -93,10 +107,45 @@ class SparkContext(
environment: Map[String, String] = Map(),
preferredNodeLocationData: Map[String, Set[SplitInfo]] = Map()) =
{
this(SparkContext.updatedConf(new SparkConf(), master, appName, sparkHome, jars, environment),
preferredNodeLocationData)
this(SparkContext.updatedConf(new SparkConf(), master, appName, sparkHome, jars, environment))
this.preferredNodeLocationData = preferredNodeLocationData
}

// NOTE: The below constructors could be consolidated using default arguments. Due to
// Scala bug SI-8479, however, this causes the compile step to fail when generating docs.
// Until we have a good workaround for that bug the constructors remain broken out.

/**
* Alternative constructor that allows setting common Spark properties directly
*
* @param master Cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]).
* @param appName A name for your application, to display on the cluster web UI.
*/
private[spark] def this(master: String, appName: String) =
this(master, appName, null, Nil, Map(), Map())

/**
* Alternative constructor that allows setting common Spark properties directly
*
* @param master Cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]).
* @param appName A name for your application, to display on the cluster web UI.
* @param sparkHome Location where Spark is installed on cluster nodes.
*/
private[spark] def this(master: String, appName: String, sparkHome: String) =
this(master, appName, sparkHome, Nil, Map(), Map())

/**
* Alternative constructor that allows setting common Spark properties directly
*
* @param master Cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]).
* @param appName A name for your application, to display on the cluster web UI.
* @param sparkHome Location where Spark is installed on cluster nodes.
* @param jars Collection of JARs to send to the cluster. These can be paths on the local file
* system or HDFS, HTTP, HTTPS, or FTP URLs.
*/
private[spark] def this(master: String, appName: String, sparkHome: String, jars: Seq[String]) =
this(master, appName, sparkHome, jars, Map(), Map())

private[spark] val conf = config.clone()

/**
@@ -189,7 +238,7 @@ class SparkContext(
jars.foreach(addJar)
}

def warnSparkMem(value: String): String = {
private def warnSparkMem(value: String): String = {
logWarning("Using SPARK_MEM to set amount of memory to use per executor process is " +
"deprecated, please use spark.executor.memory instead.")
value
@@ -653,6 +702,9 @@ class SparkContext(
def union[T: ClassTag](first: RDD[T], rest: RDD[T]*): RDD[T] =
new UnionRDD(this, Seq(first) ++ rest)

/** Get an RDD that has no partitions or elements. */
def emptyRDD[T: ClassTag] = new EmptyRDD[T](this)

// Methods for creating shared variables

/**
@@ -716,6 +768,11 @@ class SparkContext(
postEnvironmentUpdate()
}

/**
* :: DeveloperApi ::
* Register a listener to receive up-calls from events that happen during execution.
*/
@DeveloperApi
def addSparkListener(listener: SparkListener) {
listenerBus.addListener(listener)
}
@@ -1021,8 +1078,10 @@ class SparkContext(
}

/**
* :: DeveloperApi ::
* Run a job that can return approximate results.
*/
@DeveloperApi
def runApproximateJob[T, U, R](
rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
@@ -1040,6 +1099,7 @@ class SparkContext(
/**
* Submit a job for execution and return a FutureJob holding the result.
*/
@Experimental
def submitJob[T, U, R](
rdd: RDD[T],
processPartition: Iterator[T] => U,
8 changes: 7 additions & 1 deletion core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -25,6 +25,7 @@ import scala.util.Properties
import akka.actor._
import com.google.common.collect.MapMaker

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.api.python.PythonWorkerFactory
import org.apache.spark.broadcast.BroadcastManager
import org.apache.spark.metrics.MetricsSystem
@@ -35,13 +36,18 @@ import org.apache.spark.storage._
import org.apache.spark.util.{AkkaUtils, Utils}

/**
* :: DeveloperApi ::
* Holds all the runtime environment objects for a running Spark instance (either master or worker),
* including the serializer, Akka actor system, block manager, map output tracker, etc. Currently
* Spark code finds the SparkEnv through a thread-local variable, so each thread that accesses these
* objects needs to have the right SparkEnv set. You can get the current environment with
* SparkEnv.get (e.g. after creating a SparkContext) and set it with SparkEnv.set.
*
* NOTE: This is not intended for external use. This is exposed for Shark and may be made private
* in a future release.
*/
class SparkEnv private[spark] (
@DeveloperApi
class SparkEnv (
val executorId: String,
val actorSystem: ActorSystem,
val serializer: Serializer,
6 changes: 6 additions & 0 deletions core/src/main/scala/org/apache/spark/TaskContext.scala
@@ -19,8 +19,14 @@ package org.apache.spark

import scala.collection.mutable.ArrayBuffer

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.executor.TaskMetrics

/**
* :: DeveloperApi ::
* Contextual information about a task which can be read or mutated during execution.
*/
@DeveloperApi
class TaskContext(
val stageId: Int,
val partitionId: Int,