Skip to content

Commit

Permalink
merged master
Browse files Browse the repository at this point in the history
  • Loading branch information
manishamde committed Jul 17, 2014
2 parents abf2901 + 1fcd5dc commit e1c970d
Show file tree
Hide file tree
Showing 249 changed files with 2,742 additions and 1,013 deletions.
11 changes: 9 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,13 @@
# Apache Spark

Lightning-Fast Cluster Computing - <http://spark.apache.org/>
Spark is a fast and general cluster computing system for Big Data. It provides
high-level APIs in Scala, Java, and Python, and an optimized engine that
supports general computation graphs for data analysis. It also supports a
rich set of higher-level tools including Spark SQL for SQL and structured
data processing, MLlib for machine learning, GraphX for graph processing,
and Spark Streaming.

<http://spark.apache.org/>


## Online Documentation
Expand Down Expand Up @@ -81,7 +88,7 @@ versions without YARN, use:
$ sbt/sbt -Dhadoop.version=2.0.0-mr1-cdh4.2.0 assembly

For Apache Hadoop 2.2.X, 2.1.X, 2.0.X, 0.23.x, Cloudera CDH MRv2, and other Hadoop versions
with YARN, also set `SPARK_YARN=true`:
with YARN, also set `-Pyarn`:

# Apache Hadoop 2.0.5-alpha
$ sbt/sbt -Dhadoop.version=2.0.5-alpha -Pyarn assembly
Expand Down
4 changes: 4 additions & 0 deletions core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,10 @@
<groupId>org.xerial.snappy</groupId>
<artifactId>snappy-java</artifactId>
</dependency>
<dependency>
<groupId>net.jpountz.lz4</groupId>
<artifactId>lz4</artifactId>
</dependency>
<dependency>
<groupId>com.twitter</groupId>
<artifactId>chill_${scala.binary.version}</artifactId>
Expand Down
8 changes: 4 additions & 4 deletions core/src/main/scala/org/apache/spark/Aggregator.scala
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,8 @@ case class Aggregator[K, V, C] (
} else {
val combiners = new ExternalAppendOnlyMap[K, V, C](createCombiner, mergeValue, mergeCombiners)
while (iter.hasNext) {
val (k, v) = iter.next()
combiners.insert(k, v)
val pair = iter.next()
combiners.insert(pair._1, pair._2)
}
// TODO: Make this non optional in a future release
Option(context).foreach(c => c.taskMetrics.memoryBytesSpilled = combiners.memoryBytesSpilled)
Expand Down Expand Up @@ -85,8 +85,8 @@ case class Aggregator[K, V, C] (
} else {
val combiners = new ExternalAppendOnlyMap[K, C, C](identity, mergeCombiners, mergeCombiners)
while (iter.hasNext) {
val (k, c) = iter.next()
combiners.insert(k, c)
val pair = iter.next()
combiners.insert(pair._1, pair._2)
}
// TODO: Make this non optional in a future release
Option(context).foreach(c => c.taskMetrics.memoryBytesSpilled = combiners.memoryBytesSpilled)
Expand Down
11 changes: 10 additions & 1 deletion core/src/main/scala/org/apache/spark/SparkContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1531,7 +1531,16 @@ object SparkContext extends Logging {
throw new SparkException("YARN mode not available ?", e)
}
}
val backend = new CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem)
val backend = try {
val clazz =
Class.forName("org.apache.spark.scheduler.cluster.YarnClusterSchedulerBackend")
val cons = clazz.getConstructor(classOf[TaskSchedulerImpl], classOf[SparkContext])
cons.newInstance(scheduler, sc).asInstanceOf[CoarseGrainedSchedulerBackend]
} catch {
case e: Exception => {
throw new SparkException("YARN mode not available ?", e)
}
}
scheduler.initialize(backend)
scheduler

Expand Down
5 changes: 3 additions & 2 deletions core/src/main/scala/org/apache/spark/TaskEndReason.scala
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,9 @@ case class ExceptionFailure(
metrics: Option[TaskMetrics])
extends TaskFailedReason {
override def toErrorString: String = {
val stackTraceString = if (stackTrace == null) "null" else stackTrace.mkString("\n")
s"$className ($description}\n$stackTraceString"
val stackTraceString =
if (stackTrace == null) "null" else stackTrace.map(" " + _).mkString("\n")
s"$className ($description)\n$stackTraceString"
}
}

Expand Down
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/TestUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,8 @@ private[spark] object TestUtils {
def createCompiledClass(className: String, destDir: File, value: String = ""): File = {
val compiler = ToolProvider.getSystemJavaCompiler
val sourceFile = new JavaSourceFromString(className,
"public class " + className + " { @Override public String toString() { " +
"return \"" + value + "\";}}")
"public class " + className + " implements java.io.Serializable {" +
" @Override public String toString() { return \"" + value + "\"; }}")

// Calling this outputs a class file in pwd. It's easier to just rename the file than
// build a custom FileManager that controls the output location.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,23 +106,23 @@ abstract class Broadcast[T: ClassTag](val id: Long) extends Serializable {
* Actually get the broadcasted value. Concrete implementations of Broadcast class must
* define their own way to get the value.
*/
private[spark] def getValue(): T
protected def getValue(): T

/**
* Actually unpersist the broadcasted value on the executors. Concrete implementations of
* Broadcast class must define their own logic to unpersist their own data.
*/
private[spark] def doUnpersist(blocking: Boolean)
protected def doUnpersist(blocking: Boolean)

/**
* Actually destroy all data and metadata related to this broadcast variable.
* Implementations of the Broadcast class must define their own logic to destroy their own
* state.
*/
private[spark] def doDestroy(blocking: Boolean)
protected def doDestroy(blocking: Boolean)

/** Check if this broadcast is valid. If not valid, exception is thrown. */
private[spark] def assertValid() {
protected def assertValid() {
if (!_isValid) {
throw new SparkException("Attempted to use %s after it has been destroyed!".format(toString))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ private[spark] class BroadcastManager(
synchronized {
if (!initialized) {
val broadcastFactoryClass =
conf.get("spark.broadcast.factory", "org.apache.spark.broadcast.HttpBroadcastFactory")
conf.get("spark.broadcast.factory", "org.apache.spark.broadcast.TorrentBroadcastFactory")

broadcastFactory =
Class.forName(broadcastFactoryClass).newInstance.asInstanceOf[BroadcastFactory]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,9 @@ private[spark] class HttpBroadcast[T: ClassTag](
@transient var value_ : T, isLocal: Boolean, id: Long)
extends Broadcast[T](id) with Logging with Serializable {

def getValue = value_
override protected def getValue() = value_

val blockId = BroadcastBlockId(id)
private val blockId = BroadcastBlockId(id)

/*
* Broadcasted data is also stored in the BlockManager of the driver. The BlockManagerMaster
Expand All @@ -60,14 +60,14 @@ private[spark] class HttpBroadcast[T: ClassTag](
/**
* Remove all persisted state associated with this HTTP broadcast on the executors.
*/
def doUnpersist(blocking: Boolean) {
override protected def doUnpersist(blocking: Boolean) {
HttpBroadcast.unpersist(id, removeFromDriver = false, blocking)
}

/**
* Remove all persisted state associated with this HTTP broadcast on the executors and driver.
*/
def doDestroy(blocking: Boolean) {
override protected def doDestroy(blocking: Boolean) {
HttpBroadcast.unpersist(id, removeFromDriver = true, blocking)
}

Expand Down Expand Up @@ -102,7 +102,7 @@ private[spark] class HttpBroadcast[T: ClassTag](
}
}

private[spark] object HttpBroadcast extends Logging {
private[broadcast] object HttpBroadcast extends Logging {
private var initialized = false
private var broadcastDir: File = null
private var compress: Boolean = false
Expand Down Expand Up @@ -160,7 +160,7 @@ private[spark] object HttpBroadcast extends Logging {

def getFile(id: Long) = new File(broadcastDir, BroadcastBlockId(id).name)

def write(id: Long, value: Any) {
private def write(id: Long, value: Any) {
val file = getFile(id)
val out: OutputStream = {
if (compress) {
Expand All @@ -176,7 +176,7 @@ private[spark] object HttpBroadcast extends Logging {
files += file
}

def read[T: ClassTag](id: Long): T = {
private def read[T: ClassTag](id: Long): T = {
logDebug("broadcast read server: " + serverUri + " id: broadcast-" + id)
val url = serverUri + "/" + BroadcastBlockId(id).name

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,21 +27,21 @@ import org.apache.spark.{SecurityManager, SparkConf}
* [[org.apache.spark.broadcast.HttpBroadcast]] for more details about this mechanism.
*/
class HttpBroadcastFactory extends BroadcastFactory {
def initialize(isDriver: Boolean, conf: SparkConf, securityMgr: SecurityManager) {
override def initialize(isDriver: Boolean, conf: SparkConf, securityMgr: SecurityManager) {
HttpBroadcast.initialize(isDriver, conf, securityMgr)
}

def newBroadcast[T: ClassTag](value_ : T, isLocal: Boolean, id: Long) =
override def newBroadcast[T: ClassTag](value_ : T, isLocal: Boolean, id: Long) =
new HttpBroadcast[T](value_, isLocal, id)

def stop() { HttpBroadcast.stop() }
override def stop() { HttpBroadcast.stop() }

/**
* Remove all persisted state associated with the HTTP broadcast with the given ID.
* @param removeFromDriver Whether to remove state from the driver
* @param blocking Whether to block until unbroadcasted
*/
def unbroadcast(id: Long, removeFromDriver: Boolean, blocking: Boolean) {
override def unbroadcast(id: Long, removeFromDriver: Boolean, blocking: Boolean) {
HttpBroadcast.unpersist(id, removeFromDriver, blocking)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ package org.apache.spark.broadcast
import java.io.{ByteArrayInputStream, ObjectInputStream, ObjectOutputStream}

import scala.reflect.ClassTag
import scala.math
import scala.util.Random

import org.apache.spark.{Logging, SparkConf, SparkEnv, SparkException}
Expand Down Expand Up @@ -49,19 +48,19 @@ private[spark] class TorrentBroadcast[T: ClassTag](
@transient var value_ : T, isLocal: Boolean, id: Long)
extends Broadcast[T](id) with Logging with Serializable {

def getValue = value_
override protected def getValue() = value_

val broadcastId = BroadcastBlockId(id)
private val broadcastId = BroadcastBlockId(id)

TorrentBroadcast.synchronized {
SparkEnv.get.blockManager.putSingle(
broadcastId, value_, StorageLevel.MEMORY_AND_DISK, tellMaster = false)
}

@transient var arrayOfBlocks: Array[TorrentBlock] = null
@transient var totalBlocks = -1
@transient var totalBytes = -1
@transient var hasBlocks = 0
@transient private var arrayOfBlocks: Array[TorrentBlock] = null
@transient private var totalBlocks = -1
@transient private var totalBytes = -1
@transient private var hasBlocks = 0

if (!isLocal) {
sendBroadcast()
Expand All @@ -70,19 +69,19 @@ private[spark] class TorrentBroadcast[T: ClassTag](
/**
* Remove all persisted state associated with this Torrent broadcast on the executors.
*/
def doUnpersist(blocking: Boolean) {
override protected def doUnpersist(blocking: Boolean) {
TorrentBroadcast.unpersist(id, removeFromDriver = false, blocking)
}

/**
* Remove all persisted state associated with this Torrent broadcast on the executors
* and driver.
*/
def doDestroy(blocking: Boolean) {
override protected def doDestroy(blocking: Boolean) {
TorrentBroadcast.unpersist(id, removeFromDriver = true, blocking)
}

def sendBroadcast() {
private def sendBroadcast() {
val tInfo = TorrentBroadcast.blockifyObject(value_)
totalBlocks = tInfo.totalBlocks
totalBytes = tInfo.totalBytes
Expand Down Expand Up @@ -159,7 +158,7 @@ private[spark] class TorrentBroadcast[T: ClassTag](
hasBlocks = 0
}

def receiveBroadcast(): Boolean = {
private def receiveBroadcast(): Boolean = {
// Receive meta-info about the size of broadcast data,
// the number of chunks it is divided into, etc.
val metaId = BroadcastBlockId(id, "meta")
Expand Down Expand Up @@ -211,7 +210,7 @@ private[spark] class TorrentBroadcast[T: ClassTag](

}

private[spark] object TorrentBroadcast extends Logging {
private[broadcast] object TorrentBroadcast extends Logging {
private lazy val BLOCK_SIZE = conf.getInt("spark.broadcast.blockSize", 4096) * 1024
private var initialized = false
private var conf: SparkConf = null
Expand Down Expand Up @@ -272,17 +271,19 @@ private[spark] object TorrentBroadcast extends Logging {
* Remove all persisted blocks associated with this torrent broadcast on the executors.
* If removeFromDriver is true, also remove these persisted blocks on the driver.
*/
def unpersist(id: Long, removeFromDriver: Boolean, blocking: Boolean) = synchronized {
SparkEnv.get.blockManager.master.removeBroadcast(id, removeFromDriver, blocking)
def unpersist(id: Long, removeFromDriver: Boolean, blocking: Boolean) = {
synchronized {
SparkEnv.get.blockManager.master.removeBroadcast(id, removeFromDriver, blocking)
}
}
}

private[spark] case class TorrentBlock(
private[broadcast] case class TorrentBlock(
blockID: Int,
byteArray: Array[Byte])
extends Serializable

private[spark] case class TorrentInfo(
private[broadcast] case class TorrentInfo(
@transient arrayOfBlocks: Array[TorrentBlock],
totalBlocks: Int,
totalBytes: Int)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,21 +28,21 @@ import org.apache.spark.{SecurityManager, SparkConf}
*/
class TorrentBroadcastFactory extends BroadcastFactory {

def initialize(isDriver: Boolean, conf: SparkConf, securityMgr: SecurityManager) {
override def initialize(isDriver: Boolean, conf: SparkConf, securityMgr: SecurityManager) {
TorrentBroadcast.initialize(isDriver, conf)
}

def newBroadcast[T: ClassTag](value_ : T, isLocal: Boolean, id: Long) =
override def newBroadcast[T: ClassTag](value_ : T, isLocal: Boolean, id: Long) =
new TorrentBroadcast[T](value_, isLocal, id)

def stop() { TorrentBroadcast.stop() }
override def stop() { TorrentBroadcast.stop() }

/**
* Remove all persisted state associated with the torrent broadcast with the given ID.
* @param removeFromDriver Whether to remove state from the driver.
* @param blocking Whether to block until unbroadcasted
*/
def unbroadcast(id: Long, removeFromDriver: Boolean, blocking: Boolean) {
override def unbroadcast(id: Long, removeFromDriver: Boolean, blocking: Boolean) {
TorrentBroadcast.unpersist(id, removeFromDriver, blocking)
}
}
Loading

0 comments on commit e1c970d

Please sign in to comment.