Spark Stages with Failed tasks Heuristic - (Depends on Custom SHS - Requires stages/failedTasks Rest API) (#288)
skakker authored and akshayrai committed Jan 10, 2018
1 parent 14ec8f2 commit 06b87a1
Showing 20 changed files with 460 additions and 28 deletions.
5 changes: 4 additions & 1 deletion app-conf/FetcherConf.xml
@@ -81,7 +81,10 @@
-->
<fetcher>
<applicationtype>spark</applicationtype>
-    <classname>com.linkedin.drelephant.spark.fetchers.FSFetcher</classname>
+    <classname>com.linkedin.drelephant.spark.fetchers.SparkFetcher</classname>
+    <params>
+      <fetch_failed_tasks>true</fetch_failed_tasks>
+    </params>
</fetcher>

<!--
6 changes: 6 additions & 0 deletions app-conf/HeuristicConf.xml
@@ -193,6 +193,12 @@
<classname>com.linkedin.drelephant.spark.heuristics.StagesHeuristic</classname>
<viewname>views.html.help.spark.helpStagesHeuristic</viewname>
</heuristic>
+  <heuristic>
+    <applicationtype>spark</applicationtype>
+    <heuristicname>Stages with failed tasks</heuristicname>
+    <classname>com.linkedin.drelephant.spark.heuristics.StagesWithFailedTasksHeuristic</classname>
+    <viewname>views.html.help.spark.helpStagesWithFailedTasks</viewname>
+  </heuristic>
<heuristic>
<applicationtype>spark</applicationtype>
<heuristicname>Executor GC</heuristicname>
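Each <heuristic> entry above is bound by class name at runtime. A minimal sketch of the assumed loading mechanism (the loader itself is not part of this diff; loadHeuristic is a hypothetical helper):

import com.linkedin.drelephant.configurations.heuristic.HeuristicConfigurationData

// Hypothetical illustration: the class named in <classname> is loaded
// reflectively and constructed with the parsed configuration data — the
// constructor shape StagesWithFailedTasksHeuristic declares below.
def loadHeuristic(configData: HeuristicConfigurationData): AnyRef =
  Class.forName(configData.getClassName)
    .getConstructor(classOf[HeuristicConfigurationData])
    .newInstance(configData)
    .asInstanceOf[AnyRef]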
app/com/linkedin/drelephant/spark/data/SparkApplicationData.scala
@@ -30,7 +30,8 @@ case class SparkApplicationData(
applicationInfo: ApplicationInfo,
jobDatas: Seq[JobData],
stageDatas: Seq[StageData],
-    executorSummaries: Seq[ExecutorSummary]
+    executorSummaries: Seq[ExecutorSummary],
+    stagesWithFailedTasks: Seq[StageData]
) extends HadoopApplicationData {
import SparkApplicationData._
import JavaConverters._
@@ -65,6 +66,7 @@ object SparkApplicationData {
val jobDatas = restDerivedData.jobDatas
val stageDatas = restDerivedData.stageDatas
val executorSummaries = restDerivedData.executorSummaries
-    apply(appId, appConfigurationProperties, applicationInfo, jobDatas, stageDatas, executorSummaries)
+    val stagesWithFailedTasks = restDerivedData.stagesWithFailedTasks
+    apply(appId, appConfigurationProperties, applicationInfo, jobDatas, stageDatas, executorSummaries, stagesWithFailedTasks)
}
}
app/com/linkedin/drelephant/spark/data/SparkRestDerivedData.scala
@@ -24,4 +24,5 @@ case class SparkRestDerivedData(
jobDatas: Seq[JobData],
stageDatas: Seq[StageData],
executorSummaries: Seq[ExecutorSummary],
+    stagesWithFailedTasks: Seq[StageData],
private[spark] val logDerivedData: Option[SparkLogDerivedData] = None)
app/com/linkedin/drelephant/spark/fetchers/SparkFetcher.scala
@@ -112,7 +112,8 @@ class SparkFetcher(fetcherConfigurationData: FetcherConfigurationData)

private def doFetchDataUsingRestAndLogClients(analyticJob: AnalyticJob): Future[SparkApplicationData] = async {
val appId = analyticJob.getAppId
-    val restDerivedData = await(sparkRestClient.fetchData(appId, eventLogSource == EventLogSource.Rest))
+    val doFetchFailedTasks: Boolean = Option(fetcherConfigurationData.getParamMap.get(FETCH_FAILED_TASKS)).getOrElse("false").toBoolean
+    val restDerivedData = await(sparkRestClient.fetchData(appId, eventLogSource == EventLogSource.Rest, doFetchFailedTasks))

val logDerivedData = eventLogSource match {
case EventLogSource.None => None
@@ -143,4 +144,5 @@ object SparkFetcher {
val SPARK_EVENT_LOG_ENABLED_KEY = "spark.eventLog.enabled"
val DEFAULT_TIMEOUT = Duration(60, SECONDS)
val LOG_LOCATION_URI_XML_FIELD = "event_log_location_uri"
+  val FETCH_FAILED_TASKS = "fetch_failed_tasks"
}
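A note on the flag parsing above: Option(...) turns a missing (null) param-map entry into None, so an absent fetch_failed_tasks defaults to false. A standalone sketch of the idiom (parseFlag is a hypothetical helper, not project code):

// Null-safe boolean param lookup, mirroring doFetchDataUsingRestAndLogClients.
// Caveat: String.toBoolean throws IllegalArgumentException for values other
// than "true"/"false" (case-insensitive).
def parseFlag(paramMap: java.util.Map[String, String], key: String): Boolean =
  Option(paramMap.get(key)).getOrElse("false").toBoolean

// parseFlag(new java.util.HashMap(), "fetch_failed_tasks")  // => false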
44 changes: 34 additions & 10 deletions app/com/linkedin/drelephant/spark/fetchers/SparkRestClient.scala
@@ -16,7 +16,7 @@

package com.linkedin.drelephant.spark.fetchers

-import java.io.{InputStream, BufferedInputStream}
+import java.io.{BufferedInputStream, InputStream}
import java.net.URI
import java.text.SimpleDateFormat
import java.util.zip.ZipInputStream
@@ -73,7 +73,7 @@ class SparkRestClient(sparkConf: SparkConf) {

private val apiTarget: WebTarget = client.target(historyServerUri).path(API_V1_MOUNT_PATH)

-  def fetchData(appId: String, fetchLogs: Boolean = false)(
+  def fetchData(appId: String, fetchLogs: Boolean = false, fetchFailedTasks: Boolean = true)(
implicit ec: ExecutionContext
): Future[SparkRestDerivedData] = {
val (applicationInfo, attemptTarget) = getApplicationMetaData(appId)
@@ -87,13 +87,25 @@
async { getLogData(attemptTarget)}
} else Future.successful(None)

-    SparkRestDerivedData(
-      applicationInfo,
-      await(futureJobDatas),
-      await(futureStageDatas),
-      await(futureExecutorSummaries),
-      await(futureLogData)
-    )
+    if (fetchFailedTasks) {
+      val futureFailedTasksDatas = async { getStagesWithFailedTasks(attemptTarget) }
+      SparkRestDerivedData(
+        applicationInfo,
+        await(futureJobDatas),
+        await(futureStageDatas),
+        await(futureExecutorSummaries),
+        await(futureFailedTasksDatas),
+        await(futureLogData))
+    } else {
+      SparkRestDerivedData(
+        applicationInfo,
+        await(futureJobDatas),
+        await(futureStageDatas),
+        await(futureExecutorSummaries),
+        Seq.empty,
+        await(futureLogData)
+      )
+    }
}
}

@@ -213,6 +225,18 @@
}
}
}

+  private def getStagesWithFailedTasks(attemptTarget: WebTarget): Seq[StageDataImpl] = {
+    val target = attemptTarget.path("stages/failedTasks")
+    try {
+      get(target, SparkRestObjectMapper.readValue[Seq[StageDataImpl]])
+    } catch {
+      case NonFatal(e) => {
+        logger.error(s"error reading failedTasks ${target.getUri}", e)
+        throw e
+      }
+    }
+  }
}

object SparkRestClient {
@@ -237,4 +261,4 @@ object SparkRestClient {

def get[T](webTarget: WebTarget, converter: String => T): T =
converter(webTarget.request(MediaType.APPLICATION_JSON).get(classOf[String]))
-}
+}
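getStagesWithFailedTasks targets stages/failedTasks, which only the custom Spark History Server serves (the stock SHS exposes stages, but not this endpoint — hence the dependency noted in the commit title). A hypothetical smoke test for the endpoint, with placeholder host and application IDs:

import scala.io.Source
import scala.util.{Failure, Success, Try}

object FailedTasksEndpointCheck {
  def main(args: Array[String]): Unit = {
    val historyServer = "http://localhost:18080"      // assumption: local SHS
    val appId = "application_1514764800000_0001"      // placeholder app ID
    val attemptId = "1"                               // placeholder attempt
    val url = s"$historyServer/api/v1/applications/$appId/$attemptId/stages/failedTasks"
    Try {
      val src = Source.fromURL(url)
      try src.mkString finally src.close()
    } match {
      case Success(body) => println(s"endpoint OK, ${body.length} bytes of JSON")
      case Failure(e)    => println(s"endpoint unavailable: $e")
    }
  }
}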
146 changes: 146 additions & 0 deletions app/com/linkedin/drelephant/spark/heuristics/StagesWithFailedTasksHeuristic.scala
@@ -0,0 +1,146 @@
/*
* Copyright 2016 LinkedIn Corp.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package com.linkedin.drelephant.spark.heuristics

import com.linkedin.drelephant.analysis._
import com.linkedin.drelephant.configurations.heuristic.HeuristicConfigurationData
import com.linkedin.drelephant.spark.data.SparkApplicationData
import com.linkedin.drelephant.spark.fetchers.statusapiv1.{StageData, TaskData}
import com.linkedin.drelephant.spark.fetchers.statusapiv1.StageStatus

import scala.collection.JavaConverters


/**
* A heuristic based on the errors encountered by failed tasks. Tasks may fail due to overhead-memory or OOM errors; the heuristic checks for these errors and issues a warning accordingly.
*/
class StagesWithFailedTasksHeuristic(private val heuristicConfigurationData: HeuristicConfigurationData)
extends Heuristic[SparkApplicationData] {

import StagesWithFailedTasksHeuristic._
import JavaConverters._

override def getHeuristicConfData(): HeuristicConfigurationData = heuristicConfigurationData

override def apply(data: SparkApplicationData): HeuristicResult = {
val evaluator = new Evaluator(this, data)
var resultDetails = Seq(
new HeuristicResultDetails("Stages with OOM errors", evaluator.stagesWithOOMError.toString),
new HeuristicResultDetails("Stages with Overhead memory errors", evaluator.stagesWithOverheadError.toString)
)
if (evaluator.severityOverheadStages.getValue >= Severity.MODERATE.getValue)
resultDetails = resultDetails :+ new HeuristicResultDetails("Overhead memory errors", "Some tasks have failed due to overhead memory error. Please try increasing spark.yarn.executor.memoryOverhead by 500MB in spark.yarn.executor.memoryOverhead")
//TODO: refine recommendations
if (evaluator.severityOOMStages.getValue >= Severity.MODERATE.getValue)
resultDetails = resultDetails :+ new HeuristicResultDetails("OOM errors", "Some tasks have failed due to OOM error. Try increasing spark.executor.memory or decreasing spark.memory.fraction (take a look at unified memory heuristic) or decreasing number of cores.")
val result = new HeuristicResult(
heuristicConfigurationData.getClassName,
heuristicConfigurationData.getHeuristicName,
evaluator.severity,
0,
resultDetails.asJava
)
result
}
}

object StagesWithFailedTasksHeuristic {

val OOM_ERROR = "java.lang.OutOfMemoryError"
val OVERHEAD_MEMORY_ERROR = "killed by YARN for exceeding memory limits"
val ratioThreshold: Double = 2

class Evaluator(memoryFractionHeuristic: StagesWithFailedTasksHeuristic, data: SparkApplicationData) {
lazy val stagesWithFailedTasks: Seq[StageData] = data.stagesWithFailedTasks

/**
* @return the OOM and overhead-memory error severities, plus the number of stages with each error type
*/
private def getErrorsSeverity: (Severity, Severity, Int, Int) = {
var severityOOM: Severity = Severity.NONE
var severityOverhead: Severity = Severity.NONE
var stagesWithOOMError: Int = 0
var stagesWithOverheadError: Int = 0
stagesWithFailedTasks.foreach(stageData => {
val numCompleteTasks: Int = stageData.numCompleteTasks
var failedOOMTasks = 0
var failedOverheadMemoryTasks = 0
stageData.tasks.get.values.foreach((taskData: TaskData) => {
val errorMessage: String = taskData.errorMessage.getOrElse("")
failedOOMTasks = hasError(errorMessage, OOM_ERROR, failedOOMTasks)
failedOverheadMemoryTasks = hasError(errorMessage, OVERHEAD_MEMORY_ERROR, failedOverheadMemoryTasks)
})
if (failedOOMTasks > 0) {
stagesWithOOMError = stagesWithOOMError + 1
}
if (failedOverheadMemoryTasks > 0) {
stagesWithOverheadError = stagesWithOverheadError + 1
}
severityOOM = getStageSeverity(failedOOMTasks, stageData.status, severityOOM, numCompleteTasks)
severityOverhead = getStageSeverity(failedOverheadMemoryTasks, stageData.status, severityOverhead, numCompleteTasks)
})
(severityOOM, severityOverhead, stagesWithOOMError, stagesWithOverheadError)
}

/**
  * Returns the max of the severity of this stage and the maximum severity seen so far.
  *
  * Note: this method is called for every stage, updating the running maximum stage severity as required.
  *
  * @param numFailedTasks number of failed tasks in the stage
  * @param stageStatus status of the stage
  * @param severityStage maximum severity across all the stages encountered so far
  * @param numCompleteTasks number of completed tasks in the stage
  * @return the updated maximum severity
  */
private def getStageSeverity(numFailedTasks: Int, stageStatus: StageStatus, severityStage: Severity, numCompleteTasks: Int): Severity = {
var severityTemp: Severity = Severity.NONE
if(numCompleteTasks == 0) {
return severityStage
}

if (numFailedTasks != 0 && stageStatus != StageStatus.FAILED) {
if (numFailedTasks.toDouble / numCompleteTasks.toDouble < ratioThreshold / 100.toDouble) {
severityTemp = Severity.MODERATE
} else {
severityTemp = Severity.SEVERE
}
} else if (numFailedTasks != 0 && stageStatus == StageStatus.FAILED && numFailedTasks.toDouble / numCompleteTasks > 0) {
severityTemp = Severity.CRITICAL
}
return Severity.max(severityTemp, severityStage)
}

/**
  * Checks whether the error message contains the given error.
  *
  * @param errorMessage the entire error message
  * @param whichError the error to search the message for
  * @param noTasks the running count of tasks with that error
  * @return the updated count, incremented by one if the message contains the error
  */
private def hasError(errorMessage: String, whichError: String, noTasks: Int): Int = {
if (errorMessage.contains(whichError))
return noTasks + 1
return noTasks
}

lazy val (severityOOMStages: Severity, severityOverheadStages: Severity, stagesWithOOMError: Int, stagesWithOverheadError: Int) = getErrorsSeverity
lazy val severity: Severity = Severity.max(severityOverheadStages, severityOOMStages)
}

}
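To make the thresholds concrete: with ratioThreshold = 2 (i.e. 2%), a non-failed stage rates MODERATE when under 2% of its completed tasks failed and SEVERE at 2% or above, while any failed tasks in a FAILED stage rate CRITICAL. A condensed, illustrative restatement (not project code):

object SeveritySketch {
  val ratioThreshold = 2.0  // percent, matching StagesWithFailedTasksHeuristic

  // Mirrors getStageSeverity's classification for a single stage.
  def classify(numFailedTasks: Int, numCompleteTasks: Int, stageFailed: Boolean): String =
    if (numCompleteTasks == 0 || numFailedTasks == 0) "NONE"
    else if (stageFailed) "CRITICAL"
    else if (numFailedTasks.toDouble / numCompleteTasks < ratioThreshold / 100) "MODERATE"
    else "SEVERE"

  def main(args: Array[String]): Unit = {
    println(classify(1, 100, stageFailed = false))  // 1% failed    -> MODERATE
    println(classify(3, 100, stageFailed = false))  // 3% failed    -> SEVERE
    println(classify(5, 20, stageFailed = true))    // FAILED stage -> CRITICAL
  }
}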
app/com/linkedin/drelephant/spark/legacydata/LegacyDataConverters.scala
@@ -34,14 +34,44 @@ import com.linkedin.drelephant.spark.fetchers.statusapiv1.StageStatus
object LegacyDataConverters {
import JavaConverters._

+  // Returns a default, zero-valued StageData. This placeholder is used when
+  // logs are processed locally, i.e. when the FSFetcher is in use.
+  def extractStagesWithFailedTasks(legacyData: SparkApplicationData): scala.Seq[StageData] = {
+    Seq(new StageData {
+      override def numCompleteTasks: Int = 0
+      override def inputRecords: Long = 0
+      override def shuffleReadBytes: Long = 0
+      override def shuffleWriteBytes: Long = 0
+      override def schedulingPool: String = ""
+      override def outputRecords: Long = 0
+      override def shuffleWriteRecords: Long = 0
+      override def inputBytes: Long = 0
+      override def details: String = ""
+      override def tasks: Option[collection.Map[Long, TaskData]] = None
+      override def attemptId: Int = 0
+      override def stageId: Int = 0
+      override def memoryBytesSpilled: Long = 0
+      override def executorRunTime: Long = 0
+      override def shuffleReadRecords: Long = 0
+      override def outputBytes: Long = 0
+      override def numActiveTasks: Int = 0
+      override def diskBytesSpilled: Long = 0
+      override def numFailedTasks: Int = 0
+      override def accumulatorUpdates: Seq[AccumulableInfo] = Seq.empty
+      override def name: String = ""
+      override def executorSummary: Option[collection.Map[String, ExecutorStageSummary]] = None
+      override def status = StageStatus.COMPLETE
+    })
+  }

def convert(legacyData: SparkApplicationData): com.linkedin.drelephant.spark.data.SparkApplicationData = {
com.linkedin.drelephant.spark.data.SparkApplicationData(
legacyData.getAppId,
extractAppConfigurationProperties(legacyData),
extractApplicationInfo(legacyData),
extractJobDatas(legacyData),
extractStageDatas(legacyData),
-      extractExecutorSummaries(legacyData)
+      extractExecutorSummaries(legacyData),
+      extractStagesWithFailedTasks(legacyData)
)
}

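The placeholder above stubs out the entire StageData interface with zero values. Since consumers only iterate stagesWithFailedTasks, an arguably simpler alternative (hypothetical — not what the commit does) would be an empty sequence, which also sidesteps the evaluator's tasks.get call hitting the placeholder's None:

// Hypothetical simplification: an empty Seq behaves the same for consumers
// that only iterate, and cannot trip over tasks = None.
def extractStagesWithFailedTasks(legacyData: SparkApplicationData): Seq[StageData] =
  Seq.empty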
22 changes: 22 additions & 0 deletions app/views/help/spark/helpStagesWithFailedTasks.scala.html
@@ -0,0 +1,22 @@
@*
* Copyright 2016 LinkedIn Corp.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*@
<p>Tasks (and stages and jobs) can fail if an error occurs while the task is executing.</p>

<p>In Spark, each stage is divided into tasks which run individually. A task may fail for more than one reason; out-of-memory (OOM) and overhead memory issues are the most common.</p>
<p>Errors in tasks can cause their corresponding stages to fail as well. The reasons for task failure are analysed and suggestions are given accordingly.</p>
<h3>Suggestions</h3>
<p>Failed due to overhead memory issues: Please try increasing spark.yarn.executor.memoryOverhead by 500MB.</p>
<p>Failed due to OOM: Try increasing spark.executor.memory, decreasing spark.memory.fraction (see the unified memory heuristic), or decreasing the number of executor cores.</p>
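For concreteness, the two suggestions map onto settings like the following; the values are illustrative starting points, not prescriptions:

import org.apache.spark.SparkConf

object TuningSketch {
  // Illustrative knobs for the two failure modes above; tune per job.
  val conf = new SparkConf()
    .set("spark.yarn.executor.memoryOverhead", "2048") // MB; overhead suggestion (+~500MB)
    .set("spark.executor.memory", "6g")                // OOM: more executor heap
    .set("spark.memory.fraction", "0.4")               // OOM: shrink unified memory (default 0.6)
    .set("spark.executor.cores", "2")                  // OOM: fewer concurrent tasks per executor
}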
SparkMetricsAggregatorTest.scala
@@ -55,7 +55,8 @@ class SparkMetricsAggregatorTest extends FunSpec with Matchers {
applicationInfo,
jobDatas = Seq.empty,
stageDatas = Seq.empty,
-      executorSummaries = executorSummaries
+      executorSummaries = executorSummaries,
+      stagesWithFailedTasks = Seq.empty
)
}

@@ -119,7 +120,8 @@ class SparkMetricsAggregatorTest extends FunSpec with Matchers {
applicationInfo,
jobDatas = Seq.empty,
stageDatas = Seq.empty,
-      executorSummaries = executorSummaries
+      executorSummaries = executorSummaries,
+      stagesWithFailedTasks = Seq.empty
)

val data = SparkApplicationData(appId, restDerivedData, Some(logDerivedData))
SparkApplicationDataTest.scala
@@ -42,7 +42,8 @@ class SparkApplicationDataTest extends FunSpec with Matchers {
new ApplicationInfoImpl(appId, "app", Seq(applicationAttemptInfo)),
jobDatas = Seq.empty,
stageDatas = Seq.empty,
-      executorSummaries = Seq.empty
+      executorSummaries = Seq.empty,
+      stagesWithFailedTasks = Seq.empty
)

val configurationProperties = Map(
SparkFetcherTest.scala
@@ -59,7 +59,8 @@ class SparkFetcherTest extends FunSpec with Matchers with MockitoSugar {
),
jobDatas = Seq.empty,
stageDatas = Seq.empty,
-      executorSummaries = Seq.empty
+      executorSummaries = Seq.empty,
+      stagesWithFailedTasks = Seq.empty
)

val logDerivedData = SparkLogDerivedData(SparkListenerEnvironmentUpdate(Map.empty))