Peak JVM used memory heuristic - (Depends on Custom SHS - Requires peakJvmUsedMemory metric) #283

Merged: 18 commits, Jan 10, 2018 (diff shows changes from 17 commits)
2 changes: 1 addition & 1 deletion app-conf/FetcherConf.xml
@@ -81,7 +81,7 @@
-->
<fetcher>
<applicationtype>spark</applicationtype>
-<classname>com.linkedin.drelephant.spark.fetchers.FSFetcher</classname>
+<classname>com.linkedin.drelephant.spark.fetchers.SparkFetcher</classname>
</fetcher>

<!--
6 changes: 6 additions & 0 deletions app-conf/HeuristicConf.xml
@@ -193,6 +193,12 @@
<classname>com.linkedin.drelephant.spark.heuristics.StagesHeuristic</classname>
<viewname>views.html.help.spark.helpStagesHeuristic</viewname>
</heuristic>
<heuristic>
<applicationtype>spark</applicationtype>
<heuristicname>Spark JVM Used Memory</heuristicname>
<classname>com.linkedin.drelephant.spark.heuristics.JvmUsedMemoryHeuristic</classname>
<viewname>views.html.help.spark.helpJvmUsedMemoryHeuristic</viewname>
</heuristic>

Contributor (on <heuristicname>): JVM Used Memory
<heuristic>
<applicationtype>spark</applicationtype>
<heuristicname>Executor GC</heuristicname>
@@ -87,7 +87,9 @@ trait ExecutorSummary{
def totalShuffleWrite: Long
def maxMemory: Long
def totalGCTime: Long
-def executorLogs: Map[String, String]}
+def executorLogs: Map[String, String]
+def peakJvmUsedMemory: Map[String, Long]
+}

trait JobData{
def jobId: Int
@@ -293,7 +295,8 @@ class ExecutorSummaryImpl(
var totalShuffleWrite: Long,
var maxMemory: Long,
var totalGCTime: Long,
-var executorLogs: Map[String, String]) extends ExecutorSummary
+var executorLogs: Map[String, String],
+var peakJvmUsedMemory: Map[String, Long]) extends ExecutorSummary

class JobDataImpl(
var jobId: Int,
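The new peakJvmUsedMemory field is a map from metric name to a peak value in bytes. A minimal sketch of what a populated summary might carry, assuming the "jvmUsedMemory" key used by the heuristic and test data below:

// Hypothetical metric map reported per executor by the custom SHS;
// the key matches JvmUsedMemoryHeuristic.JVM_USED_MEMORY ("jvmUsedMemory").
val peakMetrics: Map[String, Long] = Map("jvmUsedMemory" -> 394567123L)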
@@ -0,0 +1,109 @@ (new file: com/linkedin/drelephant/spark/heuristics/JvmUsedMemoryHeuristic.scala)
/*
* Copyright 2016 LinkedIn Corp.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package com.linkedin.drelephant.spark.heuristics

import com.linkedin.drelephant.analysis._
import com.linkedin.drelephant.configurations.heuristic.HeuristicConfigurationData
import com.linkedin.drelephant.spark.data.SparkApplicationData
import com.linkedin.drelephant.spark.fetchers.statusapiv1.ExecutorSummary
import com.linkedin.drelephant.util.MemoryFormatUtils

import scala.collection.JavaConverters


/**
* A heuristic based on peak JVM used memory for the Spark executors and driver.
*/
class JvmUsedMemoryHeuristic(private val heuristicConfigurationData: HeuristicConfigurationData)
extends Heuristic[SparkApplicationData] {

import JvmUsedMemoryHeuristic._
import JavaConverters._

override def getHeuristicConfData(): HeuristicConfigurationData = heuristicConfigurationData

override def apply(data: SparkApplicationData): HeuristicResult = {
val evaluator = new Evaluator(this, data)

var resultDetails = Seq(
new HeuristicResultDetails("Max executor peak JVM used memory", MemoryFormatUtils.bytesToString(evaluator.maxExecutorPeakJvmUsedMemory)),
new HeuristicResultDetails("Max driver peak JVM used memory", MemoryFormatUtils.bytesToString(evaluator.maxDriverPeakJvmUsedMemory)),
new HeuristicResultDetails("spark.executor.memory", MemoryFormatUtils.bytesToString(evaluator.sparkExecutorMemory)),
new HeuristicResultDetails("spark.driver.memory", MemoryFormatUtils.bytesToString(evaluator.sparkDriverMemory))
)

if (evaluator.severityExecutor.getValue > Severity.LOW.getValue) {
// :+ returns a new Seq, so reassign; otherwise the appended details are silently discarded.
resultDetails = resultDetails :+ new HeuristicResultDetails("Executor Memory", "The allocated memory for the executor (in " + SPARK_EXECUTOR_MEMORY + ") is much more than the peak JVM used memory by executors.")
resultDetails = resultDetails :+ new HeuristicResultDetails("Reasonable size for executor memory", MemoryFormatUtils.bytesToString(((1 + BUFFER_PERCENT.toDouble / 100.0) * evaluator.maxExecutorPeakJvmUsedMemory).toLong))
}

if (evaluator.severityDriver.getValue > Severity.LOW.getValue) {
resultDetails = resultDetails :+ new HeuristicResultDetails("Driver Memory", "The allocated memory for the driver (in " + SPARK_DRIVER_MEMORY + ") is much more than the peak JVM used memory by the driver.")
}

val result = new HeuristicResult(
heuristicConfigurationData.getClassName,
heuristicConfigurationData.getHeuristicName,
evaluator.severity,
0,
resultDetails.asJava
)
result
}
}

object JvmUsedMemoryHeuristic {
val JVM_USED_MEMORY = "jvmUsedMemory"
val SPARK_EXECUTOR_MEMORY = "spark.executor.memory"
val SPARK_DRIVER_MEMORY = "spark.driver.memory"
// Memory assumed reserved for the JVM's own overhead: 300 MB (300 * 1024 * 1024 = 314572800 bytes).
val reservedMemory : Long = 314572800

Contributor: Add comment on the value. Use 300 * FileUtils.ONE_MB or 300 * 1024 * 1024 for clarity.
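A sketch of the reviewer's suggested form, assuming Apache Commons IO is available on the classpath:

import org.apache.commons.io.FileUtils

// 300 MB of reserved memory; FileUtils.ONE_MB is 1048576, so this equals 314572800 bytes.
val reservedMemory: Long = 300 * FileUtils.ONE_MB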

val BUFFER_PERCENT : Int = 20

class Evaluator(memoryFractionHeuristic: JvmUsedMemoryHeuristic, data: SparkApplicationData) {
lazy val appConfigurationProperties: Map[String, String] =
data.appConfigurationProperties

lazy val executorSummaries: Seq[ExecutorSummary] = data.executorSummaries
lazy val driverSummary : Option[ExecutorSummary] = executorSummaries.find(_.id.equals("driver"))
// Avoid .get on an empty Option when an application reports no driver summary.
val maxDriverPeakJvmUsedMemory : Long = driverSummary.map(_.peakJvmUsedMemory.getOrElse(JVM_USED_MEMORY, 0L).asInstanceOf[Number].longValue).getOrElse(0L)
val executorList : Seq[ExecutorSummary] = executorSummaries.filterNot(_.id.equals("driver"))
val sparkExecutorMemory : Long = (appConfigurationProperties.get(SPARK_EXECUTOR_MEMORY).map(MemoryFormatUtils.stringToBytes)).getOrElse(0L)
val sparkDriverMemory : Long = appConfigurationProperties.get(SPARK_DRIVER_MEMORY).map(MemoryFormatUtils.stringToBytes).getOrElse(0L)
val medianPeakJvmUsedMemory: Long = if (executorList.isEmpty) 0L else executorList.map {
_.peakJvmUsedMemory.getOrElse(JVM_USED_MEMORY, 0L).asInstanceOf[Number].longValue
}.sortWith(_ < _).drop(executorList.size / 2).head

Contributor: I don't see this being used anywhere? Mention it along with the Heuristic Details.

Contributor: Median computation is not accurate. Take a look at this code. http://rosettacode.org/wiki/Averages/Median#Scala
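A sketch of an even-length-aware median in the spirit of the linked Rosetta Code example (this helper is illustrative, not part of the PR as submitted):

// Averages the two middle elements for even-sized inputs; returns 0 for an empty list.
def medianOf(values: Seq[Long]): Long = {
val sorted = values.sorted
val mid = sorted.size / 2
if (sorted.isEmpty) 0L
else if (sorted.size % 2 == 0) (sorted(mid - 1) + sorted(mid)) / 2
else sorted(mid)
}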

lazy val maxExecutorPeakJvmUsedMemory: Long = if (executorList.isEmpty) 0L else executorList.map {
_.peakJvmUsedMemory.getOrElse(JVM_USED_MEMORY, 0L).asInstanceOf[Number].longValue
}.max

val DEFAULT_MAX_EXECUTOR_PEAK_JVM_USED_MEMORY_THRESHOLDS =
SeverityThresholds(low = 1.5 * (maxExecutorPeakJvmUsedMemory + reservedMemory), moderate = 2 * (maxExecutorPeakJvmUsedMemory + reservedMemory), severe = 4 * (maxExecutorPeakJvmUsedMemory + reservedMemory), critical = 8 * (maxExecutorPeakJvmUsedMemory + reservedMemory), ascending = true)

Contributor: Configure thresholds in HeuristicConf.
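A minimal sketch of the configurable-threshold idea the reviewer raises, assuming multipliers are supplied through the heuristic's <params> block in HeuristicConf.xml and read via HeuristicConfigurationData.getParamMap; the parameter name low_multiplier is hypothetical:

// Hypothetical param, e.g. <params><low_multiplier>1.5</low_multiplier></params> in HeuristicConf.xml.
def multiplierFromConf(conf: HeuristicConfigurationData, name: String, default: Double): Double =
Option(conf.getParamMap.get(name)).map(_.toDouble).getOrElse(default)

val lowMultiplier = multiplierFromConf(memoryFractionHeuristic.getHeuristicConfData(), "low_multiplier", 1.5)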

val DEFAULT_MAX_DRIVER_PEAK_JVM_USED_MEMORY_THRESHOLDS =
SeverityThresholds(low = 1.5 * (maxDriverPeakJvmUsedMemory + reservedMemory), moderate = 2 * (maxDriverPeakJvmUsedMemory + reservedMemory), severe = 4 * (maxDriverPeakJvmUsedMemory + reservedMemory), critical = 8 * (maxDriverPeakJvmUsedMemory + reservedMemory), ascending = true)

val severityExecutor = DEFAULT_MAX_EXECUTOR_PEAK_JVM_USED_MEMORY_THRESHOLDS.severityOf(sparkExecutorMemory)
val severityDriver = DEFAULT_MAX_DRIVER_PEAK_JVM_USED_MEMORY_THRESHOLDS.severityOf(sparkDriverMemory)

/**
* disabling the skew check for executors
* val severitySkew = DEFAULT_JVM_MEMORY_SKEW_THRESHOLDS.severityOf(maxExecutorPeakJvmUsedMemory)
*/
val severity : Severity = Severity.max(severityDriver, severityExecutor)
}
}
@@ -174,7 +174,8 @@ object LegacyDataConverters {
executorInfo.shuffleWrite,
executorInfo.maxMem,
executorInfo.totalGCTime,
-executorLogs = Map.empty
+executorLogs = Map.empty,
+peakJvmUsedMemory = Map.empty
)
}

20 changes: 20 additions & 0 deletions app/views/help/spark/helpJvmUsedMemoryHeuristic.scala.html
@@ -0,0 +1,20 @@
@*
* Copyright 2016 LinkedIn Corp.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*@
<p>This is a heuristic for peak JVM used memory.</p>
<h4>Executor Max Peak JVM Used Memory</h4>
<p>This checks whether the executor memory is set to a reasonable value. To avoid wasted memory, it compares the peak JVM used memory across executors with the allocated executor memory specified in spark.executor.memory. If the peak JVM used memory is much smaller, the executor memory should be reduced.</p>
<h4>Driver Max Peak JVM Used Memory</h4>
<p>This checks whether the driver memory is set to a reasonable value. To avoid wasted memory, it compares the peak JVM used memory by the driver with the allocated driver memory specified in spark.driver.memory. If the peak JVM used memory is much smaller, the driver memory should be reduced.</p>
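As a worked example of the sizing rule above, using the heuristic's 20% buffer (BUFFER_PERCENT) and an assumed 4 GB peak:

// Assumed peak JVM used memory of 4 GB; the heuristic suggests roughly peak * 1.2 as a reasonable allocation.
val peakBytes: Long = 4L * 1024 * 1024 * 1024
val suggestedExecutorMemory: Long = ((1 + 20 / 100.0) * peakBytes).toLong // ≈ 4.8 GB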
@@ -195,6 +195,7 @@ object SparkMetricsAggregatorTest {
totalShuffleWrite = 0,
maxMemory = 0,
totalGCTime = 0,
-executorLogs = Map.empty
+executorLogs = Map.empty,
+peakJvmUsedMemory = Map.empty
)
}
@@ -119,7 +119,8 @@ object ExecutorGcHeuristicTest {
totalShuffleWrite= 0,
maxMemory= 0,
totalGCTime,
-executorLogs = Map.empty
+executorLogs = Map.empty,
+peakJvmUsedMemory = Map.empty
)

def newFakeSparkApplicationData(
@@ -250,7 +250,8 @@ object ExecutorsHeuristicTest {
totalShuffleWrite,
maxMemory,
totalGCTime = 0,
-executorLogs = Map.empty
+executorLogs = Map.empty,
+peakJvmUsedMemory = Map.empty
)

def newFakeSparkApplicationData(executorSummaries: Seq[ExecutorSummaryImpl]): SparkApplicationData = {
@@ -0,0 +1,116 @@ (new file: com/linkedin/drelephant/spark/heuristics/JvmUsedMemoryHeuristicTest.scala)
package com.linkedin.drelephant.spark.heuristics

import com.linkedin.drelephant.analysis.{ApplicationType, Severity}
import com.linkedin.drelephant.configurations.heuristic.HeuristicConfigurationData
import com.linkedin.drelephant.spark.data.{SparkApplicationData, SparkLogDerivedData, SparkRestDerivedData}
import com.linkedin.drelephant.spark.fetchers.statusapiv1.{ApplicationInfoImpl, ExecutorSummaryImpl}
import org.apache.spark.scheduler.SparkListenerEnvironmentUpdate
import org.scalatest.{FunSpec, Matchers}

import scala.collection.JavaConverters

class JvmUsedMemoryHeuristicTest extends FunSpec with Matchers {

import JvmUsedMemoryHeuristicTest._

val heuristicConfigurationData = newFakeHeuristicConfigurationData()

val peakJvmUsedMemoryHeuristic = new JvmUsedMemoryHeuristic(heuristicConfigurationData)

val appConfigurationProperties = Map("spark.driver.memory" -> "40000000000", "spark.executor.memory" -> "500000000")

val executorData = Seq(
newDummyExecutorData("1", Map("jvmUsedMemory" -> 394567123)),
newDummyExecutorData("2", Map("jvmUsedMemory" -> 23456834)),
newDummyExecutorData("3", Map("jvmUsedMemory" -> 334569)),
newDummyExecutorData("4", Map("jvmUsedMemory" -> 134563)),
newDummyExecutorData("5", Map("jvmUsedMemory" -> 234564)),
newDummyExecutorData("driver", Map("jvmUsedMemory" -> 394561))
)
describe(".apply") {
val data = newFakeSparkApplicationData(appConfigurationProperties, executorData)
val heuristicResult = peakJvmUsedMemoryHeuristic.apply(data)

it("has severity") {
heuristicResult.getSeverity should be(Severity.CRITICAL)
}

describe(".Evaluator") {
import JvmUsedMemoryHeuristic.Evaluator

val data = newFakeSparkApplicationData(appConfigurationProperties, executorData)
val evaluator = new Evaluator(peakJvmUsedMemoryHeuristic, data)

it("has severity executor") {
evaluator.severityExecutor should be(Severity.NONE)
}

it("has severity driver") {
evaluator.severityDriver should be(Severity.CRITICAL)
}

it("has median peak jvm memory") {
evaluator.medianPeakJvmUsedMemory should be (334569)
}

it("has max peak jvm memory") {
evaluator.maxExecutorPeakJvmUsedMemory should be (394567123)
}

it("has max driver peak jvm memory") {
evaluator.maxDriverPeakJvmUsedMemory should be (394561)
}
}
}
}

object JvmUsedMemoryHeuristicTest {

import JavaConverters._

def newFakeHeuristicConfigurationData(params: Map[String, String] = Map.empty): HeuristicConfigurationData =
new HeuristicConfigurationData("heuristic", "class", "view", new ApplicationType("type"), params.asJava)

def newDummyExecutorData(
id: String,
peakJvmUsedMemory: Map[String, Long]
): ExecutorSummaryImpl = new ExecutorSummaryImpl(
id,
hostPort = "",
rddBlocks = 0,
memoryUsed = 0,
diskUsed = 0,
activeTasks = 0,
failedTasks = 0,
completedTasks = 0,
totalTasks = 0,
totalDuration = 0,
totalInputBytes = 0,
totalShuffleRead = 0,
totalShuffleWrite = 0,
maxMemory = 0,
totalGCTime = 0,
executorLogs = Map.empty,
peakJvmUsedMemory
)

def newFakeSparkApplicationData(
appConfigurationProperties: Map[String, String],
executorSummaries: Seq[ExecutorSummaryImpl]
): SparkApplicationData = {

val logDerivedData = SparkLogDerivedData(
SparkListenerEnvironmentUpdate(Map("Spark Properties" -> appConfigurationProperties.toSeq))
)
val appId = "application_1"

val restDerivedData = SparkRestDerivedData(
new ApplicationInfoImpl(appId, name = "app", Seq.empty),
jobDatas = Seq.empty,
stageDatas = Seq.empty,
executorSummaries = executorSummaries
)

SparkApplicationData(appId, restDerivedData, Some(logDerivedData))
}
}