Support mesos primitives for availability #5435

Merged
17 commits merged on Sep 28, 2017
4 changes: 4 additions & 0 deletions docs/docs/command-line-flags.md
@@ -42,6 +42,7 @@ The core functionality flags can be also set by environment variable `MARATHON_O
- "vips" can be used to enable the networking VIP integration UI.
- "task\_killing" can be used to enable the TASK\_KILLING state in Mesos (0.28 or later)
- "external\_volumes" can be used if the cluster is configured to use external volumes.
- "maintenance\_mode" can be used to respect the agent's maintenance window during offer matching.
Example: `--enable_features vips,task_killing,external_volumes`
* `--executor` (Optional. Default: "//cmd"): Executor to use when none is
specified.
@@ -171,6 +172,9 @@ The core functionality flags can be also set by environment variable `MARATHON_O
- S3 provider (experimental): s3://bucket-name/key-in-bucket?access_key=xxx&secret_key=xxx&region=eu-central-1
Please note: access_key and secret_key are optional.
If not provided, the [AWS default credentials provider chain](http://docs.aws.amazon.com/sdk-for-java/v1/developer-guide/credentials.html) is used to look up aws credentials.
* <span class="label label-default">v1.5.0</span>`--draining_seconds` (Optional. Default: 0):
Time (in seconds) before a [maintenance window](http://mesos.apache.org/documentation/latest/maintenance/) start time at which Marathon begins declining offers from the affected agent.
**Note:** To activate `--draining_seconds`, you must also add `maintenance_mode` to the set of `--enable_features`.
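Example: `--enable_features maintenance_mode --draining_seconds 300` starts declining an agent's offers five minutes before that agent's maintenance window opens (combining the two flags added in this change).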

## Tuning Flags for Offer Matching/Launching Tasks

@@ -9,7 +9,7 @@ import org.apache.mesos.Protos.Offer
* Allows to use external logic to decline offers.
*/
trait SchedulerPlugin extends Plugin {

Review comment (Contributor):

rm new line

Reply (Contributor):
This removes an unwanted whitespace character, to prevent a reformat later on.

/**
* @return true if offer matches
*/
6 changes: 5 additions & 1 deletion src/main/scala/mesosphere/marathon/Features.scala
@@ -17,12 +17,16 @@ object Features {
//enable GPUs
lazy val GPU_RESOURCES = "gpu_resources"

//enable maintenance mode
lazy val MAINTENANCE_MODE = "maintenance_mode"

lazy val availableFeatures = Map(
VIPS -> "Enable networking VIPs UI",
TASK_KILLING -> "Enable the optional TASK_KILLING state, available in Mesos 0.28 and later",
EXTERNAL_VOLUMES -> "Enable external volumes support in Marathon",
SECRETS -> "Enable support for secrets in Marathon (experimental)",
GPU_RESOURCES -> "Enable support for GPU in Marathon (experimental)"
GPU_RESOURCES -> "Enable support for GPU in Marathon (experimental)",
MAINTENANCE_MODE -> "Enable support for maintenance mode in Marathon (experimental)"
)

def description: String = {
11 changes: 10 additions & 1 deletion src/main/scala/mesosphere/marathon/MarathonConf.scala
@@ -15,6 +15,7 @@ import mesosphere.marathon.core.task.tracker.InstanceTrackerConfig
import mesosphere.marathon.core.task.update.TaskStatusUpdateConfig
import mesosphere.marathon.state.ResourceRole
import mesosphere.marathon.storage.StorageConf
import mesosphere.mesos.MatcherConf
import org.rogach.scallop.{ ScallopConf, ScallopOption }

import scala.sys.SystemProperties
@@ -34,7 +35,8 @@ trait MarathonConf
with EventConf with NetworkConf with GroupManagerConfig with LaunchQueueConfig with LaunchTokenConfig
with LeaderProxyConf with MarathonSchedulerServiceConfig with OfferMatcherManagerConfig with OfferProcessorConfig
with PluginManagerConfiguration with ReviveOffersConfig with StorageConf with KillConfig
with TaskJobsConfig with TaskStatusUpdateConfig with InstanceTrackerConfig with DeploymentConfig with ZookeeperConf {
with TaskJobsConfig with TaskStatusUpdateConfig with InstanceTrackerConfig with DeploymentConfig with ZookeeperConf
with MatcherConf {

lazy val mesosMaster = opt[String](
"master",
@@ -307,5 +309,12 @@ trait MarathonConf
noshort = true,
hidden = true,
default = Some(MesosHeartbeatMonitor.DEFAULT_HEARTBEAT_FAILURE_THRESHOLD))

lazy val drainingSeconds = opt[Long](
"draining_seconds",
descr = "(Default: 0 seconds) the number of seconds before a maintenance window start time at which Marathon " +
"will start declining offers. This is only evaluated if `maintenance_mode` is in the set of `enable_features`!",
default = Some(0)
)
}

@@ -69,7 +69,8 @@ class InstanceOpFactoryImpl(
config.mesosBridgeName())

val matchedOffer =
RunSpecOfferMatcher.matchOffer(pod, request.offer, request.instances, builderConfig.acceptedResourceRoles)
RunSpecOfferMatcher.matchOffer(pod, request.offer, request.instances,
builderConfig.acceptedResourceRoles, config)

matchedOffer match {
case matches: ResourceMatchResponse.Match =>
@@ -91,7 +92,8 @@
val InstanceOpFactory.Request(runSpec, offer, instances, _) = request

val matchResponse =
RunSpecOfferMatcher.matchOffer(app, offer, instances.values.toIndexedSeq, config.defaultAcceptedResourceRolesSet)
RunSpecOfferMatcher.matchOffer(app, offer, instances.values.toIndexedSeq,
config.defaultAcceptedResourceRolesSet, config)
matchResponse match {
case matches: ResourceMatchResponse.Match =>
val taskId = Task.Id.forRunSpec(app.id)
@@ -156,7 +158,7 @@
val resourceMatchResponse =
ResourceMatcher.matchResources(
offer, runSpec, instancesToConsiderForConstraints,
ResourceSelector.reservedWithLabels(rolesToConsider, reservationLabels),
ResourceSelector.reservedWithLabels(rolesToConsider, reservationLabels), config,
schedulerPlugins
)

@@ -183,8 +185,8 @@ }
}

val resourceMatchResponse =
ResourceMatcher.matchResources(offer, runSpec, instances.valuesIterator.toStream, ResourceSelector.reservable,
schedulerPlugins)
ResourceMatcher.matchResources(offer, runSpec, instances.valuesIterator.toStream,
ResourceSelector.reservable, config, schedulerPlugins)
resourceMatchResponse match {
case matches: ResourceMatchResponse.Match =>
val instanceOp = reserveAndCreateVolumes(request.frameworkId, runSpec, offer, matches.resourceMatch)
45 changes: 45 additions & 0 deletions src/main/scala/mesosphere/mesos/Availability.scala
@@ -0,0 +1,45 @@
package mesosphere.mesos

import java.time.Clock

import mesosphere.marathon.RichClock
import mesosphere.marathon.state.Timestamp
import org.apache.mesos.Protos.{ DurationInfo, Offer }

import scala.concurrent.duration._

object Availability {

def offerAvailable(offer: Offer, drainingTime: FiniteDuration)(implicit clock: Clock): Boolean = {
val now = clock.now()
if (offerHasUnavailability(offer)) {
val start: Timestamp = offer.getUnavailability.getStart

if (currentlyInDrainingState(now, start, drainingTime)) {
isAgentOutsideUnavailabilityWindow(offer, start, now)
} else true
} else true
}

private def currentlyInDrainingState(now: Timestamp, start: Timestamp, drainingTime: FiniteDuration) = {
now.after(start - drainingTime)
}

private def offerHasUnavailability(offer: Offer) = {
offer.hasUnavailability && offer.getUnavailability.hasStart
}

private def isAgentOutsideUnavailabilityWindow(offer: Offer, start: Timestamp, now: Timestamp) = {
offer.getUnavailability.hasDuration && now.after(start + offer.getUnavailability.getDuration.toDuration)
}

/**
* Convert Mesos DurationInfo to FiniteDuration.
*
* @return FiniteDuration for DurationInfo
*/
implicit class DurationInfoHelper(val di: DurationInfo) extends AnyVal {
def toDuration: FiniteDuration = FiniteDuration(di.getNanoseconds, NANOSECONDS)
}

}
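A rough usage sketch (not part of this PR) of how the check behaves, assuming the `makeBasicOfferWithUnavailability` test helper and the `RichClock` implicit that appear elsewhere in this diff:

```scala
import java.time.Clock

import mesosphere.marathon.RichClock
import mesosphere.marathon.test.MarathonTestHelper
import mesosphere.mesos.Availability

import scala.concurrent.duration._

object AvailabilitySketch extends App {
  implicit val clock: Clock = Clock.systemUTC()
  val now = clock.now()

  // The agent enters a one-hour maintenance window ten minutes from now.
  val offer = MarathonTestHelper
    .makeBasicOfferWithUnavailability(now + Duration(10, MINUTES), Duration(1, HOURS))
    .build()

  // With no draining window the offer is still usable right now ...
  println(Availability.offerAvailable(offer, Duration(0, SECONDS)))  // true
  // ... but inside a 15-minute draining window, so Marathon would decline it.
  println(Availability.offerAvailable(offer, Duration(15, MINUTES))) // false
}
```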
14 changes: 14 additions & 0 deletions src/main/scala/mesosphere/mesos/MatcherConf.scala
@@ -0,0 +1,14 @@
package mesosphere.mesos

import org.rogach.scallop.{ ScallopConf, ScallopOption }

import scala.concurrent.duration._

trait MatcherConf {

def availableFeatures: Set[String]

def drainingSeconds: ScallopOption[Long]

def drainingTime: FiniteDuration = FiniteDuration(drainingSeconds(), SECONDS)
}
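`ResourceMatcher` needs only these two members, so a caller outside of `MarathonConf` can satisfy the trait with a small stub. The sketch below is hypothetical (the `StubMatcherConf` class is not part of this PR); in Marathon itself the trait is mixed into `MarathonConf`, as shown above:

```scala
import mesosphere.marathon.Features
import mesosphere.mesos.MatcherConf
import org.rogach.scallop.{ ScallopConf, ScallopOption }

import scala.concurrent.duration._

// Hypothetical stand-alone conf, for illustration/tests only.
class StubMatcherConf(features: Set[String], seconds: Long) extends ScallopConf(Nil) with MatcherConf {
  override def availableFeatures: Set[String] = features

  override val drainingSeconds: ScallopOption[Long] =
    opt[Long]("draining_seconds", default = Some(seconds))

  verify()
}

object StubMatcherConfUsage {
  // maintenance_mode enabled with a five-minute draining window.
  val conf = new StubMatcherConf(Set(Features.MAINTENANCE_MODE), seconds = 300)

  // drainingTime is derived from drainingSeconds() by the trait's default method.
  require(conf.drainingTime == FiniteDuration(300, SECONDS))
}
```

Splitting the flag access into the narrow `MatcherConf` trait keeps `ResourceMatcher` independent of the full `MarathonConf`.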
1 change: 1 addition & 0 deletions src/main/scala/mesosphere/mesos/NoOfferMatchReason.scala
@@ -12,6 +12,7 @@ object NoOfferMatchReason {
case object InsufficientPorts extends NoOfferMatchReason
case object UnfulfilledRole extends NoOfferMatchReason
case object UnfulfilledConstraint extends NoOfferMatchReason
case object AgentUnavailable extends NoOfferMatchReason
case object NoCorrespondingReservationFound extends NoOfferMatchReason

/**
21 changes: 19 additions & 2 deletions src/main/scala/mesosphere/mesos/ResourceMatcher.scala
@@ -1,10 +1,13 @@
package mesosphere.mesos

import java.time.Clock

import com.typesafe.scalalogging.StrictLogging
import mesosphere.marathon.Features
import mesosphere.marathon.core.instance.Instance
import mesosphere.marathon.core.launcher.impl.TaskLabels
import mesosphere.marathon.plugin.scheduler.SchedulerPlugin
import mesosphere.marathon.state.{ DiskSource, DiskType, PersistentVolume, ResourceRole, RunSpec }
import mesosphere.marathon.state._
import mesosphere.marathon.stream.Implicits._
import mesosphere.marathon.tasks.{ PortsMatch, PortsMatcher, ResourceUtil }
import mesosphere.mesos.protos.Resource
@@ -130,7 +133,7 @@ object ResourceMatcher extends StrictLogging {
* the reservation.
*/
def matchResources(offer: Offer, runSpec: RunSpec, knownInstances: => Seq[Instance],
selector: ResourceSelector, schedulerPlugins: Seq[SchedulerPlugin] = Seq.empty): ResourceMatchResponse = {
selector: ResourceSelector, conf: MatcherConf, schedulerPlugins: Seq[SchedulerPlugin] = Seq.empty)(implicit clock: Clock): ResourceMatchResponse = {

val groupedResources: Map[Role, Seq[Protos.Resource]] = offer.getResourcesList.groupBy(_.getName).map { case (k, v) => k -> v.to[Seq] }

@@ -195,8 +198,22 @@
badConstraints.isEmpty
}

val checkAvailability: Boolean = {
if (conf.availableFeatures.contains(Features.MAINTENANCE_MODE)) {
val result = Availability.offerAvailable(offer, conf.drainingTime)
if (!result) {
// Add unavailability to noOfferMatchReasons
noOfferMatchReasons += NoOfferMatchReason.AgentUnavailable
logger.info(
s"Offer [${offer.getId.getValue}]. Agent [${offer.getSlaveId}] on host [${offer.getHostname}] unavailable.\n"
)
}
result
} else true
}

val resourceMatchOpt = if (scalarMatchResults.forall(_.matches)
&& meetsAllConstraints
&& checkAvailability
&& schedulerPlugins.forall(_.isMatch(offer, runSpec))) {
portsMatchOpt match {
case Some(portsMatch) =>
7 changes: 5 additions & 2 deletions src/main/scala/mesosphere/mesos/RunSpecOfferMatcher.scala
@@ -1,5 +1,7 @@
package mesosphere.mesos

import java.time.Clock

import com.google.protobuf.TextFormat
import com.typesafe.scalalogging.StrictLogging
import mesosphere.marathon.core.instance.Instance
@@ -18,7 +20,8 @@ object RunSpecOfferMatcher extends StrictLogging {
* @param knownInstances All instances associated with the given runSpec, needed to validate constraints
* @param givenAcceptedResourceRoles The resource roles for which to look.
*/
def matchOffer(runSpec: RunSpec, offer: Offer, knownInstances: => Seq[Instance], givenAcceptedResourceRoles: Set[String]): ResourceMatchResponse = {
def matchOffer(runSpec: RunSpec, offer: Offer, knownInstances: => Seq[Instance],
givenAcceptedResourceRoles: Set[String], conf: MatcherConf)(implicit clock: Clock): ResourceMatchResponse = {
val acceptedResourceRoles: Set[String] = {
val roles = if (runSpec.acceptedResourceRoles.isEmpty) {
givenAcceptedResourceRoles
@@ -30,7 +33,7 @@ }
}

val resourceMatchResponse =
ResourceMatcher.matchResources(offer, runSpec, knownInstances, ResourceSelector.any(acceptedResourceRoles))
ResourceMatcher.matchResources(offer, runSpec, knownInstances, ResourceSelector.any(acceptedResourceRoles), conf)

def logInsufficientResources(): Unit = {
val runSpecHostPorts = runSpec match {
@@ -1,6 +1,8 @@
package mesosphere.marathon
package core.health

import java.time.Clock

import com.wix.accord.validate
import mesosphere.UnitTest
import mesosphere.marathon.Protos.HealthCheckDefinition.Protocol
@@ -22,6 +24,7 @@ import scala.concurrent.duration._

class MesosHealthCheckTest extends UnitTest {

implicit val clock = Clock.systemUTC()
implicit val healthCheckWrites: Writes[HealthCheck] = Writes { check =>
val appCheck: AppHealthCheck = Raml.toRaml(check)
AppHealthCheck.playJsonFormat.writes(appCheck)
@@ -825,7 +828,8 @@ class MesosHealthCheckTest extends UnitTest {
val config = MarathonTestHelper.defaultConfig()
val taskId = Task.Id.forRunSpec(app.id)
val builder = new TaskBuilder(app, taskId, config)
val resourceMatch = RunSpecOfferMatcher.matchOffer(app, offer, Seq.empty, config.defaultAcceptedResourceRolesSet)
val resourceMatch = RunSpecOfferMatcher.matchOffer(app, offer, Seq.empty,
config.defaultAcceptedResourceRolesSet, config)
resourceMatch match {
case matches: ResourceMatchResponse.Match => Some(builder.build(offer, matches.resourceMatch, None))
case _ => None
12 changes: 12 additions & 0 deletions src/test/scala/mesosphere/marathon/test/MarathonTestHelper.scala
@@ -36,6 +36,7 @@ import org.apache.mesos.Protos._
import org.apache.mesos.{ Protos => Mesos }
import play.api.libs.json.Json

import scala.concurrent.duration._
import scala.util.Random

object MarathonTestHelper {
@@ -125,6 +126,17 @@
offerBuilder
}

def makeBasicOfferWithUnavailability(startTime: Timestamp, duration: FiniteDuration = Duration(5, MINUTES)): Offer.Builder = {
val unavailableOfferBuilder = Unavailability.newBuilder()
.setStart(TimeInfo.newBuilder().setNanoseconds(startTime.nanos))

if (duration.isFinite()) {
unavailableOfferBuilder.setDuration(DurationInfo.newBuilder().setNanoseconds(duration.toNanos))
}

MarathonTestHelper.makeBasicOffer().setUnavailability(unavailableOfferBuilder.build())
}

def mountSource(path: Option[String]): Mesos.Resource.DiskInfo.Source = {
val b = Mesos.Resource.DiskInfo.Source.newBuilder.
setType(Mesos.Resource.DiskInfo.Source.Type.MOUNT)
39 changes: 39 additions & 0 deletions src/test/scala/mesosphere/mesos/AvailabilityTest.scala
@@ -0,0 +1,39 @@
package mesosphere.mesos

import java.time.Clock

import mesosphere.UnitTest
import mesosphere.marathon.RichClock
import mesosphere.marathon.state.Timestamp
import mesosphere.marathon.test.MarathonTestHelper
import org.apache.mesos.Protos.Offer

import scala.concurrent.duration._

class AvailabilityTest extends UnitTest {

implicit val clock = Clock.systemUTC()
val now = clock.now()

"Availability" should {
"drop offer from nodes in maintenance" in {
Availability.offerAvailable(makeBasicOfferWithUnavailability().build(), FiniteDuration(0, SECONDS)) shouldBe false
Availability.offerAvailable(makeBasicOfferWithUnavailability(now - Duration(1, HOURS), Duration(1, DAYS)).build(), FiniteDuration(0, SECONDS)) shouldBe false
}
"accept offers from nodes not in maintenance" in {
Availability.offerAvailable(makeBasicOfferWithUnavailability(now + Duration(1, HOURS), Duration(1, DAYS)).build(), FiniteDuration(0, SECONDS)) shouldBe true
Availability.offerAvailable(MarathonTestHelper.makeBasicOffer().build(), Duration(0, SECONDS)) shouldBe true
Availability.offerAvailable(makeBasicOfferWithUnavailability(now - Duration(1, DAYS), Duration(1, HOURS)).build(), FiniteDuration(0, SECONDS)) shouldBe true
}
"drop offers {drainingTime} seconds before node maintenance starts" in {
Availability.offerAvailable(makeBasicOfferWithUnavailability(now + Duration(200, SECONDS), Duration(1, DAYS)).build(), Duration(300, SECONDS)) shouldBe false
}
"drop offer when maintenance with infinite duration" in {
Availability.offerAvailable(makeBasicOfferWithUnavailability(now - Duration(1, HOURS)).build(), FiniteDuration(0, SECONDS)) shouldBe false
}
}

def makeBasicOfferWithUnavailability(startTime: Timestamp = now, duration: FiniteDuration = Duration(5, DAYS)): Offer.Builder = {
MarathonTestHelper.makeBasicOfferWithUnavailability(startTime, duration)
}
}