/
SchedulerLogicHandler.scala
209 lines (190 loc) · 8.74 KB
/
SchedulerLogicHandler.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
package com.mesosphere.usi.core
import com.mesosphere.mesos.client.MesosCalls
import com.mesosphere.usi.core.logic.{MesosEventsLogic, SpecLogic}
import com.mesosphere.usi.core.models.PodRecord
import com.mesosphere.usi.core.models.{PodId, PodInvalid, PodSpec, PodSpecUpdated, SpecEvent, SpecsSnapshot}
import org.apache.mesos.v1.scheduler.Protos.{Call, Event => MesosEvent}
/**
* Container class responsible for keeping track of the state and cache.
*
* ## SpecificationState vs SchedulerLogic state
*
* The SchedulerLogic has two pieces of state: SpecificationState and SchedulerState (records and statuses). The
* SchedulerLogic maintains the latter. All manipulation to the SchedulerLogic state is done via one of the two
* processes:
*
* - The specification state is updated through incoming SpecEvents (we consume and replicate the
* framework-implementation's specifications)
* - The SchedulerState (statuses, records, etc.) is updated through StateEvents that are returned as intents by the business logic
*
* As such, it's worth emphasizing that business logic does not have any direct side-effects, and it manipulates the
* SchedulerState only by returning intents. This allows us the following:
*
* - It's more efficient, since it saves us the trouble of diffing a large data-structure with each update
* - We have built-in guarantees that evolutions to the SchedulerState can be reliably replicated by processing these
* events, since they led to the changes in the first place
* - We can easily know which portions of the SchedulerState should be persisted by the persistence layer.
* - It restricts, via the type system, the portions of the state the business logic is allowed to modify (i.e. it
* would be illegal for the business logic to update a specification)
*
* ## Intents and Events
*
* In the SchedulerLogic code, we use the words "intents" and "events". Intents are changes that have not yet been
* applied but should be. Events are changes that have been applied and that we are notifying you about.
*
* In the SchedulerLogic, a StateEvent is used both to manipulate the SchedulerState (similar to how event-sourced
* persistent actors evolve their state), and is also used to describe the evolution (so that the state can be
* incrementally persisted and followed). In the SchedulerLogic, we'll refer to a StateEvent as an intent until it is
* applied, after which it will be called an event. Mesos calls will be referred to as intents as they are not applied
* until they are published to the Mesos Master.
*
* ## Concept of a "Frame"
*
* Visualized:
*
* [Incoming Event: offer match]
* |
* v
* (beginning of frame)
* apply pod launch logic:
* create an intent for Mesos accept offer call for matched pending launch pods
* specify the existence of a podRecord, with the launch time and agentId
* update internal cache
* process revive / suppress need
* (end of frame)
* |
* v
* [Emit all state events and Mesos call intents]
*
* In the SchedulerLogic, each event is processed (either a Specification updated event, or a Mesos event) in what we
* call a "frame". During the processing of a single event (such as an offer), the SchedulerLogic will be updated
* through the application of several functions. The result of that event will lead to the emission of several
* stateEvents and Mesos intents, which will be accumulated and emitted via a data structure we call the FrameResult.
*/
private[core] class SchedulerLogicHandler(mesosCallFactory: MesosCalls, initialPodRecords: Map[PodId, PodRecord]) {

  /** Business logic applied in frames triggered by specification events. */
  private val specLogic = new SpecLogic(mesosCallFactory)

  /** Business logic applied in frames triggered by Mesos events. */
  private val mesosEventsLogic = new MesosEventsLogic(mesosCallFactory)

  /**
   * Our view of the framework-implementation's specifications, which we replicate by consuming specification events.
   */
  private var specs: SpecState = SpecState.empty

  /**
   * State managed by the SchedulerLogicHandler (records and statuses). Statuses are derived from Mesos events. Records
   * contain persistent, non-recoverable facts from Mesos, such as pod-launched time, agentId on which a pod was
   * launched, agent information or the time at which a pod was first seen as unhealthy or unreachable.
   */
  private var state: SchedulerState = SchedulerState(podStatuses = Map.empty, podRecords = initialPodRecords)

  /**
   * Cached view of SchedulerState and SpecificationState. We incrementally update this computation at the end of each
   * frame based on podIds becoming dirty.
   */
  private var cachedPendingLaunch = CachedPendingLaunch(Set.empty)

  /**
   * Validates the pod specs carried by a specification event.
   *
   * @param msg the incoming specification event
   * @return one [[PodInvalid]] per error reported by `PodSpec.isValid`; empty when no error was found or when the
   *         event carries no pod spec (e.g. a `PodSpecUpdated` without a spec)
   */
  def validateEvent(msg: SpecEvent): Seq[PodInvalid] = msg match {
    case SpecsSnapshot(podSpecs, _) =>
      podSpecs.flatMap { p =>
        PodSpec.isValid(p).toList.map(err => PodInvalid(p.id, Seq(err)))
      }
    case PodSpecUpdated(_, Some(podSpec)) =>
      PodSpec.isValid(podSpec).toList.map(err => PodInvalid(podSpec.id, Seq(err)))
    case _ => Seq.empty
  }

  /**
   * Processes a specification event in its own frame.
   *
   * If any pod spec carried by the event is invalid, the event is rejected as a whole: no frame is run, the internal
   * state is left untouched and only the [[PodInvalid]] events are returned.
   *
   * @param msg the incoming specification event
   * @return the validation errors, or the state events and Mesos call intents produced by the frame
   */
  def handleSpecEvent(msg: SpecEvent): SchedulerEvents = {
    val invalidPods = validateEvent(msg)
    if (invalidPods.nonEmpty) {
      SchedulerEvents(invalidPods.toList, List.empty)
    } else {
      handleFrame { builder =>
        builder
          .applySpecEvent(msg)
          .process { (specs, state, dirtyPodIds) =>
            specLogic.computeNextStateForPods(specs, state)(dirtyPodIds)
          }
      }
    }
  }

  /**
   * Process a Mesos event in its own frame and update internal state.
   *
   * @param event the event received from Mesos
   * @return the state events and Mesos call intents produced by the frame
   */
  def handleMesosEvent(event: MesosEvent): SchedulerEvents = {
    handleFrame { builder =>
      builder.process { (specs, state, _) =>
        mesosEventsLogic.processEvent(specs, state, cachedPendingLaunch.pendingLaunch)(event)
      }
    }
  }

  /**
   * Instantiate a frameResultBuilder instance, call the handler, then follow up with housekeeping:
   *
   *  - Prune terminal / unreachable podStatuses for which no podSpec is defined
   *  - Update the pending launch set index / cache
   *  - (WIP) issue any revive / suppress calls (this should be done elsewhere)
   *
   * @param fn the frame-specific processing, applied to a builder seeded with the current specs and state
   * @return The total state effects applied over the life-cycle of this state evaluation.
   */
  private def handleFrame(fn: FrameResultBuilder => FrameResultBuilder): SchedulerEvents = {
    val frameResultBuilder = fn(FrameResultBuilder.givenState(this.specs, this.state))
      .process { (specs, state, dirtyPodIds) =>
        pruneTaskStatuses(specs, state)(dirtyPodIds)
      }
      .process(updateCachesAndSuppressAndRevive)
    // Commit the frame's resulting specs and state so the next frame starts from them.
    this.state = frameResultBuilder.state
    this.specs = frameResultBuilder.specs
    frameResultBuilder.result
  }

  /**
   * Builds one suppress call per role of the newly launched pods, skipping roles that still have pods pending launch.
   *
   * @param pendingLaunch pods still waiting to be launched
   * @param newLaunched pods that have just been launched
   * @param currentSpecs specification state used to resolve pod ids to roles. Pods without a spec in it are ignored.
   *                     It defaults to the specs committed at the end of the previous frame. Callers in the middle of
   *                     a frame should pass that frame's specs.
   * @return suppress calls, at most one per role, in first-seen order
   */
  def generateSuppressCalls(
      pendingLaunch: Set[PodId],
      newLaunched: Set[PodId],
      currentSpecs: SpecState = this.specs): List[Call] = {
    val rolesBeingLaunched: Set[String] = rolesOf(pendingLaunch.iterator, currentSpecs).toSet
    rolesOf(newLaunched.iterator, currentSpecs)
      .filterNot(rolesBeingLaunched)
      .map(role => mesosCallFactory.newSuppress(Some(role)))
  }

  /**
   * Refreshes the pending-launch cache for the dirty pods and emits the matching revive and suppress calls.
   *
   * Revive calls are emitted for the roles of pods that the cache update reports as to be launched. Suppress calls are
   * emitted for the roles of launched pods that no longer have anything pending launch.
   */
  private def updateCachesAndSuppressAndRevive(
      specs: SpecState,
      state: SchedulerState,
      dirtyPodIds: Set[PodId]): SchedulerEvents = {
    val updateResult = this.cachedPendingLaunch.update(specs, state, dirtyPodIds)
    this.cachedPendingLaunch = updateResult.cachedPendingLaunch
    val reviveCalls = rolesOf(updateResult.toBeLaunched.iterator, specs)
      .map(role => mesosCallFactory.newRevive(Some(role)))
    // Resolve roles against this frame's specs, consistent with the revive calls above.
    val suppressCalls = generateSuppressCalls(this.cachedPendingLaunch.pendingLaunch, updateResult.launched, specs)
    SchedulerEvents(mesosCalls = reviveCalls ++ suppressCalls)
  }

  /** Resolves pod ids to their distinct roles in first-seen order, ignoring pods without a spec in `specState`. */
  private def rolesOf(podIds: Iterator[PodId], specState: SpecState): List[String] =
    podIds.flatMap(id => specState.podSpecs.get(id)).map(_.runSpec.role).toList.distinct

  /**
   * Removes the status of every pod whose status is terminal or unreachable and which has no podSpec defined.
   *
   * Should be called with the effects already applied for the specified podIds.
   *
   * @param specs specification state as of this point in the frame
   * @param state scheduler state as of this point in the frame
   * @param podIds podIds changed during the frame so far. Only these are considered for pruning.
   * @return events clearing (setting to `None`) the status of each pruned pod
   */
  private def pruneTaskStatuses(specs: SpecState, state: SchedulerState)(podIds: Set[PodId]): SchedulerEvents =
    podIds.iterator
      .filter { podId =>
        // Prune only statuses for which there's no defined podSpec. Pods that are still specified keep their status.
        val podSpecDefined = specs.podSpecs.contains(podId)
        !podSpecDefined && state.podStatuses.get(podId).exists(_.isTerminalOrUnreachable)
      }
      .foldLeft(SchedulerEventsBuilder.empty) { (effects, podId) =>
        effects.withPodStatus(podId, None)
      }
      .result
}