/
SchedulerLogicGraph.scala
114 lines (100 loc) · 4.33 KB
/
SchedulerLogicGraph.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
package com.mesosphere.usi.core
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
import akka.stream.{Attributes, FanInShape2, Inlet, Outlet}
import com.mesosphere.mesos.client.MesosCalls
import com.mesosphere.usi.core.models.{PodId, PodRecord, SpecEvent, StateSnapshot}
import org.apache.mesos.v1.scheduler.Protos.{Event => MesosEvent}
import scala.collection.mutable
/** Constants used by the [[SchedulerLogicGraph]] stage. */
object SchedulerLogicGraph {

  /** Number of queued output frames at which the stage stops pulling from its inlets. */
  val BUFFER_SIZE: Int = 32
}
/**
  * A simple stateful fan-in graph stage that instantiates an instance of the SchedulerLogic, feeding messages from
  * each of the two inputs as appropriate.
  *
  * This graph component looks like this:
  *
  * {{{
  *    Spec Events     +---------------------+
  *  ----------------->                      |
  *                    |                     |   Response
  *                    |   Scheduler Logic   >-------------
  *    Mesos Events    |                     |
  *  ----------------->                      |
  *                    +---------------------+
  * }}}
  *
  * Only one event is processed at a time.
  *
  * For more detail, see Scheduler
  *
  * The component has an internal buffer of not-yet-emitted results; once this buffer holds BUFFER_SIZE or more
  * elements, the stage stops pulling from both inlets until downstream demand has drained it below that threshold.
  *
  * When either upstream finishes, any results still waiting in the buffer are emitted downstream before the stage
  * completes, so already-computed results are not lost on completion. An upstream failure fails the stage immediately.
  *
  * Author's Note:
  *
  * This was prematurely implemented; a simple fan-in merge component would suffice for now. In early
  * development phases of the scheduler, it seemed that having an actual BIDI flow stage with robust stream termination
  * logic would've been needed.
  *
  * Its existence is only warranted by forecasted future needs. It's kept as a graph with an internal buffer as we will
  * likely need timers, other callbacks, and additional output ports (such as an offer event stream?).
  *
  * @param mesosCallFactory factory handed to the [[SchedulerLogicHandler]] created when the stage starts
  * @param initialPodRecords pod records handed to the handler and published as the initial [[StateSnapshot]]
  */
private[core] class SchedulerLogicGraph(mesosCallFactory: MesosCalls, initialPodRecords: Map[PodId, PodRecord])
    extends GraphStage[FanInShape2[SpecEvent, MesosEvent, SchedulerEvents]] {
  import SchedulerLogicGraph.BUFFER_SIZE

  private val mesosEventsInlet = Inlet[MesosEvent]("mesos-events")
  private val specEventsInlet = Inlet[SpecEvent]("specs")
  private val frameResultOutlet = Outlet[SchedulerEvents]("effects")

  // The shape of this stage: a two-input fan-in (spec events, Mesos events) with a single output.
  override val shape: FanInShape2[SpecEvent, MesosEvent, SchedulerEvents] =
    new FanInShape2(specEventsInlet, mesosEventsInlet, frameResultOutlet)

  // This is where the actual (possibly stateful) logic will live
  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = {
    new GraphStageLogic(shape) {
      // Created in preStart so that every materialization gets its own handler instance.
      private[this] var handler: SchedulerLogicHandler = _

      // Results produced while downstream had no demand, in the order they were produced.
      val pendingEffects: mutable.Queue[SchedulerEvents] = mutable.Queue.empty

      setHandler(
        mesosEventsInlet,
        new InHandler {
          // pushOrQueueIntents re-evaluates upstream demand itself, so no extra maybePull() is needed here.
          override def onPush(): Unit =
            pushOrQueueIntents(handler.handleMesosEvent(grab(mesosEventsInlet)))

          override def onUpstreamFinish(): Unit = drainAndComplete()
        }
      )

      setHandler(
        specEventsInlet,
        new InHandler {
          // pushOrQueueIntents re-evaluates upstream demand itself, so no extra maybePull() is needed here.
          override def onPush(): Unit =
            pushOrQueueIntents(handler.handleSpecEvent(grab(specEventsInlet)))

          override def onUpstreamFinish(): Unit = drainAndComplete()
        }
      )

      setHandler(
        frameResultOutlet,
        new OutHandler {
          override def onPull(): Unit = {
            if (pendingEffects.nonEmpty) {
              push(frameResultOutlet, pendingEffects.dequeue())
              // Dequeuing may have brought the buffer back under the threshold.
              maybePull()
            }
          }
        }
      )

      override def preStart(): Unit = {
        handler = new SchedulerLogicHandler(mesosCallFactory, initialPodRecords)
        // Publish the initial state snapshot event; podStatuses will not be in the snapshot in the future
        pushOrQueueIntents(SchedulerEvents(List(StateSnapshot(podRecords = initialPodRecords.values.toSeq))))
      }

      /**
        * Pushes `effects` downstream right away when there is demand, otherwise appends it to the internal buffer.
        * Afterwards, upstream demand is re-evaluated.
        *
        * @throws IllegalStateException if there is downstream demand while older effects are still queued; onPull
        *                               always pushes a queued element immediately, so this indicates a logic error.
        */
      def pushOrQueueIntents(effects: SchedulerEvents): Unit = {
        if (isAvailable(frameResultOutlet)) {
          if (pendingEffects.nonEmpty) {
            throw new IllegalStateException("We should always immediately push on pull if effects are queued")
          }
          push(frameResultOutlet, effects)
        } else {
          pendingEffects.enqueue(effects)
        }
        maybePull()
      }

      /** Requests one element from each inlet that has no outstanding pull, unless the buffer is full. */
      def maybePull(): Unit = {
        if (pendingEffects.size < BUFFER_SIZE) {
          if (!hasBeenPulled(mesosEventsInlet))
            pull(mesosEventsInlet)
          if (!hasBeenPulled(specEventsInlet))
            pull(specEventsInlet)
        }
      }

      /**
        * Completes the stage once an upstream has finished, first handing every buffered result to downstream.
        *
        * The default `onUpstreamFinish` would complete the stage at once and silently drop whatever is still queued.
        * `completeStage()` lets an emit started by `emitMultiple` finish before the outlet is completed.
        */
      private def drainAndComplete(): Unit = {
        if (pendingEffects.nonEmpty) {
          // Hand an immutable snapshot to emitMultiple and empty the queue so nothing can be emitted twice.
          val remaining = pendingEffects.toList
          pendingEffects.clear()
          emitMultiple(frameResultOutlet, remaining)
        }
        completeStage()
      }
    }
  }
}