[SPARK-39987][K8S] Support PEAK_JVM_(ON|OFF)HEAP_MEMORY executor rolling policy (apache#1484)

### What changes were proposed in this pull request?

This PR aims to support two new executor rolling policies.
- `PEAK_JVM_ONHEAP_MEMORY` policy chooses an executor with the biggest peak JVM on-heap memory.
- `PEAK_JVM_OFFHEAP_MEMORY` policy chooses an executor with the biggest peak JVM off-heap memory.

### Why are the changes needed?

Although peak memory is a historical value rather than a current one, these two new policies add a capability to keep the memory usage of Spark jobs as low as possible.

### Does this PR introduce _any_ user-facing change?

Yes, but this is a new feature.
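
For illustration, a user could opt in with a configuration like the following. This is a sketch, not part of the PR: the `spark.kubernetes.executor.rollPolicy` key comes from the diff below and `spark.decommission.enabled` is the standard decommission switch the plugin relies on, while the plugin class name is an assumption that may differ per Spark build.

```scala
import org.apache.spark.SparkConf

// Sketch only: enable executor rolling and pick one of the new peak-memory policies.
val conf = new SparkConf()
  .set("spark.plugins", "org.apache.spark.deploy.k8s.ExecutorRollPlugin") // assumed FQCN
  .set("spark.decommission.enabled", "true") // rolling decommissions the chosen executor
  .set("spark.kubernetes.executor.rollPolicy", "PEAK_JVM_ONHEAP_MEMORY") // or PEAK_JVM_OFFHEAP_MEMORY
```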

### How was this patch tested?

Pass the CIs.

Closes apache#37418 from dongjoon-hyun/SPARK-39987.

Authored-by: Dongjoon Hyun <dongjoon@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
(cherry picked from commit 3df7124)
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
(cherry picked from commit 84cd907)
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>

Co-authored-by: Dongjoon Hyun <dongjoon@apache.org>
2 people authored and GitHub Enterprise committed Aug 5, 2022
1 parent b263f87 commit a5b4e14
Showing 3 changed files with 128 additions and 15 deletions.
Config.scala
@@ -148,13 +148,14 @@ private[spark] object Config extends Logging {

object ExecutorRollPolicy extends Enumeration {
val ID, ADD_TIME, TOTAL_GC_TIME, TOTAL_DURATION, AVERAGE_DURATION, FAILED_TASKS,
OUTLIER, OUTLIER_NO_FALLBACK = Value
PEAK_JVM_ONHEAP_MEMORY, PEAK_JVM_OFFHEAP_MEMORY, OUTLIER, OUTLIER_NO_FALLBACK = Value
}

val EXECUTOR_ROLL_POLICY =
ConfigBuilder("spark.kubernetes.executor.rollPolicy")
.doc("Executor roll policy: Valid values are ID, ADD_TIME, TOTAL_GC_TIME, " +
"TOTAL_DURATION, FAILED_TASKS, and OUTLIER (default). " +
"TOTAL_DURATION, AVERAGE_DURATION, FAILED_TASKS, PEAK_JVM_ONHEAP_MEMORY, " +
"PEAK_JVM_OFFHEAP_MEMORY, OUTLIER (default), and OUTLIER_NO_FALLBACK. " +
"When executor roll happens, Spark uses this policy to choose " +
"an executor and decommission it. The built-in policies are based on executor summary." +
"ID policy chooses an executor with the smallest executor ID. " +
@@ -163,6 +164,9 @@ private[spark] object Config extends Logging {
"TOTAL_DURATION policy chooses an executor with the biggest total task time. " +
"AVERAGE_DURATION policy chooses an executor with the biggest average task time. " +
"FAILED_TASKS policy chooses an executor with the most number of failed tasks. " +
"PEAK_JVM_ONHEAP_MEMORY policy chooses an executor with the biggest peak JVM on-heap " +
"memory. PEAK_JVM_OFFHEAP_MEMORY policy chooses an executor with the biggest peak JVM " +
"off-heap memory. " +
"OUTLIER policy chooses an executor with outstanding statistics which is bigger than" +
"at least two standard deviation from the mean in average task time, " +
"total task time, total task GC time, and the number of failed tasks if exists. " +
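
As a side note, here is a self-contained sketch (not the Spark source, and not part of the diff) of the `Enumeration` pattern used above and how a configured string maps back to a policy value:

```scala
// Standalone sketch mirroring the ExecutorRollPolicy enumeration above.
object RollPolicySketch {
  object RollPolicy extends Enumeration {
    val ID, ADD_TIME, TOTAL_GC_TIME, TOTAL_DURATION, AVERAGE_DURATION, FAILED_TASKS,
      PEAK_JVM_ONHEAP_MEMORY, PEAK_JVM_OFFHEAP_MEMORY, OUTLIER, OUTLIER_NO_FALLBACK = Value
  }

  def main(args: Array[String]): Unit = {
    // Enumeration.withName maps a configured string back to a value; an unknown
    // name throws NoSuchElementException, so a typo in the rollPolicy setting fails fast.
    val policy = RollPolicy.withName("PEAK_JVM_OFFHEAP_MEMORY")
    assert(policy == RollPolicy.PEAK_JVM_OFFHEAP_MEMORY)
  }
}
```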
ExecutorRollPlugin.scala
@@ -25,6 +25,7 @@ import scala.collection.JavaConverters._
import org.apache.spark.SparkContext
import org.apache.spark.api.plugin.{DriverPlugin, ExecutorPlugin, PluginContext, SparkPlugin}
import org.apache.spark.deploy.k8s.Config.{EXECUTOR_ROLL_INTERVAL, EXECUTOR_ROLL_POLICY, ExecutorRollPolicy, MINIMUM_TASKS_PER_EXECUTOR_BEFORE_ROLLING}
import org.apache.spark.executor.ExecutorMetrics
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.DECOMMISSION_ENABLED
import org.apache.spark.scheduler.ExecutorDecommissionInfo
@@ -47,6 +48,8 @@ class ExecutorRollPlugin extends SparkPlugin {
}

class ExecutorRollDriverPlugin extends DriverPlugin with Logging {
lazy val EMPTY_METRICS = new ExecutorMetrics(Array.emptyLongArray)

private var sparkContext: SparkContext = _

private val periodicService: ScheduledExecutorService =
@@ -99,6 +102,9 @@ class ExecutorRollDriverPlugin extends DriverPlugin with Logging {

override def shutdown(): Unit = periodicService.shutdown()

private def getPeakMetrics(summary: v1.ExecutorSummary, name: String): Long =
summary.peakMemoryMetrics.getOrElse(EMPTY_METRICS).getMetricValue(name)

private def choose(list: Seq[v1.ExecutorSummary], policy: ExecutorRollPolicy.Value)
: Option[String] = {
val listWithoutDriver = list
@@ -118,6 +124,10 @@ class ExecutorRollDriverPlugin extends DriverPlugin with Logging {
listWithoutDriver.sortBy(e => e.totalDuration.toFloat / Math.max(1, e.totalTasks)).reverse
case ExecutorRollPolicy.FAILED_TASKS =>
listWithoutDriver.sortBy(_.failedTasks).reverse
case ExecutorRollPolicy.PEAK_JVM_ONHEAP_MEMORY =>
listWithoutDriver.sortBy(getPeakMetrics(_, "JVMHeapMemory")).reverse
case ExecutorRollPolicy.PEAK_JVM_OFFHEAP_MEMORY =>
listWithoutDriver.sortBy(getPeakMetrics(_, "JVMOffHeapMemory")).reverse
case ExecutorRollPolicy.OUTLIER =>
// If there is no outlier we fallback to TOTAL_DURATION policy.
outliersFromMultipleDimensions(listWithoutDriver) ++
@@ -131,14 +141,17 @@ class ExecutorRollDriverPlugin extends DriverPlugin with Logging {
/**
* We build multiple outlier lists and concat in the following importance order to find
* outliers in various perspective:
* AVERAGE_DURATION > TOTAL_DURATION > TOTAL_GC_TIME > FAILED_TASKS
* AVERAGE_DURATION > TOTAL_DURATION > TOTAL_GC_TIME > FAILED_TASKS >
* PEAK_JVM_ONHEAP_MEMORY > PEAK_JVM_OFFHEAP_MEMORY
* Since we will choose only first item, the duplication is okay.
*/
private def outliersFromMultipleDimensions(listWithoutDriver: Seq[v1.ExecutorSummary]) =
outliers(listWithoutDriver.filter(_.totalTasks > 0), e => e.totalDuration / e.totalTasks) ++
outliers(listWithoutDriver, e => e.totalDuration) ++
outliers(listWithoutDriver, e => e.totalGCTime) ++
outliers(listWithoutDriver, e => e.failedTasks)
outliers(listWithoutDriver, e => e.failedTasks) ++
outliers(listWithoutDriver, e => getPeakMetrics(e, "JVMHeapMemory")) ++
outliers(listWithoutDriver, e => getPeakMetrics(e, "JVMOffHeapMemory"))

/**
* Return executors whose metrics is outstanding, '(value - mean) > 2-sigma'. This is
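
A rough, self-contained sketch of the selection behavior in this file — sorting by a peak metric and the '(value - mean) > 2-sigma' outlier rule — using made-up executor values and a population standard deviation as simplifying assumptions (not the Spark source, and not part of the diff):

```scala
// Simplified stand-in for the per-executor peak metrics (not Spark's ExecutorSummary).
case class Exec(id: String, peakOnHeap: Long, peakOffHeap: Long)

object RollSelectionSketch {
  // PEAK_JVM_ONHEAP_MEMORY: the executor with the biggest peak on-heap value is chosen.
  def chooseByPeakOnHeap(execs: Seq[Exec]): Option[String] =
    execs.sortBy(_.peakOnHeap).reverse.headOption.map(_.id)

  // Outlier rule mirroring the comment above: keep executors whose metric exceeds
  // the mean by more than two standard deviations (population sigma assumed here).
  def outliers(execs: Seq[Exec])(metric: Exec => Long): Seq[Exec] = {
    val values = execs.map(e => metric(e).toDouble)
    val mean   = values.sum / values.size
    val sigma  = math.sqrt(values.map(v => (v - mean) * (v - mean)).sum / values.size)
    execs.filter(e => metric(e) - mean > 2 * sigma)
  }

  def main(args: Array[String]): Unit = {
    // Five ordinary executors plus one with an extreme peak on-heap value.
    val execs = (1 to 5).map(i => Exec(i.toString, 1000L, 1000L)) :+ Exec("6", 5000L, 1000L)
    assert(chooseByPeakOnHeap(execs).contains("6"))
    assert(outliers(execs)(_.peakOnHeap).map(_.id) == Seq("6"))
  }
}
```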
ExecutorRollPluginSuite.scala
@@ -23,6 +23,7 @@ import org.scalatest.PrivateMethodTester

import org.apache.spark.SparkFunSuite
import org.apache.spark.deploy.k8s.Config.ExecutorRollPolicy
import org.apache.spark.executor.ExecutorMetrics
import org.apache.spark.status.api.v1.ExecutorSummary

class ExecutorRollPluginSuite extends SparkFunSuite with PrivateMethodTester {
@@ -31,20 +32,23 @@ class ExecutorRollPluginSuite extends SparkFunSuite with PrivateMethodTester {

private val _choose = PrivateMethod[Option[String]](Symbol("choose"))

val metrics = Some(new ExecutorMetrics(
Map("JVMHeapMemory" -> 1024L, "JVMOffHeapMemory" -> 1024L)))

val driverSummary = new ExecutorSummary("driver", "host:port", true, 1,
10, 10, 1, 1, 1,
0, 0, 1, 100,
1, 100, 100,
10, false, 20, new Date(1639300000000L),
Option.empty, Option.empty, Map(), Option.empty, Set(), Option.empty, Map(), Map(), 1,
Option.empty, Option.empty, Map(), Option.empty, Set(), metrics, Map(), Map(), 1,
false, Set())

val execWithSmallestID = new ExecutorSummary("1", "host:port", true, 1,
10, 10, 1, 1, 1,
0, 0, 1, 100,
20, 100, 100,
10, false, 20, new Date(1639300001000L),
Option.empty, Option.empty, Map(), Option.empty, Set(), Option.empty, Map(), Map(), 1,
Option.empty, Option.empty, Map(), Option.empty, Set(), metrics, Map(), Map(), 1,
false, Set())

// The smallest addTime
@@ -53,7 +57,7 @@ class ExecutorRollPluginSuite extends SparkFunSuite with PrivateMethodTester {
0, 0, 1, 100,
20, 100, 100,
10, false, 20, new Date(1639300000000L),
Option.empty, Option.empty, Map(), Option.empty, Set(), Option.empty, Map(), Map(), 1,
Option.empty, Option.empty, Map(), Option.empty, Set(), metrics, Map(), Map(), 1,
false, Set())

// The biggest totalGCTime
@@ -62,7 +66,7 @@ class ExecutorRollPluginSuite extends SparkFunSuite with PrivateMethodTester {
0, 0, 1, 100,
40, 100, 100,
10, false, 20, new Date(1639300002000L),
Option.empty, Option.empty, Map(), Option.empty, Set(), Option.empty, Map(), Map(), 1,
Option.empty, Option.empty, Map(), Option.empty, Set(), metrics, Map(), Map(), 1,
false, Set())

// The biggest totalDuration
@@ -71,7 +75,7 @@ class ExecutorRollPluginSuite extends SparkFunSuite with PrivateMethodTester {
0, 0, 4, 400,
20, 100, 100,
10, false, 20, new Date(1639300003000L),
Option.empty, Option.empty, Map(), Option.empty, Set(), Option.empty, Map(), Map(), 1,
Option.empty, Option.empty, Map(), Option.empty, Set(), metrics, Map(), Map(), 1,
false, Set())

// The biggest failedTasks
@@ -80,7 +84,7 @@ class ExecutorRollPluginSuite extends SparkFunSuite with PrivateMethodTester {
5, 0, 1, 100,
20, 100, 100,
10, false, 20, new Date(1639300003000L),
Option.empty, Option.empty, Map(), Option.empty, Set(), Option.empty, Map(), Map(), 1,
Option.empty, Option.empty, Map(), Option.empty, Set(), metrics, Map(), Map(), 1,
false, Set())

// The biggest average duration (= totalDuration / totalTask)
@@ -89,7 +93,7 @@ class ExecutorRollPluginSuite extends SparkFunSuite with PrivateMethodTester {
0, 0, 2, 300,
20, 100, 100,
10, false, 20, new Date(1639300003000L),
Option.empty, Option.empty, Map(), Option.empty, Set(), Option.empty, Map(), Map(), 1,
Option.empty, Option.empty, Map(), Option.empty, Set(), metrics, Map(), Map(), 1,
false, Set())

// The executor with no tasks
@@ -98,7 +102,7 @@ class ExecutorRollPluginSuite extends SparkFunSuite with PrivateMethodTester {
0, 0, 0, 0,
0, 0, 0,
0, false, 0, new Date(1639300001000L),
Option.empty, Option.empty, Map(), Option.empty, Set(), Option.empty, Map(), Map(), 1,
Option.empty, Option.empty, Map(), Option.empty, Set(), metrics, Map(), Map(), 1,
false, Set())

// This is used to stabilize 'mean' and 'sd' in OUTLIER test cases.
@@ -107,20 +111,41 @@ class ExecutorRollPluginSuite extends SparkFunSuite with PrivateMethodTester {
4, 0, 2, 280,
30, 100, 100,
10, false, 20, new Date(1639300001000L),
Option.empty, Option.empty, Map(), Option.empty, Set(), Option.empty, Map(), Map(), 1,
Option.empty, Option.empty, Map(), Option.empty, Set(),
Some(new ExecutorMetrics(Map("JVMHeapMemory" -> 1200L, "JVMOffHeapMemory" -> 1200L))),
Map(), Map(), 1,
false, Set())

val execWithTwoDigitID = new ExecutorSummary("10", "host:port", true, 1,
10, 10, 1, 1, 1,
4, 0, 2, 280,
30, 100, 100,
10, false, 20, new Date(1639300001000L),
Option.empty, Option.empty, Map(), Option.empty, Set(), Option.empty, Map(), Map(), 1,
Option.empty, Option.empty, Map(), Option.empty, Set(), metrics, Map(), Map(), 1,
false, Set())

val execWithBiggestPeakJVMOnHeapMemory = new ExecutorSummary("11", "host:port", true, 1,
10, 10, 1, 1, 1,
4, 0, 2, 280,
30, 100, 100,
10, false, 20, new Date(1639300001000L),
Option.empty, Option.empty, Map(), Option.empty, Set(),
Some(new ExecutorMetrics(Map("JVMHeapMemory" -> 1201L, "JVMOffHeapMemory" -> 1200L))),
Map(), Map(), 1, false, Set())

val execWithBiggestPeakJVMOffHeapMemory = new ExecutorSummary("12", "host:port", true, 1,
10, 10, 1, 1, 1,
4, 0, 2, 280,
30, 100, 100,
10, false, 20, new Date(1639300001000L),
Option.empty, Option.empty, Map(), Option.empty, Set(),
Some(new ExecutorMetrics(Map("JVMHeapMemory" -> 1200L, "JVMOffHeapMemory" -> 1201L))),
Map(), Map(), 1, false, Set())

val list = Seq(driverSummary, execWithSmallestID, execWithSmallestAddTime,
execWithBiggestTotalGCTime, execWithBiggestTotalDuration, execWithBiggestFailedTasks,
execWithBiggestAverageDuration, execWithoutTasks, execNormal, execWithTwoDigitID)
execWithBiggestAverageDuration, execWithoutTasks, execNormal, execWithTwoDigitID,
execWithBiggestPeakJVMOnHeapMemory, execWithBiggestPeakJVMOffHeapMemory)

override def beforeEach(): Unit = {
super.beforeEach()
@@ -182,6 +207,16 @@ class ExecutorRollPluginSuite extends SparkFunSuite with PrivateMethodTester {
plugin.invokePrivate(_choose(list, ExecutorRollPolicy.AVERAGE_DURATION)))
}

test("Policy: PEAK_JVM_ONHEAP_MEMORY") {
assert(plugin.invokePrivate(
_choose(list, ExecutorRollPolicy.PEAK_JVM_ONHEAP_MEMORY)).contains("11"))
}

test("Policy: PEAK_JVM_OFFHEAP_MEMORY") {
assert(plugin.invokePrivate(
_choose(list, ExecutorRollPolicy.PEAK_JVM_OFFHEAP_MEMORY)).contains("12"))
}

test("Policy: OUTLIER - Work like TOTAL_DURATION if there is no outlier") {
assertEquals(
plugin.invokePrivate(_choose(list, ExecutorRollPolicy.TOTAL_DURATION)),
@@ -227,6 +262,36 @@ class ExecutorRollPluginSuite extends SparkFunSuite with PrivateMethodTester {
plugin.invokePrivate(_choose(list :+ outlier, ExecutorRollPolicy.OUTLIER)))
}

test("Policy: OUTLIER - Detect a peak JVM on-heap memory outlier") {
val outlier = new ExecutorSummary("9999", "host:port", true, 1,
0, 0, 1, 0, 0,
3, 0, 1, 100,
1000, 0, 0,
0, false, 0, new Date(1639300001000L),
Option.empty, Option.empty, Map(), Option.empty, Set(),
Some(new ExecutorMetrics(Map("JVMHeapMemory" -> 2048L, "JVMOffHeapMemory" -> 1200L))),
Map(), Map(), 1,
false, Set())
assert(
plugin.invokePrivate(_choose(list :+ outlier, ExecutorRollPolicy.PEAK_JVM_ONHEAP_MEMORY)) ==
plugin.invokePrivate(_choose(list :+ outlier, ExecutorRollPolicy.OUTLIER)))
}

test("Policy: OUTLIER - Detect a peak JVM off-heap memory outlier") {
val outlier = new ExecutorSummary("9999", "host:port", true, 1,
0, 0, 1, 0, 0,
3, 0, 1, 100,
1000, 0, 0,
0, false, 0, new Date(1639300001000L),
Option.empty, Option.empty, Map(), Option.empty, Set(),
Some(new ExecutorMetrics(Map("JVMHeapMemory" -> 1024L, "JVMOffHeapMemory" -> 2048L))),
Map(), Map(), 1,
false, Set())
assert(
plugin.invokePrivate(_choose(list :+ outlier, ExecutorRollPolicy.PEAK_JVM_OFFHEAP_MEMORY)) ==
plugin.invokePrivate(_choose(list :+ outlier, ExecutorRollPolicy.OUTLIER)))
}

test("Policy: OUTLIER_NO_FALLBACK - Return None if there are no outliers") {
assertEquals(None, plugin.invokePrivate(_choose(list, ExecutorRollPolicy.OUTLIER_NO_FALLBACK)))
}
@@ -269,4 +334,35 @@ class ExecutorRollPluginSuite extends SparkFunSuite with PrivateMethodTester {
plugin.invokePrivate(_choose(list :+ outlier, ExecutorRollPolicy.TOTAL_GC_TIME)),
plugin.invokePrivate(_choose(list :+ outlier, ExecutorRollPolicy.OUTLIER_NO_FALLBACK)))
}

test("Policy: OUTLIER_NO_FALLBACK - Detect a peak JVM on-heap memory outlier") {
val outlier = new ExecutorSummary("9999", "host:port", true, 1,
0, 0, 1, 0, 0,
3, 0, 1, 100,
0, 0, 0,
0, false, 0, new Date(1639300001000L),
Option.empty, Option.empty, Map(), Option.empty, Set(),
Some(new ExecutorMetrics(Map("JVMHeapMemory" -> 2048L, "JVMOffHeapMemory" -> 1200L))),
Map(), Map(), 1,
false, Set())
val x = plugin.invokePrivate(_choose(list :+ outlier, ExecutorRollPolicy.TOTAL_GC_TIME))
assert(
plugin.invokePrivate(_choose(list :+ outlier, ExecutorRollPolicy.PEAK_JVM_ONHEAP_MEMORY)) ==
plugin.invokePrivate(_choose(list :+ outlier, ExecutorRollPolicy.OUTLIER_NO_FALLBACK)))
}

test("Policy: OUTLIER_NO_FALLBACK - Detect a peak JVM off-heap memory outlier") {
val outlier = new ExecutorSummary("9999", "host:port", true, 1,
0, 0, 1, 0, 0,
3, 0, 1, 100,
0, 0, 0,
0, false, 0, new Date(1639300001000L),
Option.empty, Option.empty, Map(), Option.empty, Set(),
Some(new ExecutorMetrics(Map("JVMHeapMemory" -> 1024L, "JVMOffHeapMemory" -> 2048L))),
Map(), Map(), 1,
false, Set())
assert(
plugin.invokePrivate(_choose(list :+ outlier, ExecutorRollPolicy.PEAK_JVM_OFFHEAP_MEMORY)) ==
plugin.invokePrivate(_choose(list :+ outlier, ExecutorRollPolicy.OUTLIER_NO_FALLBACK)))
}
}
