Use Option[String] for attempt id.
Marcelo Vanzin committed Apr 23, 2015
1 parent f1cb9b3 commit ba34b69
Showing 11 changed files with 53 additions and 51 deletions.
32 changes: 16 additions & 16 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -217,7 +217,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
private var _heartbeatReceiver: RpcEndpointRef = _
@volatile private var _dagScheduler: DAGScheduler = _
private var _applicationId: String = _
- private var _applicationAttemptId: String = _
+ private var _applicationAttemptId: Option[String] = None
private var _eventLogger: Option[EventLoggingListener] = None
private var _executorAllocationManager: Option[ExecutorAllocationManager] = None
private var _cleaner: Option[ContextCleaner] = None
@@ -313,7 +313,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
}

def applicationId: String = _applicationId
- def applicationAttemptId: String = _applicationAttemptId
+ def applicationAttemptId: Option[String] = _applicationAttemptId

def metricsSystem: MetricsSystem = if (_env != null) _env.metricsSystem else null

@@ -1847,7 +1847,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
// Note: this code assumes that the task scheduler has been initialized and has contacted
// the cluster manager to get an application ID (in case the cluster manager provides one).
listenerBus.post(SparkListenerApplicationStart(appName, Some(applicationId),
- startTime, sparkUser, Some(applicationAttemptId)))
+ startTime, sparkUser, applicationAttemptId))
}

/** Post the application end event */
@@ -1895,7 +1895,7 @@ object SparkContext extends Logging {
*
* Access to this field is guarded by SPARK_CONTEXT_CONSTRUCTOR_LOCK.
*/
- private val activeContext: AtomicReference[SparkContext] =
+ private val activeContext: AtomicReference[SparkContext] =
new AtomicReference[SparkContext](null)

/**
@@ -1948,11 +1948,11 @@
}

/**
- * This function may be used to get or instantiate a SparkContext and register it as a
- * singleton object. Because we can only have one active SparkContext per JVM,
- * this is useful when applications may wish to share a SparkContext.
+ * This function may be used to get or instantiate a SparkContext and register it as a
+ * singleton object. Because we can only have one active SparkContext per JVM,
+ * this is useful when applications may wish to share a SparkContext.
*
- * Note: This function cannot be used to create multiple SparkContext instances
+ * Note: This function cannot be used to create multiple SparkContext instances
* even if multiple contexts are allowed.
*/
def getOrCreate(config: SparkConf): SparkContext = {
@@ -1965,17 +1965,17 @@
activeContext.get()
}
}

/**
- * This function may be used to get or instantiate a SparkContext and register it as a
- * singleton object. Because we can only have one active SparkContext per JVM,
+ * This function may be used to get or instantiate a SparkContext and register it as a
+ * singleton object. Because we can only have one active SparkContext per JVM,
* this is useful when applications may wish to share a SparkContext.
- *
+ *
* This method allows not passing a SparkConf (useful if just retrieving).
- *
- * Note: This function cannot be used to create multiple SparkContext instances
- * even if multiple contexts are allowed.
- */
+ *
+ * Note: This function cannot be used to create multiple SparkContext instances
+ * even if multiple contexts are allowed.
+ */
def getOrCreate(): SparkContext = {
getOrCreate(new SparkConf())
}
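A minimal, Spark-free sketch of what the Option-typed accessor buys callers: an absent attempt id can no longer be confused with a real but empty one, and call sites such as the SparkListenerApplicationStart post above can pass the value through without wrapping it in Some(...). The describe helper below is purely illustrative, not part of the commit.

    // Absence used to be the empty string ""; now it is an explicit None.
    def describe(appId: String, attemptId: Option[String]): String =
      attemptId match {
        case Some(attempt) => s"$appId (attempt $attempt)"
        case None          => s"$appId (single attempt)"
      }

    assert(describe("app-123", Some("2")) == "app-123 (attempt 2)")
    assert(describe("app-123", None) == "app-123 (single attempt)")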
core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -760,7 +760,7 @@ private[master] class Master(
}

val eventLogFilePrefix = EventLoggingListener.getLogPath(
- eventLogDir, app.id, "", app.desc.eventLogCodec)
+ eventLogDir, app.id, None, app.desc.eventLogCodec)
val fs = Utils.getHadoopFileSystem(eventLogDir, hadoopConf)
val inProgressExists = fs.exists(new Path(eventLogFilePrefix +
EventLoggingListener.IN_PROGRESS))
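With no attempt id in hand, the Master now passes None instead of "". A short sketch of the probe this hunk performs, assuming EventLoggingListener.IN_PROGRESS is the string ".inprogress" and a default Hadoop configuration; the path literal is a made-up example.

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}

    val fs = FileSystem.get(new Configuration())
    // What getLogPath(eventLogDir, app.id, None, ...) might yield for this app.
    val eventLogFilePrefix = "/spark-events/app-20150423"
    // The Master checks whether an unfinished event log exists for the app.
    val inProgressExists = fs.exists(new Path(eventLogFilePrefix + ".inprogress"))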
core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
@@ -47,15 +47,15 @@ import org.apache.spark.util.{JsonProtocol, Utils}
*/
private[spark] class EventLoggingListener(
appId: String,
- appAttemptId : String,
+ appAttemptId : Option[String],
logBaseDir: URI,
sparkConf: SparkConf,
hadoopConf: Configuration)
extends SparkListener with Logging {

import EventLoggingListener._

- def this(appId: String, appAttemptId : String, logBaseDir: URI, sparkConf: SparkConf) =
+ def this(appId: String, appAttemptId : Option[String], logBaseDir: URI, sparkConf: SparkConf) =
this(appId, appAttemptId, logBaseDir, sparkConf,
SparkHadoopUtil.get.newConfiguration(sparkConf))

@@ -267,15 +267,15 @@ private[spark] object EventLoggingListener extends Logging {
def getLogPath(
logBaseDir: URI,
appId: String,
- appAttemptId: String,
+ appAttemptId: Option[String],
compressionCodecName: Option[String] = None): String = {
- val sanitizedAppId = appId.replaceAll("[ :/]", "-").replaceAll("[.${}'\"]", "_").toLowerCase
val base = logBaseDir.toString.stripSuffix("/") + "/" + sanitize(appId)
val codec = compressionCodecName.map("." + _).getOrElse("")
- if (appAttemptId.isEmpty) {
- base + codec
+ if (appAttemptId.isDefined) {
+ base + "_" + sanitize(appAttemptId.get) + codec
} else {
- base + "_" + sanitize(appAttemptId) + codec
+ base + codec
}
}

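The new naming rule is easier to read in isolation. The following self-contained sketch mirrors the getLogPath logic in this hunk; the sanitize helper is reproduced from the regexes visible in the removed line above, while the real method lives on the EventLoggingListener companion object.

    import java.net.URI

    def sanitize(str: String): String =
      str.replaceAll("[ :/]", "-").replaceAll("[.${}'\"]", "_").toLowerCase

    // Base name, then an optional "_<attempt>" part, then an optional codec suffix.
    def logPath(
        logBaseDir: URI,
        appId: String,
        appAttemptId: Option[String],
        codecName: Option[String] = None): String = {
      val base = logBaseDir.toString.stripSuffix("/") + "/" + sanitize(appId)
      val codec = codecName.map("." + _).getOrElse("")
      appAttemptId match {
        case Some(attempt) => base + "_" + sanitize(attempt) + codec
        case None          => base + codec
      }
    }

    // logPath(new URI("file:/base-dir"), "app1", None)              == "file:/base-dir/app1"
    // logPath(new URI("file:/base-dir"), "app1", Some("attempt1"))  == "file:/base-dir/app1_attempt1"
    // logPath(new URI("file:/base-dir"), "app1", None, Some("lzf")) == "file:/base-dir/app1.lzf"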
core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala
@@ -46,6 +46,6 @@ private[spark] trait SchedulerBackend {
*
* @return An application attempt id
*/
- def applicationAttemptId(): String = ""
+ def applicationAttemptId(): Option[String] = None

}
core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
@@ -84,6 +84,6 @@ private[spark] trait TaskScheduler {
*
* @return An application's Attempt ID
*/
- def applicationAttemptId(): String = ""
+ def applicationAttemptId(): Option[String]

}
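There is a deliberate asymmetry between the two trait changes above: SchedulerBackend keeps a None default, while TaskScheduler drops its default body entirely, so every concrete scheduler must now answer explicitly. A schematic reproduction of that pattern with toy traits (not the real Spark ones):

    trait Backend {
      def applicationAttemptId(): Option[String] = None  // safe default: no attempt id
    }

    trait Scheduler {
      def applicationAttemptId(): Option[String]         // abstract: no default
    }

    // Delegation, as TaskSchedulerImpl does in the next file of this diff.
    class MyScheduler(backend: Backend) extends Scheduler {
      override def applicationAttemptId(): Option[String] = backend.applicationAttemptId()
    }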
core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -512,8 +512,8 @@ private[spark] class TaskSchedulerImpl(
}

override def applicationId(): String = backend.applicationId()
- override def applicationAttemptId(): String = backend.applicationAttemptId()
+ override def applicationAttemptId(): Option[String] = backend.applicationAttemptId()

}

core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
@@ -47,7 +47,7 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers
/** Create a fake log file using the new log format used in Spark 1.3+ */
private def newLogFile(
appId: String,
- appAttemptId: String,
+ appAttemptId: Option[String],
inProgress: Boolean,
codec: Option[String] = None): File = {
val ip = if (inProgress) EventLoggingListener.IN_PROGRESS else ""
@@ -60,20 +60,20 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers
val provider = new FsHistoryProvider(createTestConf())

// Write a new-style application log.
- val newAppComplete = newLogFile("new1", "", inProgress = false)
+ val newAppComplete = newLogFile("new1", None, inProgress = false)
writeFile(newAppComplete, true, None,
SparkListenerApplicationStart("new-app-complete", None, 1L, "test", None),
SparkListenerApplicationEnd(5L)
)

// Write a new-style application log.
- val newAppCompressedComplete = newLogFile("new1compressed", "", inProgress = false, Some("lzf"))
+ val newAppCompressedComplete = newLogFile("new1compressed", None, inProgress = false, Some("lzf"))
writeFile(newAppCompressedComplete, true, None,
SparkListenerApplicationStart("new-app-compressed-complete", None, 1L, "test", None),
SparkListenerApplicationEnd(4L))

// Write an unfinished app, new-style.
- val newAppIncomplete = newLogFile("new2", "", inProgress = true)
+ val newAppIncomplete = newLogFile("new2", None, inProgress = true)
writeFile(newAppIncomplete, true, None,
SparkListenerApplicationStart("new-app-incomplete", None, 1L, "test", None)
)
@@ -165,12 +165,12 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers
}

test("SPARK-3697: ignore directories that cannot be read.") {
- val logFile1 = newLogFile("new1", "", inProgress = false)
+ val logFile1 = newLogFile("new1", None, inProgress = false)
writeFile(logFile1, true, None,
SparkListenerApplicationStart("app1-1", None, 1L, "test", None),
SparkListenerApplicationEnd(2L)
)
- val logFile2 = newLogFile("new2", "", inProgress = false)
+ val logFile2 = newLogFile("new2", None, inProgress = false)
writeFile(logFile2, true, None,
SparkListenerApplicationStart("app1-2", None, 1L, "test", None),
SparkListenerApplicationEnd(2L)
@@ -186,7 +186,7 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers
test("history file is renamed from inprogress to completed") {
val provider = new FsHistoryProvider(createTestConf())

- val logFile1 = newLogFile("app1", "", inProgress = true)
+ val logFile1 = newLogFile("app1", None, inProgress = true)
writeFile(logFile1, true, None,
SparkListenerApplicationStart("app1", Some("app1"), 1L, "test", None),
SparkListenerApplicationEnd(2L)
@@ -197,7 +197,7 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers
endWith(EventLoggingListener.IN_PROGRESS)
}

- logFile1.renameTo(newLogFile("app1", "", inProgress = false))
+ logFile1.renameTo(newLogFile("app1", None, inProgress = false))
updateAndCheck(provider) { list =>
list.size should be (1)
list.head.attempts.head.asInstanceOf[FsApplicationAttemptInfo].logPath should not
@@ -208,7 +208,7 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers
test("SPARK-5582: empty log directory") {
val provider = new FsHistoryProvider(createTestConf())

- val logFile1 = newLogFile("app1", "", inProgress = true)
+ val logFile1 = newLogFile("app1", None, inProgress = true)
writeFile(logFile1, true, None,
SparkListenerApplicationStart("app1", Some("app1"), 1L, "test", None),
SparkListenerApplicationEnd(2L))
@@ -224,7 +224,7 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers
test("apps with multiple attempts") {
val provider = new FsHistoryProvider(createTestConf())

- val attempt1 = newLogFile("app1", "attempt1", inProgress = false)
+ val attempt1 = newLogFile("app1", Some("attempt1"), inProgress = false)
writeFile(attempt1, true, None,
SparkListenerApplicationStart("app1", Some("app1"), 1L, "test", Some("attempt1")),
SparkListenerApplicationEnd(2L)
@@ -235,7 +235,7 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers
list.head.attempts.size should be (1)
}

- val attempt2 = newLogFile("app1", "attempt2", inProgress = true)
+ val attempt2 = newLogFile("app1", Some("attempt2"), inProgress = true)
writeFile(attempt2, true, None,
SparkListenerApplicationStart("app1", Some("app1"), 3L, "test", Some("attempt2"))
)
@@ -246,7 +246,7 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers
list.head.attempts.head.attemptId should be ("attempt1")
}

- val completedAttempt2 = newLogFile("app1", "attempt2", inProgress = false)
+ val completedAttempt2 = newLogFile("app1", Some("attempt2"), inProgress = false)
attempt2.delete()
writeFile(attempt2, true, None,
SparkListenerApplicationStart("app1", Some("app1"), 3L, "test", Some("attempt2")),
Expand All @@ -260,7 +260,7 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers
list.head.attempts.head.attemptId should be ("attempt2")
}

- val app2Attempt1 = newLogFile("app2", "attempt1", inProgress = false)
+ val app2Attempt1 = newLogFile("app2", Some("attempt1"), inProgress = false)
writeFile(attempt2, true, None,
SparkListenerApplicationStart("app2", Some("app2"), 5L, "test", Some("attempt1")),
SparkListenerApplicationEnd(6L)
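The multiple-attempts test leans entirely on the log-file naming scheme. A hypothetical helper echoing the suite's newLogFile shows the names the test ends up creating, assuming EventLoggingListener.IN_PROGRESS is ".inprogress":

    def logFileName(appId: String, attemptId: Option[String], inProgress: Boolean): String = {
      val ip = if (inProgress) ".inprogress" else ""
      appId + attemptId.map("_" + _).getOrElse("") + ip
    }

    // logFileName("app1", Some("attempt1"), inProgress = false) == "app1_attempt1"
    // logFileName("app1", Some("attempt2"), inProgress = true)  == "app1_attempt2.inprogress"
    // logFileName("new1", None, inProgress = false)             == "new1"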
core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -95,6 +95,7 @@ class DAGSchedulerSuite
override def setDAGScheduler(dagScheduler: DAGScheduler) = {}
override def defaultParallelism() = 2
override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {}
+ override def applicationAttemptId(): Option[String] = None
}

/** Length of time to wait while draining listener events. */
@@ -404,6 +405,7 @@ class DAGSchedulerSuite
taskMetrics: Array[(Long, TaskMetrics)],
blockManagerId: BlockManagerId): Boolean = true
override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {}
+ override def applicationAttemptId(): Option[String] = None
}
val noKillScheduler = new DAGScheduler(
sc,
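These two one-line additions are forced by the TaskScheduler change earlier in the diff: once applicationAttemptId() loses its default body, anonymous test schedulers stop compiling until they provide one. A minimal illustration with a toy trait:

    trait AttemptSource {
      def applicationAttemptId(): Option[String]  // abstract, as TaskScheduler is now
    }

    // Without the override, this anonymous class would not compile.
    val stub = new AttemptSource {
      override def applicationAttemptId(): Option[String] = None
    }

    assert(stub.applicationAttemptId().isEmpty)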
core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
@@ -61,7 +61,7 @@ class EventLoggingListenerSuite extends FunSuite with LocalSparkContext with Bef
test("Verify log file exist") {
// Verify logging directory exists
val conf = getLoggingConf(testDirPath)
- val eventLogger = new EventLoggingListener("test", "", testDirPath.toUri(), conf)
+ val eventLogger = new EventLoggingListener("test", None, testDirPath.toUri(), conf)
eventLogger.start()

val logPath = new Path(eventLogger.logPath + EventLoggingListener.IN_PROGRESS)
@@ -95,7 +95,7 @@ class EventLoggingListenerSuite extends FunSuite with LocalSparkContext with Bef
}

test("Log overwriting") {
- val logUri = EventLoggingListener.getLogPath(testDir.toURI, "test", "")
+ val logUri = EventLoggingListener.getLogPath(testDir.toURI, "test", None)
val logPath = new URI(logUri).getPath
// Create file before writing the event log
new FileOutputStream(new File(logPath)).close()
@@ -108,18 +108,18 @@ class EventLoggingListenerSuite extends FunSuite with LocalSparkContext with Bef
test("Event log name") {
// without compression
assert(s"file:/base-dir/app1" === EventLoggingListener.getLogPath(
- Utils.resolveURI("/base-dir"), "app1", ""))
+ Utils.resolveURI("/base-dir"), "app1", None))
// with compression
assert(s"file:/base-dir/app1.lzf" ===
- EventLoggingListener.getLogPath(Utils.resolveURI("/base-dir"), "app1", "", Some("lzf")))
+ EventLoggingListener.getLogPath(Utils.resolveURI("/base-dir"), "app1", None, Some("lzf")))
// illegal characters in app ID
assert(s"file:/base-dir/a-fine-mind_dollar_bills__1" ===
EventLoggingListener.getLogPath(Utils.resolveURI("/base-dir"),
"a fine:mind$dollar{bills}.1", ""))
"a fine:mind$dollar{bills}.1", None))
// illegal characters in app ID with compression
assert(s"file:/base-dir/a-fine-mind_dollar_bills__1.lz4" ===
EventLoggingListener.getLogPath(Utils.resolveURI("/base-dir"),
"a fine:mind$dollar{bills}.1", "", Some("lz4")))
"a fine:mind$dollar{bills}.1", None, Some("lz4")))
}

/* ----------------- *
@@ -140,7 +140,7 @@ class EventLoggingListenerSuite extends FunSuite with LocalSparkContext with Bef
val conf = getLoggingConf(testDirPath, compressionCodec)
extraConf.foreach { case (k, v) => conf.set(k, v) }
val logName = compressionCodec.map("test-" + _).getOrElse("test")
- val eventLogger = new EventLoggingListener(logName, "", testDirPath.toUri(), conf)
+ val eventLogger = new EventLoggingListener(logName, None, testDirPath.toUri(), conf)
val listenerBus = new LiveListenerBus
val applicationStart = SparkListenerApplicationStart("Greatest App (N)ever", None,
125L, "Mickey", None)
@@ -186,7 +186,7 @@ class EventLoggingListenerSuite extends FunSuite with LocalSparkContext with Bef
val eventLogPath = eventLogger.logPath
val expectedLogDir = testDir.toURI()
assert(eventLogPath === EventLoggingListener.getLogPath(
- expectedLogDir, sc.applicationId, "", compressionCodec.map(CompressionCodec.getShortName)))
+ expectedLogDir, sc.applicationId, None, compressionCodec.map(CompressionCodec.getShortName)))

// Begin listening for events that trigger asserts
val eventExistenceListener = new EventExistenceListener(eventLogger)
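The "Event log name" assertions also exercise app-id sanitization, which is independent of the Option change. The two replaceAll steps (regexes copied from getLogPath in this diff) transform the test's app id as follows:

    val appId = "a fine:mind$dollar{bills}.1"
    val step1 = appId.replaceAll("[ :/]", "-")      // "a-fine-mind$dollar{bills}.1"
    val step2 = step1.replaceAll("[.${}'\"]", "_")  // "a-fine-mind_dollar_bills__1"
    assert(step2.toLowerCase == "a-fine-mind_dollar_bills__1")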
core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
@@ -146,7 +146,7 @@ class ReplayListenerSuite extends FunSuite with BeforeAndAfter {
* log the events.
*/
private class EventMonster(conf: SparkConf)
- extends EventLoggingListener("test", "", new URI("testdir"), conf) {
+ extends EventLoggingListener("test", None, new URI("testdir"), conf) {

override def start() { }

yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
@@ -47,11 +47,11 @@ private[spark] class YarnClusterSchedulerBackend(
super.applicationId
}

- override def applicationAttemptId(): String =
+ override def applicationAttemptId(): Option[String] =
// In YARN Cluster mode, spark.yarn.app.attemptid is expect to be set
// before user application is launched.
// So, if spark.yarn.app.id is not set, it is something wrong.
- sc.getConf.getOption("spark.yarn.app.attemptid").getOrElse {
+ sc.getConf.getOption("spark.yarn.app.attemptid").orElse {
logError("Application attempt ID is not set.")
super.applicationAttemptId
}
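The subtle change in this file is getOrElse becoming orElse: with a String attempt id the fallback had to produce a bare value, whereas with Option[String] the fallback must itself be an Option, which is exactly what super.applicationAttemptId now returns. A minimal illustration, using a plain Map as a stand-in for SparkConf:

    val conf = Map("spark.yarn.app.attemptid" -> "2")

    // orElse keeps the Option intact; the block runs only when the key is absent.
    val attempt: Option[String] =
      conf.get("spark.yarn.app.attemptid").orElse {
        None  // fallback, analogous to super.applicationAttemptId
      }

    // getOrElse would instead unwrap to the element type:
    val attemptOrDefault: String =
      conf.get("spark.yarn.app.attemptid").getOrElse("unknown")

    assert(attempt == Some("2") && attemptOrDefault == "2")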
