Skip to content

Commit

Permalink
Fixed lost tasks garbage collection (#4203)
Browse files Browse the repository at this point in the history
  • Loading branch information
unterstein authored and gkleiman committed Aug 10, 2016
1 parent a3ad9bc commit f0d56de
Show file tree
Hide file tree
Showing 4 changed files with 86 additions and 7 deletions.
Expand Up @@ -55,7 +55,7 @@ class ExpungeOverdueLostTasksActor(
age > config.taskLostExpungeGC
}
}
tasks.values.flatMap(_.tasks.filter(task => isTimedOut(task.mesosStatus)))
tasks.values.flatMap(_.tasks.filter(task => task.isUnreachable && isTimedOut(task.mesosStatus)))
}
}

Expand Down
12 changes: 6 additions & 6 deletions src/test/scala/mesosphere/marathon/MarathonTestHelper.scala
Expand Up @@ -346,28 +346,28 @@ object MarathonTestHelper {
)
}

def mininimalLostTask(appId: PathId, marathonTaskStatus: MarathonTaskStatus = MarathonTaskStatus.Gone): Task.LaunchedEphemeral = {
def mininimalLostTask(appId: PathId, marathonTaskStatus: MarathonTaskStatus = MarathonTaskStatus.Gone, since: Timestamp = clock.now()): Task.LaunchedEphemeral = {
val taskId = Task.Id.forRunSpec(appId)
val status = TaskStatusUpdateTestHelper.makeMesosTaskStatus(taskId, TaskState.TASK_LOST, maybeReason = Some(TaskStatus.Reason.REASON_RECONCILIATION))
mininimalTask(
taskId = taskId.idString,
now = clock.now(),
now = since,
mesosStatus = Some(status),
marathonTaskStatus = marathonTaskStatus
)
}

def minimalUnreachableTask(appId: PathId, marathonTaskStatus: MarathonTaskStatus = MarathonTaskStatus.Unreachable): Task.LaunchedEphemeral = {
val lostTask = mininimalLostTask(appId)
def minimalUnreachableTask(appId: PathId, marathonTaskStatus: MarathonTaskStatus = MarathonTaskStatus.Unreachable, since: Timestamp = clock.now()): Task.LaunchedEphemeral = {
val lostTask = mininimalLostTask(appId, since = since)
lostTask.copy(status = lostTask.status.copy(taskStatus = marathonTaskStatus))
}

def minimalRunning(appId: PathId, marathonTaskStatus: MarathonTaskStatus = MarathonTaskStatus.Running): Task.LaunchedEphemeral = {
def minimalRunning(appId: PathId, marathonTaskStatus: MarathonTaskStatus = MarathonTaskStatus.Running, since: Timestamp = clock.now()): Task.LaunchedEphemeral = {
val taskId = Task.Id.forRunSpec(appId)
val status = TaskStatusUpdateTestHelper.makeMesosTaskStatus(taskId, TaskState.TASK_RUNNING, maybeHealth = Option(true))
mininimalTask(
taskId = taskId.idString,
now = clock.now(),
now = since,
mesosStatus = Some(status),
marathonTaskStatus = marathonTaskStatus
)
Expand Down
@@ -0,0 +1,78 @@
package mesosphere.marathon.core.task.jobs

import akka.actor.{ ActorRef, ActorSystem, PoisonPill, Terminated }
import akka.testkit.TestProbe
import mesosphere.marathon
import mesosphere.marathon.core.base.ConstantClock
import mesosphere.marathon.core.task.jobs.impl.ExpungeOverdueLostTasksActor
import mesosphere.marathon.core.task.tracker.TaskTracker.TasksByApp
import mesosphere.marathon.core.task.tracker.{ TaskStateOpProcessor, TaskTracker }
import mesosphere.marathon.{ MarathonSpec, MarathonTestHelper }
import org.scalatest.GivenWhenThen
import org.scalatest.concurrent.ScalaFutures
import mesosphere.marathon.state.PathId._
import mesosphere.marathon.state.Timestamp

import scala.concurrent.duration._
import scala.concurrent.{ Await, ExecutionContext, Future }
import scala.concurrent.duration.Duration

class ExpungeOverdueLostTasksActorTest extends MarathonSpec with GivenWhenThen with marathon.test.Mockito with ScalaFutures {
implicit var actorSystem: ActorSystem = _
val taskTracker: TaskTracker = mock[TaskTracker]
val clock = ConstantClock()
val config = MarathonTestHelper.defaultConfig(maxTasksPerOffer = 10)
val stateOpProcessor: TaskStateOpProcessor = mock[TaskStateOpProcessor]
var checkActor: ActorRef = _

before {
actorSystem = ActorSystem()
checkActor = actorSystem.actorOf(ExpungeOverdueLostTasksActor.props(clock, config, taskTracker, stateOpProcessor))
}

after {
def waitForActorProcessingAllAndDying(): Unit = {
checkActor ! PoisonPill
val probe = TestProbe()
probe.watch(checkActor)
val terminated = probe.expectMsgAnyClassOf(classOf[Terminated])
assert(terminated.actor == checkActor)
}

waitForActorProcessingAllAndDying()

Await.result(actorSystem.terminate(), Duration.Inf)
}

test("running tasks with more then 24 hours with no status update should not be killed") {
Given("two running tasks")
val running1 = MarathonTestHelper.minimalRunning("/running1".toPath, since = Timestamp.apply(0))
val running2 = MarathonTestHelper.minimalRunning("/running2".toPath, since = Timestamp.apply(0))

taskTracker.tasksByApp()(any[ExecutionContext]) returns Future.successful(TasksByApp.forTasks(running1, running2))

When("a check is performed")
val testProbe = TestProbe()
testProbe.send(checkActor, ExpungeOverdueLostTasksActor.Tick)
testProbe.receiveOne(3.seconds)

And("no kill calls are issued")
noMoreInteractions(stateOpProcessor)
}

test("a unreachable task with more then 24 hours with no status update should be killed") {
Given("one unreachable, one running tasks")
val running = MarathonTestHelper.minimalRunning("/running1".toPath, since= Timestamp.apply(0))
val unreachable = MarathonTestHelper.minimalUnreachableTask("/running2".toPath, since = Timestamp.apply(0))

taskTracker.tasksByApp()(any[ExecutionContext]) returns Future.successful(TasksByApp.forTasks(running, unreachable))

When("a check is performed")
val testProbe = TestProbe()
testProbe.send(checkActor, ExpungeOverdueLostTasksActor.Tick)
testProbe.receiveOne(3.seconds)

And("one kill call is issued")
verify(stateOpProcessor, once).process(any)
}
}
1 change: 1 addition & 0 deletions src/test/scala/mesosphere/marathon/test/Mockito.scala
Expand Up @@ -20,6 +20,7 @@ trait Mockito extends MockitoSugar {
def times(num: Int) = M.times(num)
def timeout(millis: Int) = M.timeout(millis.toLong)
def atLeastOnce = M.atLeastOnce()
def once = M.times(1)
def atLeast(num: Int) = M.atLeast(num)
def atMost(num: Int) = M.atMost(num)
def never = M.never()
Expand Down

0 comments on commit f0d56de

Please sign in to comment.