
Peak Unified Memory Heuristic - (Depends on Custom SHS - Requires peakUnifiedMemory metric) (#281)
skakker authored and akshayrai committed Mar 19, 2018
1 parent 26da7c9 commit b296615
Showing 13 changed files with 271 additions and 14 deletions.
6 changes: 6 additions & 0 deletions app-conf/HeuristicConf.xml
@@ -193,6 +193,12 @@
<classname>com.linkedin.drelephant.spark.heuristics.StagesHeuristic</classname>
<viewname>views.html.help.spark.helpStagesHeuristic</viewname>
</heuristic>
<heuristic>
<applicationtype>spark</applicationtype>
<heuristicname>Peak Unified Memory</heuristicname>
<classname>com.linkedin.drelephant.spark.heuristics.UnifiedMemoryHeuristic</classname>
<viewname>views.html.help.spark.helpUnifiedMemoryHeuristic</viewname>
</heuristic>
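<!-- Optional (illustrative sketch, not part of this commit): the severity thresholds can be
overridden with a params block; the key matches PEAK_UNIFIED_MEMORY_THRESHOLD_KEY in
UnifiedMemoryHeuristic, and the four comma-separated values are fractions of the allocated
unified memory, ordered low, moderate, severe, critical:
<params>
  <peak_unified_memory_threshold>0.7,0.6,0.4,0.2</peak_unified_memory_threshold>
</params>
-->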
<heuristic>
<applicationtype>spark</applicationtype>
<heuristicname>JVM Used Memory</heuristicname>
@@ -90,6 +90,7 @@ trait ExecutorSummary{
def totalMemoryBytesSpilled: Long
def executorLogs: Map[String, String]
def peakJvmUsedMemory: Map[String, Long]
def peakUnifiedMemory: Map[String, Long]
}

trait JobData{
@@ -298,7 +299,8 @@
var totalGCTime: Long,
var totalMemoryBytesSpilled: Long,
var executorLogs: Map[String, String],
var peakJvmUsedMemory: Map[String, Long]) extends ExecutorSummary
var peakJvmUsedMemory: Map[String, Long],
var peakUnifiedMemory: Map[String, Long]) extends ExecutorSummary
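// The new peakUnifiedMemory field maps memory-region names to per-executor peak bytes.
// A hypothetical value (key names match the EXECUTION_MEMORY and STORAGE_MEMORY
// constants introduced in UnifiedMemoryHeuristic below) would be:
//   Map("executionMemory" -> 300000L, "storageMemory" -> 94567L)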

class JobDataImpl(
var jobId: Int,
@@ -0,0 +1,133 @@
/*
* Copyright 2016 LinkedIn Corp.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package com.linkedin.drelephant.spark.heuristics

import com.linkedin.drelephant.analysis._
import com.linkedin.drelephant.configurations.heuristic.HeuristicConfigurationData
import com.linkedin.drelephant.spark.data.SparkApplicationData
import com.linkedin.drelephant.spark.fetchers.statusapiv1.ExecutorSummary
import com.linkedin.drelephant.util.MemoryFormatUtils

import scala.collection.JavaConverters


/**
* A heuristic based on peak unified memory for the Spark executors.
*
* This heuristic reports the fraction of unified memory used relative to unified memory allocated, and whether the allocation can be reduced. It also checks for skew in peak unified memory across executors and reports when the skew is too large.
*/
class UnifiedMemoryHeuristic(private val heuristicConfigurationData: HeuristicConfigurationData)
extends Heuristic[SparkApplicationData] {

import UnifiedMemoryHeuristic._
import JavaConverters._

override def getHeuristicConfData(): HeuristicConfigurationData = heuristicConfigurationData

lazy val peakUnifiedMemoryThresholdString: String = heuristicConfigurationData.getParamMap.get(PEAK_UNIFIED_MEMORY_THRESHOLD_KEY)

override def apply(data: SparkApplicationData): HeuristicResult = {
val evaluator = new Evaluator(this, data)

val resultDetails = Seq(
new HeuristicResultDetails("Unified Memory Space Allocated", MemoryFormatUtils.bytesToString(evaluator.maxMemory)),
new HeuristicResultDetails("Mean peak unified memory", MemoryFormatUtils.bytesToString(evaluator.meanUnifiedMemory)),
new HeuristicResultDetails("Max peak unified memory", MemoryFormatUtils.bytesToString(evaluator.maxUnifiedMemory)),
new HeuristicResultDetails("spark.executor.memory", MemoryFormatUtils.bytesToString(evaluator.sparkExecutorMemory)),
new HeuristicResultDetails("spark.memory.fraction", evaluator.sparkMemoryFraction.toString)
)

val result = new HeuristicResult(
heuristicConfigurationData.getClassName,
heuristicConfigurationData.getHeuristicName,
evaluator.severity,
0,
resultDetails.asJava
)
result
}
}

object UnifiedMemoryHeuristic {

val EXECUTION_MEMORY = "executionMemory"
val STORAGE_MEMORY = "storageMemory"
val SPARK_EXECUTOR_MEMORY_KEY = "spark.executor.memory"
val SPARK_MEMORY_FRACTION_KEY = "spark.memory.fraction"
val PEAK_UNIFIED_MEMORY_THRESHOLD_KEY = "peak_unified_memory_threshold"

class Evaluator(unifiedMemoryHeuristic: UnifiedMemoryHeuristic, data: SparkApplicationData) {
lazy val appConfigurationProperties: Map[String, String] =
data.appConfigurationProperties

lazy val DEFAULT_PEAK_UNIFIED_MEMORY_THRESHOLD: SeverityThresholds = SeverityThresholds(low = 0.7 * maxMemory, moderate = 0.6 * maxMemory, severe = 0.4 * maxMemory, critical = 0.2 * maxMemory, ascending = false)

lazy val executorSummaries: Seq[ExecutorSummary] = data.executorSummaries
if (executorSummaries == null) {
throw new Exception("Executors Summary is null.")
}

val executorList: Seq[ExecutorSummary] = executorSummaries.filterNot(_.id.equals("driver"))
if (executorList.isEmpty) {
throw new Exception("No executor information available.")
}

// Allocated memory for the unified region, taken from the first executor
// (assumed uniform across executors).
val maxMemory: Long = executorList.head.maxMemory

val PEAK_UNIFIED_MEMORY_THRESHOLDS: SeverityThresholds = if (unifiedMemoryHeuristic.peakUnifiedMemoryThresholdString == null) {
DEFAULT_PEAK_UNIFIED_MEMORY_THRESHOLD
} else {
// Scale the configured fractions by the allocated unified memory, then re-join them into
// the comma-separated form that SeverityThresholds.parse expects (Array.toString would
// produce a JVM object reference, not the threshold values).
SeverityThresholds.parse(unifiedMemoryHeuristic.peakUnifiedMemoryThresholdString.split(",").map(_.toDouble * maxMemory).mkString(","), ascending = false).getOrElse(DEFAULT_PEAK_UNIFIED_MEMORY_THRESHOLD)
}
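// Example (illustrative): a configured threshold string of "0.7,0.6,0.4,0.2" with
// maxMemory = 4 GB yields descending thresholds of 2.8 GB (low), 2.4 GB (moderate),
// 1.6 GB (severe) and 0.8 GB (critical); lower peak usage means worse over-allocation.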

def getPeakUnifiedMemoryExecutorSeverity(executorSummary: ExecutorSummary): Severity = {
PEAK_UNIFIED_MEMORY_THRESHOLDS.severityOf(executorSummary.peakUnifiedMemory.getOrElse(EXECUTION_MEMORY, 0L) +
executorSummary.peakUnifiedMemory.getOrElse(STORAGE_MEMORY, 0L))
}

val sparkExecutorMemory: Long = appConfigurationProperties.get(SPARK_EXECUTOR_MEMORY_KEY).map(MemoryFormatUtils.stringToBytes).getOrElse(0L)

// appConfigurationProperties values are strings, so parse the fraction rather than
// casting (a cast to Number would throw ClassCastException whenever the key is set).
val sparkMemoryFraction: Double = appConfigurationProperties.get(SPARK_MEMORY_FRACTION_KEY).map(_.toDouble).getOrElse(0.6D)

lazy val meanUnifiedMemory: Long = executorList.map { executorSummary =>
// Keep the + at the end of the line: a leading + on its own line is parsed as
// unary plus, silently dropping the execution-memory term.
executorSummary.peakUnifiedMemory.getOrElse(EXECUTION_MEMORY, 0L) +
executorSummary.peakUnifiedMemory.getOrElse(STORAGE_MEMORY, 0L)
}.sum / executorList.size

lazy val maxUnifiedMemory: Long = executorList.map { executorSummary =>
executorSummary.peakUnifiedMemory.getOrElse(EXECUTION_MEMORY, 0L) +
executorSummary.peakUnifiedMemory.getOrElse(STORAGE_MEMORY, 0L)
}.max

lazy val severity: Severity = {
// The application-level severity is the worst per-executor severity.
var maxSeverity: Severity = Severity.NONE
for (executorSummary <- executorList) {
val executorSeverity: Severity = getPeakUnifiedMemoryExecutorSeverity(executorSummary)
if (executorSeverity.getValue > maxSeverity.getValue) {
maxSeverity = executorSeverity
}
}
maxSeverity
}
}

}
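To make the threshold arithmetic concrete, here is a minimal, self-contained sketch (the object name is hypothetical and all values are chosen to match the test data below; it mirrors the default thresholds rather than calling the heuristic itself):

object UnifiedMemorySeveritySketch extends App {
  val maxMemory: Long = 400000L // allocated unified region

  // Default descending thresholds, as in DEFAULT_PEAK_UNIFIED_MEMORY_THRESHOLD.
  val (low, moderate, severe, critical) =
    (0.7 * maxMemory, 0.6 * maxMemory, 0.4 * maxMemory, 0.2 * maxMemory)

  // Peak unified memory for one executor = execution peak + storage peak.
  val peak: Long = 20000L + 3456L // 23456

  // With descending thresholds, lower usage means worse over-allocation.
  val severity =
    if (peak <= critical) "CRITICAL"
    else if (peak <= severe) "SEVERE"
    else if (peak <= moderate) "MODERATE"
    else if (peak <= low) "LOW"
    else "NONE"

  println(s"peak $peak of $maxMemory -> $severity") // peak 23456 of 400000 -> CRITICAL
}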
@@ -24,7 +24,6 @@ import scala.util.Try
import com.linkedin.drelephant.spark.fetchers.statusapiv1._
import org.apache.spark.JobExecutionStatus
import com.linkedin.drelephant.spark.fetchers.statusapiv1.StageStatus

/**
* Converters for legacy SparkApplicationData to current SparkApplicationData.
*
@@ -46,7 +45,7 @@ object LegacyDataConverters {
override def shuffleWriteRecords: Long = 0
override def inputBytes: Long = 0
override def details: String = ""
override def tasks: Option[collection.Map[Long, TaskData]] = None
override def tasks = None
override def attemptId: Int = 0
override def stageId: Int = 0
override def memoryBytesSpilled: Long = 0
@@ -206,7 +205,8 @@
executorInfo.totalGCTime,
executorInfo.totalMemoryBytesSpilled,
executorLogs = Map.empty,
peakJvmUsedMemory = Map.empty
peakJvmUsedMemory = Map.empty,
peakUnifiedMemory = Map.empty
)
}

24 changes: 24 additions & 0 deletions app/views/help/spark/helpUnifiedMemoryHeuristic.scala.html
@@ -0,0 +1,24 @@
@*
* Copyright 2016 LinkedIn Corp.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*@
<p>The Peak Unified Memory Heuristic identifies and flags jobs that have over-allocated the Unified Memory region.</p>
<h4>Peak Unified Memory</h4>
<p>If the job's Peak Unified Memory Consumption is much smaller than the allocated Unified Memory space, then we recommend decreasing the allocated Unified Memory Region for your job.</p>
<h3>Action Items</h3>
<p>The Allocated Unified Memory Region can be reduced in the following ways: </p>
<p>1. If your job's Executor Memory is already low, reduce <i>spark.memory.fraction</i>, which will shrink the space allocated to the Unified Memory Region.</p>
<p>2. If your job's Executor Memory is high, we recommend reducing <i>spark.executor.memory</i> itself, which will lower the allocated Unified Memory space.</p>
<p>Note:</p>
<p><i>spark.memory.fraction</i>: the fraction of JVM heap memory (executor memory minus reserved memory) dedicated to the unified memory region (execution + storage). It effectively partitions user memory from execution and storage memory.</p>
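<p>A concrete way to apply these action items when submitting a job (the values are illustrative placeholders, not recommendations):</p>
<p><code>spark-submit --conf spark.executor.memory=2g --conf spark.memory.fraction=0.4 your-app.jar</code></p>
<p>Lowering <i>spark.memory.fraction</i> shrinks only the unified (execution + storage) region, while lowering <i>spark.executor.memory</i> shrinks the entire executor heap, including user memory.</p>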
@@ -199,6 +199,7 @@ object SparkMetricsAggregatorTest {
totalGCTime = 0,
totalMemoryBytesSpilled = 0,
executorLogs = Map.empty,
peakJvmUsedMemory = Map.empty
peakJvmUsedMemory = Map.empty,
peakUnifiedMemory = Map.empty
)
}
@@ -121,7 +121,8 @@ object ExecutorGcHeuristicTest {
totalGCTime,
totalMemoryBytesSpilled = 0,
executorLogs = Map.empty,
peakJvmUsedMemory = Map.empty
peakJvmUsedMemory = Map.empty,
peakUnifiedMemory = Map.empty
)

def newFakeSparkApplicationData(
@@ -111,7 +111,9 @@ object ExecutorStorageSpillHeuristicTest {
maxMemory= 0,
totalGCTime = 0,
totalMemoryBytesSpilled,
executorLogs = Map.empty
executorLogs = Map.empty,
peakJvmUsedMemory = Map.empty,
peakUnifiedMemory = Map.empty
)

def newFakeSparkApplicationData(
@@ -124,7 +126,8 @@ object ExecutorStorageSpillHeuristicTest {
new ApplicationInfoImpl(appId, name = "app", Seq.empty),
jobDatas = Seq.empty,
stageDatas = Seq.empty,
executorSummaries = executorSummaries
executorSummaries = executorSummaries,
stagesWithFailedTasks = Seq.empty
)

val logDerivedData = SparkLogDerivedData(
@@ -252,7 +252,8 @@ object ExecutorsHeuristicTest {
totalGCTime = 0,
totalMemoryBytesSpilled = 0,
executorLogs = Map.empty,
peakJvmUsedMemory = Map.empty
peakJvmUsedMemory = Map.empty,
peakUnifiedMemory = Map.empty
)

def newFakeSparkApplicationData(executorSummaries: Seq[ExecutorSummaryImpl]): SparkApplicationData = {
@@ -86,8 +86,10 @@ object JvmUsedMemoryHeuristicTest {
totalShuffleWrite = 0,
maxMemory = 0,
totalGCTime = 0,
totalMemoryBytesSpilled = 0,
executorLogs = Map.empty,
peakJvmUsedMemory
peakJvmUsedMemory,
peakUnifiedMemory = Map.empty
)

def newFakeSparkApplicationData(
@@ -104,7 +106,8 @@ object JvmUsedMemoryHeuristicTest {
new ApplicationInfoImpl(appId, name = "app", Seq.empty),
jobDatas = Seq.empty,
stageDatas = Seq.empty,
executorSummaries = executorSummaries
executorSummaries = executorSummaries,
stagesWithFailedTasks = Seq.empty
)

SparkApplicationData(appId, restDerivedData, Some(logDerivedData))
@@ -22,7 +22,7 @@ import scala.concurrent.duration.Duration
import com.linkedin.drelephant.analysis.{ApplicationType, Severity}
import com.linkedin.drelephant.configurations.heuristic.HeuristicConfigurationData
import com.linkedin.drelephant.spark.data.{SparkApplicationData, SparkLogDerivedData, SparkRestDerivedData}
import com.linkedin.drelephant.spark.fetchers.statusapiv1.{ApplicationInfoImpl, JobDataImpl, StageDataImpl}
import com.linkedin.drelephant.spark.fetchers.statusapiv1.{ApplicationInfoImpl, StageDataImpl}
import com.linkedin.drelephant.spark.fetchers.statusapiv1.StageStatus
import org.apache.spark.scheduler.SparkListenerEnvironmentUpdate
import org.scalatest.{FunSpec, Matchers}
@@ -0,0 +1,83 @@
package com.linkedin.drelephant.spark.heuristics

import com.linkedin.drelephant.analysis.{ApplicationType, Severity}
import com.linkedin.drelephant.configurations.heuristic.HeuristicConfigurationData
import com.linkedin.drelephant.spark.data.{SparkApplicationData, SparkRestDerivedData}
import com.linkedin.drelephant.spark.fetchers.statusapiv1.{ApplicationInfoImpl, ExecutorSummaryImpl}
import org.scalatest.{FunSpec, Matchers}

import scala.collection.JavaConverters

class UnifiedMemoryHeuristicTest extends FunSpec with Matchers {

import UnifiedMemoryHeuristicTest._

val heuristicConfigurationData = newFakeHeuristicConfigurationData()

val memoryFractionHeuristic = new UnifiedMemoryHeuristic(heuristicConfigurationData)

val executorData = Seq(
newDummyExecutorData("1", 400000, Map("executionMemory" -> 300000, "storageMemory" -> 94567)),
newDummyExecutorData("2", 400000, Map("executionMemory" -> 200000, "storageMemory" -> 34568)),
newDummyExecutorData("3", 400000, Map("executionMemory" -> 300000, "storageMemory" -> 34569)),
newDummyExecutorData("4", 400000, Map("executionMemory" -> 20000, "storageMemory" -> 3456)),
newDummyExecutorData("5", 400000, Map("executionMemory" -> 200000, "storageMemory" -> 34564)),
newDummyExecutorData("6", 400000, Map("executionMemory" -> 300000, "storageMemory" -> 94561))
)
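// Sanity check (see the assertion below): an executor's peak unified memory is
// executionMemory + storageMemory, so executor "4" peaks at 20000 + 3456 = 23456.
// That is below the default critical threshold of 0.2 * maxMemory = 0.2 * 400000 = 80000,
// so the heuristic's overall severity for this data is CRITICAL.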
describe(".apply") {
val data = newFakeSparkApplicationData(executorData)
val heuristicResult = memoryFractionHeuristic.apply(data)
val heuristicResultDetails = heuristicResult.getHeuristicResultDetails

it("has severity") {
heuristicResult.getSeverity should be(Severity.CRITICAL)
}
}
}

object UnifiedMemoryHeuristicTest {

import JavaConverters._

def newFakeHeuristicConfigurationData(params: Map[String, String] = Map.empty): HeuristicConfigurationData =
new HeuristicConfigurationData("heuristic", "class", "view", new ApplicationType("type"), params.asJava)

def newDummyExecutorData(
id: String,
maxMemory: Long,
peakUnifiedMemory: Map[String, Long]
): ExecutorSummaryImpl = new ExecutorSummaryImpl(
id,
hostPort = "",
rddBlocks = 0,
memoryUsed = 0,
diskUsed = 0,
activeTasks = 0,
failedTasks = 0,
completedTasks = 0,
totalTasks = 0,
totalDuration = 0,
totalInputBytes = 0,
totalShuffleRead = 0,
totalShuffleWrite = 0,
maxMemory,
totalGCTime = 0,
totalMemoryBytesSpilled = 0,
executorLogs = Map.empty,
peakJvmUsedMemory = Map.empty,
peakUnifiedMemory
)

def newFakeSparkApplicationData(executorSummaries: Seq[ExecutorSummaryImpl]): SparkApplicationData = {
val appId = "application_1"
val restDerivedData = SparkRestDerivedData(
new ApplicationInfoImpl(appId, name = "app", Seq.empty),
jobDatas = Seq.empty,
stageDatas = Seq.empty,
executorSummaries = executorSummaries,
stagesWithFailedTasks = Seq.empty
)

SparkApplicationData(appId, restDerivedData, logDerivedData = None)
}
}
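To run just this spec (assuming the project's usual sbt setup, which is not shown in this diff):

sbt "testOnly com.linkedin.drelephant.spark.heuristics.UnifiedMemoryHeuristicTest"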
4 changes: 2 additions & 2 deletions test/com/linkedin/drelephant/util/SparkUtilsTest.scala
@@ -287,7 +287,7 @@ object SparkUtilsTest extends MockitoSugar {
BDDMockito.given(fileStatus.getPath()).willReturn(expectedPath)
fileStatus
}
val expectedStatusArray = Array(expectedFileStatus)
val expectedStatusArray = Array(expectedFileStatus)

val filter = new PathFilter() {
override def accept(file: Path): Boolean = {
@@ -298,7 +298,7 @@ object SparkUtilsTest extends MockitoSugar {
BDDMockito.given(fs.getUri).willReturn(fileSystemUri)
BDDMockito.given(fs.exists(expectedPath)).willReturn(true)
BDDMockito.given(fs.getFileStatus(expectedPath)).willReturn(expectedFileStatus)
BDDMockito.given(fs.listStatus(org.mockito.Matchers.refEq(new Path( new Path(fileSystemUri), basePath)),
BDDMockito.given(fs.listStatus(org.mockito.Matchers.refEq(new Path(new Path(fileSystemUri), basePath)),
org.mockito.Matchers.any(filter.getClass))).
willReturn(expectedStatusArray)
BDDMockito.given(fs.open(expectedPath)).willReturn(