
Commit

Merge remote-tracking branch 'origin/master' into unsafe-exchange
JoshRosen committed Jul 18, 2015
2 parents 8dd3ff2 + 6e1e2eb commit 93904e7
Showing 174 changed files with 6,416 additions and 1,245 deletions.
10 changes: 5 additions & 5 deletions R/pkg/R/DataFrame.R
@@ -1314,7 +1314,7 @@ setMethod("except",
#' write.df(df, "myfile", "parquet", "overwrite")
#' }
setMethod("write.df",
signature(df = "DataFrame", path = 'character'),
signature(df = "DataFrame", path = "character"),
function(df, path, source = NULL, mode = "append", ...){
if (is.null(source)) {
sqlContext <- get(".sparkRSQLsc", envir = .sparkREnv)
@@ -1328,7 +1328,7 @@ setMethod("write.df",
jmode <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "saveMode", mode)
options <- varargsToEnv(...)
if (!is.null(path)) {
options[['path']] <- path
options[["path"]] <- path
}
callJMethod(df@sdf, "save", source, jmode, options)
})
@@ -1337,7 +1337,7 @@ setMethod("write.df",
#' @aliases saveDF
#' @export
setMethod("saveDF",
signature(df = "DataFrame", path = 'character'),
signature(df = "DataFrame", path = "character"),
function(df, path, source = NULL, mode = "append", ...){
write.df(df, path, source, mode, ...)
})
@@ -1375,8 +1375,8 @@ setMethod("saveDF",
#' saveAsTable(df, "myfile")
#' }
setMethod("saveAsTable",
signature(df = "DataFrame", tableName = 'character', source = 'character',
mode = 'character'),
signature(df = "DataFrame", tableName = "character", source = "character",
mode = "character"),
function(df, tableName, source = NULL, mode="append", ...){
if (is.null(source)) {
sqlContext <- get(".sparkRSQLsc", envir = .sparkREnv)
4 changes: 2 additions & 2 deletions R/pkg/R/SQLContext.R
@@ -457,7 +457,7 @@ dropTempTable <- function(sqlContext, tableName) {
read.df <- function(sqlContext, path = NULL, source = NULL, schema = NULL, ...) {
options <- varargsToEnv(...)
if (!is.null(path)) {
options[['path']] <- path
options[["path"]] <- path
}
if (is.null(source)) {
sqlContext <- get(".sparkRSQLsc", envir = .sparkREnv)
@@ -506,7 +506,7 @@ loadDF <- function(sqlContext, path = NULL, source = NULL, schema = NULL, ...) {
createExternalTable <- function(sqlContext, tableName, path = NULL, source = NULL, ...) {
options <- varargsToEnv(...)
if (!is.null(path)) {
options[['path']] <- path
options[["path"]] <- path
}
sdf <- callJMethod(sqlContext, "createExternalTable", tableName, source, options)
dataFrame(sdf)
4 changes: 2 additions & 2 deletions R/pkg/R/serialize.R
@@ -140,8 +140,8 @@ writeType <- function(con, class) {
jobj = "j",
environment = "e",
Date = "D",
POSIXlt = 't',
POSIXct = 't',
POSIXlt = "t",
POSIXct = "t",
stop(paste("Unsupported type for serialization", class)))
writeBin(charToRaw(type), con)
}
2 changes: 1 addition & 1 deletion R/pkg/R/sparkR.R
@@ -140,7 +140,7 @@ sparkR.init <- function(
if (!file.exists(path)) {
stop("JVM is not ready after 10 seconds")
}
f <- file(path, open='rb')
f <- file(path, open="rb")
backendPort <- readInt(f)
monitorPort <- readInt(f)
close(f)
4 changes: 2 additions & 2 deletions R/pkg/inst/tests/test_sparkSQL.R
@@ -57,9 +57,9 @@ test_that("infer types", {
expect_equal(infer_type(as.Date("2015-03-11")), "date")
expect_equal(infer_type(as.POSIXlt("2015-03-11 12:13:04.043")), "timestamp")
expect_equal(infer_type(c(1L, 2L)),
list(type = 'array', elementType = "integer", containsNull = TRUE))
list(type = "array", elementType = "integer", containsNull = TRUE))
expect_equal(infer_type(list(1L, 2L)),
list(type = 'array', elementType = "integer", containsNull = TRUE))
list(type = "array", elementType = "integer", containsNull = TRUE))
testStruct <- infer_type(list(a = 1L, b = "2"))
expect_equal(class(testStruct), "structType")
checkStructField(testStruct$fields()[[1]], "a", "IntegerType", TRUE)
68 changes: 30 additions & 38 deletions core/src/main/scala/org/apache/spark/Accumulators.scala
@@ -20,7 +20,8 @@ package org.apache.spark
import java.io.{ObjectInputStream, Serializable}

import scala.collection.generic.Growable
import scala.collection.mutable.Map
import scala.collection.Map
import scala.collection.mutable
import scala.ref.WeakReference
import scala.reflect.ClassTag

@@ -39,25 +40,44 @@ import org.apache.spark.util.Utils
* @param initialValue initial value of accumulator
* @param param helper object defining how to add elements of type `R` and `T`
* @param name human-readable name for use in Spark's web UI
* @param internal if this [[Accumulable]] is internal. Internal [[Accumulable]]s will be reported
* to the driver via heartbeats. For internal [[Accumulable]]s, `R` must be
* thread safe so that they can be reported correctly.
* @tparam R the full accumulated data (result type)
* @tparam T partial data that can be added in
*/
class Accumulable[R, T] (
class Accumulable[R, T] private[spark] (
@transient initialValue: R,
param: AccumulableParam[R, T],
val name: Option[String])
val name: Option[String],
internal: Boolean)
extends Serializable {

private[spark] def this(
@transient initialValue: R, param: AccumulableParam[R, T], internal: Boolean) = {
this(initialValue, param, None, internal)
}

def this(@transient initialValue: R, param: AccumulableParam[R, T], name: Option[String]) =
this(initialValue, param, name, false)

def this(@transient initialValue: R, param: AccumulableParam[R, T]) =
this(initialValue, param, None)

val id: Long = Accumulators.newId

@transient private var value_ = initialValue // Current value on master
@volatile @transient private var value_ : R = initialValue // Current value on master
val zero = param.zero(initialValue) // Zero value to be passed to workers
private var deserialized = false

Accumulators.register(this, true)
Accumulators.register(this)

/**
* If this [[Accumulable]] is internal. Internal [[Accumulable]]s will be reported to the driver
* via heartbeats. For internal [[Accumulable]]s, `R` must be thread safe so that they can be
* reported correctly.
*/
private[spark] def isInternal: Boolean = internal

/**
* Add more data to this accumulator / accumulable
@@ -132,7 +152,8 @@ class Accumulable[R, T] (
in.defaultReadObject()
value_ = zero
deserialized = true
Accumulators.register(this, false)
val taskContext = TaskContext.get()
taskContext.registerAccumulator(this)
}

override def toString: String = if (value_ == null) "null" else value_.toString
@@ -284,16 +305,7 @@ private[spark] object Accumulators extends Logging {
* It keeps weak references to these objects so that accumulators can be garbage-collected
* once the RDDs and user-code that reference them are cleaned up.
*/
val originals = Map[Long, WeakReference[Accumulable[_, _]]]()

/**
* This thread-local map holds per-task copies of accumulators; it is used to collect the set
* of accumulator updates to send back to the driver when tasks complete. After tasks complete,
* this map is cleared by `Accumulators.clear()` (see Executor.scala).
*/
private val localAccums = new ThreadLocal[Map[Long, Accumulable[_, _]]]() {
override protected def initialValue() = Map[Long, Accumulable[_, _]]()
}
val originals = mutable.Map[Long, WeakReference[Accumulable[_, _]]]()

private var lastId: Long = 0

@@ -302,19 +314,8 @@ private[spark] object Accumulators extends Logging {
lastId
}

def register(a: Accumulable[_, _], original: Boolean): Unit = synchronized {
if (original) {
originals(a.id) = new WeakReference[Accumulable[_, _]](a)
} else {
localAccums.get()(a.id) = a
}
}

// Clear the local (non-original) accumulators for the current thread
def clear() {
synchronized {
localAccums.get.clear()
}
def register(a: Accumulable[_, _]): Unit = synchronized {
originals(a.id) = new WeakReference[Accumulable[_, _]](a)
}

def remove(accId: Long) {
@@ -323,15 +324,6 @@
}
}

// Get the values of the local accumulators for the current thread (by ID)
def values: Map[Long, Any] = synchronized {
val ret = Map[Long, Any]()
for ((id, accum) <- localAccums.get) {
ret(id) = accum.localValue
}
return ret
}

// Add values to the original accumulators with some given IDs
def add(values: Map[Long, Any]): Unit = synchronized {
for ((id, value) <- values) {
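For context, a minimal sketch of the accumulator registration flow after the Accumulators.scala change above (illustrative only: `AccumulableRegistrationSketch`, `sumParam`, and `onDeserialize` are made-up names, and the package declaration is only there because `registerAccumulator` is private[spark]):

```scala
package org.apache.spark  // registerAccumulator is private[spark]

// Sketch only, not actual Spark source.
object AccumulableRegistrationSketch {
  // A trivial AccumulableParam that sums Ints into a Long total.
  val sumParam = new AccumulableParam[Long, Int] {
    def addAccumulator(r: Long, t: Int): Long = r + t
    def addInPlace(r1: Long, r2: Long): Long = r1 + r2
    def zero(initialValue: Long): Long = 0L
  }

  // Driver side: the public constructor (internal defaults to false) still registers
  // the original in Accumulators.originals behind a WeakReference.
  val userAcc = new Accumulable[Long, Int](0L, sumParam, Some("bytes read"))

  // Executor side: readObject() no longer writes into a thread-local map in the
  // Accumulators object; it hands the deserialized copy to the running task's
  // context instead, which is what this helper mimics.
  def onDeserialize(a: Accumulable[_, _]): Unit =
    TaskContext.get().registerAccumulator(a)
}
```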
4 changes: 3 additions & 1 deletion core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
@@ -181,7 +181,9 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock)
// Asynchronously kill the executor to avoid blocking the current thread
killExecutorThread.submit(new Runnable {
override def run(): Unit = Utils.tryLogNonFatalError {
sc.killExecutor(executorId)
// Note: we want to get an executor back after expiring this one,
// so do not simply call `sc.killExecutor` here (SPARK-8119)
sc.killAndReplaceExecutor(executorId)
}
})
}
40 changes: 38 additions & 2 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1419,6 +1419,12 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
/**
* :: DeveloperApi ::
* Request that the cluster manager kill the specified executors.
*
* Note: This is an indication to the cluster manager that the application wishes to adjust
* its resource usage downwards. If the application wishes to replace the executors it kills
* through this method with new ones, it should follow up explicitly with a call to
* {{SparkContext#requestExecutors}}.
*
* This is currently only supported in YARN mode. Return whether the request is received.
*/
@DeveloperApi
@@ -1436,12 +1442,42 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli

/**
* :: DeveloperApi ::
* Request that cluster manager the kill the specified executor.
* This is currently only supported in Yarn mode. Return whether the request is received.
* Request that the cluster manager kill the specified executor.
*
* Note: This is an indication to the cluster manager that the application wishes to adjust
* its resource usage downwards. If the application wishes to replace the executor it kills
* through this method with a new one, it should follow up explicitly with a call to
* {{SparkContext#requestExecutors}}.
*
* This is currently only supported in YARN mode. Return whether the request is received.
*/
@DeveloperApi
override def killExecutor(executorId: String): Boolean = super.killExecutor(executorId)

/**
* Request that the cluster manager kill the specified executor without adjusting the
* application resource requirements.
*
* The effect is that a new executor will be launched in place of the one killed by
* this request. This assumes the cluster manager will automatically and eventually
* fulfill all missing application resource requests.
*
* Note: The replace is by no means guaranteed; another application on the same cluster
* can steal the window of opportunity and acquire this application's resources in the
* mean time.
*
* This is currently only supported in YARN mode. Return whether the request is received.
*/
private[spark] def killAndReplaceExecutor(executorId: String): Boolean = {
schedulerBackend match {
case b: CoarseGrainedSchedulerBackend =>
b.killExecutors(Seq(executorId), replace = true)
case _ =>
logWarning("Killing executors is only supported in coarse-grained mode")
false
}
}

/** The version of Spark on which this application is running. */
def version: String = SPARK_VERSION

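A minimal sketch of how the two kill paths in SparkContext now differ (illustrative only: `ExecutorKillSketch` and `demo` are made-up names, the snippet assumes a live SparkContext, and the package declaration is needed because `killAndReplaceExecutor` is private[spark]):

```scala
package org.apache.spark  // killAndReplaceExecutor is private[spark]

// Sketch only; assumes a live SparkContext and a known executor id.
object ExecutorKillSketch {
  def demo(sc: SparkContext, executorId: String): Unit = {
    // Public path: lowers the application's resource target, so a replacement
    // must be requested explicitly if the same capacity is still needed.
    sc.killExecutor(executorId)
    sc.requestExecutors(1)

    // New internal path (used by HeartbeatReceiver for expired executors): the
    // resource target is left unchanged, so the cluster manager should
    // eventually launch a replacement on its own (SPARK-8119).
    sc.killAndReplaceExecutor(executorId)
  }
}
```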
20 changes: 19 additions & 1 deletion core/src/main/scala/org/apache/spark/TaskContext.scala
@@ -38,7 +38,7 @@ object TaskContext {
*/
def getPartitionId(): Int = {
val tc = taskContext.get()
if (tc == null) {
if (tc eq null) {
0
} else {
tc.partitionId()
@@ -152,4 +152,22 @@ abstract class TaskContext extends Serializable {
* Returns the manager for this task's managed memory.
*/
private[spark] def taskMemoryManager(): TaskMemoryManager

/**
* Register an accumulator that belongs to this task. Accumulators must call this method when
* deserializing in executors.
*/
private[spark] def registerAccumulator(a: Accumulable[_, _]): Unit

/**
* Return the local values of internal accumulators that belong to this task. The key of the Map
* is the accumulator id and the value of the Map is the latest accumulator local value.
*/
private[spark] def collectInternalAccumulators(): Map[Long, Any]

/**
* Return the local values of accumulators that belong to this task. The key of the Map is the
* accumulator id and the value of the Map is the latest accumulator local value.
*/
private[spark] def collectAccumulators(): Map[Long, Any]
}
19 changes: 16 additions & 3 deletions core/src/main/scala/org/apache/spark/TaskContextImpl.scala
@@ -17,12 +17,12 @@

package org.apache.spark

import scala.collection.mutable.{ArrayBuffer, HashMap}

import org.apache.spark.executor.TaskMetrics
import org.apache.spark.unsafe.memory.TaskMemoryManager
import org.apache.spark.util.{TaskCompletionListener, TaskCompletionListenerException}

import scala.collection.mutable.ArrayBuffer

private[spark] class TaskContextImpl(
val stageId: Int,
val partitionId: Int,
@@ -94,5 +94,18 @@ private[spark] class TaskContextImpl(
override def isRunningLocally(): Boolean = runningLocally

override def isInterrupted(): Boolean = interrupted
}

@transient private val accumulators = new HashMap[Long, Accumulable[_, _]]

private[spark] override def registerAccumulator(a: Accumulable[_, _]): Unit = synchronized {
accumulators(a.id) = a
}

private[spark] override def collectInternalAccumulators(): Map[Long, Any] = synchronized {
accumulators.filter(_._2.isInternal).mapValues(_.localValue).toMap
}

private[spark] override def collectAccumulators(): Map[Long, Any] = synchronized {
accumulators.mapValues(_.localValue).toMap
}
}
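And a sketch of how the new TaskContext hooks are intended to be consumed on the executor side (illustrative only: `TaskAccumulatorSketch` and its method names are made-up, and both collect methods are private[spark]):

```scala
package org.apache.spark  // both collect methods are private[spark]

// Sketch only; shows the intended split between the two collect methods.
object TaskAccumulatorSketch {
  // Heartbeat path: only accumulators created with internal = true are reported
  // while the task is still running, so their R type must be thread-safe.
  def heartbeatUpdates(ctx: TaskContext) = ctx.collectInternalAccumulators()

  // Completion path: everything registered for this task, reported once at the end.
  def finalUpdates(ctx: TaskContext) = ctx.collectAccumulators()
}
```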
29 changes: 18 additions & 11 deletions core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -25,6 +25,7 @@ import java.util.{Arrays, Comparator}
import scala.collection.JavaConversions._
import scala.concurrent.duration._
import scala.language.postfixOps
import scala.util.control.NonFatal

import com.google.common.primitives.Longs
import org.apache.hadoop.conf.Configuration
@@ -248,19 +249,25 @@ class SparkHadoopUtil extends Logging {
dir: Path,
prefix: String,
exclusionSuffix: String): Array[FileStatus] = {
val fileStatuses = remoteFs.listStatus(dir,
new PathFilter {
override def accept(path: Path): Boolean = {
val name = path.getName
name.startsWith(prefix) && !name.endsWith(exclusionSuffix)
try {
val fileStatuses = remoteFs.listStatus(dir,
new PathFilter {
override def accept(path: Path): Boolean = {
val name = path.getName
name.startsWith(prefix) && !name.endsWith(exclusionSuffix)
}
})
Arrays.sort(fileStatuses, new Comparator[FileStatus] {
override def compare(o1: FileStatus, o2: FileStatus): Int = {
Longs.compare(o1.getModificationTime, o2.getModificationTime)
}
})
Arrays.sort(fileStatuses, new Comparator[FileStatus] {
override def compare(o1: FileStatus, o2: FileStatus): Int = {
Longs.compare(o1.getModificationTime, o2.getModificationTime)
}
})
fileStatuses
fileStatuses
} catch {
case NonFatal(e) =>
logWarning("Error while attempting to list files from application staging dir", e)
Array.empty
}
}

/**
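The same defensive-listing pattern from the SparkHadoopUtil change, in isolation, as a sketch (a hypothetical standalone helper named `SafeListingSketch`, not the actual `listFilesSorted` method):

```scala
import scala.util.control.NonFatal

import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}

// Sketch only: list a directory but swallow recoverable errors (for example a
// FileNotFoundException when the staging directory has already been cleaned up),
// returning an empty result instead of failing the caller. The real method also
// logs a warning and applies the prefix/suffix filter and modification-time sort.
object SafeListingSketch {
  def listOrEmpty(fs: FileSystem, dir: Path): Array[FileStatus] =
    try {
      fs.listStatus(dir)
    } catch {
      case NonFatal(_) => Array.empty[FileStatus]
    }
}
```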