diff --git a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala index 9a1862d32dc13..102dd4b76d237 100644 --- a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala +++ b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala @@ -18,6 +18,7 @@ package org.apache.spark.scheduler.cluster.mesos import java.util.{Collection, Collections, Date} +import java.util.concurrent.atomic.AtomicLong import scala.collection.JavaConverters._ @@ -40,6 +41,19 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi private var driver: SchedulerDriver = _ private var scheduler: MesosClusterScheduler = _ + private val submissionTime = new AtomicLong(System.currentTimeMillis()) + + // Queued drivers in MesosClusterScheduler are ordered based on MesosDriverDescription + // The default ordering checks for priority, followed by submission time. For two driver + // submissions with same priority and if made in quick succession (such that submission + // time is same due to millisecond granularity), this results in dropping the + // second MesosDriverDescription from the queuedDrivers - as driverOrdering + // returns 0 when comparing the descriptions. Ensure two seperate submissions + // have differnt dates + private def getDate: Date = { + new Date(submissionTime.incrementAndGet()) + } + private def setScheduler(sparkConfVars: Map[String, String] = null): Unit = { val conf = new SparkConf() conf.setMaster("mesos://localhost:5050") @@ -68,7 +82,7 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi command, Map[String, String](), submissionId, - new Date()) + getDate) } test("can queue drivers") { @@ -108,7 +122,7 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi Map((config.EXECUTOR_HOME.key, "test"), ("spark.app.name", "test"), (config.DRIVER_MEMORY_OVERHEAD.key, "0")), "s1", - new Date())) + getDate)) assert(response.success) val offer = Offer.newBuilder() .addResources( @@ -213,7 +227,7 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi Map("spark.mesos.executor.home" -> "test", "spark.app.name" -> "test"), "s1", - new Date())) + getDate)) assert(response.success) val offer = Utils.createOffer("o1", "s1", mem*2, cpu) @@ -240,7 +254,7 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi Map("spark.mesos.executor.home" -> "test", "spark.app.name" -> "test"), "s1", - new Date())) + getDate)) assert(response.success) val offer = Utils.createOffer("o1", "s1", mem*2, cpu) @@ -270,7 +284,7 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi config.DRIVER_MEMORY_OVERHEAD.key -> "0" ), "s1", - new Date())) + getDate)) assert(response.success) val offer = Utils.createOffer("o1", "s1", mem, cpu) @@ -296,7 +310,7 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi config.NETWORK_LABELS.key -> "key1:val1,key2:val2", config.DRIVER_MEMORY_OVERHEAD.key -> "0"), "s1", - new Date())) + getDate)) assert(response.success) @@ -327,7 +341,7 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi "spark.app.name" -> "test", config.DRIVER_MEMORY_OVERHEAD.key -> "0"), "s1", - new Date())) + getDate)) assert(response.success) @@ -352,7 +366,7 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi "spark.app.name" -> "test", config.DRIVER_MEMORY_OVERHEAD.key -> "0"), "s1", - new Date())) + getDate)) assert(response.success) @@ -378,7 +392,7 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi "spark.app.name" -> "test", config.DRIVER_MEMORY_OVERHEAD.key -> "0"), "s1", - new Date())) + getDate)) assert(response.success) @@ -413,7 +427,7 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi config.DRIVER_CONSTRAINTS.key -> driverConstraints, config.DRIVER_MEMORY_OVERHEAD.key -> "0"), "s1", - new Date())) + getDate)) assert(response.success) } @@ -452,7 +466,7 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi config.DRIVER_LABELS.key -> "key:value", config.DRIVER_MEMORY_OVERHEAD.key -> "0"), "s1", - new Date())) + getDate)) assert(response.success) @@ -474,7 +488,7 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi val response = scheduler.submitDriver( new MesosDriverDescription("d1", "jar", 100, 1, true, command, - Map((config.EXECUTOR_HOME.key, "test"), ("spark.app.name", "test")), "s1", new Date())) + Map((config.EXECUTOR_HOME.key, "test"), ("spark.app.name", "test")), "s1", getDate)) assert(response.success) val agentId = SlaveID.newBuilder().setValue("s1").build() val offer = Offer.newBuilder() @@ -533,7 +547,7 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi val response = scheduler.submitDriver( new MesosDriverDescription("d1", "jar", 100, 1, true, command, - Map(("spark.mesos.executor.home", "test"), ("spark.app.name", "test")), "sub1", new Date())) + Map(("spark.mesos.executor.home", "test"), ("spark.app.name", "test")), "sub1", getDate)) assert(response.success) // Offer a resource to launch the submitted driver @@ -651,7 +665,7 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi config.EXECUTOR_URI.key -> "s3a://bucket/spark-version.tgz", "another.conf" -> "\\value"), "s1", - new Date()) + getDate) val expectedCmd = "cd spark-version*; " + "bin/spark-submit --name \"app name\" --master mesos://mesos://localhost:5050 " + @@ -691,7 +705,7 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi command, Map("spark.mesos.dispatcher.queue" -> "dummy"), "s1", - new Date()) + getDate) assertThrows[NoSuchElementException] { scheduler.getDriverPriority(desc) @@ -702,7 +716,7 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi command, Map[String, String](), "s2", - new Date()) + getDate) assert(scheduler.getDriverPriority(desc) == 0.0f) @@ -711,7 +725,7 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi command, Map("spark.mesos.dispatcher.queue" -> "default"), "s3", - new Date()) + getDate) assert(scheduler.getDriverPriority(desc) == 0.0f) @@ -720,7 +734,7 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi command, Map("spark.mesos.dispatcher.queue" -> "ROUTINE"), "s4", - new Date()) + getDate) assert(scheduler.getDriverPriority(desc) == 1.0f) @@ -729,7 +743,7 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi command, Map("spark.mesos.dispatcher.queue" -> "URGENT"), "s5", - new Date()) + getDate) assert(scheduler.getDriverPriority(desc) == 2.0f) } @@ -746,22 +760,22 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi val response0 = scheduler.submitDriver( new MesosDriverDescription("d1", "jar", 100, 1, true, command, - Map("spark.mesos.dispatcher.queue" -> "ROUTINE"), "s0", new Date())) + Map("spark.mesos.dispatcher.queue" -> "ROUTINE"), "s0", getDate)) assert(response0.success) val response1 = scheduler.submitDriver( new MesosDriverDescription("d1", "jar", 100, 1, true, command, - Map[String, String](), "s1", new Date())) + Map[String, String](), "s1", getDate)) assert(response1.success) val response2 = scheduler.submitDriver( new MesosDriverDescription("d1", "jar", 100, 1, true, command, - Map("spark.mesos.dispatcher.queue" -> "EXCEPTIONAL"), "s2", new Date())) + Map("spark.mesos.dispatcher.queue" -> "EXCEPTIONAL"), "s2", getDate)) assert(response2.success) val response3 = scheduler.submitDriver( new MesosDriverDescription("d1", "jar", 100, 1, true, command, - Map("spark.mesos.dispatcher.queue" -> "URGENT"), "s3", new Date())) + Map("spark.mesos.dispatcher.queue" -> "URGENT"), "s3", getDate)) assert(response3.success) val state = scheduler.getSchedulerState() @@ -782,12 +796,12 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi val response0 = scheduler.submitDriver( new MesosDriverDescription("d1", "jar", 100, 1, true, command, - Map("spark.mesos.dispatcher.queue" -> "LOWER"), "s0", new Date())) + Map("spark.mesos.dispatcher.queue" -> "LOWER"), "s0", getDate)) assert(response0.success) val response1 = scheduler.submitDriver( new MesosDriverDescription("d1", "jar", 100, 1, true, command, - Map[String, String](), "s1", new Date())) + Map[String, String](), "s1", getDate)) assert(response1.success) val state = scheduler.getSchedulerState() @@ -812,7 +826,7 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi config.DRIVER_MEMORY_OVERHEAD.key -> "0") ++ addlSparkConfVars, "s1", - new Date()) + getDate) val response = scheduler.submitDriver(driverDesc) assert(response.success) val offer = Utils.createOffer("o1", "s1", mem, cpu)