// Copyright 2018-2020 (c) Cognizant Digital Business, Evolutionary AI. All rights reserved. Issued under the Apache 2.0 License.
package main
import (
"context"
"flag"
"fmt"
"os"
"os/signal"
"path"
"path/filepath"
"regexp"
"syscall"
"time"
"github.com/leaf-ai/go-service/pkg/aws_gsc"
"github.com/leaf-ai/go-service/pkg/log"
"github.com/leaf-ai/go-service/pkg/process"
"github.com/leaf-ai/go-service/pkg/runtime"
"github.com/leaf-ai/go-service/pkg/server"
"github.com/leaf-ai/studio-go-runner/internal/cpu_resource"
"github.com/leaf-ai/studio-go-runner/internal/cuda"
"github.com/leaf-ai/studio-go-runner/internal/defense"
"github.com/leaf-ai/studio-go-runner/internal/disk_resource"
"github.com/leaf-ai/studio-go-runner/internal/resources"
"github.com/davecgh/go-spew/spew"
"github.com/karlmutch/envflag"
"github.com/go-stack/stack"
"github.com/jjeffery/kv" // MIT License
"github.com/dustin/go-humanize"
)
var (
	// TestMode will be set to true if the test flag is set during a build when the exe
	// runs
	TestMode = false

	// TriggerCacheC can be used when the caching system is active to initiate a cache
	// expired items purge. This variable is used during testing so that no dependency
	// injection is needed.
	TriggerCacheC chan<- struct{}

	// Spew contains the process wide configuration preferences for the structure dumping
	// package
	Spew *spew.ConfigState

	// buildTime and gitHash are stamped into the binary at link time by the build
	buildTime string
	gitHash   string

	logger = log.NewLogger("runner")

	cfgNamespace = flag.String("k8s-namespace", "default", "The namespace that is being used for our configuration")
	cfgConfigMap = flag.String("k8s-configmap", "studioml-go-runner", "The name of the Kubernetes ConfigMap where our configuration can be found")

	amqpURL    = flag.String("amqp-url", "", "The URL for an amqp message exchange through which StudioML is being sents work")
	amqpMgtURL = flag.String("amqp-mgt-url", "", "The URL for the management interface for an amqp message exchange which StudioML can use to query the broker for queue stats etc")

	queueMatch    = flag.String("queue-match", "^(rmq|sqs|local)_.*$", "User supplied regular expression that needs to match a queues name to be considered for work")
	queueMismatch = flag.String("queue-mismatch", "", "User supplied regular expression that must not match a queues name to be considered for work")

	tempOpt    = flag.String("working-dir", setTemp(), "the local working directory being used for runner storage, defaults to env var %TMPDIR, or /tmp")
	debugOpt   = flag.Bool("debug", false, "leave debugging artifacts in place, can take a large amount of disk space (intended for developers only)")
	cpuOnlyOpt = flag.Bool("cpu-only", false, "in the event no gpus are found continue with only CPU support")

	maxCoresOpt = flag.Uint("max-cores", 0, "maximum number of cores to be used (default 0, all cores available will be used)")
	maxMemOpt   = flag.String("max-mem", "0gb", "maximum amount of memory to be allocated to tasks using SI, ICE units, for example 512gb, 16gib, 1024mb, 64mib etc' (default 0, is all available RAM)")
	maxDiskOpt  = flag.String("max-disk", "0gb", "maximum amount of local disk storage to be allocated to tasks using SI, ICE units, for example 512gb, 16gib, 1024mb, 64mib etc' (default 0, is 85% of available Disk)")

	msgEncryptDirOpt   = flag.String("encrypt-dir", "./certs/message", "directory where secrets have been mounted into pod containers")
	acceptClearTextOpt = flag.Bool("clear-text-messages", false, "enables clear-text messages across queues support (Associated Risk)")

	cpuProfileOpt = flag.String("cpu-profile", "", "write a cpu profile to file")

	sigsRqstDirOpt = flag.String("request-signatures-dir", "./certs/queues/signing", "the directory for queue message signing files")
	// rqstSigs contains a map with the index being the prefix of queue names and their public keys for inbound request queues
	rqstSigs = &defense.PubkeyStore{}

	sigsRspnsDirOpt = flag.String("response-signatures-dir", "./certs/queues/response-encrypt", "the directory for response queue message encryption files")
	// rspnsEncrypt contains a map with the index being the prefix of queue names and the
	// public keys used to encrypt messages placed on response queues
	rspnsEncrypt = &defense.PubkeyStore{}

	promAddrOpt = flag.String("prom-address", ":9090", "the address for the prometheus http server within the runner")

	captureOutputMD = flag.Bool("schema-logs", true, "automatically add experiment logs to metadata json")

	localQueueRootOpt = flag.String("queue-root", "", "Local file path to directory serving as a root for local file queues")
)
// GetRqstSigs exposes the package level store of public keys used when
// verifying signatures on inbound request queue messages.
//
func GetRqstSigs() *defense.PubkeyStore {
	return rqstSigs
}
// GetRspnsEncrypt returns the encryption public key store used for
// methods related to response queue message encryption key selection etc.
//
func GetRspnsEncrypt() (s *defense.PubkeyStore) {
	return rspnsEncrypt
}
// init establishes the process wide configuration for the spew structure
// dumping package used when emitting debugging output.
func init() {
	Spew = spew.NewDefaultConfig()
	Spew.SortKeys = true
	Spew.Indent = " "
}
func setTemp() (dir string) {
if dir = os.Getenv("TMPDIR"); len(dir) != 0 {
return dir
}
if _, err := os.Stat("/tmp"); err == nil {
dir = "/tmp"
}
return dir
}
// usage writes the command line help to stderr, including the flag defaults
// and notes describing environment variable based configuration.
func usage() {
	w := os.Stderr
	fmt.Fprintln(w, path.Base(os.Args[0]))
	fmt.Fprintln(w, "usage: ", os.Args[0], "[arguments] studioml runner ", gitHash, " ", buildTime)
	fmt.Fprintln(w, "")
	fmt.Fprintln(w, "Arguments:")
	fmt.Fprintln(w, "")
	flag.PrintDefaults()
	fmt.Fprintln(w, "")
	fmt.Fprintln(w, "Environment Variables:")
	fmt.Fprintln(w, "")
	fmt.Fprintln(w, "runner options can be read for environment variables by changing dashes '-' to underscores")
	fmt.Fprintln(w, "and using upper case letters. The certs-dir option is a mandatory option.")
	fmt.Fprintln(w, "")
	fmt.Fprintln(w, "To control log levels the LOGXI env variables can be used, these are documented at https://github.com/mgutz/logxi")
}
func resourceLimits() (cores uint, mem uint64, storage uint64, err error) {
cores = *maxCoresOpt
if mem, err = humanize.ParseBytes(*maxMemOpt); err != nil {
return 0, 0, 0, err
}
if storage, err = humanize.ParseBytes(*maxDiskOpt); err != nil {
return 0, 0, 0, err
}
return cores, mem, storage, err
}
// main is the Go runtime entry point for production builds and acts as an
// alias for the Main function. Test and coverage builds link their own entry
// point so they can reuse Main without skipping the runtime initialization
// performed here.
//
func main() {
	// Allow the enclave for secrets to wipe things
	defense.StopSecret()

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Singleton guard, this is the one check that does not get exercised
	// when the server is under test
	if _, err := process.NewExclusive(ctx, "studio-go-runner"); err != nil {
		logger.Error("An instance of this process is already running " + err.Error())
		os.Exit(-1)
	}

	Main()
}
// Main is a production style main that invokes the server so that a very
// simple supervisor and a test wrapper can coexist in terms of our logic.
//
// Under 'go test ...' this function is normally bypassed in favour of calling
// EntryPoint directly, which skips initialization that does not apply when
// testing. The exception is a coverage instrumented binary built with the
// TestRunMain build flag, whose single unit test is an alias for this Main.
//
func Main() {
	fmt.Printf("%s built at %s, against commit id %s\n", os.Args[0], buildTime, gitHash)

	flag.Usage = usage

	// Parse command line options and allow any of them to also be supplied
	// via the environment variable table
	envflag.Parse()

	ctx, cancel := context.WithCancel(context.Background())
	doneC := make(chan struct{})

	// The profiler is started as early as possible, and only production
	// builds expose the command line option for it
	if err := runtime.InitCPUProfiler(ctx, *cpuProfileOpt); err != nil {
		logger.Warn(err.Error())
	}

	errs := EntryPoint(ctx, cancel, doneC)
	if len(errs) != 0 {
		for _, err := range errs {
			logger.Error(err.Error())
		}
		os.Exit(-1)
	}

	// Block until the system has been told to shutdown
	<-ctx.Done()

	// Give the quitC a short grace period to propagate across the server
	// before exiting
	time.Sleep(5 * time.Second)
}
// watchReportingChannels will monitor channels for events etc that will be reported
// to the output of the server. Typically these events will originate inside
// libraries within the server implementation that dont use logging packages etc.
//
// The returned stopC should be registered with signal.Notify by the caller;
// receiving on it cancels the supplied context. errorC and statusC remain
// serviced until the context is cancelled or a signal arrives.
func watchReportingChannels(ctx context.Context, cancel context.CancelFunc) (stopC chan os.Signal, errorC chan kv.Error, statusC chan []string) {
	// Setup a channel to allow a CTRL-C to terminate all processing. When the CTRL-C
	// occurs we cancel the background msg pump processing queue messages from
	// the queue specific implementations, and this will also cause the main thread
	// to unblock and return
	//
	// The signal channel is buffered so that a signal delivered while the
	// monitor is busy handling another event is not lost, as required by the
	// signal.Notify documentation.
	stopC = make(chan os.Signal, 1)
	errorC = make(chan kv.Error)
	statusC = make(chan []string)
	go func() {
		// Loop so that every status and error event gets reported. Without
		// the loop the goroutine would exit after the first event and all
		// subsequent reports would be dropped.
		for {
			select {
			case msgs := <-statusC:
				switch len(msgs) {
				case 0:
				case 1:
					logger.Info(msgs[0])
				default:
					logger.Info(msgs[0], msgs[1:])
				}
			case err := <-errorC:
				if err != nil {
					logger.Warn(fmt.Sprint(err))
				}
			case <-ctx.Done():
				logger.Warn("quit ctx Seen")
				return
			case <-stopC:
				logger.Warn("CTRL-C Seen")
				cancel()
				return
			}
		}
	}()
	return stopC, errorC, statusC
}
// validateGPUOpts checks the GPU related CLI options against the hardware
// that can actually be seen. When CUDA is present but no free GPU slots are
// available an error is accumulated, unless debug mode downgrades it to a
// warning.
func validateGPUOpts() (errs []kv.Error) {
	errs = []kv.Error{}

	if *cpuOnlyOpt {
		return errs
	}
	if _, free := cuda.GPUSlots(); free != 0 {
		return errs
	}
	if !cuda.HasCUDA() {
		return errs
	}

	msg := fmt.Errorf("no available GPUs could be found using the nvidia management library")
	if cuda.CudaInitErr != nil {
		msg = *cuda.CudaInitErr
	}
	err := kv.Wrap(msg).With("stack", stack.Trace().TrimRuntime())

	// In debug mode the condition is reported but does not block startup
	if *debugOpt {
		logger.Warn(fmt.Sprint(err))
		return errs
	}
	return append(errs, err)
}
// validateCredsOpts checks that at least one usable queue credentials source
// has been configured: an SQS certificates directory, an AMQP URL, or a local
// file queue root. Validation is skipped entirely in test mode.
func validateCredsOpts() (errs []kv.Error) {
	errs = []kv.Error{}

	if TestMode {
		logger.Warn("running in test mode, queue validation not performed")
		return errs
	}

	if len(*sqsCertsDirOpt) == 0 && len(*amqpURL) == 0 && len(*localQueueRootOpt) == 0 {
		return append(errs, kv.NewError("One of the amqp-url, sqs-certs or queue-root options must be set for the runner to work"))
	}

	// An existing SQS certificates directory is sufficient on its own
	if stat, err := os.Stat(*sqsCertsDirOpt); err == nil && stat.Mode().IsDir() {
		return errs
	}

	// As is a configured AMQP endpoint
	if len(*amqpURL) != 0 {
		return errs
	}

	// Finally fall back to checking the local file queue root
	*localQueueRootOpt = os.ExpandEnv(*localQueueRootOpt)
	if stat, err := os.Stat(*localQueueRootOpt); err == nil && stat.Mode().IsDir() {
		return errs
	}

	msg := fmt.Sprintf(
		"sqs-certs must be set to an existing directory, or amqp-url is specified, or queue-root must be set to an existing directory for the runner to perform any useful work (%s)",
		*sqsCertsDirOpt)
	return append(errs, kv.NewError(msg))
}
// validateResourceOpts applies the user specified hard limits for CPU cores,
// memory, and disk taken from the CLI, accumulating any validation errors.
func validateResourceOpts() (errs []kv.Error) {
	errs = []kv.Error{}

	limitCores, limitMem, limitDisk, err := resourceLimits()
	if err != nil {
		errs = append(errs, kv.Wrap(err).With("stack", stack.Trace().TrimRuntime()))
	}

	if err = cpu_resource.SetCPULimits(limitCores, limitMem); err != nil {
		errs = append(errs, kv.Wrap(err, "the cores, or memory limits on command line option were invalid").With("stack", stack.Trace().TrimRuntime()))
	}

	avail, err := disk_resource.SetDiskLimits(*tempOpt, limitDisk)
	switch {
	case err != nil:
		errs = append(errs, kv.Wrap(err, "the disk storage limits on command line option were invalid").With("stack", stack.Trace().TrimRuntime()))
	case avail == 0:
		errs = append(errs, kv.NewError(fmt.Sprintf("insufficient disk storage available %s", humanize.Bytes(avail))))
	default:
		logger.Debug(fmt.Sprintf("%s available diskspace", humanize.Bytes(avail)))
	}
	return errs
}
// validateServerOpts gathers as many option validation errors as possible in
// one pass so the user can fix everything at once rather than retrying one
// failure at a time.
func validateServerOpts() (errs []kv.Error) {
	errs = []kv.Error{}

	errs = append(errs, validateGPUOpts()...)

	if len(*tempOpt) == 0 {
		errs = append(errs, kv.NewError("the working-dir command line option must be supplied with a valid working directory location, or the TEMP, or TMP env vars need to be set"))
	}

	if _, _, err := getCacheOptions(); err != nil {
		errs = append(errs, kv.Wrap(err).With("stack", stack.Trace().TrimRuntime()))
	}

	errs = append(errs, validateResourceOpts()...)
	errs = append(errs, validateCredsOpts()...)

	if len(*amqpURL) != 0 {
		// Compile the queue name expressions only to surface syntax errors,
		// the compiled results themselves are discarded
		for _, expr := range []string{*queueMatch, *queueMismatch} {
			if _, errGo := regexp.Compile(expr); errGo != nil {
				errs = append(errs, kv.Wrap(errGo))
			}
		}
	}
	return errs
}
// EntryPoint enables both test and standard production infrastructure to
// invoke this server.
//
// ctx, together with cancel, is used by the invoking functions to stop the
// processing inside the server and cause EntryPoint to shut down.
//
// doneC is closed by the EntryPoint function to indicate when it has
// terminated its processing.
//
func EntryPoint(ctx context.Context, cancel context.CancelFunc, doneC chan struct{}) (errs []kv.Error) {
	defer close(doneC)

	// Start a go function that will monitor all of the error and status reporting channels
	// for events and report these events to the output of the process etc
	stopC, errorC, statusC := watchReportingChannels(ctx, cancel)
	signal.Notify(stopC, os.Interrupt, syscall.SIGTERM)

	// One of the first things to do is to determine if our configuration is
	// coming from a remote source which in our case will typically be a
	// k8s configmap that is not supplied by the k8s deployment spec. This
	// happens when the config map is to be dynamically tracked to allow
	// the runner to change its behaviour or shutdown etc
	logger.Info("version", "git_hash", gitHash)

	if aws, err := aws_gsc.IsAWS(); aws {
		logger.Info("AWS detected")
	} else {
		if err == nil {
			logger.Info("AWS not detected")
		} else {
			logger.Info("AWS not detected", "error", err)
		}
	}

	// Before continuing convert several of the directories specified on the CLI
	// to absolute paths. Conversion failures are deliberately ignored and the
	// original relative path is kept in that case.
	tmp, errGo := filepath.Abs(*tempOpt)
	if errGo == nil {
		*tempOpt = tmp
	}
	tmp, errGo = filepath.Abs(*msgEncryptDirOpt)
	if errGo == nil {
		*msgEncryptDirOpt = tmp
	}
	tmp, errGo = filepath.Abs(*sigsRqstDirOpt)
	if errGo == nil {
		*sigsRqstDirOpt = tmp
	}
	tmp, errGo = filepath.Abs(*sigsRspnsDirOpt)
	if errGo == nil {
		*sigsRspnsDirOpt = tmp
	}

	// Runs in the background handling the Kubernetes client subscription
	// that is used to monitor for configuration map based changes. Wait
	// for its setup processing to be done before continuing
	dedupeMsg := time.Duration(15 * time.Minute)
	readyC := make(chan struct{})
	go server.InitiateK8s(ctx, *cfgNamespace, *cfgConfigMap, readyC, dedupeMsg, logger, errorC)
	<-readyC

	errs = validateServerOpts()

	// initialize the disk based artifact cache, after the signal handlers are in place
	//
	if triggerCacheC, err := runObjCache(ctx); err != nil {
		errs = append(errs, kv.Wrap(err))
	} else {
		TriggerCacheC = triggerCacheC
	}

	// Now check for any fatal errors before allowing the system to continue. This allows
	// all errors that could have occurred as a result of incorrect options to be flushed
	// out rather than having a frustrating single failure at a time loop for users
	// to fix things
	//
	if len(errs) != 0 {
		return errs
	}

	// Non blocking function that initializes independent services in the runner
	startServices(ctx, cancel, statusC, errorC)
	return nil
}
// startServices launches the independent background services of the runner:
// GPU monitoring, the prometheus exporter, the message signing and response
// encryption key watchers, the service limiter, and the SQS, RabbitMQ, and
// local file queue servicing loops.
func startServices(ctx context.Context, cancel context.CancelFunc, statusC chan []string, errorC chan kv.Error) {
	// Watch for GPU hardware events that are of interest
	go cuda.MonitorGPUs(ctx, statusC, errorC)

	// loops doing prometheus exports for resource consumption statistics etc
	// on a regular basis
	server.StartPrometheusExporter(ctx, *promAddrOpt, &resources.Resources{}, time.Duration(10*time.Second), logger)

	// The timing for queues being refreshed should be much more frequent when testing
	// is being done to allow short lived resources such as queues etc to be refreshed
	// between and within test cases reducing test times etc, but not so quick as to
	// hide or shadow any bugs or issues
	serviceIntervals := time.Duration(15 * time.Second)
	if TestMode {
		serviceIntervals = time.Duration(5 * time.Second)
	}

	// Setup a watcher that will scan a signatures directory loading in
	// new queue related message signing keys, non blocking function that
	// spins off a servicing function
	store, err := defense.InitRqstSigWatcher(ctx, *sigsRqstDirOpt, errorC)
	if err != nil {
		errorC <- err
	}
	// NOTE(review): store is assigned even when err != nil — presumably the
	// watcher returns a usable (empty) store on error; confirm against the
	// defense package before changing.
	rqstSigs = store

	// Setup a watcher that will scan a response encryption directory loading in
	// new response queue related message encryption keys, non blocking function that
	// spins off a servicing function
	if store, err = defense.InitRspnsEncryptWatcher(ctx, *sigsRspnsDirOpt, errorC); err != nil {
		errorC <- err
	}
	rspnsEncrypt = store

	// run a limiter that will check for various termination conditions for the
	// runner including idle times, and the maximum number of tasks to complete
	go serviceLimiter(ctx, cancel)

	// Create a component that listens to AWS credentials directories
	// and starts and stops run methods as needed based on the credentials
	// it has for the AWS infrastructure
	//
	go serviceSQS(ctx, serviceIntervals)

	// Create a component that listens to an amqp (rabbitMQ) exchange for work
	// queues
	//
	go serviceRMQ(ctx, serviceIntervals, 15*time.Second)

	// Create a component that listens to local file queues root for work
	// queues
	//
	go serviceFileQueue(ctx, 3*time.Second)
}