main.go
// Copyright 2021 (c) Cognizant Digital Business, Evolutionary AI. All rights reserved. Issued under the Apache 2.0 License.

package main

import (
"context"
"encoding/json"
"flag"
"fmt"
"os"
"os/signal"
"path"
"regexp"
"syscall"
"github.com/leaf-ai/go-service/pkg/log"
"github.com/karlmutch/envflag"
"github.com/jjeffery/kv" // MIT License
)

var (
	// TestMode will be set to true if the test flag was set during the build of
	// the executable that is now running
TestMode = false
buildTime string
gitHash string
logger = log.NewErrLogger("queue-scaler")
debugOpt = flag.Bool("debug", false, "leave debugging artifacts in place, print internal execution information")
queueRegexOpt = flag.String("queue-name", ".*", "A regular expression for selecting the queues to be queried")
eksClusterOpt = flag.String("eks-cluster-name", "", "cluster name for EKS scaling support, when used the cluster will be scaled out using Jobs")
namespaceOpt = flag.String("namespace", "default", "the namespace being used by jobs being tracked against queues")
inClusterOpt = flag.Bool("in-cluster", false, "used to indicate if this component is running inside a cluster")
jobTmplOptName = "job-template"
jobTmplOpt = flag.String(jobTmplOptName, "", "file containing a Kubernetes Job YAML template sent to the cluster to add runners")
dryRunOpt = flag.Bool("dry-run", false, "output the new kubernetes resources on stdout without taking any actions")
qReportOnlyOpt = flag.Bool("queue-report-only", false, "list queue details only then exit")
)
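
// As an illustration of the environment variable handling described in
// usage() below, the -eks-cluster-name option could also be supplied through
// the EKS_CLUSTER_NAME environment variable, following the dash-to-underscore,
// upper-case convention.

// setTemp returns the directory to use for temporary files, preferring the
// TMPDIR environment variable and falling back to /tmp when that directory
// exists.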
func setTemp() (dir string) {
if dir = os.Getenv("TMPDIR"); len(dir) != 0 {
return dir
}
if _, err := os.Stat("/tmp"); err == nil {
dir = "/tmp"
}
return dir
}

func usage() {
fmt.Fprintln(os.Stderr, path.Base(os.Args[0]))
fmt.Fprintln(os.Stderr, "usage: ", os.Args[0], "[arguments] SQS Queue Scaler tool ", gitHash, " ", buildTime)
fmt.Fprintln(os.Stderr, "")
fmt.Fprintln(os.Stderr, "Arguments:")
fmt.Fprintln(os.Stderr, "")
flag.PrintDefaults()
fmt.Fprintln(os.Stderr, "")
fmt.Fprintln(os.Stderr, "Environment Variables:")
fmt.Fprintln(os.Stderr, "")
fmt.Fprintln(os.Stderr, "options can be read for environment variables by changing dashes '-' to underscores")
fmt.Fprintln(os.Stderr, "and using upper case letters.")
fmt.Fprintln(os.Stderr, "")
fmt.Fprintln(os.Stderr, "To control log levels the LOGXI env variables can be used, these are documented at https://github.com/mgutz/logxi")
fmt.Fprintln(os.Stderr, "All logging output goes to stderr, stdout contains command output only.")
}
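
// An invocation of this tool might look like the following; the queue name
// expression, cluster name, and template file name are hypothetical values
// used purely for illustration:
//
//	queue-scaler -queue-name '^work-.*' -eks-cluster-name example-cluster \
//		-job-template runner-job.yaml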

// Go runtime entry point for production builds. This function acts as an alias
// for the main.Main function. This allows the testing and code coverage features
// of go to invoke the logic within the command main without skipping important
// runtime initialization steps. The coverage tools can then run this binary as
// if it were a production build.
//
// main will be called by the go runtime when the command is run in production
// mode, avoiding this alias.
//
func main() {
Main()
}

// Main is a production style main that will invoke the command as a go routine
// to allow a very simple supervisor and a test wrapper to coexist in terms of
// our logic.
//
// When using test mode 'go test ...' this function will not, normally, be run;
// instead the EntryPoint function will be called, avoiding some initialization
// logic that is not applicable when testing. There is one exception to this,
// and that is when the go unit test framework is linked to the master binary
// using a TestRunMain build flag, which allows a binary with coverage
// instrumentation to be compiled with only a single unit test that is, in
// fact, an alias to this main.
//
func Main() {
flag.Usage = usage
// Use the go options parser to load command line options that have been set, and look
// for these options inside the env variable table
//
envflag.Parse()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
if errs := EntryPoint(ctx, cancel); len(errs) != 0 {
for _, err := range errs {
logger.Error(err.Error())
}
os.Exit(-1)
}
}

// watchReportingChannels will monitor channels for events that will be reported
// to the output of the command. Typically these events originate inside
// libraries within the command implementation that don't use logging packages.
func watchReportingChannels(ctx context.Context, cancel context.CancelFunc) (errorC chan kv.Error, statusC chan []string) {
	// Setup a channel to allow a CTRL-C to terminate all processing. When the CTRL-C
	// occurs we cancel the background msg pump processing queue messages from
	// the queue specific implementations, and this will also cause the main thread
	// to unblock and return
	//
stopC := make(chan os.Signal, 1)
errorC = make(chan kv.Error)
statusC = make(chan []string)
go func() {
defer cancel()
for {
select {
case msgs := <-statusC:
switch len(msgs) {
case 0:
case 1:
logger.Info(msgs[0])
default:
logger.Info(msgs[0], msgs[1:])
}
case err := <-errorC:
if err != nil {
logger.Warn(fmt.Sprint(err))
}
case <-ctx.Done():
return
case <-stopC:
logger.Warn("CTRL-C seen")
return
}
}
}()
signal.Reset()
signal.Notify(stopC, os.Interrupt, syscall.SIGTERM, syscall.SIGINT, syscall.SIGHUP)
return errorC, statusC
}
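
// A caller might drive the returned channels as follows; the status and error
// messages shown are illustrative only:
//
//	errorC, statusC := watchReportingChannels(ctx, cancel)
//	statusC <- []string{"queue scan starting"}
//	errorC <- kv.NewError("an example error")
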
// EntryPoint enables both test and standard production infrastructure to
// invoke this command.
//
func EntryPoint(ctx context.Context, cancel context.CancelFunc) (errs []kv.Error) {
// Validate the regex that will be used to select the queues for processing
_, errGo := regexp.Compile(*queueRegexOpt)
if errGo != nil {
return []kv.Error{kv.Wrap(errGo).With("expression", *queueRegexOpt)}
}
if len(*eksClusterOpt) != 0 {
if len(*jobTmplOpt) == 0 {
return []kv.Error{kv.NewError("a job template file must be supplied using the " + jobTmplOptName + " option")}
}
if _, errGo := os.Stat(*jobTmplOpt); errGo != nil {
if os.IsNotExist(errGo) {
return []kv.Error{kv.NewError("job template file " + *jobTmplOpt + " does not exist")}
}
}
}
	// Start a go function that will monitor all of the error and status reporting channels
	// for events and report these events to the output of the process
_, _ = watchReportingChannels(ctx, cancel)
cfg, err := GetDefaultCfg()
if err != nil {
return append(errs, err)
}
if cfg == nil {
return append(errs, kv.NewError("Configuration could not be generated for the queue-scaler to run. Check your Kubernetes configuration is present."))
}
// Function to query queue lists
queues, err := GetQueues(ctx, cfg, *queueRegexOpt)
if err != nil {
return []kv.Error{err}
}
if *qReportOnlyOpt {
		queueJSON, errGo := json.MarshalIndent(queues, "", " ")
		if errGo != nil {
			return []kv.Error{kv.Wrap(errGo)}
		}
		fmt.Println(string(queueJSON))
return []kv.Error{}
}
	// If the user wants jobs to be spawned within an existing auto scaled
	// cluster then we do that now
if len(*eksClusterOpt) != 0 {
if len(*jobTmplOpt) == 0 {
			return []kv.Error{kv.NewError("a job template file must be supplied using the " + jobTmplOptName + " option")}
}
// Obtain appropriate nodeGroups that can handle work for our queues
if err = jobQAssign(ctx, cfg, *eksClusterOpt, &queues); err != nil {
return []kv.Error{err}
}
// Get the cluster status for jobs that we know about for these queues
if err = loadKnownJobs(ctx, cfg, *eksClusterOpt, *namespaceOpt, *inClusterOpt, &queues); err != nil {
return []kv.Error{err}
}
// Remove any queues that are currently being fully serviced
if err = groomQueues(&queues); err != nil {
return []kv.Error{err}
}
// Generate jobs to fill the gap between running jobs and queue work waiting to be done
if err := jobGenerate(ctx, cfg, *eksClusterOpt, *jobTmplOpt, os.Stdout, &queues); err != nil {
return []kv.Error{err}
}
return nil
}
return nil
}
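
// For reference, the file supplied through the -job-template option is
// expected to contain a Kubernetes Job YAML template used to add runners. A
// minimal sketch of such a manifest, with the names and container image below
// assumed purely for illustration, might look like:
//
//	apiVersion: batch/v1
//	kind: Job
//	metadata:
//	  generateName: runner-
//	spec:
//	  template:
//	    spec:
//	      containers:
//	        - name: runner
//	          image: example/runner:latest
//	      restartPolicy: Never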