Skip to content

Commit

Permalink
Removed skew check
Browse files Browse the repository at this point in the history
  • Loading branch information
swasti committed Jan 8, 2018
1 parent 9c28e72 commit 6fb2725
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,16 +41,13 @@ class UnifiedMemoryHeuristic(private val heuristicConfigurationData: HeuristicCo
val evaluator = new Evaluator(this, data)

var resultDetails = Seq(
new HeuristicResultDetails("Max peak unified memory", evaluator.maxUnifiedMemory.toString),
new HeuristicResultDetails("Allocated memory for the unified region", evaluator.maxMemory.toString),
new HeuristicResultDetails("Mean peak unified memory", evaluator.meanUnifiedMemory.toString)
)

if (evaluator.severityPeak.getValue > Severity.LOW.getValue) {
if (evaluator.severity.getValue > Severity.LOW.getValue) {
resultDetails = resultDetails :+ new HeuristicResultDetails("Note", "The value of peak unified memory is very low, we recommend to decrease spark.memory.fraction, or total executor memory")
}
if (evaluator.severitySkew.getValue > Severity.LOW.getValue) {
resultDetails = resultDetails :+ new HeuristicResultDetails("Note", "There is an imbalance in the amount of memory used by executors, please look into this to see if it can be distributed more evenly")
}
val result = new HeuristicResult(
heuristicConfigurationData.getClassName,
heuristicConfigurationData.getHeuristicName,
Expand All @@ -72,33 +69,27 @@ object UnifiedMemoryHeuristic {
data.appConfigurationProperties

lazy val executorSummaries: Seq[ExecutorSummary] = data.executorSummaries
val executorList : Seq[ExecutorSummary] = executorSummaries.filterNot(_.id.equals("driver"))
val executorList: Seq[ExecutorSummary] = executorSummaries.filterNot(_.id.equals("driver"))

//allocated memory for the unified region
val maxMemory: Long = executorList.head.maxMemory

val DEFAULT_PEAK_UNIFIED_MEMORY_THRESHOLDS =
SeverityThresholds(low = 0.7 * maxMemory, moderate = 0.6 * maxMemory, severe = 0.4 * maxMemory, critical = 0.2 * maxMemory, ascending = false)

val DEFAULT_UNIFIED_MEMORY_SKEW_THRESHOLDS =
SeverityThresholds(low = 1.5 * meanUnifiedMemory, moderate = 2 * meanUnifiedMemory, severe = 4 * meanUnifiedMemory, critical = 8 * meanUnifiedMemory, ascending = true)

def getPeakUnifiedMemoryExecutorSeverity(executorSummary: ExecutorSummary): Severity = {
return DEFAULT_PEAK_UNIFIED_MEMORY_THRESHOLDS.severityOf(executorSummary.peakUnifiedMemory.getOrElse(EXECUTION_MEMORY, 0).asInstanceOf[Number].longValue
+ executorSummary.peakUnifiedMemory.getOrElse(STORAGE_MEMORY, 0).asInstanceOf[Number].longValue)
}

lazy val meanUnifiedMemory: Long = (executorList.map {
executorSummary=> {executorSummary.peakUnifiedMemory.getOrElse(EXECUTION_MEMORY, 0).asInstanceOf[Number].longValue
+ executorSummary.peakUnifiedMemory.getOrElse(STORAGE_MEMORY, 0).asInstanceOf[Number].longValue}
executorSummary => {
executorSummary.peakUnifiedMemory.getOrElse(EXECUTION_MEMORY, 0).asInstanceOf[Number].longValue
+ executorSummary.peakUnifiedMemory.getOrElse(STORAGE_MEMORY, 0).asInstanceOf[Number].longValue
}
}.sum) / executorList.size
lazy val maxUnifiedMemory: Long = executorList.map {
executorSummary => {executorSummary.peakUnifiedMemory.getOrElse(EXECUTION_MEMORY, 0).asInstanceOf[Number].longValue
+ executorSummary.peakUnifiedMemory.getOrElse(STORAGE_MEMORY, 0).asInstanceOf[Number].longValue}
}.max
val severitySkew = DEFAULT_UNIFIED_MEMORY_SKEW_THRESHOLDS.severityOf(maxUnifiedMemory)

lazy val severityPeak: Severity = {
lazy val severity: Severity = {
var severityPeakUnifiedMemoryVariable: Severity = Severity.NONE
for (executorSummary <- executorList) {
var peakUnifiedMemoryExecutorSeverity: Severity = getPeakUnifiedMemoryExecutorSeverity(executorSummary)
Expand All @@ -108,6 +99,6 @@ object UnifiedMemoryHeuristic {
}
severityPeakUnifiedMemoryVariable
}
lazy val severity: Severity = Severity.max(severityPeak, severitySkew)
}

}
2 changes: 0 additions & 2 deletions app/views/help/spark/helpUnifiedMemoryHeuristic.scala.html
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,3 @@
<h4>Peak Unified Memory</h4>
<p>If the peak unified memory is much smaller than allocated executor memory then we recommend to decrease spark.memory.fraction, or total executor memory.</p>
<p>spark.memory.fraction: this is the fraction of (executor memory - reserved memory) used for execution and storage. This partitions user memory from execution and storage memory.</p>
<h4>Unified Memory Skew</h4>
<p>Skew in the amount of unified memory for different executors might indicate a similar imbalance in the amount of work (and data) for tasks. It should be more balanced.</p>

0 comments on commit 6fb2725

Please sign in to comment.