Skip to content

Commit

Permalink
Incorporate code review comments + use SparkFunSuite
Browse files Browse the repository at this point in the history
  • Loading branch information
Ankur Chauhan committed Jun 26, 2015
1 parent fdc0937 commit 662535f
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -199,12 +199,11 @@ private[spark] class MesosSchedulerBackend(
// check if all constraints are satisfied
// 1. Attribute constraints
// 2. Memory requirements
// 3. CPU requirements
// 3. CPU requirements - need at least 1 for executor, 1 for task
val meetsConstrains = matchesAttributeRequirements(slaveOfferConstraints, offerAttributes)
val meetsMemoryRequirements = mem >= calculateTotalMemory(sc)
val meetsCPURequirements = cpus >= (mesosExecutorCores + scheduler.CPUS_PER_TASK)

// need at least 1 for executor, 1 for task
(meetsConstrains && meetsMemoryRequirements && meetsCPURequirements) ||
(slaveIdsWithExecutors.contains(slaveId) && cpus >= scheduler.CPUS_PER_TASK)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import java.util.concurrent.CountDownLatch
import java.util.{List => JList}

import scala.collection.JavaConversions._
import scala.collection.mutable

import com.google.common.base.Splitter
import org.apache.mesos.Protos._
Expand Down Expand Up @@ -98,12 +97,12 @@ private[mesos] trait MesosSchedulerUtils extends Logging {


/** Helper method to get the key,value-set pair for a Mesos Attribute protobuf */
private[mesos] def getAttribute(attr: Attribute): (String, Set[String]) =
def getAttribute(attr: Attribute): (String, Set[String]) =
(attr.getName, attr.getText.getValue.split(',').toSet)


/** Build a Mesos resource protobuf object */
private[mesos] def createResource(resourceName: String, quantity: Double): Protos.Resource = {
def createResource(resourceName: String, quantity: Double): Protos.Resource = {
Resource.newBuilder()
.setName(resourceName)
.setType(Value.Type.SCALAR)
Expand All @@ -118,27 +117,24 @@ private[mesos] trait MesosSchedulerUtils extends Logging {
* else if attribute is defined and no values are given, simple attribute presence is performed
* else if attribute name and value is specified, subset match is performed on slave attributes
*/
private[mesos] def matchesAttributeRequirements(
def matchesAttributeRequirements(
slaveOfferConstraints: Map[String, Set[String]],
offerAttributes: Map[String, Set[String]]): Boolean =
if (slaveOfferConstraints.isEmpty) {
true
} else {
slaveOfferConstraints.forall {
// offer has the required attribute and subsumes the required values for that attribute
case (name, requiredValues) =>
// The attributes and their values are case sensitive during comparison
// i.e tachyon -> true != Tachyon -> true != tachyon -> True
offerAttributes.contains(name) && requiredValues.subsetOf(offerAttributes(name))
slaveOfferConstraints.forall {
// offer has the required attribute and subsumes the required values for that attribute
case (name, requiredValues) =>
// The attributes and their values are case sensitive during comparison
// i.e tachyon -> true != Tachyon -> true != tachyon -> True
offerAttributes.contains(name) && requiredValues.subsetOf(offerAttributes(name))

}
}

/**
* Parses the attributes constraints provided to spark and build a matching data struct:
* Map[<attribute-name>, Set[values-to-match]
* Map[<attribute-name>, Set[values-to-match]]
* The constraints are specified as ';' separated key-value pairs where keys and values
* are separated by ':'. The ':' implies equality. For example:
* are separated by ':'. The ':' implies equality (for singular values) and "is one of" for
* multiple values (comma separated). For example:
* {{{
* parseConstraintString("tachyon:true;zone:us-east-1a,us-east-1b")
* // would result in
Expand All @@ -152,7 +148,7 @@ private[mesos] trait MesosSchedulerUtils extends Logging {
* by ':')
* @return Map of constraints to match resources offers.
*/
private[mesos] def parseConstraintString(constraintsVal: String): Map[String, Set[String]] = {
def parseConstraintString(constraintsVal: String): Map[String, Set[String]] = {
/*
Based on mesos docs:
attributes : attribute ( ";" attribute )*
Expand All @@ -167,7 +163,7 @@ private[mesos] trait MesosSchedulerUtils extends Logging {
try {
Map() ++ mapAsScalaMap(splitter.split(constraintsVal)).map {
case (k, v) =>
if (v == null) {
if (v == null || v.isEmpty) {
(k, Set[String]())
} else {
(k, v.split(',').toSet)
Expand All @@ -185,7 +181,7 @@ private[mesos] trait MesosSchedulerUtils extends Logging {
private val MEMORY_OVERHEAD_FRACTION = 0.10
private val MEMORY_OVERHEAD_MINIMUM = 384

private[mesos] def calculateTotalMemory(sc: SparkContext): Int = {
def calculateTotalMemory(sc: SparkContext): Int = {
sc.conf.getInt("spark.mesos.executor.memoryOverhead",
math.max(MEMORY_OVERHEAD_FRACTION * sc.executorMemory, MEMORY_OVERHEAD_MINIMUM).toInt) +
sc.executorMemory
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@

package org.apache.spark.scheduler.cluster.mesos

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.{SparkFunSuite, SparkConf, SparkContext}
import org.mockito.Mockito._
import org.scalatest._
import org.scalatest.mock.MockitoSugar

class MesosSchedulerUtilsSuite extends FlatSpec with Matchers with MockitoSugar {
class MesosSchedulerUtilsSuite extends SparkFunSuite with Matchers with MockitoSugar {

// scalastyle:off structural.type
// this is the documented way of generating fixtures in scalatest
Expand All @@ -31,44 +31,74 @@ class MesosSchedulerUtilsSuite extends FlatSpec with Matchers with MockitoSugar
val sc = mock[SparkContext]
when(sc.conf).thenReturn(sparkConf)
}
val utils = new MesosSchedulerUtils { }
// scalastyle:on structural.type

"MesosSchedulerUtils" should "use at-least minimum overhead" in new MesosSchedulerUtils {
test("use at-least minimum overhead") {
val f = fixture
// 384 > sc.executorMemory * 0.1 => 512 + 384 = 896
when(f.sc.executorMemory).thenReturn(512)
calculateTotalMemory(f.sc) shouldBe 896
utils.calculateTotalMemory(f.sc) shouldBe 896
}

it should "use overhead if it is greater than minimum value" in new MesosSchedulerUtils {
test("use overhead if it is greater than minimum value") {
val f = fixture
// 384 > sc.executorMemory * 0.1 => 512 + 384 = 896
when(f.sc.executorMemory).thenReturn(4096)
calculateTotalMemory(f.sc) shouldBe 4505
utils.calculateTotalMemory(f.sc) shouldBe 4505
}

it should "use spark.mesos.executor.memoryOverhead (if set)" in new MesosSchedulerUtils {
test("use spark.mesos.executor.memoryOverhead (if set)") {
val f = fixture
val utils = new MesosSchedulerUtils { }
// 384 > sc.executorMemory * 0.1 => 512 + 384 = 896
when(f.sc.executorMemory).thenReturn(1024)
f.sparkConf.set("spark.mesos.executor.memoryOverhead", "512")
calculateTotalMemory(f.sc) shouldBe 1536
utils.calculateTotalMemory(f.sc) shouldBe 1536
}

it should "parse a non-empty constraint string correctly" in new MesosSchedulerUtils {
test("parse a non-empty constraint string correctly") {
val expectedMap = Map(
"tachyon" -> Set("true"),
"zone" -> Set("us-east-1a", "us-east-1b")
)
parseConstraintString("tachyon:true;zone:us-east-1a,us-east-1b") should be (expectedMap)
utils.parseConstraintString("tachyon:true;zone:us-east-1a,us-east-1b") should be (expectedMap)
}

it should "parse an empty constraint string correctly" in new MesosSchedulerUtils {
parseConstraintString("") shouldBe Map()
test("parse an empty constraint string correctly") {
val utils = new MesosSchedulerUtils { }
utils.parseConstraintString("") shouldBe Map()
}

it should "throw an exception when the input is malformed" in new MesosSchedulerUtils {
an[IllegalArgumentException] should be thrownBy parseConstraintString("tachyon;zone:us-east")
test("throw an exception when the input is malformed") {
an[IllegalArgumentException] should be thrownBy
utils.parseConstraintString("tachyon;zone:us-east")
}

test("empty values for attributes' constraints matches all values") {
val constraintsStr = "tachyon:"
val parsedConstraints = utils.parseConstraintString(constraintsStr)

parsedConstraints shouldBe Map("tachyon" -> Set())

val `offer with no tachyon` = Map("zone" -> Set("us-east-1a", "us-east-1b"))
val `offer with tachyon:true` = Map("tachyon" -> Set("true"))
val `offer with tachyon:false` = Map("tachyon" -> Set("false"))

utils.matchesAttributeRequirements(parsedConstraints, `offer with no tachyon`) shouldBe false
utils.matchesAttributeRequirements(parsedConstraints, `offer with tachyon:true`) shouldBe true
utils.matchesAttributeRequirements(parsedConstraints, `offer with tachyon:false`) shouldBe true
}

test("subset match is performed constraint attributes") {
val `constraint with superset` = Map(
"tachyon" -> Set("true"),
"zone" -> Set("us-east-1a", "us-east-1b", "us-east-1c"))

val zoneConstraintStr = "tachyon:;zone:us-east-1a,us-east-1c"
val parsedConstraints = utils.parseConstraintString(zoneConstraintStr)

utils.matchesAttributeRequirements(parsedConstraints, `constraint with superset`) shouldBe true
}

}

0 comments on commit 662535f

Please sign in to comment.