-
Notifications
You must be signed in to change notification settings - Fork 63
/
logtail.go
124 lines (103 loc) · 2.66 KB
/
logtail.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
// SPDX-License-Identifier: BSD-3-Clause
// Copyright (c) 2022, Unikraft GmbH and The KraftKit Authors.
// Licensed under the BSD-3-Clause License (the "License").
// You may not use this file except in compliance with the License.
package logtail
import (
"bufio"
"bytes"
"context"
"io"
"os"
"github.com/fsnotify/fsnotify"
)
// Sizing defaults used when tailing a log file.
const (
	// DefaultTailBufferSize is the size, in bytes, of the buffered reader that
	// NewLogTail places in front of the log file (4 KiB).
	DefaultTailBufferSize = 4 << 10

	// DefaultTailPeekSize is the number of bytes peekAndRead looks ahead at a
	// time while searching for NUL bytes to skip (1 KiB).
	DefaultTailPeekSize = 1 << 10
)
// NewLogTail returns a string channel that receives new lines from
// tailing/following the supplied logFile. Errors that occur while reading the
// file or while watching it for changes are propagated through the error
// channel; this includes io.EOF every time the reader catches up with the end
// of the file, and ctx.Err() once ctx is done. If a fatal error occurs during
// initialization, that error is returned, both channels are nil, and anything
// opened so far is closed again.
//
// Both channels are unbuffered and are never closed. The tailing goroutine
// blocks on every send until it is received, so the caller must keep receiving
// from both channels.
func NewLogTail(ctx context.Context, logFile string) (chan string, chan error, error) {
	f, err := os.Open(logFile)
	if err != nil {
		return nil, nil, err
	}

	watcher, err := fsnotify.NewWatcher()
	if err != nil {
		f.Close()
		return nil, nil, err
	}

	if err := watcher.Add(logFile); err != nil {
		watcher.Close()
		f.Close()
		return nil, nil, err
	}

	logs := make(chan string)
	errs := make(chan error)
	reader := bufio.NewReaderSize(f, DefaultTailBufferSize)

	// drain emits every complete line that is currently available and stops
	// as soon as peekAndRead reports that it hit EOF or a read error.
	drain := func() {
		for {
			if peekAndRead(f, reader, &logs, &errs) {
				return
			}
		}
	}

	// Start a goroutine which continuously outputs the logs to the provided
	// channel.
	go func() {
		// Release the file and the watcher once tailing stops. The file is
		// only read from, so a Close error carries no useful information.
		defer f.Close()
		defer watcher.Close()

		// First read everything that already exists inside of the log file.
		drain()

		for {
			select {
			case <-ctx.Done():
				errs <- ctx.Err()
				return

			case event, ok := <-watcher.Events:
				if !ok {
					return
				}

				// Op is a bit mask, so test for the Write bit rather than
				// comparing for equality; a single event may carry several
				// operations at once. One Write event may also cover several
				// lines, so keep reading until the reader has caught up.
				if event.Op&fsnotify.Write != 0 {
					drain()
				}

			case err, ok := <-watcher.Errors:
				if !ok {
					return
				}

				// Receive watcher errors so the watcher is never left
				// blocked on an unread Errors channel, and surface them to
				// the caller.
				errs <- err
			}
		}
	}()

	return logs, errs, nil
}
// peekAndRead reads at most one newline-terminated line from reader, which
// must be reading from file, and sends it on *logs.
//
// Before reading, it peeks ahead in windows of DefaultTailPeekSize bytes and
// skips everything up to and including the last NUL byte found in each window,
// continuing with the next window only while a window ends in a NUL byte.
// NOTE(review): this also skips any non-NUL bytes that precede a NUL inside a
// window; presumably NUL bytes only ever appear as a leading run — confirm.
//
// It returns false when a complete line was sent, meaning more data may be
// available right away. It returns true when the caller should stop and wait
// for the file to change:
//   - a read or seek error occurred; the error is sent on *errs;
//   - the end of the file was reached before a line delimiter; the partial
//     line is pushed back by rewinding file and resetting reader, and io.EOF
//     is sent on *errs.
//
// Every channel send blocks until it is received.
func peekAndRead(file *os.File, reader *bufio.Reader, logs *chan string, errs *chan error) bool {
	// Discard NUL bytes ahead of the next line.
	for {
		// The Peek error is deliberately ignored: on a short read Peek still
		// returns the bytes that are available, and a persistent read error
		// is reported by ReadBytes below.
		b, _ := reader.Peek(DefaultTailPeekSize)
		i := bytes.LastIndexByte(b, '\x00')

		// LastIndexByte returns -1 when no NUL is present, so index 0 is a
		// valid hit and has to be discarded as well.
		if i >= 0 {
			// The i+1 bytes were just peeked and are therefore buffered, a
			// case in which Discard is documented to always succeed.
			_, _ = reader.Discard(i + 1)
		}

		// Only keep scanning when the window ended in a NUL byte, because
		// only then can the run of NUL bytes continue into the next window.
		if i+1 < DefaultTailPeekSize {
			break
		}
	}

	s, err := reader.ReadBytes('\n')
	if err != nil && err != io.EOF {
		*errs <- err
		return true
	}

	// If we encounter EOF before a line delimiter, ReadBytes() will return the
	// remaining bytes, so rewind the file position by that many bytes, drop
	// the reader's buffered state, and wait for further file changes. The
	// partial line is then read again, in full, once its delimiter arrives.
	if err == io.EOF {
		if _, err := file.Seek(-int64(len(s)), io.SeekCurrent); err != nil {
			*errs <- err
			return true
		}

		reader.Reset(file)
		*errs <- io.EOF
		return true
	}

	// The discarded bytes were removed from the reader before ReadBytes ran,
	// so s holds exactly the line (including its trailing '\n') and must be
	// forwarded without any further slicing.
	*logs <- string(s)

	return false
}