-
Notifications
You must be signed in to change notification settings - Fork 265
/
plugin_manager.go
288 lines (239 loc) · 7.81 KB
/
plugin_manager.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
package pluginmanager
import (
"fmt"
"log"
"os"
"os/exec"
"strings"
"sync"
"time"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-plugin"
"github.com/turbot/go-kit/helpers"
sdkshared "github.com/turbot/steampipe-plugin-sdk/v3/grpc/shared"
pb "github.com/turbot/steampipe/pluginmanager/grpc/proto"
pluginshared "github.com/turbot/steampipe/pluginmanager/grpc/shared"
"github.com/turbot/steampipe/utils"
)
type runningPlugin struct {
client *plugin.Client
reattach *pb.ReattachConfig
initialized chan bool
}
// PluginManager is the real implementation of grpc.PluginManager
type PluginManager struct {
pb.UnimplementedPluginManagerServer
Plugins map[string]*runningPlugin
mut sync.Mutex
connectionConfig map[string]*pb.ConnectionConfig
logger hclog.Logger
}
func NewPluginManager(connectionConfig map[string]*pb.ConnectionConfig, logger hclog.Logger) *PluginManager {
pluginManager := &PluginManager{
logger: logger,
connectionConfig: connectionConfig,
Plugins: make(map[string]*runningPlugin),
}
return pluginManager
}
// plugin interface functions
func (m *PluginManager) Serve() {
// create a plugin map, using ourselves as the implementation
pluginMap := map[string]plugin.Plugin{
pluginshared.PluginName: &pluginshared.PluginManagerPlugin{Impl: m},
}
plugin.Serve(&plugin.ServeConfig{
HandshakeConfig: pluginshared.Handshake,
Plugins: pluginMap,
// enable gRPC serving for this plugin...
GRPCServer: plugin.DefaultGRPCServer,
})
}
func (m *PluginManager) Get(req *pb.GetRequest) (*pb.GetResponse, error) {
resp := &pb.GetResponse{ReattachMap: make(map[string]*pb.ReattachConfig)}
var errors []error
var resultLock sync.Mutex
var resultWg sync.WaitGroup
log.Printf("[TRACE] PluginManager Get, connections: '%s'\n", req.Connections)
for _, c := range req.Connections {
resultWg.Add(1)
go func(connectionName string) {
reattach, err := m.getPlugin(connectionName)
resultLock.Lock()
if err != nil {
errors = append(errors, err)
} else {
resp.ReattachMap[connectionName] = reattach
}
resultLock.Unlock()
resultWg.Done()
}(c)
}
resultWg.Wait()
if len(errors) > 0 {
return nil, utils.CombineErrors(errors...)
}
// TODO ADD PLUGINS TO OUR STATE FILE - JUST SERIALISE THE Plugins map?
log.Printf("[TRACE] PluginManager Get returning %+v", resp)
return resp, nil
}
func (m *PluginManager) getPlugin(connection string) (_ *pb.ReattachConfig, err error) {
defer func() {
if r := recover(); r != nil {
err = helpers.ToError(r)
}
}()
log.Printf("[TRACE] PluginManager getPlugin connection '%s'\n", connection)
// reason for starting the plugin (if we need to
var reason string
// is this plugin already running
// lock access to plugin map
m.mut.Lock()
p, ok := m.Plugins[connection]
if ok {
// unlock access to map
m.mut.Unlock()
// so we have the plugin in our map - is it started?
err = m.waitForPluginLoad(connection, p)
if err != nil {
return nil, err
}
log.Printf("[TRACE] connection %s is loaded, check for running PID", connection)
// ok so the plugin should now be running
// now check if the plugins process IS running
reattach := p.reattach
// check the pid exists
exists, _ := utils.PidExists(int(reattach.Pid))
if exists {
// so the plugin is good
log.Printf("[TRACE] PluginManager found '%s' in map, pid %d, reattach %v", connection, reattach.Pid, reattach)
// return the reattach config
return reattach, nil
}
// TODO combine with 'else' code to remove duplication
// either the pid does not exist or the plugin has exited
// remove from map
m.mut.Lock()
delete(m.Plugins, connection)
m.Plugins[connection] = &runningPlugin{
initialized: make(chan (bool), 1),
}
m.mut.Unlock()
// update reason
reason = fmt.Sprintf("PluginManager found pid %d for connection '%s' in plugin map but plugin process does not exist - killing client and removing from map", reattach.Pid, connection)
} else {
// so the plugin is NOT loaded or loading - this is the first time anyone has requested this connection
// put in a placeholder so no other thread tries to create start this plugin
m.Plugins[connection] = &runningPlugin{
initialized: make(chan (bool), 1),
}
// unlock access to map
m.mut.Unlock()
reason = fmt.Sprintf("PluginManager %p '%s' NOT found in map - starting", m, connection)
}
// fall through to plugin startup
// log the startup reason
log.Printf("[TRACE] %s", reason)
// so we need to start the plugin
client, err := m.startPlugin(connection)
if err != nil {
m.mut.Lock()
delete(m.Plugins, connection)
m.mut.Unlock()
log.Println("[TRACE] startPlugin failed with", err)
return nil, err
}
// store the client to our map
reattach := m.storeClientToMap(connection, client)
log.Printf("[TRACE] PluginManager getPlugin complete, returning reattach config with PID: %d", reattach.Pid)
// and return
return reattach, nil
}
// create reattach config for plugin, store to map and close initialized channel
func (m *PluginManager) storeClientToMap(connection string, client *plugin.Client) *pb.ReattachConfig {
// lock access to map
m.mut.Lock()
defer m.mut.Unlock()
reattach := pb.NewReattachConfig(client.ReattachConfig())
p := m.Plugins[connection]
p.client = client
p.reattach = reattach
m.Plugins[connection] = p
// mark as initialized
close(p.initialized)
return reattach
}
func (m *PluginManager) SetConnectionConfigMap(configMap map[string]*pb.ConnectionConfig) {
m.mut.Lock()
defer m.mut.Unlock()
names := make([]string, len(configMap))
idx := 0
for name := range configMap {
names[idx] = name
idx++
}
log.Printf("[TRACE] SetConnectionConfigMap: %s", strings.Join(names, ","))
m.connectionConfig = configMap
}
func (m *PluginManager) Shutdown(req *pb.ShutdownRequest) (resp *pb.ShutdownResponse, err error) {
log.Printf("[TRACE] PluginManager Shutdown %v", m.Plugins)
m.mut.Lock()
defer func() {
m.mut.Unlock()
if r := recover(); r != nil {
err = helpers.ToError(r)
}
}()
for name, p := range m.Plugins {
if p.client == nil {
log.Printf("[WARN] plugin %s has no client - cannot kill", name)
// shouldn't happen but has been observed in error situations
continue
}
log.Printf("[TRACE] killing plugin %s (%v)", name, p.reattach.Pid)
p.client.Kill()
}
return &pb.ShutdownResponse{}, nil
}
func (m *PluginManager) startPlugin(connection string) (*plugin.Client, error) {
log.Printf("[TRACE] ************ start plugin %s ********************\n", connection)
// get connection config
connectionConfig, ok := m.connectionConfig[connection]
if !ok {
return nil, fmt.Errorf("no config loaded for connection %s", connection)
}
pluginPath, err := GetPluginPath(connectionConfig.Plugin, connectionConfig.PluginShortName)
if err != nil {
return nil, err
}
// create the plugin map
pluginName := connectionConfig.Plugin
pluginMap := map[string]plugin.Plugin{
pluginName: &sdkshared.WrapperPlugin{},
}
cmd := exec.Command(pluginPath)
// pass env to command
cmd.Env = os.Environ()
client := plugin.NewClient(&plugin.ClientConfig{
HandshakeConfig: sdkshared.Handshake,
Plugins: pluginMap,
Cmd: cmd,
AllowedProtocols: []plugin.Protocol{plugin.ProtocolGRPC},
// pass our logger to the plugin client to ensure plugin logs end up in logfile
Logger: m.logger,
})
if _, err := client.Start(); err != nil {
return nil, err
}
return client, nil
}
func (m *PluginManager) waitForPluginLoad(connection string, p *runningPlugin) error {
pluginStartTimeoutSecs := 5
select {
case <-p.initialized:
log.Printf("[TRACE] initialized: %d", p.reattach.Pid)
return nil
case <-time.After(time.Duration(pluginStartTimeoutSecs) * time.Second):
return fmt.Errorf("timed out waiting for %s to startup after %d seconds", connection, pluginStartTimeoutSecs)
}
}