Skip to content

Commit

Permalink
Change formula to calculate the used and wasted metrics to handle dynamic allocation
Browse files Browse the repository at this point in the history
  • Loading branch information
shankar committed May 9, 2017
1 parent e7c6d09 commit 9d550f4
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 30 deletions.
35 changes: 14 additions & 21 deletions app/com/linkedin/drelephant/spark/SparkMetricsAggregator.scala
Original file line number Diff line number Diff line change
Expand Up @@ -51,30 +51,20 @@ class SparkMetricsAggregator(private val aggregatorConfigurationData: Aggregator
executorInstances <- executorInstancesOf(data)
executorMemoryBytes <- executorMemoryBytesOf(data)
} {
<<<<<<< HEAD
val totalExecutorTimeMillis = totalExecutorTimeMillisOf(data)
val totalTaskTimeMillis = totalTaskTimeMillisOf(data)

val resourcesAllocatedForUse = aggregateResourceUsage(executorMemoryBytes, totalExecutorTimeMillis)
val resourcesActuallyUsed = aggregateResourceUsage(executorMemoryBytes, totalTaskTimeMillis)

val resourcesActuallyUsedWithBuffer = resourcesActuallyUsed.doubleValue() * (1.0 + allocatedMemoryWasteBufferPercentage)
val resourcesWastedMBSeconds = (resourcesActuallyUsedWithBuffer < resourcesAllocatedForUse.doubleValue()) match {
case true => resourcesAllocatedForUse.doubleValue() - resourcesActuallyUsedWithBuffer
case false => 0.0
=======
val applicationDurationMillis = applicationDurationMillisOf(data)
if( applicationDurationMillis < 0) {
logger.warn(s"applicationDurationMillis is negative. Skipping Metrics Aggregation:${applicationDurationMillis}")
} else {
val totalExecutorTaskTimeMillis = totalExecutorTaskTimeMillisOf(data)
val totalExecutorTimeMillis = totalExecutorTimeMillisOf(data)
val totalTaskTimeMillis = totalTaskTimeMillisOf(data)

val resourcesAllocatedForUse =
aggregateresourcesAllocatedForUse(executorInstances, executorMemoryBytes, applicationDurationMillis)
val resourcesActuallyUsed = aggregateresourcesActuallyUsed(executorMemoryBytes, totalExecutorTaskTimeMillis)
logger.info(s"${data.getAppId}=> totalExecutorTime: ${totalExecutorTimeMillis}, totalTaskTime: ${totalTaskTimeMillis} executorMemoryBytes: ${executorMemoryBytes}")

val resourcesAllocatedForUse = aggregateResourceUsage(executorMemoryBytes, totalExecutorTimeMillis)
val resourcesActuallyUsed = aggregateResourceUsage(executorMemoryBytes, totalTaskTimeMillis)

val resourcesActuallyUsedWithBuffer = resourcesActuallyUsed.doubleValue() * (1.0 + allocatedMemoryWasteBufferPercentage)
val resourcesWastedMBSeconds = (resourcesActuallyUsedWithBuffer < resourcesAllocatedForUse.doubleValue()) match {
val resourcesWastedMBSeconds = (resourcesActuallyUsedWithBuffer < resourcesAllocatedForUse.doubleValue()) match {
case true => resourcesAllocatedForUse.doubleValue() - resourcesActuallyUsedWithBuffer
case false => 0.0
}
Expand All @@ -93,7 +83,6 @@ class SparkMetricsAggregator(private val aggregatorConfigurationData: Aggregator
logger.warn(s"allocatedMemoryWasteBufferPercentage:${allocatedMemoryWasteBufferPercentage}")
}
hadoopAggregatedData.setResourceWasted(resourcesWastedMBSeconds.toLong)
>>>>>>> linkedin/master
}
}

Expand Down Expand Up @@ -138,9 +127,13 @@ class SparkMetricsAggregator(private val aggregatorConfigurationData: Aggregator
case None => 0
case _ => stageData.tasks.get.values.map( taskData => taskData.taskMetrics match {
case None => 0
case _ => taskData.taskMetrics.get.executorRunTime +
taskData.taskMetrics.get.executorDeserializeTime +
taskData.taskMetrics.get.resultSerializationTime
case _ => {
logger.info(s"${taskData.taskMetrics.get.executorRunTime} ${taskData.taskMetrics.get.executorDeserializeTime}" +
s"${taskData.taskMetrics.get.resultSerializationTime}")
taskData.taskMetrics.get.executorRunTime +
taskData.taskMetrics.get.executorDeserializeTime +
taskData.taskMetrics.get.resultSerializationTime
}
}).sum
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,15 +220,19 @@ object ExecutorsHeuristic {

object Distribution {
  /**
    * Builds a five-number summary (min, 25th percentile, median, 75th percentile, max)
    * from the given values.
    *
    * An empty input yields an all-zero summary. The emptiness check must come before any
    * call to `min`/`max`, which throw on an empty collection.
    *
    * @param values the samples to summarize; may be empty and need not be sorted
    * @return the summary of `values`, or `Distribution(0, 0, 0, 0, 0)` when `values` is empty
    */
  def apply(values: Seq[Long]): Distribution = {
    if (values.isEmpty) {
      Distribution(0, 0, 0, 0, 0)
    } else {
      val sortedValues = values.sorted
      // Statistics takes a java.util.List of boxed longs, so box and convert once and reuse it.
      val sortedValuesAsJava = sortedValues.map(Long.box).to[ArrayBuffer].asJava
      Distribution(
        sortedValues.min,
        p25 = Statistics.percentile(sortedValuesAsJava, 25),
        Statistics.median(sortedValuesAsJava),
        p75 = Statistics.percentile(sortedValuesAsJava, 75),
        sortedValues.max
      )
    }
  }
}
}

0 comments on commit 9d550f4

Please sign in to comment.