Skip to content

Commit

Permalink
Fixed resources used/wasted computation for spark jobs (Depends on Custom SHS - Requires peakJvmUsedMemory metric) (#287)
Browse files Browse the repository at this point in the history
  • Loading branch information
skakker authored and akshayrai committed Mar 30, 2018
1 parent 233d53b commit 5f7fc68
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
*/
public class HadoopAggregatedData {

//variable resourceUsed is actually the resource blocked on the cluster.
private long resourceUsed = 0;
private long resourceWasted = 0;
private long totalDelay = 0;
Expand Down
40 changes: 23 additions & 17 deletions app/com/linkedin/drelephant/spark/SparkMetricsAggregator.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@ package com.linkedin.drelephant.spark
import com.linkedin.drelephant.analysis.{HadoopAggregatedData, HadoopApplicationData, HadoopMetricsAggregator}
import com.linkedin.drelephant.configurations.aggregator.AggregatorConfigurationData
import com.linkedin.drelephant.math.Statistics
import com.linkedin.drelephant.spark.data.{SparkApplicationData, SparkLogDerivedData, SparkRestDerivedData}
import com.linkedin.drelephant.spark.data.{SparkApplicationData}
import com.linkedin.drelephant.spark.fetchers.statusapiv1.ExecutorSummary
import com.linkedin.drelephant.util.MemoryFormatUtils
import org.apache.commons.io.FileUtils
import org.apache.log4j.Logger

import scala.util.Try


Expand All @@ -47,19 +49,13 @@ class SparkMetricsAggregator(private val aggregatorConfigurationData: Aggregator
}

private def aggregate(data: SparkApplicationData): Unit = for {
executorInstances <- executorInstancesOf(data)
executorMemoryBytes <- executorMemoryBytesOf(data)
} {
val applicationDurationMillis = applicationDurationMillisOf(data)
if( applicationDurationMillis < 0) {
logger.warn(s"applicationDurationMillis is negative. Skipping Metrics Aggregation:${applicationDurationMillis}")
} else {
val totalExecutorTaskTimeMillis = totalExecutorTaskTimeMillisOf(data)

val resourcesAllocatedForUse =
aggregateresourcesAllocatedForUse(executorInstances, executorMemoryBytes, applicationDurationMillis)
val resourcesActuallyUsed = aggregateresourcesActuallyUsed(executorMemoryBytes, totalExecutorTaskTimeMillis)

} else {
var (resourcesActuallyUsed, resourcesAllocatedForUse) = calculateResourceUsage(data.executorSummaries, executorMemoryBytes)
val resourcesActuallyUsedWithBuffer = resourcesActuallyUsed.doubleValue() * (1.0 + allocatedMemoryWasteBufferPercentage)
val resourcesWastedMBSeconds = (resourcesActuallyUsedWithBuffer < resourcesAllocatedForUse.doubleValue()) match {
case true => resourcesAllocatedForUse.doubleValue() - resourcesActuallyUsedWithBuffer
Expand All @@ -71,10 +67,8 @@ class SparkMetricsAggregator(private val aggregatorConfigurationData: Aggregator
} else {
logger.warn(s"resourcesAllocatedForUse/resourcesWasted exceeds Long.MaxValue")
logger.warn(s"ResourceUsed: ${resourcesAllocatedForUse}")
logger.warn(s"executorInstances: ${executorInstances}")
logger.warn(s"executorMemoryBytes:${executorMemoryBytes}")
logger.warn(s"applicationDurationMillis:${applicationDurationMillis}")
logger.warn(s"totalExecutorTaskTimeMillis:${totalExecutorTaskTimeMillis}")
logger.warn(s"resourcesActuallyUsedWithBuffer:${resourcesActuallyUsedWithBuffer}")
logger.warn(s"resourcesWastedMBSeconds:${resourcesWastedMBSeconds}")
logger.warn(s"allocatedMemoryWasteBufferPercentage:${allocatedMemoryWasteBufferPercentage}")
Expand All @@ -83,16 +77,28 @@ class SparkMetricsAggregator(private val aggregatorConfigurationData: Aggregator
}
}

private def aggregateresourcesActuallyUsed(executorMemoryBytes: Long, totalExecutorTaskTimeMillis: BigInt): BigInt = {
val bytesMillis = BigInt(executorMemoryBytes) * totalExecutorTaskTimeMillis
(bytesMillis / (BigInt(FileUtils.ONE_MB) * BigInt(Statistics.SECOND_IN_MS)))
//calculates the resource usage by summing up the resources used per executor
private def calculateResourceUsage(executorSummaries: Seq[ExecutorSummary], executorMemoryBytes: Long): (BigInt, BigInt) = {
  // Conversion factor from bytes*millis to MB*seconds. Hoisted out of the loop:
  // it is loop-invariant. Integer division is still applied PER EXECUTOR before
  // summation, preserving the original rounding behavior exactly.
  val mbSecondsFactor = BigInt(FileUtils.ONE_MB) * BigInt(Statistics.SECOND_IN_MS)
  // Fixed Spark reserved-memory overhead is identical for every executor; compute once.
  val reservedMemoryBytes = MemoryFormatUtils.stringToBytes(SPARK_RESERVED_MEMORY)
  // Fold over executors accumulating (MB*s actually used, MB*s allocated for use).
  executorSummaries.foldLeft((BigInt(0), BigInt(0))) { case ((usedAcc, allocatedAcc), summary) =>
    // Peak JVM memory this executor actually used, plus the reserved overhead.
    // NOTE(review): assumes the map value is numeric when present — confirm upstream type.
    val memUsedBytes: Long =
      summary.peakJvmUsedMemory.getOrElse(JVM_USED_MEMORY, 0).asInstanceOf[Number].longValue + reservedMemoryBytes
    val timeSpentMillis: Long = summary.totalDuration
    val usedMBSeconds = (BigInt(memUsedBytes) * timeSpentMillis) / mbSecondsFactor
    val allocatedMBSeconds = (BigInt(executorMemoryBytes) * timeSpentMillis) / mbSecondsFactor
    (usedAcc + usedMBSeconds, allocatedAcc + allocatedMBSeconds)
  }
}

/**
 * Computes the total resources allocated for the application, in MB-seconds.
 *
 * The product executors * memory-per-executor * wall-clock duration is taken in
 * bytes*millis using BigInt to avoid Long overflow, then converted to MB*seconds
 * by a single integer division.
 *
 * @param executorInstances number of executor instances allocated
 * @param executorMemoryBytes memory allocated per executor, in bytes
 * @param applicationDurationMillis total application wall-clock duration, in ms
 * @return allocated resources in MB-seconds
 */
private def aggregateresourcesAllocatedForUse(
  executorInstances: Int,
  executorMemoryBytes: Long,
  applicationDurationMillis: Long
): BigInt = {
  val bytesMillis = BigInt(executorInstances) * BigInt(executorMemoryBytes) * BigInt(applicationDurationMillis)
  bytesMillis / (BigInt(FileUtils.ONE_MB) * BigInt(Statistics.SECOND_IN_MS))
}
Expand Down Expand Up @@ -121,9 +127,9 @@ class SparkMetricsAggregator(private val aggregatorConfigurationData: Aggregator
/** Companion object holding configuration keys and constants used by the aggregator. */
object SparkMetricsAggregator {
/** The percentage of allocated memory we expect to waste because of overhead. */
val DEFAULT_ALLOCATED_MEMORY_WASTE_BUFFER_PERCENTAGE = 0.5D
// Aggregator configuration key that overrides the default waste buffer percentage.
val ALLOCATED_MEMORY_WASTE_BUFFER_PERCENTAGE_KEY = "allocated_memory_waste_buffer_percentage"
// Fixed memory reserved by Spark on each executor, added on top of peak JVM usage.
val SPARK_RESERVED_MEMORY: String = "300M"
// Spark configuration property keys read from the application's environment.
val SPARK_EXECUTOR_INSTANCES_KEY = "spark.executor.instances"
val SPARK_EXECUTOR_MEMORY_KEY = "spark.executor.memory"
// Key into ExecutorSummary.peakJvmUsedMemory for the peak JVM used-memory metric.
val JVM_USED_MEMORY = "jvmUsedMemory"
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import scala.util.Try
import com.linkedin.drelephant.spark.fetchers.statusapiv1._
import org.apache.spark.JobExecutionStatus
import com.linkedin.drelephant.spark.fetchers.statusapiv1.StageStatus

/**
* Converters for legacy SparkApplicationData to current SparkApplicationData.
*
Expand Down
38 changes: 16 additions & 22 deletions test/com/linkedin/drelephant/spark/SparkMetricsAggregatorTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,12 @@ import scala.collection.JavaConverters

import com.linkedin.drelephant.analysis.ApplicationType
import com.linkedin.drelephant.configurations.aggregator.AggregatorConfigurationData
import com.linkedin.drelephant.math.Statistics
import com.linkedin.drelephant.spark.data.{SparkApplicationData, SparkLogDerivedData, SparkRestDerivedData}
import com.linkedin.drelephant.spark.fetchers.statusapiv1.{ApplicationAttemptInfoImpl, ApplicationInfoImpl, ExecutorSummaryImpl}
import com.linkedin.drelephant.util.MemoryFormatUtils
import org.apache.spark.scheduler.SparkListenerEnvironmentUpdate
import org.apache.commons.io.FileUtils
import org.scalatest.{FunSpec, Matchers}

class SparkMetricsAggregatorTest extends FunSpec with Matchers {
Expand All @@ -47,8 +50,9 @@ class SparkMetricsAggregatorTest extends FunSpec with Matchers {
}

val executorSummaries = Seq(
newFakeExecutorSummary(id = "1", totalDuration = 1000000L),
newFakeExecutorSummary(id = "2", totalDuration = 3000000L)
newFakeExecutorSummary(id = "1", totalDuration = 1000000L, Map("jvmUsedMemory" -> 394567123)),
newFakeExecutorSummary(id = "2", totalDuration = 3000000L, Map("jvmUsedMemory" -> 23456834))

)
val restDerivedData = {
SparkRestDerivedData(
Expand All @@ -60,7 +64,7 @@ class SparkMetricsAggregatorTest extends FunSpec with Matchers {
)
}

describe("when it has log-derived data") {
describe("when it has data") {
val logDerivedData = {
val environmentUpdate = newFakeSparkListenerEnvironmentUpdate(
Map(
Expand All @@ -82,31 +86,20 @@ class SparkMetricsAggregatorTest extends FunSpec with Matchers {

val result = aggregator.getResult

it("calculates resources used") {
val totalExecutorMemoryMb = 2 * 4096
val applicationDurationSeconds = 8000
val executorMemoryMb = 4096
val totalExecutorTaskTimeSeconds = 1000 + 3000
result.getResourceUsed should be(totalExecutorMemoryMb * applicationDurationSeconds)
it("calculates resources used (allocated)") {
result.getResourceUsed should be(4096000+12288000)
}

it("calculates resources wasted") {
val totalExecutorMemoryMb = 2 * 4096
val applicationDurationSeconds = 8000
val resourceAllocated = totalExecutorMemoryMb * applicationDurationSeconds;

val executorMemoryMb = 4096
val totalExecutorTaskTimeSeconds = 1000 + 3000
val resourceUsed = executorMemoryMb * totalExecutorTaskTimeSeconds;


result.getResourceWasted should be(resourceAllocated - resourceUsed * 1.5)
val resourceAllocated = 4096000+12288000
val resourceUsed = 676288+967110
result.getResourceWasted should be(resourceAllocated.toDouble - resourceUsed.toDouble * 1.5)
}

it("doesn't calculate total delay") {
result.getTotalDelay should be(0L)
}
it("sets resourceused as 0 when duration is negative") {
it("sets resource used as 0 when duration is negative") {
//make the duration negative
val applicationInfo = {
val applicationAttemptInfo = {
Expand Down Expand Up @@ -180,7 +173,8 @@ object SparkMetricsAggregatorTest {

def newFakeExecutorSummary(
id: String,
totalDuration: Long
totalDuration: Long,
peakJvmUsedMemory: Map[String, Long]
): ExecutorSummaryImpl = new ExecutorSummaryImpl(
id,
hostPort = "",
Expand All @@ -199,7 +193,7 @@ object SparkMetricsAggregatorTest {
totalGCTime = 0,
totalMemoryBytesSpilled = 0,
executorLogs = Map.empty,
peakJvmUsedMemory = Map.empty,
peakJvmUsedMemory,
peakUnifiedMemory = Map.empty
)
}

0 comments on commit 5f7fc68

Please sign in to comment.