/
bot_process.go
362 lines (326 loc) · 13 KB
/
bot_process.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
// Package bot provides the internal machinery for most of Gopherbot.
package bot
/* bot.go defines core data structures and public methods for startup.
handler.go has the methods for callbacks from the connector, */
import (
crand "crypto/rand"
"encoding/base64"
"fmt"
"io/ioutil"
"log"
"math/rand"
"net"
"net/http"
"os"
"path/filepath"
"regexp"
"sync"
"time"
"github.com/lnxjedi/gopherbot/robot"
)
// VersionInfo holds information about the version, duh. (stupid linter)
type VersionInfo struct {
Version, Commit string
}
// global values for GOPHER_HOME, GOPHER_CONFIGDIR and GOPHER_INSTALLDIR
var homePath, configPath, configFull, installPath string
var botVersion VersionInfo
// Seed the pseudo-random number generator, for plugin IDs, RandomString, etc.
var random = rand.New(rand.NewSource(time.Now().UnixNano()))
var connectors = make(map[string]func(robot.Handler, *log.Logger) robot.Connector)
// RegisterConnector should be called in an init function to register a type
// of connector. Currently only Slack is implemented.
func RegisterConnector(name string, connstarter func(robot.Handler, *log.Logger) robot.Connector) {
if stopRegistrations {
return
}
if connectors[name] != nil {
log.Fatal("Attempted registration of duplicate connector:", name)
}
connectors[name] = connstarter
}
// Interfaces to external stuff, items should be set while single-threaded and never change
var interfaces struct {
robot.Connector // Connector interface, implemented by each specific protocol
brain robot.SimpleBrain // Interface for robot to Store and Retrieve data
history robot.HistoryProvider // Provider for storing and retrieving job / plugin histories
}
var done = make(chan bool) // shutdown channel, true to restart
var stopConnector = make(chan struct{}) // stop channel for stopping the connector
// internal state tracking
var state struct {
shuttingDown bool // to prevent new plugins from starting
restart bool // indicate stop and restart vs. stop only, for bootstrapping
pipelinesRunning int // a count of how many plugins are currently running
sync.WaitGroup // for keeping track of running plugins
sync.RWMutex // for safe updating of bot data structures
}
// regexes the bot uses to determine if it's being spoken to
var regexes struct {
preRegex *regexp.Regexp // regex for matching prefixed commands, e.g. "Gort, drop your weapon"
postRegex *regexp.Regexp // regex for matching, e.g. "open the pod bay doors, hal"
bareRegex *regexp.Regexp // regex for matching the robot's bare name, if you forgot it in the previous command
sync.RWMutex
}
// configuration struct holds all the internal data relevant to the Bot. Most of it is digested
// and populated by loadConfig.
type configuration struct {
adminUsers []string // List of users with access to administrative commands
alias rune // single-char alias for addressing the bot
botinfo UserInfo // robot's name, ID, email, etc.
adminContact string // who to contact for problems with the bot
mailConf botMailer // configuration to use when sending email
ignoreUsers []string // list of users to never listen to, like other bots
joinChannels []string // list of channels to join
defaultAllowDirect bool // whether plugins are available in DM by default
ignoreUnlistedUsers bool // ignore users not listed in the UserRoster
defaultMessageFormat robot.MessageFormat // Raw unless set to Variable or Fixed
plugChannels []string // list of channels where plugins are available by default
protocol string // Name of the protocol, e.g. "slack"
brainProvider string // Type of Brain provider to use
encryptionKey string // Key for encrypting data (unlocks "real" key in brain)
historyProvider string // Name of the history provider to use
workSpace string // Read/Write directory where the robot does work
defaultElevator string // Plugin name for performing elevation
defaultAuthorizer string // Plugin name for performing authorization
externalPlugins []TaskSettings // List of external plugins to load
externalJobs []TaskSettings // List of external jobs to load
externalTasks []TaskSettings // List of external tasks to load
goPlugins []TaskSettings // Settings for goPlugins: Name(match), Description, NameSpace, Parameters, Disabled
goJobs []TaskSettings // Settings for goJobs: Name(match), Description, NameSpace, Parameters, Disabled
goTasks []TaskSettings // Settings for goTasks: Name(match), Description, NameSpace, Parameters, Disabled
nsList []TaskSettings // loaded NameSpaces for shared parameters
psList []TaskSettings // loaded ParameterSets for shared parameter sets
ScheduledJobs []ScheduledTask // List of scheduled tasks
port string // Configured localhost port to listen on, or 0 for first open
timeZone *time.Location // for forcing the TimeZone, Unix only
defaultJobChannel string // where job statuses will post if not otherwise specified
}
// The current configuration and task list
var currentCfg = struct {
*configuration
*taskList
sync.RWMutex
}{
configuration: &configuration{},
taskList: &taskList{
t: []interface{}{struct{}{}}, // initialize 0 to "nothing", for namespaces only
nameMap: make(map[string]int),
idMap: make(map[string]int),
nameSpaces: make(map[string]ParameterSet),
parameterSets: make(map[string]ParameterSet),
},
RWMutex: sync.RWMutex{},
}
var listening bool // for tests where initBot runs multiple times
var listenPort string // actual listening port
// initBot sets up the global robot; when cli is false it also loads configuration.
// cli indicates that a CLI command is being processed, as opposed to actually running
// a robot.
func initBot(cpath, epath string) {
// Initialize current config with an empty struct (to be loaded)
currentCfg.configuration = &configuration{}
var err error
homePath, err = os.Getwd()
if err != nil {
Log(robot.Warn, "Unable to get cwd")
}
h := handler{}
if err := h.GetDirectory(cpath); err != nil {
Log(robot.Fatal, "Unable to get/create config path: %s", cpath)
}
configPath = cpath
if filepath.IsAbs(cpath) {
configFull = cpath
} else {
configFull = filepath.Join(homePath, cpath)
}
installPath = epath
state.shuttingDown = false
if cliOp {
setLogLevel(robot.Warn)
}
encryptionInitialized := initCrypt()
if encryptionInitialized {
os.Setenv("GOPHER_ENCRYPTION_INITIALIZED", "initialized")
}
// The pre-connect load is for initial configuration that doesn't
// run external scripts. External plugin configuration isn't loaded and plugins
// aren't initialized.
if err := loadConfig(true); err != nil {
Log(robot.Fatal, "Loading initial configuration: %v", err)
}
if cliOp {
if fileLog {
setLogLevel(robot.Debug)
} else {
setLogLevel(robot.Warn)
}
}
// All pluggables registered, ok to stop registrations
stopRegistrations = true
if len(currentCfg.brainProvider) > 0 {
if bprovider, ok := brains[currentCfg.brainProvider]; !ok {
Log(robot.Fatal, "No provider registered for brain: \"%s\"", currentCfg.brainProvider)
} else {
brain := bprovider(handle)
interfaces.brain = brain
Log(robot.Info, "Initialized brain provider '%s'", currentCfg.brainProvider)
}
} else {
bprovider := brains["mem"]
interfaces.brain = bprovider(handle)
Log(robot.Error, "No brain configured, falling back to default 'mem' brain - no memories will persist")
}
if !encryptionInitialized && len(currentCfg.encryptionKey) > 0 {
if initializeEncryptionFromBrain(currentCfg.encryptionKey) {
Log(robot.Info, "Successfully initialized encryption from configured key")
encryptionInitialized = true
} else {
Log(robot.Error, "Failed to initialize brain encryption with configured EncryptionKey")
}
}
if encryptBrain && !encryptionInitialized {
Log(robot.Warn, "Brain encryption specified but not initialized; use 'initialize brain <key>' to initialize the encrypted brain interactively")
}
// cli commands don't need an http listener
if cliOp {
return
}
if !listening {
listening = true
listener, err := net.Listen("tcp4", fmt.Sprintf("127.0.0.1:%s", currentCfg.port))
if err != nil {
Log(robot.Fatal, "Listening on tcp4 port 127.0.0.1:%s: %v", currentCfg.port, err)
}
listenPort = listener.Addr().String()
go func() {
raiseThreadPriv("http handler")
apiServer := http.NewServeMux()
apiServer.Handle("/json", handle)
Log(robot.Info, "Listening for external plugin connections on http://%s", listenPort)
Log(robot.Fatal, "Error serving '/json': %s", http.Serve(listener, apiServer))
}()
}
}
// set connector sets the connector, which should already be initialized
func setConnector(c robot.Connector) {
interfaces.Connector = c
}
var keyEnv = "GOPHER_ENCRYPTION_KEY"
func initCrypt() bool {
// Initialize encryption (new style for v2)
keyFileName := encryptedKeyFile
deployEnvironment := os.Getenv("GOPHER_ENVIRONMENT")
if deployEnvironment != "production" {
Log(robot.Info, "Initializing encryption for the '%s' environment", deployEnvironment)
keyFileName += "." + deployEnvironment
}
keyFile := filepath.Join(configPath, keyFileName)
encryptionInitialized := false
if ek, ok := os.LookupEnv(keyEnv); ok {
ik := []byte(ek)[0:32]
if bkf, err := ioutil.ReadFile(keyFile); err == nil {
if bke, err := base64.StdEncoding.DecodeString(string(bkf)); err == nil {
if key, err := decrypt(bke, ik); err == nil {
cryptKey.Lock()
cryptKey.key = key
cryptKey.initialized = true
cryptKey.Unlock()
encryptionInitialized = true
Log(robot.Info, "Successfully decrypted binary encryption key '%s'", keyFile)
} else {
Log(robot.Error, "Decrypting binary encryption key '%s' from environment key '%s': %v", keyFile, keyEnv, err)
}
} else {
Log(robot.Error, "Base64 decoding '%s': %v", keyFile, err)
}
} else {
Log(robot.Warn, "Binary encryption key not loaded from '%s': %v", keyFile, err)
if len(currentCfg.encryptionKey) == 0 {
// No encryptionKey in config, create new-style key
bk := make([]byte, 32)
_, err := crand.Read(bk)
if err != nil {
Log(robot.Error, "Generating new random encryption key: %v", err)
return false
}
bek, err := encrypt(bk, ik)
if err != nil {
Log(robot.Error, "Encrypting new random key: %v", err)
return false
}
beks := base64.StdEncoding.EncodeToString(bek)
raiseThreadPriv("writing generated encrypted key")
err = ioutil.WriteFile(keyFile, []byte(beks), 0444)
if err != nil {
Log(robot.Error, "Writing out generated key: %v", err)
return false
}
Log(robot.Info, "Successfully wrote new binary encryption key to '%s'", keyFile)
cryptKey.Lock()
cryptKey.key = bk
cryptKey.initialized = true
cryptKey.Unlock()
encryptionInitialized = true
return true
}
}
} else {
Log(robot.Warn, "GOPHER_ENCRYPTION_KEY not set in environment")
}
return encryptionInitialized
}
// run starts all the loops and returns a channel that closes when the robot
// shuts down. It should return after the connector loop has started and
// plugins are initialized.
func run() {
// Start the brain loop
go runBrain()
var cl []string
cl = append(cl, currentCfg.joinChannels...)
cl = append(cl, currentCfg.plugChannels...)
cl = append(cl, currentCfg.defaultJobChannel)
jc := make(map[string]bool)
for _, channel := range cl {
if _, ok := jc[channel]; !ok {
jc[channel] = true
interfaces.JoinChannel(channel)
}
}
// signal handler
sigBreak := make(chan struct{})
go sigHandle(sigBreak)
// connector loop
go func(conn robot.Connector, sigBreak chan<- struct{}) {
raiseThreadPriv("connector loop")
conn.Run(stopConnector)
close(sigBreak)
state.RLock()
restart := state.restart
state.RUnlock()
if restart {
Log(robot.Info, "Restarting...")
}
done <- restart
}(interfaces.Connector, sigBreak)
// The first run through is for configuring and running init
// jobs (which can't send messages), the second run through
// (post-connect) initializes plugins and may send messages.
if err := loadConfig(false); err != nil {
Log(robot.Fatal, "Loading full/post-connect configuration: %v", err)
}
Log(robot.Info, "Robot is initialized and running")
}
// stop is called whenever the robot needs to shut down gracefully. All callers
// should lock the bot and check the value of botCfg.shuttingDown; see
// builtins.go.
func stop() {
state.RLock()
pr := state.pipelinesRunning
state.RUnlock()
Log(robot.Info, "Stop called with %d pipelines running", pr)
state.Wait()
brainQuit()
stopConnector <- struct{}{}
}