Skip to content

Commit

Permalink
Spark Executor GC Heuristic (#311)
Browse files Browse the repository at this point in the history
  • Loading branch information
skakker authored and akshayrai committed Jan 8, 2018
1 parent a384fcc commit 8c99625
Show file tree
Hide file tree
Showing 9 changed files with 290 additions and 1 deletion.
6 changes: 6 additions & 0 deletions app-conf/HeuristicConf.xml
Original file line number Diff line number Diff line change
Expand Up @@ -193,5 +193,11 @@
<classname>com.linkedin.drelephant.spark.heuristics.StagesHeuristic</classname>
<viewname>views.html.help.spark.helpStagesHeuristic</viewname>
</heuristic>
<heuristic>
<applicationtype>spark</applicationtype>
<heuristicname>Executor GC</heuristicname>
<classname>com.linkedin.drelephant.spark.heuristics.ExecutorGcHeuristic</classname>
<viewname>views.html.help.spark.helpExecutorGcHeuristic</viewname>
</heuristic>

</heuristics>
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ trait ExecutorSummary{
def totalShuffleRead: Long
def totalShuffleWrite: Long
def maxMemory: Long
def totalGCTime: Long
def executorLogs: Map[String, String]}

trait JobData{
Expand Down Expand Up @@ -292,6 +293,7 @@ class ExecutorSummaryImpl(
var totalShuffleRead: Long,
var totalShuffleWrite: Long,
var maxMemory: Long,
var totalGCTime: Long,
var executorLogs: Map[String, String]) extends ExecutorSummary

class JobDataImpl(
Expand Down
119 changes: 119 additions & 0 deletions app/com/linkedin/drelephant/spark/heuristics/ExecutorGcHeuristic.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
/*
* Copyright 2016 LinkedIn Corp.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package com.linkedin.drelephant.spark.heuristics

import com.linkedin.drelephant.analysis.Severity
import com.linkedin.drelephant.spark.fetchers.statusapiv1._
import com.linkedin.drelephant.analysis._
import com.linkedin.drelephant.configurations.heuristic.HeuristicConfigurationData
import com.linkedin.drelephant.spark.data.SparkApplicationData

import scala.collection.JavaConverters

/**
* A heuristic based on GC time and CPU run time. It calculates the ratio of the total time a job spends in GC to the total run time of a job and warns if too much time is spent in GC.
*/
class ExecutorGcHeuristic(private val heuristicConfigurationData: HeuristicConfigurationData)
extends Heuristic[SparkApplicationData] {

import ExecutorGcHeuristic._
import JavaConverters._

val gcSeverityAThresholds: SeverityThresholds =
SeverityThresholds.parse(heuristicConfigurationData.getParamMap.get(GC_SEVERITY_A_THRESHOLDS_KEY), ascending = true)
.getOrElse(DEFAULT_GC_SEVERITY_A_THRESHOLDS)

val gcSeverityDThresholds: SeverityThresholds =
SeverityThresholds.parse(heuristicConfigurationData.getParamMap.get(GC_SEVERITY_D_THRESHOLDS_KEY), ascending = true)
.getOrElse(DEFAULT_GC_SEVERITY_D_THRESHOLDS)

override def getHeuristicConfData(): HeuristicConfigurationData = heuristicConfigurationData

override def apply(data: SparkApplicationData): HeuristicResult = {
val evaluator = new Evaluator(this, data)
var resultDetails = Seq(
new HeuristicResultDetails("GC time to Executor Run time ratio", evaluator.ratio.toString),
new HeuristicResultDetails("Total GC time", evaluator.jvmTime.toString),
new HeuristicResultDetails("Total Executor Runtime", evaluator.executorRunTimeTotal.toString)
)

//adding recommendations to the result, severityTimeA corresponds to the ascending severity calculation
if (evaluator.severityTimeA.getValue > Severity.LOW.getValue) {
resultDetails = resultDetails :+ new HeuristicResultDetails("Gc ratio high", "The job is spending too much time on GC. We recommend increasing the executor memory.")
}
//severityTimeD corresponds to the descending severity calculation
if (evaluator.severityTimeD.getValue > Severity.LOW.getValue) {
resultDetails = resultDetails :+ new HeuristicResultDetails("Gc ratio low", "The job is spending too less time in GC. Please check if you have asked for more executor memory than required.")
}

val result = new HeuristicResult(
heuristicConfigurationData.getClassName,
heuristicConfigurationData.getHeuristicName,
evaluator.severityTimeA,
0,
resultDetails.asJava
)
result
}
}

object ExecutorGcHeuristic {
val SPARK_EXECUTOR_MEMORY = "spark.executor.memory"
val SPARK_EXECUTOR_CORES = "spark.executor.cores"

/** The ascending severity thresholds for the ratio of JVM GC Time and executor Run Time (checking whether ratio is above normal)
* These thresholds are experimental and are likely to change */
val DEFAULT_GC_SEVERITY_A_THRESHOLDS =
SeverityThresholds(low = 0.08D, moderate = 0.1D, severe = 0.15D, critical = 0.2D, ascending = true)

/** The descending severity thresholds for the ratio of JVM GC Time and executor Run Time (checking whether ratio is below normal)
* These thresholds are experimental and are likely to change */
val DEFAULT_GC_SEVERITY_D_THRESHOLDS =
SeverityThresholds(low = 0.05D, moderate = 0.04D, severe = 0.03D, critical = 0.01D, ascending = false)

val GC_SEVERITY_A_THRESHOLDS_KEY: String = "gc_severity_A_threshold"
val GC_SEVERITY_D_THRESHOLDS_KEY: String = "gc_severity_D_threshold"

class Evaluator(executorGcHeuristic: ExecutorGcHeuristic, data: SparkApplicationData) {
lazy val executorAndDriverSummaries: Seq[ExecutorSummary] = data.executorSummaries
lazy val executorSummaries: Seq[ExecutorSummary] = executorAndDriverSummaries.filterNot(_.id.equals("driver"))
lazy val appConfigurationProperties: Map[String, String] =
data.appConfigurationProperties
var (jvmTime, executorRunTimeTotal) = getTimeValues(executorSummaries)

var ratio: Double = jvmTime.toDouble / executorRunTimeTotal.toDouble

lazy val severityTimeA: Severity = executorGcHeuristic.gcSeverityAThresholds.severityOf(ratio)
lazy val severityTimeD: Severity = executorGcHeuristic.gcSeverityDThresholds.severityOf(ratio)

/**
* returns the total JVM GC Time and total executor Run Time across all stages
* @param executorSummaries
* @return
*/
private def getTimeValues(executorSummaries: Seq[ExecutorSummary]): (Long, Long) = {
var jvmGcTimeTotal: Long = 0
var executorRunTimeTotal: Long = 0
executorSummaries.foreach(executorSummary => {
jvmGcTimeTotal+=executorSummary.totalGCTime
executorRunTimeTotal+=executorSummary.totalDuration
})
(jvmGcTimeTotal, executorRunTimeTotal)
}
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ object LegacyDataConverters {
executorInfo.shuffleRead,
executorInfo.shuffleWrite,
executorInfo.maxMem,
executorInfo.totalGCTime,
executorLogs = Map.empty
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,15 @@ public static class ExecutorInfo {
public long inputBytes = 0L;
public long outputBytes = 0L;
public long shuffleRead = 0L;
public long totalGCTime = 0L;
public long shuffleWrite = 0L;

public String toString() {
return "{execId: " + execId + ", hostPort:" + hostPort + " , rddBlocks: " + rddBlocks + ", memUsed: " + memUsed
+ ", maxMem: " + maxMem + ", diskUsed: " + diskUsed + ", totalTasks" + totalTasks + ", tasksActive: "
+ activeTasks + ", tasksComplete: " + completedTasks + ", tasksFailed: " + failedTasks + ", duration: "
+ duration + ", inputBytes: " + inputBytes + ", outputBytes:" + outputBytes + ", shuffleRead: " + shuffleRead
+ ", shuffleWrite: " + shuffleWrite + "}";
+ ", shuffleWrite: " + shuffleWrite + ", totalGCTime: " + totalGCTime + "}";
}
}

Expand Down
20 changes: 20 additions & 0 deletions app/views/help/spark/helpExecutorGcHeuristic.scala.html
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
@*
* Copyright 2016 LinkedIn Corp.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*@

<p>This analysis shows how much time a job is spending in GC. To normalise the results across all jobs, the ratio of the time a job spends in Gc to the total run time of the job is calculated. </p>
<p>A job is flagged if the ratio is too high, meaning the job spends too much time in GC.</p>
<h3>Suggestions</h3>
<p>We recommend increasing the executor memory.</p>
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,7 @@ object SparkMetricsAggregatorTest {
totalShuffleRead = 0,
totalShuffleWrite = 0,
maxMemory = 0,
totalGCTime = 0,
executorLogs = Map.empty
)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
/*
* Copyright 2016 LinkedIn Corp.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package com.linkedin.drelephant.spark.heuristics

import scala.collection.JavaConverters
import com.linkedin.drelephant.analysis.{ApplicationType, Severity, SeverityThresholds}
import com.linkedin.drelephant.configurations.heuristic.HeuristicConfigurationData
import com.linkedin.drelephant.spark.data.{SparkApplicationData, SparkLogDerivedData, SparkRestDerivedData}
import com.linkedin.drelephant.spark.fetchers.statusapiv1.{ApplicationInfoImpl, ExecutorSummaryImpl, StageDataImpl}
import org.apache.spark.scheduler.SparkListenerEnvironmentUpdate
import org.scalatest.{FunSpec, Matchers}

import scala.concurrent.duration.Duration


class ExecutorGcHeuristicTest extends FunSpec with Matchers {
import ExecutorGcHeuristicTest._

describe("ExecutorGcHeuristic") {
val heuristicConfigurationData = newFakeHeuristicConfigurationData(
Map(
"max_to_median_ratio_severity_thresholds" -> "1.414,2,4,16",
"ignore_max_bytes_less_than_threshold" -> "4000000",
"ignore_max_millis_less_than_threshold" -> "4000001"
)
)
val executorGcHeuristic = new ExecutorGcHeuristic(heuristicConfigurationData)

val executorSummaries = Seq(
newFakeExecutorSummary(
id = "1",
totalGCTime = Duration("2min").toMillis,
totalDuration = Duration("15min").toMillis
),
newFakeExecutorSummary(
id = "2",
totalGCTime = Duration("6min").toMillis,
totalDuration = Duration("14min").toMillis
),
newFakeExecutorSummary(
id = "3",
totalGCTime = Duration("4min").toMillis,
totalDuration = Duration("20min").toMillis
),
newFakeExecutorSummary(
id = "4",
totalGCTime = Duration("8min").toMillis,
totalDuration = Duration("30min").toMillis
)
)

describe(".apply") {
val data1 = newFakeSparkApplicationData(executorSummaries)
val heuristicResult = executorGcHeuristic.apply(data1)
val heuristicResultDetails = heuristicResult.getHeuristicResultDetails

it("returns the severity") {
heuristicResult.getSeverity should be(Severity.CRITICAL)
}

it("returns the JVM GC time to Executor Run time duration") {
val details = heuristicResultDetails.get(0)
details.getName should include("GC time to Executor Run time ratio")
details.getValue should include("0.2531")
}

it("returns the total GC time") {
val details = heuristicResultDetails.get(1)
details.getName should include("Total GC time")
details.getValue should be("1200000")
}

it("returns the executor's run time") {
val details = heuristicResultDetails.get(2)
details.getName should include("Total Executor Runtime")
details.getValue should be("4740000")
}
}
}
}

object ExecutorGcHeuristicTest {
import JavaConverters._

def newFakeHeuristicConfigurationData(params: Map[String, String] = Map.empty): HeuristicConfigurationData =
new HeuristicConfigurationData("heuristic", "class", "view", new ApplicationType("type"), params.asJava)

def newFakeExecutorSummary(
id: String,
totalGCTime: Long,
totalDuration: Long
): ExecutorSummaryImpl = new ExecutorSummaryImpl(
id,
hostPort = "",
rddBlocks = 0,
memoryUsed=0,
diskUsed = 0,
activeTasks = 0,
failedTasks = 0,
completedTasks = 0,
totalTasks = 0,
totalDuration,
totalInputBytes=0,
totalShuffleRead=0,
totalShuffleWrite= 0,
maxMemory= 0,
totalGCTime,
executorLogs = Map.empty
)

def newFakeSparkApplicationData(
executorSummaries: Seq[ExecutorSummaryImpl]
): SparkApplicationData = {
val appId = "application_1"

val restDerivedData = SparkRestDerivedData(
new ApplicationInfoImpl(appId, name = "app", Seq.empty),
jobDatas = Seq.empty,
stageDatas = Seq.empty,
executorSummaries = executorSummaries
)
SparkApplicationData(appId, restDerivedData, None)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,7 @@ object ExecutorsHeuristicTest {
totalShuffleRead,
totalShuffleWrite,
maxMemory,
totalGCTime = 0,
executorLogs = Map.empty
)

Expand Down

0 comments on commit 8c99625

Please sign in to comment.