fixed resources used and wasted computation for spark jobs
swasti committed Sep 12, 2017
1 parent 752a94b commit fc9b866
Showing 6 changed files with 51 additions and 45 deletions.
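In short: resourceUsed previously charged executorInstances × executorMemory for the entire application duration; after this commit it is a per-executor sum of executorMemory × totalDuration, and the "actually used" figure comes from each executor's peak JVM memory plus a fixed 300M reserve. Both are tracked in MB-seconds. (The first hunk below is HadoopAggregatedData, where the resourceUsed field lives.) A minimal sketch of the MB-seconds conversion — an illustration, not project code, assuming FileUtils.ONE_MB = 1024 × 1024 and Statistics.SECOND_IN_MS = 1000, as the diff's divisor implies:

```scala
// Sketch: memory held for a duration, expressed in MB-seconds.
object MBSecondsSketch {
  private val BytesPerMB: BigInt = BigInt(1024L * 1024L) // FileUtils.ONE_MB
  private val MillisPerSecond: BigInt = BigInt(1000L)    // Statistics.SECOND_IN_MS

  def mbSeconds(memBytes: Long, durationMillis: Long): BigInt =
    BigInt(memBytes) * BigInt(durationMillis) / (BytesPerMB * MillisPerSecond)

  def main(args: Array[String]): Unit = {
    val executorMemoryBytes = 4096L * 1024 * 1024      // "4g", as in the test below
    println(mbSeconds(executorMemoryBytes, 1000000L))  // 4096000 MB-s (executor "1")
    println(mbSeconds(executorMemoryBytes, 3000000L))  // 12288000 MB-s (executor "2")
  }
}
```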
@@ -22,6 +22,7 @@
public class HadoopAggregatedData {

private long resourceUsed = 0;
+//variable resourceUsed is actually the resource allocated
private long resourceWasted = 0;
private long totalDelay = 0;

43 changes: 25 additions & 18 deletions app/com/linkedin/drelephant/spark/SparkMetricsAggregator.scala
@@ -19,15 +19,18 @@ package com.linkedin.drelephant.spark
import com.linkedin.drelephant.analysis.{HadoopAggregatedData, HadoopApplicationData, HadoopMetricsAggregator}
import com.linkedin.drelephant.configurations.aggregator.AggregatorConfigurationData
import com.linkedin.drelephant.math.Statistics
-import com.linkedin.drelephant.spark.data.{SparkApplicationData, SparkLogDerivedData, SparkRestDerivedData}
+import com.linkedin.drelephant.spark.data.{SparkApplicationData}
+import com.linkedin.drelephant.spark.fetchers.statusapiv1.ExecutorSummary
+import com.linkedin.drelephant.util.MemoryFormatUtils
import org.apache.commons.io.FileUtils
import org.apache.log4j.Logger

import scala.util.Try


class SparkMetricsAggregator(private val aggregatorConfigurationData: AggregatorConfigurationData)
-  extends HadoopMetricsAggregator {
+    extends HadoopMetricsAggregator {

import SparkMetricsAggregator._

private val logger: Logger = Logger.getLogger(classOf[SparkMetricsAggregator])
@@ -47,19 +50,13 @@
}

private def aggregate(data: SparkApplicationData): Unit = for {
-executorInstances <- executorInstancesOf(data)
executorMemoryBytes <- executorMemoryBytesOf(data)
} {
val applicationDurationMillis = applicationDurationMillisOf(data)
if( applicationDurationMillis < 0) {
logger.warn(s"applicationDurationMillis is negative. Skipping Metrics Aggregation:${applicationDurationMillis}")
-} else {
-val totalExecutorTaskTimeMillis = totalExecutorTaskTimeMillisOf(data)
-
-val resourcesAllocatedForUse =
-aggregateresourcesAllocatedForUse(executorInstances, executorMemoryBytes, applicationDurationMillis)
-val resourcesActuallyUsed = aggregateresourcesActuallyUsed(executorMemoryBytes, totalExecutorTaskTimeMillis)
-
+} else {
+var (resourcesActuallyUsed, resourcesAllocatedForUse) = calculateResourceUsage(data.executorSummaries, executorMemoryBytes)
val resourcesActuallyUsedWithBuffer = resourcesActuallyUsed.doubleValue() * (1.0 + allocatedMemoryWasteBufferPercentage)
val resourcesWastedMBSeconds = (resourcesActuallyUsedWithBuffer < resourcesAllocatedForUse.doubleValue()) match {
case true => resourcesAllocatedForUse.doubleValue() - resourcesActuallyUsedWithBuffer
@@ -71,10 +68,8 @@
} else {
logger.warn(s"resourcesAllocatedForUse/resourcesWasted exceeds Long.MaxValue")
logger.warn(s"ResourceUsed: ${resourcesAllocatedForUse}")
logger.warn(s"executorInstances: ${executorInstances}")
logger.warn(s"executorMemoryBytes:${executorMemoryBytes}")
logger.warn(s"applicationDurationMillis:${applicationDurationMillis}")
logger.warn(s"totalExecutorTaskTimeMillis:${totalExecutorTaskTimeMillis}")
logger.warn(s"resourcesActuallyUsedWithBuffer:${resourcesActuallyUsedWithBuffer}")
logger.warn(s"resourcesWastedMBSeconds:${resourcesWastedMBSeconds}")
logger.warn(s"allocatedMemoryWasteBufferPercentage:${allocatedMemoryWasteBufferPercentage}")
@@ -83,16 +78,28 @@
}
}

-private def aggregateresourcesActuallyUsed(executorMemoryBytes: Long, totalExecutorTaskTimeMillis: BigInt): BigInt = {
-val bytesMillis = BigInt(executorMemoryBytes) * totalExecutorTaskTimeMillis
-(bytesMillis / (BigInt(FileUtils.ONE_MB) * BigInt(Statistics.SECOND_IN_MS)))
+//calculates the resource usage by summing up the resources used per executor
+private def calculateResourceUsage(executorSummaries: Seq[ExecutorSummary], executorMemoryBytes: Long): (BigInt, BigInt) = {
+var sumResourceUsage: BigInt = 0
+var sumResourcesAllocatedForUse : BigInt = 0
+executorSummaries.foreach(
+executorSummary => {
+var memUsedBytes: Long = executorSummary.peakJvmUsedMemory.getOrElse(JVM_USED_MEMORY, 0).asInstanceOf[Number].longValue + MemoryFormatUtils.stringToBytes(SPARK_RESERVED_MEMORY)
+var timeSpent: Long = executorSummary.totalDuration
+val bytesMillisUsed = BigInt(memUsedBytes) * timeSpent
+val bytesMillisAllocated = BigInt(executorMemoryBytes) * timeSpent
+sumResourcesAllocatedForUse += (bytesMillisAllocated / (BigInt(FileUtils.ONE_MB) * BigInt(Statistics.SECOND_IN_MS)))
+sumResourceUsage += (bytesMillisUsed / (BigInt(FileUtils.ONE_MB) * BigInt(Statistics.SECOND_IN_MS)))
+}
+)
+(sumResourceUsage, sumResourcesAllocatedForUse)
}

private def aggregateresourcesAllocatedForUse(
executorInstances: Int,
executorMemoryBytes: Long,
applicationDurationMillis: Long
-  ): BigInt = {
+    ): BigInt = {
val bytesMillis = BigInt(executorInstances) * BigInt(executorMemoryBytes) * BigInt(applicationDurationMillis)
(bytesMillis / (BigInt(FileUtils.ONE_MB) * BigInt(Statistics.SECOND_IN_MS)))
}
@@ -121,9 +128,9 @@
object SparkMetricsAggregator {
/** The percentage of allocated memory we expect to waste because of overhead. */
val DEFAULT_ALLOCATED_MEMORY_WASTE_BUFFER_PERCENTAGE = 0.5D

val ALLOCATED_MEMORY_WASTE_BUFFER_PERCENTAGE_KEY = "allocated_memory_waste_buffer_percentage"

+val SPARK_RESERVED_MEMORY: String = "300M"
val SPARK_EXECUTOR_INSTANCES_KEY = "spark.executor.instances"
val SPARK_EXECUTOR_MEMORY_KEY = "spark.executor.memory"
+val JVM_USED_MEMORY = "jvmUsedMemory"
}
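To sanity-check the "used" side, take executor "1" from the test file further down: peak JVM memory 394567123 bytes over 1,000,000 ms of duration. A hand calculation, assuming MemoryFormatUtils.stringToBytes("300M") yields 300 × 1024 × 1024 bytes:

```scala
// Worked check (illustration only): executor "1" from SparkMetricsAggregatorTest.
val peakJvmUsedBytes = 394567123L                    // peakJvmUsedMemory("jvmUsedMemory")
val reservedBytes = 300L * 1024 * 1024               // assumed stringToBytes("300M")
val memUsedBytes = peakJvmUsedBytes + reservedBytes  // 709139923 bytes
val usedMBSeconds =
  BigInt(memUsedBytes) * 1000000L / (BigInt(1024 * 1024) * BigInt(1000))
// => 676288 MB-s, the first term in the test's expected resourceUsed sum
```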
@@ -87,7 +87,8 @@ trait ExecutorSummary{
def totalShuffleRead: Long
def totalShuffleWrite: Long
def maxMemory: Long
-def executorLogs: Map[String, String]}
+def executorLogs: Map[String, String]
+def peakJvmUsedMemory: Map[String, Long]}

trait JobData{
def jobId: Int
@@ -292,7 +293,8 @@ class ExecutorSummaryImpl(
var totalShuffleRead: Long,
var totalShuffleWrite: Long,
var maxMemory: Long,
-var executorLogs: Map[String, String]) extends ExecutorSummary
+var executorLogs: Map[String, String],
+var peakJvmUsedMemory: Map[String, Long]) extends ExecutorSummary

class JobDataImpl(
var jobId: Int,
@@ -173,7 +173,8 @@ object LegacyDataConverters {
executorInfo.shuffleRead,
executorInfo.shuffleWrite,
executorInfo.maxMem,
-executorLogs = Map.empty
+executorLogs = Map.empty,
+peakJvmUsedMemory = Map.empty
)
}

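Note the knock-on effect of this default: with peakJvmUsedMemory left as Map.empty, the getOrElse(JVM_USED_MEMORY, 0) lookup in calculateResourceUsage falls back to zero, so applications that come through LegacyDataConverters report a "used" figure of just the 300M reserve times executor duration. A one-liner showing the fallback (hypothetical values, not project code):

```scala
val peakJvmUsedMemory: Map[String, Long] = Map.empty              // the legacy default above
val peakBytes = peakJvmUsedMemory.getOrElse("jvmUsedMemory", 0L)  // 0 for legacy apps
```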
40 changes: 17 additions & 23 deletions test/com/linkedin/drelephant/spark/SparkMetricsAggregatorTest.scala
@@ -19,12 +19,14 @@ package com.linkedin.drelephant.spark
import java.util.Date

import scala.collection.JavaConverters

import com.linkedin.drelephant.analysis.ApplicationType
import com.linkedin.drelephant.configurations.aggregator.AggregatorConfigurationData
import com.linkedin.drelephant.math.Statistics
import com.linkedin.drelephant.spark.data.{SparkApplicationData, SparkLogDerivedData, SparkRestDerivedData}
import com.linkedin.drelephant.spark.fetchers.statusapiv1.{ApplicationAttemptInfoImpl, ApplicationInfoImpl, ExecutorSummaryImpl}
import com.linkedin.drelephant.util.MemoryFormatUtils
import org.apache.spark.scheduler.SparkListenerEnvironmentUpdate
import org.apache.commons.io.FileUtils
import org.scalatest.{FunSpec, Matchers}

class SparkMetricsAggregatorTest extends FunSpec with Matchers {
@@ -47,8 +49,9 @@ class SparkMetricsAggregatorTest extends FunSpec with Matchers {
}

val executorSummaries = Seq(
newFakeExecutorSummary(id = "1", totalDuration = 1000000L),
newFakeExecutorSummary(id = "2", totalDuration = 3000000L)
newFakeExecutorSummary(id = "1", totalDuration = 1000000L, Map("jvmUsedMemory" -> 394567123)),
newFakeExecutorSummary(id = "2", totalDuration = 3000000L, Map("jvmUsedMemory" -> 23456834))

)
val restDerivedData = {
SparkRestDerivedData(
@@ -59,7 +62,7 @@
)
}

describe("when it has log-derived data") {
describe("when it has data") {
val logDerivedData = {
val environmentUpdate = newFakeSparkListenerEnvironmentUpdate(
Map(
@@ -81,31 +84,20 @@

val result = aggregator.getResult

it("calculates resources used") {
val totalExecutorMemoryMb = 2 * 4096
val applicationDurationSeconds = 8000
val executorMemoryMb = 4096
val totalExecutorTaskTimeSeconds = 1000 + 3000
result.getResourceUsed should be(totalExecutorMemoryMb * applicationDurationSeconds)
it("calculates resources used (allocated)") {
result.getResourceUsed should be(4096000+12288000)
}

it("calculates resources wasted") {
-val totalExecutorMemoryMb = 2 * 4096
-val applicationDurationSeconds = 8000
-val resourceAllocated = totalExecutorMemoryMb * applicationDurationSeconds;
-
-val executorMemoryMb = 4096
-val totalExecutorTaskTimeSeconds = 1000 + 3000
-val resourceUsed = executorMemoryMb * totalExecutorTaskTimeSeconds;
-
-
-result.getResourceWasted should be(resourceAllocated - resourceUsed * 1.5)
+val resourceAllocated = 4096000+12288000
+val resourceUsed = 676288+967110
+result.getResourceWasted should be(resourceAllocated.toDouble - resourceUsed.toDouble * 1.5)
}

it("doesn't calculate total delay") {
result.getTotalDelay should be(0L)
}
it("sets resourceused as 0 when duration is negative") {
it("sets resource used as 0 when duration is negative") {
//make the duration negative
val applicationInfo = {
val applicationAttemptInfo = {
@@ -178,7 +170,8 @@ object SparkMetricsAggregatorTest {

def newFakeExecutorSummary(
id: String,
-totalDuration: Long
+totalDuration: Long,
+peakJvmUsedMemory: Map[String, Long]
): ExecutorSummaryImpl = new ExecutorSummaryImpl(
id,
hostPort = "",
@@ -194,6 +187,7 @@
totalShuffleRead = 0,
totalShuffleWrite = 0,
maxMemory = 0,
-executorLogs = Map.empty
+executorLogs = Map.empty,
+peakJvmUsedMemory
)
}
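The magic numbers in the reworked expectations check out under the same assumptions as the sketches above (1 MB = 1024 × 1024 bytes, 1 s = 1000 ms, 300M reserve):

```scala
// Allocation: 4 GB held for 1,000 s and 3,000 s of executor duration.
val resourceAllocated = 4096000L + 12288000L         // 16384000 MB-s
// Usage: (peak JVM bytes + 300M reserve) held for the same durations.
val resourceUsed = 676288L + 967110L                 // 1643398 MB-s
// Wasted = allocated - used * (1 + 0.5 buffer), since buffered usage
// is still below the allocation here.
val wasted = resourceAllocated - resourceUsed * 1.5  // 13918903.0 MB-s
```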
@@ -249,7 +249,8 @@ object ExecutorsHeuristicTest {
totalShuffleRead,
totalShuffleWrite,
maxMemory,
-executorLogs = Map.empty
+executorLogs = Map.empty,
+peakJvmUsedMemory = Map.empty
)

def newFakeSparkApplicationData(executorSummaries: Seq[ExecutorSummaryImpl]): SparkApplicationData = {
