forked from vmware-tanzu/velero
/
restartable_process.go
191 lines (155 loc) · 5.31 KB
/
restartable_process.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
/*
Copyright 2018 the Velero contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package clientmgmt
import (
"sync"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
type RestartableProcessFactory interface {
newRestartableProcess(command string, logger logrus.FieldLogger, logLevel logrus.Level) (RestartableProcess, error)
}
type restartableProcessFactory struct {
}
func newRestartableProcessFactory() RestartableProcessFactory {
return &restartableProcessFactory{}
}
func (rpf *restartableProcessFactory) newRestartableProcess(command string, logger logrus.FieldLogger, logLevel logrus.Level) (RestartableProcess, error) {
return newRestartableProcess(command, logger, logLevel)
}
type RestartableProcess interface {
addReinitializer(key kindAndName, r reinitializer)
reset() error
resetIfNeeded() error
getByKindAndName(key kindAndName) (interface{}, error)
stop()
}
// restartableProcess encapsulates the lifecycle for all plugins contained in a single executable file. It is able
// to restart a plugin process if it is terminated for any reason. If this happens, all plugins are reinitialized using
// the original configuration data.
type restartableProcess struct {
command string
logger logrus.FieldLogger
logLevel logrus.Level
// lock guards all of the fields below
lock sync.RWMutex
process Process
plugins map[kindAndName]interface{}
reinitializers map[kindAndName]reinitializer
resetFailures int
}
// reinitializer is capable of reinitializing a restartable plugin instance using the newly dispensed plugin.
type reinitializer interface {
// reinitialize reinitializes a restartable plugin instance using the newly dispensed plugin.
reinitialize(dispensed interface{}) error
}
// newRestartableProcess creates a new restartableProcess for the given command and options.
func newRestartableProcess(command string, logger logrus.FieldLogger, logLevel logrus.Level) (RestartableProcess, error) {
p := &restartableProcess{
command: command,
logger: logger,
logLevel: logLevel,
plugins: make(map[kindAndName]interface{}),
reinitializers: make(map[kindAndName]reinitializer),
}
// This launches the process
err := p.reset()
return p, err
}
// addReinitializer registers the reinitializer r for key.
func (p *restartableProcess) addReinitializer(key kindAndName, r reinitializer) {
p.lock.Lock()
defer p.lock.Unlock()
p.reinitializers[key] = r
}
// reset acquires the lock and calls resetLH.
func (p *restartableProcess) reset() error {
p.lock.Lock()
defer p.lock.Unlock()
return p.resetLH()
}
// resetLH (re)launches the plugin process. It redispenses all previously dispensed plugins and reinitializes all the
// registered reinitializers using the newly dispensed plugins.
//
// Callers of resetLH *must* acquire the lock before calling it.
func (p *restartableProcess) resetLH() error {
if p.resetFailures > 10 {
return errors.Errorf("unable to restart plugin process: execeeded maximum number of reset failures")
}
process, err := newProcess(p.command, p.logger, p.logLevel)
if err != nil {
p.resetFailures++
return err
}
p.process = process
// Redispense any previously dispensed plugins, reinitializing if necessary.
// Start by creating a new map to hold the newly dispensed plugins.
newPlugins := make(map[kindAndName]interface{})
for key := range p.plugins {
// Re-dispense
dispensed, err := p.process.dispense(key)
if err != nil {
p.resetFailures++
return err
}
// Store in the new map
newPlugins[key] = dispensed
// Reinitialize
if r, found := p.reinitializers[key]; found {
if err := r.reinitialize(dispensed); err != nil {
p.resetFailures++
return err
}
}
}
// Make sure we update p's plugins!
p.plugins = newPlugins
p.resetFailures = 0
return nil
}
// resetIfNeeded checks if the plugin process has exited and resets p if it has.
func (p *restartableProcess) resetIfNeeded() error {
p.lock.Lock()
defer p.lock.Unlock()
if p.process.exited() {
p.logger.Info("Plugin process exited - restarting.")
return p.resetLH()
}
return nil
}
// getByKindAndName acquires the lock and calls getByKindAndNameLH.
func (p *restartableProcess) getByKindAndName(key kindAndName) (interface{}, error) {
p.lock.Lock()
defer p.lock.Unlock()
return p.getByKindAndNameLH(key)
}
// getByKindAndNameLH returns the dispensed plugin for key. If the plugin hasn't been dispensed before, it dispenses a
// new one.
func (p *restartableProcess) getByKindAndNameLH(key kindAndName) (interface{}, error) {
dispensed, found := p.plugins[key]
if found {
return dispensed, nil
}
dispensed, err := p.process.dispense(key)
if err != nil {
return nil, err
}
p.plugins[key] = dispensed
return p.plugins[key], nil
}
// stop terminates the plugin process.
func (p *restartableProcess) stop() {
p.lock.Lock()
p.process.kill()
p.lock.Unlock()
}