From 7f7717f291061d8b5a1305e42e6468e55aa5cf49 Mon Sep 17 00:00:00 2001
From: swasti
Date: Tue, 12 Sep 2017 13:08:14 +0530
Subject: [PATCH] fixed resources used and wasted computation for spark jobs

---
 .../spark/SparkMetricsAggregator.scala        | 33 ++++++++++++-------
 .../fetchers/statusapiv1/statusapiv1.scala    |  6 ++--
 .../legacydata/LegacyDataConverters.scala     |  3 +-
 .../spark/SparkMetricsAggregatorTest.scala    | 33 ++++++++-----------
 .../heuristics/ExecutorsHeuristicTest.scala   |  3 +-
 5 files changed, 44 insertions(+), 34 deletions(-)

diff --git a/app/com/linkedin/drelephant/spark/SparkMetricsAggregator.scala b/app/com/linkedin/drelephant/spark/SparkMetricsAggregator.scala
index 135ebbd34..35bab9b5f 100644
--- a/app/com/linkedin/drelephant/spark/SparkMetricsAggregator.scala
+++ b/app/com/linkedin/drelephant/spark/SparkMetricsAggregator.scala
@@ -20,14 +20,17 @@ import com.linkedin.drelephant.analysis.{HadoopAggregatedData, HadoopApplication
 import com.linkedin.drelephant.configurations.aggregator.AggregatorConfigurationData
 import com.linkedin.drelephant.math.Statistics
 import com.linkedin.drelephant.spark.data.{SparkApplicationData, SparkLogDerivedData, SparkRestDerivedData}
+import com.linkedin.drelephant.spark.fetchers.statusapiv1.ExecutorSummary
 import com.linkedin.drelephant.util.MemoryFormatUtils
 import org.apache.commons.io.FileUtils
 import org.apache.log4j.Logger
+
 import scala.util.Try
 
 class SparkMetricsAggregator(private val aggregatorConfigurationData: AggregatorConfigurationData)
-  extends HadoopMetricsAggregator {
+  extends HadoopMetricsAggregator {
+
   import SparkMetricsAggregator._
 
   private val logger: Logger = Logger.getLogger(classOf[SparkMetricsAggregator])
@@ -53,13 +56,11 @@ class SparkMetricsAggregator(private val aggregatorConfigurationData: Aggregator
     val applicationDurationMillis = applicationDurationMillisOf(data)
     if( applicationDurationMillis < 0) {
       logger.warn(s"applicationDurationMillis is negative. Skipping Metrics Aggregation:${applicationDurationMillis}")
-    } else {
+    } else {
       val totalExecutorTaskTimeMillis = totalExecutorTaskTimeMillisOf(data)
       val resourcesAllocatedForUse = aggregateresourcesAllocatedForUse(executorInstances, executorMemoryBytes, applicationDurationMillis)
-      val resourcesActuallyUsed = aggregateresourcesActuallyUsed(executorMemoryBytes, totalExecutorTaskTimeMillis)
-
+      val resourcesActuallyUsed = calculateResourceUsage(data.executorSummaries)
       val resourcesActuallyUsedWithBuffer = resourcesActuallyUsed.doubleValue() * (1.0 + allocatedMemoryWasteBufferPercentage)
       val resourcesWastedMBSeconds = (resourcesActuallyUsedWithBuffer < resourcesAllocatedForUse.doubleValue()) match {
         case true => resourcesAllocatedForUse.doubleValue() - resourcesActuallyUsedWithBuffer
         case false => 0.0
       }
@@ -67,7 +68,7 @@ class SparkMetricsAggregator(private val aggregatorConfigurationData: Aggregator
       //allocated is the total used resource from the cluster.
       if (resourcesAllocatedForUse.isValidLong) {
-        hadoopAggregatedData.setResourceUsed(resourcesAllocatedForUse.toLong)
+        hadoopAggregatedData.setResourceUsed(resourcesActuallyUsed.toLong)
       } else {
         logger.warn(s"resourcesAllocatedForUse/resourcesWasted exceeds Long.MaxValue")
         logger.warn(s"ResourceUsed: ${resourcesAllocatedForUse}")
@@ -83,16 +84,25 @@ class SparkMetricsAggregator(private val aggregatorConfigurationData: Aggregator
     }
   }
 
-  private def aggregateresourcesActuallyUsed(executorMemoryBytes: Long, totalExecutorTaskTimeMillis: BigInt): BigInt = {
-    val bytesMillis = BigInt(executorMemoryBytes) * totalExecutorTaskTimeMillis
-    (bytesMillis / (BigInt(FileUtils.ONE_MB) * BigInt(Statistics.SECOND_IN_MS)))
+  //calculates resource usage in MB-seconds by summing, over all executors, peak JVM used memory times total task duration
+  private def calculateResourceUsage(executorSummaries: Seq[ExecutorSummary]): BigInt = {
+    executorSummaries.map { executorSummary =>
+      // peak JVM used memory in bytes; treated as 0 when the metric is missing for an executor
+      val memUsedBytes: Long = executorSummary.peakJvmUsedMemory.getOrElse(JVM_USED_MEMORY, 0L)
+      val timeSpentMillis: Long = executorSummary.totalDuration
+      val bytesMillis = BigInt(memUsedBytes) * timeSpentMillis
+      bytesMillis / (BigInt(FileUtils.ONE_MB) * BigInt(Statistics.SECOND_IN_MS))
+    }.sum
   }
 
   private def aggregateresourcesAllocatedForUse(
     executorInstances: Int,
     executorMemoryBytes: Long,
     applicationDurationMillis: Long
-  ): BigInt = {
+  ): BigInt = {
     val bytesMillis = BigInt(executorInstances) * BigInt(executorMemoryBytes) * BigInt(applicationDurationMillis)
     (bytesMillis / (BigInt(FileUtils.ONE_MB) * BigInt(Statistics.SECOND_IN_MS)))
   }
@@ -123,7 +133,8 @@ object SparkMetricsAggregator {
   val DEFAULT_ALLOCATED_MEMORY_WASTE_BUFFER_PERCENTAGE = 0.5D
 
   val ALLOCATED_MEMORY_WASTE_BUFFER_PERCENTAGE_KEY = "allocated_memory_waste_buffer_percentage"
-
+  val SPARK_RESERVED_MEMORY: String = "300M"
   val SPARK_EXECUTOR_INSTANCES_KEY = "spark.executor.instances"
   val SPARK_EXECUTOR_MEMORY_KEY = "spark.executor.memory"
+  val JVM_USED_MEMORY = "jvmUsedMemory"
 }
diff --git a/app/com/linkedin/drelephant/spark/fetchers/statusapiv1/statusapiv1.scala b/app/com/linkedin/drelephant/spark/fetchers/statusapiv1/statusapiv1.scala
index 1b013c0f3..406c6fbcd 100644
--- a/app/com/linkedin/drelephant/spark/fetchers/statusapiv1/statusapiv1.scala
+++ b/app/com/linkedin/drelephant/spark/fetchers/statusapiv1/statusapiv1.scala
@@ -87,7 +87,8 @@ trait ExecutorSummary{
   def totalShuffleRead: Long
   def totalShuffleWrite: Long
   def maxMemory: Long
-  def executorLogs: Map[String, String]}
+  def executorLogs: Map[String, String]
+  def peakJvmUsedMemory: Map[String, Long]}
 
 trait JobData{
   def jobId: Int
@@ -292,7 +293,8 @@ class ExecutorSummaryImpl(
   var totalShuffleRead: Long,
   var totalShuffleWrite: Long,
   var maxMemory: Long,
-  var executorLogs: Map[String, String]) extends ExecutorSummary
+  var executorLogs: Map[String, String],
+  var peakJvmUsedMemory: Map[String, Long]) extends ExecutorSummary
 
 class JobDataImpl(
   var jobId: Int,
diff --git a/app/com/linkedin/drelephant/spark/legacydata/LegacyDataConverters.scala b/app/com/linkedin/drelephant/spark/legacydata/LegacyDataConverters.scala
index 0c7412fe0..8d46072de 100644
--- a/app/com/linkedin/drelephant/spark/legacydata/LegacyDataConverters.scala
+++ b/app/com/linkedin/drelephant/spark/legacydata/LegacyDataConverters.scala
@@ -173,7 +173,8 @@ object LegacyDataConverters {
       executorInfo.shuffleRead,
       executorInfo.shuffleWrite,
       executorInfo.maxMem,
-      executorLogs =
-        Map.empty
+      executorLogs = Map.empty,
+      peakJvmUsedMemory = Map.empty
     )
   }
 
diff --git a/test/com/linkedin/drelephant/spark/SparkMetricsAggregatorTest.scala b/test/com/linkedin/drelephant/spark/SparkMetricsAggregatorTest.scala
index 3947fdf3f..64a361646 100644
--- a/test/com/linkedin/drelephant/spark/SparkMetricsAggregatorTest.scala
+++ b/test/com/linkedin/drelephant/spark/SparkMetricsAggregatorTest.scala
@@ -19,12 +19,13 @@ package com.linkedin.drelephant.spark
 import java.util.Date
 
 import scala.collection.JavaConverters
-
 import com.linkedin.drelephant.analysis.ApplicationType
 import com.linkedin.drelephant.configurations.aggregator.AggregatorConfigurationData
 import com.linkedin.drelephant.spark.data.{SparkApplicationData, SparkLogDerivedData, SparkRestDerivedData}
 import com.linkedin.drelephant.spark.fetchers.statusapiv1.{ApplicationAttemptInfoImpl, ApplicationInfoImpl, ExecutorSummaryImpl}
 import org.apache.spark.scheduler.SparkListenerEnvironmentUpdate
 import org.scalatest.{FunSpec, Matchers}
 
 class SparkMetricsAggregatorTest extends FunSpec with Matchers {
@@ -47,8 +48,9 @@ class SparkMetricsAggregatorTest extends FunSpec with Matchers {
   }
 
   val executorSummaries = Seq(
-    newFakeExecutorSummary(id = "1", totalDuration = 1000000L),
-    newFakeExecutorSummary(id = "2", totalDuration = 3000000L)
+    newFakeExecutorSummary(id = "1", totalDuration = 1000000L, Map("jvmUsedMemory" -> 394567123L)),
+    newFakeExecutorSummary(id = "2", totalDuration = 3000000L, Map("jvmUsedMemory" -> 23456834L))
   )
   val restDerivedData = {
     SparkRestDerivedData(
@@ -59,7 +61,7 @@ class SparkMetricsAggregatorTest extends FunSpec with Matchers {
     )
   }
 
-  describe("when it has log-derived data") {
+  describe("when it has data") {
    val logDerivedData = {
      val environmentUpdate = newFakeSparkListenerEnvironmentUpdate(
        Map(
@@ -82,30 +84,21 @@ class SparkMetricsAggregatorTest extends FunSpec with Matchers {
     val result = aggregator.getResult
 
     it("calculates resources used") {
-      val totalExecutorMemoryMb = 2 * 4096
-      val applicationDurationSeconds = 8000
-      val executorMemoryMb = 4096
-      val totalExecutorTaskTimeSeconds = 1000 + 3000
-      result.getResourceUsed should be(totalExecutorMemoryMb * applicationDurationSeconds)
+      // 394567123 bytes over 1000 s -> 376288 MB-seconds; 23456834 bytes over 3000 s -> 67110 MB-seconds
+      result.getResourceUsed should be(376288 + 67110)
     }
 
     it("calculates resources wasted") {
       val totalExecutorMemoryMb = 2 * 4096
       val applicationDurationSeconds = 8000
       val resourceAllocated = totalExecutorMemoryMb * applicationDurationSeconds;
-
-      val executorMemoryMb = 4096
-      val totalExecutorTaskTimeSeconds = 1000 + 3000
-      val resourceUsed = executorMemoryMb * totalExecutorTaskTimeSeconds;
-
-
-      result.getResourceWasted should be(resourceAllocated - resourceUsed * 1.5)
+      val resourceUsed = 376288 + 67110
+      result.getResourceWasted should be(resourceAllocated.toDouble - resourceUsed.toDouble * 1.5)
     }
 
     it("doesn't calculate total delay") {
       result.getTotalDelay should be(0L)
     }
 
-    it("sets resourceused as 0 when duration is negative") {
+    it("sets resource used as 0 when duration is negative") {
       //make the duration negative
       val applicationInfo = {
         val applicationAttemptInfo = {
@@ -178,7 +171,8 @@ object SparkMetricsAggregatorTest {
 
   def newFakeExecutorSummary(
     id: String,
-    totalDuration: Long
+    totalDuration: Long,
+    peakJvmUsedMemory: Map[String, Long]
   ): ExecutorSummaryImpl = new ExecutorSummaryImpl(
     id,
     hostPort = "",
@@ -194,6 +188,7 @@ object SparkMetricsAggregatorTest {
     totalShuffleRead = 0,
     totalShuffleWrite = 0,
     maxMemory = 0,
-    executorLogs = Map.empty
+    executorLogs = Map.empty,
+    peakJvmUsedMemory = peakJvmUsedMemory
   )
 }
diff --git a/test/com/linkedin/drelephant/spark/heuristics/ExecutorsHeuristicTest.scala b/test/com/linkedin/drelephant/spark/heuristics/ExecutorsHeuristicTest.scala
index dfdcf4a15..74b584ff3 100644
--- a/test/com/linkedin/drelephant/spark/heuristics/ExecutorsHeuristicTest.scala
+++ b/test/com/linkedin/drelephant/spark/heuristics/ExecutorsHeuristicTest.scala
@@ -249,7 +249,8 @@ object ExecutorsHeuristicTest {
       totalShuffleRead,
       totalShuffleWrite,
       maxMemory,
-      executorLogs = Map.empty
+      executorLogs = Map.empty,
+      peakJvmUsedMemory = Map.empty
     )
 
   def newFakeSparkApplicationData(executorSummaries: Seq[ExecutorSummaryImpl]): SparkApplicationData = {
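
Note on the asserted numbers (a minimal standalone sketch, not part of the patch): used and
wasted resources are measured in MB-seconds. The snippet below replays the arithmetic of
calculateResourceUsage and aggregateresourcesAllocatedForUse with the fixture values from
SparkMetricsAggregatorTest; the object and constant names (MBSecondsCheck, MB, SECOND_IN_MS)
are local to this sketch, standing in for FileUtils.ONE_MB and Statistics.SECOND_IN_MS.

  object MBSecondsCheck extends App {
    val MB = 1024L * 1024L    // bytes per MB, stand-in for FileUtils.ONE_MB
    val SECOND_IN_MS = 1000L  // stand-in for Statistics.SECOND_IN_MS

    // MB-seconds actually used by one executor: peak JVM bytes * duration millis / (MB * 1000)
    def usedMBSeconds(peakJvmUsedBytes: Long, durationMillis: Long): BigInt =
      BigInt(peakJvmUsedBytes) * durationMillis / (BigInt(MB) * SECOND_IN_MS)

    // the two fake executors from the test fixture
    val used = usedMBSeconds(394567123L, 1000000L) + usedMBSeconds(23456834L, 3000000L)
    println(used)       // 376288 + 67110 = 443398, the expected getResourceUsed

    // MB-seconds allocated: 2 executors * 4096 MB each over 8000 s of application duration
    val allocated = BigInt(2) * 4096 * 8000
    println(allocated)  // 65536000

    // wasted = allocated - used * (1 + waste buffer), with the default 0.5 buffer
    val wasted = allocated.toDouble - used.toDouble * 1.5
    println(wasted)     // 64870903.0, the expected getResourceWasted
  }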