Skip to content

Commit

Permalink
Merge pull request apache#2 from apache/master
Browse files Browse the repository at this point in the history
merge lastest spark
  • Loading branch information
pzzs committed Mar 5, 2015
2 parents c3f046f + 7ac072f commit cb1852d
Show file tree
Hide file tree
Showing 127 changed files with 4,119 additions and 1,199 deletions.
44 changes: 29 additions & 15 deletions core/src/main/scala/org/apache/spark/ContextCleaner.scala
Original file line number Diff line number Diff line change
Expand Up @@ -105,9 +105,19 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
cleaningThread.start()
}

/** Stop the cleaner. */
/**
* Stop the cleaning thread and wait until the thread has finished running its current task.
*/
def stop() {
stopped = true
// Interrupt the cleaning thread, but wait until the current task has finished before
// doing so. This guards against the race condition where a cleaning thread may
// potentially clean similarly named variables created by a different SparkContext,
// resulting in otherwise inexplicable block-not-found exceptions (SPARK-6132).
synchronized {
cleaningThread.interrupt()
}
cleaningThread.join()
}

/** Register a RDD for cleanup when it is garbage collected. */
Expand Down Expand Up @@ -140,21 +150,25 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
try {
val reference = Option(referenceQueue.remove(ContextCleaner.REF_QUEUE_POLL_TIMEOUT))
.map(_.asInstanceOf[CleanupTaskWeakReference])
reference.map(_.task).foreach { task =>
logDebug("Got cleaning task " + task)
referenceBuffer -= reference.get
task match {
case CleanRDD(rddId) =>
doCleanupRDD(rddId, blocking = blockOnCleanupTasks)
case CleanShuffle(shuffleId) =>
doCleanupShuffle(shuffleId, blocking = blockOnShuffleCleanupTasks)
case CleanBroadcast(broadcastId) =>
doCleanupBroadcast(broadcastId, blocking = blockOnCleanupTasks)
case CleanAccum(accId) =>
doCleanupAccum(accId, blocking = blockOnCleanupTasks)
// Synchronize here to avoid being interrupted on stop()
synchronized {
reference.map(_.task).foreach { task =>
logDebug("Got cleaning task " + task)
referenceBuffer -= reference.get
task match {
case CleanRDD(rddId) =>
doCleanupRDD(rddId, blocking = blockOnCleanupTasks)
case CleanShuffle(shuffleId) =>
doCleanupShuffle(shuffleId, blocking = blockOnShuffleCleanupTasks)
case CleanBroadcast(broadcastId) =>
doCleanupBroadcast(broadcastId, blocking = blockOnCleanupTasks)
case CleanAccum(accId) =>
doCleanupAccum(accId, blocking = blockOnCleanupTasks)
}
}
}
} catch {
case ie: InterruptedException if stopped => // ignore
case e: Exception => logError("Error in cleaning thread", e)
}
}
Expand Down Expand Up @@ -188,10 +202,10 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
/** Perform broadcast cleanup. */
def doCleanupBroadcast(broadcastId: Long, blocking: Boolean) {
try {
logDebug("Cleaning broadcast " + broadcastId)
logDebug(s"Cleaning broadcast $broadcastId")
broadcastManager.unbroadcast(broadcastId, true, blocking)
listeners.foreach(_.broadcastCleaned(broadcastId))
logInfo("Cleaned broadcast " + broadcastId)
logDebug(s"Cleaned broadcast $broadcastId")
} catch {
case e: Exception => logError("Error cleaning broadcast " + broadcastId, e)
}
Expand Down
15 changes: 11 additions & 4 deletions core/src/main/scala/org/apache/spark/SparkConf.scala
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
if (value == null) {
throw new NullPointerException("null value for " + key)
}
settings.put(translateConfKey(key, warn = true), value)
settings.put(key, value)
this
}

Expand Down Expand Up @@ -140,7 +140,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {

/** Set a parameter if it isn't already configured */
def setIfMissing(key: String, value: String): SparkConf = {
settings.putIfAbsent(translateConfKey(key, warn = true), value)
settings.putIfAbsent(key, value)
this
}

Expand Down Expand Up @@ -176,7 +176,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {

/** Get a parameter as an Option */
def getOption(key: String): Option[String] = {
Option(settings.get(translateConfKey(key)))
Option(settings.get(key))
}

/** Get all parameters as a list of pairs */
Expand Down Expand Up @@ -229,7 +229,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
def getAppId: String = get("spark.app.id")

/** Does the configuration contain a given parameter? */
def contains(key: String): Boolean = settings.containsKey(translateConfKey(key))
def contains(key: String): Boolean = settings.containsKey(key)

/** Copy this object */
override def clone: SparkConf = {
Expand Down Expand Up @@ -343,6 +343,13 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
}
}
}

// Warn against the use of deprecated configs
deprecatedConfigs.values.foreach { dc =>
if (contains(dc.oldName)) {
dc.warn()
}
}
}

/**
Expand Down
13 changes: 11 additions & 2 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ import org.apache.spark.deploy.{LocalSparkCluster, SparkHadoopUtil}
import org.apache.spark.executor.TriggerThreadDump
import org.apache.spark.input.{StreamInputFormat, PortableDataStream, WholeTextFileInputFormat,
FixedLengthBinaryInputFormat}
import org.apache.spark.io.CompressionCodec
import org.apache.spark.partial.{ApproximateEvaluator, PartialResult}
import org.apache.spark.rdd._
import org.apache.spark.scheduler._
Expand Down Expand Up @@ -233,6 +234,14 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
None
}
}
private[spark] val eventLogCodec: Option[String] = {
val compress = conf.getBoolean("spark.eventLog.compress", false)
if (compress && isEventLogEnabled) {
Some(CompressionCodec.getCodecName(conf)).map(CompressionCodec.getShortName)
} else {
None
}
}

// Generate the random name for a temp folder in Tachyon
// Add a timestamp as the suffix here to make it more safe
Expand Down Expand Up @@ -1383,10 +1392,10 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
/** Shut down the SparkContext. */
def stop() {
SparkContext.SPARK_CONTEXT_CONSTRUCTOR_LOCK.synchronized {
postApplicationEnd()
ui.foreach(_.stop())
if (!stopped) {
stopped = true
postApplicationEnd()
ui.foreach(_.stop())
env.metricsSystem.report()
metadataCleaner.cancel()
cleaner.foreach(_.stop())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ private[spark] class ApplicationDescription(
val memoryPerSlave: Int,
val command: Command,
var appUiUrl: String,
val eventLogDir: Option[String] = None)
val eventLogDir: Option[String] = None,
// short name of compression codec used when writing event logs, if any (e.g. lzf)
val eventLogCodec: Option[String] = None)
extends Serializable {

val user = System.getProperty("user.name", "<unknown>")
Expand All @@ -34,8 +36,10 @@ private[spark] class ApplicationDescription(
memoryPerSlave: Int = memoryPerSlave,
command: Command = command,
appUiUrl: String = appUiUrl,
eventLogDir: Option[String] = eventLogDir): ApplicationDescription =
new ApplicationDescription(name, maxCores, memoryPerSlave, command, appUiUrl, eventLogDir)
eventLogDir: Option[String] = eventLogDir,
eventLogCodec: Option[String] = eventLogCodec): ApplicationDescription =
new ApplicationDescription(
name, maxCores, memoryPerSlave, command, appUiUrl, eventLogDir, eventLogCodec)

override def toString: String = "ApplicationDescription(" + name + ")"
}
Loading

0 comments on commit cb1852d

Please sign in to comment.