forked from kubernetes/kubernetes
-
Notifications
You must be signed in to change notification settings - Fork 0
/
container_log_manager.go
387 lines (356 loc) · 11.8 KB
/
container_log_manager.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package logs
import (
"compress/gzip"
"fmt"
"io"
"os"
"path/filepath"
"sort"
"strings"
"time"
"k8s.io/klog"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/apimachinery/pkg/util/clock"
"k8s.io/apimachinery/pkg/util/wait"
internalapi "k8s.io/kubernetes/pkg/kubelet/apis/cri"
runtimeapi "k8s.io/kubernetes/pkg/kubelet/apis/cri/runtime/v1alpha2"
)
const (
// logMonitorPeriod is the period container log manager monitors
// container logs and performs log rotation.
logMonitorPeriod = 10 * time.Second
// timestampFormat is format of the timestamp suffix for rotated log.
// See https://golang.org/pkg/time/#Time.Format.
timestampFormat = "20060102-150405"
// compressSuffix is the suffix for compressed log.
compressSuffix = ".gz"
// tmpSuffix is the suffix for temporary file.
tmpSuffix = ".tmp"
)
// ContainerLogManager manages lifecycle of all container logs.
//
// Implementation is thread-safe.
type ContainerLogManager interface {
// TODO(random-liu): Add RotateLogs function and call it under disk pressure.
// Start container log manager.
Start()
}
// LogRotatePolicy is a policy for container log rotation. The policy applies to all
// containers managed by kubelet.
type LogRotatePolicy struct {
// MaxSize in bytes of the container log file before it is rotated. Negative
// number means to disable container log rotation.
MaxSize int64
// MaxFiles is the maximum number of log files that can be present.
// If rotating the logs creates excess files, the oldest file is removed.
MaxFiles int
}
// GetAllLogs gets all inuse (rotated/compressed) logs for a specific container log.
// Returned logs are sorted in oldest to newest order.
// TODO(#59902): Leverage this function to support log rotation in `kubectl logs`.
func GetAllLogs(log string) ([]string, error) {
// pattern is used to match all rotated files.
pattern := fmt.Sprintf("%s.*", log)
logs, err := filepath.Glob(pattern)
if err != nil {
return nil, fmt.Errorf("failed to list all log files with pattern %q: %v", pattern, err)
}
inuse, _ := filterUnusedLogs(logs)
sort.Strings(inuse)
return append(inuse, log), nil
}
// compressReadCloser wraps gzip.Reader with a function to close file handler.
type compressReadCloser struct {
f *os.File
*gzip.Reader
}
func (rc *compressReadCloser) Close() error {
ferr := rc.f.Close()
rerr := rc.Reader.Close()
if ferr != nil {
return ferr
}
if rerr != nil {
return rerr
}
return nil
}
// UncompressLog compresses a compressed log and return a readcloser for the
// stream of the uncompressed content.
// TODO(#59902): Leverage this function to support log rotation in `kubectl logs`.
func UncompressLog(log string) (_ io.ReadCloser, retErr error) {
if !strings.HasSuffix(log, compressSuffix) {
return nil, fmt.Errorf("log is not compressed")
}
f, err := os.Open(log)
if err != nil {
return nil, fmt.Errorf("failed to open log: %v", err)
}
defer func() {
if retErr != nil {
f.Close()
}
}()
r, err := gzip.NewReader(f)
if err != nil {
return nil, fmt.Errorf("failed to create gzip reader: %v", err)
}
return &compressReadCloser{f: f, Reader: r}, nil
}
// parseMaxSize parses quantity string to int64 max size in bytes.
func parseMaxSize(size string) (int64, error) {
quantity, err := resource.ParseQuantity(size)
if err != nil {
return 0, err
}
maxSize, ok := quantity.AsInt64()
if !ok {
return 0, fmt.Errorf("invalid max log size")
}
if maxSize < 0 {
return 0, fmt.Errorf("negative max log size %d", maxSize)
}
return maxSize, nil
}
type containerLogManager struct {
runtimeService internalapi.RuntimeService
policy LogRotatePolicy
clock clock.Clock
}
// NewContainerLogManager creates a new container log manager.
func NewContainerLogManager(runtimeService internalapi.RuntimeService, maxSize string, maxFiles int) (ContainerLogManager, error) {
if maxFiles <= 1 {
return nil, fmt.Errorf("invalid MaxFiles %d, must be > 1", maxFiles)
}
parsedMaxSize, err := parseMaxSize(maxSize)
if err != nil {
return nil, fmt.Errorf("failed to parse container log max size %q: %v", maxSize, err)
}
// policy LogRotatePolicy
return &containerLogManager{
runtimeService: runtimeService,
policy: LogRotatePolicy{
MaxSize: parsedMaxSize,
MaxFiles: maxFiles,
},
clock: clock.RealClock{},
}, nil
}
// Start the container log manager.
func (c *containerLogManager) Start() {
// Start a goroutine periodically does container log rotation.
go wait.Forever(func() {
if err := c.rotateLogs(); err != nil {
klog.Errorf("Failed to rotate container logs: %v", err)
}
}, logMonitorPeriod)
}
func (c *containerLogManager) rotateLogs() error {
// TODO(#59998): Use kubelet pod cache.
containers, err := c.runtimeService.ListContainers(&runtimeapi.ContainerFilter{})
if err != nil {
return fmt.Errorf("failed to list containers: %v", err)
}
// NOTE(random-liu): Figure out whether we need to rotate container logs in parallel.
for _, container := range containers {
// Only rotate logs for running containers. Non-running containers won't
// generate new output, it doesn't make sense to keep an empty latest log.
if container.GetState() != runtimeapi.ContainerState_CONTAINER_RUNNING {
continue
}
id := container.GetId()
// Note that we should not block log rotate for an error of a single container.
status, err := c.runtimeService.ContainerStatus(id)
if err != nil {
klog.Errorf("Failed to get container status for %q: %v", id, err)
continue
}
path := status.GetLogPath()
info, err := os.Stat(path)
if err != nil {
if !os.IsNotExist(err) {
klog.Errorf("Failed to stat container log %q: %v", path, err)
continue
}
// In rotateLatestLog, there are several cases that we may
// lose original container log after ReopenContainerLog fails.
// We try to recover it by reopening container log.
if err := c.runtimeService.ReopenContainerLog(id); err != nil {
klog.Errorf("Container %q log %q doesn't exist, reopen container log failed: %v", id, path, err)
continue
}
// The container log should be recovered.
info, err = os.Stat(path)
if err != nil {
klog.Errorf("Failed to stat container log %q after reopen: %v", path, err)
continue
}
}
if info.Size() < c.policy.MaxSize {
continue
}
// Perform log rotation.
if err := c.rotateLog(id, path); err != nil {
klog.Errorf("Failed to rotate log %q for container %q: %v", path, id, err)
continue
}
}
return nil
}
func (c *containerLogManager) rotateLog(id, log string) error {
// pattern is used to match all rotated files.
pattern := fmt.Sprintf("%s.*", log)
logs, err := filepath.Glob(pattern)
if err != nil {
return fmt.Errorf("failed to list all log files with pattern %q: %v", pattern, err)
}
logs, err = c.cleanupUnusedLogs(logs)
if err != nil {
return fmt.Errorf("failed to cleanup logs: %v", err)
}
logs, err = c.removeExcessLogs(logs)
if err != nil {
return fmt.Errorf("failed to remove excess logs: %v", err)
}
// Compress uncompressed log files.
for _, l := range logs {
if strings.HasSuffix(l, compressSuffix) {
continue
}
if err := c.compressLog(l); err != nil {
return fmt.Errorf("failed to compress log %q: %v", l, err)
}
}
if err := c.rotateLatestLog(id, log); err != nil {
return fmt.Errorf("failed to rotate log %q: %v", log, err)
}
return nil
}
// cleanupUnusedLogs cleans up temporary or unused log files generated by previous log rotation
// failure.
func (c *containerLogManager) cleanupUnusedLogs(logs []string) ([]string, error) {
inuse, unused := filterUnusedLogs(logs)
for _, l := range unused {
if err := os.Remove(l); err != nil {
return nil, fmt.Errorf("failed to remove unused log %q: %v", l, err)
}
}
return inuse, nil
}
// filterUnusedLogs splits logs into 2 groups, the 1st group is in used logs,
// the second group is unused logs.
func filterUnusedLogs(logs []string) (inuse []string, unused []string) {
for _, l := range logs {
if isInUse(l, logs) {
inuse = append(inuse, l)
} else {
unused = append(unused, l)
}
}
return inuse, unused
}
// isInUse checks whether a container log file is still inuse.
func isInUse(l string, logs []string) bool {
// All temporary files are not in use.
if strings.HasSuffix(l, tmpSuffix) {
return false
}
// All compresed logs are in use.
if strings.HasSuffix(l, compressSuffix) {
return true
}
// Files has already been compressed are not in use.
for _, another := range logs {
if l+compressSuffix == another {
return false
}
}
return true
}
// removeExcessLogs removes old logs to make sure there are only at most MaxFiles log files.
func (c *containerLogManager) removeExcessLogs(logs []string) ([]string, error) {
// Sort log files in oldest to newest order.
sort.Strings(logs)
// Container will create a new log file, and we'll rotate the latest log file.
// Other than those 2 files, we can have at most MaxFiles-2 rotated log files.
// Keep MaxFiles-2 files by removing old files.
// We should remove from oldest to newest, so as not to break ongoing `kubectl logs`.
maxRotatedFiles := c.policy.MaxFiles - 2
if maxRotatedFiles < 0 {
maxRotatedFiles = 0
}
i := 0
for ; i < len(logs)-maxRotatedFiles; i++ {
if err := os.Remove(logs[i]); err != nil {
return nil, fmt.Errorf("failed to remove old log %q: %v", logs[i], err)
}
}
logs = logs[i:]
return logs, nil
}
// compressLog compresses a log to log.gz with gzip.
func (c *containerLogManager) compressLog(log string) error {
r, err := os.Open(log)
if err != nil {
return fmt.Errorf("failed to open log %q: %v", log, err)
}
defer r.Close()
tmpLog := log + tmpSuffix
f, err := os.OpenFile(tmpLog, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
if err != nil {
return fmt.Errorf("failed to create temporary log %q: %v", tmpLog, err)
}
defer func() {
// Best effort cleanup of tmpLog.
os.Remove(tmpLog)
}()
defer f.Close()
w := gzip.NewWriter(f)
defer w.Close()
if _, err := io.Copy(w, r); err != nil {
return fmt.Errorf("failed to compress %q to %q: %v", log, tmpLog, err)
}
compressedLog := log + compressSuffix
if err := os.Rename(tmpLog, compressedLog); err != nil {
return fmt.Errorf("failed to rename %q to %q: %v", tmpLog, compressedLog, err)
}
// Remove old log file.
if err := os.Remove(log); err != nil {
return fmt.Errorf("failed to remove log %q after compress: %v", log, err)
}
return nil
}
// rotateLatestLog rotates latest log without compression, so that container can still write
// and fluentd can finish reading.
func (c *containerLogManager) rotateLatestLog(id, log string) error {
timestamp := c.clock.Now().Format(timestampFormat)
rotated := fmt.Sprintf("%s.%s", log, timestamp)
if err := os.Rename(log, rotated); err != nil {
return fmt.Errorf("failed to rotate log %q to %q: %v", log, rotated, err)
}
if err := c.runtimeService.ReopenContainerLog(id); err != nil {
// Rename the rotated log back, so that we can try rotating it again
// next round.
// If kubelet gets restarted at this point, we'll lose original log.
if renameErr := os.Rename(rotated, log); renameErr != nil {
// This shouldn't happen.
// Report an error if this happens, because we will lose original
// log.
klog.Errorf("Failed to rename rotated log %q back to %q: %v, reopen container log error: %v", rotated, log, renameErr, err)
}
return fmt.Errorf("failed to reopen container log %q: %v", id, err)
}
return nil
}