forked from evergreen-ci/evergreen
/
logger.go
334 lines (275 loc) · 10.8 KB
/
logger.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
package comm
import (
"io"
"strconv"
"strings"
"sync"
"time"
"unicode/utf8"
"github.com/evergreen-ci/evergreen"
"github.com/evergreen-ci/evergreen/model"
"github.com/mongodb/grip"
"github.com/mongodb/grip/message"
"github.com/mongodb/grip/send"
"github.com/mongodb/grip/slogger"
)
// StreamLogger holds a set of stream-delineated loggers. Each logger is used
// to communicate a different kind of log to the API server or the local file
// system, so that system statistics, task (shell) output, and internal agent
// logs can be told apart.
type StreamLogger struct {
// Local is used for file system logging on the host the agent is running on.
Local *slogger.Logger
// System is used for logging system stats gathered from commands like df, ps, etc.
System *slogger.Logger
// Task is used for logging command input, output and errors of the task.
Task *slogger.Logger
// Execution is used for logging the agent's internal state.
Execution *slogger.Logger
// apiLogger is the logger that Flush and FlushAndWait delegate to in order
// to send buffered data back to the API server.
// NOTE(review): this field is held by value and NewStreamLogger in this file
// does not assign it, so unless it is populated elsewhere in the package,
// Flush/FlushAndWait act on a zero-value APILogger rather than the one
// backing the stream appenders -- confirm.
apiLogger APILogger
}
// GetTaskLogWriter returns an io.Writer backed by an evergreen.LoggingWriter
// that targets the Task logger, using the priority of the given level as its
// severity.
func (lgr *StreamLogger) GetTaskLogWriter(level slogger.Level) io.Writer {
	taskWriter := new(evergreen.LoggingWriter)
	taskWriter.Logger = lgr.Task
	taskWriter.Severity = level.Priority()
	return taskWriter
}
// GetSystemLogWriter returns an io.Writer backed by an evergreen.LoggingWriter
// that targets the System logger, using the priority of the given level as its
// severity.
func (lgr *StreamLogger) GetSystemLogWriter(level slogger.Level) io.Writer {
	severity := level.Priority()
	return &evergreen.LoggingWriter{Logger: lgr.System, Severity: severity}
}
// appendMessages hands msg to each sender registered in logger.Appenders, in
// slice order. Nothing is returned from the individual Send calls.
func appendMessages(logger *slogger.Logger, msg message.Composer) {
	senders := logger.Appenders
	for idx := range senders {
		senders[idx].Send(msg)
	}
}
// LogLocal formats a message at the given level and sends it to the appenders
// of the Local logger, prefixed with that logger's name.
//
// The Local stream is meant for agent logs kept on the host's file system, so
// use it only for information that is useful when debugging locally.
func (lgr *StreamLogger) LogLocal(level slogger.Level, messageFmt string, args ...interface{}) {
	stream := lgr.Local
	formatted := message.NewFormattedMessage(level.Priority(), messageFmt, args...)
	appendMessages(stream, slogger.NewPrefixedLog(stream.Name, formatted))
}
// LogExecution formats a message at the given level and sends it to the
// appenders of the Execution logger, prefixed with that logger's name.
//
// This stream is intended for the agent's internal workings (heartbeats,
// command internals) that would otherwise pollute the regular task output.
func (lgr *StreamLogger) LogExecution(level slogger.Level, messageFmt string, args ...interface{}) {
	stream := lgr.Execution
	body := message.NewFormattedMessage(level.Priority(), messageFmt, args...)
	appendMessages(stream, slogger.NewPrefixedLog(stream.Name, body))
}
// LogTask formats a message at the given level and sends it to the appenders
// of the Task logger, prefixed with that logger's name.
//
// This stream is for the task's main input and output: first-class
// information such as test results and shell script output.
func (lgr *StreamLogger) LogTask(level slogger.Level, messageFmt string, args ...interface{}) {
	stream := lgr.Task
	composed := message.NewFormattedMessage(level.Priority(), messageFmt, args...)
	entry := slogger.NewPrefixedLog(stream.Name, composed)
	appendMessages(stream, entry)
}
// LogSystem formats a message at the given level and sends it to the
// appenders of the System logger, prefixed with that logger's name.
//
// This stream is intended for passive system messages such as periodic
// process information and CPU usage.
func (lgr *StreamLogger) LogSystem(level slogger.Level, messageFmt string, args ...interface{}) {
	stream := lgr.System
	composed := message.NewFormattedMessage(level.Priority(), messageFmt, args...)
	entry := slogger.NewPrefixedLog(stream.Name, composed)
	appendMessages(stream, entry)
}
// Flush delegates to the Flush method of the StreamLogger's apiLogger, which
// hands any buffered messages to a background goroutine instead of waiting
// for the send to complete.
func (lgr *StreamLogger) Flush() {
	apiLgr := &lgr.apiLogger
	apiLgr.Flush()
}
// FlushAndWait delegates to the FlushAndWait method of the StreamLogger's
// apiLogger and returns its result (the number of messages handed to the
// send). Unlike Flush, the send is performed synchronously before returning.
func (lgr *StreamLogger) FlushAndWait() int {
	numFlushed := lgr.apiLogger.FlushAndWait()
	return numFlushed
}
// CommandLogger wraps a StreamLogger and carries the name of the command that
// is currently being run, so that its Log* methods can prefix every message
// with that name.
type CommandLogger struct {
// commandName is the name prepended (as "[name] ") to each logged message.
commandName string
// logger is the underlying StreamLogger that all calls are forwarded to.
logger *StreamLogger
}
// NewCommandLogger returns a CommandLogger that forwards to logger and tags
// each message with the given command name.
func NewCommandLogger(name string, logger *StreamLogger) *CommandLogger {
	return &CommandLogger{
		commandName: name,
		logger:      logger,
	}
}
// addCommandToMsgAndArgs returns messageFmt prefixed with "[%v] " together
// with a new argument slice whose first element is the command name, followed
// by the original args. The caller's args slice is not modified.
func (cmdLgr *CommandLogger) addCommandToMsgAndArgs(messageFmt string, args []interface{}) (string, []interface{}) {
	prefixedArgs := make([]interface{}, 0, len(args)+1)
	prefixedArgs = append(prefixedArgs, cmdLgr.commandName)
	prefixedArgs = append(prefixedArgs, args...)
	return "[%v] " + messageFmt, prefixedArgs
}
// GetTaskLogWriter returns the task log writer of the wrapped StreamLogger
// for the given level. No command prefix is added to data written through it.
func (cmdLgr *CommandLogger) GetTaskLogWriter(level slogger.Level) io.Writer {
	wrapped := cmdLgr.logger
	return wrapped.GetTaskLogWriter(level)
}
// GetSystemLogWriter returns the system log writer of the wrapped
// StreamLogger for the given level. No command prefix is added to data
// written through it.
func (cmdLgr *CommandLogger) GetSystemLogWriter(level slogger.Level) io.Writer {
	wrapped := cmdLgr.logger
	return wrapped.GetSystemLogWriter(level)
}
// LogLocal prefixes the message with the command name and forwards it to the
// wrapped StreamLogger's LogLocal.
func (cmdLgr *CommandLogger) LogLocal(level slogger.Level, messageFmt string, args ...interface{}) {
	taggedFmt, taggedArgs := cmdLgr.addCommandToMsgAndArgs(messageFmt, args)
	cmdLgr.logger.LogLocal(level, taggedFmt, taggedArgs...)
}
// LogExecution prefixes the message with the command name and forwards it to
// the wrapped StreamLogger's LogExecution.
func (cmdLgr *CommandLogger) LogExecution(level slogger.Level, messageFmt string, args ...interface{}) {
	taggedFmt, taggedArgs := cmdLgr.addCommandToMsgAndArgs(messageFmt, args)
	cmdLgr.logger.LogExecution(level, taggedFmt, taggedArgs...)
}
// LogTask prefixes the message with the command name and forwards it to the
// wrapped StreamLogger's LogTask.
func (cmdLgr *CommandLogger) LogTask(level slogger.Level, messageFmt string, args ...interface{}) {
	taggedFmt, taggedArgs := cmdLgr.addCommandToMsgAndArgs(messageFmt, args)
	cmdLgr.logger.LogTask(level, taggedFmt, taggedArgs...)
}
// LogSystem prefixes the message with the command name and forwards it to the
// wrapped StreamLogger's LogSystem.
func (cmdLgr *CommandLogger) LogSystem(level slogger.Level, messageFmt string, args ...interface{}) {
	taggedFmt, taggedArgs := cmdLgr.addCommandToMsgAndArgs(messageFmt, args)
	cmdLgr.logger.LogSystem(level, taggedFmt, taggedArgs...)
}
// Flush forwards to the Flush method of the wrapped StreamLogger.
func (cmdLgr *CommandLogger) Flush() {
	wrapped := cmdLgr.logger
	wrapped.Flush()
}
// NewStreamLogger builds a StreamLogger whose streams are wired as follows:
//   - Local writes only to the grip sender;
//   - System and Execution write to apiLgr and the grip sender (they share
//     one appender slice);
//   - Task writes to a TimeoutResetLogger wrapping apiLgr (so every task log
//     message checks in with timeoutWatcher) and to the grip sender.
//
// The returned error is always nil.
//
// NOTE(review): the StreamLogger's unexported apiLogger field is not assigned
// here and is left at its zero value -- confirm that is intended, since
// StreamLogger.Flush and FlushAndWait operate on that field.
func NewStreamLogger(timeoutWatcher *TimeoutWatcher, apiLgr *APILogger) (*StreamLogger, error) {
	apiAndLocal := []send.Sender{slogger.WrapAppender(apiLgr), grip.GetSender()}
	taskSender := slogger.WrapAppender(&TimeoutResetLogger{
		TimeoutWatcher: timeoutWatcher,
		APILogger:      apiLgr,
	})

	streams := &StreamLogger{}
	streams.Local = &slogger.Logger{
		Name:      "local",
		Appenders: []send.Sender{grip.GetSender()},
	}
	streams.System = &slogger.Logger{
		Name:      model.SystemLogPrefix,
		Appenders: apiAndLocal,
	}
	streams.Task = &slogger.Logger{
		Name:      model.TaskLogPrefix,
		Appenders: []send.Sender{taskSender, grip.GetSender()},
	}
	streams.Execution = &slogger.Logger{
		Name:      model.AgentLogPrefix,
		Appenders: apiAndLocal,
	}
	return streams, nil
}
// TimeoutResetLogger pairs an APILogger with a TimeoutWatcher: its Append
// method checks in with the TimeoutWatcher each time a log message is
// appended, and then passes the message on to the APILogger.
type TimeoutResetLogger struct {
// TimeoutWatcher is checked in (via CheckIn) on every Append.
*TimeoutWatcher
// APILogger receives every appended log message.
*APILogger
}
// Append checks in with the TimeoutWatcher and then appends the message to
// the underlying APILogger, returning that Append call's error.
func (trLgr *TimeoutResetLogger) Append(log *slogger.Log) error {
	watcher, apiLgr := trLgr.TimeoutWatcher, trLgr.APILogger
	watcher.CheckIn()
	return apiLgr.Append(log)
}
// APILogger is a slogger.Appender that buffers log messages and sends them to
// the remote service's log endpoint through its TaskCommunicator. Buffered
// messages are sent when Flush or FlushAndWait is called, or from Append once
// the buffer holds at least SendAfterLines messages and at least
// SendAfterDuration has elapsed since the last flush.
type APILogger struct {
// An internal buffer of messages to send.
messages []model.LogMessage
// a mutex to ensure only one send attempt (sendLogs) is in progress at a time.
flushLock sync.Mutex
// mutex to protect appends to the buffer so that messages don't get lost
// if both a flush/append happen concurrently
appendLock sync.Mutex
// last time a flush was initiated (set by flushInternal and FlushAndWait,
// even when the buffer was empty)
lastFlush time.Time
// The number of log lines that the buffer must reach before Append will
// trigger a flush. Also used as the capacity of the fresh buffer allocated
// after each flush.
SendAfterLines int
// The minimum time that must have passed since the last flush before
// Append will trigger a flush.
// NOTE(review): Append requires BOTH thresholds to be met; confirm this is
// intended rather than flushing when either one is reached.
SendAfterDuration time.Duration
// Timer created by NewAPILogger with the SendAfterDuration interval.
// NOTE(review): nothing in this file reads, resets, or stops this timer;
// it appears to be a remnant of timer-based autoflushing -- confirm whether
// it is still used elsewhere in the package.
autoFlushTimer *time.Timer
// The mechanism for communicating with the remote endpoint; sendLogs calls
// its Log method.
TaskCommunicator
}
// NewAPILogger returns an APILogger that sends through the given
// TaskCommunicator. The logger starts with an empty buffer of capacity 100,
// a SendAfterLines threshold of 100, a SendAfterDuration of five seconds, and
// a timer created with that same duration. Both mutexes start out unlocked.
func NewAPILogger(tc TaskCommunicator) *APILogger {
	const (
		lineThreshold = 100
		flushInterval = 5 * time.Second
	)

	apiLgr := &APILogger{
		TaskCommunicator:  tc,
		SendAfterLines:    lineThreshold,
		SendAfterDuration: flushInterval,
	}
	apiLgr.messages = make([]model.LogMessage, 0, lineThreshold)
	apiLgr.autoFlushTimer = time.NewTimer(flushInterval)
	return apiLgr
}
// Append (to satisfy the Appender interface) translates the log message into
// the format used by the remote endpoint and adds it to the internal buffer.
//
// Trailing line terminators ("\r" and "\n") are stripped from the message
// text, and text that is not valid UTF-8 is replaced by its quoted,
// ASCII-escaped form. After buffering, an asynchronous flush is started only
// if the buffer has reached SendAfterLines messages and at least
// SendAfterDuration has passed since the last flush.
//
// Append always returns nil.
func (apiLgr *APILogger) Append(log *slogger.Log) error {
	// Strip trailing CR/LF so that messages ending in "\n" or "\r\n" are not
	// stored with an embedded line ending. (The previous cutset "\r\t" left
	// newlines in place and removed trailing tabs instead.)
	text := strings.TrimRight(log.Message(), "\r\n")

	// MCI-972: ensure message is valid UTF-8
	if !utf8.ValidString(text) {
		text = strconv.QuoteToASCII(text)
	}

	logMessage := model.LogMessage{
		Timestamp: log.Timestamp,
		Severity:  levelToString(log.Level),
		Type:      log.Prefix,
		Version:   evergreen.LogmessageCurrentVersion,
		Message:   text,
	}

	apiLgr.appendLock.Lock()
	defer apiLgr.appendLock.Unlock()

	apiLgr.messages = append(apiLgr.messages, logMessage)

	// Hold off flushing until both the line-count and the elapsed-time
	// thresholds have been met.
	// NOTE(review): the type documentation suggests either threshold alone
	// should trigger a flush -- confirm which behavior is intended.
	if len(apiLgr.messages) < apiLgr.SendAfterLines ||
		time.Since(apiLgr.lastFlush) < apiLgr.SendAfterDuration {
		return nil
	}

	// appendLock is held here, as flushInternal requires.
	apiLgr.flushInternal()
	return nil
}
// sendLogs sends flushMsgs through the TaskCommunicator's Log method while
// holding flushLock, so only one send runs at a time. It returns the number
// of messages passed to Log, or 0 when flushMsgs is empty (in which case Log
// is not called).
//
// Any error from Log is handed to grip.CatchError; the info line and the
// returned count are produced regardless of that error. The reported
// duration is measured from before the lock is acquired, so it includes time
// spent waiting for a previous send.
func (apiLgr *APILogger) sendLogs(flushMsgs []model.LogMessage) int {
	began := time.Now()

	apiLgr.flushLock.Lock()
	defer apiLgr.flushLock.Unlock()

	count := len(flushMsgs)
	if count == 0 {
		return 0
	}

	err := apiLgr.TaskCommunicator.Log(flushMsgs)
	grip.CatchError(err)
	grip.Infof("sent %d log messages to api server, in %s", count, time.Since(began))

	return count
}
// FlushAndWait synchronously sends all buffered messages and returns the
// value reported by sendLogs (0 when the buffer was empty). appendLock is
// held for the whole call, so concurrent Append/Flush calls block until the
// send has finished. lastFlush is updated even if there was nothing to send.
func (apiLgr *APILogger) FlushAndWait() int {
	apiLgr.appendLock.Lock()
	defer apiLgr.appendLock.Unlock()

	apiLgr.lastFlush = time.Now()

	pending := apiLgr.messages
	if len(pending) == 0 {
		return 0
	}

	sent := apiLgr.sendLogs(pending)
	// Start over with a fresh buffer now that the old one has been sent.
	apiLgr.messages = make([]model.LogMessage, 0, apiLgr.SendAfterLines)
	return sent
}
// flushInternal records the flush time, detaches the currently buffered
// messages from the logger, and sends them in a background goroutine via
// sendLogs. It does not wait for the send to finish.
//
// The caller must already hold apiLgr.appendLock.
func (apiLgr *APILogger) flushInternal() {
	apiLgr.lastFlush = time.Now()

	// Nothing buffered: skip the allocation and the goroutine, since
	// sendLogs would return immediately for an empty batch anyway.
	if len(apiLgr.messages) == 0 {
		return
	}

	// The buffer is replaced with a fresh slice below, so the old slice is
	// no longer reachable through apiLgr and can be handed to the sender
	// directly without copying it first.
	pending := apiLgr.messages
	apiLgr.messages = make([]model.LogMessage, 0, apiLgr.SendAfterLines)

	go apiLgr.sendLogs(pending)
}
// Flush takes appendLock and starts an asynchronous flush of the buffered
// messages via flushInternal; it returns without waiting for the messages to
// be sent.
func (apiLgr *APILogger) Flush() {
	lock := &apiLgr.appendLock
	lock.Lock()
	defer lock.Unlock()

	apiLgr.flushInternal()
}
// levelToString maps a slogger level to the corresponding model log-level
// prefix. Levels other than DEBUG, INFO, WARN and ERROR map to "UNKNOWN".
func levelToString(level slogger.Level) string {
	if level == slogger.DEBUG {
		return model.LogDebugPrefix
	}
	if level == slogger.INFO {
		return model.LogInfoPrefix
	}
	if level == slogger.WARN {
		return model.LogWarnPrefix
	}
	if level == slogger.ERROR {
		return model.LogErrorPrefix
	}
	return "UNKNOWN"
}