Skip to content

Commit

Permalink
Spark Peak jvm memory Heuristic - (Depends on Custom SHS - Requires p…
Browse files Browse the repository at this point in the history
…eakJvmUsedMemory metric) (#318)
  • Loading branch information
skakker authored and akshayrai committed Feb 27, 2018
1 parent a001e83 commit 748f840
Show file tree
Hide file tree
Showing 10 changed files with 278 additions and 9 deletions.
4 changes: 2 additions & 2 deletions app-conf/FetcherConf.xml
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@
To work properly, this fetcher should use the same timezone with the job history server.
If not set, the local timezone will be used.
-->

<fetcher>
<applicationtype>mapreduce</applicationtype>
<classname>com.linkedin.drelephant.mapreduce.fetchers.MapReduceFSFetcherHadoop2</classname>
Expand All @@ -61,7 +61,7 @@
<history_server_time_zone>PST</history_server_time_zone>
</params>
</fetcher>


<!--
FSFetcher for Spark. Loads the eventlog from HDFS and replays to get the metrics and application properties
Expand Down
8 changes: 7 additions & 1 deletion app-conf/HeuristicConf.xml
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,12 @@
<classname>com.linkedin.drelephant.spark.heuristics.StagesHeuristic</classname>
<viewname>views.html.help.spark.helpStagesHeuristic</viewname>
</heuristic>
<heuristic>
<applicationtype>spark</applicationtype>
<heuristicname>JVM Used Memory</heuristicname>
<classname>com.linkedin.drelephant.spark.heuristics.JvmUsedMemoryHeuristic</classname>
<viewname>views.html.help.spark.helpJvmUsedMemoryHeuristic</viewname>
</heuristic>
<heuristic>
<applicationtype>spark</applicationtype>
<heuristicname>Stages with failed tasks</heuristicname>
Expand All @@ -211,5 +217,5 @@
<classname>com.linkedin.drelephant.spark.heuristics.ExecutorStorageSpillHeuristic</classname>
<viewname>views.html.help.spark.helpExecutorStorageSpillHeuristic</viewname>
</heuristic>

</heuristics>
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,9 @@ trait ExecutorSummary{
def maxMemory: Long
def totalGCTime: Long
def totalMemoryBytesSpilled: Long
def executorLogs: Map[String, String]}
def executorLogs: Map[String, String]
def peakJvmUsedMemory: Map[String, Long]
}

trait JobData{
def jobId: Int
Expand Down Expand Up @@ -295,7 +297,8 @@ class ExecutorSummaryImpl(
var maxMemory: Long,
var totalGCTime: Long,
var totalMemoryBytesSpilled: Long,
var executorLogs: Map[String, String]) extends ExecutorSummary
var executorLogs: Map[String, String],
var peakJvmUsedMemory: Map[String, Long]) extends ExecutorSummary

class JobDataImpl(
var jobId: Int,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
/*
* Copyright 2016 LinkedIn Corp.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package com.linkedin.drelephant.spark.heuristics

import com.linkedin.drelephant.analysis._
import com.linkedin.drelephant.configurations.heuristic.HeuristicConfigurationData
import com.linkedin.drelephant.spark.data.SparkApplicationData
import com.linkedin.drelephant.spark.fetchers.statusapiv1.ExecutorSummary
import com.linkedin.drelephant.util.MemoryFormatUtils

import scala.collection.JavaConverters


/**
* A heuristic based on peak JVM used memory for the spark executors and driver
*
*/
class JvmUsedMemoryHeuristic(private val heuristicConfigurationData: HeuristicConfigurationData)
extends Heuristic[SparkApplicationData] {

import JvmUsedMemoryHeuristic._
import JavaConverters._

override def getHeuristicConfData(): HeuristicConfigurationData = heuristicConfigurationData

lazy val executorPeakJvmMemoryThresholdString: String = heuristicConfigurationData.getParamMap.get(MAX_EXECUTOR_PEAK_JVM_USED_MEMORY_THRESHOLD_KEY)
lazy val driverPeakJvmMemoryThresholdString: String = heuristicConfigurationData.getParamMap.get(MAX_DRIVER_PEAK_JVM_USED_MEMORY_THRESHOLD_KEY)

override def apply(data: SparkApplicationData): HeuristicResult = {
val evaluator = new Evaluator(this, data)

var resultDetails = Seq(
new HeuristicResultDetails("Max executor peak JVM used memory", MemoryFormatUtils.bytesToString(evaluator.maxExecutorPeakJvmUsedMemory)),
new HeuristicResultDetails("Max driver peak JVM used memory", MemoryFormatUtils.bytesToString(evaluator.maxDriverPeakJvmUsedMemory)),
new HeuristicResultDetails("spark.executor.memory", MemoryFormatUtils.bytesToString(evaluator.sparkExecutorMemory)),
new HeuristicResultDetails("spark.driver.memory", MemoryFormatUtils.bytesToString(evaluator.sparkDriverMemory))
)

if(evaluator.severityExecutor.getValue > Severity.LOW.getValue) {
resultDetails :+ new HeuristicResultDetails("Executor Memory", "The allocated memory for the executor (in " + SPARK_EXECUTOR_MEMORY +") is much more than the peak JVM used memory by executors.")
resultDetails :+ new HeuristicResultDetails("Reasonable size for executor memory", ((1+BUFFER_PERCENT.toDouble/100.0)*evaluator.maxExecutorPeakJvmUsedMemory).toString)
}

if(evaluator.severityDriver.getValue > Severity.LOW.getValue) {
resultDetails :+ new HeuristicResultDetails("Driver Memory", "The allocated memory for the driver (in " + SPARK_DRIVER_MEMORY + ") is much more than the peak JVM used memory by the driver.")
}

val result = new HeuristicResult(
heuristicConfigurationData.getClassName,
heuristicConfigurationData.getHeuristicName,
evaluator.severity,
0,
resultDetails.asJava
)
result
}
}

object JvmUsedMemoryHeuristic {
val JVM_USED_MEMORY = "jvmUsedMemory"
val SPARK_EXECUTOR_MEMORY = "spark.executor.memory"
val SPARK_DRIVER_MEMORY = "spark.driver.memory"
// 300 * FileUtils.ONE_MB (300 * 1024 * 1024)
val reservedMemory : Long = 314572800
val BUFFER_PERCENT : Int = 20
val MAX_EXECUTOR_PEAK_JVM_USED_MEMORY_THRESHOLD_KEY = "executor_peak_jvm_memory_threshold"
val MAX_DRIVER_PEAK_JVM_USED_MEMORY_THRESHOLD_KEY = "driver_peak_jvm_memory_threshold"

class Evaluator(jvmUsedMemoryHeuristic: JvmUsedMemoryHeuristic, data: SparkApplicationData) {
lazy val appConfigurationProperties: Map[String, String] =
data.appConfigurationProperties

lazy val executorSummaries: Seq[ExecutorSummary] = data.executorSummaries
lazy val driverSummary : Option[ExecutorSummary] = executorSummaries.find(_.id.equals("driver"))
val maxDriverPeakJvmUsedMemory : Long = driverSummary.get.peakJvmUsedMemory.getOrElse(JVM_USED_MEMORY, 0L).asInstanceOf[Number].longValue
val executorList : Seq[ExecutorSummary] = executorSummaries.filterNot(_.id.equals("driver"))
val sparkExecutorMemory : Long = (appConfigurationProperties.get(SPARK_EXECUTOR_MEMORY).map(MemoryFormatUtils.stringToBytes)).getOrElse(0L)
val sparkDriverMemory : Long = appConfigurationProperties.get(SPARK_DRIVER_MEMORY).map(MemoryFormatUtils.stringToBytes).getOrElse(0L)
lazy val maxExecutorPeakJvmUsedMemory: Long = if (executorList.isEmpty) 0L else executorList.map {
_.peakJvmUsedMemory.getOrElse(JVM_USED_MEMORY, 0).asInstanceOf[Number].longValue
}.max

lazy val DEFAULT_MAX_EXECUTOR_PEAK_JVM_USED_MEMORY_THRESHOLDS =
SeverityThresholds(low = 1.5 * (maxExecutorPeakJvmUsedMemory + reservedMemory), moderate = 2 * (maxExecutorPeakJvmUsedMemory + reservedMemory), severe = 4 * (maxExecutorPeakJvmUsedMemory + reservedMemory), critical = 8 * (maxExecutorPeakJvmUsedMemory + reservedMemory), ascending = true)

lazy val DEFAULT_MAX_DRIVER_PEAK_JVM_USED_MEMORY_THRESHOLDS =
SeverityThresholds(low = 1.5 * (maxDriverPeakJvmUsedMemory + reservedMemory), moderate = 2 * (maxDriverPeakJvmUsedMemory + reservedMemory), severe = 4 * (maxDriverPeakJvmUsedMemory + reservedMemory), critical = 8 * (maxDriverPeakJvmUsedMemory + reservedMemory), ascending = true)

val MAX_EXECUTOR_PEAK_JVM_USED_MEMORY_THRESHOLDS : SeverityThresholds = if(jvmUsedMemoryHeuristic.executorPeakJvmMemoryThresholdString == null) {
DEFAULT_MAX_EXECUTOR_PEAK_JVM_USED_MEMORY_THRESHOLDS
} else {
SeverityThresholds.parse(jvmUsedMemoryHeuristic.executorPeakJvmMemoryThresholdString.split(",").map(_.toDouble * (maxExecutorPeakJvmUsedMemory + reservedMemory)).toString, ascending = false).getOrElse(DEFAULT_MAX_EXECUTOR_PEAK_JVM_USED_MEMORY_THRESHOLDS)
}

val MAX_DRIVER_PEAK_JVM_USED_MEMORY_THRESHOLDS : SeverityThresholds = if(jvmUsedMemoryHeuristic.driverPeakJvmMemoryThresholdString == null) {
DEFAULT_MAX_DRIVER_PEAK_JVM_USED_MEMORY_THRESHOLDS
} else {
SeverityThresholds.parse(jvmUsedMemoryHeuristic.driverPeakJvmMemoryThresholdString.split(",").map(_.toDouble * (maxDriverPeakJvmUsedMemory + reservedMemory)).toString, ascending = false).getOrElse(DEFAULT_MAX_DRIVER_PEAK_JVM_USED_MEMORY_THRESHOLDS)
}

val severityExecutor = MAX_EXECUTOR_PEAK_JVM_USED_MEMORY_THRESHOLDS.severityOf(sparkExecutorMemory)
val severityDriver = MAX_DRIVER_PEAK_JVM_USED_MEMORY_THRESHOLDS.severityOf(sparkDriverMemory)

/**
* disabling the skew check for executors
* val severitySkew = DEFAULT_JVM_MEMORY_SKEW_THRESHOLDS.severityOf(maxExecutorPeakJvmUsedMemory)
*/
val severity : Severity = Severity.max(severityDriver, severityExecutor)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,8 @@ object LegacyDataConverters {
executorInfo.maxMem,
executorInfo.totalGCTime,
executorInfo.totalMemoryBytesSpilled,
executorLogs = Map.empty
executorLogs = Map.empty,
peakJvmUsedMemory = Map.empty
)
}

Expand Down
20 changes: 20 additions & 0 deletions app/views/help/spark/helpJvmUsedMemoryHeuristic.scala.html
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
@*
* Copyright 2016 LinkedIn Corp.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*@
<p>This is a heuristic for peak JVM used memory.</p>
<h4>Executor Max Peak JVM Used Memory</h4>
<p>This is to analyse whether the executor memory is set to a good value. To avoid wasted memory, it checks if the peak JVM used memory by the executor is reasonably close to the blocked executor memory which is specified in spark.executor.memory. If the peak JVM memory is much smaller, then the executor memory should be reduced.</p>
<h4>Driver Max Peak JVM Used Memory</h4>
<p>This is to analyse whether the driver memory is set to a good value. To avoid wasted memory, it checks if the peak JVM used memory by the driver is reasonably close to the blocked driver memory which is specified in spark.driver.memory. If the peak JVM memory is much smaller, then the driver memory should be reduced.</p>
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,7 @@ object SparkMetricsAggregatorTest {
maxMemory = 0,
totalGCTime = 0,
totalMemoryBytesSpilled = 0,
executorLogs = Map.empty
executorLogs = Map.empty,
peakJvmUsedMemory = Map.empty
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,8 @@ object ExecutorGcHeuristicTest {
maxMemory = 0,
totalGCTime,
totalMemoryBytesSpilled = 0,
executorLogs = Map.empty
executorLogs = Map.empty,
peakJvmUsedMemory = Map.empty
)

def newFakeSparkApplicationData(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,8 @@ object ExecutorsHeuristicTest {
maxMemory,
totalGCTime = 0,
totalMemoryBytesSpilled = 0,
executorLogs = Map.empty
executorLogs = Map.empty,
peakJvmUsedMemory = Map.empty
)

def newFakeSparkApplicationData(executorSummaries: Seq[ExecutorSummaryImpl]): SparkApplicationData = {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
package com.linkedin.drelephant.spark.heuristics

import com.linkedin.drelephant.analysis.{ApplicationType, Severity}
import com.linkedin.drelephant.configurations.heuristic.HeuristicConfigurationData
import com.linkedin.drelephant.spark.data.{SparkApplicationData, SparkLogDerivedData, SparkRestDerivedData}
import com.linkedin.drelephant.spark.fetchers.statusapiv1.{ApplicationInfoImpl, ExecutorSummaryImpl}
import org.apache.spark.scheduler.SparkListenerEnvironmentUpdate
import org.scalatest.{FunSpec, Matchers}

import scala.collection.JavaConverters

class JvmUsedMemoryHeuristicTest extends FunSpec with Matchers {

import JvmUsedMemoryHeuristicTest._

val heuristicConfigurationData = newFakeHeuristicConfigurationData()

val peakJvmUsedMemoryHeuristic = new JvmUsedMemoryHeuristic(heuristicConfigurationData)

val appConfigurationProperties = Map("spark.driver.memory"->"40000000000", "spark.executor.memory"->"500000000")

val executorData = Seq(
newDummyExecutorData("1", Map("jvmUsedMemory" -> 394567123)),
newDummyExecutorData("2", Map("jvmUsedMemory" -> 23456834)),
newDummyExecutorData("3", Map("jvmUsedMemory" -> 334569)),
newDummyExecutorData("4", Map("jvmUsedMemory" -> 134563)),
newDummyExecutorData("5", Map("jvmUsedMemory" -> 234564)),
newDummyExecutorData("driver", Map("jvmUsedMemory" -> 394561))
)
describe(".apply") {
val data = newFakeSparkApplicationData(appConfigurationProperties, executorData)
val heuristicResult = peakJvmUsedMemoryHeuristic.apply(data)

it("has severity") {
heuristicResult.getSeverity should be(Severity.CRITICAL)
}

describe(".Evaluator") {
import JvmUsedMemoryHeuristic.Evaluator

val data = newFakeSparkApplicationData(appConfigurationProperties, executorData)
val evaluator = new Evaluator(peakJvmUsedMemoryHeuristic, data)

it("has severity executor") {
evaluator.severityExecutor should be(Severity.NONE)
}

it("has severity driver") {
evaluator.severityDriver should be(Severity.CRITICAL)
}

it("has max peak jvm memory") {
evaluator.maxExecutorPeakJvmUsedMemory should be (394567123)
}

it("has max driver peak jvm memory") {
evaluator.maxDriverPeakJvmUsedMemory should be (394561)
}
}
}
}

object JvmUsedMemoryHeuristicTest {

import JavaConverters._

def newFakeHeuristicConfigurationData(params: Map[String, String] = Map.empty): HeuristicConfigurationData =
new HeuristicConfigurationData("heuristic", "class", "view", new ApplicationType("type"), params.asJava)

def newDummyExecutorData(
id: String,
peakJvmUsedMemory: Map[String, Long]
): ExecutorSummaryImpl = new ExecutorSummaryImpl(
id,
hostPort = "",
rddBlocks = 0,
memoryUsed = 0,
diskUsed = 0,
activeTasks = 0,
failedTasks = 0,
completedTasks = 0,
totalTasks = 0,
totalDuration = 0,
totalInputBytes = 0,
totalShuffleRead = 0,
totalShuffleWrite = 0,
maxMemory = 0,
totalGCTime = 0,
executorLogs = Map.empty,
peakJvmUsedMemory
)

def newFakeSparkApplicationData(
appConfigurationProperties: Map[String, String],
executorSummaries: Seq[ExecutorSummaryImpl]
): SparkApplicationData = {

val logDerivedData = SparkLogDerivedData(
SparkListenerEnvironmentUpdate(Map("Spark Properties" -> appConfigurationProperties.toSeq))
)
val appId = "application_1"

val restDerivedData = SparkRestDerivedData(
new ApplicationInfoImpl(appId, name = "app", Seq.empty),
jobDatas = Seq.empty,
stageDatas = Seq.empty,
executorSummaries = executorSummaries
)

SparkApplicationData(appId, restDerivedData, Some(logDerivedData))
}
}

0 comments on commit 748f840

Please sign in to comment.