Skip to content

Commit

Permalink
[SPARK-40902][MESOS][TESTS] Fix issue with mesos tests failing due to…
Browse files Browse the repository at this point in the history
… quick submission of drivers

### What changes were proposed in this pull request?

##### Quick submission of drivers in tests to mesos scheduler results in dropping drivers

Queued drivers in `MesosClusterScheduler` are ordered based on `MesosDriverDescription` - and the ordering used checks for priority (if different), followed by comparison of submission time.
For two driver submissions with same priority, if made in quick succession (such that submission time is same due to millisecond granularity of Date), this results in dropping the second `MesosDriverDescription` from `queuedDrivers` (since `driverOrdering` returns `0` when comparing the descriptions).

This PR fixes the more immediate issue with tests.

### Why are the changes needed?

Flaky tests, [see here](https://lists.apache.org/thread/jof098qxp0s6qqmt9qwv52f9665b1pjg) for an example.

### Does this PR introduce _any_ user-facing change?

No.
Fixing only tests for now - as mesos support is deprecated, not changing scheduler itself to address this.

### How was this patch tested?

Fixes unit tests

Closes apache#38378 from mridulm/fix_MesosClusterSchedulerSuite.

Authored-by: Mridul <mridul@gmail.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
(cherry picked from commit 60b1056)
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
(cherry picked from commit e5744b9)
  • Loading branch information
Mridul authored and huaxingao committed Oct 30, 2022
1 parent bcae416 commit 2c85583
Showing 1 changed file with 40 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.spark.scheduler.cluster.mesos

import java.util.{Collection, Collections, Date}
import java.util.concurrent.atomic.AtomicLong

import scala.collection.JavaConverters._

Expand All @@ -40,6 +41,19 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi
private var driver: SchedulerDriver = _
private var scheduler: MesosClusterScheduler = _

private val submissionTime = new AtomicLong(System.currentTimeMillis())

// Queued drivers in MesosClusterScheduler are ordered based on MesosDriverDescription.
// The default ordering checks for priority, followed by submission time. For two driver
// submissions with the same priority made in quick succession (such that the submission
// time is identical due to the millisecond granularity of Date), the second
// MesosDriverDescription is dropped from queuedDrivers - as driverOrdering
// returns 0 when comparing the descriptions. Ensure two separate submissions
// have different dates by handing out strictly increasing timestamps.
private def getDate: Date = {
new Date(submissionTime.incrementAndGet())
}

private def setScheduler(sparkConfVars: Map[String, String] = null): Unit = {
val conf = new SparkConf()
conf.setMaster("mesos://localhost:5050")
Expand Down Expand Up @@ -68,7 +82,7 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi
command,
Map[String, String](),
submissionId,
new Date())
getDate)
}

test("can queue drivers") {
Expand Down Expand Up @@ -108,7 +122,7 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi
Map((config.EXECUTOR_HOME.key, "test"), ("spark.app.name", "test"),
(config.DRIVER_MEMORY_OVERHEAD.key, "0")),
"s1",
new Date()))
getDate))
assert(response.success)
val offer = Offer.newBuilder()
.addResources(
Expand Down Expand Up @@ -213,7 +227,7 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi
Map("spark.mesos.executor.home" -> "test",
"spark.app.name" -> "test"),
"s1",
new Date()))
getDate))
assert(response.success)

val offer = Utils.createOffer("o1", "s1", mem*2, cpu)
Expand All @@ -240,7 +254,7 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi
Map("spark.mesos.executor.home" -> "test",
"spark.app.name" -> "test"),
"s1",
new Date()))
getDate))
assert(response.success)

val offer = Utils.createOffer("o1", "s1", mem*2, cpu)
Expand Down Expand Up @@ -270,7 +284,7 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi
config.DRIVER_MEMORY_OVERHEAD.key -> "0"
),
"s1",
new Date()))
getDate))
assert(response.success)

val offer = Utils.createOffer("o1", "s1", mem, cpu)
Expand All @@ -296,7 +310,7 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi
config.NETWORK_LABELS.key -> "key1:val1,key2:val2",
config.DRIVER_MEMORY_OVERHEAD.key -> "0"),
"s1",
new Date()))
getDate))

assert(response.success)

Expand Down Expand Up @@ -327,7 +341,7 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi
"spark.app.name" -> "test",
config.DRIVER_MEMORY_OVERHEAD.key -> "0"),
"s1",
new Date()))
getDate))

assert(response.success)

Expand All @@ -352,7 +366,7 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi
"spark.app.name" -> "test",
config.DRIVER_MEMORY_OVERHEAD.key -> "0"),
"s1",
new Date()))
getDate))

assert(response.success)

Expand All @@ -378,7 +392,7 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi
"spark.app.name" -> "test",
config.DRIVER_MEMORY_OVERHEAD.key -> "0"),
"s1",
new Date()))
getDate))

assert(response.success)

Expand Down Expand Up @@ -413,7 +427,7 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi
config.DRIVER_CONSTRAINTS.key -> driverConstraints,
config.DRIVER_MEMORY_OVERHEAD.key -> "0"),
"s1",
new Date()))
getDate))
assert(response.success)
}

Expand Down Expand Up @@ -452,7 +466,7 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi
config.DRIVER_LABELS.key -> "key:value",
config.DRIVER_MEMORY_OVERHEAD.key -> "0"),
"s1",
new Date()))
getDate))

assert(response.success)

Expand All @@ -474,7 +488,7 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi

val response = scheduler.submitDriver(
new MesosDriverDescription("d1", "jar", 100, 1, true, command,
Map((config.EXECUTOR_HOME.key, "test"), ("spark.app.name", "test")), "s1", new Date()))
Map((config.EXECUTOR_HOME.key, "test"), ("spark.app.name", "test")), "s1", getDate))
assert(response.success)
val agentId = SlaveID.newBuilder().setValue("s1").build()
val offer = Offer.newBuilder()
Expand Down Expand Up @@ -533,7 +547,7 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi

val response = scheduler.submitDriver(
new MesosDriverDescription("d1", "jar", 100, 1, true, command,
Map(("spark.mesos.executor.home", "test"), ("spark.app.name", "test")), "sub1", new Date()))
Map(("spark.mesos.executor.home", "test"), ("spark.app.name", "test")), "sub1", getDate))
assert(response.success)

// Offer a resource to launch the submitted driver
Expand Down Expand Up @@ -651,7 +665,7 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi
config.EXECUTOR_URI.key -> "s3a://bucket/spark-version.tgz",
"another.conf" -> "\\value"),
"s1",
new Date())
getDate)

val expectedCmd = "cd spark-version*; " +
"bin/spark-submit --name \"app name\" --master mesos://mesos://localhost:5050 " +
Expand Down Expand Up @@ -691,7 +705,7 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi
command,
Map("spark.mesos.dispatcher.queue" -> "dummy"),
"s1",
new Date())
getDate)

assertThrows[NoSuchElementException] {
scheduler.getDriverPriority(desc)
Expand All @@ -702,7 +716,7 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi
command,
Map[String, String](),
"s2",
new Date())
getDate)

assert(scheduler.getDriverPriority(desc) == 0.0f)

Expand All @@ -711,7 +725,7 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi
command,
Map("spark.mesos.dispatcher.queue" -> "default"),
"s3",
new Date())
getDate)

assert(scheduler.getDriverPriority(desc) == 0.0f)

Expand All @@ -720,7 +734,7 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi
command,
Map("spark.mesos.dispatcher.queue" -> "ROUTINE"),
"s4",
new Date())
getDate)

assert(scheduler.getDriverPriority(desc) == 1.0f)

Expand All @@ -729,7 +743,7 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi
command,
Map("spark.mesos.dispatcher.queue" -> "URGENT"),
"s5",
new Date())
getDate)

assert(scheduler.getDriverPriority(desc) == 2.0f)
}
Expand All @@ -746,22 +760,22 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi

val response0 = scheduler.submitDriver(
new MesosDriverDescription("d1", "jar", 100, 1, true, command,
Map("spark.mesos.dispatcher.queue" -> "ROUTINE"), "s0", new Date()))
Map("spark.mesos.dispatcher.queue" -> "ROUTINE"), "s0", getDate))
assert(response0.success)

val response1 = scheduler.submitDriver(
new MesosDriverDescription("d1", "jar", 100, 1, true, command,
Map[String, String](), "s1", new Date()))
Map[String, String](), "s1", getDate))
assert(response1.success)

val response2 = scheduler.submitDriver(
new MesosDriverDescription("d1", "jar", 100, 1, true, command,
Map("spark.mesos.dispatcher.queue" -> "EXCEPTIONAL"), "s2", new Date()))
Map("spark.mesos.dispatcher.queue" -> "EXCEPTIONAL"), "s2", getDate))
assert(response2.success)

val response3 = scheduler.submitDriver(
new MesosDriverDescription("d1", "jar", 100, 1, true, command,
Map("spark.mesos.dispatcher.queue" -> "URGENT"), "s3", new Date()))
Map("spark.mesos.dispatcher.queue" -> "URGENT"), "s3", getDate))
assert(response3.success)

val state = scheduler.getSchedulerState()
Expand All @@ -782,12 +796,12 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi

val response0 = scheduler.submitDriver(
new MesosDriverDescription("d1", "jar", 100, 1, true, command,
Map("spark.mesos.dispatcher.queue" -> "LOWER"), "s0", new Date()))
Map("spark.mesos.dispatcher.queue" -> "LOWER"), "s0", getDate))
assert(response0.success)

val response1 = scheduler.submitDriver(
new MesosDriverDescription("d1", "jar", 100, 1, true, command,
Map[String, String](), "s1", new Date()))
Map[String, String](), "s1", getDate))
assert(response1.success)

val state = scheduler.getSchedulerState()
Expand All @@ -812,7 +826,7 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi
config.DRIVER_MEMORY_OVERHEAD.key -> "0") ++
addlSparkConfVars,
"s1",
new Date())
getDate)
val response = scheduler.submitDriver(driverDesc)
assert(response.success)
val offer = Utils.createOffer("o1", "s1", mem, cpu)
Expand Down

0 comments on commit 2c85583

Please sign in to comment.