From 662535f6b95e063099fcd48db80a22688bdd6ea4 Mon Sep 17 00:00:00 2001 From: Ankur Chauhan Date: Thu, 25 Jun 2015 12:49:14 -0700 Subject: [PATCH] Incorporate code review comments + use SparkFunSuite --- .../cluster/mesos/MesosSchedulerBackend.scala | 3 +- .../cluster/mesos/MesosSchedulerUtils.scala | 34 +++++------ .../mesos/MesosSchedulerUtilsSuite.scala | 58 ++++++++++++++----- 3 files changed, 60 insertions(+), 35 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala index 09414e7a67b62..ca52473c80ada 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala @@ -199,12 +199,11 @@ private[spark] class MesosSchedulerBackend( // check if all constraints are satisfield // 1. Attribute constraints // 2. Memory requirements - // 3. CPU requirements + // 3. 
CPU requirements - need at least 1 for executor, 1 for task val meetsConstrains = matchesAttributeRequirements(slaveOfferConstraints, offerAttributes) val meetsMemoryRequirements = mem >= calculateTotalMemory(sc) val meetsCPURequirements = cpus >= (mesosExecutorCores + scheduler.CPUS_PER_TASK) - // need at least 1 for executor, 1 for task (meetsConstrains && meetsMemoryRequirements && meetsCPURequirements) || (slaveIdsWithExecutors.contains(slaveId) && cpus >= scheduler.CPUS_PER_TASK) } diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala index 5aa57b5a1a41c..439b2d7ecc48d 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala @@ -21,7 +21,6 @@ import java.util.concurrent.CountDownLatch import java.util.{List => JList} import scala.collection.JavaConversions._ -import scala.collection.mutable import com.google.common.base.Splitter import org.apache.mesos.Protos._ @@ -98,12 +97,12 @@ private[mesos] trait MesosSchedulerUtils extends Logging { /** Helper method to get the key,value-set pair for a Mesos Attribute protobuf */ - private[mesos] def getAttribute(attr: Attribute): (String, Set[String]) = + def getAttribute(attr: Attribute): (String, Set[String]) = (attr.getName, attr.getText.getValue.split(',').toSet) /** Build a Mesos resource protobuf object */ - private[mesos] def createResource(resourceName: String, quantity: Double): Protos.Resource = { + def createResource(resourceName: String, quantity: Double): Protos.Resource = { Resource.newBuilder() .setName(resourceName) .setType(Value.Type.SCALAR) @@ -118,27 +117,24 @@ private[mesos] trait MesosSchedulerUtils extends Logging { * else if attribute is defined and no values are given, simple attribute presence is preformed * else if attribute 
name and value is specified, subset match is performed on slave attributes */ - private[mesos] def matchesAttributeRequirements( + def matchesAttributeRequirements( slaveOfferConstraints: Map[String, Set[String]], offerAttributes: Map[String, Set[String]]): Boolean = - if (slaveOfferConstraints.isEmpty) { - true - } else { - slaveOfferConstraints.forall { - // offer has the required attribute and subsumes the required values for that attribute - case (name, requiredValues) => - // The attributes and their values are case sensitive during comparison - // i.e tachyon -> true != Tachyon -> true != tachyon -> True - offerAttributes.contains(name) && requiredValues.subsetOf(offerAttributes(name)) + slaveOfferConstraints.forall { + // offer has the required attribute and subsumes the required values for that attribute + case (name, requiredValues) => + // The attributes and their values are case sensitive during comparison + // i.e tachyon -> true != Tachyon -> true != tachyon -> True + offerAttributes.contains(name) && requiredValues.subsetOf(offerAttributes(name)) - } } /** * Parses the attributes constraints provided to spark and build a matching data struct: - * Map[, Set[values-to-match] + * Map[, Set[values-to-match]] * The constraints are specified as ';' separated key-value pairs where keys and values - * are separated by ':'. The ':' implies equality. For example: + * are separated by ':'. The ':' implies equality (for singular values) and "is one of" for + * multiple values (comma separated). For example: * {{{ * parseConstraintString("tachyon:true;zone:us-east-1a,us-east-1b") * // would result in @@ -152,7 +148,7 @@ private[mesos] trait MesosSchedulerUtils extends Logging { * by ':') * @return Map of constraints to match resources offers. 
*/ - private[mesos] def parseConstraintString(constraintsVal: String): Map[String, Set[String]] = { + def parseConstraintString(constraintsVal: String): Map[String, Set[String]] = { /* Based on mesos docs: attributes : attribute ( ";" attribute )* @@ -167,7 +163,7 @@ private[mesos] trait MesosSchedulerUtils extends Logging { try { Map() ++ mapAsScalaMap(splitter.split(constraintsVal)).map { case (k, v) => - if (v == null) { + if (v == null || v.isEmpty) { (k, Set[String]()) } else { (k, v.split(',').toSet) @@ -185,7 +181,7 @@ private[mesos] trait MesosSchedulerUtils extends Logging { private val MEMORY_OVERHEAD_FRACTION = 0.10 private val MEMORY_OVERHEAD_MINIMUM = 384 - private[mesos] def calculateTotalMemory(sc: SparkContext): Int = { + def calculateTotalMemory(sc: SparkContext): Int = { sc.conf.getInt("spark.mesos.executor.memoryOverhead", math.max(MEMORY_OVERHEAD_FRACTION * sc.executorMemory, MEMORY_OVERHEAD_MINIMUM).toInt) + sc.executorMemory diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala index c0f92b327b25c..8228f9885cbfa 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala @@ -17,12 +17,12 @@ package org.apache.spark.scheduler.cluster.mesos -import org.apache.spark.{SparkConf, SparkContext} +import org.apache.spark.{SparkFunSuite, SparkConf, SparkContext} import org.mockito.Mockito._ import org.scalatest._ import org.scalatest.mock.MockitoSugar -class MesosSchedulerUtilsSuite extends FlatSpec with Matchers with MockitoSugar { +class MesosSchedulerUtilsSuite extends SparkFunSuite with Matchers with MockitoSugar { // scalastyle:off structural.type // this is the documented way of generating fixtures in scalatest @@ -31,44 +31,74 @@ class MesosSchedulerUtilsSuite 
extends FlatSpec with Matchers with MockitoSugar val sc = mock[SparkContext] when(sc.conf).thenReturn(sparkConf) } + val utils = new MesosSchedulerUtils { } // scalastyle:on structural.type - "MesosSchedulerUtils" should "use at-least minimum overhead" in new MesosSchedulerUtils { + test("use at-least minimum overhead") { val f = fixture // 384 > sc.executorMemory * 0.1 => 512 + 384 = 896 when(f.sc.executorMemory).thenReturn(512) - calculateTotalMemory(f.sc) shouldBe 896 + utils.calculateTotalMemory(f.sc) shouldBe 896 } - it should "use overhead if it is greater than minimum value" in new MesosSchedulerUtils { + test("use overhead if it is greater than minimum value") { val f = fixture // 384 > sc.executorMemory * 0.1 => 512 + 384 = 896 when(f.sc.executorMemory).thenReturn(4096) - calculateTotalMemory(f.sc) shouldBe 4505 + utils.calculateTotalMemory(f.sc) shouldBe 4505 } - it should "use spark.mesos.executor.memoryOverhead (if set)" in new MesosSchedulerUtils { + test("use spark.mesos.executor.memoryOverhead (if set)") { val f = fixture + val utils = new MesosSchedulerUtils { } // 384 > sc.executorMemory * 0.1 => 512 + 384 = 896 when(f.sc.executorMemory).thenReturn(1024) f.sparkConf.set("spark.mesos.executor.memoryOverhead", "512") - calculateTotalMemory(f.sc) shouldBe 1536 + utils.calculateTotalMemory(f.sc) shouldBe 1536 } - it should "parse a non-empty constraint string correctly" in new MesosSchedulerUtils { + test("parse a non-empty constraint string correctly") { val expectedMap = Map( "tachyon" -> Set("true"), "zone" -> Set("us-east-1a", "us-east-1b") ) - parseConstraintString("tachyon:true;zone:us-east-1a,us-east-1b") should be (expectedMap) + utils.parseConstraintString("tachyon:true;zone:us-east-1a,us-east-1b") should be (expectedMap) } - it should "parse an empty constraint string correctly" in new MesosSchedulerUtils { - parseConstraintString("") shouldBe Map() + test("parse an empty constraint string correctly") { + val utils = new MesosSchedulerUtils { 
} +    utils.parseConstraintString("") shouldBe Map()   } -  it should "throw an exception when the input is malformed" in new MesosSchedulerUtils { -    an[IllegalArgumentException] should be thrownBy parseConstraintString("tachyon;zone:us-east") +  test("throw an exception when the input is malformed") { +    an[IllegalArgumentException] should be thrownBy +      utils.parseConstraintString("tachyon;zone:us-east") +  } + +  test("empty values for attributes' constraints match all values") { +    val constraintsStr = "tachyon:" +    val parsedConstraints = utils.parseConstraintString(constraintsStr) + +    parsedConstraints shouldBe Map("tachyon" -> Set()) + +    val `offer with no tachyon` = Map("zone" -> Set("us-east-1a", "us-east-1b")) +    val `offer with tachyon:true` = Map("tachyon" -> Set("true")) +    val `offer with tachyon:false` = Map("tachyon" -> Set("false")) + +    utils.matchesAttributeRequirements(parsedConstraints, `offer with no tachyon`) shouldBe false +    utils.matchesAttributeRequirements(parsedConstraints, `offer with tachyon:true`) shouldBe true +    utils.matchesAttributeRequirements(parsedConstraints, `offer with tachyon:false`) shouldBe true +  } + +  test("subset match is performed on constraint attributes") { +    val `constraint with superset` = Map( +      "tachyon" -> Set("true"), +      "zone" -> Set("us-east-1a", "us-east-1b", "us-east-1c")) + +    val zoneConstraintStr = "tachyon:;zone:us-east-1a,us-east-1c" +    val parsedConstraints = utils.parseConstraintString(zoneConstraintStr) + +    utils.matchesAttributeRequirements(parsedConstraints, `constraint with superset`) shouldBe true   } }