Skip to content

Commit

Permalink
Merge d07e145 into 104f21b
Browse files Browse the repository at this point in the history
  • Loading branch information
meichstedt committed Sep 5, 2016
2 parents 104f21b + d07e145 commit 4cc9c95
Show file tree
Hide file tree
Showing 5 changed files with 98 additions and 13 deletions.
Expand Up @@ -355,7 +355,9 @@ private class TaskLauncherActor(
sender ! MatchedTaskOps(offer.getId, Seq.empty)

case ActorOfferMatcher.MatchOffer(deadline, offer) =>
val matchRequest = TaskOpFactory.Request(runSpec, offer, tasksMap, tasksToLaunch)
import org.apache.mesos.Protos.TaskState
val reachableTasks = tasksMap.values.filterNot(_.mesosStatus.exists(_.getState == TaskState.TASK_LOST))
val matchRequest = TaskOpFactory.Request(runSpec, offer, reachableTasks, tasksToLaunch)
val taskOp: Option[TaskOp] = taskOpFactory.buildTaskOp(matchRequest)
taskOp match {
case Some(op) => handleTaskOp(op, offer)
Expand Down
Expand Up @@ -22,6 +22,7 @@ sealed trait MarathonTaskStatus {

object MarathonTaskStatus {
import org.apache.mesos.Protos.TaskState._
import org.apache.mesos.Protos.TaskStatus.Reason

sealed trait Terminal

Expand All @@ -33,18 +34,27 @@ object MarathonTaskStatus {
case TASK_FINISHED => Finished
case TASK_KILLED => Killed
case TASK_KILLING => Killing
case TASK_LOST => taskStatus.getReason match {
case reason: mesos.Protos.TaskStatus.Reason if MarathonTaskStatusMapping.Gone(reason) => Gone
case reason: mesos.Protos.TaskStatus.Reason if MarathonTaskStatusMapping.Unreachable(reason) => Unreachable
case reason: mesos.Protos.TaskStatus.Reason if MarathonTaskStatusMapping.Unknown(reason) => Unknown
case _ => Dropped
}
case TASK_LOST => inferStateForLost(taskStatus.getReason, taskStatus.getMessage)
case TASK_RUNNING => Running
case TASK_STAGING => Staging
case TASK_STARTING => Starting
}
}

private[this] val MessageIndicatingUnknown = "Reconciliation: Task is unknown to the"

private[this] def inferStateForLost(reason: Reason, message: String): MarathonTaskStatus = {
if (message.startsWith(MessageIndicatingUnknown) || MarathonTaskStatusMapping.Unknown(reason)) {
Unknown
} else if (MarathonTaskStatusMapping.Gone(reason)) {
Gone
} else if (MarathonTaskStatusMapping.Unreachable(reason)) {
Unreachable
} else {
Dropped
}
}

// Reserved: Task with persistent volume has reservation, but is not launched yet
case object Reserved extends MarathonTaskStatus

Expand Down
Expand Up @@ -21,7 +21,7 @@ import mesosphere.marathon.core.task.tracker.TaskTracker
import mesosphere.marathon.state.{ AppDefinition, PathId, Timestamp }
import mesosphere.marathon.{ MarathonSpec, MarathonTestHelper, Protos }
import org.mockito
import org.mockito.Mockito
import org.mockito.{ ArgumentCaptor, Mockito }
import org.scalatest.GivenWhenThen
import org.slf4j.LoggerFactory

Expand Down Expand Up @@ -141,6 +141,33 @@ class TaskLauncherActorTest extends MarathonSpec with GivenWhenThen {
Mockito.verify(taskOpFactory).buildTaskOp(matchRequest)
}

test("Don't pass the task factory lost tasks when asking for new tasks") {
import mesosphere.marathon.Protos.Constraint.Operator

val uniqueConstraint = Protos.Constraint.newBuilder
.setField("hostname")
.setOperator(Operator.UNIQUE)
.setValue("")
.build
val constraintApp: AppDefinition = f.app.copy(constraints = Set(uniqueConstraint))
val offer = MarathonTestHelper.makeBasicOffer().build()

val lostTask = MarathonTestHelper.mininimalLostTask(f.app.id)

Mockito.when(taskTracker.tasksByAppSync).thenReturn(TaskTracker.TasksByApp.forTasks(lostTask))
val captor = ArgumentCaptor.forClass(classOf[TaskOpFactory.Request])
// we're only interested in capturing the argument, so return value doesn't matte
Mockito.when(taskOpFactory.buildTaskOp(captor.capture())).thenReturn(None)

val launcherRef = createLauncherRef(instances = 1)
launcherRef ! RateLimiterActor.DelayUpdate(constraintApp, clock.now())

Await.result(launcherRef ? ActorOfferMatcher.MatchOffer(clock.now() + 1.seconds, offer), 3.seconds).asInstanceOf[MatchedTaskOps]
Mockito.verify(taskTracker).tasksByAppSync
Mockito.verify(taskOpFactory).buildTaskOp(m.any())
assert(captor.getValue.taskMap.isEmpty)
}

test("Wait for inflight task launches on stop") {
Mockito.when(taskTracker.tasksByAppSync).thenReturn(TaskTracker.TasksByApp.empty)
val offer = MarathonTestHelper.makeBasicOffer().build()
Expand Down
Expand Up @@ -56,16 +56,17 @@ object TaskStatusUpdateTestHelper {
TaskStateChange.Expunge(task)))
}

def makeMesosTaskStatus(taskId: Task.Id, state: TaskState, maybeHealth: Option[Boolean] = None, maybeReason: Option[TaskStatus.Reason] = None) = {
def makeMesosTaskStatus(taskId: Task.Id, state: TaskState, maybeHealth: Option[Boolean] = None, maybeReason: Option[TaskStatus.Reason] = None, maybeMessage: Option[String] = None, timestamp: Timestamp = Timestamp.zero) = {
val mesosStatus = TaskStatus.newBuilder
.setTaskId(taskId.mesosTaskId)
.setState(state)
maybeHealth.foreach(mesosStatus.setHealthy)
maybeReason.foreach(mesosStatus.setReason)
maybeMessage.foreach(mesosStatus.setMessage)
mesosStatus.build()
}
def makeTaskStatus(taskId: Task.Id, state: TaskState, maybeHealth: Option[Boolean] = None, maybeReason: Option[TaskStatus.Reason] = None) = {
makeMesosTaskStatus(taskId, state, maybeHealth, maybeReason)
def makeTaskStatus(taskId: Task.Id, state: TaskState, maybeHealth: Option[Boolean] = None, maybeReason: Option[TaskStatus.Reason] = None, maybeMessage: Option[String] = None) = {
makeMesosTaskStatus(taskId, state, maybeHealth, maybeReason, maybeMessage)
}

def running(task: Task = defaultTask) = taskUpdateFor(task, MarathonTaskStatus.Running, makeTaskStatus(task.taskId, TaskState.TASK_RUNNING))
Expand All @@ -78,8 +79,8 @@ object TaskStatusUpdateTestHelper {

def finished(task: Task = defaultTask) = taskExpungeFor(task, MarathonTaskStatus.Finished, makeTaskStatus(task.taskId, TaskState.TASK_FINISHED))

def lost(reason: Reason, task: Task = defaultTask) = {
val mesosStatus = makeTaskStatus(task.taskId, TaskState.TASK_LOST, maybeReason = Some(reason))
def lost(reason: Reason, task: Task = defaultTask, maybeMessage: Option[String] = None) = {
val mesosStatus = makeTaskStatus(task.taskId, TaskState.TASK_LOST, maybeReason = Some(reason), maybeMessage = maybeMessage)
val marathonTaskStatus = MarathonTaskStatus(mesosStatus)

marathonTaskStatus match {
Expand Down
Expand Up @@ -148,6 +148,30 @@ class TaskStateOpResolverTest
}
}

for (
reason <- MarathonTaskStatusMapping.Unreachable
) {
test(s"a TASK_LOST update with an unreachable $reason but a message saying that the task is unknown to the slave is mapped to an expunge") {
val f = new Fixture

Given("an existing task")
f.taskTracker.task(f.existingTask.taskId) returns Future.successful(Some(f.existingTask))

When("A TASK_LOST update is received indicating the agent is unknown")
val message = "Reconciliation: Task is unknown to the slave"
val stateOp: TaskStateOp.MesosUpdate = TaskStatusUpdateTestHelper.lost(reason, f.existingTask, Some(message)).wrapped.stateOp.asInstanceOf[TaskStateOp.MesosUpdate]
val stateChange = f.stateOpResolver.resolve(stateOp).futureValue

Then("taskTracker.task is called")
verify(f.taskTracker).task(f.existingTask.taskId)

And("the result is an expunge")
stateChange shouldBe a[TaskStateChange.Expunge]
And("there are no more interactions")
f.verifyNoMoreInteractions()
}
}

test("a subsequent TASK_LOST update with another reason is mapped to a noop and will not update the timestamp") {
val f = new Fixture

Expand All @@ -168,6 +192,27 @@ class TaskStateOpResolverTest
f.verifyNoMoreInteractions()
}

test("a subsequent TASK_LOST update with a message saying that the task is unknown to the slave is mapped to an expunge") {
val f = new Fixture

Given("an existing lost task")
f.taskTracker.task(f.existingLostTask.taskId) returns Future.successful(Some(f.existingLostTask))

When("A subsequent TASK_LOST update is received indicating the agent is unknown")
val reason = mesos.Protos.TaskStatus.Reason.REASON_RECONCILIATION
val maybeMessage = Some("Reconciliation: Task is unknown to the slave")
val stateOp: TaskStateOp.MesosUpdate = TaskStatusUpdateTestHelper.lost(reason, f.existingLostTask, maybeMessage).wrapped.stateOp.asInstanceOf[TaskStateOp.MesosUpdate]
val stateChange = f.stateOpResolver.resolve(stateOp).futureValue

Then("taskTracker.task is called")
verify(f.taskTracker).task(f.existingLostTask.taskId)

And("the result is an expunge")
stateChange shouldBe a[TaskStateChange.Expunge]
And("there are no more interactions")
f.verifyNoMoreInteractions()
}

test("ReservationTimeout fails if task does not exist") {
val f = new Fixture
Given("a non existing taskId")
Expand Down

0 comments on commit 4cc9c95

Please sign in to comment.