forked from purpleidea/mgmt
/
mgraph.go
215 lines (192 loc) · 7.77 KB
/
mgraph.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
// Mgmt
// Copyright (C) 2013-2017+ James Shubin and the project contributors
// Written by James Shubin <james@shubin.ca> and the project contributors
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package resources // TODO: can this be a separate package or will it break the dag?
import (
"log"
"sync"
"github.com/purpleidea/mgmt/event"
"github.com/purpleidea/mgmt/pgraph"
"github.com/purpleidea/mgmt/util/semaphore"
)
//go:generate stringer -type=graphState -output=graphstate_stringer.go

// graphState enumerates the lifecycle states that an MGraph records in its
// state field. Values are only ever written through MGraph.setState.
type graphState uint

const (
	// graphStateNil is the zero value of graphState, i.e. the state of an
	// MGraph before anything has called setState on it.
	graphStateNil graphState = iota
	// graphStateStarting is set by Start before it begins starting vertices.
	graphStateStarting
	// graphStateStarted is the state that Start leaves the graph in.
	graphStateStarted
	// graphStatePausing is set by Pause before it begins sending pause events.
	graphStatePausing
	// graphStatePaused is the state that Pause leaves the graph in.
	graphStatePaused
)
// MGraph is a meta graph structure used to encapsulate a generic graph
// structure alongside some non-generic elements. The pointer fields at the
// bottom of the struct are only allocated by Init, so Init must be called
// before any method that uses them.
type MGraph struct {
	//Graph *pgraph.Graph
	*pgraph.Graph // wrap a graph, and use its methods directly

	// Data is copied by value into every resource's Data() by Update.
	Data *ResData
	// FastPause is raised by Pause(true) while pause events are being sent
	// and is cleared again before Pause returns.
	FastPause bool
	// Debug is not read anywhere in this file.
	// NOTE(review): presumably toggles verbose output elsewhere; confirm.
	Debug bool

	// state is the current lifecycle state; it is written by setState and
	// read by getState.
	state graphState

	// ptr b/c: Mutex/WaitGroup must not be copied after first use
	// mutex is held by setState while the state field is swapped.
	mutex *sync.Mutex
	// wg counts the Worker goroutines launched by Start; Exit waits on it.
	wg *sync.WaitGroup
	// slock is the semaphore lock.
	// NOTE(review): presumably guards semas; its users are outside this file.
	slock *sync.Mutex
	// semas holds semaphores indexed by a string key.
	semas map[string]*semaphore.Semaphore
}
// Init allocates the internal synchronization primitives and the semaphore
// map. It replaces whatever these fields previously pointed at, and it does
// not touch the embedded graph, Data, or the state field.
func (obj *MGraph) Init() {
	obj.mutex = new(sync.Mutex)
	obj.wg = new(sync.WaitGroup)
	obj.slock = new(sync.Mutex) // semaphore lock
	obj.semas = map[string]*semaphore.Semaphore{}
}
// getState reports the graph's current lifecycle state. Knowing which phase of
// processing the graph is in lets certain algorithms be optimized.
//
// NOTE: obj.mutex is not acquired here (the locking is left disabled in this
// method), so this read is not synchronized with writes made by setState.
func (obj *MGraph) getState() graphState {
	current := obj.state
	return current
}
// setState stores the given state as the graph's current state and hands back
// the state that was in effect immediately before the change. The read of the
// old value and the write of the new one both happen while obj.mutex is held.
func (obj *MGraph) setState(state graphState) graphState {
	obj.mutex.Lock()
	previous := obj.state // same value getState would report at this point
	obj.state = state
	obj.mutex.Unlock()
	return previous
}
// Update makes a copy of newGraph the active graph and then refreshes every
// resource in that copy so that its Data() holds the current contents of
// obj.Data. The caller's newGraph itself is not stored; only the result of
// newGraph.Copy() is.
func (obj *MGraph) Update(newGraph *pgraph.Graph) {
	active := newGraph.Copy()
	obj.Graph = active // store as new active graph

	// push the data around: overwrite each resource's data by value
	for _, vertex := range active.Vertices() {
		target := VtoR(vertex).Data()
		*target = *obj.Data
	}
}
// Start is a main kick to start the graph. It goes through in reverse
// topological sort order so that events can't hit un-started vertices.
//
// The work happens in two passes over the reversed topological order:
//
//  1. Setup is run (in parallel) for every resource whose Worker is not yet
//     running, and all of those calls are waited on.
//  2. Each vertex is then either started (a Worker goroutine is launched and
//     tracked in obj.wg) or, if its Worker was already running, unpaused by
//     sending it a start event.
//
// The graph state is set to graphStateStarting on entry and to
// graphStateStarted when Start returns.
//
// The first argument is currently not consulted by the body; whether a vertex
// is started or unpaused is decided per resource from its Working() flag.
func (obj *MGraph) Start(first bool) { // start or continue
	log.Printf("State: %v -> %v", obj.setState(graphStateStarting), obj.getState())
	// The arguments of a deferred call are evaluated when the defer
	// statement executes, not when the call finally runs. The transition to
	// graphStateStarted is therefore wrapped in a closure so that it really
	// happens on return, instead of immediately overwriting the "starting"
	// state set on the line above.
	defer func() {
		log.Printf("State: %v -> %v", obj.setState(graphStateStarted), obj.getState())
	}()

	// NOTE: the second return value of TopologicalSort is discarded here.
	t, _ := obj.Graph.TopologicalSort()
	indegree := obj.Graph.InDegree() // compute all of the indegree's
	reversed := pgraph.Reverse(t)
	wg := &sync.WaitGroup{}
	for _, v := range reversed { // run the Setup() for everyone first
		// run these in parallel, as long as we wait before continuing
		wg.Add(1)
		go func(vertex pgraph.Vertex, res Res) {
			defer wg.Done()
			// TODO: can't we do this check outside of the goroutine?
			if !*res.Working() { // if Worker() is not running...
				// NOTE: vertex == res here, but pass in both in
				// case we ever wrap the res in something before
				// we store it as the vertex in the graph struct
				res.Setup(obj, vertex, res) // initialize some vars in the resource
			}
		}(v, VtoR(v))
	}
	wg.Wait()

	// run through the topological reverse, and start or unpause each vertex
	for _, v := range reversed {
		res := VtoR(v)
		// selective poke: here we reduce the number of initial pokes
		// to the minimum required to activate every vertex in the
		// graph, either by direct action, or by getting poked by a
		// vertex that was previously activated. if we poke each vertex
		// that has no incoming edges, then we can be sure to reach the
		// whole graph. Please note: this may mask certain optimization
		// failures, such as any poke limiting code in Poke() or
		// BackPoke(). You might want to disable this selective start
		// when experimenting with and testing those elements.
		// if we are unpausing (since it's not the first run of this
		// function) we need to poke to *unpause* every graph vertex,
		// and not just selectively the subset with no indegree.

		// let the startup code know to poke or not
		// this triggers a CheckApply AFTER Watch is Running()
		// We *don't* need to also do this to new nodes or nodes that
		// are about to get unpaused, because they'll get poked by one
		// of the indegree == 0 vertices, and an important aspect of the
		// Process() function is that even if the state is correct, it
		// will pass through the Poke so that it flows through the DAG.
		res.Starter(indegree[v] == 0)

		var unpause = true
		if !*res.Working() { // if Worker() is not running...
			*res.Working() = true // set Worker() running flag
			unpause = false       // doesn't need unpausing on first start
			obj.wg.Add(1)         // balanced by the Done() in the goroutine below
			// must pass in value to avoid races...
			// see: https://ttboj.wordpress.com/2015/07/27/golang-parallelism-issues-causing-too-many-open-files-error/
			go func(vv pgraph.Vertex) {
				defer obj.wg.Done()
				// unset Worker() running flag just before exit
				defer func() { *VtoR(vv).Working() = false }()
				defer VtoR(vv).Reset()
				// TODO: if a sufficient number of workers error,
				// should something be done? Should these restart
				// after perma-failure if we have a graph change?
				log.Printf("%s: Started", vv)
				if err := VtoR(vv).Worker(); err != nil { // contains the Watch and CheckApply loops
					log.Printf("%s: Exited with failure: %v", vv, err)
					return
				}
				log.Printf("%s: Exited", vv)
			}(v)
		}

		select {
		case <-res.Started(): // block until started
		case <-res.Stopped(): // we failed on init
			// if the resource Init() fails, we don't hang!
		}

		if unpause { // unpause (if needed)
			res.SendEvent(event.EventStart, nil) // sync!
		}
	}
	// we wait for everyone to start before exiting!
}
// Pause sends pause events to the graph in a topological sort order. If you set
// the fastPause argument to true, then it will ask future propagation waves to
// not run through the graph before exiting, and instead will exit much quicker.
//
// The graph state is set to graphStatePausing on entry and to graphStatePaused
// when Pause returns. The FastPause flag is always cleared again once all of
// the pause events have been sent.
func (obj *MGraph) Pause(fastPause bool) {
	log.Printf("State: %v -> %v", obj.setState(graphStatePausing), obj.getState())
	// The arguments of a deferred call are evaluated when the defer
	// statement executes, not when the call finally runs. The transition to
	// graphStatePaused is therefore wrapped in a closure so that it really
	// happens on return, instead of immediately overwriting the "pausing"
	// state set on the line above.
	defer func() {
		log.Printf("State: %v -> %v", obj.setState(graphStatePaused), obj.getState())
	}()

	if fastPause {
		obj.FastPause = true // set flag
	}

	// NOTE: the second return value of TopologicalSort is discarded here.
	t, _ := obj.Graph.TopologicalSort()
	for _, v := range t { // squeeze out the events...
		VtoR(v).SendEvent(event.EventPause, nil) // sync
	}

	obj.FastPause = false // reset flag
}
// Exit shuts the graph down. It first fast-pauses the graph, then calls Exit
// on every resource in topological sort order, and finally blocks until all of
// the Worker goroutines tracked in obj.wg have returned. It is a no-op when no
// graph has been stored yet.
func (obj *MGraph) Exit() {
	// empty graph that wasn't populated yet
	if obj.Graph == nil {
		return
	}

	// FIXME: a second ^C could put this into fast pause, but do it for now!
	// implement this with pause to avoid duplicating the code
	obj.Pause(true)

	// NOTE: the second return value of TopologicalSort is discarded here.
	sorted, _ := obj.Graph.TopologicalSort()
	for i := range sorted { // squeeze out the events...
		// turn off the taps...
		VtoR(sorted[i]).Exit() // sync
	}

	// for now, this doesn't need to be a separate Wait() method
	obj.wg.Wait()
}