fixed resources used and wasted computation for spark jobs
swasti committed Sep 12, 2017
1 parent 752a94b commit 7f7717f
Showing 5 changed files with 44 additions and 34 deletions.
33 changes: 22 additions & 11 deletions app/com/linkedin/drelephant/spark/SparkMetricsAggregator.scala
@@ -20,14 +20,17 @@ import com.linkedin.drelephant.analysis.{HadoopAggregatedData, HadoopApplication
import com.linkedin.drelephant.configurations.aggregator.AggregatorConfigurationData
import com.linkedin.drelephant.math.Statistics
import com.linkedin.drelephant.spark.data.{SparkApplicationData, SparkLogDerivedData, SparkRestDerivedData}
+ import com.linkedin.drelephant.spark.fetchers.statusapiv1.ExecutorSummary
+ import com.linkedin.drelephant.util.MemoryFormatUtils
import org.apache.commons.io.FileUtils
import org.apache.log4j.Logger

import scala.util.Try


class SparkMetricsAggregator(private val aggregatorConfigurationData: AggregatorConfigurationData)
extends HadoopMetricsAggregator {

import SparkMetricsAggregator._

private val logger: Logger = Logger.getLogger(classOf[SparkMetricsAggregator])
@@ -53,21 +56,19 @@ class SparkMetricsAggregator(private val aggregatorConfigurationData: Aggregator
val applicationDurationMillis = applicationDurationMillisOf(data)
if( applicationDurationMillis < 0) {
logger.warn(s"applicationDurationMillis is negative. Skipping Metrics Aggregation:${applicationDurationMillis}")
} else {
val totalExecutorTaskTimeMillis = totalExecutorTaskTimeMillisOf(data)

val resourcesAllocatedForUse =
aggregateresourcesAllocatedForUse(executorInstances, executorMemoryBytes, applicationDurationMillis)
- val resourcesActuallyUsed = aggregateresourcesActuallyUsed(executorMemoryBytes, totalExecutorTaskTimeMillis)

+ val resourcesActuallyUsed = calculateResourceUsage(data.executorSummaries)
val resourcesActuallyUsedWithBuffer = resourcesActuallyUsed.doubleValue() * (1.0 + allocatedMemoryWasteBufferPercentage)
val resourcesWastedMBSeconds = (resourcesActuallyUsedWithBuffer < resourcesAllocatedForUse.doubleValue()) match {
case true => resourcesAllocatedForUse.doubleValue() - resourcesActuallyUsedWithBuffer
case false => 0.0
}
+ //allocated is the total used resource from the cluster.
if (resourcesAllocatedForUse.isValidLong) {
- hadoopAggregatedData.setResourceUsed(resourcesAllocatedForUse.toLong)
+ hadoopAggregatedData.setResourceUsed(resourcesActuallyUsed.toLong)
} else {
logger.warn(s"resourcesAllocatedForUse/resourcesWasted exceeds Long.MaxValue")
logger.warn(s"ResourceUsed: ${resourcesAllocatedForUse}")
Expand All @@ -83,16 +84,25 @@ class SparkMetricsAggregator(private val aggregatorConfigurationData: Aggregator
}
}

- private def aggregateresourcesActuallyUsed(executorMemoryBytes: Long, totalExecutorTaskTimeMillis: BigInt): BigInt = {
-   val bytesMillis = BigInt(executorMemoryBytes) * totalExecutorTaskTimeMillis
-   (bytesMillis / (BigInt(FileUtils.ONE_MB) * BigInt(Statistics.SECOND_IN_MS)))
+ //calculates the resource usage by summing up the resources used per executor
+ private def calculateResourceUsage(executorSummaries: Seq[ExecutorSummary]): BigInt = {
+   var sumResourceUsage: BigInt = 0
+   executorSummaries.foreach(
+     executorSummary => {
+       var memUsed: Long = executorSummary.peakJvmUsedMemory.getOrElse(JVM_USED_MEMORY, 0) //+ MemoryFormatUtils.stringToBytes("300M")
+       var timeSpent: Long = executorSummary.totalDuration
+       val bytesMillis = BigInt(memUsed) * timeSpent
+       sumResourceUsage += (bytesMillis / (BigInt(FileUtils.ONE_MB) * BigInt(Statistics.SECOND_IN_MS)))
+     }
+   )
+   return sumResourceUsage
}

private def aggregateresourcesAllocatedForUse(
executorInstances: Int,
executorMemoryBytes: Long,
applicationDurationMillis: Long
): BigInt = {
val bytesMillis = BigInt(executorInstances) * BigInt(executorMemoryBytes) * BigInt(applicationDurationMillis)
(bytesMillis / (BigInt(FileUtils.ONE_MB) * BigInt(Statistics.SECOND_IN_MS)))
}
@@ -123,7 +133,8 @@ object SparkMetricsAggregator {
val DEFAULT_ALLOCATED_MEMORY_WASTE_BUFFER_PERCENTAGE = 0.5D

val ALLOCATED_MEMORY_WASTE_BUFFER_PERCENTAGE_KEY = "allocated_memory_waste_buffer_percentage"

+ val SPARK_RESERVED_MEMORY: String = "300M"
val SPARK_EXECUTOR_INSTANCES_KEY = "spark.executor.instances"
val SPARK_EXECUTOR_MEMORY_KEY = "spark.executor.memory"
+ val JVM_USED_MEMORY = "jvmUsedMemory"
}
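
Aside: with this commit, "resource used" switches from allocated memory × task time to each executor's peak JVM used memory × executor lifetime, both normalized to MB-seconds. Below is a minimal standalone sketch of the two quantities being compared, not the exact class above; it assumes FileUtils.ONE_MB = 1048576 bytes and Statistics.SECOND_IN_MS = 1000 ms, and uses a plain (bytes, millis) pair in place of ExecutorSummary:

object ResourceAccountingSketch {
  private val BytesPerMB = BigInt(1048576L)    // FileUtils.ONE_MB
  private val MillisPerSecond = BigInt(1000L)  // Statistics.SECOND_IN_MS

  // MB-seconds actually used: sum over executors of peak JVM used bytes * lifetime in ms.
  def resourcesActuallyUsed(executors: Seq[(Long, Long)]): BigInt =
    executors.map { case (peakJvmUsedBytes, totalDurationMillis) =>
      (BigInt(peakJvmUsedBytes) * BigInt(totalDurationMillis)) / (BytesPerMB * MillisPerSecond)
    }.sum

  // MB-seconds allocated: executor count * configured executor memory * application duration.
  def resourcesAllocatedForUse(instances: Int, executorMemoryBytes: Long, appDurationMillis: Long): BigInt =
    (BigInt(instances) * BigInt(executorMemoryBytes) * BigInt(appDurationMillis)) / (BytesPerMB * MillisPerSecond)
}

Wasted is then allocated − used × (1 + waste buffer) when that difference is positive, and 0 otherwise, exactly as in the aggregate method above.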
@@ -87,7 +87,8 @@ trait ExecutorSummary{
def totalShuffleRead: Long
def totalShuffleWrite: Long
def maxMemory: Long
- def executorLogs: Map[String, String]}
+ def executorLogs: Map[String, String]
+ def peakJvmUsedMemory: Map[String, Long]}

trait JobData{
def jobId: Int
@@ -292,7 +293,8 @@ class ExecutorSummaryImpl(
var totalShuffleRead: Long,
var totalShuffleWrite: Long,
var maxMemory: Long,
- var executorLogs: Map[String, String]) extends ExecutorSummary
+ var executorLogs: Map[String, String],
+ var peakJvmUsedMemory: Map[String, Long]) extends ExecutorSummary

class JobDataImpl(
var jobId: Int,
@@ -173,7 +173,8 @@ object LegacyDataConverters {
executorInfo.shuffleRead,
executorInfo.shuffleWrite,
executorInfo.maxMem,
- executorLogs = Map.empty
+ executorLogs = Map.empty,
+ peakJvmUsedMemory = Map.empty
)
}
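
Note that the legacy converter fills the new field with Map.empty, so for applications converted from legacy data the aggregator's per-executor lookup falls back to its default and measures zero usage; a sketch of that fallback:

// With peakJvmUsedMemory = Map.empty, getOrElse returns the default,
// so legacy executors contribute 0 MB-seconds of measured usage.
val peakJvmUsedMemory: Map[String, Long] = Map.empty
val memUsed: Long = peakJvmUsedMemory.getOrElse("jvmUsedMemory", 0L) // 0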

33 changes: 14 additions & 19 deletions test/com/linkedin/drelephant/spark/SparkMetricsAggregatorTest.scala
@@ -19,12 +19,13 @@ package com.linkedin.drelephant.spark
import java.util.Date

import scala.collection.JavaConverters

import com.linkedin.drelephant.analysis.ApplicationType
import com.linkedin.drelephant.configurations.aggregator.AggregatorConfigurationData
import com.linkedin.drelephant.math.Statistics
import com.linkedin.drelephant.spark.data.{SparkApplicationData, SparkLogDerivedData, SparkRestDerivedData}
import com.linkedin.drelephant.spark.fetchers.statusapiv1.{ApplicationAttemptInfoImpl, ApplicationInfoImpl, ExecutorSummaryImpl}
import org.apache.spark.scheduler.SparkListenerEnvironmentUpdate
+ import org.apache.commons.io.FileUtils
import org.scalatest.{FunSpec, Matchers}

class SparkMetricsAggregatorTest extends FunSpec with Matchers {
@@ -47,8 +48,9 @@ class SparkMetricsAggregatorTest extends FunSpec with Matchers {
}

val executorSummaries = Seq(
newFakeExecutorSummary(id = "1", totalDuration = 1000000L),
newFakeExecutorSummary(id = "2", totalDuration = 3000000L)
newFakeExecutorSummary(id = "1", totalDuration = 1000000L, Map("jvmUsedMemory" -> 394567123)),
newFakeExecutorSummary(id = "2", totalDuration = 3000000L, Map("jvmUsedMemory" -> 23456834))

)
val restDerivedData = {
SparkRestDerivedData(
@@ -59,7 +61,7 @@
)
}

describe("when it has log-derived data") {
describe("when it has data") {
val logDerivedData = {
val environmentUpdate = newFakeSparkListenerEnvironmentUpdate(
Map(
@@ -82,30 +84,21 @@
val result = aggregator.getResult

it("calculates resources used") {
- val totalExecutorMemoryMb = 2 * 4096
- val applicationDurationSeconds = 8000
- val executorMemoryMb = 4096
- val totalExecutorTaskTimeSeconds = 1000 + 3000
- result.getResourceUsed should be(totalExecutorMemoryMb * applicationDurationSeconds)
+ result.getResourceUsed should be(376288+67110)
}

it("calculates resources wasted") {
val totalExecutorMemoryMb = 2 * 4096
val applicationDurationSeconds = 8000
val resourceAllocated = totalExecutorMemoryMb * applicationDurationSeconds;

- val executorMemoryMb = 4096
- val totalExecutorTaskTimeSeconds = 1000 + 3000
- val resourceUsed = executorMemoryMb * totalExecutorTaskTimeSeconds;


- result.getResourceWasted should be(resourceAllocated - resourceUsed * 1.5)
+ val resourceUsed = 376288+67110
+ result.getResourceWasted should be(resourceAllocated.toDouble - resourceUsed.toDouble * 1.5)
}
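
For reference, the expected values in the two assertions above follow from the fake executor summaries: each executor contributes peak JVM used bytes × duration ms / (1048576 × 1000) MB-seconds. A quick check of the arithmetic:

// Executor 1: 394567123 B * 1000000 ms / (1048576 * 1000) = 376288 MB-seconds
// Executor 2:  23456834 B * 3000000 ms / (1048576 * 1000) =  67110 MB-seconds
val used1 = (BigInt(394567123L) * BigInt(1000000L)) / (BigInt(1048576L) * BigInt(1000L)) // 376288
val used2 = (BigInt(23456834L) * BigInt(3000000L)) / (BigInt(1048576L) * BigInt(1000L))  //  67110
val resourceUsed = used1 + used2 // 443398

resourceAllocated is unchanged at 2 × 4096 MB × 8000 s = 65536000 MB-seconds, so the wasted assertion compares against 65536000 − 443398 × 1.5.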

it("doesn't calculate total delay") {
result.getTotalDelay should be(0L)
}
it("sets resourceused as 0 when duration is negative") {
it("sets resource used as 0 when duration is negative") {
//make the duration negative
val applicationInfo = {
val applicationAttemptInfo = {
@@ -178,7 +171,8 @@ object SparkMetricsAggregatorTest {

def newFakeExecutorSummary(
id: String,
- totalDuration: Long
+ totalDuration: Long,
+ peakJvmUsedMemory: Map[String, Long]
): ExecutorSummaryImpl = new ExecutorSummaryImpl(
id,
hostPort = "",
@@ -194,6 +188,7 @@
totalShuffleRead = 0,
totalShuffleWrite = 0,
maxMemory = 0,
- executorLogs = Map.empty
+ executorLogs = Map.empty,
+ peakJvmUsedMemory
)
}
@@ -249,7 +249,8 @@ object ExecutorsHeuristicTest {
totalShuffleRead,
totalShuffleWrite,
maxMemory,
- executorLogs = Map.empty
+ executorLogs = Map.empty,
+ peakJvmUsedMemory = Map.empty
)

def newFakeSparkApplicationData(executorSummaries: Seq[ExecutorSummaryImpl]): SparkApplicationData = {