Fixed resources used/wasted computation for spark jobs - (Depends on Custom SHS - Requires peakJvmUsedMemory metric) #287

Merged: 4 commits, Jan 10, 2018
app/com/linkedin/drelephant/analysis/HadoopAggregatedData.java
@@ -21,6 +21,7 @@
*/
public class HadoopAggregatedData {

//Note: resourceUsed is actually the resource blocked (allocated) on the cluster, not the resource the job consumed.
private long resourceUsed = 0;
private long resourceWasted = 0;
private long totalDelay = 0;
40 changes: 23 additions & 17 deletions app/com/linkedin/drelephant/spark/SparkMetricsAggregator.scala
@@ -19,10 +19,12 @@ package com.linkedin.drelephant.spark
import com.linkedin.drelephant.analysis.{HadoopAggregatedData, HadoopApplicationData, HadoopMetricsAggregator}
import com.linkedin.drelephant.configurations.aggregator.AggregatorConfigurationData
import com.linkedin.drelephant.math.Statistics
import com.linkedin.drelephant.spark.data.{SparkApplicationData, SparkLogDerivedData, SparkRestDerivedData}
import com.linkedin.drelephant.spark.data.SparkApplicationData
import com.linkedin.drelephant.spark.fetchers.statusapiv1.ExecutorSummary
import com.linkedin.drelephant.util.MemoryFormatUtils
import org.apache.commons.io.FileUtils
import org.apache.log4j.Logger

import scala.util.Try


@@ -47,19 +49,13 @@ class SparkMetricsAggregator(private val aggregatorConfigurationData: AggregatorConfigurationData)
}

private def aggregate(data: SparkApplicationData): Unit = for {
executorInstances <- executorInstancesOf(data)
executorMemoryBytes <- executorMemoryBytesOf(data)
} {
val applicationDurationMillis = applicationDurationMillisOf(data)
if (applicationDurationMillis < 0) {
logger.warn(s"applicationDurationMillis is negative. Skipping Metrics Aggregation:${applicationDurationMillis}")
} else {
val totalExecutorTaskTimeMillis = totalExecutorTaskTimeMillisOf(data)

val resourcesAllocatedForUse =
aggregateresourcesAllocatedForUse(executorInstances, executorMemoryBytes, applicationDurationMillis)
val resourcesActuallyUsed = aggregateresourcesActuallyUsed(executorMemoryBytes, totalExecutorTaskTimeMillis)

} else {
val (resourcesActuallyUsed, resourcesAllocatedForUse) = calculateResourceUsage(data.executorSummaries, executorMemoryBytes)
val resourcesActuallyUsedWithBuffer = resourcesActuallyUsed.doubleValue() * (1.0 + allocatedMemoryWasteBufferPercentage)
val resourcesWastedMBSeconds = (resourcesActuallyUsedWithBuffer < resourcesAllocatedForUse.doubleValue()) match {
case true => resourcesAllocatedForUse.doubleValue() - resourcesActuallyUsedWithBuffer
@@ -71,10 +67,8 @@ class SparkMetricsAggregator(private val aggregatorConfigurationData: AggregatorConfigurationData)
} else {
logger.warn(s"resourcesAllocatedForUse/resourcesWasted exceeds Long.MaxValue")
logger.warn(s"ResourceUsed: ${resourcesAllocatedForUse}")
logger.warn(s"executorInstances: ${executorInstances}")
logger.warn(s"executorMemoryBytes:${executorMemoryBytes}")
logger.warn(s"applicationDurationMillis:${applicationDurationMillis}")
logger.warn(s"totalExecutorTaskTimeMillis:${totalExecutorTaskTimeMillis}")
logger.warn(s"resourcesActuallyUsedWithBuffer:${resourcesActuallyUsedWithBuffer}")
logger.warn(s"resourcesWastedMBSeconds:${resourcesWastedMBSeconds}")
logger.warn(s"allocatedMemoryWasteBufferPercentage:${allocatedMemoryWasteBufferPercentage}")
@@ -83,16 +77,28 @@ class SparkMetricsAggregator(private val aggregatorConfigurationData: AggregatorConfigurationData)
}
}
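For intuition, the waste rule above reduces to wasted = allocated - used * (1 + buffer), apparently floored at zero (the false branch is elided in this view). A minimal sketch, using MB-second figures that mirror the updated test below:

```scala
// Editor's sketch, not part of this diff.
val allocatedMemoryWasteBufferPercentage = 0.5   // documented default
val resourcesActuallyUsed = BigInt(1643398)      // 676288 + 967110, as in the test
val resourcesAllocatedForUse = BigInt(16384000)  // 4096000 + 12288000, as in the test
val usedWithBuffer = resourcesActuallyUsed.doubleValue * (1.0 + allocatedMemoryWasteBufferPercentage)
val wastedMBSeconds =
  if (usedWithBuffer < resourcesAllocatedForUse.doubleValue)
    resourcesAllocatedForUse.doubleValue - usedWithBuffer
  else 0.0                                       // == 13918903.0 here
```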

private def aggregateresourcesActuallyUsed(executorMemoryBytes: Long, totalExecutorTaskTimeMillis: BigInt): BigInt = {
val bytesMillis = BigInt(executorMemoryBytes) * totalExecutorTaskTimeMillis
(bytesMillis / (BigInt(FileUtils.ONE_MB) * BigInt(Statistics.SECOND_IN_MS)))
// Calculates resource usage in MB-seconds by summing, for each executor, (peak JVM used memory + reserved memory) multiplied by that executor's total duration.
private def calculateResourceUsage(executorSummaries: Seq[ExecutorSummary], executorMemoryBytes: Long): (BigInt, BigInt) = {
var sumResourceUsage: BigInt = 0
var sumResourcesAllocatedForUse: BigInt = 0
executorSummaries.foreach(
executorSummary => {
val memUsedBytes: Long = executorSummary.peakJvmUsedMemory.getOrElse(JVM_USED_MEMORY, 0).asInstanceOf[Number].longValue + MemoryFormatUtils.stringToBytes(SPARK_RESERVED_MEMORY)
val timeSpent: Long = executorSummary.totalDuration
val bytesMillisUsed = BigInt(memUsedBytes) * timeSpent
val bytesMillisAllocated = BigInt(executorMemoryBytes) * timeSpent
sumResourcesAllocatedForUse += (bytesMillisAllocated / (BigInt(FileUtils.ONE_MB) * BigInt(Statistics.SECOND_IN_MS)))
sumResourceUsage += (bytesMillisUsed / (BigInt(FileUtils.ONE_MB) * BigInt(Statistics.SECOND_IN_MS)))
}
)
(sumResourceUsage, sumResourcesAllocatedForUse)
}
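Concretely, each executor contributes floor((peakJvmUsedMemory + 300 MiB reserved) * totalDurationMs / (1 MiB * 1000 ms)) MB-seconds of usage, with allocation computed the same way from the configured executor memory. A self-contained sketch using executor "1" from the test fixture below (394567123 bytes peak, 1,000,000 ms uptime):

```scala
import org.apache.commons.io.FileUtils

// Editor's sketch; values come from the test fixture, units assumed binary.
val reservedBytes = 300L * FileUtils.ONE_MB  // what stringToBytes("300M") should yield
val bytesMillisUsed = BigInt(394567123L + reservedBytes) * BigInt(1000000L)
val usedMBSeconds = bytesMillisUsed / (BigInt(FileUtils.ONE_MB) * BigInt(1000L))  // == 676288
```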

private def aggregateresourcesAllocatedForUse(
executorInstances: Int,
executorMemoryBytes: Long,
applicationDurationMillis: Long
): BigInt = {
): BigInt = {
val bytesMillis = BigInt(executorInstances) * BigInt(executorMemoryBytes) * BigInt(applicationDurationMillis)
(bytesMillis / (BigInt(FileUtils.ONE_MB) * BigInt(Statistics.SECOND_IN_MS)))
}
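For contrast: the helper above charges every configured executor for the whole application wall-clock time, while the new per-executor sum charges each executor only for its own uptime, which matters once dynamic allocation spins executors up and down. A sketch of the difference, using figures that mirror the old and new test expectations:

```scala
// Editor's sketch. Old formula: instances * memory * application duration
// (old test fixture: 2 executors, 4096 MB, 8000 s of application time).
val oldAllocatedMBSeconds = BigInt(2) * BigInt(4096) * BigInt(8000)  // 65536000

// New formula: memory * each executor's own uptime (new test: 1000 s and 3000 s).
val newAllocatedMBSeconds = BigInt(4096) * BigInt(1000) + BigInt(4096) * BigInt(3000)  // 16384000
```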
@@ -121,9 +127,9 @@ class SparkMetricsAggregator(private val aggregatorConfigurationData: AggregatorConfigurationData)
object SparkMetricsAggregator {
/** The percentage of allocated memory we expect to waste because of overhead. */
val DEFAULT_ALLOCATED_MEMORY_WASTE_BUFFER_PERCENTAGE = 0.5D

val ALLOCATED_MEMORY_WASTE_BUFFER_PERCENTAGE_KEY = "allocated_memory_waste_buffer_percentage"

val SPARK_RESERVED_MEMORY: String = "300M"
val SPARK_EXECUTOR_INSTANCES_KEY = "spark.executor.instances"
val SPARK_EXECUTOR_MEMORY_KEY = "spark.executor.memory"
val JVM_USED_MEMORY = "jvmUsedMemory"
}
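As a sanity check on these constants, a small sketch (assuming MemoryFormatUtils parses binary units, as its use alongside FileUtils.ONE_MB suggests):

```scala
import com.linkedin.drelephant.util.MemoryFormatUtils

// Editor's sketch: the reserved-memory constant as bytes.
val reservedBytes = MemoryFormatUtils.stringToBytes(SparkMetricsAggregator.SPARK_RESERVED_MEMORY)
// Expected: 300 * 1024 * 1024 == 314572800 bytes of reserved JVM overhead per executor.
```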
@@ -24,6 +24,7 @@ import scala.util.Try
import com.linkedin.drelephant.spark.fetchers.statusapiv1._
import org.apache.spark.JobExecutionStatus
import com.linkedin.drelephant.spark.fetchers.statusapiv1.StageStatus

/**
* Converters for legacy SparkApplicationData to current SparkApplicationData.
*
38 changes: 16 additions & 22 deletions test/com/linkedin/drelephant/spark/SparkMetricsAggregatorTest.scala
@@ -22,9 +22,12 @@ import scala.collection.JavaConverters

import com.linkedin.drelephant.analysis.ApplicationType
import com.linkedin.drelephant.configurations.aggregator.AggregatorConfigurationData
import com.linkedin.drelephant.math.Statistics
import com.linkedin.drelephant.spark.data.{SparkApplicationData, SparkLogDerivedData, SparkRestDerivedData}
import com.linkedin.drelephant.spark.fetchers.statusapiv1.{ApplicationAttemptInfoImpl, ApplicationInfoImpl, ExecutorSummaryImpl}
import com.linkedin.drelephant.util.MemoryFormatUtils
import org.apache.spark.scheduler.SparkListenerEnvironmentUpdate
import org.apache.commons.io.FileUtils
import org.scalatest.{FunSpec, Matchers}

class SparkMetricsAggregatorTest extends FunSpec with Matchers {
@@ -47,8 +50,9 @@ class SparkMetricsAggregatorTest extends FunSpec with Matchers {
}

val executorSummaries = Seq(
newFakeExecutorSummary(id = "1", totalDuration = 1000000L),
newFakeExecutorSummary(id = "2", totalDuration = 3000000L)
newFakeExecutorSummary(id = "1", totalDuration = 1000000L, Map("jvmUsedMemory" -> 394567123)),
newFakeExecutorSummary(id = "2", totalDuration = 3000000L, Map("jvmUsedMemory" -> 23456834))

)
val restDerivedData = {
SparkRestDerivedData(
@@ -60,7 +64,7 @@
)
}

describe("when it has log-derived data") {
describe("when it has data") {
val logDerivedData = {
val environmentUpdate = newFakeSparkListenerEnvironmentUpdate(
Map(
@@ -82,31 +86,20 @@

val result = aggregator.getResult

it("calculates resources used") {
val totalExecutorMemoryMb = 2 * 4096
val applicationDurationSeconds = 8000
val executorMemoryMb = 4096
val totalExecutorTaskTimeSeconds = 1000 + 3000
result.getResourceUsed should be(totalExecutorMemoryMb * applicationDurationSeconds)
it("calculates resources used (allocated)") {
result.getResourceUsed should be(4096000 + 12288000)
}
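The expected value follows from the fixture: 4096 MB of executor memory, with executor "1" up for 1,000 seconds and executor "2" up for 3,000 seconds:

```scala
// Editor's sketch of the expected allocation.
val executorMemoryMb = 4096L  // spark.executor.memory in the fixture
val allocatedMBSeconds = executorMemoryMb * 1000L + executorMemoryMb * 3000L
// == 4096000 + 12288000 == 16384000
```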

it("calculates resources wasted") {
val totalExecutorMemoryMb = 2 * 4096
val applicationDurationSeconds = 8000
val resourceAllocated = totalExecutorMemoryMb * applicationDurationSeconds;

val executorMemoryMb = 4096
val totalExecutorTaskTimeSeconds = 1000 + 3000
val resourceUsed = executorMemoryMb * totalExecutorTaskTimeSeconds;


result.getResourceWasted should be(resourceAllocated - resourceUsed * 1.5)
val resourceAllocated = 4096000 + 12288000
val resourceUsed = 676288 + 967110
result.getResourceWasted should be(resourceAllocated.toDouble - resourceUsed.toDouble * 1.5)
}
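These constants come straight out of the per-executor formula; a sketch reproducing them:

```scala
// Editor's sketch reproducing the expected values.
val oneMb = 1024L * 1024L
val reserved = 300L * oneMb  // SPARK_RESERVED_MEMORY ("300M") in bytes
val used1 = (BigInt(394567123L + reserved) * BigInt(1000000L)) / (BigInt(oneMb) * 1000L)  // 676288
val used2 = (BigInt(23456834L + reserved) * BigInt(3000000L)) / (BigInt(oneMb) * 1000L)   // 967110
val wasted = (4096000.0 + 12288000.0) - (676288.0 + 967110.0) * 1.5                       // 13918903.0
```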

it("doesn't calculate total delay") {
result.getTotalDelay should be(0L)
}
it("sets resourceused as 0 when duration is negative") {
it("sets resource used as 0 when duration is negative") {
//make the duration negative
val applicationInfo = {
val applicationAttemptInfo = {
@@ -180,7 +173,8 @@ object SparkMetricsAggregatorTest {

def newFakeExecutorSummary(
id: String,
totalDuration: Long
totalDuration: Long,
peakJvmUsedMemory: Map[String, Long]
): ExecutorSummaryImpl = new ExecutorSummaryImpl(
id,
hostPort = "",
Expand All @@ -199,7 +193,7 @@ object SparkMetricsAggregatorTest {
totalGCTime = 0,
totalMemoryBytesSpilled = 0,
executorLogs = Map.empty,
peakJvmUsedMemory = Map.empty,
peakJvmUsedMemory,
peakUnifiedMemory = Map.empty
)
}