/
StoppingBehavior.scala
81 lines (65 loc) · 2.39 KB
/
StoppingBehavior.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
package mesosphere.marathon.upgrade
import akka.actor.{ Actor, ActorLogging, Cancellable }
import akka.event.EventStream
import mesosphere.marathon.TaskUpgradeCanceledException
import mesosphere.marathon.core.task.Task
import mesosphere.marathon.core.task.tracker.TaskTracker
import mesosphere.marathon.event.MesosStatusUpdateEvent
import mesosphere.marathon.state.PathId
import mesosphere.marathon.upgrade.StoppingBehavior.SynchronizeTasks
import org.apache.mesos.{ Protos, SchedulerDriver }
import scala.collection.mutable
import scala.concurrent.Promise
import scala.concurrent.duration._
trait StoppingBehavior extends Actor with ActorLogging {
import context.dispatcher
def driver: SchedulerDriver
def eventBus: EventStream
def promise: Promise[Unit]
def taskTracker: TaskTracker
def appId: PathId
var idsToKill: mutable.Set[Task.Id]
var periodicalCheck: Cancellable = _
def initializeStop(): Unit
final override def preStart(): Unit = {
eventBus.subscribe(self, classOf[MesosStatusUpdateEvent])
initializeStop()
scheduleSynchronization()
checkFinished()
}
final override def postStop(): Unit = {
eventBus.unsubscribe(self)
if (!promise.isCompleted)
promise.tryFailure(
new TaskUpgradeCanceledException(
"The operation has been cancelled"))
}
val taskFinished = "^TASK_(ERROR|FAILED|FINISHED|LOST|KILLED)$".r
def receive: Receive = {
case MesosStatusUpdateEvent(_, taskId, taskFinished(_), _, _, _, _, _, _, _, _) if idsToKill(taskId) =>
idsToKill.remove(taskId)
log.info(s"Task $taskId has been killed. Waiting for ${idsToKill.size} more tasks to be killed.")
checkFinished()
case SynchronizeTasks =>
val trackerIds = taskTracker.appTasksSync(appId).map(_.taskId).toSet
idsToKill = idsToKill.filter(trackerIds)
idsToKill.foreach { id =>
driver.killTask(id.mesosTaskId)
}
scheduleSynchronization()
checkFinished()
case x: MesosStatusUpdateEvent => log.debug(s"Received $x")
}
def checkFinished(): Unit =
if (idsToKill.isEmpty) {
log.info("Successfully killed all the tasks")
promise.success(())
periodicalCheck.cancel()
context.stop(self)
}
def scheduleSynchronization(): Unit =
periodicalCheck = context.system.scheduler.scheduleOnce(5.seconds, self, SynchronizeTasks)
}
object StoppingBehavior {
case object SynchronizeTasks
}