Peak Unified Memory Heuristic - (Depends on Custom SHS - Requires peakUnifiedMemory metric) #281

Merged: 11 commits, Jan 10, 2018
6 changes: 6 additions & 0 deletions app-conf/HeuristicConf.xml
@@ -193,6 +193,12 @@
<classname>com.linkedin.drelephant.spark.heuristics.StagesHeuristic</classname>
<viewname>views.html.help.spark.helpStagesHeuristic</viewname>
</heuristic>
<heuristic>
<applicationtype>spark</applicationtype>
<heuristicname>Peak Unified Memory</heuristicname>
<classname>com.linkedin.drelephant.spark.heuristics.UnifiedMemoryHeuristic</classname>
<viewname>views.html.help.spark.helpUnifiedMemoryHeuristic</viewname>
</heuristic>
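
(Not part of this diff, but for context: a sketch of how the default thresholds could be overridden, assuming Dr. Elephant's usual <params> convention in HeuristicConf.xml. The key matches PEAK_UNIFIED_MEMORY_THRESHOLD_KEY in the heuristic below, and the values are read as fractions of the allocated unified region, in descending order low, moderate, severe, critical:)

<heuristic>
  <applicationtype>spark</applicationtype>
  <heuristicname>Peak Unified Memory</heuristicname>
  <classname>com.linkedin.drelephant.spark.heuristics.UnifiedMemoryHeuristic</classname>
  <viewname>views.html.help.spark.helpUnifiedMemoryHeuristic</viewname>
  <params>
    <!-- low,moderate,severe,critical fractions of the unified region, descending -->
    <peak_unified_memory_threshold>0.7,0.6,0.4,0.2</peak_unified_memory_threshold>
  </params>
</heuristic>
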
<heuristic>
<applicationtype>spark</applicationtype>
<heuristicname>JVM Used Memory</heuristicname>
@@ -90,6 +90,7 @@ trait ExecutorSummary{
def totalMemoryBytesSpilled: Long
def executorLogs: Map[String, String]
def peakJvmUsedMemory: Map[String, Long]
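  // Keys in this map are "executionMemory" and "storageMemory" (see UnifiedMemoryHeuristic);
  // values come from the custom SHS peakUnifiedMemory metric this PR depends on.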
def peakUnifiedMemory: Map[String, Long]
}

trait JobData{
@@ -298,7 +299,8 @@
var totalGCTime: Long,
var totalMemoryBytesSpilled: Long,
var executorLogs: Map[String, String],
-  var peakJvmUsedMemory: Map[String, Long]) extends ExecutorSummary
+  var peakJvmUsedMemory: Map[String, Long],
+  var peakUnifiedMemory: Map[String, Long]) extends ExecutorSummary

class JobDataImpl(
var jobId: Int,
@@ -0,0 +1,133 @@
/*
* Copyright 2016 LinkedIn Corp.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package com.linkedin.drelephant.spark.heuristics

import com.linkedin.drelephant.analysis._
import com.linkedin.drelephant.configurations.heuristic.HeuristicConfigurationData
import com.linkedin.drelephant.spark.data.SparkApplicationData
import com.linkedin.drelephant.spark.fetchers.statusapiv1.ExecutorSummary

[Review comment] Newline between linkedin and scala imports?

import com.linkedin.drelephant.util.MemoryFormatUtils

import scala.collection.JavaConverters


/**
 * A heuristic based on peak unified memory for the Spark executors.
 *
 * Spark's unified memory region is shared by execution and storage memory. This heuristic
 * reports the fraction of peak unified memory used relative to the memory allocated, and
 * whether that allocation can be reduced. It also checks the executors' peak unified memory
 * for skew and reports when the skew is too large.
 */
class UnifiedMemoryHeuristic(private val heuristicConfigurationData: HeuristicConfigurationData)
extends Heuristic[SparkApplicationData] {

import UnifiedMemoryHeuristic._
import JavaConverters._

override def getHeuristicConfData(): HeuristicConfigurationData = heuristicConfigurationData

lazy val peakUnifiedMemoryThresholdString: String = heuristicConfigurationData.getParamMap.get(PEAK_UNIFIED_MEMORY_THRESHOLD_KEY)

override def apply(data: SparkApplicationData): HeuristicResult = {
val evaluator = new Evaluator(this, data)

    val resultDetails = Seq(
new HeuristicResultDetails("Unified Memory Space Allocated", MemoryFormatUtils.bytesToString(evaluator.maxMemory)),
new HeuristicResultDetails("Mean peak unified memory", MemoryFormatUtils.bytesToString(evaluator.meanUnifiedMemory)),
new HeuristicResultDetails("Max peak unified memory", MemoryFormatUtils.bytesToString(evaluator.maxUnifiedMemory)),
new HeuristicResultDetails("spark.executor.memory", MemoryFormatUtils.bytesToString(evaluator.sparkExecutorMemory)),
new HeuristicResultDetails("spark.memory.fraction", evaluator.sparkMemoryFraction.toString)
)

val result = new HeuristicResult(
heuristicConfigurationData.getClassName,
heuristicConfigurationData.getHeuristicName,
evaluator.severity,
0,
resultDetails.asJava
)
result
}
}

object UnifiedMemoryHeuristic {

val EXECUTION_MEMORY = "executionMemory"
val STORAGE_MEMORY = "storageMemory"
val SPARK_EXECUTOR_MEMORY_KEY = "spark.executor.memory"
val SPARK_MEMORY_FRACTION_KEY = "spark.memory.fraction"
val PEAK_UNIFIED_MEMORY_THRESHOLD_KEY = "peak_unified_memory_threshold"

class Evaluator(unifiedMemoryHeuristic: UnifiedMemoryHeuristic, data: SparkApplicationData) {
lazy val appConfigurationProperties: Map[String, String] =
data.appConfigurationProperties

    // Descending thresholds (ascending = false): the lower an executor's peak unified memory
    // relative to the allocation, the more over-allocated the region and the worse the severity.
    lazy val DEFAULT_PEAK_UNIFIED_MEMORY_THRESHOLD: SeverityThresholds = SeverityThresholds(low = 0.7 * maxMemory, moderate = 0.6 * maxMemory, severe = 0.4 * maxMemory, critical = 0.2 * maxMemory, ascending = false)

lazy val executorSummaries: Seq[ExecutorSummary] = data.executorSummaries
if (executorSummaries == null) {
throw new Exception("Executors Summary is null.")
}

val executorList: Seq[ExecutorSummary] = executorSummaries.filterNot(_.id.equals("driver"))
if (executorList.isEmpty) {
throw new Exception("No executor information available.")
}

    // Memory allocated to the unified region; taken from the first executor and assumed
    // to be the same across all executors.
    val maxMemory: Long = executorList.head.maxMemory

    val PEAK_UNIFIED_MEMORY_THRESHOLDS: SeverityThresholds = if (unifiedMemoryHeuristic.peakUnifiedMemoryThresholdString == null) {
      DEFAULT_PEAK_UNIFIED_MEMORY_THRESHOLD
    } else {
      // The configured thresholds are comma-separated fractions of maxMemory, e.g. "0.7,0.6,0.4,0.2".
      // mkString is required here: calling toString on the mapped Array yields "[D@..." which
      // never parses, so the default thresholds would always win.
      SeverityThresholds.parse(unifiedMemoryHeuristic.peakUnifiedMemoryThresholdString.split(",").map(_.toDouble * maxMemory).mkString(","), ascending = false).getOrElse(DEFAULT_PEAK_UNIFIED_MEMORY_THRESHOLD)
    }

    def getPeakUnifiedMemoryExecutorSeverity(executorSummary: ExecutorSummary): Severity = {
      // Using 0L as the default keeps the values Long, avoiding the Number cast.
      PEAK_UNIFIED_MEMORY_THRESHOLDS.severityOf(
        executorSummary.peakUnifiedMemory.getOrElse(EXECUTION_MEMORY, 0L) +
          executorSummary.peakUnifiedMemory.getOrElse(STORAGE_MEMORY, 0L))
    }

    val sparkExecutorMemory: Long = appConfigurationProperties.get(SPARK_EXECUTOR_MEMORY_KEY).map(MemoryFormatUtils.stringToBytes).getOrElse(0L)

    // appConfigurationProperties is a Map[String, String], so the value must be parsed;
    // casting a String to Number would throw a ClassCastException at runtime.
    val sparkMemoryFraction: Double = appConfigurationProperties.get(SPARK_MEMORY_FRACTION_KEY).map(_.toDouble).getOrElse(0.6D)

    lazy val meanUnifiedMemory: Long = executorList.map { executorSummary =>
      // Keep the + at the end of the line: a leading + on its own line inside a block is
      // parsed as unary plus, silently dropping the execution-memory term from the sum.
      executorSummary.peakUnifiedMemory.getOrElse(EXECUTION_MEMORY, 0L) +
        executorSummary.peakUnifiedMemory.getOrElse(STORAGE_MEMORY, 0L)
    }.sum / executorList.size

    lazy val maxUnifiedMemory: Long = executorList.map { executorSummary =>
      executorSummary.peakUnifiedMemory.getOrElse(EXECUTION_MEMORY, 0L) +
        executorSummary.peakUnifiedMemory.getOrElse(STORAGE_MEMORY, 0L)
    }.max

    // Overall severity is the worst per-executor severity.
    lazy val severity: Severity = {
      var worstSeverity: Severity = Severity.NONE
      for (executorSummary <- executorList) {
        val executorSeverity = getPeakUnifiedMemoryExecutorSeverity(executorSummary)
        if (executorSeverity.getValue > worstSeverity.getValue) {
          worstSeverity = executorSeverity
        }
      }
      worstSeverity
    }
}

}
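
(For intuition, a worked sketch of the default thresholds with hypothetical numbers, not part of the diff: with ascending = false, severity rises as the peak falls further below the allocation.)

val maxMemory: Long = 4L * 1024 * 1024 * 1024   // hypothetical 4 GB unified region
val low      = (0.7 * maxMemory).toLong         // peak below ~2.8 GB -> at least LOW
val moderate = (0.6 * maxMemory).toLong         // peak below ~2.4 GB -> at least MODERATE
val severe   = (0.4 * maxMemory).toLong         // peak below ~1.6 GB -> at least SEVERE
val critical = (0.2 * maxMemory).toLong         // peak below ~0.8 GB -> CRITICAL
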
@@ -24,7 +24,6 @@ import scala.util.Try
import com.linkedin.drelephant.spark.fetchers.statusapiv1._
import org.apache.spark.JobExecutionStatus
import com.linkedin.drelephant.spark.fetchers.statusapiv1.StageStatus

/**
* Converters for legacy SparkApplicationData to current SparkApplicationData.
*
@@ -46,7 +45,7 @@ object LegacyDataConverters {
override def shuffleWriteRecords: Long = 0
override def inputBytes: Long = 0
override def details: String = ""
-    override def tasks: Option[collection.Map[Long, TaskData]] = None
+    override def tasks = None
override def attemptId: Int = 0
override def stageId: Int = 0
override def memoryBytesSpilled: Long = 0
@@ -206,7 +205,8 @@
executorInfo.totalGCTime,
executorInfo.totalMemoryBytesSpilled,
executorLogs = Map.empty,
-      peakJvmUsedMemory = Map.empty
+      peakJvmUsedMemory = Map.empty,
+      peakUnifiedMemory = Map.empty
)
}

24 changes: 24 additions & 0 deletions app/views/help/spark/helpUnifiedMemoryHeuristic.scala.html
@@ -0,0 +1,24 @@
@*
* Copyright 2016 LinkedIn Corp.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*@
<p>The Peak Unified Memory Heuristic identifies and flags jobs that have over-allocated the unified memory region.</p>
<h4>Peak Unified Memory</h4>
<p>If the job's peak unified memory consumption is much smaller than the allocated unified memory space, we recommend decreasing the allocated unified memory region for your job.</p>
<h3>Action Items</h3>
<p>The allocated unified memory region can be reduced in the following ways:</p>
<p>1. If your job's executor memory is already low, reduce <i>spark.memory.fraction</i>, which shrinks the share of executor memory given to the unified memory region.</p>
<p>2. If your job's executor memory is high, reduce <i>spark.executor.memory</i> itself, which lowers the allocated unified memory space proportionally.</p>
<p>Note:</p>
<p><i>spark.memory.fraction</i>: the fraction of the JVM heap (executor memory minus reserved memory) dedicated to the unified region (execution + storage). It effectively partitions user memory from execution and storage memory.</p>
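
(A minimal sketch of the two action items, assuming a plain SparkConf-based setup; the values are illustrative assumptions, not recommendations:)

import org.apache.spark.SparkConf

// Action item 1: executor memory already low -- shrink the unified region's share instead.
val reducedFractionConf = new SparkConf()
  .set("spark.executor.memory", "2g")   // already modest; keep it
  .set("spark.memory.fraction", "0.4")  // down from the 0.6 default

// Action item 2: executor memory high -- shrink the executors; the unified region shrinks with them.
val reducedMemoryConf = new SparkConf()
  .set("spark.executor.memory", "4g")   // e.g. down from 8g
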
@@ -199,6 +199,7 @@ object SparkMetricsAggregatorTest {
totalGCTime = 0,
totalMemoryBytesSpilled = 0,
executorLogs = Map.empty,
-      peakJvmUsedMemory = Map.empty
+      peakJvmUsedMemory = Map.empty,
+      peakUnifiedMemory = Map.empty
)
}
@@ -121,7 +121,8 @@ object ExecutorGcHeuristicTest {
totalGCTime,
totalMemoryBytesSpilled = 0,
executorLogs = Map.empty,
-      peakJvmUsedMemory = Map.empty
+      peakJvmUsedMemory = Map.empty,
+      peakUnifiedMemory = Map.empty
)

def newFakeSparkApplicationData(
@@ -111,7 +111,9 @@ object ExecutorStorageSpillHeuristicTest {
maxMemory= 0,
totalGCTime = 0,
totalMemoryBytesSpilled,
-      executorLogs = Map.empty
+      executorLogs = Map.empty,
+      peakJvmUsedMemory = Map.empty,
+      peakUnifiedMemory = Map.empty
)

def newFakeSparkApplicationData(
@@ -124,7 +126,8 @@ object ExecutorStorageSpillHeuristicTest {
new ApplicationInfoImpl(appId, name = "app", Seq.empty),
jobDatas = Seq.empty,
stageDatas = Seq.empty,
-      executorSummaries = executorSummaries
+      executorSummaries = executorSummaries,
+      stagesWithFailedTasks = Seq.empty
)

val logDerivedData = SparkLogDerivedData(
@@ -252,7 +252,8 @@ object ExecutorsHeuristicTest {
totalGCTime = 0,
totalMemoryBytesSpilled = 0,
executorLogs = Map.empty,
-      peakJvmUsedMemory = Map.empty
+      peakJvmUsedMemory = Map.empty,
+      peakUnifiedMemory = Map.empty
)

def newFakeSparkApplicationData(executorSummaries: Seq[ExecutorSummaryImpl]): SparkApplicationData = {
@@ -86,8 +86,10 @@ object JvmUsedMemoryHeuristicTest {
totalShuffleWrite = 0,
maxMemory = 0,
totalGCTime = 0,
+      totalMemoryBytesSpilled = 0,
       executorLogs = Map.empty,
-      peakJvmUsedMemory
+      peakJvmUsedMemory,
+      peakUnifiedMemory = Map.empty
)

def newFakeSparkApplicationData(
@@ -104,7 +106,8 @@ object JvmUsedMemoryHeuristicTest {
new ApplicationInfoImpl(appId, name = "app", Seq.empty),
jobDatas = Seq.empty,
stageDatas = Seq.empty,
-      executorSummaries = executorSummaries
+      executorSummaries = executorSummaries,
+      stagesWithFailedTasks = Seq.empty
)

SparkApplicationData(appId, restDerivedData, Some(logDerivedData))
@@ -22,7 +22,7 @@ import scala.concurrent.duration.Duration
import com.linkedin.drelephant.analysis.{ApplicationType, Severity}
import com.linkedin.drelephant.configurations.heuristic.HeuristicConfigurationData
import com.linkedin.drelephant.spark.data.{SparkApplicationData, SparkLogDerivedData, SparkRestDerivedData}
-import com.linkedin.drelephant.spark.fetchers.statusapiv1.{ApplicationInfoImpl, JobDataImpl, StageDataImpl}
+import com.linkedin.drelephant.spark.fetchers.statusapiv1.{ApplicationInfoImpl, StageDataImpl}
import com.linkedin.drelephant.spark.fetchers.statusapiv1.StageStatus
import org.apache.spark.scheduler.SparkListenerEnvironmentUpdate
import org.scalatest.{FunSpec, Matchers}
@@ -0,0 +1,83 @@
package com.linkedin.drelephant.spark.heuristics

import com.linkedin.drelephant.analysis.{ApplicationType, Severity}
import com.linkedin.drelephant.configurations.heuristic.HeuristicConfigurationData
import com.linkedin.drelephant.spark.data.{SparkApplicationData, SparkRestDerivedData}
import com.linkedin.drelephant.spark.fetchers.statusapiv1.{ApplicationInfoImpl, ExecutorSummaryImpl}
import org.scalatest.{FunSpec, Matchers}

import scala.collection.JavaConverters

class UnifiedMemoryHeuristicTest extends FunSpec with Matchers {

import UnifiedMemoryHeuristicTest._

val heuristicConfigurationData = newFakeHeuristicConfigurationData()

val memoryFractionHeuristic = new UnifiedMemoryHeuristic(heuristicConfigurationData)

val executorData = Seq(
newDummyExecutorData("1", 400000, Map("executionMemory" -> 300000, "storageMemory" -> 94567)),
newDummyExecutorData("2", 400000, Map("executionMemory" -> 200000, "storageMemory" -> 34568)),
newDummyExecutorData("3", 400000, Map("executionMemory" -> 300000, "storageMemory" -> 34569)),
newDummyExecutorData("4", 400000, Map("executionMemory" -> 20000, "storageMemory" -> 3456)),
newDummyExecutorData("5", 400000, Map("executionMemory" -> 200000, "storageMemory" -> 34564)),
newDummyExecutorData("6", 400000, Map("executionMemory" -> 300000, "storageMemory" -> 94561))
)
describe(".apply") {
val data = newFakeSparkApplicationData(executorData)
val heuristicResult = memoryFractionHeuristic.apply(data)
val heuristicResultDetails = heuristicResult.getHeuristicResultDetails

it("has severity") {
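      // Why CRITICAL: every executor reports maxMemory = 400000, so the default descending
      // critical bound is 0.2 * 400000 = 80000. Executor "4" peaks at only
      // 20000 + 3456 = 23456 of unified memory, below that bound, and the heuristic takes
      // the worst severity across executors.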
heuristicResult.getSeverity should be(Severity.CRITICAL)
}
}
}

object UnifiedMemoryHeuristicTest {

import JavaConverters._

def newFakeHeuristicConfigurationData(params: Map[String, String] = Map.empty): HeuristicConfigurationData =
new HeuristicConfigurationData("heuristic", "class", "view", new ApplicationType("type"), params.asJava)

def newDummyExecutorData(
id: String,
maxMemory: Long,
peakUnifiedMemory: Map[String, Long]
): ExecutorSummaryImpl = new ExecutorSummaryImpl(
id,
hostPort = "",
rddBlocks = 0,
memoryUsed = 0,
diskUsed = 0,
activeTasks = 0,
failedTasks = 0,
completedTasks = 0,
totalTasks = 0,
totalDuration = 0,
totalInputBytes = 0,
totalShuffleRead = 0,
totalShuffleWrite = 0,
maxMemory,
totalGCTime = 0,
totalMemoryBytesSpilled = 0,
executorLogs = Map.empty,
peakJvmUsedMemory = Map.empty,
peakUnifiedMemory
)

def newFakeSparkApplicationData(executorSummaries: Seq[ExecutorSummaryImpl]): SparkApplicationData = {
val appId = "application_1"
val restDerivedData = SparkRestDerivedData(
new ApplicationInfoImpl(appId, name = "app", Seq.empty),
jobDatas = Seq.empty,
stageDatas = Seq.empty,
executorSummaries = executorSummaries,
stagesWithFailedTasks = Seq.empty
)

SparkApplicationData(appId, restDerivedData, logDerivedData = None)
}
}
4 changes: 2 additions & 2 deletions test/com/linkedin/drelephant/util/SparkUtilsTest.scala
@@ -287,7 +287,7 @@ object SparkUtilsTest extends MockitoSugar {
BDDMockito.given(fileStatus.getPath()).willReturn(expectedPath)
fileStatus
}
    val expectedStatusArray = Array(expectedFileStatus)

val filter = new PathFilter() {
override def accept(file: Path): Boolean = {
@@ -298,7 +298,7 @@ object SparkUtilsTest extends MockitoSugar {
BDDMockito.given(fs.getUri).willReturn(fileSystemUri)
BDDMockito.given(fs.exists(expectedPath)).willReturn(true)
BDDMockito.given(fs.getFileStatus(expectedPath)).willReturn(expectedFileStatus)
-    BDDMockito.given(fs.listStatus(org.mockito.Matchers.refEq(new Path( new Path(fileSystemUri), basePath)),
+    BDDMockito.given(fs.listStatus(org.mockito.Matchers.refEq(new Path(new Path(fileSystemUri), basePath)),
org.mockito.Matchers.any(filter.getClass))).
willReturn(expectedStatusArray)
BDDMockito.given(fs.open(expectedPath)).willReturn(