
Commit 6147188

adding a local instance of StageStatus and changing default from .snappy to .lz4

swasti committed Oct 5, 2017
1 parent 1a2c0d4 commit 6147188
Showing 12 changed files with 42 additions and 36 deletions.
2 changes: 1 addition & 1 deletion app-conf/FetcherConf.xml
@@ -79,7 +79,7 @@
-->
<fetcher>
<applicationtype>spark</applicationtype>
- <classname>com.linkedin.drelephant.spark.fetchers.FSFetcher</classname>
+ <classname>com.linkedin.drelephant.spark.fetchers.SparkFetcher</classname>
</fetcher>

<!--
@@ -0,0 +1,18 @@
package com.linkedin.drelephant.spark.fetchers.statusapiv1;

import org.apache.spark.util.EnumUtil;

/**
 * Dr. Elephant's local copy of Spark's StageStatus, so the status API types
 * in this package no longer depend on org.apache.spark.status.api.v1.
 */
public enum StageStatus {
ACTIVE,
COMPLETE,
FAILED,
SKIPPED,
PENDING;

private StageStatus() {
}

  public static StageStatus fromString(String str) {
    return EnumUtil.parseIgnoreCase(StageStatus.class, str);
  }
}
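
A minimal usage sketch in Scala (a hypothetical call site, not part of this commit): because fromString delegates to Spark's EnumUtil.parseIgnoreCase, status strings parse case-insensitively, which is convenient when they arrive from REST API payloads.

import com.linkedin.drelephant.spark.fetchers.statusapiv1.StageStatus

// Lookup is case-insensitive via EnumUtil.parseIgnoreCase.
val completed = StageStatus.fromString("complete") // StageStatus.COMPLETE
val skipped = StageStatus.fromString("SKIPPED")    // StageStatus.SKIPPED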
@@ -42,7 +42,6 @@ import java.util.Date
import scala.collection.Map

import org.apache.spark.JobExecutionStatus
- import org.apache.spark.status.api.v1.StageStatus
import com.fasterxml.jackson.annotation.JsonSubTypes.Type
import com.fasterxml.jackson.annotation.{JsonSubTypes, JsonTypeInfo}

@@ -41,8 +41,10 @@ class JvmUsedMemoryHeuristic(private val heuristicConfigurationData: HeuristicConfigurationData
val evaluator = new Evaluator(this, data)

var resultDetails = Seq(
new HeuristicResultDetails("Max peak JVM used memory", evaluator.maxExecutorPeakJvmUsedMemory.toString),
new HeuristicResultDetails("Median peak JVM used memory", evaluator.medianPeakJvmUsedMemory.toString)
new HeuristicResultDetails("Max executor peak JVM used memory", MemoryFormatUtils.bytesToString(evaluator.maxExecutorPeakJvmUsedMemory)),
new HeuristicResultDetails("Max driver peak JVM used memory", MemoryFormatUtils.bytesToString(evaluator.maxDriverPeakJvmUsedMemory)),
new HeuristicResultDetails("spark.executor.memory", MemoryFormatUtils.bytesToString(evaluator.sparkExecutorMemory)),
new HeuristicResultDetails("spark.driver.memory", MemoryFormatUtils.bytesToString(evaluator.sparkDriverMemory))
)

if(evaluator.severityExecutor.getValue > Severity.LOW.getValue) {
@@ -54,11 +56,6 @@
new HeuristicResultDetails("Note", "The allocated memory for the driver (in " + SPARK_DRIVER_MEMORY + ") is much more than the peak JVM used memory by the driver.")
}

- // Disabling the skew test for executors
- // if(evaluator.severitySkew.getValue > Severity.LOW.getValue) {
- // new HeuristicResultDetails("Note", "As there is a big difference between median and maximum values of the peak JVM used memory, there could also be a skew in the data being processed. Please look into that.")
- // }

val result = new HeuristicResult(
heuristicConfigurationData.getClassName,
heuristicConfigurationData.getHeuristicName,
@@ -71,7 +68,6 @@ class JvmUsedMemoryHeuristic(private val heuristicConfigurationData: HeuristicConfigurationData
}

object JvmUsedMemoryHeuristic {

val JVM_USED_MEMORY = "jvmUsedMemory"
val SPARK_EXECUTOR_MEMORY = "spark.executor.memory"
val SPARK_DRIVER_MEMORY = "spark.driver.memory"
@@ -83,32 +79,26 @@ object JvmUsedMemoryHeuristic {
data.appConfigurationProperties

lazy val executorSummaries: Seq[ExecutorSummary] = data.executorSummaries

- val driverSummary : Option[ExecutorSummary] = executorSummaries.find(_.id.equals("driver"))
+ lazy val driverSummary : Option[ExecutorSummary] = executorSummaries.find(_.id.equals("driver"))
val maxDriverPeakJvmUsedMemory : Long = driverSummary.get.peakJvmUsedMemory.getOrElse(JVM_USED_MEMORY, 0).asInstanceOf[Number].longValue
- val executorList : Seq[ExecutorSummary] = executorSummaries.patch(executorSummaries.indexWhere(_.id.equals("driver")), Nil, 1)
+ val executorList : Seq[ExecutorSummary] = executorSummaries.filterNot(_.id.equals("driver"))
val sparkExecutorMemory : Long = (appConfigurationProperties.get(SPARK_EXECUTOR_MEMORY).map(MemoryFormatUtils.stringToBytes)).getOrElse(0)
val sparkDriverMemory : Long = appConfigurationProperties.get(SPARK_DRIVER_MEMORY).map(MemoryFormatUtils.stringToBytes).getOrElse(0)
val medianPeakJvmUsedMemory: Long = executorList.map {
_.peakJvmUsedMemory.getOrElse(JVM_USED_MEMORY, 0).asInstanceOf[Number].longValue
}.sortWith(_< _).drop(executorList.size/2).head
- val maxExecutorPeakJvmUsedMemory: Long = (executorList.map {
+ lazy val maxExecutorPeakJvmUsedMemory: Long = (executorList.map {
_.peakJvmUsedMemory.get(JVM_USED_MEMORY)
- }.max).getOrElse(0)
+ }.max).getOrElse(0.asInstanceOf[Number].longValue())

val DEFAULT_MAX_EXECUTOR_PEAK_JVM_USED_MEMORY_THRESHOLDS =
SeverityThresholds(low = 1.5 * (maxExecutorPeakJvmUsedMemory + reservedMemory), moderate = 2 * (maxExecutorPeakJvmUsedMemory + reservedMemory), severe = 4 * (maxExecutorPeakJvmUsedMemory + reservedMemory), critical = 8 * (maxExecutorPeakJvmUsedMemory + reservedMemory), ascending = true)

val DEFAULT_MAX_DRIVER_PEAK_JVM_USED_MEMORY_THRESHOLDS =
SeverityThresholds(low = 1.5 * (maxDriverPeakJvmUsedMemory + reservedMemory), moderate = 2 * (maxDriverPeakJvmUsedMemory + reservedMemory), severe = 4 * (maxDriverPeakJvmUsedMemory + reservedMemory), critical = 8 * (maxDriverPeakJvmUsedMemory + reservedMemory), ascending = true)

val DEFAULT_JVM_MEMORY_SKEW_THRESHOLDS =
SeverityThresholds(low = 1.5 * medianPeakJvmUsedMemory, moderate = 2 * medianPeakJvmUsedMemory, severe = 4 * medianPeakJvmUsedMemory, critical = 8 * medianPeakJvmUsedMemory, ascending = true)

val severityExecutor = DEFAULT_MAX_EXECUTOR_PEAK_JVM_USED_MEMORY_THRESHOLDS.severityOf(sparkExecutorMemory)
val severityDriver = DEFAULT_MAX_DRIVER_PEAK_JVM_USED_MEMORY_THRESHOLDS.severityOf(sparkDriverMemory)
//disabling the skew check for executors
//val severitySkew = DEFAULT_JVM_MEMORY_SKEW_THRESHOLDS.severityOf(maxExecutorPeakJvmUsedMemory)
val severity : Severity = Severity.max(severityDriver, severityExecutor)
}
}
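
To make the threshold arithmetic concrete, here is an illustrative Scala sketch; the figures are invented, and reservedMemory stands in for the constant defined elsewhere in this object:

// Invented example figures, not taken from any real application.
val maxExecutorPeakJvmUsedMemory: Long = 4L * 1024 * 1024 * 1024 // 4 GB observed peak
val reservedMemory: Long = 300L * 1024 * 1024                    // 300 MB stand-in

// Mirrors DEFAULT_MAX_EXECUTOR_PEAK_JVM_USED_MEMORY_THRESHOLDS above.
val base = (maxExecutorPeakJvmUsedMemory + reservedMemory).toDouble
val (low, moderate, severe, critical) = (1.5 * base, 2 * base, 4 * base, 8 * base)
// low ~ 6.4 GB, moderate ~ 8.6 GB, severe ~ 17.2 GB, critical ~ 34.4 GB

// A spark.executor.memory of 20 GB lands between severe and critical, so
// severityOf would report SEVERE: the executors were allocated roughly five
// times more memory than they ever used at peak.
val sparkExecutorMemory: Long = 20L * 1024 * 1024 * 1024

Since the thresholds are ascending, a larger gap between allocated and actually-used JVM memory yields a higher severity, flagging over-provisioned applications.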
@@ -26,8 +26,7 @@ import com.linkedin.drelephant.configurations.heuristic.HeuristicConfigurationData
import com.linkedin.drelephant.math.Statistics
import com.linkedin.drelephant.spark.data.SparkApplicationData
import com.linkedin.drelephant.spark.fetchers.statusapiv1.StageData
- import org.apache.spark.status.api.v1.StageStatus

+ import com.linkedin.drelephant.spark.fetchers.statusapiv1.StageStatus

/**
* A heuristic based on metrics for a Spark app's stages.
@@ -23,8 +23,7 @@ import scala.util.Try

import com.linkedin.drelephant.spark.fetchers.statusapiv1._
import org.apache.spark.JobExecutionStatus
- import org.apache.spark.status.api.v1.StageStatus

+ import com.linkedin.drelephant.spark.fetchers.statusapiv1.StageStatus
/**
* Converters for legacy SparkApplicationData to current SparkApplicationData.
*
2 changes: 1 addition & 1 deletion app/com/linkedin/drelephant/util/SparkUtils.scala
@@ -180,7 +180,7 @@ trait SparkUtils {
}

private val IN_PROGRESS = ".inprogress"
private val DEFAULT_COMPRESSION_CODEC = "snappy"
private val DEFAULT_COMPRESSION_CODEC = "lz4"

private val compressionCodecClassNamesByShortName = Map(
"lz4" -> classOf[LZ4CompressionCodec].getName,
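
For context, a sketch of what this default is used for; the helper below is hypothetical, but the short-name map is the one shown above. When an event log file carries no codec suffix in its name, the fetcher falls back to DEFAULT_COMPRESSION_CODEC, which this commit flips from Snappy to LZ4:

import org.apache.spark.SparkConf
import org.apache.spark.io.{CompressionCodec, LZ4CompressionCodec, SnappyCompressionCodec}

// Hypothetical helper mirroring compressionCodecClassNamesByShortName:
// resolves a short codec name to a concrete Spark CompressionCodec.
def codecForShortName(shortName: String, conf: SparkConf): CompressionCodec =
  shortName.toLowerCase match {
    case "lz4"    => new LZ4CompressionCodec(conf)
    case "snappy" => new SnappyCompressionCodec(conf)
    case other    => throw new IllegalArgumentException(s"unknown codec: $other")
  }

// With the new default, an event log named "application_1_1" (no codec
// suffix) would be read as LZ4-compressed:
val codec = codecForShortName("lz4", new SparkConf())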
@@ -22,7 +22,7 @@ import java.util.zip.{ZipInputStream, ZipEntry, ZipOutputStream}
import java.util.{Calendar, Date, SimpleTimeZone}
import javax.ws.rs.client.WebTarget

- import org.apache.spark.status.api.v1.StageStatus
+ import com.linkedin.drelephant.spark.fetchers.statusapiv1.StageStatus

import scala.concurrent.ExecutionContext
import scala.util.Try
@@ -18,6 +18,7 @@ class JvmUsedMemoryHeuristicTest extends FunSpec with Matchers {
val peakJvmUsedMemoryHeuristic = new JvmUsedMemoryHeuristic(heuristicConfigurationData)

val appConfigurationProperties = Map("spark.driver.memory"->"40000000000", "spark.executor.memory"->"500000000")

val executorData = Seq(
newDummyExecutorData("1", Map("jvmUsedMemory" -> 394567123)),
newDummyExecutorData("2", Map("jvmUsedMemory" -> 23456834)),
@@ -29,7 +30,6 @@ class JvmUsedMemoryHeuristicTest extends FunSpec with Matchers {
describe(".apply") {
val data = newFakeSparkApplicationData(appConfigurationProperties, executorData)
val heuristicResult = peakJvmUsedMemoryHeuristic.apply(data)
- val heuristicResultDetails = heuristicResult.getHeuristicResultDetails

it("has severity") {
heuristicResult.getSeverity should be(Severity.CRITICAL)
@@ -44,18 +44,19 @@
it("has severity executor") {
evaluator.severityExecutor should be(Severity.NONE)
}

it("has severity driver") {
evaluator.severityDriver should be(Severity.CRITICAL)
}
it("has severity skew") {
evaluator.severitySkew should be(Severity.CRITICAL)
}

it("has median peak jvm memory") {
evaluator.medianPeakJvmUsedMemory should be (334569)
}

it("has max peak jvm memory") {
evaluator.maxExecutorPeakJvmUsedMemory should be (394567123)
}

it("has max driver peak jvm memory") {
evaluator.maxDriverPeakJvmUsedMemory should be (394561)
}
@@ -24,7 +24,7 @@ import com.linkedin.drelephant.configurations.heuristic.HeuristicConfigurationData
import com.linkedin.drelephant.spark.data.{SparkApplicationData, SparkLogDerivedData, SparkRestDerivedData}
import com.linkedin.drelephant.spark.fetchers.statusapiv1.{ApplicationInfoImpl, JobDataImpl, StageDataImpl}
import org.apache.spark.scheduler.SparkListenerEnvironmentUpdate
- import org.apache.spark.status.api.v1.StageStatus
+ import com.linkedin.drelephant.spark.fetchers.statusapiv1.StageStatus
import org.scalatest.{FunSpec, Matchers}


@@ -19,7 +19,7 @@ package com.linkedin.drelephant.spark.legacydata
import java.util.Date

import org.apache.spark.JobExecutionStatus
- import org.apache.spark.status.api.v1.StageStatus
+ import com.linkedin.drelephant.spark.fetchers.statusapiv1.StageStatus
import org.scalatest.{FunSpec, Matchers}


8 changes: 4 additions & 4 deletions test/com/linkedin/drelephant/util/SparkUtilsTest.scala
@@ -25,7 +25,7 @@ import org.apache.hadoop.fs.{FSDataInputStream, FileStatus, FileSystem, Path, Pa
import org.apache.hadoop.io.compress.CompressionInputStream
import org.apache.log4j.Logger
import org.apache.spark.SparkConf
- import org.apache.spark.io.SnappyCompressionCodec
+ import org.apache.spark.io.{LZ4CompressionCodec, SnappyCompressionCodec}
import org.mockito.BDDMockito
import org.mockito.Matchers
import org.scalatest.{FunSpec, Matchers, OptionValues}
@@ -180,7 +180,7 @@ class SparkUtilsTest extends FunSpec with org.scalatest.Matchers with OptionValues
val sparkUtils = SparkUtilsTest.newFakeSparkUtilsForEventLog(
new URI("webhdfs://nn1.grid.example.com:50070"),
new Path("/logs/spark"),
new Path("application_1_1.snappy"),
new Path("application_1_1.lz4"),
Array.empty[Byte]
)

@@ -189,8 +189,8 @@ class SparkUtilsTest extends FunSpec with org.scalatest.Matchers with OptionValues
val (path, codec) =
sparkUtils.pathAndCodecforEventLog(sparkConf: SparkConf, fs: FileSystem, basePath: Path, "application_1", Some("1"))

path should be(new Path("webhdfs://nn1.grid.example.com:50070/logs/spark/application_1_1.snappy"))
codec.value should be(a[SnappyCompressionCodec])
path should be(new Path("webhdfs://nn1.grid.example.com:50070/logs/spark/application_1_1.lz4"))
codec.value should be(a[LZ4CompressionCodec])
}
it("returns the path and codec for the event log, given the base path and appid. Extracts attempt and codec from path") {
val hadoopConfiguration = new Configuration(false)
