Skip to content

Commit

Permalink
renamed all relevant variables to condition
Browse files Browse the repository at this point in the history
Summary: After renaming `InstanceStatus` to `Condition` all relevant variables must be renamed to `condition` as well. Depends on D66

Test Plan: sbt test

Reviewers: aquamatthias, meichstedt

Reviewed By: aquamatthias, meichstedt

Subscribers: jenkins, marathon-team

Differential Revision: https://phabricator.mesosphere.com/D67
  • Loading branch information
unterstein committed Oct 14, 2016
1 parent 81c2b7c commit 0f13200
Show file tree
Hide file tree
Showing 48 changed files with 198 additions and 198 deletions.
Expand Up @@ -542,8 +542,8 @@ class SchedulerActions(
launchQueue.purge(runSpec.id)

val toKill = instanceTracker.specInstancesSync(runSpec.id).toSeq
.filter(t => runningOrStaged.contains(t.state.status))
.sortWith(sortByStateAndTime)
.filter(t => runningOrStaged.contains(t.state.condition))
.sortWith(sortByConditionAndTime)
.take(launchedCount - targetCount)

log.info("Killing tasks {}", toKill.map(_.instanceId))
Expand All @@ -569,13 +569,13 @@ class SchedulerActions(
}

private[this] object SchedulerActions {
def sortByStateAndTime(a: Instance, b: Instance): Boolean = {
def sortByConditionAndTime(a: Instance, b: Instance): Boolean = {
def stagedEarlier: Boolean = {
val stagedA = a.tasks.map(_.status.stagedAt)
val stagedB = b.tasks.map(_.status.stagedAt)
if (stagedA.nonEmpty && stagedB.nonEmpty) stagedA.max.compareTo(stagedB.max) > 0 else false
}
runningOrStaged(b.state.status).compareTo(runningOrStaged(a.state.status)) match {
runningOrStaged(b.state.condition).compareTo(runningOrStaged(a.state.condition)) match {
case 0 => stagedEarlier
case value: Int => value > 0
}
Expand Down
4 changes: 2 additions & 2 deletions src/main/scala/mesosphere/marathon/api/v2/TasksResource.scala
Expand Up @@ -46,7 +46,7 @@ class TasksResource @Inject() (
@QueryParam("status[]") statuses: util.List[String],
@Context req: HttpServletRequest): Response = authenticated(req) { implicit identity =>
Option(status).map(statuses.add)
val statusSet: Set[Condition] = statuses.flatMap(toTaskState)(collection.breakOut)
val conditionSet: Set[Condition] = statuses.flatMap(toTaskState)(collection.breakOut)

val taskList = instanceTracker.instancesBySpecSync

Expand All @@ -69,7 +69,7 @@ class TasksResource @Inject() (
val enrichedTasks = tasks.flatMap {
case (appId, task) =>
val app = appIdsToApps(appId)
if (isAuthorized(ViewRunSpec, app) && (statusSet.isEmpty || statusSet(task.status.taskStatus))) {
if (isAuthorized(ViewRunSpec, app) && (conditionSet.isEmpty || conditionSet(task.status.condition))) {
Some(EnrichedTask(
appId,
task,
Expand Down
4 changes: 2 additions & 2 deletions src/main/scala/mesosphere/marathon/api/v2/json/Formats.scala
Expand Up @@ -613,7 +613,7 @@ trait EventFormats {
implicit lazy val InstanceChangedEventWrites: Writes[InstanceChanged] = Writes { change =>
Json.obj(
"instanceId" -> change.id,
"instanceStatus" -> change.status.toString,
"condition" -> change.condition.toString,
"runSpecId" -> change.runSpecId,
"agentId" -> change.instance.agentInfo.agentId,
"host" -> change.instance.agentInfo.host,
Expand All @@ -636,7 +636,7 @@ trait EventFormats {
Json.obj(
"instanceId" -> change.id,
"runSpecId" -> change.runSpecId,
"instanceStatus" -> change.status.toString,
"condition" -> change.condition.toString,
"timestamp" -> change.timestamp,
"eventType" -> change.eventType
)
Expand Down
Expand Up @@ -86,7 +86,7 @@ object Condition {
case object Unknown extends Condition with Terminal

object Terminal {
def unapply(status: Condition): Option[Terminal] = status match {
def unapply(condition: Condition): Option[Terminal] = condition match {
case terminal: Terminal => Some(terminal)
case _ => None
}
Expand Down Expand Up @@ -115,7 +115,7 @@ object Condition {
}
// scalastyle:on

def unapply(status: Condition): Option[String] = Some(status.toString.toLowerCase)
def unapply(condition: Condition): Option[String] = Some(condition.toString.toLowerCase)

implicit val instanceStatusFormat = Json.format[Condition]
implicit val conditionFormat = Json.format[Condition]
}
6 changes: 3 additions & 3 deletions src/main/scala/mesosphere/marathon/core/event/Events.scala
Expand Up @@ -217,23 +217,23 @@ case class InstanceChanged(
id: Instance.Id,
runSpecVersion: Timestamp,
runSpecId: PathId,
status: Condition,
condition: Condition,
instance: Instance) extends MarathonEvent {
override val eventType: String = "instance_changed_event"
override val timestamp: String = Timestamp.now().toString
}
object InstanceChanged {
def apply(instanceChange: InstanceChange): InstanceChanged = {
InstanceChanged(instanceChange.id, instanceChange.runSpecVersion,
instanceChange.runSpecId, instanceChange.status, instanceChange.instance)
instanceChange.runSpecId, instanceChange.condition, instanceChange.instance)
}
}

/** Event indicating an unknown instance is terminal */
case class UnknownInstanceTerminated(
id: Instance.Id,
runSpecId: PathId,
status: Condition) extends MarathonEvent {
condition: Condition) extends MarathonEvent {
override val eventType: String = "unknown_instance_terminated_event"
override val timestamp: String = Timestamp.now().toString
}
Expand Down
70 changes: 35 additions & 35 deletions src/main/scala/mesosphere/marathon/core/instance/Instance.scala
Expand Up @@ -83,13 +83,13 @@ case class Instance(
case TaskUpdateEffect.Update(updatedTask) =>
val updated = this.copy(
state = state.copy(
status = Condition.Staging,
condition = Condition.Staging,
since = timestamp
),
tasksMap = tasksMap.updated(task.taskId, updatedTask),
runSpecVersion = newRunSpecVersion
)
val events = eventsGenerator.events(updated.state.status, updated, task = None, timestamp)
val events = eventsGenerator.events(updated.state.condition, updated, task = None, timestamp)
InstanceUpdateEffect.Update(updated, oldState = Some(this), events)

case _ =>
Expand All @@ -102,7 +102,7 @@ case class Instance(
case InstanceUpdateOperation.ReservationTimeout(_) =>
if (this.isReserved) {
// TODO(PODS): don#t use Timestamp.now()
val events = eventsGenerator.events(state.status, this, task = None, Timestamp.now())
val events = eventsGenerator.events(state.condition, this, task = None, Timestamp.now())
InstanceUpdateEffect.Expunge(this, events)
} else {
InstanceUpdateEffect.Failure("ReservationTimeout can only be applied to a reserved instance")
Expand Down Expand Up @@ -163,7 +163,7 @@ object Instance {
* The order of the status is important.
* If 2 tasks are Running and 2 tasks already Finished, the final status is Running.
*/
private val AllInstanceStatuses: Seq[Condition] = Seq(
private val AllInstanceConditions: Seq[Condition] = Seq(
Condition.Created,
Condition.Reserved,
Condition.Running,
Expand All @@ -176,7 +176,7 @@ object Instance {
* The order of the status is important.
* If one task is Error and one task is Staging, the instance status is Error.
*/
private val DistinctInstanceStatuses: Seq[Condition] = Seq(
private val DistinctInstanceConditions: Seq[Condition] = Seq(
Condition.Error,
Condition.Failed,
Condition.Gone,
Expand All @@ -199,7 +199,7 @@ object Instance {

new Instance(task.taskId.instanceId, task.agentInfo, state, tasksMap, task.runSpecVersion)
}
case class InstanceState(status: Condition, since: Timestamp, healthy: Option[Boolean])
case class InstanceState(condition: Condition, since: Timestamp, healthy: Option[Boolean])

@SuppressWarnings(Array("TraversableHead"))
private[instance] def newInstanceState(
Expand All @@ -209,30 +209,30 @@ object Instance {

val tasks = newTaskMap.values

//compute the new instance status
val stateMap = tasks.groupBy(_.status.taskStatus)
val status = if (stateMap.size == 1) {
// all tasks have the same status -> this is the instance status
stateMap.keys.head
// compute the new instance state
val conditionMap = tasks.groupBy(_.status.condition)
val condition = if (conditionMap.size == 1) {
// all tasks have the same condition -> this is the instance condition
conditionMap.keys.head
} else {
// since we don't have a distinct state, we remove states where all tasks have to agree on
// and search for a distinct state
val distinctStates = Instance.AllInstanceStatuses.foldLeft(stateMap) { (ds, status) => ds - status }
Instance.DistinctInstanceStatuses.find(distinctStates.contains).getOrElse {
// if no distinct state is found all tasks are in different AllInstanceStatuses
val distinctCondition = Instance.AllInstanceConditions.foldLeft(conditionMap) { (ds, status) => ds - status }
Instance.DistinctInstanceConditions.find(distinctCondition.contains).getOrElse {
// if no distinct condition is found all tasks are in different AllInstanceConditions
// we pick the first matching one
Instance.AllInstanceStatuses.find(stateMap.contains).getOrElse {
Instance.AllInstanceConditions.find(conditionMap.contains).getOrElse {
// if we come here, something is wrong, since we covered all existing states
Instance.log.error(s"Could not compute new instance state for state map: $stateMap")
Instance.log.error(s"Could not compute new instance condition for condition map: $conditionMap")
Condition.Unknown
}
}
}

val healthy = computeHealth(tasks.toVector)
maybeOldState match {
case Some(state) if state.status == status && state.healthy == healthy => state
case _ => InstanceState(status, timestamp, healthy)
case Some(state) if state.condition == condition && state.healthy == healthy => state
case _ => InstanceState(condition, timestamp, healthy)
}
}

Expand All @@ -243,7 +243,7 @@ object Instance {
task.isRunning && task.status.mesosStatus.fold(false)(m => m.hasHealthy && m.getHealthy)
}
private[this] def isPending(task: Task): Boolean = {
task.status.taskStatus != Condition.Running && task.status.taskStatus != Condition.Finished
task.status.condition != Condition.Running && task.status.condition != Condition.Finished
}

/**
Expand Down Expand Up @@ -332,21 +332,21 @@ object Instance {
)
}

implicit class InstanceStatusComparison(val instance: Instance) extends AnyVal {
def isReserved: Boolean = instance.state.status == Condition.Reserved
def isCreated: Boolean = instance.state.status == Condition.Created
def isError: Boolean = instance.state.status == Condition.Error
def isFailed: Boolean = instance.state.status == Condition.Failed
def isFinished: Boolean = instance.state.status == Condition.Finished
def isKilled: Boolean = instance.state.status == Condition.Killed
def isKilling: Boolean = instance.state.status == Condition.Killing
def isRunning: Boolean = instance.state.status == Condition.Running
def isStaging: Boolean = instance.state.status == Condition.Staging
def isStarting: Boolean = instance.state.status == Condition.Starting
def isUnreachable: Boolean = instance.state.status == Condition.Unreachable
def isGone: Boolean = instance.state.status == Condition.Gone
def isUnknown: Boolean = instance.state.status == Condition.Unknown
def isDropped: Boolean = instance.state.status == Condition.Dropped
implicit class InstanceConditionComparison(val instance: Instance) extends AnyVal {
def isReserved: Boolean = instance.state.condition == Condition.Reserved
def isCreated: Boolean = instance.state.condition == Condition.Created
def isError: Boolean = instance.state.condition == Condition.Error
def isFailed: Boolean = instance.state.condition == Condition.Failed
def isFinished: Boolean = instance.state.condition == Condition.Finished
def isKilled: Boolean = instance.state.condition == Condition.Killed
def isKilling: Boolean = instance.state.condition == Condition.Killing
def isRunning: Boolean = instance.state.condition == Condition.Running
def isStaging: Boolean = instance.state.condition == Condition.Staging
def isStarting: Boolean = instance.state.condition == Condition.Starting
def isUnreachable: Boolean = instance.state.condition == Condition.Unreachable
def isGone: Boolean = instance.state.condition == Condition.Gone
def isUnknown: Boolean = instance.state.condition == Condition.Unknown
def isDropped: Boolean = instance.state.condition == Condition.Dropped
}

/**
Expand All @@ -369,7 +369,7 @@ object Instance {
}
implicit val agentFormat: Format[AgentInfo] = Json.format[AgentInfo]
implicit val idFormat: Format[Instance.Id] = Json.format[Instance.Id]
implicit val instanceStatusFormat: Format[Condition] = Json.format[Condition]
implicit val instanceConditionFormat: Format[Condition] = Json.format[Condition]
implicit val instanceStateFormat: Format[InstanceState] = Json.format[InstanceState]
implicit val instanceJsonFormat: Format[Instance] = Json.format[Instance]
implicit lazy val tasksMapFormat: Format[Map[Task.Id, Task]] = Format(
Expand Down
Expand Up @@ -32,8 +32,8 @@ sealed trait InstanceChange extends Product with Serializable {
val id: Instance.Id = instance.instanceId
/** version of the related run spec */
val runSpecVersion: Timestamp = instance.runSpecVersion
/** Status of the [[Instance]] */
val status: Condition = instance.state.status
/** Condition of the [[Instance]] */
val condition: Condition = instance.state.condition
/** Id of the related [[mesosphere.marathon.state.RunSpec]] */
val runSpecId: PathId = id.runSpecId
/** the previous state of this instance */
Expand Down
Expand Up @@ -9,15 +9,15 @@ import mesosphere.marathon.state.Timestamp
import scala.collection.immutable.Seq

object InstanceChangedEventsGenerator {
def events(status: Condition, instance: Instance, task: Option[Task], now: Timestamp): Seq[MarathonEvent] = {
def events(condition: Condition, instance: Instance, task: Option[Task], now: Timestamp): Seq[MarathonEvent] = {
val runSpecId = instance.runSpecId
val version = instance.runSpecVersion

def instanceEvent = InstanceChanged(
id = instance.instanceId,
runSpecVersion = version,
runSpecId = runSpecId,
status = status,
condition = condition,
instance = instance
)

Expand All @@ -31,7 +31,7 @@ object InstanceChangedEventsGenerator {
val taskEvent = MesosStatusUpdateEvent(
slaveId,
task.taskId,
status.toMesosStateName,
condition.toMesosStateName,
message,
appId = runSpecId,
host,
Expand Down
Expand Up @@ -44,7 +44,7 @@ object InstanceUpdateOperation {
hostPorts: Seq[Int]) extends InstanceUpdateOperation

case class MesosUpdate(
instance: Instance, status: Condition,
instance: Instance, condition: Condition,
mesosStatus: mesos.Protos.TaskStatus, now: Timestamp) extends InstanceUpdateOperation {

override def instanceId: Instance.Id = instance.instanceId
Expand Down
Expand Up @@ -87,7 +87,7 @@ class InstanceOpFactoryImpl(
runSpecVersion = runSpec.version,
status = Task.Status(
stagedAt = clock.now(),
taskStatus = Condition.Created
condition = Condition.Created
),
hostPorts = ports.flatten
)
Expand Down Expand Up @@ -184,7 +184,7 @@ class InstanceOpFactoryImpl(
timestamp = clock.now(),
status = Task.Status(
stagedAt = clock.now(),
taskStatus = Condition.Created
condition = Condition.Created
),
hostPorts = ports.flatten)

Expand Down Expand Up @@ -217,15 +217,15 @@ class InstanceOpFactoryImpl(
reservation = Task.Reservation(persistentVolumeIds, Task.Reservation.State.New(timeout = Some(timeout))),
status = Task.Status(
stagedAt = now,
taskStatus = Condition.Reserved
condition = Condition.Reserved
),
runSpecVersion = runSpec.version
)
val instance = Instance(
instanceId = task.taskId.instanceId,
agentInfo = agentInfo,
state = InstanceState(
status = Condition.Reserved,
condition = Condition.Reserved,
since = now,
healthy = None
),
Expand Down Expand Up @@ -287,7 +287,7 @@ object InstanceOpFactoryImpl {
taskId = taskId,
agentInfo = agentInfo,
runSpecVersion = pod.version,
status = Task.Status(stagedAt = since, taskStatus = Condition.Created),
status = Task.Status(stagedAt = since, condition = Condition.Created),
hostPorts = taskHostPorts
)
task.taskId -> task
Expand Down
Expand Up @@ -259,11 +259,11 @@ private class TaskLauncherActor(
case change: InstanceChange =>
change match {
case update: InstanceUpdated =>
log.info("receiveInstanceUpdate: {} is {}", update.id, update.status)
log.info("receiveInstanceUpdate: {} is {}", update.id, update.condition)
instanceMap += update.id -> update.instance

case update: InstanceDeleted =>
log.info("receiveInstanceUpdate: {} was deleted ({})", update.id, update.status)
log.info("receiveInstanceUpdate: {} was deleted ({})", update.id, update.condition)
removeInstance(update.id)
// A) If the app has constraints, we need to reconsider offers that
// we already rejected. E.g. when a host:unique constraint prevented
Expand Down Expand Up @@ -351,7 +351,7 @@ private class TaskLauncherActor(
sender ! MatchedInstanceOps(offer.getId)

case ActorOfferMatcher.MatchOffer(deadline, offer) =>
val reachableInstances = instanceMap.values.filterNot(_.state.status.isLost)
val reachableInstances = instanceMap.values.filterNot(_.state.condition.isLost)
val matchRequest = InstanceOpFactory.Request(runSpec, offer, reachableInstances, instancesToLaunch)
val instanceOp: Option[InstanceOp] = instanceOpFactory.buildTaskOp(matchRequest)
instanceOp match {
Expand Down

0 comments on commit 0f13200

Please sign in to comment.