Merge remote-tracking branch 'upstream/master'
leahmcguire committed Mar 5, 2015
2 parents dc65374 + e06c7df commit 85f298f
Showing 167 changed files with 5,302 additions and 1,613 deletions.
16 changes: 16 additions & 0 deletions LICENSE
@@ -771,6 +771,22 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

========================================================================
For TestTimSort (core/src/test/java/org/apache/spark/util/collection/TestTimSort.java):
========================================================================
Copyright (C) 2015 Stijn de Gouw

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

========================================================================
For LimitedInputStream
@@ -425,15 +425,14 @@ private void pushRun(int runBase, int runLen) {
private void mergeCollapse() {
while (stackSize > 1) {
int n = stackSize - 2;
if (n > 0 && runLen[n-1] <= runLen[n] + runLen[n+1]) {
if ( (n >= 1 && runLen[n-1] <= runLen[n] + runLen[n+1])
|| (n >= 2 && runLen[n-2] <= runLen[n] + runLen[n-1])) {
if (runLen[n - 1] < runLen[n + 1])
n--;
mergeAt(n);
} else if (runLen[n] <= runLen[n + 1]) {
mergeAt(n);
} else {
} else if (runLen[n] > runLen[n + 1]) {
break; // Invariant is established
}
mergeAt(n);
}
}
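The original condition checked the invariant only near the top of the run stack; as the analysis by Stijn de Gouw (credited in the LICENSE entry above) showed, that can leave a violation lower in the stack and eventually overflow the fixed-size run stack. The patched loop also checks runLen[n-2] and keeps merging until the invariant holds again. As a rough illustration, here is a hypothetical Scala checker for the full invariant; it is not part of this commit.

object TimSortStackInvariant {
  // The invariant mergeCollapse must re-establish: for every position on the run stack,
  // runLen(i-2) > runLen(i-1) + runLen(i) and runLen(i-1) > runLen(i).
  def holds(runLen: IndexedSeq[Int], stackSize: Int): Boolean = {
    val pairwise = (1 until stackSize).forall(i => runLen(i - 1) > runLen(i))
    val triples  = (2 until stackSize).forall(i => runLen(i - 2) > runLen(i - 1) + runLen(i))
    pairwise && triples
  }

  def main(args: Array[String]): Unit = {
    println(holds(Vector(120, 80, 30, 20), 4)) // true: every window satisfies the invariant
    println(holds(Vector(120, 80, 60, 25), 4)) // false: 120 <= 80 + 60 violates it below the top
  }
}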

40 changes: 23 additions & 17 deletions core/src/main/scala/org/apache/spark/Accumulators.scala
@@ -280,15 +280,24 @@ object AccumulatorParam {

// TODO: The multi-thread support in accumulators is kind of lame; check
// if there's a more intuitive way of doing it right
private[spark] object Accumulators {
// Store a WeakReference instead of a StrongReference because this way accumulators can be
// appropriately garbage collected during long-running jobs and release memory
type WeakAcc = WeakReference[Accumulable[_, _]]
val originals = Map[Long, WeakAcc]()
val localAccums = new ThreadLocal[Map[Long, WeakAcc]]() {
override protected def initialValue() = Map[Long, WeakAcc]()
private[spark] object Accumulators extends Logging {
/**
* This global map holds the original accumulator objects that are created on the driver.
* It keeps weak references to these objects so that accumulators can be garbage-collected
* once the RDDs and user-code that reference them are cleaned up.
*/
val originals = Map[Long, WeakReference[Accumulable[_, _]]]()

/**
* This thread-local map holds per-task copies of accumulators; it is used to collect the set
* of accumulator updates to send back to the driver when tasks complete. After tasks complete,
* this map is cleared by `Accumulators.clear()` (see Executor.scala).
*/
private val localAccums = new ThreadLocal[Map[Long, Accumulable[_, _]]]() {
override protected def initialValue() = Map[Long, Accumulable[_, _]]()
}
var lastId: Long = 0

private var lastId: Long = 0

def newId(): Long = synchronized {
lastId += 1
@@ -297,16 +306,16 @@ private[spark] object Accumulators {

def register(a: Accumulable[_, _], original: Boolean): Unit = synchronized {
if (original) {
originals(a.id) = new WeakAcc(a)
originals(a.id) = new WeakReference[Accumulable[_, _]](a)
} else {
localAccums.get()(a.id) = new WeakAcc(a)
localAccums.get()(a.id) = a
}
}

// Clear the local (non-original) accumulators for the current thread
def clear() {
synchronized {
localAccums.get.clear
localAccums.get.clear()
}
}

@@ -320,12 +329,7 @@ private[spark] object Accumulators {
def values: Map[Long, Any] = synchronized {
val ret = Map[Long, Any]()
for ((id, accum) <- localAccums.get) {
// Since we are now storing weak references, we must check whether the underlying data
// is valid.
ret(id) = accum.get match {
case Some(values) => values.localValue
case None => None
}
ret(id) = accum.localValue
}
return ret
}
@@ -341,6 +345,8 @@ private[spark] object Accumulators {
case None =>
throw new IllegalAccessError("Attempted to access garbage collected Accumulator.")
}
} else {
logWarning(s"Ignoring accumulator update for unknown accumulator id $id")
}
}
}
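After this change only the driver-side originals map holds weak references; the per-task copies in the thread-local map stay strongly referenced until Accumulators.clear() runs, and an update for an id that was never registered (or has already been removed) is logged and ignored rather than thrown, while a garbage-collected original still raises IllegalAccessError. A minimal sketch of the weak-reference registry pattern, with hypothetical names rather than Spark's API:

import java.lang.ref.WeakReference
import scala.collection.mutable

object WeakRegistrySketch {
  private val originals = mutable.Map[Long, WeakReference[AnyRef]]()
  private var lastId = 0L

  def register(value: AnyRef): Long = synchronized {
    lastId += 1
    originals(lastId) = new WeakReference[AnyRef](value)
    lastId
  }

  // Option(ref.get()) becomes None once the referent has been garbage-collected,
  // which is exactly the case the driver-side code above has to handle.
  def lookup(id: Long): Option[AnyRef] = synchronized {
    originals.get(id).flatMap(ref => Option(ref.get()))
  }
}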
44 changes: 29 additions & 15 deletions core/src/main/scala/org/apache/spark/ContextCleaner.scala
@@ -105,9 +105,19 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
cleaningThread.start()
}

/** Stop the cleaner. */
/**
* Stop the cleaning thread and wait until the thread has finished running its current task.
*/
def stop() {
stopped = true
// Interrupt the cleaning thread, but wait until the current task has finished before
// doing so. This guards against the race condition where a cleaning thread may
// potentially clean similarly named variables created by a different SparkContext,
// resulting in otherwise inexplicable block-not-found exceptions (SPARK-6132).
synchronized {
cleaningThread.interrupt()
}
cleaningThread.join()
}

/** Register a RDD for cleanup when it is garbage collected. */
@@ -140,21 +150,25 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
try {
val reference = Option(referenceQueue.remove(ContextCleaner.REF_QUEUE_POLL_TIMEOUT))
.map(_.asInstanceOf[CleanupTaskWeakReference])
reference.map(_.task).foreach { task =>
logDebug("Got cleaning task " + task)
referenceBuffer -= reference.get
task match {
case CleanRDD(rddId) =>
doCleanupRDD(rddId, blocking = blockOnCleanupTasks)
case CleanShuffle(shuffleId) =>
doCleanupShuffle(shuffleId, blocking = blockOnShuffleCleanupTasks)
case CleanBroadcast(broadcastId) =>
doCleanupBroadcast(broadcastId, blocking = blockOnCleanupTasks)
case CleanAccum(accId) =>
doCleanupAccum(accId, blocking = blockOnCleanupTasks)
// Synchronize here to avoid being interrupted on stop()
synchronized {
reference.map(_.task).foreach { task =>
logDebug("Got cleaning task " + task)
referenceBuffer -= reference.get
task match {
case CleanRDD(rddId) =>
doCleanupRDD(rddId, blocking = blockOnCleanupTasks)
case CleanShuffle(shuffleId) =>
doCleanupShuffle(shuffleId, blocking = blockOnShuffleCleanupTasks)
case CleanBroadcast(broadcastId) =>
doCleanupBroadcast(broadcastId, blocking = blockOnCleanupTasks)
case CleanAccum(accId) =>
doCleanupAccum(accId, blocking = blockOnCleanupTasks)
}
}
}
} catch {
case ie: InterruptedException if stopped => // ignore
case e: Exception => logError("Error in cleaning thread", e)
}
}
@@ -188,10 +202,10 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
/** Perform broadcast cleanup. */
def doCleanupBroadcast(broadcastId: Long, blocking: Boolean) {
try {
logDebug("Cleaning broadcast " + broadcastId)
logDebug(s"Cleaning broadcast $broadcastId")
broadcastManager.unbroadcast(broadcastId, true, blocking)
listeners.foreach(_.broadcastCleaned(broadcastId))
logInfo("Cleaned broadcast " + broadcastId)
logDebug(s"Cleaned broadcast $broadcastId")
} catch {
case e: Exception => logError("Error cleaning broadcast " + broadcastId, e)
}
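The added synchronized blocks close the race described in the comment (SPARK-6132): without them, the interrupt issued by stop() could land in the middle of a cleanup, and a cleaning thread outliving its SparkContext could then clean similarly named variables created by a different SparkContext. Because the cleaning thread only processes a task while holding the lock, and stop() takes the same lock before interrupting, the interrupt can only arrive between tasks. A rough sketch of that handshake, using a hypothetical class rather than Spark's:

import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}

class StoppableCleaner {
  private val queue = new LinkedBlockingQueue[String]()
  @volatile private var stopped = false
  private val lock = new Object

  private val thread = new Thread("cleaner-sketch") {
    override def run(): Unit = {
      while (!stopped) {
        try {
          val item = queue.poll(100, TimeUnit.MILLISECONDS)
          lock.synchronized {
            if (item != null) {
              // the real cleanup work would happen here, shielded from stop()'s interrupt
              println(s"cleaned $item")
            }
          }
        } catch {
          case _: InterruptedException if stopped => // expected during shutdown; ignore
        }
      }
    }
  }

  def submit(item: String): Unit = queue.put(item)

  def start(): Unit = { thread.setDaemon(true); thread.start() }

  def stop(): Unit = {
    stopped = true
    lock.synchronized { thread.interrupt() } // waits for any in-flight cleanup to finish
    thread.join()
  }
}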
65 changes: 59 additions & 6 deletions core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
@@ -17,33 +17,86 @@

package org.apache.spark

import akka.actor.Actor
import scala.concurrent.duration._
import scala.collection.mutable

import akka.actor.{Actor, Cancellable}

import org.apache.spark.executor.TaskMetrics
import org.apache.spark.storage.BlockManagerId
import org.apache.spark.scheduler.TaskScheduler
import org.apache.spark.scheduler.{SlaveLost, TaskScheduler}
import org.apache.spark.util.ActorLogReceive

/**
* A heartbeat from executors to the driver. This is a shared message used by several internal
* components to convey liveness or execution information for in-progress tasks.
* components to convey liveness or execution information for in-progress tasks. It will also
* expire the hosts that have not heartbeated for more than spark.network.timeout.
*/
private[spark] case class Heartbeat(
executorId: String,
taskMetrics: Array[(Long, TaskMetrics)], // taskId -> TaskMetrics
blockManagerId: BlockManagerId)

private[spark] case object ExpireDeadHosts

private[spark] case class HeartbeatResponse(reregisterBlockManager: Boolean)

/**
* Lives in the driver to receive heartbeats from executors.
*/
private[spark] class HeartbeatReceiver(scheduler: TaskScheduler)
private[spark] class HeartbeatReceiver(sc: SparkContext, scheduler: TaskScheduler)
extends Actor with ActorLogReceive with Logging {

// executor ID -> timestamp of when the last heartbeat from this executor was received
private val executorLastSeen = new mutable.HashMap[String, Long]

private val executorTimeoutMs = sc.conf.getLong("spark.network.timeout",
sc.conf.getLong("spark.storage.blockManagerSlaveTimeoutMs", 120)) * 1000

private val checkTimeoutIntervalMs = sc.conf.getLong("spark.network.timeoutInterval",
sc.conf.getLong("spark.storage.blockManagerTimeoutIntervalMs", 60)) * 1000

private var timeoutCheckingTask: Cancellable = null

override def preStart(): Unit = {
import context.dispatcher
timeoutCheckingTask = context.system.scheduler.schedule(0.seconds,
checkTimeoutIntervalMs.milliseconds, self, ExpireDeadHosts)
super.preStart()
}

override def receiveWithLogging = {
case Heartbeat(executorId, taskMetrics, blockManagerId) =>
val response = HeartbeatResponse(
!scheduler.executorHeartbeatReceived(executorId, taskMetrics, blockManagerId))
val unknownExecutor = !scheduler.executorHeartbeatReceived(
executorId, taskMetrics, blockManagerId)
val response = HeartbeatResponse(reregisterBlockManager = unknownExecutor)
executorLastSeen(executorId) = System.currentTimeMillis()
sender ! response
case ExpireDeadHosts =>
expireDeadHosts()
}

private def expireDeadHosts(): Unit = {
logTrace("Checking for hosts with no recent heartbeats in HeartbeatReceiver.")
val now = System.currentTimeMillis()
for ((executorId, lastSeenMs) <- executorLastSeen) {
if (now - lastSeenMs > executorTimeoutMs) {
logWarning(s"Removing executor $executorId with no recent heartbeats: " +
s"${now - lastSeenMs} ms exceeds timeout $executorTimeoutMs ms")
scheduler.executorLost(executorId, SlaveLost("Executor heartbeat " +
  s"timed out after ${now - lastSeenMs} ms"))
if (sc.supportDynamicAllocation) {
sc.killExecutor(executorId)
}
executorLastSeen.remove(executorId)
}
}
}

override def postStop(): Unit = {
if (timeoutCheckingTask != null) {
timeoutCheckingTask.cancel()
}
super.postStop()
}
}
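The new bookkeeping is simply a map from executor id to the timestamp of its last heartbeat: every Heartbeat message refreshes the entry, and each ExpireDeadHosts tick reports executors whose last heartbeat is older than the configured timeout as lost (and kills them when dynamic allocation is supported). A stand-alone sketch of that bookkeeping, with a hypothetical class name and an injected clock:

import scala.collection.mutable

class HeartbeatTracker(timeoutMs: Long, clock: () => Long = () => System.currentTimeMillis()) {
  private val lastSeen = mutable.HashMap[String, Long]()

  /** Called on every heartbeat to refresh the executor's timestamp. */
  def heartbeat(executorId: String): Unit = synchronized {
    lastSeen(executorId) = clock()
  }

  /** Removes and returns the executors whose last heartbeat exceeds the timeout. */
  def expire(): Seq[String] = synchronized {
    val now = clock()
    val dead = lastSeen.collect { case (id, seen) if now - seen > timeoutMs => id }.toSeq
    dead.foreach(lastSeen.remove)
    dead
  }
}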
25 changes: 19 additions & 6 deletions core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -68,7 +68,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
if (value == null) {
throw new NullPointerException("null value for " + key)
}
settings.put(translateConfKey(key, warn = true), value)
settings.put(key, value)
this
}

@@ -140,7 +140,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {

/** Set a parameter if it isn't already configured */
def setIfMissing(key: String, value: String): SparkConf = {
settings.putIfAbsent(translateConfKey(key, warn = true), value)
settings.putIfAbsent(key, value)
this
}

@@ -176,7 +176,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {

/** Get a parameter as an Option */
def getOption(key: String): Option[String] = {
Option(settings.get(translateConfKey(key)))
Option(settings.get(key))
}

/** Get all parameters as a list of pairs */
@@ -229,7 +229,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
def getAppId: String = get("spark.app.id")

/** Does the configuration contain a given parameter? */
def contains(key: String): Boolean = settings.containsKey(translateConfKey(key))
def contains(key: String): Boolean = settings.containsKey(key)

/** Copy this object */
override def clone: SparkConf = {
@@ -343,6 +343,13 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
}
}
}

// Warn against the use of deprecated configs
deprecatedConfigs.values.foreach { dc =>
if (contains(dc.oldName)) {
dc.warn()
}
}
}

/**
@@ -362,7 +369,7 @@ private[spark] object SparkConf extends Logging {
DeprecatedConfig("spark.files.userClassPathFirst", "spark.executor.userClassPathFirst",
"1.3"),
DeprecatedConfig("spark.yarn.user.classpath.first", null, "1.3",
"Use spark.{driver,executor}.userClassPathFirst instead."))
"Use spark.{driver,executor}.userClassPathFirst instead."),
DeprecatedConfig("spark.history.fs.updateInterval",
"spark.history.fs.update.interval.seconds",
"1.3", "Use spark.history.fs.update.interval.seconds instead"),
DeprecatedConfig("spark.history.updateInterval",
"spark.history.fs.update.interval.seconds",
"1.3", "Use spark.history.fs.update.interval.seconds instead"))
configs.map { x => (x.oldName, x) }.toMap
}
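Besides adding the two old history-server update-interval keys to the deprecation list, this change moves the deprecation warning into the configuration-validation step, so simply having an old key set triggers a warning even though get/set no longer translate keys on every call. A compact sketch of the lookup-table pattern behind it, with hypothetical stand-ins rather than Spark's DeprecatedConfig:

case class DeprecatedKey(oldName: String, newName: Option[String], since: String, hint: String)

object DeprecationTable {
  private val byOldName: Map[String, DeprecatedKey] = Seq(
    DeprecatedKey("spark.history.updateInterval",
      Some("spark.history.fs.update.interval.seconds"), "1.3",
      "Use spark.history.fs.update.interval.seconds instead")
  ).map(d => d.oldName -> d).toMap

  /** Warn about a deprecated key and return the name that should be used in its place. */
  def translate(key: String): String = byOldName.get(key) match {
    case Some(d) =>
      Console.err.println(s"$key is deprecated as of Spark ${d.since}. ${d.hint}")
      d.newName.getOrElse(key)
    case None => key
  }
}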

@@ -401,7 +414,7 @@ private[spark] object SparkConf extends Logging {
* @param warn Whether to print a warning if the key is deprecated. Warnings will be printed
* only once for each key.
*/
def translateConfKey(userKey: String, warn: Boolean = false): String = {
private def translateConfKey(userKey: String, warn: Boolean = false): String = {
deprecatedConfigs.get(userKey)
.map { deprecatedKey =>
if (warn) {