Skip to content

Commit

Permalink
Merge pull request apache#5 from apache/master
Browse files Browse the repository at this point in the history
merge upstream updates
  • Loading branch information
nchammas committed Jul 20, 2014
2 parents f7e4581 + 98ab411 commit 4dd148f
Show file tree
Hide file tree
Showing 332 changed files with 5,995 additions and 2,312 deletions.
24 changes: 15 additions & 9 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,13 @@
# Apache Spark

Lightning-Fast Cluster Computing - <http://spark.apache.org/>
Spark is a fast and general cluster computing system for Big Data. It provides
high-level APIs in Scala, Java, and Python, and an optimized engine that
supports general computation graphs for data analysis. It also supports a
rich set of higher-level tools including Spark SQL for SQL and structured
data processing, MLLib for machine learning, GraphX for graph processing,
and Spark Streaming.

<http://spark.apache.org/>


## Online Documentation
Expand Down Expand Up @@ -69,29 +76,28 @@ can be run using:
Spark uses the Hadoop core library to talk to HDFS and other Hadoop-supported
storage systems. Because the protocols have changed in different versions of
Hadoop, you must build Spark against the same version that your cluster runs.
You can change the version by setting the `SPARK_HADOOP_VERSION` environment
when building Spark.
You can change the version by setting `-Dhadoop.version` when building Spark.

For Apache Hadoop versions 1.x, Cloudera CDH MRv1, and other Hadoop
versions without YARN, use:

# Apache Hadoop 1.2.1
$ SPARK_HADOOP_VERSION=1.2.1 sbt/sbt assembly
$ sbt/sbt -Dhadoop.version=1.2.1 assembly

# Cloudera CDH 4.2.0 with MapReduce v1
$ SPARK_HADOOP_VERSION=2.0.0-mr1-cdh4.2.0 sbt/sbt assembly
$ sbt/sbt -Dhadoop.version=2.0.0-mr1-cdh4.2.0 assembly

For Apache Hadoop 2.2.X, 2.1.X, 2.0.X, 0.23.x, Cloudera CDH MRv2, and other Hadoop versions
with YARN, also set `SPARK_YARN=true`:
with YARN, also set `-Pyarn`:

# Apache Hadoop 2.0.5-alpha
$ SPARK_HADOOP_VERSION=2.0.5-alpha SPARK_YARN=true sbt/sbt assembly
$ sbt/sbt -Dhadoop.version=2.0.5-alpha -Pyarn assembly

# Cloudera CDH 4.2.0 with MapReduce v2
$ SPARK_HADOOP_VERSION=2.0.0-cdh4.2.0 SPARK_YARN=true sbt/sbt assembly
$ sbt/sbt -Dhadoop.version=2.0.0-cdh4.2.0 -Pyarn assembly

# Apache Hadoop 2.2.X and newer
$ SPARK_HADOOP_VERSION=2.2.0 SPARK_YARN=true sbt/sbt assembly
$ sbt/sbt -Dhadoop.version=2.2.0 -Pyarn assembly

When developing a Spark application, specify the Hadoop version by adding the
"hadoop-client" artifact to your project's dependencies. For example, if you're
Expand Down
1 change: 1 addition & 0 deletions assembly/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
<packaging>pom</packaging>

<properties>
<sbt.project.name>assembly</sbt.project.name>
<spark.jar.dir>scala-${scala.binary.version}</spark.jar.dir>
<spark.jar.basename>spark-assembly-${project.version}-hadoop${hadoop.version}.jar</spark.jar.basename>
<spark.jar>${project.build.directory}/${spark.jar.dir}/${spark.jar.basename}</spark.jar>
Expand Down
3 changes: 3 additions & 0 deletions bagel/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@

<groupId>org.apache.spark</groupId>
<artifactId>spark-bagel_2.10</artifactId>
<properties>
<sbt.project.name>bagel</sbt.project.name>
</properties>
<packaging>jar</packaging>
<name>Spark Project Bagel</name>
<url>http://spark.apache.org/</url>
Expand Down
4 changes: 2 additions & 2 deletions bin/spark-class
Original file line number Diff line number Diff line change
Expand Up @@ -110,9 +110,9 @@ export JAVA_OPTS

TOOLS_DIR="$FWDIR"/tools
SPARK_TOOLS_JAR=""
if [ -e "$TOOLS_DIR"/target/scala-$SCALA_VERSION/*assembly*[0-9Tg].jar ]; then
if [ -e "$TOOLS_DIR"/target/scala-$SCALA_VERSION/spark-tools*[0-9Tg].jar ]; then
# Use the JAR from the SBT build
export SPARK_TOOLS_JAR=`ls "$TOOLS_DIR"/target/scala-$SCALA_VERSION/*assembly*[0-9Tg].jar`
export SPARK_TOOLS_JAR=`ls "$TOOLS_DIR"/target/scala-$SCALA_VERSION/spark-tools*[0-9Tg].jar`
fi
if [ -e "$TOOLS_DIR"/target/spark-tools*[0-9Tg].jar ]; then
# Use the JAR from the Maven build
Expand Down
7 changes: 7 additions & 0 deletions core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@

<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<properties>
<sbt.project.name>core</sbt.project.name>
</properties>
<packaging>jar</packaging>
<name>Spark Project Core</name>
<url>http://spark.apache.org/</url>
Expand Down Expand Up @@ -111,6 +114,10 @@
<groupId>org.xerial.snappy</groupId>
<artifactId>snappy-java</artifactId>
</dependency>
<dependency>
<groupId>net.jpountz.lz4</groupId>
<artifactId>lz4</artifactId>
</dependency>
<dependency>
<groupId>com.twitter</groupId>
<artifactId>chill_${scala.binary.version}</artifactId>
Expand Down
2 changes: 2 additions & 0 deletions core/src/main/resources/org/apache/spark/ui/static/webui.css
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,9 @@ table.sortable thead {

span.kill-link {
margin-right: 2px;
margin-left: 20px;
color: gray;
float: right;
}

span.kill-link a {
Expand Down
8 changes: 4 additions & 4 deletions core/src/main/scala/org/apache/spark/Aggregator.scala
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,8 @@ case class Aggregator[K, V, C] (
} else {
val combiners = new ExternalAppendOnlyMap[K, V, C](createCombiner, mergeValue, mergeCombiners)
while (iter.hasNext) {
val (k, v) = iter.next()
combiners.insert(k, v)
val pair = iter.next()
combiners.insert(pair._1, pair._2)
}
// TODO: Make this non optional in a future release
Option(context).foreach(c => c.taskMetrics.memoryBytesSpilled = combiners.memoryBytesSpilled)
Expand Down Expand Up @@ -85,8 +85,8 @@ case class Aggregator[K, V, C] (
} else {
val combiners = new ExternalAppendOnlyMap[K, C, C](identity, mergeCombiners, mergeCombiners)
while (iter.hasNext) {
val (k, c) = iter.next()
combiners.insert(k, c)
val pair = iter.next()
combiners.insert(pair._1, pair._2)
}
// TODO: Make this non optional in a future release
Option(context).foreach(c => c.taskMetrics.memoryBytesSpilled = combiners.memoryBytesSpilled)
Expand Down
11 changes: 10 additions & 1 deletion core/src/main/scala/org/apache/spark/SparkContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1531,7 +1531,16 @@ object SparkContext extends Logging {
throw new SparkException("YARN mode not available ?", e)
}
}
val backend = new CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem)
val backend = try {
val clazz =
Class.forName("org.apache.spark.scheduler.cluster.YarnClusterSchedulerBackend")
val cons = clazz.getConstructor(classOf[TaskSchedulerImpl], classOf[SparkContext])
cons.newInstance(scheduler, sc).asInstanceOf[CoarseGrainedSchedulerBackend]
} catch {
case e: Exception => {
throw new SparkException("YARN mode not available ?", e)
}
}
scheduler.initialize(backend)
scheduler

Expand Down
6 changes: 2 additions & 4 deletions core/src/main/scala/org/apache/spark/TaskEndReason.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package org.apache.spark
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.storage.BlockManagerId
import org.apache.spark.util.Utils

/**
* :: DeveloperApi ::
Expand Down Expand Up @@ -88,10 +89,7 @@ case class ExceptionFailure(
stackTrace: Array[StackTraceElement],
metrics: Option[TaskMetrics])
extends TaskFailedReason {
override def toErrorString: String = {
val stackTraceString = if (stackTrace == null) "null" else stackTrace.mkString("\n")
s"$className ($description}\n$stackTraceString"
}
override def toErrorString: String = Utils.exceptionString(className, description, stackTrace)
}

/**
Expand Down
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/TestUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,8 @@ private[spark] object TestUtils {
def createCompiledClass(className: String, destDir: File, value: String = ""): File = {
val compiler = ToolProvider.getSystemJavaCompiler
val sourceFile = new JavaSourceFromString(className,
"public class " + className + " { @Override public String toString() { " +
"return \"" + value + "\";}}")
"public class " + className + " implements java.io.Serializable {" +
" @Override public String toString() { return \"" + value + "\"; }}")

// Calling this outputs a class file in pwd. It's easier to just rename the file than
// build a custom FileManager that controls the output location.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,23 +106,23 @@ abstract class Broadcast[T: ClassTag](val id: Long) extends Serializable {
* Actually get the broadcasted value. Concrete implementations of Broadcast class must
* define their own way to get the value.
*/
private[spark] def getValue(): T
protected def getValue(): T

/**
* Actually unpersist the broadcasted value on the executors. Concrete implementations of
* Broadcast class must define their own logic to unpersist their own data.
*/
private[spark] def doUnpersist(blocking: Boolean)
protected def doUnpersist(blocking: Boolean)

/**
* Actually destroy all data and metadata related to this broadcast variable.
* Implementation of Broadcast class must define their own logic to destroy their own
* state.
*/
private[spark] def doDestroy(blocking: Boolean)
protected def doDestroy(blocking: Boolean)

/** Check if this broadcast is valid. If not valid, exception is thrown. */
private[spark] def assertValid() {
protected def assertValid() {
if (!_isValid) {
throw new SparkException("Attempted to use %s after it has been destroyed!".format(toString))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ private[spark] class BroadcastManager(
synchronized {
if (!initialized) {
val broadcastFactoryClass =
conf.get("spark.broadcast.factory", "org.apache.spark.broadcast.HttpBroadcastFactory")
conf.get("spark.broadcast.factory", "org.apache.spark.broadcast.TorrentBroadcastFactory")

broadcastFactory =
Class.forName(broadcastFactoryClass).newInstance.asInstanceOf[BroadcastFactory]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,9 @@ private[spark] class HttpBroadcast[T: ClassTag](
@transient var value_ : T, isLocal: Boolean, id: Long)
extends Broadcast[T](id) with Logging with Serializable {

def getValue = value_
override protected def getValue() = value_

val blockId = BroadcastBlockId(id)
private val blockId = BroadcastBlockId(id)

/*
* Broadcasted data is also stored in the BlockManager of the driver. The BlockManagerMaster
Expand All @@ -60,14 +60,14 @@ private[spark] class HttpBroadcast[T: ClassTag](
/**
* Remove all persisted state associated with this HTTP broadcast on the executors.
*/
def doUnpersist(blocking: Boolean) {
override protected def doUnpersist(blocking: Boolean) {
HttpBroadcast.unpersist(id, removeFromDriver = false, blocking)
}

/**
* Remove all persisted state associated with this HTTP broadcast on the executors and driver.
*/
def doDestroy(blocking: Boolean) {
override protected def doDestroy(blocking: Boolean) {
HttpBroadcast.unpersist(id, removeFromDriver = true, blocking)
}

Expand Down Expand Up @@ -102,7 +102,7 @@ private[spark] class HttpBroadcast[T: ClassTag](
}
}

private[spark] object HttpBroadcast extends Logging {
private[broadcast] object HttpBroadcast extends Logging {
private var initialized = false
private var broadcastDir: File = null
private var compress: Boolean = false
Expand Down Expand Up @@ -160,7 +160,7 @@ private[spark] object HttpBroadcast extends Logging {

def getFile(id: Long) = new File(broadcastDir, BroadcastBlockId(id).name)

def write(id: Long, value: Any) {
private def write(id: Long, value: Any) {
val file = getFile(id)
val out: OutputStream = {
if (compress) {
Expand All @@ -176,7 +176,7 @@ private[spark] object HttpBroadcast extends Logging {
files += file
}

def read[T: ClassTag](id: Long): T = {
private def read[T: ClassTag](id: Long): T = {
logDebug("broadcast read server: " + serverUri + " id: broadcast-" + id)
val url = serverUri + "/" + BroadcastBlockId(id).name

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,21 +27,21 @@ import org.apache.spark.{SecurityManager, SparkConf}
* [[org.apache.spark.broadcast.HttpBroadcast]] for more details about this mechanism.
*/
class HttpBroadcastFactory extends BroadcastFactory {
def initialize(isDriver: Boolean, conf: SparkConf, securityMgr: SecurityManager) {
override def initialize(isDriver: Boolean, conf: SparkConf, securityMgr: SecurityManager) {
HttpBroadcast.initialize(isDriver, conf, securityMgr)
}

def newBroadcast[T: ClassTag](value_ : T, isLocal: Boolean, id: Long) =
override def newBroadcast[T: ClassTag](value_ : T, isLocal: Boolean, id: Long) =
new HttpBroadcast[T](value_, isLocal, id)

def stop() { HttpBroadcast.stop() }
override def stop() { HttpBroadcast.stop() }

/**
* Remove all persisted state associated with the HTTP broadcast with the given ID.
* @param removeFromDriver Whether to remove state from the driver
* @param blocking Whether to block until unbroadcasted
*/
def unbroadcast(id: Long, removeFromDriver: Boolean, blocking: Boolean) {
override def unbroadcast(id: Long, removeFromDriver: Boolean, blocking: Boolean) {
HttpBroadcast.unpersist(id, removeFromDriver, blocking)
}
}
Loading

0 comments on commit 4dd148f

Please sign in to comment.