diff --git a/app/com/linkedin/drelephant/analysis/HadoopAggregatedData.java b/app/com/linkedin/drelephant/analysis/HadoopAggregatedData.java
index 2c88f3d7b..5c35502e4 100644
--- a/app/com/linkedin/drelephant/analysis/HadoopAggregatedData.java
+++ b/app/com/linkedin/drelephant/analysis/HadoopAggregatedData.java
@@ -21,6 +21,7 @@
  */
 public class HadoopAggregatedData {
 
+  // Note: resourceUsed actually holds the resources blocked (allocated) on the cluster.
   private long resourceUsed = 0;
  private long resourceWasted = 0;
  private long totalDelay = 0;
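Reviewer note: the comment added above pins down an easy misreading. `resourceUsed` tracks the resources *blocked* (allocated and held) on the cluster, in MB-seconds, not the memory the job actively consumed. Below is a minimal sketch of that unit with illustrative values; the object and value names are hypothetical and not part of this patch:

```scala
// Hypothetical illustration of the MB-seconds unit stored in HadoopAggregatedData.
object MbSecondsSketch extends App {
  val ONE_MB = 1024L * 1024L    // bytes per MB, as in FileUtils.ONE_MB
  val SECOND_IN_MS = 1000L      // as in Statistics.SECOND_IN_MS

  // An executor holding 4 GB for 1,000,000 ms blocks this many MB-seconds:
  val executorMemoryBytes = 4L * 1024 * 1024 * 1024
  val durationMillis = 1000000L
  val mbSeconds = BigInt(executorMemoryBytes) * durationMillis / (BigInt(ONE_MB) * SECOND_IN_MS)
  println(mbSeconds) // 4096000
}
```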
diff --git a/app/com/linkedin/drelephant/spark/SparkMetricsAggregator.scala b/app/com/linkedin/drelephant/spark/SparkMetricsAggregator.scala
index 135ebbd34..c8216cdf4 100644
--- a/app/com/linkedin/drelephant/spark/SparkMetricsAggregator.scala
+++ b/app/com/linkedin/drelephant/spark/SparkMetricsAggregator.scala
@@ -19,10 +19,12 @@ package com.linkedin.drelephant.spark
 
 import com.linkedin.drelephant.analysis.{HadoopAggregatedData, HadoopApplicationData, HadoopMetricsAggregator}
 import com.linkedin.drelephant.configurations.aggregator.AggregatorConfigurationData
 import com.linkedin.drelephant.math.Statistics
-import com.linkedin.drelephant.spark.data.{SparkApplicationData, SparkLogDerivedData, SparkRestDerivedData}
+import com.linkedin.drelephant.spark.data.SparkApplicationData
+import com.linkedin.drelephant.spark.fetchers.statusapiv1.ExecutorSummary
 import com.linkedin.drelephant.util.MemoryFormatUtils
 import org.apache.commons.io.FileUtils
 import org.apache.log4j.Logger
+
 import scala.util.Try
 
@@ -47,19 +49,13 @@ class SparkMetricsAggregator(private val aggregatorConfigurationData: Aggregator
   }
 
   private def aggregate(data: SparkApplicationData): Unit = for {
-    executorInstances <- executorInstancesOf(data)
     executorMemoryBytes <- executorMemoryBytesOf(data)
   } {
     val applicationDurationMillis = applicationDurationMillisOf(data)
     if( applicationDurationMillis < 0) {
       logger.warn(s"applicationDurationMillis is negative. Skipping Metrics Aggregation:${applicationDurationMillis}")
-    } else {
-      val totalExecutorTaskTimeMillis = totalExecutorTaskTimeMillisOf(data)
-
-      val resourcesAllocatedForUse =
-        aggregateresourcesAllocatedForUse(executorInstances, executorMemoryBytes, applicationDurationMillis)
-      val resourcesActuallyUsed = aggregateresourcesActuallyUsed(executorMemoryBytes, totalExecutorTaskTimeMillis)
-
+    } else {
+      val (resourcesActuallyUsed, resourcesAllocatedForUse) = calculateResourceUsage(data.executorSummaries, executorMemoryBytes)
       val resourcesActuallyUsedWithBuffer = resourcesActuallyUsed.doubleValue() * (1.0 + allocatedMemoryWasteBufferPercentage)
       val resourcesWastedMBSeconds = (resourcesActuallyUsedWithBuffer < resourcesAllocatedForUse.doubleValue()) match {
         case true => resourcesAllocatedForUse.doubleValue() - resourcesActuallyUsedWithBuffer
@@ -71,10 +67,8 @@ class SparkMetricsAggregator(private val aggregatorConfigurationData: Aggregator
       } else {
         logger.warn(s"resourcesAllocatedForUse/resourcesWasted exceeds Long.MaxValue")
         logger.warn(s"ResourceUsed: ${resourcesAllocatedForUse}")
-        logger.warn(s"executorInstances: ${executorInstances}")
         logger.warn(s"executorMemoryBytes:${executorMemoryBytes}")
         logger.warn(s"applicationDurationMillis:${applicationDurationMillis}")
-        logger.warn(s"totalExecutorTaskTimeMillis:${totalExecutorTaskTimeMillis}")
         logger.warn(s"resourcesActuallyUsedWithBuffer:${resourcesActuallyUsedWithBuffer}")
         logger.warn(s"resourcesWastedMBSeconds:${resourcesWastedMBSeconds}")
         logger.warn(s"allocatedMemoryWasteBufferPercentage:${allocatedMemoryWasteBufferPercentage}")
@@ -83,16 +77,28 @@ class SparkMetricsAggregator(private val aggregatorConfigurationData: Aggregator
     }
   }
 
-  private def aggregateresourcesActuallyUsed(executorMemoryBytes: Long, totalExecutorTaskTimeMillis: BigInt): BigInt = {
-    val bytesMillis = BigInt(executorMemoryBytes) * totalExecutorTaskTimeMillis
-    (bytesMillis / (BigInt(FileUtils.ONE_MB) * BigInt(Statistics.SECOND_IN_MS)))
+  // Calculates total used and allocated resources (MB-seconds) by summing over executors.
+  private def calculateResourceUsage(executorSummaries: Seq[ExecutorSummary], executorMemoryBytes: Long): (BigInt, BigInt) = {
+    var sumResourceUsage: BigInt = 0
+    var sumResourcesAllocatedForUse: BigInt = 0
+    executorSummaries.foreach(
+      executorSummary => {
+        val memUsedBytes: Long = executorSummary.peakJvmUsedMemory.getOrElse(JVM_USED_MEMORY, 0L) + MemoryFormatUtils.stringToBytes(SPARK_RESERVED_MEMORY)
+        val timeSpentMillis: Long = executorSummary.totalDuration
+        val bytesMillisUsed = BigInt(memUsedBytes) * timeSpentMillis
+        val bytesMillisAllocated = BigInt(executorMemoryBytes) * timeSpentMillis
+        sumResourcesAllocatedForUse += (bytesMillisAllocated / (BigInt(FileUtils.ONE_MB) * BigInt(Statistics.SECOND_IN_MS)))
+        sumResourceUsage += (bytesMillisUsed / (BigInt(FileUtils.ONE_MB) * BigInt(Statistics.SECOND_IN_MS)))
+      }
+    )
+    (sumResourceUsage, sumResourcesAllocatedForUse)
   }
 
   private def aggregateresourcesAllocatedForUse(
     executorInstances: Int,
     executorMemoryBytes: Long,
     applicationDurationMillis: Long
-  ): BigInt = { 
+  ): BigInt = {
     val bytesMillis = BigInt(executorInstances) * BigInt(executorMemoryBytes) * BigInt(applicationDurationMillis)
     (bytesMillis / (BigInt(FileUtils.ONE_MB) * BigInt(Statistics.SECOND_IN_MS)))
   }
@@ -121,9 +127,9 @@ class SparkMetricsAggregator(private val aggregatorConfigurationData: Aggregator
 object SparkMetricsAggregator {
   /** The percentage of allocated memory we expect to waste because of overhead. */
   val DEFAULT_ALLOCATED_MEMORY_WASTE_BUFFER_PERCENTAGE = 0.5D
-
   val ALLOCATED_MEMORY_WASTE_BUFFER_PERCENTAGE_KEY = "allocated_memory_waste_buffer_percentage"
-
+  val SPARK_RESERVED_MEMORY: String = "300M"
   val SPARK_EXECUTOR_INSTANCES_KEY = "spark.executor.instances"
   val SPARK_EXECUTOR_MEMORY_KEY = "spark.executor.memory"
+  val JVM_USED_MEMORY = "jvmUsedMemory"
 }
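The substantive change is in this file: instead of charging `executorMemory × executorInstances × applicationDuration` as allocated and crediting summed task time as used, the aggregator now walks each executor's summary, charging `executorMemory × totalDuration` as allocated and `(peak JVM memory + 300M reserved) × totalDuration` as actually used. A self-contained sketch of that loop using the fixture values from the updated test; `ExecutorStats` is a simplified stand-in for the patch's `ExecutorSummary`, so treat this as an approximation rather than the shipped code:

```scala
// Sketch of the per-executor aggregation; ExecutorStats stands in for ExecutorSummary.
object ResourceUsageSketch extends App {
  val ONE_MB = BigInt(1024L * 1024L)       // FileUtils.ONE_MB
  val SECOND_IN_MS = BigInt(1000L)         // Statistics.SECOND_IN_MS
  val reservedBytes = 300L * 1024 * 1024   // SPARK_RESERVED_MEMORY ("300M"), assuming 1024-based units

  case class ExecutorStats(peakJvmUsedMemory: Map[String, Long], totalDuration: Long)

  def calculateResourceUsage(executors: Seq[ExecutorStats], executorMemoryBytes: Long): (BigInt, BigInt) =
    executors.foldLeft((BigInt(0), BigInt(0))) { case ((used, allocated), e) =>
      val memUsedBytes = e.peakJvmUsedMemory.getOrElse("jvmUsedMemory", 0L) + reservedBytes
      val usedMbSeconds = BigInt(memUsedBytes) * e.totalDuration / (ONE_MB * SECOND_IN_MS)
      val allocatedMbSeconds = BigInt(executorMemoryBytes) * e.totalDuration / (ONE_MB * SECOND_IN_MS)
      (used + usedMbSeconds, allocated + allocatedMbSeconds)
    }

  // The two fake executors from SparkMetricsAggregatorTest, each allocated 4 GB:
  val executors = Seq(
    ExecutorStats(Map("jvmUsedMemory" -> 394567123L), totalDuration = 1000000L),
    ExecutorStats(Map("jvmUsedMemory" -> 23456834L), totalDuration = 3000000L))
  val (used, allocated) = calculateResourceUsage(executors, 4L * 1024 * 1024 * 1024)
  println(s"used=$used allocated=$allocated") // used=1643398 allocated=16384000
}
```

One design consequence: dropping `executorInstances` means each executor is billed for its own lifetime (`totalDuration`) rather than a single static instance count times the whole application duration, which better matches jobs that use dynamic allocation.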
diff --git a/app/com/linkedin/drelephant/spark/legacydata/LegacyDataConverters.scala b/app/com/linkedin/drelephant/spark/legacydata/LegacyDataConverters.scala
index 76c317be3..2403dd8ae 100644
--- a/app/com/linkedin/drelephant/spark/legacydata/LegacyDataConverters.scala
+++ b/app/com/linkedin/drelephant/spark/legacydata/LegacyDataConverters.scala
@@ -24,6 +24,7 @@ import scala.util.Try
 import com.linkedin.drelephant.spark.fetchers.statusapiv1._
 import org.apache.spark.JobExecutionStatus
 import com.linkedin.drelephant.spark.fetchers.statusapiv1.StageStatus
+
 /**
  * Converters for legacy SparkApplicationData to current SparkApplicationData.
  *
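One assumption worth double-checking against `MemoryFormatUtils`: the sketches above, and the expected values in the test below, treat `stringToBytes("300M")` as a binary (1024-based) conversion:

```scala
// Assumed conversion of SPARK_RESERVED_MEMORY ("300M"); a 1000-based parse
// would shift every expected "used" figure slightly.
val reservedBytes: Long = 300L * 1024 * 1024 // 314572800 bytes
```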
diff --git a/test/com/linkedin/drelephant/spark/SparkMetricsAggregatorTest.scala b/test/com/linkedin/drelephant/spark/SparkMetricsAggregatorTest.scala
index d7d6acc0a..e708d7859 100644
--- a/test/com/linkedin/drelephant/spark/SparkMetricsAggregatorTest.scala
+++ b/test/com/linkedin/drelephant/spark/SparkMetricsAggregatorTest.scala
@@ -22,9 +22,12 @@ import scala.collection.JavaConverters
 
 import com.linkedin.drelephant.analysis.ApplicationType
 import com.linkedin.drelephant.configurations.aggregator.AggregatorConfigurationData
+import com.linkedin.drelephant.math.Statistics
 import com.linkedin.drelephant.spark.data.{SparkApplicationData, SparkLogDerivedData, SparkRestDerivedData}
 import com.linkedin.drelephant.spark.fetchers.statusapiv1.{ApplicationAttemptInfoImpl, ApplicationInfoImpl, ExecutorSummaryImpl}
+import com.linkedin.drelephant.util.MemoryFormatUtils
 import org.apache.spark.scheduler.SparkListenerEnvironmentUpdate
+import org.apache.commons.io.FileUtils
 import org.scalatest.{FunSpec, Matchers}
 
 class SparkMetricsAggregatorTest extends FunSpec with Matchers {
@@ -47,8 +50,9 @@ class SparkMetricsAggregatorTest extends FunSpec with Matchers {
   }
 
   val executorSummaries = Seq(
-    newFakeExecutorSummary(id = "1", totalDuration = 1000000L),
-    newFakeExecutorSummary(id = "2", totalDuration = 3000000L)
+    newFakeExecutorSummary(id = "1", totalDuration = 1000000L, peakJvmUsedMemory = Map("jvmUsedMemory" -> 394567123L)),
+    newFakeExecutorSummary(id = "2", totalDuration = 3000000L, peakJvmUsedMemory = Map("jvmUsedMemory" -> 23456834L))
+
   )
   val restDerivedData = {
     SparkRestDerivedData(
@@ -60,7 +64,7 @@ class SparkMetricsAggregatorTest extends FunSpec with Matchers {
     )
   }
 
-  describe("when it has log-derived data") {
+  describe("when it has data") {
     val logDerivedData = {
       val environmentUpdate = newFakeSparkListenerEnvironmentUpdate(
         Map(
@@ -82,31 +86,20 @@ class SparkMetricsAggregatorTest extends FunSpec with Matchers {
     val result = aggregator.getResult
 
-    it("calculates resources used") {
-      val totalExecutorMemoryMb = 2 * 4096
-      val applicationDurationSeconds = 8000
-      val executorMemoryMb = 4096
-      val totalExecutorTaskTimeSeconds = 1000 + 3000
-      result.getResourceUsed should be(totalExecutorMemoryMb * applicationDurationSeconds)
+    it("calculates resources used (allocated)") {
+      result.getResourceUsed should be(4096000 + 12288000)
     }
 
     it("calculates resources wasted") {
-      val totalExecutorMemoryMb = 2 * 4096
-      val applicationDurationSeconds = 8000
-      val resourceAllocated = totalExecutorMemoryMb * applicationDurationSeconds;
-
-      val executorMemoryMb = 4096
-      val totalExecutorTaskTimeSeconds = 1000 + 3000
-      val resourceUsed = executorMemoryMb * totalExecutorTaskTimeSeconds;
-
-
-      result.getResourceWasted should be(resourceAllocated - resourceUsed * 1.5)
+      val resourceAllocated = 4096000 + 12288000
+      val resourceUsed = 676288 + 967110
+      result.getResourceWasted should be(resourceAllocated.toDouble - resourceUsed.toDouble * 1.5)
     }
 
     it("doesn't calculate total delay") {
       result.getTotalDelay should be(0L)
     }
 
-    it("sets resourceused as 0 when duration is negative") {
+    it("sets resource used as 0 when duration is negative") {
       //make the duration negative
       val applicationInfo = {
         val applicationAttemptInfo = {
@@ -180,7 +173,8 @@ object SparkMetricsAggregatorTest {
 
   def newFakeExecutorSummary(
     id: String,
-    totalDuration: Long
+    totalDuration: Long,
+    peakJvmUsedMemory: Map[String, Long]
   ): ExecutorSummaryImpl = new ExecutorSummaryImpl(
     id,
     hostPort = "",
@@ -199,7 +193,7 @@ object SparkMetricsAggregatorTest {
     totalGCTime = 0,
     totalMemoryBytesSpilled = 0,
     executorLogs = Map.empty,
-    peakJvmUsedMemory = Map.empty,
+    peakJvmUsedMemory = peakJvmUsedMemory,
     peakUnifiedMemory = Map.empty
   )
 }
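For the record, the hard-coded expectations in the updated test follow from the fixtures above (two executors with 4 GB allocated each, per the `spark.executor.memory` setting in the log-derived environment). A worked check; this is my arithmetic, not part of the patch:

```scala
// Worked check of the updated test expectations (illustrative only).
val reserved = 300L * 1024 * 1024                      // 314572800 bytes
val divisor = BigInt(1024L * 1024) * 1000              // ONE_MB * SECOND_IN_MS

// Allocated: 4 GB held for 1,000,000 ms and 3,000,000 ms respectively.
val allocated = BigInt(4L << 30) * 1000000L / divisor + BigInt(4L << 30) * 3000000L / divisor
// allocated == 4096000 + 12288000 == 16384000        -> result.getResourceUsed

// Used: (peak JVM memory + reserved) over the same durations.
val used = BigInt(394567123L + reserved) * 1000000L / divisor +
  BigInt(23456834L + reserved) * 3000000L / divisor
// used == 676288 + 967110 == 1643398

val wasted = allocated.toDouble - used.toDouble * 1.5 // == 13918903.0 -> result.getResourceWasted
```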