Merge remote-tracking branch 'upstream/master' into ldaonline
hhbyyh committed Mar 20, 2015
2 parents 02d0373 + 0745a30 commit 62405cc
Showing 250 changed files with 5,668 additions and 1,882 deletions.
2 changes: 1 addition & 1 deletion README.md
@@ -85,7 +85,7 @@ storage systems. Because the protocols have changed in different versions of
Hadoop, you must build Spark against the same version that your cluster runs.

Please refer to the build documentation at
["Specifying the Hadoop Version"](http://spark.apache.org/docs/latest/building-with-maven.html#specifying-the-hadoop-version)
["Specifying the Hadoop Version"](http://spark.apache.org/docs/latest/building-spark.html#specifying-the-hadoop-version)
for detailed guidance on building for a particular distribution of Hadoop, including
building for particular Hive and Hive Thriftserver distributions. See also
["Third Party Hadoop Distributions"](http://spark.apache.org/docs/latest/hadoop-third-party-distributions.html)
10 changes: 0 additions & 10 deletions assembly/pom.xml
@@ -114,16 +114,6 @@
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
<filter>
<!-- Exclude libgfortran, libgcc for license issues -->
<artifact>org.jblas:jblas</artifact>
<excludes>
<!-- Linux amd64 is OK; not statically linked -->
<exclude>lib/static/Linux/i386/**</exclude>
<exclude>lib/static/Mac OS X/**</exclude>
<exclude>lib/static/Windows/**</exclude>
</excludes>
</filter>
</filters>
</configuration>
<executions>
1 change: 0 additions & 1 deletion bin/pyspark
@@ -89,7 +89,6 @@ export PYTHONSTARTUP="$SPARK_HOME/python/pyspark/shell.py"
if [[ -n "$SPARK_TESTING" ]]; then
unset YARN_CONF_DIR
unset HADOOP_CONF_DIR
export PYSPARK_SUBMIT_ARGS=pyspark-shell
if [[ -n "$PYSPARK_DOC_TEST" ]]; then
exec "$PYSPARK_DRIVER_PYTHON" -m doctest $1
else
2 changes: 1 addition & 1 deletion conf/spark-env.sh.template
@@ -15,7 +15,7 @@
# - SPARK_PUBLIC_DNS, to set the public DNS name of the driver program
# - SPARK_CLASSPATH, default classpath entries to append
# - SPARK_LOCAL_DIRS, storage directories to use on this node for shuffle and RDD data
# - MESOS_NATIVE_LIBRARY, to point to your libmesos.so if you use Mesos
# - MESOS_NATIVE_JAVA_LIBRARY, to point to your libmesos.so if you use Mesos

# Options read in YARN client mode
# - HADOOP_CONF_DIR, to point Spark towards Hadoop configuration files
2 changes: 1 addition & 1 deletion core/pom.xml
@@ -414,7 +414,7 @@
<overWriteIfNewer>true</overWriteIfNewer>
<useSubDirectoryPerType>true</useSubDirectoryPerType>
<includeArtifactIds>
guava,jetty-io,jetty-servlet,jetty-continuation,jetty-http,jetty-plus,jetty-util,jetty-server
guava,jetty-io,jetty-servlet,jetty-continuation,jetty-http,jetty-plus,jetty-util,jetty-server,jetty-security
</includeArtifactIds>
<silent>true</silent>
</configuration>
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/ContextCleaner.scala
@@ -145,7 +145,7 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
}

/** Keep cleaning RDD, shuffle, and broadcast state. */
private def keepCleaning(): Unit = Utils.logUncaughtExceptions {
private def keepCleaning(): Unit = Utils.tryOrStopSparkContext(sc) {
while (!stopped) {
try {
val reference = Option(referenceQueue.remove(ContextCleaner.REF_QUEUE_POLL_TIMEOUT))
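The switch from Utils.logUncaughtExceptions to Utils.tryOrStopSparkContext(sc) means a failure in the cleaning thread now stops the SparkContext instead of only being logged. A minimal sketch of that kind of wrapper, assuming it simply runs the block and stops the context on an uncaught non-fatal error (the helper below is illustrative, not Spark's implementation):

```scala
import scala.util.control.NonFatal
import org.apache.spark.SparkContext

// Illustrative stand-in for Utils.tryOrStopSparkContext: run `body`, and if it
// throws, log the error and stop the owning SparkContext so the failure is not silent.
def tryOrStopContext(sc: SparkContext)(body: => Unit): Unit = {
  try body catch {
    case NonFatal(e) =>
      println(s"Stopping SparkContext because a daemon thread failed: $e")
      sc.stop()
  }
}
```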
6 changes: 3 additions & 3 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1093,7 +1093,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
def addFile(path: String, recursive: Boolean): Unit = {
val uri = new URI(path)
val schemeCorrectedPath = uri.getScheme match {
case null | "local" => "file:" + uri.getPath
case null | "local" => new File(path).getCanonicalFile.toURI.toString
case _ => path
}

@@ -1104,7 +1104,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
if (!fs.exists(hadoopPath)) {
throw new FileNotFoundException(s"Added file $hadoopPath does not exist.")
}
val isDir = fs.isDirectory(hadoopPath)
val isDir = fs.getFileStatus(hadoopPath).isDir
if (!isLocal && scheme == "file" && isDir) {
throw new SparkException(s"addFile does not support local directories when not running " +
"local mode.")
@@ -1736,7 +1736,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
}
}

listenerBus.start()
listenerBus.start(this)
}

/** Post the application start event */
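For context, the SparkContext.scala hunks above change how addFile resolves schemeless paths (now via getCanonicalFile) and keep the rule that local directories are only accepted in local mode. A hedged usage sketch of the public API; the app name, master URL, and file names are made up for the example:

```scala
import org.apache.spark.{SparkConf, SparkContext, SparkFiles}

// Illustrative usage of the addFile path handling changed above.
val sc = new SparkContext(new SparkConf().setAppName("addFile-example").setMaster("local[*]"))

// A schemeless path is now resolved via getCanonicalFile, so relative paths work.
sc.addFile("data/lookup.txt")
// sc.addFile("hdfs:///shared/model.bin")  // an explicit scheme is passed through unchanged

// Tasks (or the driver, in local mode) locate the shipped copy via SparkFiles.
val localCopy = SparkFiles.get("lookup.txt")
```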
6 changes: 1 addition & 5 deletions core/src/main/scala/org/apache/spark/TaskEndReason.scala
@@ -151,11 +151,7 @@ case object TaskKilled extends TaskFailedReason {
* Task requested the driver to commit, but was denied.
*/
@DeveloperApi
case class TaskCommitDenied(
jobID: Int,
partitionID: Int,
attemptID: Int)
extends TaskFailedReason {
case class TaskCommitDenied(jobID: Int, partitionID: Int, attemptID: Int) extends TaskFailedReason {
override def toErrorString: String = s"TaskCommitDenied (Driver denied task commit)" +
s" for job: $jobID, partition: $partitionID, attempt: $attemptID"
}
1 change: 1 addition & 0 deletions core/src/main/scala/org/apache/spark/TaskState.scala
@@ -46,5 +46,6 @@ private[spark] object TaskState extends Enumeration {
case MesosTaskState.TASK_FAILED => FAILED
case MesosTaskState.TASK_KILLED => KILLED
case MesosTaskState.TASK_LOST => LOST
case MesosTaskState.TASK_ERROR => LOST
}
}
@@ -32,7 +32,8 @@ import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.StatCounter
import org.apache.spark.util.Utils

class JavaDoubleRDD(val srdd: RDD[scala.Double]) extends JavaRDDLike[JDouble, JavaDoubleRDD] {
class JavaDoubleRDD(val srdd: RDD[scala.Double])
extends AbstractJavaRDDLike[JDouble, JavaDoubleRDD] {

override val classTag: ClassTag[JDouble] = implicitly[ClassTag[JDouble]]

48 changes: 38 additions & 10 deletions core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
@@ -39,12 +39,13 @@ import org.apache.spark.api.java.function.{Function => JFunction, Function2 => J
import org.apache.spark.partial.{BoundedDouble, PartialResult}
import org.apache.spark.rdd.{OrderedRDDFunctions, RDD}
import org.apache.spark.rdd.RDD.rddToPairRDDFunctions
import org.apache.spark.serializer.Serializer
import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.Utils

class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
(implicit val kClassTag: ClassTag[K], implicit val vClassTag: ClassTag[V])
extends JavaRDDLike[(K, V), JavaPairRDD[K, V]] {
extends AbstractJavaRDDLike[(K, V), JavaPairRDD[K, V]] {

override def wrapRDD(rdd: RDD[(K, V)]): JavaPairRDD[K, V] = JavaPairRDD.fromRDD(rdd)

@@ -227,24 +228,51 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
* - `mergeValue`, to merge a V into a C (e.g., adds it to the end of a list)
* - `mergeCombiners`, to combine two C's into a single one.
*
* In addition, users can control the partitioning of the output RDD, and whether to perform
* map-side aggregation (if a mapper can produce multiple items with the same key).
* In addition, users can control the partitioning of the output RDD, the serializer that is used
* for the shuffle, and whether to perform map-side aggregation (if a mapper can produce multiple
* items with the same key).
*/
def combineByKey[C](createCombiner: JFunction[V, C],
mergeValue: JFunction2[C, V, C],
mergeCombiners: JFunction2[C, C, C],
partitioner: Partitioner): JavaPairRDD[K, C] = {
implicit val ctag: ClassTag[C] = fakeClassTag
mergeValue: JFunction2[C, V, C],
mergeCombiners: JFunction2[C, C, C],
partitioner: Partitioner,
mapSideCombine: Boolean,
serializer: Serializer): JavaPairRDD[K, C] = {
implicit val ctag: ClassTag[C] = fakeClassTag
fromRDD(rdd.combineByKey(
createCombiner,
mergeValue,
mergeCombiners,
partitioner
partitioner,
mapSideCombine,
serializer
))
}

/**
* Simplified version of combineByKey that hash-partitions the output RDD.
* Generic function to combine the elements for each key using a custom set of aggregation
* functions. Turns a JavaPairRDD[(K, V)] into a result of type JavaPairRDD[(K, C)], for a
* "combined type" C * Note that V and C can be different -- for example, one might group an
* RDD of type (Int, Int) into an RDD of type (Int, List[Int]). Users provide three
* functions:
*
* - `createCombiner`, which turns a V into a C (e.g., creates a one-element list)
* - `mergeValue`, to merge a V into a C (e.g., adds it to the end of a list)
* - `mergeCombiners`, to combine two C's into a single one.
*
* In addition, users can control the partitioning of the output RDD. This method automatically
* uses map-side aggregation in shuffling the RDD.
*/
def combineByKey[C](createCombiner: JFunction[V, C],
mergeValue: JFunction2[C, V, C],
mergeCombiners: JFunction2[C, C, C],
partitioner: Partitioner): JavaPairRDD[K, C] = {
combineByKey(createCombiner, mergeValue, mergeCombiners, partitioner, true, null)
}

/**
* Simplified version of combineByKey that hash-partitions the output RDD and uses map-side
* aggregation.
*/
def combineByKey[C](createCombiner: JFunction[V, C],
mergeValue: JFunction2[C, V, C],
@@ -488,7 +516,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])

/**
* Simplified version of combineByKey that hash-partitions the resulting RDD using the existing
* partitioner/parallelism level.
* partitioner/parallelism level and using map-side aggregation.
*/
def combineByKey[C](createCombiner: JFunction[V, C],
mergeValue: JFunction2[C, V, C],
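To make the three-function combineByKey contract documented above concrete, here is a hedged sketch against the Scala RDD API that JavaPairRDD delegates to; it computes a per-key mean by combining values V = Double into C = (sum, count) pairs. The data, app name, and partition count are illustrative:

```scala
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("combineByKey-example").setMaster("local[*]"))
val scores = sc.parallelize(Seq(("a", 1.0), ("b", 2.0), ("a", 3.0)))

val sumAndCount = scores.combineByKey(
  (v: Double) => (v, 1L),                                   // createCombiner: turn a V into a C
  (c: (Double, Long), v: Double) => (c._1 + v, c._2 + 1L),  // mergeValue: fold a V into a C
  (c1: (Double, Long), c2: (Double, Long)) =>               // mergeCombiners: merge two C's
    (c1._1 + c2._1, c1._2 + c2._2),
  new HashPartitioner(4))                                   // controls the output partitioning

val meanByKey = sumAndCount.mapValues { case (sum, count) => sum / count }
meanByKey.collect().foreach(println)  // prints (a,2.0) and (b,2.0), in some order
```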
@@ -30,7 +30,7 @@ import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.Utils

class JavaRDD[T](val rdd: RDD[T])(implicit val classTag: ClassTag[T])
extends JavaRDDLike[T, JavaRDD[T]] {
extends AbstractJavaRDDLike[T, JavaRDD[T]] {

override def wrapRDD(rdd: RDD[T]): JavaRDD[T] = JavaRDD.fromRDD(rdd)

@@ -38,6 +38,14 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.Utils

/**
* As a workaround for https://issues.scala-lang.org/browse/SI-8905, implementations
* of JavaRDDLike should extend this dummy abstract class instead of directly inheriting
* from the trait. See SPARK-3266 for additional details.
*/
private[spark] abstract class AbstractJavaRDDLike[T, This <: JavaRDDLike[T, This]]
extends JavaRDDLike[T, This]

/**
* Defines operations common to several Java RDD implementations.
* Note that this trait is not intended to be implemented by user code.
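The new scaladoc above describes the SI-8905 / SPARK-3266 workaround only briefly. A minimal, self-contained sketch of the pattern (names below are illustrative, not Spark's): concrete wrappers extend an empty abstract class instead of mixing in the trait directly, so callers reliably see the trait's concrete methods:

```scala
// Illustrative pattern only: a trait with concrete members, an empty abstract
// class inserted as a workaround, and a concrete implementation extending it.
trait RDDLikeOps[T] {
  def first(): T
  def describe(): String = s"first element: ${first()}"  // concrete trait method
}

// The "dummy" abstract class: adds nothing, but gives implementations a real
// superclass so the trait's methods are inherited without the trait-encoding quirks.
abstract class AbstractRDDLikeOps[T] extends RDDLikeOps[T]

class SeqOps[T](data: Seq[T]) extends AbstractRDDLikeOps[T] {
  override def first(): T = data.head
}
```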
@@ -76,7 +76,6 @@ private[spark] class PythonRDD(

context.addTaskCompletionListener { context =>
writerThread.shutdownOnTaskCompletion()
writerThread.join()
if (!reuse_worker || !released) {
try {
worker.close()
@@ -248,13 +247,17 @@
} catch {
case e: Exception if context.isCompleted || context.isInterrupted =>
logDebug("Exception thrown after task completion (likely due to cleanup)", e)
Utils.tryLog(worker.shutdownOutput())
if (!worker.isClosed) {
Utils.tryLog(worker.shutdownOutput())
}

case e: Exception =>
// We must avoid throwing exceptions here, because the thread uncaught exception handler
// will kill the whole executor (see org.apache.spark.executor.Executor).
_exception = e
Utils.tryLog(worker.shutdownOutput())
if (!worker.isClosed) {
Utils.tryLog(worker.shutdownOutput())
}
} finally {
// Release memory used by this thread for shuffles
env.shuffleMemoryManager.releaseMemoryForThisThread()
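The error-handling change above guards the socket shutdown and deliberately logs rather than rethrows. A hedged sketch of that cleanup pattern in plain Scala; the helper names are hypothetical, not Spark's:

```scala
import java.net.Socket
import scala.util.control.NonFatal

// Hypothetical helpers illustrating the pattern: never let cleanup errors escape
// (they would reach the uncaught-exception handler and kill the whole executor),
// and only shut down the worker socket if it is still open.
def tryLog[T](block: => T): Unit =
  try block catch { case NonFatal(e) => println(s"Ignoring cleanup failure: $e") }

def releaseWorker(worker: Socket): Unit = {
  if (!worker.isClosed) {
    tryLog(worker.shutdownOutput())  // signal the Python worker that no more data is coming
  }
}
```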
@@ -28,7 +28,7 @@ import org.apache.spark.util.{IntParam, MemoryParam}
/**
* Command-line parser for the driver client.
*/
private[spark] class ClientArguments(args: Array[String]) {
private[deploy] class ClientArguments(args: Array[String]) {
import ClientArguments._

var cmd: String = "" // 'launch' or 'kill'
@@ -96,7 +96,7 @@ private[spark] class ClientArguments(args: Array[String]) {
/**
* Print usage and exit JVM with the given exit code.
*/
def printUsageAndExit(exitCode: Int) {
private def printUsageAndExit(exitCode: Int) {
// TODO: It wouldn't be too hard to allow users to submit their app and dependency jars
// separately similar to in the YARN client.
val usage =
@@ -116,10 +116,10 @@ private[spark] class ClientArguments(args: Array[String]) {
}
}

object ClientArguments {
private[spark] val DEFAULT_CORES = 1
private[spark] val DEFAULT_MEMORY = 512 // MB
private[spark] val DEFAULT_SUPERVISE = false
private[deploy] object ClientArguments {
val DEFAULT_CORES = 1
val DEFAULT_MEMORY = 512 // MB
val DEFAULT_SUPERVISE = false

def isValidJarUrl(s: String): Boolean = {
try {
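Most of this file's changes tighten visibility from private[spark] to private[deploy]. A small sketch of what that qualifier change means; the package and class names below are illustrative only:

```scala
// private[deploy] narrows access from the whole org.example.spark tree to the
// deploy package (and its sub-packages).
package org.example.spark.deploy {
  private[deploy] class ClientArgs(val cmd: String)
}

package org.example.spark {
  object Elsewhere {
    // new deploy.ClientArgs("launch")  // does not compile: ClientArgs is private[deploy]
  }
}
```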
@@ -17,7 +17,7 @@

package org.apache.spark.deploy

private[spark] class DriverDescription(
private[deploy] class DriverDescription(
val jarUrl: String,
val mem: Int,
val cores: Int,
@@ -22,7 +22,7 @@ package org.apache.spark.deploy
* This state is sufficient for the Master to reconstruct its internal data structures during
* failover.
*/
private[spark] class ExecutorDescription(
private[deploy] class ExecutorDescription(
val appId: String,
val execId: Int,
val cores: Int,
@@ -17,7 +17,7 @@

package org.apache.spark.deploy

private[spark] object ExecutorState extends Enumeration {
private[deploy] object ExecutorState extends Enumeration {

val LAUNCHING, LOADING, RUNNING, KILLED, FAILED, LOST, EXITED = Value
