-
Notifications
You must be signed in to change notification settings - Fork 16
/
changelog.go
278 lines (260 loc) · 7.12 KB
/
changelog.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
package event
import (
"bufio"
"bytes"
"context"
"encoding/json"
"io"
"os"
"path/filepath"
"time"
"github.com/fsnotify/fsnotify"
"github.com/segmentio/ctlstore/pkg/errs"
"github.com/segmentio/errors-go"
"github.com/segmentio/events/v2"
"github.com/segmentio/stats/v4"
)
// fileChangelog is the main, file-backed implementation of changelog. It
// keeps reading the changelog file at path, notices when that file has been
// rotated, and queues what it reads on the events channel so that reading
// can carry on while earlier events are still being processed.
type fileChangelog struct {
	path   string        // location of the changelog on disk
	events chan eventErr // events and read errors waiting to be handed out by next
}

// eventErr carries either an event or an error. It is the element type of
// the events channel, which lets errors hit while reading the changelog
// travel the same route as events: to the iterator and, ultimately, to the
// iterator's client.
type eventErr struct {
	event Event
	err   error
}
// newFileChangelog builds a fileChangelog for the changelog file at path.
// It only allocates the struct and its events channel; watching and reading
// begin when start is called.
func newFileChangelog(path string) *fileChangelog {
	// probably good to have a buffer here, so the reader is not stalled by
	// every single hand-off to the consumer.
	const eventBufferSize = 1024

	c := new(fileChangelog)
	c.path = path // the path of the changelog on disk
	c.events = make(chan eventErr, eventBufferSize)
	return c
}
// next returns the next queued changelog Event, or the error that was queued
// in its place. It blocks until something is available on the events channel
// or ctx is done; in the latter case it returns an empty Event and ctx.Err().
// The iterator calls this method to produce events for its client.
func (c *fileChangelog) next(ctx context.Context) (Event, error) {
	select {
	case <-ctx.Done():
		return Event{}, ctx.Err()
	case queued := <-c.events:
		return queued.event, queued.err
	}
}
// start creates the fs watchers and starts reading from
// the on-disk changelog. this method does not block while
// the changelog is reading from the filesystem.
//
// Three goroutines outlive the call: one forwards watcher errors, one
// forwards watcher events that concern the changelog file, and one runs
// read. A fourth closes the watcher once ctx is done.
//
// The context should be canceled when the changelog is no
// longer needed. If this method returns an error the
// *fileChangelog is not usable; in that case the watcher has
// already been closed and no goroutines were started.
func (c *fileChangelog) start(ctx context.Context) error {
	watcher, err := fsnotify.NewWatcher()
	if err != nil {
		return errors.Wrap(err, "create fsnotify watcher")
	}

	// watch the changelog itself and its parent dir; the parent dir is what
	// lets us detect the changelog being rotated.
	paths := []string{c.path, filepath.Dir(c.path)}
	for _, w := range paths {
		if err := watcher.Add(w); err != nil {
			// nothing else owns the watcher yet, so release it right away
			// instead of holding it open until the caller cancels ctx.
			if closeErr := watcher.Close(); closeErr != nil {
				events.Log("Could not close watcher: %{err}s", closeErr)
			}
			return errors.Wrapf(err, "could not watch '%s'", w)
		}
	}

	// from here on the watcher lives exactly as long as ctx.
	go func() {
		<-ctx.Done()
		if err := watcher.Close(); err != nil {
			events.Log("Could not close watcher: %{err}s", err)
		}
	}()

	fsNotifyCh := make(chan fsnotify.Event)
	fsErrCh := make(chan error)

	// fsnotify recommends reading the error and events chans from
	// separate goroutines. indeed, not doing this causes our tests
	// to fail.
	go func() {
		for {
			select {
			case err, ok := <-watcher.Errors:
				if !ok {
					// the watcher was closed; a receive would now only
					// yield nil errors, so stop instead of logging them.
					return
				}
				events.Log("FS err: %{err}s", err)
				select {
				case fsErrCh <- err:
				case <-ctx.Done():
					return
				}
			case <-ctx.Done():
				return
			}
		}
	}()

	go func() {
		for {
			select {
			case event, ok := <-watcher.Events:
				if !ok {
					// the watcher was closed; no further events will come.
					return
				}
				// filter out events that are not related to our changelog since
				// we are also watching the parent dir of the changelog in order
				// to correctly detect file rotation.
				if event.Name == c.path {
					select {
					case fsNotifyCh <- event:
					case <-ctx.Done():
						return
					}
				}
			case <-ctx.Done():
				return
			}
		}
	}()

	go c.read(ctx, fsNotifyCh, fsErrCh)
	return nil
}
// read continually reads from the changelog until the context is cancelled.
//
// Each pass of the outer loop opens the file at c.path and reads
// newline-terminated JSON entries from it, sending each one (or the parse
// error) on c.events via send. When the end of the file is reached it waits
// for an event on fsNotifyCh, an error on fsErrCh, or one second, whichever
// comes first, and then tries to read more. A Create event for the changelog
// is treated as a rotation: the rest of the open file is consumed and the
// outer loop reopens the path.
//
// fsNotifyCh and fsErrCh are the channels fed by start. All waits in this
// method also watch ctx, so cancellation is not delayed by a back-off.
func (c *fileChangelog) read(ctx context.Context, fsNotifyCh chan fsnotify.Event, fsErrCh chan error) {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	// each iteration opens the file and reads until it is rotated.
	for ctx.Err() == nil {
		// Read as much as possible until you hit EOF. Then wait for a notification
		// from fsnotify or just check again later. Wrapping this in a func makes
		// the deferred Close run once per opened file rather than at method exit.
		err := func() error {
			// first, open the changelog
			f, err := os.Open(c.path)
			if err != nil {
				return errors.Wrap(err, "open changelog")
			}
			defer func() {
				if err := f.Close(); err != nil {
					errs.Incr("changelog-errors", stats.T("op", "close file"))
					events.Log("Could not close changelog file: %{error}s", err)
				}
			}()
			events.Debug("Opening changelog...")
			br := bufio.NewReaderSize(f, 60*1024)

			// handleEventData parses one complete line and sends the resulting
			// event, or the parse error, to the consumer.
			handleEventData := func(b []byte) {
				if len(b) == 0 {
					// don't bother
					return
				}
				var entry entry
				if err := json.Unmarshal(b, &entry); err != nil {
					c.send(ctx, eventErr{err: errors.Wrapf(err, "parse entry '%s'", b)})
					errs.Incr("changelog-errors", stats.T("op", "parse json"))
					return
				}
				event := entry.event()
				c.send(ctx, eventErr{event: event})
			}

			// buffer holds a line that has been read only in part. It survives
			// across readEvents calls so that the remainder can be appended once
			// it shows up in the file.
			var buffer bytes.Buffer

			// readEvents reads lines until ReadBytes fails and returns that
			// error; io.EOF is the normal outcome and means "caught up".
			readEvents := func() error {
				for {
					b, err := br.ReadBytes('\n')
					// if we hit an error, reset our buffer and quit
					if err != nil && err != io.EOF {
						buffer.Reset()
						return err
					}
					// add to the buffer. on io.EOF, b may be an unterminated
					// fragment of a line.
					buffer.Write(b)
					// check to see if the buffer ends with a newline
					if buffer.Len() > 0 {
						if buffer.Bytes()[buffer.Len()-1] == '\n' {
							handleEventData(buffer.Bytes())
							buffer.Reset()
						}
					}
					if err != nil {
						return err
					}
				}
			}

			// loop and read as many lines as possible.
			for {
				err = readEvents()
				if err != io.EOF {
					return errors.Wrap(err, "read bytes")
				}
				select {
				case <-time.After(time.Second):
					events.Debug("Manually checking log")
					continue
				case watchErr := <-fsErrCh:
					// drain what is already in the file before reporting the
					// watcher error to the outer loop.
					if err := readEvents(); err != io.EOF {
						events.Log("could not consume rest of file: %{error}s", err)
					}
					return errors.Wrap(watchErr, "watcher error")
				case event := <-fsNotifyCh:
					// Op is a bitmask and may carry several operations at once,
					// so test individual bits instead of comparing for equality.
					// Create is checked first: consuming the rest of the file
					// also covers whatever a simultaneous Write announced.
					switch {
					case event.Op&fsnotify.Create == fsnotify.Create:
						events.Debug("New changelog created. Consuming the rest of current one...")
						err := readEvents()
						if err != io.EOF {
							return errors.Wrap(err, "consume rest of changelog")
						}
						events.Debug("Restarting reader")
						return nil
					case event.Op&fsnotify.Write == fsnotify.Write:
						continue
					}
					// any other operation: just go around and read again.
				case <-ctx.Done():
					events.Debug("Changelog context finished. Exiting.")
					return ctx.Err()
				}
			}
		}()

		switch {
		case err == nil:
			// loop again
		case errs.IsCanceled(err):
			return
		case os.IsNotExist(errors.Cause(err)):
			events.Log("Changelog file does not exist, rechecking...")
			select {
			case <-fsNotifyCh:
				events.Log("Changelog notified")
			case <-time.After(time.Second):
				events.Log("Manually checking")
			case <-ctx.Done():
				return
			}
		default:
			errs.Incr("changelog-errors", stats.T("op", "open file"))
			c.send(ctx, eventErr{err: err})
			// back off before retrying, but do not hold up shutdown.
			select {
			case <-time.After(time.Second):
			case <-ctx.Done():
				return
			}
		}
	} // for
}
// send tries to put ee on the c.events channel. It blocks until the value
// has been accepted or ctx is done, whichever happens first; in the second
// case the value is dropped and nothing is reported to the caller.
func (c *fileChangelog) send(ctx context.Context, ee eventErr) {
	select {
	case <-ctx.Done():
		// the context ended before the value could be queued.
		return
	case c.events <- ee:
		return
	}
}
// validate checks that the changelog file exists before starting.
// If the first stat reports that the file is missing, it logs, waits one
// second and stats once more. It returns nil when the file is found, a
// "changelog does not exist" error when it is still missing after the wait,
// and a "stat changelog" error when either stat fails for any other reason.
func (c *fileChangelog) validate() error {
	_, statErr := os.Stat(c.path)

	if os.IsNotExist(statErr) {
		// the file may be absent for a moment while it is being rotated, so
		// give it one more chance.
		events.Log("changelog does not exist. waiting 1s for rotation")
		time.Sleep(time.Second)
		if _, statErr = os.Stat(c.path); os.IsNotExist(statErr) {
			return errors.Wrap(statErr, "changelog does not exist")
		}
	}

	if statErr != nil {
		return errors.Wrap(statErr, "stat changelog")
	}
	return nil
}