Skip to content

Commit

Permalink
Fix up tests + remove redundant method override, combine utility clas…
Browse files Browse the repository at this point in the history
…s into new mesos scheduler util trait
  • Loading branch information
Ankur Chauhan committed Jun 26, 2015
1 parent 92b47fd commit 72fe88a
Show file tree
Hide file tree
Showing 6 changed files with 45 additions and 68 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ private[spark] class CoarseMesosSchedulerBackend(
val mem = getResource(offer.getResourcesList, "mem")
val cpus = getResource(offer.getResourcesList, "cpus").toInt
if (totalCoresAcquired < maxCores &&
mem >= MemoryUtils.calculateTotalMemory(sc) &&
mem >= calculateTotalMemory(sc) &&
cpus >= 1 &&
failuresBySlaveId.getOrElse(slaveId, 0) < MAX_SLAVE_FAILURES &&
!slaveIdsWithExecutors.contains(slaveId)) {
Expand All @@ -198,8 +198,7 @@ private[spark] class CoarseMesosSchedulerBackend(
.setCommand(createCommand(offer, cpusToUse + extraCoresPerSlave))
.setName("Task " + taskId)
.addResources(createResource("cpus", cpusToUse))
.addResources(createResource("mem",
MemoryUtils.calculateTotalMemory(sc)))
.addResources(createResource("mem", calculateTotalMemory(sc)))

sc.conf.getOption("spark.mesos.executor.docker.image").foreach { image =>
MesosSchedulerBackendUtil
Expand All @@ -217,14 +216,6 @@ private[spark] class CoarseMesosSchedulerBackend(
}
}

/** Build a Mesos resource protobuf object */
private def createResource(resourceName: String, quantity: Double): Protos.Resource = {
Resource.newBuilder()
.setName(resourceName)
.setType(Value.Type.SCALAR)
.setScalar(Value.Scalar.newBuilder().setValue(quantity).build())
.build()
}

override def statusUpdate(d: SchedulerDriver, status: TaskStatus) {
val taskId = status.getTaskId.getValue.toInt
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ private[spark] class MesosSchedulerBackend(
.setType(Value.Type.SCALAR)
.setScalar(
Value.Scalar.newBuilder()
.setValue(MemoryUtils.calculateTotalMemory(sc)).build())
.setValue(calculateTotalMemory(sc)).build())
.build()
val executorInfo = MesosExecutorInfo.newBuilder()
.setExecutorId(ExecutorID.newBuilder().setValue(execId).build())
Expand Down Expand Up @@ -194,7 +194,7 @@ private[spark] class MesosSchedulerBackend(
val mem = getResource(o.getResourcesList, "mem")
val cpus = getResource(o.getResourcesList, "cpus")
val slaveId = o.getSlaveId.getValue
(mem >= MemoryUtils.calculateTotalMemory(sc) &&
(mem >= calculateTotalMemory(sc) &&
// need at least 1 for executor, 1 for task
cpus >= (mesosExecutorCores + scheduler.CPUS_PER_TASK)) ||
(slaveIdsWithExecutors.contains(slaveId) &&
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,17 @@

package org.apache.spark.scheduler.cluster.mesos

import java.util.{List => JList}
import java.util.concurrent.CountDownLatch

import com.google.common.base.Splitter
import java.util.{List => JList}

import scala.collection.JavaConversions._
import scala.collection.mutable

import com.google.common.base.Splitter
import org.apache.mesos.Protos._
import org.apache.mesos.{Protos, MesosSchedulerDriver, Scheduler}
import org.apache.spark.Logging
import org.apache.mesos.{MesosSchedulerDriver, Protos, Scheduler}
import org.apache.spark.util.Utils
import org.apache.spark.{Logging, SparkContext}


/**
Expand Down Expand Up @@ -187,4 +186,13 @@ private[mesos] trait MesosSchedulerUtils extends Logging {
matchesAttributeRequirements(offerConstraints, (o.getAttributesList map getAttribute).toMap)
}

// These defaults copied from YARN
val OVERHEAD_FRACTION = 0.10
val OVERHEAD_MINIMUM = 384

private[mesos] def calculateTotalMemory(sc: SparkContext): Int = {
sc.conf.getInt("spark.mesos.executor.memoryOverhead",
math.max(OVERHEAD_FRACTION * sc.executorMemory, OVERHEAD_MINIMUM).toInt) + sc.executorMemory
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -149,16 +149,16 @@ class MesosSchedulerBackendSuite extends SparkFunSuite with LocalSparkContext wi
when(sc.conf).thenReturn(new SparkConf)
when(sc.listenerBus).thenReturn(listenerBus)

val minMem = MemoryUtils.calculateTotalMemory(sc).toInt
val backend = new MesosSchedulerBackend(taskScheduler, sc, "master")

val minMem = backend.calculateTotalMemory(sc)
val minCpu = 4

val mesosOffers = new java.util.ArrayList[Offer]
mesosOffers.add(createOffer(1, minMem, minCpu))
mesosOffers.add(createOffer(2, minMem - 1, minCpu))
mesosOffers.add(createOffer(3, minMem, minCpu))

val backend = new MesosSchedulerBackend(taskScheduler, sc, "master")

val expectedWorkerOffers = new ArrayBuffer[WorkerOffer](2)
expectedWorkerOffers.append(new WorkerOffer(
mesosOffers.get(0).getSlaveId.getValue,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,30 +17,39 @@

package org.apache.spark.scheduler.cluster.mesos

import org.apache.spark.{SparkConf, SparkContext}
import org.mockito.Mockito._
import org.scalatest._
import org.scalatest.mock.MockitoSugar

import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite}
class MesosSchedulerUtilsSuite extends FlatSpec with Matchers with MockitoSugar {

class MemoryUtilsSuite extends SparkFunSuite with MockitoSugar {
test("MesosMemoryUtils should always override memoryOverhead when it's set") {
def fixture: Object {val sc: SparkContext; val sparkConf: SparkConf} = new {
val sparkConf = new SparkConf

val sc = mock[SparkContext]
when(sc.conf).thenReturn(sparkConf)
}

"MesosSchedulerUtils" should "use at-least minimum overhead" in new MesosSchedulerUtils {
val f = fixture
// 384 > sc.executorMemory * 0.1 => 512 + 384 = 896
when(f.sc.executorMemory).thenReturn(512)
calculateTotalMemory(f.sc) shouldBe 896
}

it should "use overhead if it is greater than minimum value" in new MesosSchedulerUtils {
val f = fixture
// 384 > sc.executorMemory * 0.1 => 512 + 384 = 896
when(sc.executorMemory).thenReturn(512)
assert(MemoryUtils.calculateTotalMemory(sc) === 896)

// 384 < sc.executorMemory * 0.1 => 4096 + (4096 * 0.1) = 4505.6
when(sc.executorMemory).thenReturn(4096)
assert(MemoryUtils.calculateTotalMemory(sc) === 4505)

// set memoryOverhead
sparkConf.set("spark.mesos.executor.memoryOverhead", "100")
assert(MemoryUtils.calculateTotalMemory(sc) === 4196)
sparkConf.set("spark.mesos.executor.memoryOverhead", "400")
assert(MemoryUtils.calculateTotalMemory(sc) === 4496)
when(f.sc.executorMemory).thenReturn(4096)
calculateTotalMemory(f.sc) shouldBe 4505
}

it should "use spark.mesos.executor.memoryOverhead (if set)" in new MesosSchedulerUtils {
val f = fixture
// 384 > sc.executorMemory * 0.1 => 512 + 384 = 896
when(f.sc.executorMemory).thenReturn(1024)
f.sparkConf.set("spark.mesos.executor.memoryOverhead", "512")
calculateTotalMemory(f.sc) shouldBe 1536
}

}

0 comments on commit 72fe88a

Please sign in to comment.