Commit

Merge branch 'master' of github.com:apache/spark into unsafe_obj
Davies Liu committed Jun 28, 2015
2 parents b04d69c + 42db3a1 commit 2f41c90
Showing 127 changed files with 4,362 additions and 1,822 deletions.
26 changes: 19 additions & 7 deletions R/pkg/R/client.R
@@ -34,24 +34,36 @@ connectBackend <- function(hostname, port, timeout = 6000) {
con
}

launchBackend <- function(args, sparkHome, jars, sparkSubmitOpts) {
determineSparkSubmitBin <- function() {
if (.Platform$OS.type == "unix") {
sparkSubmitBinName = "spark-submit"
} else {
sparkSubmitBinName = "spark-submit.cmd"
}
sparkSubmitBinName
}

generateSparkSubmitArgs <- function(args, sparkHome, jars, sparkSubmitOpts, packages) {
if (jars != "") {
jars <- paste("--jars", jars)
}

if (packages != "") {
packages <- paste("--packages", packages)
}

combinedArgs <- paste(jars, packages, sparkSubmitOpts, args, sep = " ")
combinedArgs
}

launchBackend <- function(args, sparkHome, jars, sparkSubmitOpts, packages) {
sparkSubmitBinName <- determineSparkSubmitBin()
if (sparkHome != "") {
sparkSubmitBin <- file.path(sparkHome, "bin", sparkSubmitBinName)
} else {
sparkSubmitBin <- sparkSubmitBinName
}

if (jars != "") {
jars <- paste("--jars", jars)
}

combinedArgs <- paste(jars, sparkSubmitOpts, args, sep = " ")
combinedArgs <- generateSparkSubmitArgs(args, sparkHome, jars, sparkSubmitOpts, packages)
cat("Launching java with spark-submit command", sparkSubmitBin, combinedArgs, "\n")
invisible(system2(sparkSubmitBin, combinedArgs, wait = F))
}
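
For reference, a minimal sketch of how the refactored helpers compose (assumes the internal helpers above are in scope, e.g. sourced from client.R; the jar path, package coordinates, and backend-port file below are hypothetical):

sparkSubmitBin <- determineSparkSubmitBin()   # "spark-submit" on unix, "spark-submit.cmd" on Windows
combinedArgs <- generateSparkSubmitArgs(
  args = "sparkr-shell /tmp/backend_port",            # hypothetical temp file passed by sparkR.init
  sparkHome = "",                                     # unused here; launchBackend uses it to locate the binary
  jars = "/path/to/app.jar",
  sparkSubmitOpts = "--master local[2]",
  packages = "com.example:spark-extras_2.10:0.1.0")   # hypothetical coordinates
# combinedArgs is then roughly:
# "--jars /path/to/app.jar --packages com.example:spark-extras_2.10:0.1.0 --master local[2] sparkr-shell /tmp/backend_port"
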
7 changes: 5 additions & 2 deletions R/pkg/R/sparkR.R
@@ -81,6 +81,7 @@ sparkR.stop <- function() {
#' @param sparkExecutorEnv Named list of environment variables to be used when launching executors.
#' @param sparkJars Character string vector of jar files to pass to the worker nodes.
#' @param sparkRLibDir The path where R is installed on the worker nodes.
#' @param sparkPackages Character string vector of packages from spark-packages.org
#' @export
#' @examples
#'\dontrun{
@@ -100,7 +101,8 @@ sparkR.init <- function(
sparkEnvir = list(),
sparkExecutorEnv = list(),
sparkJars = "",
sparkRLibDir = "") {
sparkRLibDir = "",
sparkPackages = "") {

if (exists(".sparkRjsc", envir = .sparkREnv)) {
cat("Re-using existing Spark Context. Please stop SparkR with sparkR.stop() or restart R to create a new Spark Context\n")
@@ -129,7 +131,8 @@ sparkR.init <- function(
args = path,
sparkHome = sparkHome,
jars = jars,
sparkSubmitOpts = Sys.getenv("SPARKR_SUBMIT_ARGS", "sparkr-shell"))
sparkSubmitOpts = Sys.getenv("SPARKR_SUBMIT_ARGS", "sparkr-shell"),
packages = sparkPackages)
# wait at most 100 seconds for JVM to launch
wait <- 0.1
for (i in 1:25) {
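
A hedged usage sketch of the new parameter (the package coordinates and jar path are hypothetical; requires a local Spark installation):

library(SparkR)

# sparkPackages is forwarded to spark-submit as --packages, sparkJars as --jars.
sc <- sparkR.init(master = "local[2]",
                  sparkJars = "/path/to/my-udfs.jar",
                  sparkPackages = "com.example:spark-extras_2.10:0.1.0")

# ... run jobs against sc ...

sparkR.stop()
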
Binary file added R/pkg/inst/test_support/sparktestjar_2.10-1.0.jar
32 changes: 32 additions & 0 deletions R/pkg/inst/tests/jarTest.R
@@ -0,0 +1,32 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
library(SparkR)

sc <- sparkR.init()

helloTest <- SparkR:::callJStatic("sparkR.test.hello",
"helloWorld",
"Dave")

basicFunction <- SparkR:::callJStatic("sparkR.test.basicFunction",
"addStuff",
2L,
2L)

sparkR.stop()
output <- c(helloTest, basicFunction)
writeLines(output)
32 changes: 32 additions & 0 deletions R/pkg/inst/tests/test_client.R
@@ -0,0 +1,32 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

context("functions in client.R")

test_that("adding spark-testing-base as a package works", {
args <- generateSparkSubmitArgs("", "", "", "",
"holdenk:spark-testing-base:1.3.0_0.0.5")
expect_equal(gsub("[[:space:]]", "", args),
gsub("[[:space:]]", "",
"--packages holdenk:spark-testing-base:1.3.0_0.0.5"))
})

test_that("no package specified doesn't add packages flag", {
args <- generateSparkSubmitArgs("", "", "", "", "")
expect_equal(gsub("[[:space:]]", "", args),
"")
})
37 changes: 37 additions & 0 deletions R/pkg/inst/tests/test_includeJAR.R
@@ -0,0 +1,37 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
context("include an external JAR in SparkContext")

runScript <- function() {
sparkHome <- Sys.getenv("SPARK_HOME")
jarPath <- paste("--jars",
shQuote(file.path(sparkHome, "R/lib/SparkR/test_support/sparktestjar_2.10-1.0.jar")))
scriptPath <- file.path(sparkHome, "R/lib/SparkR/tests/jarTest.R")
submitPath <- file.path(sparkHome, "bin/spark-submit")
res <- system2(command = submitPath,
args = c(jarPath, scriptPath),
stdout = TRUE)
tail(res, 2)
}

test_that("sparkJars tag in SparkContext", {
testOutput <- runScript()
helloTest <- testOutput[1]
expect_true(helloTest == "Hello, Dave")
basicFunction <- testOutput[2]
expect_true(basicFunction == 4L)
})
15 changes: 12 additions & 3 deletions R/pkg/inst/tests/test_sparkSQL.R
@@ -19,6 +19,14 @@ library(testthat)

context("SparkSQL functions")

# Utility function for easily checking the values of a StructField
checkStructField <- function(actual, expectedName, expectedType, expectedNullable) {
expect_equal(class(actual), "structField")
expect_equal(actual$name(), expectedName)
expect_equal(actual$dataType.toString(), expectedType)
expect_equal(actual$nullable(), expectedNullable)
}

# Tests for SparkSQL functions in SparkR

sc <- sparkR.init()
@@ -52,9 +60,10 @@ test_that("infer types", {
list(type = 'array', elementType = "integer", containsNull = TRUE))
expect_equal(infer_type(list(1L, 2L)),
list(type = 'array', elementType = "integer", containsNull = TRUE))
expect_equal(infer_type(list(a = 1L, b = "2")),
structType(structField(x = "a", type = "integer", nullable = TRUE),
structField(x = "b", type = "string", nullable = TRUE)))
testStruct <- infer_type(list(a = 1L, b = "2"))
expect_true(class(testStruct) == "structType")
checkStructField(testStruct$fields()[[1]], "a", "IntegerType", TRUE)
checkStructField(testStruct$fields()[[2]], "b", "StringType", TRUE)
e <- new.env()
assign("a", 1L, envir = e)
expect_equal(infer_type(e),
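
The same helper can be exercised against a schema built explicitly rather than inferred; a brief sketch, assuming a running SparkR session and the checkStructField helper above:

# Build the equivalent schema by hand and check each field with the new helper.
schema <- structType(structField("a", "integer", nullable = TRUE),
                     structField("b", "string", nullable = TRUE))
checkStructField(schema$fields()[[1]], "a", "IntegerType", TRUE)
checkStructField(schema$fields()[[2]], "b", "StringType", TRUE)
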
1 change: 0 additions & 1 deletion core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -545,7 +545,6 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli

// Post init
_taskScheduler.postStartHook()
_env.metricsSystem.registerSource(new DAGSchedulerSource(dagScheduler))
_env.metricsSystem.registerSource(new BlockManagerSource(_env.blockManager))
_executorAllocationManager.foreach { e =>
_env.metricsSystem.registerSource(e.executorAllocationManagerSource)
17 changes: 16 additions & 1 deletion core/src/main/scala/org/apache/spark/api/r/RBackendHandler.scala
@@ -88,6 +88,21 @@ private[r] class RBackendHandler(server: RBackend)
ctx.close()
}

// Looks up a class given a class name. This function first checks the
// current class loader and if a class is not found, it looks up the class
// in the context class loader. Address [SPARK-5185]
def getStaticClass(objId: String): Class[_] = {
try {
val clsCurrent = Class.forName(objId)
clsCurrent
} catch {
// Use contextLoader if we can't find the JAR in the system class loader
case e: ClassNotFoundException =>
val clsContext = Class.forName(objId, true, Thread.currentThread().getContextClassLoader)
clsContext
}
}

def handleMethodCall(
isStatic: Boolean,
objId: String,
@@ -98,7 +113,7 @@ private[r] class RBackendHandler(server: RBackend)
var obj: Object = null
try {
val cls = if (isStatic) {
Class.forName(objId)
getStaticClass(objId)
} else {
JVMObjectTracker.get(objId) match {
case None => throw new IllegalArgumentException("Object not found " + objId)
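
From the R side, this fallback is what lets callJStatic resolve classes that arrive via --jars rather than on the system classpath. A rough sketch built around the test-support jar added in this commit (the jar path is relative to a source checkout and may differ):

library(SparkR)

# sparkJars is forwarded to spark-submit as --jars, so the jar's classes are only
# visible to the context class loader -- the getStaticClass fallback above is what
# makes the static lookup succeed.
sc <- sparkR.init(sparkJars = "R/pkg/inst/test_support/sparktestjar_2.10-1.0.jar")

greeting <- SparkR:::callJStatic("sparkR.test.hello", "helloWorld", "Dave")
print(greeting)  # expected: "Hello, Dave"

sparkR.stop()
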
55 changes: 38 additions & 17 deletions core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -81,6 +81,8 @@ class DAGScheduler(

def this(sc: SparkContext) = this(sc, sc.taskScheduler)

private[scheduler] val metricsSource: DAGSchedulerSource = new DAGSchedulerSource(this)

private[scheduler] val nextJobId = new AtomicInteger(0)
private[scheduler] def numTotalJobs: Int = nextJobId.get()
private val nextStageId = new AtomicInteger(0)
@@ -905,22 +907,29 @@
return
}

val tasks: Seq[Task[_]] = stage match {
case stage: ShuffleMapStage =>
partitionsToCompute.map { id =>
val locs = getPreferredLocs(stage.rdd, id)
val part = stage.rdd.partitions(id)
new ShuffleMapTask(stage.id, taskBinary, part, locs)
}
val tasks: Seq[Task[_]] = try {
stage match {
case stage: ShuffleMapStage =>
partitionsToCompute.map { id =>
val locs = getPreferredLocs(stage.rdd, id)
val part = stage.rdd.partitions(id)
new ShuffleMapTask(stage.id, taskBinary, part, locs)
}

case stage: ResultStage =>
val job = stage.resultOfJob.get
partitionsToCompute.map { id =>
val p: Int = job.partitions(id)
val part = stage.rdd.partitions(p)
val locs = getPreferredLocs(stage.rdd, p)
new ResultTask(stage.id, taskBinary, part, locs, id)
}
case stage: ResultStage =>
val job = stage.resultOfJob.get
partitionsToCompute.map { id =>
val p: Int = job.partitions(id)
val part = stage.rdd.partitions(p)
val locs = getPreferredLocs(stage.rdd, p)
new ResultTask(stage.id, taskBinary, part, locs, id)
}
}
} catch {
case NonFatal(e) =>
abortStage(stage, s"Task creation failed: $e\n${e.getStackTraceString}")
runningStages -= stage
return
}

if (tasks.size > 0) {
@@ -1438,17 +1447,29 @@
taskScheduler.stop()
}

// Start the event thread at the end of the constructor
// Start the event thread and register the metrics source at the end of the constructor
env.metricsSystem.registerSource(metricsSource)
eventProcessLoop.start()
}

private[scheduler] class DAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler)
extends EventLoop[DAGSchedulerEvent]("dag-scheduler-event-loop") with Logging {

private[this] val timer = dagScheduler.metricsSource.messageProcessingTimer

/**
* The main event loop of the DAG scheduler.
*/
override def onReceive(event: DAGSchedulerEvent): Unit = event match {
override def onReceive(event: DAGSchedulerEvent): Unit = {
val timerContext = timer.time()
try {
doOnReceive(event)
} finally {
timerContext.stop()
}
}

private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
case JobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite, listener, properties) =>
dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite,
listener, properties)
core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala
@@ -17,11 +17,11 @@

package org.apache.spark.scheduler

import com.codahale.metrics.{Gauge, MetricRegistry}
import com.codahale.metrics.{Gauge, MetricRegistry, Timer}

import org.apache.spark.metrics.source.Source

private[spark] class DAGSchedulerSource(val dagScheduler: DAGScheduler)
private[scheduler] class DAGSchedulerSource(val dagScheduler: DAGScheduler)
extends Source {
override val metricRegistry = new MetricRegistry()
override val sourceName = "DAGScheduler"
@@ -45,4 +45,8 @@ private[spark] class DAGSchedulerSource(val dagScheduler: DAGScheduler)
metricRegistry.register(MetricRegistry.name("job", "activeJobs"), new Gauge[Int] {
override def getValue: Int = dagScheduler.activeJobs.size
})

/** Timer that tracks the time to process messages in the DAGScheduler's event loop */
val messageProcessingTimer: Timer =
metricRegistry.timer(MetricRegistry.name("messageProcessingTime"))
}
core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
@@ -36,7 +36,7 @@ import org.apache.spark.network.nio.{GetBlock, GotBlock, PutBlock}
import org.apache.spark.network.util.ByteUnit
import org.apache.spark.scheduler.{CompressedMapStatus, HighlyCompressedMapStatus}
import org.apache.spark.storage._
import org.apache.spark.util.BoundedPriorityQueue
import org.apache.spark.util.{BoundedPriorityQueue, SerializableConfiguration, SerializableJobConf}
import org.apache.spark.util.collection.CompactBuffer

/**
@@ -94,8 +94,10 @@ class KryoSerializer(conf: SparkConf)
// For results returned by asJavaIterable. See JavaIterableWrapperSerializer.
kryo.register(JavaIterableWrapperSerializer.wrapperClass, new JavaIterableWrapperSerializer)

// Allow sending SerializableWritable
// Allow sending classes with custom Java serializers
kryo.register(classOf[SerializableWritable[_]], new KryoJavaSerializer())
kryo.register(classOf[SerializableConfiguration], new KryoJavaSerializer())
kryo.register(classOf[SerializableJobConf], new KryoJavaSerializer())
kryo.register(classOf[HttpBroadcast[_]], new KryoJavaSerializer())
kryo.register(classOf[PythonBroadcast], new KryoJavaSerializer())

(remaining file diffs not shown)