-
Notifications
You must be signed in to change notification settings - Fork 42
/
process_manager.go
250 lines (217 loc) · 6.6 KB
/
process_manager.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
package pexec
import (
"context"
"sync"
"github.com/edaniels/golog"
"github.com/pkg/errors"
"go.uber.org/multierr"
)
// A ProcessManager is responsible for controlling the lifecycle of processes
// added to it.
type ProcessManager interface {
// ProcessIDs returns the IDs of all managed processes.
ProcessIDs() []string
// ProcessByID fetches the process by the given ID if it exists.
ProcessByID(id string) (ManagedProcess, bool)
// RemoveProcessByID removes a managed process by the given ID if it exists.
// It does not stop it it.
RemoveProcessByID(id string) (ManagedProcess, bool)
// Start starts all added processes and errors if any fail to start. The
// given context is only used for one shot processes.
Start(ctx context.Context) error
// AddProcess manages the given process and potentially starts it depending
// on the state of the ProcessManager and if it's requested. The same context
// semantics in Start apply here. If the process is replaced by its ID, the
// replaced process will be returned.
AddProcess(ctx context.Context, proc ManagedProcess, start bool) (ManagedProcess, error)
// AddProcess manages a new process from the given configuration and
// potentially starts it depending on the state of the ProcessManager.
// The same context semantics in Start apply here. If the process is
// replaced by its ID, the replaced process will be returned.
AddProcessFromConfig(ctx context.Context, config ProcessConfig) (ManagedProcess, error)
// Stop signals and waits for all managed processes to stop and returns
// any errors from stopping them.
Stop() error
// Clone gives a copy of the processes being managed but provides
// no guarantee of the current state of the processes.
Clone() ProcessManager
}
type processManager struct {
mu sync.Mutex
processesByID map[string]ManagedProcess
logger golog.Logger
started bool
stopped bool
}
// NewProcessManager returns a new ProcessManager.
func NewProcessManager(logger golog.Logger) ProcessManager {
return &processManager{
logger: logger,
processesByID: map[string]ManagedProcess{},
}
}
func (pm *processManager) ProcessIDs() []string {
pm.mu.Lock()
defer pm.mu.Unlock()
ids := make([]string, 0, len(pm.processesByID))
for id := range pm.processesByID {
ids = append(ids, id)
}
return ids
}
func (pm *processManager) ProcessByID(id string) (ManagedProcess, bool) {
pm.mu.Lock()
defer pm.mu.Unlock()
if pm.stopped {
return nil, false
}
proc, ok := pm.processesByID[id]
return proc, ok
}
func (pm *processManager) RemoveProcessByID(id string) (ManagedProcess, bool) {
pm.mu.Lock()
defer pm.mu.Unlock()
if pm.stopped {
return nil, false
}
proc, ok := pm.processesByID[id]
if !ok {
return nil, false
}
delete(pm.processesByID, id)
return proc, true
}
func (pm *processManager) Start(ctx context.Context) error {
pm.mu.Lock()
defer pm.mu.Unlock()
if pm.stopped {
return errAlreadyStopped
}
if pm.started {
return nil
}
for _, proc := range pm.processesByID {
if err := proc.Start(ctx); err != nil {
// be sure to stop anything that has already started
return multierr.Combine(err, pm.stop())
}
}
pm.started = true
return nil
}
func (pm *processManager) AddProcess(ctx context.Context, proc ManagedProcess, start bool) (ManagedProcess, error) {
pm.mu.Lock()
defer pm.mu.Unlock()
if pm.stopped {
return nil, errAlreadyStopped
}
replaced := pm.processesByID[proc.ID()]
if pm.started && start {
if err := proc.Start(ctx); err != nil {
return nil, err
}
}
pm.processesByID[proc.ID()] = proc
return replaced, nil
}
func (pm *processManager) AddProcessFromConfig(ctx context.Context, config ProcessConfig) (ManagedProcess, error) {
if pm.stopped {
return nil, errAlreadyStopped
}
proc := NewManagedProcess(config, pm.logger)
return pm.AddProcess(ctx, proc, true)
}
func (pm *processManager) Stop() error {
pm.mu.Lock()
defer pm.mu.Unlock()
if pm.stopped {
return nil
}
return pm.stop()
}
func (pm *processManager) stop() error {
pm.stopped = true
var err error
for _, proc := range pm.processesByID {
err = multierr.Combine(err, proc.Stop())
}
pm.processesByID = nil
return err
}
func (pm *processManager) Clone() ProcessManager {
pm.mu.Lock()
defer pm.mu.Unlock()
processesByIDCopy := make(map[string]ManagedProcess, len(pm.processesByID))
for k, v := range pm.processesByID {
processesByIDCopy[k] = v
}
return &processManager{
processesByID: processesByIDCopy,
logger: pm.logger,
started: pm.started,
stopped: pm.stopped,
}
}
// MergeAddProcessManagers merges in another process manager and takes ownership of
// its processes. This may replace existing processes and it's the
// callers responsibility to stop what has been replaced.
func MergeAddProcessManagers(dst, src ProcessManager) ([]ManagedProcess, error) {
var replacements []ManagedProcess
ids := src.ProcessIDs()
for _, id := range ids {
proc, ok := src.ProcessByID(id)
if !ok {
continue // should not happen
}
replaced, err := dst.AddProcess(context.Background(), proc, false)
if err != nil {
return nil, err
}
if replaced != nil {
replacements = append(replacements, replaced)
}
}
return replacements, nil
}
// MergeRemoveProcessManagers merges in another process manager and removes ownership of
// its own processes. It does not stop the processes.
func MergeRemoveProcessManagers(dst, src ProcessManager) []ManagedProcess {
ids := src.ProcessIDs()
removed := make([]ManagedProcess, 0, len(ids))
for _, id := range ids {
proc, ok := dst.RemoveProcessByID(id)
if !ok {
continue // should not happen
}
removed = append(removed, proc)
}
return removed
}
type noopProcessManager struct{}
// NoopProcessManager does nothing and is useful for places that
// need to return some ProcessManager.
var NoopProcessManager = &noopProcessManager{}
func (noop noopProcessManager) ProcessIDs() []string {
return nil
}
func (noop noopProcessManager) ProcessByID(id string) (ManagedProcess, bool) {
return nil, false
}
func (noop noopProcessManager) RemoveProcessByID(id string) (ManagedProcess, bool) {
return nil, false
}
func (noop noopProcessManager) Start(ctx context.Context) error {
return nil
}
func (noop noopProcessManager) AddProcess(ctx context.Context, proc ManagedProcess, start bool) (ManagedProcess, error) {
return nil, errors.New("unsupported")
}
func (noop noopProcessManager) AddProcessFromConfig(ctx context.Context, config ProcessConfig) (ManagedProcess, error) {
return nil, errors.New("unsupported")
}
func (noop noopProcessManager) Stop() error {
return nil
}
func (noop noopProcessManager) Clone() ProcessManager {
return noop
}