Merge branch 'master' of github.com:apache/spark into viz2
Andrew Or committed Apr 28, 2015
2 parents aa868a9 + 62888a4 commit 4310271
Showing 24 changed files with 242 additions and 68 deletions.
1 change: 1 addition & 0 deletions R/pkg/NAMESPACE
@@ -71,6 +71,7 @@ exportMethods(
"unpersist",
"value",
"values",
"zipPartitions",
"zipRDD",
"zipWithIndex",
"zipWithUniqueId"
51 changes: 51 additions & 0 deletions R/pkg/R/RDD.R
@@ -66,6 +66,11 @@ setMethod("initialize", "RDD", function(.Object, jrdd, serializedMode,
.Object
})

setMethod("show", "RDD",
function(.Object) {
cat(paste(callJMethod(.Object@jrdd, "toString"), "\n", sep=""))
})

setMethod("initialize", "PipelinedRDD", function(.Object, prev, func, jrdd_val) {
.Object@env <- new.env()
.Object@env$isCached <- FALSE
@@ -1590,3 +1595,49 @@ setMethod("intersection",

keys(filterRDD(cogroup(rdd1, rdd2, numPartitions = numPartitions), filterFunction))
})

#' Zips an RDD's partitions with one (or more) RDD(s).
#' Same as zipPartitions in Spark.
#'
#' @param ... RDDs to be zipped.
#' @param func A function to transform zipped partitions.
#' @return A new RDD by applying a function to the zipped partitions.
#' Assumes that all the RDDs have the *same number of partitions*, but
#' does *not* require them to have the same number of elements in each partition.
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
#' rdd1 <- parallelize(sc, 1:2, 2L) # 1, 2
#' rdd2 <- parallelize(sc, 1:4, 2L) # 1:2, 3:4
#' rdd3 <- parallelize(sc, 1:6, 2L) # 1:3, 4:6
#' collect(zipPartitions(rdd1, rdd2, rdd3,
#' func = function(x, y, z) { list(list(x, y, z))} ))
#' # list(list(1, c(1,2), c(1,2,3)), list(2, c(3,4), c(4,5,6)))
#'}
#' @rdname zipRDD
#' @aliases zipPartitions,RDD
setMethod("zipPartitions",
"RDD",
function(..., func) {
rrdds <- list(...)
if (length(rrdds) == 1) {
return(rrdds[[1]])
}
nPart <- sapply(rrdds, numPartitions)
if (length(unique(nPart)) != 1) {
stop("Can only zipPartitions RDDs which have the same number of partitions.")
}

rrdds <- lapply(rrdds, function(rdd) {
mapPartitionsWithIndex(rdd, function(partIndex, part) {
list(list(partIndex, part))
})
})
union.rdd <- Reduce(unionRDD, rrdds)
zipped.rdd <- values(groupByKey(union.rdd, numPartitions = nPart[1]))
res <- mapPartitions(zipped.rdd, function(plist) {
do.call(func, plist[[1]])
})
res
})
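
The implementation above zips partitions without a dedicated zip primitive: each input RDD's partitions are keyed by their index via mapPartitionsWithIndex, the keyed RDDs are unioned, and groupByKey collects the matching partitions so the user's func can be applied to each group. The following is an illustrative Scala sketch of the same trick against the core RDD API; it is not part of this commit, and the object name, master setting, and sample data are made up for the example.

import org.apache.spark.{SparkConf, SparkContext}

object ZipPartitionsSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("zipPartitionsSketch"))

    val rdd1 = sc.parallelize(1 to 2, 2)   // partitions: [1], [2]
    val rdd2 = sc.parallelize(1 to 4, 2)   // partitions: [1, 2], [3, 4]
    require(rdd1.partitions.length == rdd2.partitions.length,
      "Can only zip RDDs with the same number of partitions")

    // Key every partition by its index so matching partitions end up under the same key.
    val keyed1 = rdd1.mapPartitionsWithIndex((i, part) => Iterator((i, part.toList)))
    val keyed2 = rdd2.mapPartitionsWithIndex((i, part) => Iterator((i, part.toList)))

    // Union the keyed RDDs and group by partition index; each group now holds one
    // partition's worth of data from every input, ready for a combining function.
    val zipped = keyed1.union(keyed2)
      .groupByKey(rdd1.partitions.length)
      .values

    zipped.collect().foreach(println)
    sc.stop()
  }
}

One detail the sketch glosses over is that groupByKey makes no guarantee about the order of values within a group, so a combining function should not assume the partitions arrive in the order the RDDs were passed in.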
5 changes: 5 additions & 0 deletions R/pkg/R/generics.R
@@ -217,6 +217,11 @@ setGeneric("unpersist", function(x, ...) { standardGeneric("unpersist") })
#' @export
setGeneric("zipRDD", function(x, other) { standardGeneric("zipRDD") })

#' @rdname zipRDD
#' @export
setGeneric("zipPartitions", function(..., func) { standardGeneric("zipPartitions") },
signature = "...")

#' @rdname zipWithIndex
#' @seealso zipWithUniqueId
#' @export
33 changes: 33 additions & 0 deletions R/pkg/inst/tests/test_binary_function.R
@@ -66,3 +66,36 @@ test_that("cogroup on two RDDs", {
expect_equal(sortKeyValueList(actual),
sortKeyValueList(expected))
})

test_that("zipPartitions() on RDDs", {
rdd1 <- parallelize(sc, 1:2, 2L) # 1, 2
rdd2 <- parallelize(sc, 1:4, 2L) # 1:2, 3:4
rdd3 <- parallelize(sc, 1:6, 2L) # 1:3, 4:6
actual <- collect(zipPartitions(rdd1, rdd2, rdd3,
func = function(x, y, z) { list(list(x, y, z))} ))
expect_equal(actual,
list(list(1, c(1,2), c(1,2,3)), list(2, c(3,4), c(4,5,6))))

mockFile = c("Spark is pretty.", "Spark is awesome.")
fileName <- tempfile(pattern="spark-test", fileext=".tmp")
writeLines(mockFile, fileName)

rdd <- textFile(sc, fileName, 1)
actual <- collect(zipPartitions(rdd, rdd,
func = function(x, y) { list(paste(x, y, sep = "\n")) }))
expected <- list(paste(mockFile, mockFile, sep = "\n"))
expect_equal(actual, expected)

rdd1 <- parallelize(sc, 0:1, 1)
actual <- collect(zipPartitions(rdd1, rdd,
func = function(x, y) { list(x + nchar(y)) }))
expected <- list(0:1 + nchar(mockFile))
expect_equal(actual, expected)

rdd <- map(rdd, function(x) { x })
actual <- collect(zipPartitions(rdd, rdd1,
func = function(x, y) { list(y + nchar(x)) }))
expect_equal(actual, expected)

unlink(fileName)
})
5 changes: 5 additions & 0 deletions R/pkg/inst/tests/test_rdd.R
@@ -759,6 +759,11 @@ test_that("collectAsMap() on a pairwise RDD", {
expect_equal(vals, list(`1` = "a", `2` = "b"))
})

test_that("show()", {
rdd <- parallelize(sc, list(1:10))
expect_output(show(rdd), "ParallelCollectionRDD\\[\\d+\\] at parallelize at RRDD\\.scala:\\d+")
})

test_that("sampleByKey() on pairwise RDDs", {
rdd <- parallelize(sc, 1:2000)
pairsRDD <- lapply(rdd, function(x) { if (x %% 2 == 0) list("a", x) else list("b", x) })
40 changes: 26 additions & 14 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -223,6 +223,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
private var _listenerBusStarted: Boolean = false
private var _jars: Seq[String] = _
private var _files: Seq[String] = _
private var _shutdownHookRef: AnyRef = _

/* ------------------------------------------------------------------------------------- *
| Accessors and public fields. These provide access to the internal state of the |
@@ -517,6 +518,14 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
_taskScheduler.postStartHook()
_env.metricsSystem.registerSource(new DAGSchedulerSource(dagScheduler))
_env.metricsSystem.registerSource(new BlockManagerSource(_env.blockManager))

// Make sure the context is stopped if the user forgets about it. This avoids leaving
// unfinished event logs around after the JVM exits cleanly. It doesn't help if the JVM
// is killed, though.
_shutdownHookRef = Utils.addShutdownHook(Utils.SPARK_CONTEXT_SHUTDOWN_PRIORITY) { () =>
logInfo("Invoking stop() from shutdown hook")
stop()
}
} catch {
case NonFatal(e) =>
logError("Error initializing SparkContext.", e)
@@ -1065,7 +1074,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
/** Build the union of a list of RDDs. */
def union[T: ClassTag](rdds: Seq[RDD[T]]): RDD[T] = withRDDScope {
val partitioners = rdds.flatMap(_.partitioner).toSet
if (partitioners.size == 1) {
if (rdds.forall(_.partitioner.isDefined) && partitioners.size == 1) {
new PartitionerAwareUnionRDD(this, rdds)
} else {
new UnionRDD(this, rdds)
@@ -1492,6 +1501,9 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
logInfo("SparkContext already stopped.")
return
}
if (_shutdownHookRef != null) {
Utils.removeShutdownHook(_shutdownHookRef)
}

postApplicationEnd()
_ui.foreach(_.stop())
@@ -1902,7 +1914,7 @@ object SparkContext extends Logging {
*
* Access to this field is guarded by SPARK_CONTEXT_CONSTRUCTOR_LOCK.
*/
private val activeContext: AtomicReference[SparkContext] =
new AtomicReference[SparkContext](null)

/**
@@ -1955,11 +1967,11 @@ }
}

/**
* This function may be used to get or instantiate a SparkContext and register it as a
* singleton object. Because we can only have one active SparkContext per JVM,
* this is useful when applications may wish to share a SparkContext.
*
* Note: This function cannot be used to create multiple SparkContext instances
* even if multiple contexts are allowed.
*/
def getOrCreate(config: SparkConf): SparkContext = {
@@ -1972,17 +1984,17 @@
activeContext.get()
}
}

/**
* This function may be used to get or instantiate a SparkContext and register it as a
* singleton object. Because we can only have one active SparkContext per JVM,
* this is useful when applications may wish to share a SparkContext.
*
* This method allows not passing a SparkConf (useful if just retrieving).
*
* Note: This function cannot be used to create multiple SparkContext instances
* even if multiple contexts are allowed.
*/
def getOrCreate(): SparkContext = {
getOrCreate(new SparkConf())
}
core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala
@@ -60,6 +60,7 @@ class PartitionerAwareUnionRDD[T: ClassTag](
var rdds: Seq[RDD[T]]
) extends RDD[T](sc, rdds.map(x => new OneToOneDependency(x))) {
require(rdds.length > 0)
require(rdds.forall(_.partitioner.isDefined))
require(rdds.flatMap(_.partitioner).toSet.size == 1,
"Parent RDDs have different partitioners: " + rdds.flatMap(_.partitioner))

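
The sc.union change above and the two new require checks here address the same case: previously, rdds.flatMap(_.partitioner).toSet silently dropped RDDs whose partitioner was None, so a mix of partitioned and unpartitioned inputs could still look like a single shared partitioner and be routed to PartitionerAwareUnionRDD. A hypothetical reproduction (not from this commit; names and data are invented) of the case that now falls back to a plain UnionRDD:

import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

object UnionPartitionerGuard {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("unionGuard"))

    // One RDD with a defined partitioner, one without.
    val partitioned   = sc.parallelize(Seq(1 -> "a", 2 -> "b")).partitionBy(new HashPartitioner(2))
    val unpartitioned = sc.parallelize(Seq(3 -> "c", 4 -> "d"))   // partitioner is None

    // flatMap(_.partitioner) drops the None, so the old size-1 check could still pick
    // PartitionerAwareUnionRDD here; the added forall(_.partitioner.isDefined) guard
    // (and the new require) make this case use a plain UnionRDD instead.
    val unioned = sc.union(Seq(partitioned, unpartitioned))
    println(unioned.partitioner)   // None: the union falls back to UnionRDD

    sc.stop()
  }
}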
45 changes: 30 additions & 15 deletions core/src/main/scala/org/apache/spark/util/SizeEstimator.scala
@@ -179,7 +179,7 @@ private[spark] object SizeEstimator extends Logging {
}

// Estimate the size of arrays larger than ARRAY_SIZE_FOR_SAMPLING by sampling.
private val ARRAY_SIZE_FOR_SAMPLING = 200
private val ARRAY_SIZE_FOR_SAMPLING = 400
private val ARRAY_SAMPLE_SIZE = 100 // should be lower than ARRAY_SIZE_FOR_SAMPLING

private def visitArray(array: AnyRef, arrayClass: Class[_], state: SearchState) {
@@ -204,25 +204,40 @@
}
} else {
// Estimate the size of a large array by sampling elements without replacement.
var size = 0.0
// To exclude shared objects that the array elements may link to, sample twice
// and use the smaller of the two results to calculate the array size.
val rand = new Random(42)
val drawn = new OpenHashSet[Int](ARRAY_SAMPLE_SIZE)
var numElementsDrawn = 0
while (numElementsDrawn < ARRAY_SAMPLE_SIZE) {
var index = 0
do {
index = rand.nextInt(length)
} while (drawn.contains(index))
drawn.add(index)
val elem = ScalaRunTime.array_apply(array, index).asInstanceOf[AnyRef]
size += SizeEstimator.estimate(elem, state.visited)
numElementsDrawn += 1
}
state.size += ((length / (ARRAY_SAMPLE_SIZE * 1.0)) * size).toLong
val drawn = new OpenHashSet[Int](2 * ARRAY_SAMPLE_SIZE)
val s1 = sampleArray(array, state, rand, drawn, length)
val s2 = sampleArray(array, state, rand, drawn, length)
val size = math.min(s1, s2)
state.size += math.max(s1, s2) +
(size * ((length - ARRAY_SAMPLE_SIZE) / (ARRAY_SAMPLE_SIZE))).toLong
}
}
}

private def sampleArray(
array: AnyRef,
state: SearchState,
rand: Random,
drawn: OpenHashSet[Int],
length: Int): Long = {
var size = 0L
for (i <- 0 until ARRAY_SAMPLE_SIZE) {
var index = 0
do {
index = rand.nextInt(length)
} while (drawn.contains(index))
drawn.add(index)
val obj = ScalaRunTime.array_apply(array, index).asInstanceOf[AnyRef]
if (obj != null) {
size += SizeEstimator.estimate(obj, state.visited).toLong
}
}
size
}

private def primitiveSize(cls: Class[_]): Long = {
if (cls == classOf[Byte]) {
BYTE_SIZE
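
The rewritten sampling path draws two disjoint samples of ARRAY_SAMPLE_SIZE elements, keeps the larger total as-is, and extrapolates the remainder of the array from the smaller one; because already-visited objects contribute nothing to a later estimate() call, shared objects can inflate at most one of the two samples. A self-contained sketch of just that arithmetic, using made-up per-element sizes instead of SizeEstimator's real object traversal:

import scala.util.Random

object ArraySamplingSketch {
  val ARRAY_SAMPLE_SIZE = 100

  // elementSizes(i) stands in for the estimated size of array element i.
  def estimateArraySize(elementSizes: Array[Long]): Long = {
    val rand = new Random(42)
    val drawn = scala.collection.mutable.HashSet.empty[Int]

    // Draw ARRAY_SAMPLE_SIZE indices not drawn before (mirrors sampleArray above).
    def sampleOnce(): Long = {
      var total = 0L
      for (_ <- 0 until ARRAY_SAMPLE_SIZE) {
        var idx = rand.nextInt(elementSizes.length)
        while (drawn.contains(idx)) { idx = rand.nextInt(elementSizes.length) }
        drawn += idx
        total += elementSizes(idx)
      }
      total
    }

    val s1 = sampleOnce()
    val s2 = sampleOnce()
    // Keep the larger sample at face value and extrapolate the rest of the array from
    // the smaller one, mirroring the formula in visitArray above.
    math.max(s1, s2) +
      (math.min(s1, s2) * ((elementSizes.length - ARRAY_SAMPLE_SIZE) / ARRAY_SAMPLE_SIZE)).toLong
  }

  def main(args: Array[String]): Unit = {
    // Pretend every element costs 16 bytes; the estimate lands at 10000 * 16.
    println(estimateArraySize(Array.fill(10000)(16L)))
  }
}

Raising ARRAY_SIZE_FOR_SAMPLING from 200 to 400 presumably keeps the 200 distinct sampled indices well below the array length, so the rejection loop that avoids repeated indices stays cheap.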
10 changes: 8 additions & 2 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -67,6 +67,12 @@ private[spark] object Utils extends Logging {

val DEFAULT_SHUTDOWN_PRIORITY = 100

/**
* The shutdown priority of the SparkContext instance. This is lower than the default
* priority, so that by default hooks are run before the context is shut down.
*/
val SPARK_CONTEXT_SHUTDOWN_PRIORITY = 50

private val MAX_DIR_CREATION_ATTEMPTS: Int = 10
@volatile private var localRootDirs: Array[String] = null

@@ -2116,7 +2122,7 @@ private[spark] object Utils extends Logging {
* @return A handle that can be used to unregister the shutdown hook.
*/
def addShutdownHook(hook: () => Unit): AnyRef = {
addShutdownHook(DEFAULT_SHUTDOWN_PRIORITY, hook)
addShutdownHook(DEFAULT_SHUTDOWN_PRIORITY)(hook)
}

/**
@@ -2126,7 +2132,7 @@
* @param hook The code to run during shutdown.
* @return A handle that can be used to unregister the shutdown hook.
*/
def addShutdownHook(priority: Int, hook: () => Unit): AnyRef = {
def addShutdownHook(priority: Int)(hook: () => Unit): AnyRef = {
shutdownHooks.add(priority, hook)
}

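The only signature change here is moving the hook into its own parameter list, which lets call sites pass it as a trailing block — the style SparkContext now uses above with Utils.addShutdownHook(Utils.SPARK_CONTEXT_SHUTDOWN_PRIORITY) { () => ... }. A minimal stand-in (not Spark's implementation; the registry and names below are invented) showing the call-site difference:

object ShutdownHookSketch {
  // Invented registry standing in for Spark's private shutdownHooks structure.
  private val hooks = scala.collection.mutable.ArrayBuffer.empty[(Int, () => Unit)]

  // Old shape: addShutdownHook(priority: Int, hook: () => Unit) forced callers to pass
  // the hook as an ordinary argument. New shape: a second parameter list, so the hook
  // can be written as a trailing block.
  def addShutdownHook(priority: Int)(hook: () => Unit): AnyRef = {
    val entry = (priority, hook)
    hooks += entry
    entry   // returned handle, usable later to unregister the hook
  }

  def main(args: Array[String]): Unit = {
    addShutdownHook(50) { () =>
      println("Invoking stop() from shutdown hook")
    }
    // On JVM exit, run registered hooks from highest to lowest priority.
    sys.addShutdownHook(hooks.sortBy(-_._1).foreach { case (_, hook) => hook() })
  }
}

Since DEFAULT_SHUTDOWN_PRIORITY is 100 and the new SPARK_CONTEXT_SHUTDOWN_PRIORITY is 50, hooks registered with the default priority run before the context's own stop() hook.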
7 changes: 3 additions & 4 deletions core/src/test/scala/org/apache/spark/FileServerSuite.scala
@@ -22,8 +22,7 @@ import java.net.URI
import java.util.jar.{JarEntry, JarOutputStream}
import javax.net.ssl.SSLException

import com.google.common.io.ByteStreams
import org.apache.commons.io.{FileUtils, IOUtils}
import com.google.common.io.{ByteStreams, Files}
import org.apache.commons.lang3.RandomUtils
import org.scalatest.FunSuite

Expand Down Expand Up @@ -239,7 +238,7 @@ class FileServerSuite extends FunSuite with LocalSparkContext {
def fileTransferTest(server: HttpFileServer, sm: SecurityManager = null): Unit = {
val randomContent = RandomUtils.nextBytes(100)
val file = File.createTempFile("FileServerSuite", "sslTests", tmpDir)
FileUtils.writeByteArrayToFile(file, randomContent)
Files.write(randomContent, file)
server.addFile(file)

val uri = new URI(server.serverUri + "/files/" + file.getName)
@@ -254,7 +253,7 @@ class FileServerSuite extends FunSuite with LocalSparkContext {
Utils.setupSecureURLConnection(connection, sm)
}

val buf = IOUtils.toByteArray(connection.getInputStream)
val buf = ByteStreams.toByteArray(connection.getInputStream)
assert(buf === randomContent)
}

core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala
@@ -21,7 +21,7 @@ import java.io.{File, FileWriter, PrintWriter}

import scala.collection.mutable.ArrayBuffer

import org.apache.commons.lang.math.RandomUtils
import org.apache.commons.lang3.RandomUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.{LongWritable, Text}
@@ -60,7 +60,7 @@ class InputOutputMetricsSuite extends FunSuite with SharedSparkContext
tmpFile = new File(testTempDir, getClass.getSimpleName + ".txt")
val pw = new PrintWriter(new FileWriter(tmpFile))
for (x <- 1 to numRecords) {
pw.println(RandomUtils.nextInt(numBuckets))
pw.println(RandomUtils.nextInt(0, numBuckets))
}
pw.close()
