Merge branch 'master' of github.com:apache/spark into SPARK-8639
Rosstin committed Jun 29, 2015
2 parents 21ac1e5 + c6ba2ea commit 2cd2985
Showing 38 changed files with 639 additions and 331 deletions.
8 changes: 8 additions & 0 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -315,6 +315,14 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
     _dagScheduler = ds
   }
 
+  /**
+   * A unique identifier for the Spark application.
+   * Its format depends on the scheduler implementation.
+   * (e.g.
+   * for a local Spark app, something like 'local-1433865536131';
+   * for a YARN app, something like 'application_1433865536131_34483'
+   * )
+   */
   def applicationId: String = _applicationId
   def applicationAttemptId: Option[String] = _applicationAttemptId
 
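
The new scaladoc documents `applicationId` by example. As a minimal usage sketch (not part of the commit; the object name, app name, and master URL are illustrative), the identifier can be read straight off a running context:

import org.apache.spark.{SparkConf, SparkContext}

object AppIdDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("app-id-demo").setMaster("local[2]")
    val sc = new SparkContext(conf)
    // In local mode this prints something like "local-1433865536131";
    // under YARN it would look like "application_1433865536131_34483".
    println(sc.applicationId)
    sc.stop()
  }
}
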
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -160,7 +160,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     replayBus.addListener(appListener)
     val appInfo = replay(fs.getFileStatus(new Path(logDir, attempt.logPath)), replayBus)
 
-    appInfo.foreach { app => ui.setAppName(s"${app.name} ($appId)") }
+    ui.setAppName(s"${appInfo.name} ($appId)")
 
     val uiAclsEnabled = conf.getBoolean("spark.history.ui.acls.enable", false)
     ui.getSecurityManager.setAcls(uiAclsEnabled)
@@ -282,12 +282,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     val newAttempts = logs.flatMap { fileStatus =>
       try {
         val res = replay(fileStatus, bus)
-        res match {
-          case Some(r) => logDebug(s"Application log ${r.logPath} loaded successfully.")
-          case None => logWarning(s"Failed to load application log ${fileStatus.getPath}. " +
-            "The application may have not started.")
-        }
-        res
+        logInfo(s"Application log ${res.logPath} loaded successfully.")
+        Some(res)
       } catch {
         case e: Exception =>
           logError(
@@ -433,11 +429,9 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
 
   /**
    * Replays the events in the specified log file and returns information about the associated
-   * application. Return `None` if the application ID cannot be located.
+   * application.
    */
-  private def replay(
-      eventLog: FileStatus,
-      bus: ReplayListenerBus): Option[FsApplicationAttemptInfo] = {
+  private def replay(eventLog: FileStatus, bus: ReplayListenerBus): FsApplicationAttemptInfo = {
     val logPath = eventLog.getPath()
     logInfo(s"Replaying log path: $logPath")
     val logInput =
@@ -451,18 +445,16 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
       val appCompleted = isApplicationCompleted(eventLog)
       bus.addListener(appListener)
       bus.replay(logInput, logPath.toString, !appCompleted)
-      appListener.appId.map { appId =>
-        new FsApplicationAttemptInfo(
-          logPath.getName(),
-          appListener.appName.getOrElse(NOT_STARTED),
-          appId,
-          appListener.appAttemptId,
-          appListener.startTime.getOrElse(-1L),
-          appListener.endTime.getOrElse(-1L),
-          getModificationTime(eventLog).get,
-          appListener.sparkUser.getOrElse(NOT_STARTED),
-          appCompleted)
-      }
+      new FsApplicationAttemptInfo(
+        logPath.getName(),
+        appListener.appName.getOrElse(NOT_STARTED),
+        appListener.appId.getOrElse(logPath.getName()),
+        appListener.appAttemptId,
+        appListener.startTime.getOrElse(-1L),
+        appListener.endTime.getOrElse(-1L),
+        getModificationTime(eventLog).get,
+        appListener.sparkUser.getOrElse(NOT_STARTED),
+        appCompleted)
     } finally {
       logInput.close()
     }
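
The substance of these hunks is that `replay` no longer returns an `Option`: an attempt entry is always produced, and when the event log never recorded an application ID, the log file's name stands in for it. A toy sketch of that fallback (not from the commit; the values are hypothetical):

def effectiveAppId(recordedId: Option[String], logFileName: String): String =
  recordedId.getOrElse(logFileName)

effectiveAppId(Some("application_1433865536131_34483"), "ignored")  // the recorded ID wins
effectiveAppId(None, "local-1433865536131")                         // falls back to the file name
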
33 changes: 33 additions & 0 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -2333,3 +2333,36 @@ private[spark] class RedirectThread(
     }
   }
 }
+
+/**
+ * An [[OutputStream]] that will store the last 10 kilobytes (by default) written to it
+ * in a circular buffer. The current contents of the buffer can be accessed using
+ * the toString method.
+ */
+private[spark] class CircularBuffer(sizeInBytes: Int = 10240) extends java.io.OutputStream {
+  var pos: Int = 0
+  var buffer = new Array[Int](sizeInBytes)
+
+  def write(i: Int): Unit = {
+    buffer(pos) = i
+    pos = (pos + 1) % buffer.length
+  }
+
+  override def toString: String = {
+    val (end, start) = buffer.splitAt(pos)
+    val input = new java.io.InputStream {
+      val iterator = (start ++ end).iterator
+
+      def read(): Int = if (iterator.hasNext) iterator.next() else -1
+    }
+    val reader = new BufferedReader(new InputStreamReader(input))
+    val stringBuilder = new StringBuilder
+    var line = reader.readLine()
+    while (line != null) {
+      stringBuilder.append(line)
+      stringBuilder.append("\n")
+      line = reader.readLine()
+    }
+    stringBuilder.toString()
+  }
+}
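
As a usage sketch (not from the commit): `CircularBuffer` is `private[spark]`, so the snippet below assumes it runs from code inside an `org.apache.spark` package; the buffer size and messages are illustrative. Wrapping the buffer in a `PrintStream` keeps only the tail of a verbose stream:

import java.io.PrintStream
import org.apache.spark.util.CircularBuffer

val buffer = new CircularBuffer(64)               // retain roughly the last 64 bytes
val out = new PrintStream(buffer, true, "UTF-8")
(1 to 100).foreach(i => out.println(s"line $i"))
println(buffer.toString)                          // only the most recent writes survive

The UtilsSuite test later in this diff exercises exactly this wrap-around behavior with a 25-byte buffer.
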
core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
@@ -67,33 +67,29 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
     // Write a new-style application log.
     val newAppComplete = newLogFile("new1", None, inProgress = false)
     writeFile(newAppComplete, true, None,
-      SparkListenerApplicationStart(
-        "new-app-complete", Some("new-app-complete"), 1L, "test", None),
+      SparkListenerApplicationStart("new-app-complete", None, 1L, "test", None),
       SparkListenerApplicationEnd(5L)
       )
 
     // Write a new-style application log.
     val newAppCompressedComplete = newLogFile("new1compressed", None, inProgress = false,
       Some("lzf"))
     writeFile(newAppCompressedComplete, true, None,
-      SparkListenerApplicationStart(
-        "new-app-compressed-complete", Some("new-app-compressed-complete"), 1L, "test", None),
+      SparkListenerApplicationStart("new-app-compressed-complete", None, 1L, "test", None),
       SparkListenerApplicationEnd(4L))
 
     // Write an unfinished app, new-style.
     val newAppIncomplete = newLogFile("new2", None, inProgress = true)
     writeFile(newAppIncomplete, true, None,
-      SparkListenerApplicationStart(
-        "new-app-incomplete", Some("new-app-incomplete"), 1L, "test", None)
+      SparkListenerApplicationStart("new-app-incomplete", None, 1L, "test", None)
       )
 
     // Write an old-style application log.
     val oldAppComplete = new File(testDir, "old1")
     oldAppComplete.mkdir()
     createEmptyFile(new File(oldAppComplete, provider.SPARK_VERSION_PREFIX + "1.0"))
     writeFile(new File(oldAppComplete, provider.LOG_PREFIX + "1"), false, None,
-      SparkListenerApplicationStart(
-        "old-app-complete", Some("old-app-complete"), 2L, "test", None),
+      SparkListenerApplicationStart("old-app-complete", None, 2L, "test", None),
       SparkListenerApplicationEnd(3L)
       )
     createEmptyFile(new File(oldAppComplete, provider.APPLICATION_COMPLETE))
@@ -107,8 +103,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
     oldAppIncomplete.mkdir()
     createEmptyFile(new File(oldAppIncomplete, provider.SPARK_VERSION_PREFIX + "1.0"))
     writeFile(new File(oldAppIncomplete, provider.LOG_PREFIX + "1"), false, None,
-      SparkListenerApplicationStart(
-        "old-app-incomplete", Some("old-app-incomplete"), 2L, "test", None)
+      SparkListenerApplicationStart("old-app-incomplete", None, 2L, "test", None)
       )
 
     // Force a reload of data from the log directory, and check that both logs are loaded.
@@ -129,16 +124,16 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
         List(ApplicationAttemptInfo(None, start, end, lastMod, user, completed)))
     }
 
-    list(0) should be (makeAppInfo("new-app-complete", "new-app-complete", 1L, 5L,
+    list(0) should be (makeAppInfo(newAppComplete.getName(), "new-app-complete", 1L, 5L,
       newAppComplete.lastModified(), "test", true))
-    list(1) should be (makeAppInfo("new-app-compressed-complete",
+    list(1) should be (makeAppInfo(newAppCompressedComplete.getName(),
       "new-app-compressed-complete", 1L, 4L, newAppCompressedComplete.lastModified(), "test",
       true))
-    list(2) should be (makeAppInfo("old-app-complete", "old-app-complete", 2L, 3L,
+    list(2) should be (makeAppInfo(oldAppComplete.getName(), "old-app-complete", 2L, 3L,
       oldAppComplete.lastModified(), "test", true))
-    list(3) should be (makeAppInfo("old-app-incomplete", "old-app-incomplete", 2L, -1L,
+    list(3) should be (makeAppInfo(oldAppIncomplete.getName(), "old-app-incomplete", 2L, -1L,
       oldAppIncomplete.lastModified(), "test", false))
-    list(4) should be (makeAppInfo("new-app-incomplete", "new-app-incomplete", 1L, -1L,
+    list(4) should be (makeAppInfo(newAppIncomplete.getName(), "new-app-incomplete", 1L, -1L,
       newAppIncomplete.lastModified(), "test", false))
 
     // Make sure the UI can be rendered.
@@ -162,7 +157,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
     logDir.mkdir()
     createEmptyFile(new File(logDir, provider.SPARK_VERSION_PREFIX + "1.0"))
     writeFile(new File(logDir, provider.LOG_PREFIX + "1"), false, Option(codec),
-      SparkListenerApplicationStart("app2", Some("app2"), 2L, "test", None),
+      SparkListenerApplicationStart("app2", None, 2L, "test", None),
       SparkListenerApplicationEnd(3L)
       )
     createEmptyFile(new File(logDir, provider.COMPRESSION_CODEC_PREFIX + codecName))
@@ -185,12 +180,12 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
   test("SPARK-3697: ignore directories that cannot be read.") {
     val logFile1 = newLogFile("new1", None, inProgress = false)
     writeFile(logFile1, true, None,
-      SparkListenerApplicationStart("app1-1", Some("app1-1"), 1L, "test", None),
+      SparkListenerApplicationStart("app1-1", None, 1L, "test", None),
       SparkListenerApplicationEnd(2L)
       )
     val logFile2 = newLogFile("new2", None, inProgress = false)
     writeFile(logFile2, true, None,
-      SparkListenerApplicationStart("app1-2", Some("app1-2"), 1L, "test", None),
+      SparkListenerApplicationStart("app1-2", None, 1L, "test", None),
       SparkListenerApplicationEnd(2L)
       )
     logFile2.setReadable(false, false)
@@ -223,18 +218,6 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
     }
   }
 
-  test("Parse logs that application is not started") {
-    val provider = new FsHistoryProvider((createTestConf()))
-
-    val logFile1 = newLogFile("app1", None, inProgress = true)
-    writeFile(logFile1, true, None,
-      SparkListenerLogStart("1.4")
-    )
-    updateAndCheck(provider) { list =>
-      list.size should be (0)
-    }
-  }
-
   test("SPARK-5582: empty log directory") {
     val provider = new FsHistoryProvider(createTestConf())
 
8 changes: 8 additions & 0 deletions core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -673,4 +673,12 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging {
     assert(!Utils.isInDirectory(nullFile, parentDir))
     assert(!Utils.isInDirectory(nullFile, childFile3))
   }
+
+  test("circular buffer") {
+    val buffer = new CircularBuffer(25)
+    val stream = new java.io.PrintStream(buffer, true, "UTF-8")
+
+    stream.println("test circular test circular test circular test circular test circular")
+    assert(buffer.toString === "t circular test circular\n")
+  }
 }
15 changes: 15 additions & 0 deletions python/pyspark/context.py
@@ -291,6 +291,21 @@ def version(self):
         """
         return self._jsc.version()
 
+    @property
+    @ignore_unicode_prefix
+    def applicationId(self):
+        """
+        A unique identifier for the Spark application.
+        Its format depends on the scheduler implementation.
+        (e.g.
+        for a local Spark app, something like 'local-1433865536131';
+        for a YARN app, something like 'application_1433865536131_34483'
+        )
+        >>> sc.applicationId  # doctest: +ELLIPSIS
+        u'local-...'
+        """
+        return self._jsc.sc().applicationId()
+
     @property
     def startTime(self):
         """Return the epoch time when the Spark Context was started."""
14 changes: 14 additions & 0 deletions python/pyspark/sql/functions.py
@@ -42,6 +42,7 @@
     'monotonicallyIncreasingId',
     'rand',
     'randn',
+    'sha1',
     'sha2',
     'sparkPartitionId',
     'struct',
@@ -382,6 +383,19 @@ def sha2(col, numBits):
     return Column(jc)
 
 
+@ignore_unicode_prefix
+@since(1.5)
+def sha1(col):
+    """Returns the hex string result of SHA-1.
+
+    >>> sqlContext.createDataFrame([('ABC',)], ['a']).select(sha1('a').alias('hash')).collect()
+    [Row(hash=u'3c01bdbb26f358bab27f267924aa2c9a03fcfdb8')]
+    """
+    sc = SparkContext._active_spark_context
+    jc = sc._jvm.functions.sha1(_to_java_column(col))
+    return Column(jc)
+
+
 @since(1.4)
 def sparkPartitionId():
     """A column for partition ID of the Spark task.
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
@@ -113,6 +113,7 @@ object FunctionRegistry {
     expression[Expm1]("expm1"),
     expression[Floor]("floor"),
     expression[Hypot]("hypot"),
+    expression[Hex]("hex"),
     expression[Logarithm]("log"),
     expression[Log]("ln"),
     expression[Log10]("log10"),
@@ -136,6 +137,8 @@ object FunctionRegistry {
     // misc functions
     expression[Md5]("md5"),
     expression[Sha2]("sha2"),
+    expression[Sha1]("sha1"),
+    expression[Sha1]("sha"),
 
     // aggregate functions
     expression[Average]("avg"),
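
Registering `Hex` and `Sha1` (under both the `sha1` and `sha` names) makes the expressions callable from SQL. A sketch of the registered names in use (not from the commit; it assumes a Spark 1.5-era SQLContext named `sqlContext`):

// sha1('ABC') and sha('ABC') both yield 3c01bdbb26f358bab27f267924aa2c9a03fcfdb8;
// hex(255) yields FF.
sqlContext.sql("SELECT sha1('ABC'), sha('ABC'), hex(255)").show()
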
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SpecificMutableRow.scala
@@ -196,15 +196,15 @@ final class SpecificMutableRow(val values: Array[MutableValue]) extends MutableR
   def this(dataTypes: Seq[DataType]) =
     this(
       dataTypes.map {
-        case IntegerType => new MutableInt
+        case BooleanType => new MutableBoolean
         case ByteType => new MutableByte
-        case FloatType => new MutableFloat
         case ShortType => new MutableShort
+        // We use INT for DATE internally
+        case IntegerType | DateType => new MutableInt
+        // We use Long for Timestamp internally
+        case LongType | TimestampType => new MutableLong
+        case FloatType => new MutableFloat
         case DoubleType => new MutableDouble
-        case BooleanType => new MutableBoolean
-        case LongType => new MutableLong
-        case DateType => new MutableInt // We use INT for DATE internally
-        case TimestampType => new MutableLong // We use Long for Timestamp internally
         case _ => new MutableAny
       }.toArray)
 
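
The merged cases encode the convention called out in the comments: a DATE value is held in an `Int` slot and a TIMESTAMP in a `Long`. As a small illustration of why an `Int` suffices for dates (not from the commit), a date can be represented as a count of days since the Unix epoch:

import java.time.LocalDate

// 2015-06-29 is 16615 days after 1970-01-01, comfortably within Int range.
val days: Int = LocalDate.of(2015, 6, 29).toEpochDay.toInt
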
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/UnsafeRowConverter.scala
@@ -128,14 +128,12 @@ private object UnsafeColumnWriter {
     case BooleanType => BooleanUnsafeColumnWriter
     case ByteType => ByteUnsafeColumnWriter
     case ShortType => ShortUnsafeColumnWriter
-    case IntegerType => IntUnsafeColumnWriter
-    case LongType => LongUnsafeColumnWriter
+    case IntegerType | DateType => IntUnsafeColumnWriter
+    case LongType | TimestampType => LongUnsafeColumnWriter
     case FloatType => FloatUnsafeColumnWriter
     case DoubleType => DoubleUnsafeColumnWriter
     case StringType => StringUnsafeColumnWriter
     case BinaryType => BinaryUnsafeColumnWriter
-    case DateType => IntUnsafeColumnWriter
-    case TimestampType => LongUnsafeColumnWriter
     case t =>
       throw new UnsupportedOperationException(s"Do not know how to write columns of type $t")
   }
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
@@ -120,15 +120,13 @@ class CodeGenContext {
     case BooleanType => JAVA_BOOLEAN
     case ByteType => JAVA_BYTE
     case ShortType => JAVA_SHORT
-    case IntegerType => JAVA_INT
-    case LongType => JAVA_LONG
+    case IntegerType | DateType => JAVA_INT
+    case LongType | TimestampType => JAVA_LONG
     case FloatType => JAVA_FLOAT
     case DoubleType => JAVA_DOUBLE
     case dt: DecimalType => decimalType
     case BinaryType => "byte[]"
     case StringType => stringType
-    case DateType => JAVA_INT
-    case TimestampType => JAVA_LONG
     case dt: OpenHashSetUDT if dt.elementType == IntegerType => classOf[IntegerHashSet].getName
     case dt: OpenHashSetUDT if dt.elementType == LongType => classOf[LongHashSet].getName
     case _ => "Object"
