Commit 45884ab

Merge remote-tracking branch 'upstream/master' into ldaonline

hhbyyh committed Feb 8, 2015
2 parents: f41c5ca + 5de14cc

Showing 166 changed files with 7,832 additions and 766 deletions.
8 changes: 4 additions & 4 deletions assembly/pom.xml
@@ -39,7 +39,7 @@
<deb.pkg.name>spark</deb.pkg.name>
<deb.install.path>/usr/share/spark</deb.install.path>
<deb.user>root</deb.user>
<deb.bin.filemode>744</deb.bin.filemode>
<deb.bin.filemode>755</deb.bin.filemode>
</properties>

<dependencies>
@@ -280,7 +280,7 @@
<user>${deb.user}</user>
<group>${deb.user}</group>
<prefix>${deb.install.path}/conf</prefix>
<filemode>744</filemode>
<filemode>${deb.bin.filemode}</filemode>
</mapper>
</data>
<data>
@@ -302,7 +302,7 @@
<user>${deb.user}</user>
<group>${deb.user}</group>
<prefix>${deb.install.path}/sbin</prefix>
<filemode>744</filemode>
<filemode>${deb.bin.filemode}</filemode>
</mapper>
</data>
<data>
@@ -313,7 +313,7 @@
<user>${deb.user}</user>
<group>${deb.user}</group>
<prefix>${deb.install.path}/python</prefix>
<filemode>744</filemode>
<filemode>${deb.bin.filemode}</filemode>
</mapper>
</data>
</dataSet>
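Two things change above: the packaged scripts' mode goes from 744 to 755, and the <filemode> mappers for conf/, sbin/, and python/ now inherit the single deb.bin.filemode property instead of hard-coding the value. In permission terms (a side sketch using the JDK's permission parser, not code from this commit):

    import java.nio.file.attribute.PosixFilePermissions

    // 744: only the owner may execute the installed scripts.
    val before = PosixFilePermissions.fromString("rwxr--r--")
    // 755: group and others may execute them too, as launcher
    // scripts installed under /usr/share/spark should allow.
    val after  = PosixFilePermissions.fromString("rwxr-xr-x")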
Empty file modified: bin/spark-shell.cmd (mode 100755 → 100644)
2 changes: 1 addition & 1 deletion bin/spark-submit2.cmd
@@ -25,7 +25,7 @@ set ORIG_ARGS=%*
rem Reset the values of all variables used
set SPARK_SUBMIT_DEPLOY_MODE=client

if not defined %SPARK_CONF_DIR% (
if [%SPARK_CONF_DIR%] == [] (
set SPARK_CONF_DIR=%SPARK_HOME%\conf
)
set SPARK_SUBMIT_PROPERTIES_FILE=%SPARK_CONF_DIR%\spark-defaults.conf
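Why the one-line change above matters: cmd's "if not defined" expects a bare variable name, but the old line expanded the variable first. With SPARK_CONF_DIR set, the test looked for a variable literally named after the directory path, never found one, and overwrote the user's setting with the default; with it unset, the expansion left a malformed "if" line. Comparing the bracketed expansion against "[]", as the new line does, tests the value itself and behaves correctly either way. (The most idiomatic form would arguably be "if not defined SPARK_CONF_DIR", without the percent signs.)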
8 changes: 8 additions & 0 deletions core/pom.xml
@@ -243,6 +243,14 @@
<groupId>io.dropwizard.metrics</groupId>
<artifactId>metrics-graphite</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.module</groupId>
<artifactId>jackson-module-scala_2.10</artifactId>
</dependency>
<dependency>
<groupId>org.apache.derby</groupId>
<artifactId>derby</artifactId>
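The two new Jackson artifacts are presumably here for JSON handling introduced elsewhere in this merge (the REST URIs surfaced below, for instance). The usual wiring for the pair looks like this, a sketch rather than code from the commit:

    import com.fasterxml.jackson.databind.ObjectMapper
    import com.fasterxml.jackson.module.scala.DefaultScalaModule

    // jackson-databind supplies ObjectMapper; jackson-module-scala teaches
    // it Scala maps, options, and case classes.
    val mapper = new ObjectMapper()
    mapper.registerModule(DefaultScalaModule)
    val json = mapper.writeValueAsString(Map("host" -> "master", "port" -> 7077))
    // json: {"host":"master","port":7077}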
6 changes: 6 additions & 0 deletions core/src/main/resources/org/apache/spark/ui/static/webui.css
@@ -103,6 +103,12 @@ span.expand-details {
float: right;
}

span.rest-uri {
font-size: 10pt;
font-style: italic;
color: gray;
}

pre {
font-size: 0.8em;
}
11 changes: 8 additions & 3 deletions core/src/main/scala/org/apache/spark/CacheManager.scala
@@ -47,10 +47,15 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
val inputMetrics = blockResult.inputMetrics
val existingMetrics = context.taskMetrics
.getInputMetricsForReadMethod(inputMetrics.readMethod)
existingMetrics.addBytesRead(inputMetrics.bytesRead)

new InterruptibleIterator(context, blockResult.data.asInstanceOf[Iterator[T]])
existingMetrics.incBytesRead(inputMetrics.bytesRead)

val iter = blockResult.data.asInstanceOf[Iterator[T]]
new InterruptibleIterator[T](context, iter) {
override def next(): T = {
existingMetrics.incRecordsRead(1)
delegate.next()
}
}
case None =>
// Acquire a lock for loading this partition
// If another thread already holds the lock, wait for it to finish and return its results
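The replacement block does two things: it renames addBytesRead to incBytesRead, and it wraps the cached block's iterator so that every next() also counts one record read. The wrapping pattern in isolation (illustrative names, not Spark's):

    // Wrap an iterator so each next() bumps a counter before delegating.
    class CountingIterator[T](underlying: Iterator[T]) extends Iterator[T] {
      var recordsRead = 0L
      override def hasNext: Boolean = underlying.hasNext
      override def next(): T = {
        recordsRead += 1
        underlying.next()
      }
    }

    val it = new CountingIterator(Iterator("a", "b", "c"))
    it.foreach(_ => ())        // drain the iterator
    assert(it.recordsRead == 3)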
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -76,15 +76,15 @@ private[spark] class ExecutorAllocationManager(
private val maxNumExecutors = conf.getInt("spark.dynamicAllocation.maxExecutors",
Integer.MAX_VALUE)

// How long there must be backlogged tasks for before an addition is triggered
// How long there must be backlogged tasks for before an addition is triggered (seconds)
private val schedulerBacklogTimeout = conf.getLong(
"spark.dynamicAllocation.schedulerBacklogTimeout", 60)
"spark.dynamicAllocation.schedulerBacklogTimeout", 5)

// Same as above, but used only after `schedulerBacklogTimeout` is exceeded
private val sustainedSchedulerBacklogTimeout = conf.getLong(
"spark.dynamicAllocation.sustainedSchedulerBacklogTimeout", schedulerBacklogTimeout)

// How long an executor must be idle for before it is removed
// How long an executor must be idle for before it is removed (seconds)
private val executorIdleTimeout = conf.getLong(
"spark.dynamicAllocation.executorIdleTimeout", 600)

@@ -486,8 +486,8 @@ private[spark] class ExecutorAllocationManager(
}
}

override def onBlockManagerAdded(blockManagerAdded: SparkListenerBlockManagerAdded): Unit = {
val executorId = blockManagerAdded.blockManagerId.executorId
override def onExecutorAdded(executorAdded: SparkListenerExecutorAdded): Unit = {
val executorId = executorAdded.executorId
if (executorId != SparkContext.DRIVER_IDENTIFIER) {
// This guards against the race condition in which the `SparkListenerTaskStart`
// event is posted before the `SparkListenerBlockManagerAdded` event, which is
@@ -498,9 +498,8 @@ private[spark] class ExecutorAllocationManager(
}
}

override def onBlockManagerRemoved(
blockManagerRemoved: SparkListenerBlockManagerRemoved): Unit = {
allocationManager.onExecutorRemoved(blockManagerRemoved.blockManagerId.executorId)
override def onExecutorRemoved(executorRemoved: SparkListenerExecutorRemoved): Unit = {
allocationManager.onExecutorRemoved(executorRemoved.executorId)
}

/**
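Two separate fixes land in this file: the backlog timeout default drops from 60 to 5 seconds (with the units now documented), and executor tracking keys off the dedicated SparkListenerExecutorAdded/Removed events rather than block-manager registration. For reference, the knobs this code reads, as a user would set them (a sketch; spark.dynamicAllocation.enabled belongs to the wider feature, not this hunk):

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .set("spark.dynamicAllocation.enabled", "true")
      .set("spark.dynamicAllocation.maxExecutors", "20")
      // Request executors once tasks have been backlogged this long.
      .set("spark.dynamicAllocation.schedulerBacklogTimeout", "5")          // seconds
      // Keep requesting at this cadence while the backlog persists.
      .set("spark.dynamicAllocation.sustainedSchedulerBacklogTimeout", "5") // seconds
      // Release an executor after it has been idle this long.
      .set("spark.dynamicAllocation.executorIdleTimeout", "600")            // seconds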
22 changes: 19 additions & 3 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -288,7 +288,12 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
// the bound port to the cluster manager properly
ui.foreach(_.bind())

/** A default Hadoop Configuration for the Hadoop code (e.g. file systems) that we reuse. */
/**
* A default Hadoop Configuration for the Hadoop code (e.g. file systems) that we reuse.
*
* '''Note:''' As it will be reused in all Hadoop RDDs, it's better not to modify it unless you
* plan to set some global configurations for all Hadoop RDDs.
*/
val hadoopConfiguration = SparkHadoopUtil.get.newConfiguration(conf)

// Add each JAR given through the constructor
@@ -694,7 +699,10 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
* necessary info (e.g. file name for a filesystem-based dataset, table name for HyperTable),
* using the older MapReduce API (`org.apache.hadoop.mapred`).
*
* @param conf JobConf for setting up the dataset
* @param conf JobConf for setting up the dataset. Note: This will be put into a Broadcast.
* Therefore if you plan to reuse this conf to create multiple RDDs, you need to make
* sure you won't modify the conf. A safe approach is always creating a new conf for
* a new RDD.
* @param inputFormatClass Class of the InputFormat
* @param keyClass Class of the keys
* @param valueClass Class of the values
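A minimal sketch of the "new conf for a new RDD" advice (helper name and path handling are illustrative): copying sc.hadoopConfiguration into a fresh JobConf leaves the shared, broadcast-bound configuration untouched.

    import org.apache.hadoop.io.{LongWritable, Text}
    import org.apache.hadoop.mapred.{FileInputFormat, JobConf, TextInputFormat}
    import org.apache.spark.SparkContext

    def rawRecords(sc: SparkContext, path: String) = {
      // One fresh JobConf per RDD; never mutate a conf already handed to an RDD.
      val jobConf = new JobConf(sc.hadoopConfiguration)
      FileInputFormat.setInputPaths(jobConf, path)
      sc.hadoopRDD(jobConf, classOf[TextInputFormat], classOf[LongWritable], classOf[Text])
    }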
@@ -830,6 +838,14 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
* Get an RDD for a given Hadoop file with an arbitrary new API InputFormat
* and extra configuration options to pass to the input format.
*
* @param conf Configuration for setting up the dataset. Note: This will be put into a Broadcast.
* Therefore if you plan to reuse this conf to create multiple RDDs, you need to make
* sure you won't modify the conf. A safe approach is always creating a new conf for
* a new RDD.
* @param fClass Class of the InputFormat
* @param kClass Class of the keys
* @param vClass Class of the values
*
* '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
* record, directly caching the returned RDD or directly passing it to an aggregation or shuffle
* operation will create many references to the same object.
@@ -2094,7 +2110,7 @@ object SparkContext extends Logging {

val scheduler = new TaskSchedulerImpl(sc)
val localCluster = new LocalSparkCluster(
numSlaves.toInt, coresPerSlave.toInt, memoryPerSlaveInt)
numSlaves.toInt, coresPerSlave.toInt, memoryPerSlaveInt, sc.conf)
val masterUrls = localCluster.start()
val backend = new SparkDeploySchedulerBackend(scheduler, sc, masterUrls)
scheduler.initialize(backend)
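The branch above handles master URLs of the test-only "local-cluster[N, cores, memoryPerWorker]" form; the change threads sc.conf through to the embedded cluster it spins up. Usage, for orientation (a sketch):

    import org.apache.spark.{SparkConf, SparkContext}

    // Two workers with one core and 512 MB each, all inside this JVM.
    val sc = new SparkContext(
      new SparkConf().setMaster("local-cluster[2, 1, 512]").setAppName("demo"))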
34 changes: 27 additions & 7 deletions core/src/main/scala/org/apache/spark/TestUtils.scala
@@ -43,11 +43,20 @@ private[spark] object TestUtils {
* Note: if this is used during class loader tests, class names should be unique
* in order to avoid interference between tests.
*/
def createJarWithClasses(classNames: Seq[String], value: String = ""): URL = {
def createJarWithClasses(
classNames: Seq[String],
toStringValue: String = "",
classNamesWithBase: Seq[(String, String)] = Seq(),
classpathUrls: Seq[URL] = Seq()): URL = {
val tempDir = Utils.createTempDir()
val files = for (name <- classNames) yield createCompiledClass(name, tempDir, value)
val files1 = for (name <- classNames) yield {
createCompiledClass(name, tempDir, toStringValue, classpathUrls = classpathUrls)
}
val files2 = for ((childName, baseName) <- classNamesWithBase) yield {
createCompiledClass(childName, tempDir, toStringValue, baseName, classpathUrls)
}
val jarFile = new File(tempDir, "testJar-%s.jar".format(System.currentTimeMillis()))
createJar(files, jarFile)
createJar(files1 ++ files2, jarFile)
}


@@ -85,15 +94,26 @@ private[spark] object TestUtils {
}

/** Creates a compiled class with the given name. Class file will be placed in destDir. */
def createCompiledClass(className: String, destDir: File, value: String = ""): File = {
def createCompiledClass(
className: String,
destDir: File,
toStringValue: String = "",
baseClass: String = null,
classpathUrls: Seq[URL] = Seq()): File = {
val compiler = ToolProvider.getSystemJavaCompiler
val extendsText = Option(baseClass).map { c => s" extends ${c}" }.getOrElse("")
val sourceFile = new JavaSourceFromString(className,
"public class " + className + " implements java.io.Serializable {" +
" @Override public String toString() { return \"" + value + "\"; }}")
"public class " + className + extendsText + " implements java.io.Serializable {" +
" @Override public String toString() { return \"" + toStringValue + "\"; }}")

// Calling this outputs a class file in pwd. It's easier to just rename the file than
// build a custom FileManager that controls the output location.
compiler.getTask(null, null, null, null, null, Seq(sourceFile)).call()
val options = if (classpathUrls.nonEmpty) {
Seq("-classpath", classpathUrls.map { _.getFile }.mkString(File.pathSeparator))
} else {
Seq()
}
compiler.getTask(null, null, null, options, null, Seq(sourceFile)).call()

val fileName = className + ".class"
val result = new File(fileName)
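The widened helpers compose like this (a hypothetical test usage, not code from the commit): build one jar defining a base class, then a second jar whose class extends it, compiled with the first jar on the classpath.

    val jarA = TestUtils.createJarWithClasses(Seq("A"), toStringValue = "a")
    val jarB = TestUtils.createJarWithClasses(
      classNames = Seq(),
      classNamesWithBase = Seq(("B", "A")),   // B extends A
      classpathUrls = Seq(jarA))              // so B resolves A at compile time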
core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
@@ -373,6 +373,15 @@ class JavaSparkContext(val sc: SparkContext)
* other necessary info (e.g. file name for a filesystem-based dataset, table name for HyperTable,
* etc).
*
* @param conf JobConf for setting up the dataset. Note: This will be put into a Broadcast.
* Therefore if you plan to reuse this conf to create multiple RDDs, you need to make
* sure you won't modify the conf. A safe approach is always creating a new conf for
* a new RDD.
* @param inputFormatClass Class of the InputFormat
* @param keyClass Class of the keys
* @param valueClass Class of the values
* @param minPartitions Minimum number of Hadoop Splits to generate.
*
* '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
* record, directly caching the returned RDD will create many references to the same object.
* If you plan to directly cache Hadoop writable objects, you should first copy them using
@@ -395,6 +404,14 @@ class JavaSparkContext(val sc: SparkContext)
* Get an RDD for a Hadoop-readable dataset from a Hadoop JobConf giving its InputFormat and any
* other necessary info (e.g. file name for a filesystem-based dataset, table name for HyperTable,
* etc).
*
* @param conf JobConf for setting up the dataset. Note: This will be put into a Broadcast.
* Therefore if you plan to reuse this conf to create multiple RDDs, you need to make
* sure you won't modify the conf. A safe approach is always creating a new conf for
* a new RDD.
* @param inputFormatClass Class of the InputFormat
* @param keyClass Class of the keys
* @param valueClass Class of the values
*
* '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
* record, directly caching the returned RDD will create many references to the same object.
* If you plan to directly cache Hadoop writable objects, you should first copy them using
@@ -476,6 +493,14 @@ class JavaSparkContext(val sc: SparkContext)
* Get an RDD for a given Hadoop file with an arbitrary new API InputFormat
* and extra configuration options to pass to the input format.
*
* @param conf Configuration for setting up the dataset. Note: This will be put into a Broadcast.
* Therefore if you plan to reuse this conf to create multiple RDDs, you need to make
* sure you won't modify the conf. A safe approach is always creating a new conf for
* a new RDD.
* @param fClass Class of the InputFormat
* @param kClass Class of the keys
* @param vClass Class of the values
*
* '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
* record, directly caching the returned RDD will create many references to the same object.
* If you plan to directly cache Hadoop writable objects, you should first copy them using
@@ -675,6 +700,9 @@ class JavaSparkContext(val sc: SparkContext)

/**
* Returns the Hadoop configuration used for the Hadoop code (e.g. file systems) we reuse.
*
* '''Note:''' As it will be reused in all Hadoop RDDs, it's better not to modify it unless you
* plan to set some global configurations for all Hadoop RDDs.
*/
def hadoopConfiguration(): Configuration = {
sc.hadoopConfiguration
20 changes: 12 additions & 8 deletions core/src/main/scala/org/apache/spark/deploy/ClientArguments.scala
@@ -29,8 +29,7 @@ import org.apache.spark.util.{IntParam, MemoryParam}
* Command-line parser for the driver client.
*/
private[spark] class ClientArguments(args: Array[String]) {
val defaultCores = 1
val defaultMemory = 512
import ClientArguments._

var cmd: String = "" // 'launch' or 'kill'
var logLevel = Level.WARN
@@ -39,9 +38,9 @@ private[spark] class ClientArguments(args: Array[String]) {
var master: String = ""
var jarUrl: String = ""
var mainClass: String = ""
var supervise: Boolean = false
var memory: Int = defaultMemory
var cores: Int = defaultCores
var supervise: Boolean = DEFAULT_SUPERVISE
var memory: Int = DEFAULT_MEMORY
var cores: Int = DEFAULT_CORES
private var _driverOptions = ListBuffer[String]()
def driverOptions = _driverOptions.toSeq

@@ -50,7 +49,7 @@ private[spark] class ClientArguments(args: Array[String]) {

parse(args.toList)

def parse(args: List[String]): Unit = args match {
private def parse(args: List[String]): Unit = args match {
case ("--cores" | "-c") :: IntParam(value) :: tail =>
cores = value
parse(tail)
@@ -106,9 +105,10 @@ private[spark] class ClientArguments(args: Array[String]) {
|Usage: DriverClient kill <active-master> <driver-id>
|
|Options:
| -c CORES, --cores CORES Number of cores to request (default: $defaultCores)
| -m MEMORY, --memory MEMORY Megabytes of memory to request (default: $defaultMemory)
| -c CORES, --cores CORES Number of cores to request (default: $DEFAULT_CORES)
| -m MEMORY, --memory MEMORY Megabytes of memory to request (default: $DEFAULT_MEMORY)
| -s, --supervise Whether to restart the driver on failure
| (default: $DEFAULT_SUPERVISE)
| -v, --verbose Print more debugging output
""".stripMargin
System.err.println(usage)
@@ -117,6 +117,10 @@
}

object ClientArguments {
private[spark] val DEFAULT_CORES = 1
private[spark] val DEFAULT_MEMORY = 512 // MB
private[spark] val DEFAULT_SUPERVISE = false

def isValidJarUrl(s: String): Boolean = {
try {
val uri = new URI(s)
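The shape of the refactor, in miniature: defaults move to the companion object so field initializers and the interpolated usage text share one definition (an illustrative sketch, not Spark code):

    class Args {
      import Args._
      var cores: Int = DEFAULT_CORES                        // field default
      val usage = s"-c CORES (default: $DEFAULT_CORES)"     // same constant in help text
    }

    object Args {
      val DEFAULT_CORES = 1
    }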
15 changes: 11 additions & 4 deletions core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
@@ -148,15 +148,22 @@ private[deploy] object DeployMessages {

// Master to MasterWebUI

case class MasterStateResponse(host: String, port: Int, workers: Array[WorkerInfo],
activeApps: Array[ApplicationInfo], completedApps: Array[ApplicationInfo],
activeDrivers: Array[DriverInfo], completedDrivers: Array[DriverInfo],
status: MasterState) {
case class MasterStateResponse(
host: String,
port: Int,
restPort: Option[Int],
workers: Array[WorkerInfo],
activeApps: Array[ApplicationInfo],
completedApps: Array[ApplicationInfo],
activeDrivers: Array[DriverInfo],
completedDrivers: Array[DriverInfo],
status: MasterState) {

Utils.checkHost(host, "Required hostname")
assert (port > 0)

def uri = "spark://" + host + ":" + port
def restUri: Option[String] = restPort.map { p => "spark://" + host + ":" + p }
}

// WorkerWebUI to Worker
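The new field in isolation: restUri is simply uri with the optional REST-server port swapped in, absent when no REST port was bound (standalone sketch):

    val host = "master-host"
    val port = 7077
    val restPort: Option[Int] = Some(6066)

    val uri     = s"spark://$host:$port"                  // spark://master-host:7077
    val restUri = restPort.map(p => s"spark://$host:$p")  // Some(spark://master-host:6066)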
(The remaining changed files in this merge are not shown here.)
