Skip to content

Commit

Permalink
Inform the rate limiter for pods and apps (#4863)
Browse files Browse the repository at this point in the history
Summary: The rate limiter incorrectly assumed that all runSpecs are apps.

Test Plan: sbt test with new test

Reviewers: meichstedt, jdef, timcharper, aquamatthias

Reviewed By: jdef, timcharper, aquamatthias

Subscribers: aquamatthias, jenkins, marathon-team

Differential Revision: https://phabricator.mesosphere.com/D352

(cherry picked from commit c8924aa)
  • Loading branch information
Jason Gilanfarr authored and aquamatthias committed Dec 21, 2016
1 parent ac0e78e commit 5069189
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 20 deletions.
Expand Up @@ -10,6 +10,7 @@ import mesosphere.marathon.core.group.GroupManager
import mesosphere.marathon.core.instance.update.{ InstanceChange, InstanceChangeHandler }
import mesosphere.marathon.core.launchqueue.LaunchQueue
import mesosphere.marathon.state.PathId
import scala.async.Async._

import scala.concurrent.Future

Expand All @@ -18,6 +19,7 @@ class NotifyRateLimiterStepImpl @Inject() (
groupManagerProvider: Provider[GroupManager]) extends InstanceChangeHandler {

import NotifyRateLimiterStep._
import scala.concurrent.ExecutionContext.Implicits.global

private[this] lazy val launchQueue = launchQueueProvider.get()
private[this] lazy val groupManager = groupManagerProvider.get()
Expand All @@ -32,16 +34,14 @@ class NotifyRateLimiterStepImpl @Inject() (
}
}

private[this] def notifyRateLimiter(runSpecId: PathId, version: OffsetDateTime): Future[Done] = {
import scala.concurrent.ExecutionContext.Implicits.global
groupManager.appVersion(runSpecId, version).map { maybeApp =>
// It would be nice if we could make sure that the delay gets send
// to the AppTaskLauncherActor before we continue but that would require quite some work.
//
// In production, the worst case would be that we restart one or few tasks without delay –
// this is unlikely but possible. It is unlikely that this causes noticeable harm.
maybeApp.foreach(launchQueue.addDelay)
}.map(_ => Done)
@SuppressWarnings(Array("all")) // async/await
private[this] def notifyRateLimiter(runSpecId: PathId, version: OffsetDateTime): Future[Done] = async {
val appFuture = groupManager.appVersion(runSpecId, version)
val podFuture = groupManager.podVersion(runSpecId, version)
val (app, pod) = (await(appFuture), await(podFuture))
app.foreach(launchQueue.addDelay)
pod.foreach(launchQueue.addDelay)
Done
}
}

Expand Down
@@ -1,4 +1,5 @@
package mesosphere.marathon.core.task.tracker.impl
package mesosphere.marathon
package core.task.tracker.impl

import akka.Done
import akka.actor.{ ActorRef, Status }
Expand All @@ -7,32 +8,29 @@ import akka.testkit.TestProbe
import ch.qos.logback.classic.Level
import com.codahale.metrics.MetricRegistry
import com.google.inject.Provider
import mesosphere.AkkaFunTest
import mesosphere.marathon.core.CoreGuiceModule
import mesosphere.marathon.core.base.ConstantClock
import mesosphere.marathon.core.group.GroupManager
import mesosphere.marathon.core.health.HealthCheckManager
import mesosphere.marathon.core.instance.TestInstanceBuilder
import mesosphere.marathon.core.instance.update.{ InstanceUpdateEffect, InstanceUpdateOpResolver, InstanceUpdateOperation }
import mesosphere.marathon.core.instance.update.{ InstanceUpdateEffect, InstanceUpdateOpResolver, InstanceUpdateOperation, InstanceUpdated }
import mesosphere.marathon.core.launchqueue.LaunchQueue
import mesosphere.marathon.core.pod.PodDefinition
import mesosphere.marathon.core.task.bus.{ MesosTaskStatusTestHelper, TaskStatusEmitter }
import mesosphere.marathon.core.task.tracker.TaskUpdater
import mesosphere.marathon.core.task.update.impl.steps.{ NotifyHealthCheckManagerStepImpl, NotifyLaunchQueueStepImpl, NotifyRateLimiterStepImpl, PostToEventStreamStepImpl, ScaleAppUpdateStepImpl, TaskStatusEmitterPublishStepImpl }
import mesosphere.marathon.metrics.Metrics
import mesosphere.marathon.state.{ PathId, Timestamp }
import mesosphere.marathon.state.{ AppDefinition, PathId, Timestamp }
import mesosphere.marathon.storage.repository.InstanceRepository
import mesosphere.marathon.test.{ CaptureLogEvents, MarathonActorSupport, Mockito, _ }
import mesosphere.marathon.test.{ CaptureLogEvents, _ }
import org.apache.mesos.SchedulerDriver
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.{ GivenWhenThen, Matchers }

import scala.concurrent.Future
import scala.concurrent.duration._
import scala.util.{ Failure, Success, Try }

class InstanceOpProcessorImplTest
extends MarathonActorSupport with MarathonSpec with Mockito with GivenWhenThen with ScalaFutures with Matchers {

import scala.concurrent.ExecutionContext.Implicits.global
class InstanceOpProcessorImplTest extends AkkaFunTest {

// ignored by the TaskOpProcessorImpl
val deadline = Timestamp.zero
Expand Down Expand Up @@ -379,6 +377,36 @@ class InstanceOpProcessorImplTest
f.verifyNoMoreInteractions()
}

test("the rate limiter will inform the launch queue of apps") {
val f = new Fixture
val appId = PathId("/pod")
val app = AppDefinition(id = appId)
val version = Timestamp.now()
val builder = TestInstanceBuilder.newBuilder(appId, version, version).addTaskDropped()

f.groupManager.appVersion(appId, version.toOffsetDateTime) returns Future.successful(Some(app))
f.groupManager.podVersion(appId, version.toOffsetDateTime) returns Future.successful(None)
f.notifyRateLimiter.process(InstanceUpdated(builder.instance, None, Nil)).futureValue
verify(f.groupManager).appVersion(appId, version.toOffsetDateTime)
verify(f.groupManager).podVersion(appId, version.toOffsetDateTime)
verify(f.launchQueue).addDelay(app)
}

test("the rate limiter will inform the launch queue of pods") {
val f = new Fixture
val podId = PathId("/pod")
val pod = PodDefinition(id = podId)
val version = Timestamp.now()
val builder = TestInstanceBuilder.newBuilder(podId, version, version).addTaskDropped()

f.groupManager.appVersion(podId, version.toOffsetDateTime) returns Future.successful(None)
f.groupManager.podVersion(podId, version.toOffsetDateTime) returns Future.successful(Some(pod))
f.notifyRateLimiter.process(InstanceUpdated(builder.instance, None, Nil)).futureValue
verify(f.groupManager).appVersion(podId, version.toOffsetDateTime)
verify(f.groupManager).podVersion(podId, version.toOffsetDateTime)
verify(f.launchQueue).addDelay(pod)
}

class Fixture {
lazy val config = MarathonTestHelper.defaultConfig()
lazy val instanceTrackerProbe = TestProbe()
Expand Down

0 comments on commit 5069189

Please sign in to comment.