
Spark Stages with Failed tasks Heuristic - (Depends on Custom SHS - Requires stages/failedTasks Rest API) #288

Merged — 9 commits merged into linkedin:customSHSWork on Jan 10, 2018

Conversation

@skakker (Contributor) commented on Sep 19, 2017:

Tasks (and, by extension, stages and jobs) can fail if an error occurs while a task is executing. Error information is available at the task level; this heuristic inspects the error messages of failed tasks and makes recommendations based on the kinds of errors encountered.
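As a rough sketch (not the PR's exact code), the heuristic amounts to matching failed-task error messages against known patterns. The two message patterns below are the ones that appear later in this PR's diff; the helper name classifyError is illustrative:

val OOM_ERROR = "java.lang.OutOfMemoryError"
val OVERHEAD_MEMORY_ERROR = "killed by YARN for exceeding memory limits"

// Map a failed task's error message to the kind of error it represents,
// which drives the recommendation shown to the user.
def classifyError(errorMessage: String): Option[String] =
  if (errorMessage.contains(OOM_ERROR)) Some("OOM error")
  else if (errorMessage.contains(OVERHEAD_MEMORY_ERROR)) Some("Overhead memory error")
  else None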

@shkhrgpt (Contributor) commented on Oct 5, 2017:

I think it would be good if you could provide a brief description of this PR. It helps in the review.
Thanks.

@skakker skakker force-pushed the FailedTasksHeuristic branch 2 times, most recently from 5964954 to a6fc106 on October 10, 2017 06:59
@akshayrai (Contributor) commented:

@skakker, can you please update the descriptions of all the PRs, as @shkhrgpt has been pointing out? It helps the reviewers a lot.

@akshayrai (Contributor) left a comment:

Can you please create a task for yourself and update the wiki page on GitHub with all the Spark heuristics?

https://github.com/linkedin/dr-elephant/wiki/Metrics-and-Heuristics#spark

@@ -92,6 +93,8 @@ class SparkRestClient(sparkConf: SparkConf) {
await(futureJobDatas),
await(futureStageDatas),
await(futureExecutorSummaries),
await(futureFailedTasksDatas),
//Seq.empty,
Contributor:

Remove this.

@@ -213,6 +216,18 @@ class SparkRestClient(sparkConf: SparkConf) {
}
}
}

private def getStagesWithFailedTasks(attemptTarget: WebTarget): Seq[StageDataImpl] = {
val target = attemptTarget.path("stages/failedTasks")
Contributor:

Can't you get this information from the getStageData call? Why do you need to make a separate call for retrieving failed tasks?

Contributor:

I agree with @akshayrai. Why not get this information from getStageData?

Contributor (Author):

We have a separate API developed for "failed tasks". If we fetched this data from the StageData objects, we would have to iterate over all the stages and filter out the ones with failed tasks, which is a costly operation; hence we use this API to fetch failed tasks directly.
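(For context, a minimal sketch of the custom-endpoint fetch, assuming the same get and SparkRestObjectMapper helpers used by SparkRestClient's other fetch methods and scala.util.control.NonFatal; the endpoint path is the one in this PR's diff:)

private def getStagesWithFailedTasks(attemptTarget: WebTarget): Seq[StageDataImpl] = {
  // The custom SHS endpoint returns only stages containing failed tasks,
  // so Dr. Elephant does not have to pull and filter every stage itself.
  val target = attemptTarget.path("stages/failedTasks")
  try {
    get(target, SparkRestObjectMapper.readValue[Seq[StageDataImpl]])
  } catch {
    case NonFatal(e) =>
      logger.error(s"error reading failed tasks from ${target.getUri}", e)
      throw e
  }
}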

@shkhrgpt (Contributor), Dec 1, 2017:

If it's an API that is not available in the official version of Spark, then you need to add a comment about it. I am not sure this PR should be merged unless the API is included in the official version of Spark.

I agree that iterating over all the stages could be a costly operation. But doing the same costly operation on the Spark history server may be a lot more expensive, because the history server is not limited to serving Dr. Elephant.

@skakker (Contributor, Author), Dec 11, 2017:

Hi Shekhar,
As it turns out, the stages API does not return task information. The only ways to get failed-task information are this endpoint or calling the task API separately for each task; hence we have to use this call.

Contributor (Author):

Hi Shekhar,
I have made the calling of the FailedTasks API configurable, with the default value being false. You don't have to worry about it now: for the official version of Spark, it won't call the API.
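(As a sketch, the guard looks roughly like this; the FETCH_FAILED_TASKS parameter name is taken from the diff further down, the failedTasksStages value name is illustrative, and the "false" default means a stock Spark history server is never queried for the custom endpoint:)

// Read the flag from the fetcher configuration, defaulting to false so the
// custom endpoint is strictly opt-in.
val fetchFailedTasks: Boolean =
  Option(fetcherConfigurationData.getParamMap.get(FETCH_FAILED_TASKS))
    .getOrElse("false")
    .toBoolean

// Only hit the custom SHS endpoint when explicitly enabled.
val failedTasksStages: Seq[StageDataImpl] =
  if (fetchFailedTasks) getStagesWithFailedTasks(attemptTarget) else Seq.empty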

Contributor:

Thanks, @skakker, for making it configurable. However, I am still not in favor of merging this change, because it's going to be useless for almost all users unless they patch their SHS, which is not a trivial exercise.
@akshayrai @shankar37 What do you guys think about this issue?

Contributor:

@shkhrgpt, we will not merge these PRs (which depend on the custom Spark HS) with the master as long as the changes are not reflected in the public spark release.

We are planning to merge these to a separate branch for now.

@@ -0,0 +1,18 @@
package com.linkedin.drelephant.spark.fetchers.statusapiv1;
Contributor:

Please add comments on the motivation behind adding this class.

Contributor (Author):

Done

@@ -0,0 +1,18 @@
package com.linkedin.drelephant.spark.fetchers.statusapiv1;
Contributor:

Is this class required? If not please remove it.

class StagesWithFailedTasksResource {
@GET
def getStagesWithFailedTasks(@PathParam("appId") appId: String, @PathParam("attemptId") attemptId: String): Seq[StageDataImpl] =
if (attemptId == "2") Seq.empty else throw new Exception()
Contributor:

I am not clear on what this code does here. Can you elaborate?

Contributor (Author):

Because we added a new REST call, "failedTasksData", it's a test for that.

new HeuristicResultDetails("Stages with Overhead memory errors", evaluator.stagesWithOverheadError.toString)
)
if(evaluator.severityOverheadStages.getValue >= Severity.MODERATE.getValue)
resultDetails = resultDetails :+ new HeuristicResultDetails("Overhead memory errors", "Many tasks have failed due to overhead memory error. please try increasing it by 500MB in spark.yarn.executor.memoryOverhead")
Contributor:

Change "try increasing it ..." to "try increasing spark.yarn.executor.memoryOverhead by 500MB".

resultDetails = resultDetails :+ new HeuristicResultDetails("Overhead memory errors", "Many tasks have failed due to overhead memory error. please try increasing it by 500MB in spark.yarn.executor.memoryOverhead")
//TODO: refine recommendations
if(evaluator.severityOOMStages.getValue >= Severity.MODERATE.getValue)
resultDetails = resultDetails :+ new HeuristicResultDetails("OOM errors", "Many tasks have failed due to OOM error. Kindly check by increasing executor memory, decreasing spark.memory.fraction or decreasing number of cores.")
Contributor:

Rephrase "Kindly check by ..." to "try increasing spark.executor.memory, decreasing spark.memory.fraction (take a look at the unified memory heuristic), or decreasing the number of cores".



/**
* A heuristic based on errors encountered by failed tasks
Contributor:

Please add more details about this heuristic here.

*@
<p>Tasks (and stages and jobs) can fail if an error occurs while the task is executing.</p>

<p>Tasks may fail due to Overhead memory issues or OOM errors. These errors are checked and warning is given accordingly.</p>
Contributor:

Please elaborate.

attemptId = 0,
numActiveTasks = numCompleteTasks,
numCompleteTasks,
numFailedTasks = 0,
Contributor:

Semantically numFailedTasks should be set to 3.

@@ -79,7 +79,7 @@
-->
<fetcher>
<applicationtype>spark</applicationtype>
<classname>com.linkedin.drelephant.spark.fetchers.FSFetcher</classname>
<classname>com.linkedin.drelephant.spark.fetchers.SparkFetcher</classname>
Contributor:

Maybe this should be done in a separate change?

@@ -213,6 +215,18 @@ class SparkRestClient(sparkConf: SparkConf) {
}
}
}

private def getStagesWithFailedTasks(attemptTarget: WebTarget): Seq[StageDataImpl] = {
val target = attemptTarget.path("stages/failedTasks")
Contributor:

I went through the Spark documentation but couldn't find the REST endpoint stages/failedTasks in the Spark history server. Not sure how it's going to work.

@@ -213,6 +216,18 @@ class SparkRestClient(sparkConf: SparkConf) {
}
}
}

private def getStagesWithFailedTasks(attemptTarget: WebTarget): Seq[StageDataImpl] = {
val target = attemptTarget.path("stages/failedTasks")
Contributor:

I agree with @akshayrai. Why not get this information from getStageData?

import org.apache.spark.util.EnumUtil;

// Added this class to accommodate the status "PENDING" for stages.
public enum StageStatus {


Some of the leveldb SHS changes have been merged into the master branch of Spark, including "SKIPPED". It is not in the 2.1 or 2.2 branches, however. Is it possible to use the version from master? If not, could you please add a TODO to replace this with the Spark version when possible?

new HeuristicResultDetails("Stages with Overhead memory errors", evaluator.stagesWithOverheadError.toString)
)
if (evaluator.severityOverheadStages.getValue >= Severity.MODERATE.getValue)
resultDetails = resultDetails :+ new HeuristicResultDetails("Overhead memory errors", "Many tasks have failed due to overhead memory error. Please try increasing spark.yarn.executor.memoryOverhead by 500MB in spark.yarn.executor.memoryOverhead")


I think we're alerting if any tasks have an OOM error, or are killed by YARN. Can "Many" be changed to "some"?

private def getStageSeverity(numFailedTasks: Int, stageStatus: StageStatus, severityStage: Severity, numCompleteTasks: Int): Severity = {
var severityTemp: Severity = Severity.NONE
if (numFailedTasks != 0 && stageStatus != StageStatus.FAILED) {
if (numFailedTasks.toDouble / numCompleteTasks.toDouble < 2.toDouble / 100.toDouble) {


The threshold (2) is hard coded right now -- please add a constant for this.
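(One possible shape of that refactor, with an illustrative constant name; the 2% value is the one hard-coded above:)

// Name the 2% threshold instead of writing 2.toDouble / 100.toDouble inline.
private val FAILED_TASKS_MODERATE_RATIO: Double = 2.0 / 100.0

if (numFailedTasks != 0 && stageStatus != StageStatus.FAILED &&
    numFailedTasks.toDouble / numCompleteTasks < FAILED_TASKS_MODERATE_RATIO) {
  severityTemp = Severity.MODERATE
}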

lazy val stagesWithFailedTasks: Seq[StageData] = data.stagesWithFailedTasks

/**
* returns the OOM and Overhead memory errors severity


Combine the "@return" with the comment.

@shankar37 (Contributor) commented on Dec 15, 2017 via email.

@shkhrgpt (Contributor) commented:
I don't think depending on Spark features that are in beta is a good idea, because those features may or may not exist in the final release. I am worried that users will try to use these new heuristics and then won't see results, which might cause unnecessary issues, just like the ones we are facing with Spark 2.1.

Anyway, if we want to keep this change, then we need much clearer documentation. In the PR, the author should also post links to the JIRA/GitHub issues for the new SHS patches on which this change depends. We need to make sure that at least those changes are merged into Spark before merging these changes. I would strongly recommend NOT merging these changes until all the relevant SHS changes are merged into mainline Spark. For example, the ticket about better scalability of the SHS, https://issues.apache.org/jira/browse/SPARK-18085, is still unresolved.

@skakker Please add the relevant documentation to the other PRs that depend on the custom SHS, too.

@skakker (Contributor, Author) commented on Dec 19, 2017:

@shkhrgpt Yeah, providing links to the relevant JIRAs makes sense. Will update the PRs with the relevant JIRA IDs.

@@ -193,6 +193,12 @@
<classname>com.linkedin.drelephant.spark.heuristics.StagesHeuristic</classname>
<viewname>views.html.help.spark.helpStagesHeuristic</viewname>
</heuristic>
<heuristic>
<applicationtype>spark</applicationtype>
<heuristicname>Spark Stages with failed tasks</heuristicname>
Contributor:

Stages with Failed Tasks

Contributor (Author):

Done.

val appId = analyticJob.getAppId
val restDerivedData = await(sparkRestClient.fetchData(appId, eventLogSource == EventLogSource.Rest))
val restDerivedData = await(sparkRestClient.fetchData(appId, eventLogSource == EventLogSource.Rest, fetchFailedTasks))
Contributor:

I noticed you are passing fetchFailedTasks into every method prior to this. Can't you compute it directly over here?

Contributor (Author):

Done.

@@ -41,6 +41,7 @@ class SparkFetcher(fetcherConfigurationData: FetcherConfigurationData)
import ExecutionContext.Implicits.global

private val logger: Logger = Logger.getLogger(classOf[SparkFetcher])
val fetchFailedTasks : Boolean = Option(fetcherConfigurationData.getParamMap.get(FETCH_FAILED_TASKS)).getOrElse("false").toBoolean
Contributor:

Rename fetchFailedTasks to doFetchFailedTasks.

Contributor (Author):

Done.

*@
<p>Tasks (and stages and jobs) can fail if an error occurs while the task is executing.</p>

<p>Tasks may fail due to Overhead memory issues or OOM errors. Due to errors in tasks, that stage might also fail. It is analysed as to why the tasks failed, if many tasks of the same stage failed due to the same error, etc. Suggestions are given to prevent these errors.</p>
Contributor:

Need more clarity

Contributor (Author):

Done.

@@ -34,14 +34,44 @@ import com.linkedin.drelephant.spark.fetchers.statusapiv1.StageStatus
object LegacyDataConverters {
import JavaConverters._

//Currently returns a default object (as this JSON is retrieved from Spark History Server), if spark history server is not used to fetch data, changes are required
Contributor:

Need more clarity

Contributor (Author):

Done.

if (numFailedTasks != 0 && stageStatus != StageStatus.FAILED) {
if (numFailedTasks.toDouble / numCompleteTasks.toDouble < ratioThreshold / 100.toDouble) {
severityTemp = Severity.MODERATE
}
Contributor:

Formatting, here and below.

Contributor (Author):

Done.

}

/**
* returns the max (severity of this stage, present severity)
Contributor:

Define what stage severity means here, in layman's terms, first.

Contributor (Author):

Done.

severityTemp = Severity.SEVERE
}
}
else if (numFailedTasks != 0 && stageStatus == StageStatus.FAILED && numFailedTasks / numCompleteTasks > 0) {
Contributor:

What if numCompleteTasks is 0?
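(A fair concern: with Int operands, numFailedTasks / numCompleteTasks throws an ArithmeticException when numCompleteTasks is 0, and Double division would yield Infinity or NaN. A hedged sketch of one possible guard:)

// Compute the failure ratio safely: a stage with failed tasks but no
// completed tasks is treated as fully failed instead of dividing by zero.
val failedRatio: Double =
  if (numCompleteTasks == 0) Double.PositiveInfinity
  else numFailedTasks.toDouble / numCompleteTasks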


val OOM_ERROR = "java.lang.OutOfMemoryError"
val OVERHEAD_MEMORY_ERROR = "killed by YARN for exceeding memory limits"
val ratioThreshold : Double = 2
Contributor:

Make this configurable in HeuristicConf
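(A sketch of what that could look like; the parameter key failed_tasks_ratio_threshold is illustrative, not necessarily the name used in the final code:)

// Read the ratio threshold (in percent) from the heuristic's configured
// params, falling back to the current hard-coded value of 2.
val ratioThreshold: Double =
  Option(heuristicConfigurationData.getParamMap.get("failed_tasks_ratio_threshold"))
    .map(_.toDouble)
    .getOrElse(2.0)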

Contributor (Author):

Done.

@shkhrgpt (Contributor) commented on Jan 9, 2018:

@akshayrai I think it is a good idea not to merge these changes until the relevant Spark code is checked in.
@skakker Can you please post the GitHub/JIRA links of those Spark changes so their progress can be monitored? Thank you.

@akshayrai (Contributor) commented:

@shkhrgpt , we are planning to merge this into a separate branch rather than keeping the PR open. We will merge it with the master once the Spark work is available in the public release.

@skakker will be sharing the JIRA details shortly.

@akshayrai akshayrai changed the title Stages with failed tasks heuristic Spark Stages with Failed tasks Heuristic - (Depends on Custom SHS - Requires stages/failedTasks Rest API) Jan 10, 2018
@skakker skakker changed the base branch from master to customSHSWork January 10, 2018 06:03
@akshayrai akshayrai merged commit 06b87a1 into linkedin:customSHSWork Jan 10, 2018
akshayrai pushed a commit that referenced this pull request Feb 21, 2018
akshayrai pushed a commit that referenced this pull request Feb 27, 2018
akshayrai pushed a commit that referenced this pull request Mar 6, 2018
arpang pushed a commit to arpang/dr-elephant that referenced this pull request Mar 14, 2018
akshayrai pushed a commit that referenced this pull request Mar 19, 2018
akshayrai pushed a commit that referenced this pull request Mar 19, 2018
akshayrai pushed a commit that referenced this pull request Mar 30, 2018
akshayrai pushed a commit that referenced this pull request Apr 6, 2018
akshayrai pushed a commit that referenced this pull request May 21, 2018
pralabhkumar pushed a commit to pralabhkumar/dr-elephant that referenced this pull request Aug 31, 2018
varunsaxena pushed a commit that referenced this pull request Oct 16, 2018