
Commit

Merge remote-tracking branch 'apache/master' into structfield-metadata
mengxr committed Oct 23, 2014
2 parents 1e2abcf + 83b7a1c commit 589f314
Showing 62 changed files with 826 additions and 320 deletions.
4 changes: 4 additions & 0 deletions core/pom.xml
@@ -132,6 +132,10 @@
<groupId>com.twitter</groupId>
<artifactId>chill-java</artifactId>
</dependency>
<dependency>
<groupId>org.roaringbitmap</groupId>
<artifactId>RoaringBitmap</artifactId>
</dependency>
<dependency>
<groupId>commons-net</groupId>
<artifactId>commons-net</artifactId>
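
The new RoaringBitmap artifact provides compressed bitmaps of 32-bit integers. The snippet below is a minimal sketch of the library's basic API, not part of this diff, assuming the dependency above is on the classpath:

import org.roaringbitmap.RoaringBitmap

// Build a compressed bitmap of integer ids and query it.
val bitmap = RoaringBitmap.bitmapOf(1, 5, 42)
bitmap.add(7)
println(bitmap.contains(5))      // true
println(bitmap.getCardinality)   // 4
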
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/CacheManager.scala
@@ -61,7 +61,7 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
val computedValues = rdd.computeOrReadCheckpoint(partition, context)

// If the task is running locally, do not persist the result
if (context.runningLocally) {
if (context.isRunningLocally) {
return computedValues
}

17 changes: 16 additions & 1 deletion core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -18,7 +18,8 @@
package org.apache.spark

import scala.collection.JavaConverters._
import scala.collection.mutable.HashMap
import scala.collection.mutable.{HashMap, LinkedHashSet}
import org.apache.spark.serializer.KryoSerializer

/**
* Configuration for a Spark application. Used to set various Spark parameters as key-value pairs.
@@ -140,6 +141,20 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
this
}

/**
* Use Kryo serialization and register the given set of classes with Kryo.
* If called multiple times, this will append the classes from all calls together.
*/
def registerKryoClasses(classes: Array[Class[_]]): SparkConf = {
val allClassNames = new LinkedHashSet[String]()
allClassNames ++= get("spark.kryo.classesToRegister", "").split(',').filter(!_.isEmpty)
allClassNames ++= classes.map(_.getName)

set("spark.kryo.classesToRegister", allClassNames.mkString(","))
set("spark.serializer", classOf[KryoSerializer].getName)
this
}

/** Remove a parameter from the configuration */
def remove(key: String): SparkConf = {
settings.remove(key)
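
A minimal usage sketch of the new registerKryoClasses helper (spark-shell style); MyClass1 and MyClass2 are placeholder application classes, not part of this diff:

import org.apache.spark.SparkConf

case class MyClass1(x: Int)      // placeholder class
case class MyClass2(s: String)   // placeholder class

val conf = new SparkConf().setAppName("kryo-demo")
// Sets spark.serializer to KryoSerializer and appends the class names to
// spark.kryo.classesToRegister, keeping classes registered by earlier calls.
conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))
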
136 changes: 65 additions & 71 deletions core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
@@ -26,6 +26,7 @@ import scala.util.Random

import org.apache.spark.{Logging, SparkConf, SparkEnv, SparkException}
import org.apache.spark.io.CompressionCodec
import org.apache.spark.serializer.Serializer
import org.apache.spark.storage.{BroadcastBlockId, StorageLevel}
import org.apache.spark.util.ByteBufferInputStream
import org.apache.spark.util.io.ByteArrayChunkOutputStream
@@ -46,14 +47,12 @@ import org.apache.spark.util.io.ByteArrayChunkOutputStream
* This prevents the driver from being the bottleneck in sending out multiple copies of the
* broadcast data (one per executor) as done by the [[org.apache.spark.broadcast.HttpBroadcast]].
*
* When initialized, TorrentBroadcast objects read SparkEnv.get.conf.
*
* @param obj object to broadcast
* @param isLocal whether Spark is running in local mode (single JVM process).
* @param id A unique identifier for the broadcast variable.
*/
private[spark] class TorrentBroadcast[T: ClassTag](
obj : T,
@transient private val isLocal: Boolean,
id: Long)
private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long)
extends Broadcast[T](id) with Logging with Serializable {

/**
@@ -62,6 +61,20 @@ private[spark] class TorrentBroadcast[T: ClassTag](
* blocks from the driver and/or other executors.
*/
@transient private var _value: T = obj
/** The compression codec to use, or None if compression is disabled */
@transient private var compressionCodec: Option[CompressionCodec] = _
/** Size of each block. Default value is 4MB. This value is only read by the broadcaster. */
@transient private var blockSize: Int = _

private def setConf(conf: SparkConf) {
compressionCodec = if (conf.getBoolean("spark.broadcast.compress", true)) {
Some(CompressionCodec.createCodec(conf))
} else {
None
}
blockSize = conf.getInt("spark.broadcast.blockSize", 4096) * 1024
}
setConf(SparkEnv.get.conf)

private val broadcastId = BroadcastBlockId(id)

@@ -76,23 +89,20 @@
* @return number of blocks this broadcast variable is divided into
*/
private def writeBlocks(): Int = {
// For local mode, just put the object in the BlockManager so we can find it later.
SparkEnv.get.blockManager.putSingle(
broadcastId, _value, StorageLevel.MEMORY_AND_DISK, tellMaster = false)

if (!isLocal) {
val blocks = TorrentBroadcast.blockifyObject(_value)
blocks.zipWithIndex.foreach { case (block, i) =>
SparkEnv.get.blockManager.putBytes(
BroadcastBlockId(id, "piece" + i),
block,
StorageLevel.MEMORY_AND_DISK_SER,
tellMaster = true)
}
blocks.length
} else {
0
// Store a copy of the broadcast variable in the driver so that tasks run on the driver
// do not create a duplicate copy of the broadcast variable's value.
SparkEnv.get.blockManager.putSingle(broadcastId, _value, StorageLevel.MEMORY_AND_DISK,
tellMaster = false)
val blocks =
TorrentBroadcast.blockifyObject(_value, blockSize, SparkEnv.get.serializer, compressionCodec)
blocks.zipWithIndex.foreach { case (block, i) =>
SparkEnv.get.blockManager.putBytes(
BroadcastBlockId(id, "piece" + i),
block,
StorageLevel.MEMORY_AND_DISK_SER,
tellMaster = true)
}
blocks.length
}

/** Fetch torrent blocks from the driver and/or other executors. */
@@ -104,29 +114,24 @@

for (pid <- Random.shuffle(Seq.range(0, numBlocks))) {
val pieceId = BroadcastBlockId(id, "piece" + pid)

// First try getLocalBytes because there is a chance that previous attempts to fetch the
logDebug(s"Reading piece $pieceId of $broadcastId")
// First try getLocalBytes because there is a chance that previous attempts to fetch the
// broadcast blocks have already fetched some of the blocks. In that case, some blocks
// would be available locally (on this executor).
var blockOpt = bm.getLocalBytes(pieceId)
if (!blockOpt.isDefined) {
blockOpt = bm.getRemoteBytes(pieceId)
blockOpt match {
case Some(block) =>
// If we found the block from remote executors/driver's BlockManager, put the block
// in this executor's BlockManager.
SparkEnv.get.blockManager.putBytes(
pieceId,
block,
StorageLevel.MEMORY_AND_DISK_SER,
tellMaster = true)

case None =>
throw new SparkException("Failed to get " + pieceId + " of " + broadcastId)
}
def getLocal: Option[ByteBuffer] = bm.getLocalBytes(pieceId)
def getRemote: Option[ByteBuffer] = bm.getRemoteBytes(pieceId).map { block =>
// If we found the block from remote executors/driver's BlockManager, put the block
// in this executor's BlockManager.
SparkEnv.get.blockManager.putBytes(
pieceId,
block,
StorageLevel.MEMORY_AND_DISK_SER,
tellMaster = true)
block
}
// If we get here, the option is defined.
blocks(pid) = blockOpt.get
val block: ByteBuffer = getLocal.orElse(getRemote).getOrElse(
throw new SparkException(s"Failed to get $pieceId of $broadcastId"))
blocks(pid) = block
}
blocks
}
@@ -156,6 +161,7 @@ private[spark] class TorrentBroadcast[T: ClassTag](
private def readObject(in: ObjectInputStream) {
in.defaultReadObject()
TorrentBroadcast.synchronized {
setConf(SparkEnv.get.conf)
SparkEnv.get.blockManager.getLocal(broadcastId).map(_.data.next()) match {
case Some(x) =>
_value = x.asInstanceOf[T]
@@ -167,7 +173,8 @@
val time = (System.nanoTime() - start) / 1e9
logInfo("Reading broadcast variable " + id + " took " + time + " s")

_value = TorrentBroadcast.unBlockifyObject[T](blocks)
_value =
TorrentBroadcast.unBlockifyObject[T](blocks, SparkEnv.get.serializer, compressionCodec)
// Store the merged copy in BlockManager so other tasks on this executor don't
// need to re-fetch it.
SparkEnv.get.blockManager.putSingle(
@@ -179,43 +186,29 @@


private object TorrentBroadcast extends Logging {
/** Size of each block. Default value is 4MB. */
private lazy val BLOCK_SIZE = conf.getInt("spark.broadcast.blockSize", 4096) * 1024
private var initialized = false
private var conf: SparkConf = null
private var compress: Boolean = false
private var compressionCodec: CompressionCodec = null

def initialize(_isDriver: Boolean, conf: SparkConf) {
TorrentBroadcast.conf = conf // TODO: we might have to fix it in tests
synchronized {
if (!initialized) {
compress = conf.getBoolean("spark.broadcast.compress", true)
compressionCodec = CompressionCodec.createCodec(conf)
initialized = true
}
}
}

def stop() {
initialized = false
}

def blockifyObject[T: ClassTag](obj: T): Array[ByteBuffer] = {
val bos = new ByteArrayChunkOutputStream(BLOCK_SIZE)
val out: OutputStream = if (compress) compressionCodec.compressedOutputStream(bos) else bos
val ser = SparkEnv.get.serializer.newInstance()
def blockifyObject[T: ClassTag](
obj: T,
blockSize: Int,
serializer: Serializer,
compressionCodec: Option[CompressionCodec]): Array[ByteBuffer] = {
val bos = new ByteArrayChunkOutputStream(blockSize)
val out: OutputStream = compressionCodec.map(c => c.compressedOutputStream(bos)).getOrElse(bos)
val ser = serializer.newInstance()
val serOut = ser.serializeStream(out)
serOut.writeObject[T](obj).close()
bos.toArrays.map(ByteBuffer.wrap)
}

def unBlockifyObject[T: ClassTag](blocks: Array[ByteBuffer]): T = {
def unBlockifyObject[T: ClassTag](
blocks: Array[ByteBuffer],
serializer: Serializer,
compressionCodec: Option[CompressionCodec]): T = {
require(blocks.nonEmpty, "Cannot unblockify an empty array of blocks")
val is = new SequenceInputStream(
asJavaEnumeration(blocks.iterator.map(block => new ByteBufferInputStream(block))))
val in: InputStream = if (compress) compressionCodec.compressedInputStream(is) else is

val ser = SparkEnv.get.serializer.newInstance()
val in: InputStream = compressionCodec.map(c => c.compressedInputStream(is)).getOrElse(is)
val ser = serializer.newInstance()
val serIn = ser.deserializeStream(in)
val obj = serIn.readObject[T]()
serIn.close()
@@ -227,6 +220,7 @@ private object TorrentBroadcast extends Logging {
* If removeFromDriver is true, also remove these persisted blocks on the driver.
*/
def unpersist(id: Long, removeFromDriver: Boolean, blocking: Boolean) = {
logDebug(s"Unpersisting TorrentBroadcast $id")
SparkEnv.get.blockManager.master.removeBroadcast(id, removeFromDriver, blocking)
}
}
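
A rough round-trip sketch of the refactored blockifyObject/unBlockifyObject helpers with their new explicit serializer and codec parameters. It is only a sketch: the helpers are private to Spark's broadcast package, so the code assumes it lives in org.apache.spark.broadcast and runs inside an application whose SparkEnv is already initialized:

package org.apache.spark.broadcast

import org.apache.spark.SparkEnv
import org.apache.spark.io.CompressionCodec

object BlockifyRoundTripSketch {
  def roundTrip(): Unit = {
    val conf = SparkEnv.get.conf
    val serializer = SparkEnv.get.serializer
    // Mirror the logic of TorrentBroadcast.setConf above.
    val codec = if (conf.getBoolean("spark.broadcast.compress", true)) {
      Some(CompressionCodec.createCodec(conf))
    } else {
      None
    }
    val blockSize = conf.getInt("spark.broadcast.blockSize", 4096) * 1024

    // Split an object into serialized (optionally compressed) blocks, then rebuild it.
    val payload: Seq[Int] = (1 to 1000).toList
    val blocks = TorrentBroadcast.blockifyObject(payload, blockSize, serializer, codec)
    val restored = TorrentBroadcast.unBlockifyObject[Seq[Int]](blocks, serializer, codec)
    assert(restored == payload)
  }
}
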
core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcastFactory.scala
@@ -28,14 +28,13 @@ import org.apache.spark.{SecurityManager, SparkConf}
*/
class TorrentBroadcastFactory extends BroadcastFactory {

override def initialize(isDriver: Boolean, conf: SparkConf, securityMgr: SecurityManager) {
TorrentBroadcast.initialize(isDriver, conf)
}
override def initialize(isDriver: Boolean, conf: SparkConf, securityMgr: SecurityManager) { }

override def newBroadcast[T: ClassTag](value_ : T, isLocal: Boolean, id: Long) =
new TorrentBroadcast[T](value_, isLocal, id)
override def newBroadcast[T: ClassTag](value_ : T, isLocal: Boolean, id: Long) = {
new TorrentBroadcast[T](value_, id)
}

override def stop() { TorrentBroadcast.stop() }
override def stop() { }

/**
* Remove all persisted state associated with the torrent broadcast with the given ID.
11 changes: 9 additions & 2 deletions core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -315,8 +315,15 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
@deprecated("Use reduceByKeyLocally", "1.0.0")
def reduceByKeyToDriver(func: (V, V) => V): Map[K, V] = reduceByKeyLocally(func)

/** Count the number of elements for each key, and return the result to the master as a Map. */
def countByKey(): Map[K, Long] = self.map(_._1).countByValue()
/**
* Count the number of elements for each key, collecting the results to a local Map.
*
* Note that this method should only be used if the resulting map is expected to be small, as
* the whole thing is loaded into the driver's memory.
* To handle very large results, consider using rdd.mapValues(_ => 1L).reduceByKey(_ + _), which
* returns an RDD[(K, Long)] instead of a map.
*/
def countByKey(): Map[K, Long] = self.mapValues(_ => 1L).reduceByKey(_ + _).collect().toMap

/**
* :: Experimental ::
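
A small sketch contrasting the rewritten countByKey with the distributed alternative mentioned in the new scaladoc; sc is assumed to be an existing SparkContext (as in spark-shell):

import org.apache.spark.SparkContext._   // pair RDD operations; implicit in spark-shell

val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))

// Collects the per-key counts to the driver; fine when the key set is small.
val localCounts = pairs.countByKey()                       // Map(a -> 2, b -> 1)

// For very large key sets, keep the counts distributed as an RDD instead.
val distributedCounts = pairs.mapValues(_ => 1L).reduceByKey(_ + _)
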
31 changes: 7 additions & 24 deletions core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -927,32 +927,15 @@ abstract class RDD[T: ClassTag](
}

/**
* Return the count of each unique value in this RDD as a map of (value, count) pairs. The final
* combine step happens locally on the master, equivalent to running a single reduce task.
* Return the count of each unique value in this RDD as a local map of (value, count) pairs.
*
* Note that this method should only be used if the resulting map is expected to be small, as
* the whole thing is loaded into the driver's memory.
* To handle very large results, consider using rdd.map(x => (x, 1L)).reduceByKey(_ + _), which
* returns an RDD[(T, Long)] instead of a map.
*/
def countByValue()(implicit ord: Ordering[T] = null): Map[T, Long] = {
if (elementClassTag.runtimeClass.isArray) {
throw new SparkException("countByValue() does not support arrays")
}
// TODO: This should perhaps be distributed by default.
val countPartition = (iter: Iterator[T]) => {
val map = new OpenHashMap[T,Long]
iter.foreach {
t => map.changeValue(t, 1L, _ + 1L)
}
Iterator(map)
}: Iterator[OpenHashMap[T,Long]]
val mergeMaps = (m1: OpenHashMap[T,Long], m2: OpenHashMap[T,Long]) => {
m2.foreach { case (key, value) =>
m1.changeValue(key, value, _ + value)
}
m1
}: OpenHashMap[T,Long]
val myResult = mapPartitions(countPartition).reduce(mergeMaps)
// Convert to a Scala mutable map
val mutableResult = scala.collection.mutable.Map[T,Long]()
myResult.foreach { case (k, v) => mutableResult.put(k, v) }
mutableResult
map(value => (value, null)).countByKey()
}

/**
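
And a matching sketch for the rewritten countByValue, again assuming an existing SparkContext sc:

import org.apache.spark.SparkContext._   // implicit in spark-shell

val words = sc.parallelize(Seq("spark", "rdd", "spark"))

// Small result: collect the histogram of values to the driver.
val localHist = words.countByValue()                       // Map(spark -> 2, rdd -> 1)

// Very large result: keep it as an RDD, per the scaladoc above.
val distributedHist = words.map(w => (w, 1L)).reduceByKey(_ + _)
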
(diffs for the remaining 55 changed files are not shown)
