Decline offers that did not meet criteria
Ankur Chauhan committed Jun 26, 2015
1 parent 67b58a0 commit fdc0937
Showing 3 changed files with 30 additions and 32 deletions.
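
In short: instead of pre-filtering the incoming offers and "rejecting" the unwanted ones by calling launchTasks with an empty task list, each offer is now checked in place and, if it fails the checks, explicitly declined through the driver. A minimal sketch of the accept-or-decline pattern (handleOffer and the Option-valued task are illustrative, not part of the patch; the driver calls are the standard Mesos Java API):

    import java.util.Collections
    import org.apache.mesos.Protos.{Offer, TaskInfo}
    import org.apache.mesos.SchedulerDriver

    // Sketch only: accept the offer by launching a task, or decline it outright.
    def handleOffer(driver: SchedulerDriver, offer: Offer, task: Option[TaskInfo]): Unit =
      task match {
        case Some(t) =>
          // resources and constraints were satisfied: launch against this offer
          driver.launchTasks(Collections.singleton(offer.getId), Collections.singletonList(t))
        case None =>
          // offer did not meet the criteria: decline instead of launching an empty task list
          driver.declineOffer(offer.getId)
      }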
CoarseMesosSchedulerBackend.scala
@@ -18,8 +18,9 @@
 package org.apache.spark.scheduler.cluster.mesos
 
 import java.io.File
-import java.util.{Collections, List => JList}
+import java.util.{List => JList}
 
+import scala.collection.JavaConversions._
 import scala.collection.mutable.{HashMap, HashSet}
 
 import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, _}
@@ -172,15 +173,14 @@ private[spark] class CoarseMesosSchedulerBackend(
   override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) {
     synchronized {
       val filters = Filters.newBuilder().setRefuseSeconds(5).build()
-
-      // filter out all the offers that do not meet constraints (if specified)
-      val qualifyingOffers = filterOffersByConstraints(offers, slaveOfferConstraints)
-
-      for (offer <- qualifyingOffers) {
+      for (offer <- offers) {
+        val offerAttributes = (offer.getAttributesList map getAttribute).toMap
+        val meetsConstraints = matchesAttributeRequirements(slaveOfferConstraints, offerAttributes)
         val slaveId = offer.getSlaveId.toString
         val mem = getResource(offer.getResourcesList, "mem")
         val cpus = getResource(offer.getResourcesList, "cpus").toInt
-        if (totalCoresAcquired < maxCores &&
+        if (meetsConstraints &&
+          totalCoresAcquired < maxCores &&
           mem >= calculateTotalMemory(sc) &&
           cpus >= 1 &&
           failuresBySlaveId.getOrElse(slaveId, 0) < MAX_SLAVE_FAILURES &&
@@ -202,15 +202,14 @@
 
           sc.conf.getOption("spark.mesos.executor.docker.image").foreach { image =>
             MesosSchedulerBackendUtil
-              .setupContainerBuilderDockerInfo(image, sc.conf, task.getContainerBuilder())
+              .setupContainerBuilderDockerInfo(image, sc.conf, task.getContainerBuilder)
           }
 
-          d.launchTasks(
-            Collections.singleton(offer.getId), Collections.singletonList(task.build()), filters)
+          // accept the offer and launch the task
+          d.launchTasks(List(offer.getId), List(task.build()), filters)
         } else {
-          // Filter it out
-          d.launchTasks(
-            Collections.singleton(offer.getId), Collections.emptyList[MesosTaskInfo](), filters)
+          // Decline the offer
+          d.declineOffer(offer.getId)
         }
       }
     }
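
Why the scala.collection.JavaConversions._ import appears in this file: the new code iterates the java.util.List of offers with a Scala for-comprehension, maps over getAttributesList, and passes plain Scala Lists where launchTasks expects Java collections, all of which rely on those implicits; that is also what lets the Collections.singleton / Collections.singletonList boilerplate above be dropped. A tiny illustration of the conversion (sketch only, not part of the patch):

    import scala.collection.JavaConversions._

    // With JavaConversions in scope, a Scala List is implicitly converted to a
    // java.util.List, so it can be handed straight to a Java API such as launchTasks.
    val ids: java.util.List[String] = List("offer-1", "offer-2")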
MesosSchedulerBackend.scala
@@ -20,6 +20,7 @@ package org.apache.spark.scheduler.cluster.mesos
 import java.io.File
 import java.util.{ArrayList => JArrayList, Collections, List => JList}
 
+import scala.collection.JavaConversions._
 import scala.collection.mutable.{HashMap, HashSet}
 
 import org.apache.mesos.Protos.{ExecutorInfo => MesosExecutorInfo, TaskInfo => MesosTaskInfo, _}
@@ -188,19 +189,29 @@ private[spark] class MesosSchedulerBackend(
    */
   override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) {
     inClassLoader() {
-      val qualifyingOffers = filterOffersByConstraints(offers, slaveOfferConstraints)
       // Fail-fast on offers we know will be rejected
-      val (usableOffers, unUsableOffers) = qualifyingOffers.partition { o =>
+      val (usableOffers, unUsableOffers) = offers.partition { o =>
         val mem = getResource(o.getResourcesList, "mem")
         val cpus = getResource(o.getResourcesList, "cpus")
         val slaveId = o.getSlaveId.getValue
-        (mem >= calculateTotalMemory(sc) &&
-          // need at least 1 for executor, 1 for task
-          cpus >= (mesosExecutorCores + scheduler.CPUS_PER_TASK)) ||
-          (slaveIdsWithExecutors.contains(slaveId) &&
-            cpus >= scheduler.CPUS_PER_TASK)
+        val offerAttributes = (o.getAttributesList map getAttribute).toMap
+
+        // check if all constraints are satisfied
+        // 1. Attribute constraints
+        // 2. Memory requirements
+        // 3. CPU requirements
+        val meetsConstraints = matchesAttributeRequirements(slaveOfferConstraints, offerAttributes)
+        val meetsMemoryRequirements = mem >= calculateTotalMemory(sc)
+        val meetsCPURequirements = cpus >= (mesosExecutorCores + scheduler.CPUS_PER_TASK)
+
+        // need at least 1 for executor, 1 for task
+        (meetsConstraints && meetsMemoryRequirements && meetsCPURequirements) ||
+          (slaveIdsWithExecutors.contains(slaveId) && cpus >= scheduler.CPUS_PER_TASK)
       }
 
+      // Decline offers we ruled out immediately
+      unUsableOffers.foreach(o => d.declineOffer(o.getId))
+
       val workerOffers = usableOffers.map { o =>
         val cpus = if (slaveIdsWithExecutors.contains(o.getSlaveId.getValue)) {
           getResource(o.getResourcesList, "cpus").toInt
@@ -254,8 +265,6 @@ private[spark] class MesosSchedulerBackend(
         d.declineOffer(o.getId)
       }
 
-      // Decline offers we ruled out immediately
-      unUsableOffers.foreach(o => d.declineOffer(o.getId))
     }
   }
 
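
In the fine-grained backend the same change takes the shape of a partition: offers are split into usable and unusable up front, the unusable ones are declined immediately, and only the usable ones are turned into worker offers. A condensed sketch of that shape (splitAndDecline and isUsable are illustrative names; the real predicate also accepts a smaller offer when the slave already runs an executor):

    import scala.collection.JavaConversions._
    import org.apache.mesos.Protos.Offer
    import org.apache.mesos.SchedulerDriver

    // Sketch only: partition offers by a usability predicate and decline the rest early.
    def splitAndDecline(driver: SchedulerDriver, offers: java.util.List[Offer])
                       (isUsable: Offer => Boolean): Seq[Offer] = {
      val (usable, unusable) = offers.partition(isUsable)
      unusable.foreach(o => driver.declineOffer(o.getId))
      usable // the caller builds tasks only for these
    }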
MesosSchedulerUtils.scala
@@ -180,16 +180,6 @@ private[mesos] trait MesosSchedulerUtils extends Logging {
     }
   }
 
-  /**
-   * For the list of offers received, find the ones that match the offer constraints (if specified)
-   * @param offers set of all offers received
-   * @return Offers that match the constraints
-   */
-  private[mesos] def filterOffersByConstraints(
-      offers: JList[Offer],
-      offerConstraints: Map[String, Set[String]]): mutable.Buffer[Offer] = offers.filter { o =>
-    matchesAttributeRequirements(offerConstraints, (o.getAttributesList map getAttribute).toMap)
-  }
 
   // These defaults copied from YARN
   private val MEMORY_OVERHEAD_FRACTION = 0.10
