-
Notifications
You must be signed in to change notification settings - Fork 7.4k
/
supervisor.go
148 lines (124 loc) · 3.45 KB
/
supervisor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved.
// See LICENSE.txt for license information.
package plugin
import (
"fmt"
"os/exec"
"path/filepath"
"runtime"
"strings"
"sync"
"time"
plugin "github.com/hashicorp/go-plugin"
"github.com/mattermost/mattermost-server/v5/einterfaces"
"github.com/mattermost/mattermost-server/v5/mlog"
"github.com/mattermost/mattermost-server/v5/model"
)
// supervisor owns the go-plugin client for a single plugin backend process,
// together with the hooks interface dispensed by that process and a record of
// which hooks the plugin reports as implemented.
type supervisor struct {
// lock is read-locked by Shutdown, Hooks, Ping and Implements.
lock sync.RWMutex
// client is the go-plugin client created in newSupervisor; Shutdown
// tolerates it being nil (construction failed before it was assigned).
client *plugin.Client
// hooks is the dispensed "hooks" RPC implementation wrapped in a
// hooksTimerLayer.
hooks Hooks
// implemented is indexed by hook id; an entry is true when the plugin's
// Implemented() call reported a hook name that maps to that id.
implemented [TotalHooksId]bool
// pid is the process id of the launched plugin command.
pid int
}
// newSupervisor starts the backend executable that the plugin's manifest
// declares for the current OS/architecture, connects to it through go-plugin,
// dispenses its "hooks" implementation and records which hooks it implements.
//
// pluginInfo describes the plugin bundle (manifest and on-disk path), apiImpl
// is the API exposed to the plugin, parentLogger is wrapped with plugin
// context for all plugin output, and metrics is handed to the API and hooks
// timer layers.
//
// On success a ready supervisor is returned. On any error the returned
// supervisor is nil and whatever was started so far is shut down.
func newSupervisor(pluginInfo *model.BundleInfo, apiImpl API, parentLogger *mlog.Logger, metrics einterfaces.MetricsInterface) (retSupervisor *supervisor, retErr error) {
	sup := supervisor{}
	// retErr is a named result so this deferred cleanup can see the error
	// returned from any of the exits below and kill a half-started plugin.
	defer func() {
		if retErr != nil {
			sup.Shutdown()
		}
	}()

	wrappedLogger := pluginInfo.WrapLogger(parentLogger)

	hclogAdaptedLogger := &hclogAdapter{
		wrappedLogger: wrappedLogger.WithCallerSkip(1),
		extrasKey:     "wrapped_extras",
	}

	pluginMap := map[string]plugin.Plugin{
		"hooks": &hooksPlugin{
			log:     wrappedLogger,
			apiImpl: &apiTimerLayer{pluginInfo.Manifest.Id, apiImpl, metrics},
		},
	}

	// An empty executable name would clean to "." and then resolve to the
	// plugin directory itself, so reject it up front with a clear message.
	executable := pluginInfo.Manifest.GetExecutableForRuntime(runtime.GOOS, runtime.GOARCH)
	if executable == "" {
		return nil, fmt.Errorf("backend executable not found for environment: %s/%s", runtime.GOOS, runtime.GOARCH)
	}

	// Normalize relative to "." so that a manifest entry cannot climb out of
	// the plugin directory via leading ".." components.
	executable = filepath.Clean(filepath.Join(".", executable))
	if strings.HasPrefix(executable, "..") {
		return nil, fmt.Errorf("invalid backend executable")
	}
	executable = filepath.Join(pluginInfo.Path, executable)

	cmd := exec.Command(executable)

	sup.client = plugin.NewClient(&plugin.ClientConfig{
		HandshakeConfig: handshake,
		Plugins:         pluginMap,
		Cmd:             cmd,
		SyncStdout:      wrappedLogger.With(mlog.String("source", "plugin_stdout")).StdLogWriter(),
		SyncStderr:      wrappedLogger.With(mlog.String("source", "plugin_stderr")).StdLogWriter(),
		Logger:          hclogAdaptedLogger,
		StartTimeout:    time.Second * 3,
	})

	rpcClient, err := sup.client.Client()
	if err != nil {
		return nil, err
	}

	sup.pid = cmd.Process.Pid

	raw, err := rpcClient.Dispense("hooks")
	if err != nil {
		return nil, err
	}

	// Use the checked form of the assertion: a mismatched value becomes an
	// error (and triggers the deferred shutdown) instead of a panic.
	hooks, ok := raw.(Hooks)
	if !ok {
		return nil, fmt.Errorf("plugin dispensed unexpected hooks implementation of type %T", raw)
	}
	sup.hooks = &hooksTimerLayer{pluginInfo.Manifest.Id, hooks, metrics}

	impl, err := sup.hooks.Implemented()
	if err != nil {
		return nil, err
	}
	// Hook names that have no known id are ignored.
	for _, hookName := range impl {
		if hookId, ok := hookNameToId[hookName]; ok {
			sup.implemented[hookId] = true
		}
	}

	return &sup, nil
}
// Shutdown kills the plugin through its go-plugin client. It is a no-op when
// no client has been created yet.
func (sup *supervisor) Shutdown() {
	sup.lock.RLock()
	defer sup.lock.RUnlock()

	if sup.client == nil {
		return
	}
	sup.client.Kill()
}
// Hooks returns the hooks implementation held by the supervisor, read under
// the supervisor's read lock.
func (sup *supervisor) Hooks() Hooks {
	sup.lock.RLock()
	current := sup.hooks
	sup.lock.RUnlock()

	return current
}
// PerformHealthCheck checks the plugin through an RPC ping. A failed ping is
// retried until HEALTH_CHECK_PING_FAIL_LIMIT pings in total have failed (at
// least one ping is always sent); only then is an error returned.
func (sup *supervisor) PerformHealthCheck() error {
	// No lock is taken here: Ping acquires the read lock itself.
	var lastErr error
	for attempt := 1; ; attempt++ {
		lastErr = sup.Ping()
		if lastErr == nil {
			return nil
		}
		if attempt >= HEALTH_CHECK_PING_FAIL_LIMIT {
			break
		}
	}

	mlog.Debug("Error pinging plugin", mlog.Err(lastErr))
	return fmt.Errorf("Plugin RPC connection is not responding")
}
// Ping obtains the protocol client from the go-plugin client and pings the
// plugin over it, holding the supervisor's read lock for the duration. It
// returns the error from either step, or nil when the ping succeeds.
func (sup *supervisor) Ping() error {
	sup.lock.RLock()
	defer sup.lock.RUnlock()

	protocol, clientErr := sup.client.Client()
	if clientErr == nil {
		return protocol.Ping()
	}
	return clientErr
}
// Implements reports whether the plugin declared the hook with the given id
// as implemented. hookId indexes a fixed-size array of TotalHooksId entries.
func (sup *supervisor) Implements(hookId int) bool {
	sup.lock.RLock()
	defer sup.lock.RUnlock()

	declared := sup.implemented[hookId]
	return declared
}