Skip to content

Commit

Permalink
fixes #476
Browse files Browse the repository at this point in the history
- made statuses field on MarathonTask obsolete
- added status field that contains only the last status
- added migration for status field changes
- added `status` and `status[]` params to /v2/tasks
- updated docs

Conflicts:
	docs/docs/rest-api.md
	src/main/java/mesosphere/marathon/Protos.java
	src/main/scala/mesosphere/marathon/MarathonSchedulerActor.scala
	src/main/scala/mesosphere/marathon/api/EndpointsHelper.scala
	src/main/scala/mesosphere/marathon/api/v2/TasksResource.scala
	src/main/scala/mesosphere/marathon/state/AppDefinition.scala
	src/main/scala/mesosphere/marathon/state/Migration.scala
	src/test/scala/mesosphere/marathon/tasks/TaskTrackerTest.scala
  • Loading branch information
drexin authored and ConnorDoyle committed Sep 18, 2014
1 parent 58fb982 commit cffd9c4
Show file tree
Hide file tree
Showing 8 changed files with 426 additions and 195 deletions.
555 changes: 380 additions & 175 deletions src/main/java/mesosphere/marathon/Protos.java

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion src/main/proto/marathon.proto
Expand Up @@ -61,8 +61,9 @@ message MarathonTask {
repeated mesos.Attribute attributes = 4;
optional int64 staged_at = 5;
optional int64 started_at = 6;
repeated mesos.TaskStatus statuses = 7;
repeated mesos.TaskStatus OBSOLETE_statuses = 7;
optional string version = 8;
optional mesos.TaskStatus status = 9;
}

message MarathonApp {
Expand Down
4 changes: 3 additions & 1 deletion src/main/scala/mesosphere/marathon/MarathonScheduler.scala
Expand Up @@ -284,7 +284,9 @@ class MarathonScheduler @Inject() (
for (appId <- appIds) scale(driver, appId)

val knownTaskStatuses = appIds.flatMap { appId =>
taskTracker.get(appId).flatMap(_.getStatusesList.asScala.lastOption)
taskTracker.get(appId).collect {
case task if task.hasStatus => task.getStatus
}
}

for (unknownAppId <- taskTracker.list.keySet -- appIds) {
Expand Down
5 changes: 3 additions & 2 deletions src/main/scala/mesosphere/marathon/api/EndpointsHelper.scala
Expand Up @@ -2,6 +2,7 @@ package mesosphere.marathon.api

import mesosphere.marathon.tasks.TaskTracker
import mesosphere.marathon.api.v1.AppDefinition
import org.apache.mesos.Protos.TaskState
import scala.collection.JavaConverters._

object EndpointsHelper {
Expand All @@ -22,15 +23,15 @@ object EndpointsHelper {

if (app.ports.isEmpty) {
sb.append(s"${cleanId}$delimiter $delimiter")
tasks.foreach { task =>
for (task <- tasks if task.getStatus.getState == TaskState.TASK_RUNNING) {
sb.append(s"${task.getHost} ")
}
sb.append(s"\n")
}
else {
for ((port, i) <- app.ports.zipWithIndex) {
sb.append(s"$cleanId$delimiter$port$delimiter")
for (task <- tasks) {
for (task <- tasks if task.getStatus.getState == TaskState.TASK_RUNNING) {
val ports = task.getPortsList.asScala.lift
sb.append(s"${task.getHost}:${ports(i).getOrElse(0)}$delimiter")
}
Expand Down
Expand Up @@ -193,8 +193,7 @@ object AppDefinition {
*/
@JsonProperty
val tasksRunning: Int = appTasks.count { task =>
val statusList = task.getStatusesList.asScala
statusList.nonEmpty && statusList.last.getState == TaskState.TASK_RUNNING
task.hasStatus && task.getStatus.getState == TaskState.TASK_RUNNING
}
}

Expand Down
43 changes: 33 additions & 10 deletions src/main/scala/mesosphere/marathon/api/v2/TasksResource.scala
@@ -1,33 +1,51 @@
package mesosphere.marathon.api.v2

import java.util
import javax.ws.rs._
import scala.Array
import javax.ws.rs.core.MediaType
import javax.inject.Inject
import mesosphere.marathon.MarathonSchedulerService
import mesosphere.marathon.api.{ EndpointsHelper }
import mesosphere.marathon.api.v2.json.EnrichedTask
import mesosphere.marathon.health.HealthCheckManager
import mesosphere.marathon.{ MarathonConf, MarathonSchedulerService }
import mesosphere.marathon.tasks.TaskTracker
import mesosphere.marathon.api.EndpointsHelper
import mesosphere.marathon.api.v2.json.EnrichedTask
import org.apache.log4j.Logger
import com.codahale.metrics.annotation.Timed
import mesosphere.marathon.health.HealthCheckActor.Health
import org.apache.mesos.Protos.TaskState
import scala.concurrent.Await
import scala.collection.JavaConverters._

@Path("v2/tasks")
class TasksResource @Inject() (service: MarathonSchedulerService,
taskTracker: TaskTracker) {
taskTracker: TaskTracker,
healthCheckManager: HealthCheckManager,
config: MarathonConf) {

val log = Logger.getLogger(getClass.getName)

@GET
@Produces(Array(MediaType.APPLICATION_JSON))
@Timed
def indexJson() = {
val flatTasksList = taskTracker.list.flatMap {
case (appId, setOfTasks) =>
setOfTasks.tasks.map(EnrichedTask(appId, _, Seq[Option[Health]]()))
}

Map("tasks" -> flatTasksList)
def indexJson(@QueryParam("status") status: String,
@QueryParam("status[]") statuses: util.List[String]) = {
if (status != null) statuses.add(status)
val statusSet = statuses.asScala.flatMap(toTaskState).toSet
Map(
"tasks" -> taskTracker.list.flatMap {
case (appId, setOfTasks) =>
setOfTasks.tasks.collect {
case task if statusSet.isEmpty || statusSet(task.getStatus.getState) =>
EnrichedTask(
appId,
task,
Await.result(healthCheckManager.status(appId, task.getId), config.zkTimeoutDuration)
)
}
}
)
}

@GET
Expand All @@ -39,4 +57,9 @@ class TasksResource @Inject() (service: MarathonSchedulerService,
"\t"
)

private def toTaskState(state: String): Option[TaskState] = state.toLowerCase match {
case "running" => Some(TaskState.TASK_RUNNING)
case "staging" => Some(TaskState.TASK_STAGING)
case _ => None
}
}
2 changes: 1 addition & 1 deletion src/main/scala/mesosphere/marathon/state/Migration.scala
Expand Up @@ -34,4 +34,4 @@ object StorageVersions {
}

def empty: StorageVersion = StorageVersions(0, 0, 0)
}
}
6 changes: 3 additions & 3 deletions src/main/scala/mesosphere/marathon/tasks/TaskTracker.scala
Expand Up @@ -50,7 +50,7 @@ class TaskTracker @Inject() (state: State, config: MarathonConf) {
get(appName).remove(stagedTask)
stagedTask.toBuilder
.setStartedAt(System.currentTimeMillis)
.addStatuses(status)
.setStatus(status)
.build

case _ =>
Expand All @@ -60,7 +60,7 @@ class TaskTracker @Inject() (state: State, config: MarathonConf) {
.setId(taskId)
.setStagedAt(System.currentTimeMillis)
.setStartedAt(System.currentTimeMillis)
.addStatuses(status)
.setStatus(status)
.build
}
get(appName) += task
Expand Down Expand Up @@ -103,7 +103,7 @@ class TaskTracker @Inject() (state: State, config: MarathonConf) {
case Some(task) =>
get(appName).remove(task)
val updatedTask = task.toBuilder
.addStatuses(status)
.setStatus(status)
.build
get(appName) += updatedTask
store(appName).map(_ => Some(updatedTask))
Expand Down

1 comment on commit cffd9c4

@ConnorDoyle
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

WIP backport; still needs task status migration. Pulled that part out due to large diff. CC @drexin

Please sign in to comment.