
Commit

solve conflicts
zhzhan committed Sep 6, 2014
2 parents 94b4fdc + e4c1982 commit 05d3683
Showing 122 changed files with 2,455 additions and 1,819 deletions.
4 changes: 2 additions & 2 deletions README.md
@@ -4,8 +4,8 @@ Spark is a fast and general cluster computing system for Big Data. It provides
high-level APIs in Scala, Java, and Python, and an optimized engine that
supports general computation graphs for data analysis. It also supports a
rich set of higher-level tools including Spark SQL for SQL and structured
-data processing, MLLib for machine learning, GraphX for graph processing,
-and Spark Streaming.
+data processing, MLlib for machine learning, GraphX for graph processing,
+and Spark Streaming for stream processing.

<http://spark.apache.org/>

2 changes: 2 additions & 0 deletions bin/pyspark
@@ -85,6 +85,8 @@ export PYSPARK_SUBMIT_ARGS

# For pyspark tests
if [[ -n "$SPARK_TESTING" ]]; then
unset YARN_CONF_DIR
unset HADOOP_CONF_DIR
if [[ -n "$PYSPARK_DOC_TEST" ]]; then
exec "$PYSPARK_PYTHON" -m doctest $1
else
7 changes: 5 additions & 2 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1261,7 +1261,10 @@ class SparkContext(config: SparkConf) extends Logging {

  /** Post the application start event */
  private def postApplicationStart() {
-    listenerBus.post(SparkListenerApplicationStart(appName, startTime, sparkUser))
+    // Note: this code assumes that the task scheduler has been initialized and has contacted
+    // the cluster manager to get an application ID (in case the cluster manager provides one).
+    listenerBus.post(SparkListenerApplicationStart(appName, taskScheduler.applicationId(),
+      startTime, sparkUser))
  }

/** Post the application end event */
@@ -1294,7 +1297,7 @@
*/
object SparkContext extends Logging {

private[spark] val SPARK_VERSION = "1.0.0"
private[spark] val SPARK_VERSION = "1.2.0-SNAPSHOT"

private[spark] val SPARK_JOB_DESCRIPTION = "spark.job.description"

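The event posted above now carries the application ID reported by the task scheduler. As a minimal consumer-side sketch (assuming `SparkListenerApplicationStart` exposes the ID as an `Option[String]`, consistent with the `ApplicationEventListener` change further down; `AppIdLogger` is a hypothetical name):

```scala
import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationStart}

// Hypothetical listener illustrating how to read the ID carried by the start event.
class AppIdLogger extends SparkListener {
  override def onApplicationStart(start: SparkListenerApplicationStart) {
    // The ID is optional: cluster managers that do not assign one yield None.
    val id = start.appId.getOrElse("<none>")
    println(s"Application '${start.appName}' started as $id by ${start.sparkUser}")
  }
}
```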
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -225,7 +225,7 @@ object SparkEnv extends Logging {

    val blockManagerMaster = new BlockManagerMaster(registerOrLookup(
      "BlockManagerMaster",
-      new BlockManagerMasterActor(isLocal, conf, listenerBus)), conf)
+      new BlockManagerMasterActor(isLocal, conf, listenerBus)), conf, isDriver)

    val blockManager = new BlockManager(executorId, actorSystem, blockManagerMaster,
      serializer, conf, securityManager, mapOutputTracker, shuffleManager)
core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala
@@ -34,15 +34,15 @@ private[spark] abstract class ApplicationHistoryProvider {
   *
   * @return List of all known applications.
   */
-  def getListing(): Seq[ApplicationHistoryInfo]
+  def getListing(): Iterable[ApplicationHistoryInfo]

  /**
   * Returns the Spark UI for a specific application.
   *
   * @param appId The application ID.
-   * @return The application's UI, or null if application is not found.
+   * @return The application's UI, or None if application is not found.
   */
-  def getAppUI(appId: String): SparkUI
+  def getAppUI(appId: String): Option[SparkUI]

  /**
   * Called when the server is shutting down.
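Replacing the `null` contract with `Option` forces callers to handle the missing-application case explicitly. A minimal caller-side sketch (the `provider` value and application ID are hypothetical):

```scala
// Old contract: a null check the compiler could not enforce.
// New contract: the Option type makes the absent case explicit.
provider.getAppUI("app-12345") match {
  case Some(ui) => println("Found UI for app-12345")  // attach and serve the UI
  case None     => println("application not found")   // no NullPointerException possible
}
```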
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -32,6 +32,8 @@ import org.apache.spark.util.Utils
private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHistoryProvider
  with Logging {

+  private val NOT_STARTED = "<Not Started>"
+
  // Interval between each check for event log updates
  private val UPDATE_INTERVAL_MS = conf.getInt("spark.history.fs.updateInterval",
    conf.getInt("spark.history.updateInterval", 10)) * 1000
@@ -47,8 +49,15 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHistoryProvider
  // A timestamp of when the disk was last accessed to check for log updates
  private var lastLogCheckTimeMs = -1L

-  // List of applications, in order from newest to oldest.
-  @volatile private var appList: Seq[ApplicationHistoryInfo] = Nil
+  // The modification time of the newest log detected during the last scan. This is used
+  // to ignore logs that are older during subsequent scans, to avoid processing data that
+  // is already known.
+  private var lastModifiedTime = -1L
+
+  // Mapping of application IDs to their metadata, in descending end time order. Apps are inserted
+  // into the map in order, so the LinkedHashMap maintains the correct ordering.
+  @volatile private var applications: mutable.LinkedHashMap[String, FsApplicationHistoryInfo]
+    = new mutable.LinkedHashMap()

  /**
   * A background thread that periodically checks for event log updates on disk.
@@ -93,15 +102,35 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHistoryProvider
    logCheckingThread.start()
  }

-  override def getListing() = appList
+  override def getListing() = applications.values

-  override def getAppUI(appId: String): SparkUI = {
+  override def getAppUI(appId: String): Option[SparkUI] = {
    try {
-      val appLogDir = fs.getFileStatus(new Path(resolvedLogDir.toString, appId))
-      val (_, ui) = loadAppInfo(appLogDir, renderUI = true)
-      ui
+      applications.get(appId).map { info =>
+        val (replayBus, appListener) = createReplayBus(fs.getFileStatus(
+          new Path(logDir, info.logDir)))
+        val ui = {
+          val conf = this.conf.clone()
+          val appSecManager = new SecurityManager(conf)
+          new SparkUI(conf, appSecManager, replayBus, appId,
+            s"${HistoryServer.UI_PATH_PREFIX}/$appId")
+          // Do not call ui.bind() to avoid creating a new server for each application
+        }
+
+        replayBus.replay()
+
+        ui.setAppName(s"${appListener.appName.getOrElse(NOT_STARTED)} ($appId)")
+
+        val uiAclsEnabled = conf.getBoolean("spark.history.ui.acls.enable", false)
+        ui.getSecurityManager.setAcls(uiAclsEnabled)
+        // make sure to set admin acls before view acls so they are properly picked up
+        ui.getSecurityManager.setAdminAcls(appListener.adminAcls.getOrElse(""))
+        ui.getSecurityManager.setViewAcls(appListener.sparkUser.getOrElse(NOT_STARTED),
+          appListener.viewAcls.getOrElse(""))
+        ui
+      }
    } catch {
-      case e: FileNotFoundException => null
+      case e: FileNotFoundException => None
    }
  }

@@ -119,84 +148,79 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHistoryProvider
    try {
      val logStatus = fs.listStatus(new Path(resolvedLogDir))
      val logDirs = if (logStatus != null) logStatus.filter(_.isDir).toSeq else Seq[FileStatus]()
-      val logInfos = logDirs.filter { dir =>
-        fs.isFile(new Path(dir.getPath, EventLoggingListener.APPLICATION_COMPLETE))
-      }
-
-      val currentApps = Map[String, ApplicationHistoryInfo](
-        appList.map(app => app.id -> app):_*)
-
-      // For any application that either (i) is not listed or (ii) has changed since the last time
-      // the listing was created (defined by the log dir's modification time), load the app's info.
-      // Otherwise just reuse what's already in memory.
-      val newApps = new mutable.ArrayBuffer[ApplicationHistoryInfo](logInfos.size)
-      for (dir <- logInfos) {
-        val curr = currentApps.getOrElse(dir.getPath().getName(), null)
-        if (curr == null || curr.lastUpdated < getModificationTime(dir)) {
+      // Load all new logs from the log directory. Only directories that have a modification time
+      // later than the last known log directory will be loaded.
+      var newLastModifiedTime = lastModifiedTime
+      val logInfos = logDirs
+        .filter { dir =>
+          if (fs.isFile(new Path(dir.getPath(), EventLoggingListener.APPLICATION_COMPLETE))) {
+            val modTime = getModificationTime(dir)
+            newLastModifiedTime = math.max(newLastModifiedTime, modTime)
+            modTime > lastModifiedTime
+          } else {
+            false
+          }
+        }
+        .flatMap { dir =>
          try {
-            val (app, _) = loadAppInfo(dir, renderUI = false)
-            newApps += app
+            val (replayBus, appListener) = createReplayBus(dir)
+            replayBus.replay()
+            Some(new FsApplicationHistoryInfo(
+              dir.getPath().getName(),
+              appListener.appId.getOrElse(dir.getPath().getName()),
+              appListener.appName.getOrElse(NOT_STARTED),
+              appListener.startTime.getOrElse(-1L),
+              appListener.endTime.getOrElse(-1L),
+              getModificationTime(dir),
+              appListener.sparkUser.getOrElse(NOT_STARTED)))
          } catch {
-            case e: Exception => logError(s"Failed to load app info from directory $dir.")
+            case e: Exception =>
+              logInfo(s"Failed to load application log data from $dir.", e)
+              None
          }
        }
+        .sortBy { info => -info.endTime }
+
+      lastModifiedTime = newLastModifiedTime
+
+      // When there are new logs, merge the new list with the existing one, maintaining
+      // the expected ordering (descending end time). Maintaining the order is important
+      // to avoid having to sort the list every time there is a request for the log list.
+      if (!logInfos.isEmpty) {
+        val newApps = new mutable.LinkedHashMap[String, FsApplicationHistoryInfo]()
+        def addIfAbsent(info: FsApplicationHistoryInfo) = {
+          if (!newApps.contains(info.id)) {
+            newApps += (info.id -> info)
+          }
+        }
-        } else {
-          newApps += curr
-        }
-      }
-
-      appList = newApps.sortBy { info => -info.endTime }
+
+        val newIterator = logInfos.iterator.buffered
+        val oldIterator = applications.values.iterator.buffered
+        while (newIterator.hasNext && oldIterator.hasNext) {
+          if (newIterator.head.endTime > oldIterator.head.endTime) {
+            addIfAbsent(newIterator.next)
+          } else {
+            addIfAbsent(oldIterator.next)
+          }
+        }
+        newIterator.foreach(addIfAbsent)
+        oldIterator.foreach(addIfAbsent)
+
+        applications = newApps
+      }
    } catch {
      case t: Throwable => logError("Exception in checking for event log updates", t)
    }
  }

-  /**
-   * Parse the application's logs to find out the information we need to build the
-   * listing page.
-   *
-   * When creating the listing of available apps, there is no need to load the whole UI for the
-   * application. The UI is requested by the HistoryServer (by calling getAppInfo()) when the user
-   * clicks on a specific application.
-   *
-   * @param logDir Directory with application's log files.
-   * @param renderUI Whether to create the SparkUI for the application.
-   * @return A 2-tuple `(app info, ui)`. `ui` will be null if `renderUI` is false.
-   */
-  private def loadAppInfo(logDir: FileStatus, renderUI: Boolean) = {
-    val path = logDir.getPath
-    val appId = path.getName
+  private def createReplayBus(logDir: FileStatus): (ReplayListenerBus, ApplicationEventListener) = {
+    val path = logDir.getPath()
    val elogInfo = EventLoggingListener.parseLoggingInfo(path, fs)
    val replayBus = new ReplayListenerBus(elogInfo.logPaths, fs, elogInfo.compressionCodec)
    val appListener = new ApplicationEventListener
    replayBus.addListener(appListener)

-    val ui: SparkUI = if (renderUI) {
-      val conf = this.conf.clone()
-      val appSecManager = new SecurityManager(conf)
-      new SparkUI(conf, appSecManager, replayBus, appId,
-        HistoryServer.UI_PATH_PREFIX + s"/$appId")
-      // Do not call ui.bind() to avoid creating a new server for each application
-    } else {
-      null
-    }
-
-    replayBus.replay()
-    val appInfo = ApplicationHistoryInfo(
-      appId,
-      appListener.appName,
-      appListener.startTime,
-      appListener.endTime,
-      getModificationTime(logDir),
-      appListener.sparkUser)
-
-    if (ui != null) {
-      val uiAclsEnabled = conf.getBoolean("spark.history.ui.acls.enable", false)
-      ui.getSecurityManager.setAcls(uiAclsEnabled)
-      // make sure to set admin acls before view acls so properly picked up
-      ui.getSecurityManager.setAdminAcls(appListener.adminAcls)
-      ui.getSecurityManager.setViewAcls(appListener.sparkUser, appListener.viewAcls)
-    }
-    (appInfo, ui)
+    (replayBus, appListener)
  }

  /** Return when this directory was last modified. */
@@ -219,3 +243,13 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHistoryProvider
  private def getMonotonicTimeMs() = System.nanoTime() / (1000 * 1000)

}
+
+private class FsApplicationHistoryInfo(
+    val logDir: String,
+    id: String,
+    name: String,
+    startTime: Long,
+    endTime: Long,
+    lastUpdated: Long,
+    sparkUser: String)
+  extends ApplicationHistoryInfo(id, name, startTime, endTime, lastUpdated, sparkUser)
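The heart of the new `checkForLogs` is a two-pointer merge: the freshly scanned list and the existing listing are both already sorted by descending end time, so merging them keeps `applications` ordered without re-sorting on every scan. A self-contained sketch of the same technique (`Info` is a simplified stand-in for `FsApplicationHistoryInfo`):

```scala
import scala.collection.mutable

// Simplified stand-in for FsApplicationHistoryInfo.
case class Info(id: String, endTime: Long)

// Merge two descending-sorted sequences, deduplicating by ID (first occurrence wins).
def merge(newApps: Seq[Info], oldApps: Seq[Info]): Seq[Info] = {
  val merged = new mutable.LinkedHashMap[String, Info]()
  def addIfAbsent(info: Info) = if (!merged.contains(info.id)) merged += (info.id -> info)

  val ni = newApps.iterator.buffered
  val oi = oldApps.iterator.buffered
  // Take the entry with the strictly later end time; otherwise the existing entry.
  while (ni.hasNext && oi.hasNext) {
    if (ni.head.endTime > oi.head.endTime) addIfAbsent(ni.next()) else addIfAbsent(oi.next())
  }
  // Drain whichever side still has entries.
  ni.foreach(addIfAbsent)
  oi.foreach(addIfAbsent)
  merged.values.toSeq
}

// merge(Seq(Info("c", 30)), Seq(Info("b", 20), Info("a", 10)))
//   == Seq(Info("c", 30), Info("b", 20), Info("a", 10))
```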
core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
@@ -52,10 +52,7 @@ class HistoryServer(

  private val appLoader = new CacheLoader[String, SparkUI] {
    override def load(key: String): SparkUI = {
-      val ui = provider.getAppUI(key)
-      if (ui == null) {
-        throw new NoSuchElementException()
-      }
+      val ui = provider.getAppUI(key).getOrElse(throw new NoSuchElementException())
      attachSparkUI(ui)
      ui
    }
3 changes: 3 additions & 0 deletions core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -123,6 +123,9 @@ private[spark] class Executor(
    env.metricsSystem.report()
    isStopped = true
    threadPool.shutdown()
+    if (!isLocal) {
+      env.stop()
+    }
  }

  class TaskRunner(
core/src/main/scala/org/apache/spark/scheduler/ApplicationEventListener.scala
@@ -24,38 +24,31 @@ package org.apache.spark.scheduler
 * from multiple applications are seen, the behavior is unspecified.
 */
private[spark] class ApplicationEventListener extends SparkListener {
-  var appName = "<Not Started>"
-  var sparkUser = "<Not Started>"
-  var startTime = -1L
-  var endTime = -1L
-  var viewAcls = ""
-  var adminAcls = ""
-
-  def applicationStarted = startTime != -1
-
-  def applicationCompleted = endTime != -1
-
-  def applicationDuration: Long = {
-    val difference = endTime - startTime
-    if (applicationStarted && applicationCompleted && difference > 0) difference else -1L
-  }
+  var appName: Option[String] = None
+  var appId: Option[String] = None
+  var sparkUser: Option[String] = None
+  var startTime: Option[Long] = None
+  var endTime: Option[Long] = None
+  var viewAcls: Option[String] = None
+  var adminAcls: Option[String] = None

  override def onApplicationStart(applicationStart: SparkListenerApplicationStart) {
-    appName = applicationStart.appName
-    startTime = applicationStart.time
-    sparkUser = applicationStart.sparkUser
+    appName = Some(applicationStart.appName)
+    appId = applicationStart.appId
+    startTime = Some(applicationStart.time)
+    sparkUser = Some(applicationStart.sparkUser)
  }

  override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd) {
-    endTime = applicationEnd.time
+    endTime = Some(applicationEnd.time)
  }

  override def onEnvironmentUpdate(environmentUpdate: SparkListenerEnvironmentUpdate) {
    synchronized {
      val environmentDetails = environmentUpdate.environmentDetails
      val allProperties = environmentDetails("Spark Properties").toMap
-      viewAcls = allProperties.getOrElse("spark.ui.view.acls", "")
-      adminAcls = allProperties.getOrElse("spark.admin.acls", "")
+      viewAcls = allProperties.get("spark.ui.view.acls")
+      adminAcls = allProperties.get("spark.admin.acls")
    }
  }
}
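With every field now an `Option`, an application that has not started is represented by `None` instead of sentinel values like `"<Not Started>"` and `-1L`; each consumer picks its own fallback at the point of use, as `FsHistoryProvider` does with `NOT_STARTED`. A brief sketch (the `listener` value is hypothetical):

```scala
// Consumers choose their own defaults where the value is used.
val name = listener.appName.getOrElse("<Not Started>")

// The removed applicationDuration helper can be recovered with a for-comprehension:
val duration: Option[Long] = for {
  start <- listener.startTime
  end   <- listener.endTime
  if end > start
} yield end - start
```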
core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala
@@ -31,4 +31,12 @@ private[spark] trait SchedulerBackend {
  def killTask(taskId: Long, executorId: String, interruptThread: Boolean): Unit =
    throw new UnsupportedOperationException
  def isReady(): Boolean = true
+
+  /**
+   * The application ID associated with the job, if any.
+   *
+   * @return The application ID, or None if the backend does not provide an ID.
+   */
+  def applicationId(): Option[String] = None
+
}
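Backends whose cluster manager assigns its own ID (YARN, for example) can override the new hook, while the `None` default keeps existing backends unchanged. A hedged sketch of a custom backend (the class name and ID format are hypothetical; the other members shown are assumed to be the trait's abstract methods):

```scala
// Hypothetical backend reporting the ID assigned by its cluster manager.
private[spark] class MyClusterBackend extends SchedulerBackend {
  override def start(): Unit = { /* connect to the cluster manager */ }
  override def stop(): Unit = { /* release cluster resources */ }
  override def reviveOffers(): Unit = { /* ask the scheduler for task offers */ }
  override def defaultParallelism(): Int = 2

  // Surface the cluster manager's ID so the scheduler can publish it in events.
  override def applicationId(): Option[String] = Some("application_1410000000000_0001")
}
```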