Peak Unified Memory Heuristic - (Depends on Custom SHS - Requires peakUnifiedMemory metric) #281

Merged: 11 commits into linkedin:customSHSWork, Jan 10, 2018

Conversation

@skakker (Contributor) commented Sep 4, 2017

The amount of unified memory can be examined to see if spark.memory.fraction can be adjusted. If the ratio of unified memory to executor memory is much smaller than spark.memory.fraction, then more memory has been reserved for execution than is being used. This memory can instead be allocated for user memory, and/or total executor memory could be reduced.
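A minimal Scala sketch of the check described above; the names (checkUnifiedMemoryRatio, executorPeakUnifiedMemory) and the specific threshold are illustrative assumptions, not the PR's actual code:

def checkUnifiedMemoryRatio(
    executorPeakUnifiedMemory: Long,  // peak execution + storage memory observed for an executor
    executorMemory: Long,             // total executor memory (spark.executor.memory)
    sparkMemoryFraction: Double       // value of spark.memory.fraction (default 0.6)
): Boolean = {
  // If the fraction of executor memory actually used by the unified region is far below
  // spark.memory.fraction, the region is over-provisioned, so the fraction (or the
  // executor memory itself) can be reduced. The 0.5 multiplier is an arbitrary example.
  val usedFraction = executorPeakUnifiedMemory.toDouble / executorMemory
  usedFraction < 0.5 * sparkMemoryFraction
}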

*
* This heuristic reports the fraction of memory used/ memory allocated for execution and if the fraction can be reduced. Also, it checks for the skew in peak unified memory and reports if the skew is too much.
*/

Extra newline.

import com.linkedin.drelephant.spark.data.SparkApplicationData
import com.linkedin.drelephant.spark.fetchers.statusapiv1.ExecutorSummary
import scala.collection.JavaConverters

Extra newline.

import com.linkedin.drelephant.analysis._
import com.linkedin.drelephant.configurations.heuristic.HeuristicConfigurationData
import com.linkedin.drelephant.spark.data.SparkApplicationData
import com.linkedin.drelephant.spark.fetchers.statusapiv1.ExecutorSummary

Newline between linkedin and scala imports?

/**
* A heuristic based on peak unified memory for the spark executors
*
* This heuristic reports the fraction of memory used/ memory allocated for execution and if the fraction can be reduced. Also, it checks for the skew in peak unified memory and reports if the skew is too much.

Unified memory is both execution and storage.

class UnifiedMemoryHeuristic(private val heuristicConfigurationData: HeuristicConfigurationData)
extends Heuristic[SparkApplicationData] {

import UnifiedMemoryHeuristic._

Are these imports needed?

skakker (Contributor Author):

Yes, we need the UnifiedMemoryHeuristic import as we are using the "evaluator" inside the class, and the JavaConverters import because resultDetails is converted (.asJava) before being passed as an argument to HeuristicResult.

<p>If the ratio of unified memory to executor memory is much smaller than "spark.memory.fraction", then more memory has been reserved for execution than is being used. This memory can instead be allocated for user memory, and/or total executor memory could be reduced.</p>
<p>spark.memory.fraction: this is the fraction of (executor memory - reserved memory) used for execution and storage. This partitions user memory from execution and storage memory.</p>
<h4>Unified Memory Skew</h4>
<p>Skew in the amount of unified memory for different executors might indicates a similar imbalance in the amount of work (and data) for tasks. It should be more balanced.</p>

Change "might indicates" to "might indicate"


object UnifiedMemoryHeuristic {

val JVM_USED_MEMORY = "jvmUsedMemory"

This should be replaced by vals for storage memory and execution memory.

skakker (Contributor Author):

I didn't get this, could you please be more specific?

Reviewer:

Unified memory is the sum of storage and execution memory. This is different from JVM used memory.
Storage memory is the amount of memory used by Spark for storing RDDs, broadcast variables, etc.
Execution memory is the amount of memory used for execution (shuffle, sort, etc.).
For more recent versions of Spark, execution and storage memory share a unified memory region, and are able to borrow from each other, if one still has extra capacity. Unified memory is thus the sum of storage and execution memory.
JVM memory is the JVM memory used by the Spark application.

skakker (Contributor Author):

Yupp I understand that. Now peakUnifiedMemory contains the following key value pairs:

"peakUnifiedMemory" : {
  "jvmUsedMemory" : ,
  "executionMemory" : ,
  "storageMemory" : ,
  "time" : ,
  "activeStages" : [, ...]
}

So, for the purpose of this heuristic, to calculate the peak unified memory, do you mean I should take the sum of executionMemory and storageMemory instead of jvmUsedMemory?

Reviewer:

Yes, please use the sum of executionMemory and storageMemory for unifiedMemory.

skakker (Contributor Author):

Sure.. Thank you! :)

SeverityThresholds(low = 1.5 * meanUnifiedMemory, moderate = 2 * meanUnifiedMemory, severe = 4 * meanUnifiedMemory, critical = 8 * meanUnifiedMemory, ascending = true)

def getPeakUnifiedMemoryExecutorSeverity(executorSummary: ExecutorSummary): Severity = {
var jvmPeakUnifiedMemory: Long = executorSummary.peakUnifiedMemory.getOrElse(JVM_USED_MEMORY, 0)

This is memory controlled by Spark, so peakUnifiedMemory might be better than jvmPeakUnifiedMemory.

SeverityThresholds(low = 1.5 * meanUnifiedMemory, moderate = 2 * meanUnifiedMemory, severe = 4 * meanUnifiedMemory, critical = 8 * meanUnifiedMemory, ascending = true)

def getPeakUnifiedMemoryExecutorSeverity(executorSummary: ExecutorSummary): Severity = {
var jvmPeakUnifiedMemory: Long = executorSummary.peakUnifiedMemory.getOrElse(JVM_USED_MEMORY, 0)

Unified memory is the sum of execution and storage memory. Right now it is using jvmUsedMemory.
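A hedged sketch of the fix being requested, replacing the jvmUsedMemory lookup with the sum of execution and storage memory. The val names and DEFAULT_PEAK_UNIFIED_MEMORY_THRESHOLDS are assumed here, and severityOf is assumed to exist on SeverityThresholds as in the other Spark heuristics:

val EXECUTION_MEMORY = "executionMemory"
val STORAGE_MEMORY = "storageMemory"

def getPeakUnifiedMemoryExecutorSeverity(executorSummary: ExecutorSummary): Severity = {
  // Unified memory is execution memory + storage memory, not jvmUsedMemory.
  val peakUnifiedMemory: Long =
    executorSummary.peakUnifiedMemory.getOrElse(EXECUTION_MEMORY, 0L) +
      executorSummary.peakUnifiedMemory.getOrElse(STORAGE_MEMORY, 0L)
  // DEFAULT_PEAK_UNIFIED_MEMORY_THRESHOLDS stands in for the SeverityThresholds
  // instance built from meanUnifiedMemory in the excerpt above.
  DEFAULT_PEAK_UNIFIED_MEMORY_THRESHOLDS.severityOf(peakUnifiedMemory)
}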

val memoryFractionHeuristic = new UnifiedMemoryHeuristic(heuristicConfigurationData)

val executorData = Seq(
newDummyExecutorData("1", 400000, Map("jvmUsedMemory" -> 394567)),

These should set execution and storage memory.
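For example, the dummy data could be keyed on execution and storage memory instead (the values here are illustrative only):

newDummyExecutorData("1", 400000, Map("executionMemory" -> 300000L, "storageMemory" -> 94567L))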

@@ -87,7 +86,9 @@ trait ExecutorSummary{
   def totalShuffleRead: Long
   def totalShuffleWrite: Long
   def maxMemory: Long
-  def executorLogs: Map[String, String]}
+  def executorLogs: Map[String, String]
+  def peakUnifiedMemory: Map[String, Long]
Contributor:

How do you get the peakUnifiedMemory value? I don't think the Spark history server reports it.

@@ -292,7 +293,8 @@ class ExecutorSummaryImpl(
   var totalShuffleRead: Long,
   var totalShuffleWrite: Long,
   var maxMemory: Long,
-  var executorLogs: Map[String, String]) extends ExecutorSummary
+  var executorLogs: Map[String, String],
+  var peakUnifiedMemory: Map[String, Long]) extends ExecutorSummary
Contributor:

How do you get the peakUnifiedMemory value? I don't think the Spark history server reports it.

* License for the specific language governing permissions and limitations under
* the License.
*@
<p> This is a heuristic for peak Unified Memory </p>
Contributor:

This trivial sentence doesn't convey anything. You could say something like,

Peak Unified Memory Heuristic identifies and flags jobs which have an over-allocated Unified Memory region.

import org.apache.spark.scheduler.SparkListenerEnvironmentUpdate
import org.scalatest.{FunSpec, Matchers}
import com.linkedin.drelephant.spark.fetchers.statusapiv1.StageStatus
Contributor:

Organize the imports

@@ -193,6 +193,12 @@
<classname>com.linkedin.drelephant.spark.heuristics.StagesHeuristic</classname>
<viewname>views.html.help.spark.helpStagesHeuristic</viewname>
</heuristic>
<heuristic>
<applicationtype>spark</applicationtype>
<heuristicname>Spark Peak Unified Memory</heuristicname>
Contributor:

Because these heuristics are applicable only to Spark applications, I don't think we should mention Spark in the Heuristic Name. It could just be Peak Unified Memory

Maybe a personal preference. What do you think?

<p> This is a heuristic for peak Unified Memory </p>
<h4>Peak Unified Memory</h4>
<p>If the peak unified memory is much smaller than allocated executor memory then we recommend to decrease spark.memory.fraction, or total executor memory.</p>
<p>spark.memory.fraction: this is the fraction of (executor memory - reserved memory) used for execution and storage. This partitions user memory from execution and storage memory.</p>
Contributor:

Requesting changes:

  1. Add a <p>Note:</p> and then provide the details.
  2. Make spark.memory.fraction italics.
  3. Rephrase: This is the fraction of JVM Used Memory (Executor memory - Reserved memory) dedicated to the unified memory region (execution + storage).

*@
<p> This is a heuristic for peak Unified Memory </p>
<h4>Peak Unified Memory</h4>
<p>If the peak unified memory is much smaller than allocated executor memory then we recommend to decrease spark.memory.fraction, or total executor memory.</p>
Contributor:

Rephrase:

If the job's Peak Unified Memory Consumption is much smaller than the allocated Unified Memory space, then we recommend decreasing the allocated Unified Memory Region for your job.

Action Item:
The Allocated Unified Memory Region can be reduced in the following ways.

  1. If your job's Executor Memory is already low, then reduce spark.memory.fraction which will reduce the amount of space allocated to the Unified Memory Region.
  2. If your job's Executor Memory is high, then we recommend reducing the spark.executor.memory itself which will lower the Allocated Unified Memory space.
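A sketch of how the two options above might look when configuring a job programmatically; the values are illustrative only, not recommendations:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.memory.fraction", "0.4")   // option 1: shrink the unified region (default is 0.6)
  .set("spark.executor.memory", "2g")    // option 2: lower executor memory itself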

var resultDetails = Seq(
new HeuristicResultDetails("Allocated memory for the unified region", MemoryFormatUtils.bytesToString(evaluator.maxMemory)),
new HeuristicResultDetails("Mean peak unified memory", MemoryFormatUtils.bytesToString(evaluator.meanUnifiedMemory))
)
Contributor:

Show the Spark Executor Memory as well.

new HeuristicResultDetails("Mean peak unified memory", MemoryFormatUtils.bytesToString(evaluator.meanUnifiedMemory))
)

if (evaluator.severity.getValue > Severity.LOW.getValue) {
Contributor:

Remove this. Suggestions are given on the help page.

val executorList: Seq[ExecutorSummary] = executorSummaries.filterNot(_.id.equals("driver"))

//allocated memory for the unified region
val maxMemory: Long = executorList.head.maxMemory
Contributor:

What happens if executorList is empty?
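One possible guard, assuming a zero result is acceptable for an application with no non-driver executors:

// headOption avoids the NoSuchElementException that executorList.head would throw
// when the list is empty.
val maxMemory: Long = executorList.headOption.map(_.maxMemory).getOrElse(0L)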

lazy val executorSummaries: Seq[ExecutorSummary] = data.executorSummaries
val executorList: Seq[ExecutorSummary] = executorSummaries.filterNot(_.id.equals("driver"))

//allocated memory for the unified region
Contributor:

Are you sure maxMemory is the allocated memory? I couldn't find any documentation in Spark.


var resultDetails = Seq(
new HeuristicResultDetails("Allocated memory for the unified region", MemoryFormatUtils.bytesToString(evaluator.maxMemory)),
new HeuristicResultDetails("Mean peak unified memory", MemoryFormatUtils.bytesToString(evaluator.meanUnifiedMemory))
Contributor:

I think it would also make sense to show the max Peak unified memory among all executors.
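For instance, a result detail along these lines could be added (evaluator.maxUnifiedMemory is a hypothetical field, not something the excerpt defines):

new HeuristicResultDetails("Max peak unified memory",
  MemoryFormatUtils.bytesToString(evaluator.maxUnifiedMemory))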

@akshayrai (Contributor):

Nice work @skakker. Thanks for addressing all the comments.

@akshayrai akshayrai changed the title Peak unified memory heuristic implemented, DEPENDENCY: REST-API changes Peak unified memory heuristic implemented - (Depends on Custom SHS - Requires peakUnifiedMemory metric) Jan 10, 2018
@skakker skakker changed the base branch from master to customSHSWork January 10, 2018 06:14
@akshayrai akshayrai changed the title Peak unified memory heuristic implemented - (Depends on Custom SHS - Requires peakUnifiedMemory metric) Peak Unified Memory Heuristic - (Depends on Custom SHS - Requires peakUnifiedMemory metric) Jan 10, 2018
@akshayrai akshayrai merged commit 13115c2 into linkedin:customSHSWork Jan 10, 2018
akshayrai pushed a commit that referenced this pull request Feb 21, 2018
akshayrai pushed a commit that referenced this pull request Feb 27, 2018
akshayrai pushed a commit that referenced this pull request Mar 6, 2018
arpang pushed a commit to arpang/dr-elephant that referenced this pull request Mar 14, 2018
akshayrai pushed a commit that referenced this pull request Mar 19, 2018
akshayrai pushed a commit that referenced this pull request Mar 19, 2018
akshayrai pushed a commit that referenced this pull request Mar 30, 2018
akshayrai pushed a commit that referenced this pull request Apr 6, 2018
akshayrai pushed a commit that referenced this pull request May 21, 2018
pralabhkumar pushed a commit to pralabhkumar/dr-elephant that referenced this pull request Aug 31, 2018
varunsaxena pushed a commit that referenced this pull request Oct 16, 2018