-
Notifications
You must be signed in to change notification settings - Fork 14
/
loader.go
324 lines (273 loc) · 7.43 KB
/
loader.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
package myqlib
import (
"bytes"
"errors"
"fmt"
"os"
"os/exec"
"strconv"
"strings"
"time"
)
type MySQLCommand string
const (
MYSQLCLI string = "mysql"
STATUS_COMMAND MySQLCommand = "SHOW GLOBAL STATUS;SELECT 'END';\n"
VARIABLES_COMMAND MySQLCommand = "SHOW GLOBAL VARIABLES;SELECT 'END';\n"
// prefix of SHOW VARIABLES keys, they are stored (if available) in the same map as the status variables
VAR_PREFIX = "V_"
)
// Build the argument list
var MYSQLCLIARGS []string = []string{
"-B", // Batch mode (tab-separated output)
"-n", // Unbuffered
"-N", // Skip column names
}
type Loader interface {
getStatus() (chan MyqSample, error)
getVars() (chan MyqSample, error)
getInterval() time.Duration
}
// MyqSamples are K->V maps
type MyqSample map[string]string
// Number of keys in the sample
func (s MyqSample) Length() int {
return len(s)
}
// Get methods for the given key. Returns a value of the appropriate type (error is nil) or default value and an error if it can't parse
func (s MyqSample) getInt(key string) (int64, error) {
val, ok := s[key]
if !ok {
return 0, errors.New("Key not found")
}
conv, err := strconv.ParseInt(val, 10, 64)
if err != nil {
return 0, err
} else {
return conv, nil
}
}
func (s MyqSample) getFloat(key string) (float64, error) {
val, ok := s[key]
if !ok {
return 0.0, errors.New("Key not found")
}
conv, err := strconv.ParseFloat(val, 64)
if err != nil {
return 0.0, err
} else {
return conv, nil
}
}
func (s MyqSample) getString(key string) (string, error) {
val, ok := s[key]
if !ok {
return "", errors.New("Key not found")
}
return val, nil // no errors possible here
}
// Same as above, just ignore the error
func (s MyqSample) getI(key string) int64 {
i, _ := s.getInt(key)
return i
}
func (s MyqSample) getF(key string) float64 {
f, _ := s.getFloat(key)
return f
}
func (s MyqSample) getStr(key string) string {
str, _ := s.getString(key)
return str
}
// Gets either a float or an int (check type of result), or an error
func (s MyqSample) getNumeric(key string) (interface{}, error) {
if val, err := s.getInt(key); err != nil {
return val, nil
} else if val, err := s.getFloat(key); err != nil {
return val, nil
} else {
return nil, errors.New("Value is not numeric")
}
}
// MyqState contains the current and previous SHOW STATUS outputs. Also SHOW VARIABLES.
// Prev might be nil
type MyqState struct {
Cur, Prev MyqSample
SecondsDiff float64 // Difference between Cur and Prev
FirstUptime int64 // Uptime of our first sample this run
}
// Given a loader, get a channel of myqstates being returned
func GetState(l Loader) (chan *MyqState, error) {
// First getVars, if possible
var latestvars MyqSample // whatever the last vars sample is will be here (may be empty)
varsch, varserr := l.getVars()
// return the error if getVars fails, but not if it's just due to a missing file
if varserr != nil && varserr.Error() != "No file given" {
// Serious error
return nil, varserr
}
// Now getStatus
var ch = make(chan *MyqState)
statusch, statuserr := l.getStatus()
if statuserr != nil {
return nil, statuserr
}
// Main status loop
go func() {
defer close(ch)
var prev MyqSample
var firstUptime int64
for status := range statusch {
// Init new state
state := new(MyqState)
state.Cur = status
// Only needed for File loaders really
if firstUptime == 0 {
firstUptime, _ = status.getInt(`uptime`)
}
state.FirstUptime = firstUptime
// Assign the prev
if prev != nil {
state.Prev = prev
// Calcuate timediff if there is a prev. Only file loader?
curup, _ := status.getFloat(`uptime`)
preup, _ := prev.getFloat(`uptime`)
state.SecondsDiff = curup - preup
// Skip to the next sample if SecondsDiff is < the interval
if state.SecondsDiff < l.getInterval().Seconds() {
continue
}
}
// If varserr is clear at this point, we're expecting some vars
if varserr == nil {
// get some new vars, or skip if the varsch is closed
newvars, ok := <-varsch
if ok {
latestvars = newvars
}
}
// Add latest vars to status with prefix
for k, v := range latestvars {
newkey := fmt.Sprint(VAR_PREFIX, k)
state.Cur[newkey] = v
}
// Send the state
ch <- state
// Set the state for the next round
prev = status
}
}()
return ch, nil
}
type loaderInterval time.Duration
func (l loaderInterval) getInterval() time.Duration {
return time.Duration(l)
}
// Load mysql status output from a mysqladmin output file
type FileLoader struct {
loaderInterval
statusFile string
variablesFile string
}
func NewFileLoader(i time.Duration, statusFile, varFile string) *FileLoader {
return &FileLoader{loaderInterval(i), statusFile, varFile}
}
func (l FileLoader) harvestFile(filename string) (chan MyqSample, error) {
file, err := os.OpenFile(filename, os.O_RDONLY, 0)
if err != nil {
return nil, err
}
var ch = make(chan MyqSample)
// The file scanning goes into the background
go func() {
defer file.Close()
defer close(ch)
parseSamples(file, ch, l.loaderInterval.getInterval())
}()
return ch, nil
}
func (l FileLoader) getStatus() (chan MyqSample, error) {
return l.harvestFile(l.statusFile)
}
func (l FileLoader) getVars() (chan MyqSample, error) {
if l.variablesFile != "" {
return l.harvestFile(l.variablesFile)
} else {
return nil, errors.New("No file given")
}
}
// SHOW output via mysqladmin on a live server
type LiveLoader struct {
loaderInterval
args string // other args for mysqladmin (like -u, -p, -h, etc.)
}
func NewLiveLoader(i time.Duration, args string) *LiveLoader {
return &LiveLoader{loaderInterval(i), args}
}
// Collect output from MYSQLCLI and send it back in a sample
func (l LiveLoader) harvestMySQL(command MySQLCommand) (chan MyqSample, error) {
// Make sure we have MYSQLCLI
path, err := exec.LookPath(MYSQLCLI)
if err != nil {
return nil, err
}
var args = MYSQLCLIARGS
if l.args != "" {
args = append(args, strings.Split(l.args, ` `)...)
}
// Initialize the command
cmd := exec.Command(path, args...)
cleanupSubcmd(cmd)
// Collect Stderr in a buffer
var stderr bytes.Buffer
cmd.Stderr = &stderr
// Create a pipe for Stdout
stdout, err := cmd.StdoutPipe()
if err != nil {
return nil, err
}
// Create a pipe for Stdin -- we input our command here every interval
stdin, err := cmd.StdinPipe()
if err != nil {
return nil, err
}
// Start the command
if err := cmd.Start(); err != nil {
return nil, err
}
// feed the MYSQLCLI the given command to produce more output
send_command := func() {
_, err := stdin.Write([]byte(command))
if err != nil {
panic("Could not write to MySQL command any longer")
}
}
// send the first command immediately
send_command()
// produce more output every interval
ticker := time.NewTicker(l.getInterval())
go func() {
defer stdin.Close()
for range ticker.C {
send_command()
}
}()
// parse samples in the background
var ch = make(chan MyqSample)
go func() {
defer close(ch)
parseSamples(stdout, ch, l.loaderInterval.getInterval())
}()
// Handle if the subcommand exits
go func() {
err := cmd.Wait()
if err != nil {
os.Stderr.WriteString(stderr.String())
os.Exit(1)
}
}()
// Got this far, the channel should start getting samples
return ch, nil
}
func (l LiveLoader) getStatus() (chan MyqSample, error) { return l.harvestMySQL(STATUS_COMMAND) }
func (l LiveLoader) getVars() (chan MyqSample, error) { return l.harvestMySQL(VARIABLES_COMMAND) }