Aggregate spark metrics during run time instead of post processing by default (#358)
gerashegalov authored and tovbinm committed Jul 10, 2019
1 parent 3b82d96 commit 3e0ae2c
Showing 2 changed files with 240 additions and 7 deletions.
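
For context: with this change, OpSparkListener folds every completed stage into a running CumulativeStageMetrics total, so aggregate metrics are available during the run itself instead of requiring post-processing of the buffered per-stage metrics. Below is a minimal consumption sketch in Scala; the SparkContext `sc` and the exact constructor arguments are illustrative assumptions, not the full listener signature from this commit.

// Sketch only: wire up the listener, run some jobs, then read the running total.
// `sc` and the constructor arguments here are assumed for illustration.
val listener = new OpSparkListener(
  appName = sc.appName,
  appId = sc.applicationId,
  runType = "test",
  customTagName = None,
  customTagValue = None,
  logStageMetrics = false,
  collectStageMetrics = false // per-stage buffering may stay off; the cumulative total is maintained regardless
)
sc.addSparkListener(listener)
// ... run Spark jobs ...
println(listener.metrics.cumulativeStageMetrics.toJson(pretty = true))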
com/salesforce/op/utils/spark/OpSparkListener.scala
@@ -30,9 +30,15 @@

 package com.salesforce.op.utils.spark

+import com.fasterxml.jackson.core.JsonGenerator
+import com.fasterxml.jackson.databind.SerializerProvider
+import com.fasterxml.jackson.databind.ser.std.StdSerializer
 import com.salesforce.op.utils.date.DateTimeUtils
-import com.salesforce.op.utils.json.JsonLike
+import com.salesforce.op.utils.json.{JsonLike, JsonUtils, SerDes}
 import com.salesforce.op.utils.version.VersionInfo
+import com.twitter.algebird.Max
+import com.twitter.algebird.Operators._
+import com.twitter.algebird.macros.caseclass
 import org.apache.spark.scheduler._
 import org.joda.time.Duration
 import org.joda.time.format.PeriodFormatterBuilder
@@ -71,6 +77,7 @@ class OpSparkListener
     (now, now, now)
   }
   private val stageMetrics = ArrayBuffer.empty[StageMetrics]
+  private var cumulativeStageMetrics: CumulativeStageMetrics = CumulativeStageMetrics.zero

   val logPrefix: String = "%s:%s,RUN_TYPE:%s,APP:%s,APP_ID:%s".format(
     customTagName.getOrElse("APP_NAME"),
@@ -92,13 +99,20 @@
     appEndTime = appEndTime,
     appDuration = appEndTime - appStartTime,
     stageMetrics = stageMetrics.toList,
+    cumulativeStageMetrics = cumulativeStageMetrics,
     versionInfo = VersionInfo()
   )

   override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = {
     val si = stageCompleted.stageInfo
     val tm = si.taskMetrics
-    if (collectStageMetrics) stageMetrics += StageMetrics(si)
+    val sm = StageMetrics(si)
+    if (collectStageMetrics) {
+      stageMetrics += sm
+    }
+
+    cumulativeStageMetrics = CumulativeStageMetrics.plus(cumulativeStageMetrics, sm)
+
     if (logStageMetrics) {
       log.info("{},STAGE:{},MEMORY_SPILLED_BYTES:{},GC_TIME_MS:{},STAGE_TIME_MS:{}",
         logPrefix, si.name, tm.memoryBytesSpilled.toString, tm.jvmGCTime.toString, tm.executorRunTime.toString
@@ -129,6 +143,23 @@ class OpSparkListener

 }

+trait MetricJsonLike extends JsonLike {
+  override def toJson(pretty: Boolean): String = {
+    JsonUtils.toJsonString(this, pretty = pretty, Seq(SerDes[Max[Long]](
+      classOf[Max[Long]],
+      new StdSerializer[Max[Long]](classOf[Max[Long]]) {
+        override def serialize(
+          value: Max[Long],
+          gen: JsonGenerator, provider: SerializerProvider
+        ): Unit = {
+          gen.writeNumber(value.get)
+        }
+      },
+      null // no custom deserializer needed
+    )))
+  }
+}
+
 /**
  * App metrics container.
  * Contains the app info, all the stage metrics computed by the spark listener and project version info.
@@ -144,8 +175,9 @@ case class AppMetrics
   appEndTime: Long,
   appDuration: Long,
   stageMetrics: Seq[StageMetrics],
+  cumulativeStageMetrics: CumulativeStageMetrics,
   versionInfo: VersionInfo
-) extends JsonLike {
+) extends MetricJsonLike {

   def appDurationPretty: String = {
     val duration = new Duration(appDuration)
@@ -157,6 +189,35 @@ case class AppMetrics
   }
 }

+
+trait BaseStageMetrics {
+  def numTasks: Int
+  def numAccumulables: Int
+  def executorRunTime: Long
+  def executorCpuTime: Long
+  def executorDeserializeTime: Long
+  def executorDeserializeCpuTime: Long
+  def resultSerializationTime: Long
+  def jvmGCTime: Long
+  def resultSizeBytes: Long
+  def numUpdatedBlockStatuses: Int
+  def diskBytesSpilled: Long
+  def memoryBytesSpilled: Long
+  def recordsRead: Long
+  def bytesRead: Long
+  def recordsWritten: Long
+  def bytesWritten: Long
+  def shuffleFetchWaitTime: Long
+  def shuffleTotalBytesRead: Long
+  def shuffleTotalBlocksFetched: Long
+  def shuffleLocalBlocksFetched: Long
+  def shuffleRemoteBlocksFetched: Long
+  def shuffleWriteTime: Long
+  def shuffleBytesWritten: Long
+  def shuffleRecordsWritten: Long
+  def duration: Option[Long]
+}
+
 /**
  * Spark stage metrics container for a [[org.apache.spark.scheduler.StageInfo]]
  * Note: all the time values are in milliseconds.
@@ -173,7 +234,6 @@ case class StageMetrics private
   failureReason: Option[String],
   submissionTime: Option[Long],
   completionTime: Option[Long],
-  duration: Option[Long],
   executorRunTime: Long,
   executorCpuTime: Long,
   executorDeserializeTime: Long,
@@ -197,7 +257,13 @@ case class StageMetrics private
   shuffleWriteTime: Long,
   shuffleBytesWritten: Long,
   shuffleRecordsWritten: Long
-) extends JsonLike
+) extends BaseStageMetrics with MetricJsonLike {
+  override val duration: Option[Long] =
+    for {
+      c <- completionTime
+      s <- submissionTime
+    } yield c - s
+}

 object StageMetrics {
   /**
@@ -226,7 +292,6 @@ object StageMetrics {
       failureReason = si.failureReason,
       submissionTime = si.submissionTime,
       completionTime = si.completionTime,
-      duration = for {s <- si.submissionTime; c <- si.completionTime} yield c - s,
       executorRunTime = tm.executorRunTime,
       executorCpuTime = toMillis(tm.executorCpuTime),
      executorDeserializeTime = tm.executorDeserializeTime,
@@ -253,3 +318,95 @@ object StageMetrics {
     )
   }
 }
+
+case class CumulativeStageMetrics
+(
+  numTasks: Int,
+  numAccumulables: Int,
+  executorRunTime: Long,
+  executorCpuTime: Long,
+  executorDeserializeTime: Long,
+  executorDeserializeCpuTime: Long,
+  resultSerializationTime: Long,
+  jvmGCTime: Long,
+  resultSizeBytes: Long,
+  numUpdatedBlockStatuses: Int,
+  diskBytesSpilled: Long,
+  memoryBytesSpilled: Long,
+  peakExecutionMemory: Max[Long],
+  recordsRead: Long,
+  bytesRead: Long,
+  recordsWritten: Long,
+  bytesWritten: Long,
+  shuffleFetchWaitTime: Long,
+  shuffleTotalBytesRead: Long,
+  shuffleTotalBlocksFetched: Long,
+  shuffleLocalBlocksFetched: Long,
+  shuffleRemoteBlocksFetched: Long,
+  shuffleWriteTime: Long,
+  shuffleBytesWritten: Long,
+  shuffleRecordsWritten: Long,
+  duration: Option[Long] = None
+) extends BaseStageMetrics with MetricJsonLike
+
+object CumulativeStageMetrics {
+  implicit val stageSG = caseclass.semigroup[CumulativeStageMetrics]
+
+  val zero: CumulativeStageMetrics = CumulativeStageMetrics(
+    numTasks = 0,
+    numAccumulables = 0,
+    executorRunTime = 0L,
+    executorCpuTime = 0L,
+    executorDeserializeTime = 0L,
+    executorDeserializeCpuTime = 0L,
+    resultSerializationTime = 0L,
+    jvmGCTime = 0L,
+    resultSizeBytes = 0L,
+    numUpdatedBlockStatuses = 0,
+    diskBytesSpilled = 0L,
+    memoryBytesSpilled = 0L,
+    peakExecutionMemory = Max(0L),
+    recordsRead = 0L,
+    bytesRead = 0L,
+    recordsWritten = 0L,
+    bytesWritten = 0L,
+    shuffleFetchWaitTime = 0L,
+    shuffleTotalBytesRead = 0L,
+    shuffleTotalBlocksFetched = 0L,
+    shuffleLocalBlocksFetched = 0L,
+    shuffleRemoteBlocksFetched = 0L,
+    shuffleWriteTime = 0L,
+    shuffleBytesWritten = 0L,
+    shuffleRecordsWritten = 0L
+  )
+
+  def plus(csm: CumulativeStageMetrics, sm: StageMetrics): CumulativeStageMetrics = csm +
+    CumulativeStageMetrics(
+      numTasks = sm.numTasks,
+      numAccumulables = sm.numAccumulables,
+      executorRunTime = sm.executorRunTime,
+      executorCpuTime = sm.executorCpuTime,
+      executorDeserializeTime = sm.executorDeserializeTime,
+      executorDeserializeCpuTime = sm.executorDeserializeCpuTime,
+      resultSerializationTime = sm.resultSerializationTime,
+      jvmGCTime = sm.jvmGCTime,
+      resultSizeBytes = sm.resultSizeBytes,
+      numUpdatedBlockStatuses = sm.numUpdatedBlockStatuses,
+      diskBytesSpilled = sm.diskBytesSpilled,
+      memoryBytesSpilled = sm.memoryBytesSpilled,
+      peakExecutionMemory = Max(sm.peakExecutionMemory),
+      recordsRead = sm.recordsRead,
+      bytesRead = sm.bytesRead,
+      recordsWritten = sm.recordsWritten,
+      bytesWritten = sm.bytesWritten,
+      shuffleFetchWaitTime = sm.shuffleFetchWaitTime,
+      shuffleTotalBytesRead = sm.shuffleTotalBytesRead,
+      shuffleTotalBlocksFetched = sm.shuffleTotalBlocksFetched,
+      shuffleLocalBlocksFetched = sm.shuffleLocalBlocksFetched,
+      shuffleRemoteBlocksFetched = sm.shuffleRemoteBlocksFetched,
+      shuffleWriteTime = sm.shuffleWriteTime,
+      shuffleBytesWritten = sm.shuffleBytesWritten,
+      shuffleRecordsWritten = sm.shuffleRecordsWritten,
+      duration = sm.duration
+    )
+}
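
A note on the Algebird machinery above: caseclass.semigroup derives a Semigroup that combines two CumulativeStageMetrics instances field by field using each field's own semigroup, so the Int and Long counters are summed, peakExecutionMemory's Max[Long] keeps the larger value, and Option[Long] durations add when both sides are defined. MetricJsonLike then registers a custom Jackson serializer so a Max[Long] field renders as a bare number rather than a nested object. A tiny self-contained sketch of the combination behavior, using a hypothetical Mini case class purely for illustration:

import com.twitter.algebird.Max
import com.twitter.algebird.Operators._
import com.twitter.algebird.macros.caseclass

// Hypothetical miniature of CumulativeStageMetrics: the derived semigroup
// adds the Int and Long fields, takes the max of the Max[Long] field,
// and sums the Option[Long] field when both values are present.
case class Mini(numTasks: Int, runTimeMs: Long, peakMem: Max[Long], duration: Option[Long])
implicit val miniSG = caseclass.semigroup[Mini]

val combined = Mini(1, 100L, Max(512L), Some(1L)) + Mini(2, 250L, Max(256L), Some(2L))
// combined == Mini(3, 350L, Max(512L), Some(3L))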
com/salesforce/op/utils/spark/OpSparkListenerTest.scala
@@ -32,6 +32,9 @@ package com.salesforce.op.utils.spark

 import com.salesforce.op.test.TestSparkContext
 import com.salesforce.op.utils.date.DateTimeUtils
+import com.twitter.algebird.Max
+import com.twitter.algebird.Operators._
+import com.twitter.algebird.macros.caseclass
 import org.apache.log4j._
 import org.junit.runner.RunWith
 import org.scalatest.FlatSpec
@@ -72,13 +75,17 @@ class OpSparkListenerTest extends FlatSpec with TableDrivenPropertyChecks with TestSparkContext
   }

   it should "capture app stage metrics" in {
-    val stageMetrics = listener.metrics.stageMetrics
+    val appMetrics = listener.metrics
+    val stageMetrics = appMetrics.stageMetrics
     stageMetrics.size should be > 0
     val firstStage = stageMetrics.head
     firstStage.name should startWith("csv at OpSparkListenerTest.scala")
     firstStage.stageId shouldBe 0
     firstStage.numTasks shouldBe 1
     firstStage.status shouldBe "succeeded"
+    val dur = firstStage.completionTime.getOrElse(0L) - firstStage.submissionTime.getOrElse(0L)
+    firstStage.duration shouldBe Option(dur)
+    firstStage.toJson(pretty = true) should include(s""""duration" : $dur""")
   }

   it should "log messages for listener initialization, stage completion, app completion" in {
@@ -94,6 +101,75 @@ class OpSparkListenerTest extends FlatSpec with TableDrivenPropertyChecks with TestSparkContext
     )
     forAll(messages) { m => logs.contains(m) shouldBe true }
   }
+
+  it should "be able to aggregate Stage metrics" in {
+    implicit val stageSG = caseclass.semigroup[StageMetrics]
+
+    val sm0 = CumulativeStageMetrics(
+      numTasks = 1,
+      numAccumulables = 100,
+      executorRunTime = 1000L,
+      executorCpuTime = 700L,
+      executorDeserializeTime = 750L,
+      executorDeserializeCpuTime = 740L,
+      resultSerializationTime = 450L,
+      jvmGCTime = 2000L,
+      resultSizeBytes = 1L,
+      numUpdatedBlockStatuses = 1,
+      diskBytesSpilled = 1L,
+      memoryBytesSpilled = 1L,
+      peakExecutionMemory = Max(1000L),
+      recordsRead = 1,
+      bytesRead = 1,
+      recordsWritten = 1,
+      bytesWritten = 1,
+      shuffleFetchWaitTime = 1,
+      shuffleTotalBytesRead = 1,
+      shuffleTotalBlocksFetched = 1,
+      shuffleLocalBlocksFetched = 1,
+      shuffleRemoteBlocksFetched = 1,
+      shuffleWriteTime = 1,
+      shuffleBytesWritten = 1,
+      shuffleRecordsWritten = 1,
+      duration = Some(1)
+    )
+
+    val sm1 = CumulativeStageMetrics(
+      numTasks = 10,
+      numAccumulables = 100,
+      executorRunTime = 1000,
+      executorCpuTime = 700,
+      executorDeserializeTime = 750,
+      executorDeserializeCpuTime = 740,
+      resultSerializationTime = 450,
+      jvmGCTime = 2000,
+      resultSizeBytes = 1,
+      numUpdatedBlockStatuses = 1,
+      diskBytesSpilled = 1,
+      memoryBytesSpilled = 1,
+      peakExecutionMemory = Max(1001),
+      recordsRead = 1,
+      bytesRead = 1,
+      recordsWritten = 1,
+      bytesWritten = 1,
+      shuffleFetchWaitTime = 1,
+      shuffleTotalBytesRead = 1,
+      shuffleTotalBlocksFetched = 1,
+      shuffleLocalBlocksFetched = 1,
+      shuffleRemoteBlocksFetched = 1,
+      shuffleWriteTime = 1,
+      shuffleBytesWritten = 1,
+      shuffleRecordsWritten = 1,
+      duration = Some(2)
+    )
+
+    val total = Seq(sm0, sm1).foldLeft(CumulativeStageMetrics.zero)(_ + _)
+
+    total.peakExecutionMemory shouldBe Max(1001)
+    val jsonStr = total.toJson(pretty = true)
+    jsonStr should include ("\"peakExecutionMemory\" : 1001")
+    jsonStr should include ("\"duration\" : 3")
+  }
 }

 /**
