forked from google/go-cloud
/
filevar.go
333 lines (297 loc) · 10 KB
/
filevar.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
// Copyright 2018 The Go Cloud Development Kit Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package filevar provides a runtimevar implementation with variables
// backed by the filesystem. Use OpenVariable to construct a *runtimevar.Variable.
//
// Configuration files can be updated using any commands (cp, mv) or
// tools/editors. This package does not guarantee read consistency since
// it does not have control over the writes. For example, some kinds of
// updates might result in filevar temporarily receiving an error or an
// empty value.
//
// Known Issues:
//
// * On macOS, if an empty file is copied into a configuration file,
// filevar will not detect the change.
//
// URLs
//
// For runtimevar.OpenVariable, filevar registers for the scheme "file".
// To customize the URL opener, or for more details on the URL format,
// see URLOpener.
// See https://gocloud.dev/concepts/urls/ for background information.
//
// As
//
// filevar does not support any types for As.
package filevar // import "github.com/kainoaseto/go-cloud/runtimevar/filevar"
import (
"bytes"
"context"
"errors"
"fmt"
"io/ioutil"
"net/url"
"os"
"path/filepath"
"strings"
"time"
"github.com/fsnotify/fsnotify"
"github.com/kainoaseto/go-cloud/gcerrors"
"github.com/kainoaseto/go-cloud/runtimevar"
"github.com/kainoaseto/go-cloud/runtimevar/driver"
)
func init() {
runtimevar.DefaultURLMux().RegisterVariable(Scheme, &URLOpener{})
}
// Scheme is the URL scheme under which filevar registers its URLOpener on
// the mux returned by runtimevar.DefaultURLMux.
const Scheme = "file"
// URLOpener opens filevar URLs like "file:///path/to/config.json?decoder=json".
//
// Only the URL's path component is used to locate the file to watch.
// When os.PathSeparator is not '/', a single leading "/" is removed from the
// path and the remaining '/' characters are converted to os.PathSeparator.
//
// The following URL parameters are supported:
//   - decoder: the name of the decoder to use. When absent, URLOpener.Decoder
//     is used, or runtimevar.BytesDecoder if URLOpener.Decoder is nil.
//     See runtimevar.DecoderByName for supported values.
//
// Any other query parameter causes OpenVariableURL to return an error.
type URLOpener struct {
	// Decoder is the fallback decoder for URLs that do not name one.
	// Defaults to runtimevar.BytesDecoder.
	Decoder *runtimevar.Decoder

	// Options is handed to OpenVariable for every URL opened.
	Options Options
}
// OpenVariableURL opens the variable stored in the file named by u's path.
//
// The "decoder" query parameter, if present, selects the decoder by name;
// otherwise o.Decoder is passed to runtimevar.DecoderByName as the default.
// Any other query parameter is rejected with an error. See the package doc
// for more details.
func (o *URLOpener) OpenVariableURL(ctx context.Context, u *url.URL) (*runtimevar.Variable, error) {
	params := u.Query()

	// Consume the one parameter we understand; whatever is left is invalid.
	name := params.Get("decoder")
	params.Del("decoder")

	decoder, err := runtimevar.DecoderByName(ctx, name, o.Decoder)
	if err != nil {
		return nil, fmt.Errorf("open variable %v: invalid decoder: %v", u, err)
	}
	for param := range params {
		return nil, fmt.Errorf("open variable %v: invalid query parameter %q", u, param)
	}

	// On platforms whose separator is not '/', the URL path's leading slash
	// is not part of the file name.
	filePath := u.Path
	if os.PathSeparator != '/' {
		filePath = strings.TrimPrefix(filePath, "/")
	}
	filePath = filepath.FromSlash(filePath)
	return OpenVariable(filePath, decoder, &o.Options)
}
// Options holds optional settings for OpenVariable.
type Options struct {
	// WaitDuration is how long the watcher pauses before retrying after an
	// error, such as the file not existing. Defaults to 30 seconds.
	WaitDuration time.Duration
}
// OpenVariable returns a *runtimevar.Variable whose value comes from the file
// at path.
//
// The file contents are raw bytes; decoder turns them into the value exposed
// through runtimevar.Snapshot.Value. See the runtimevar package documentation
// for examples of decoders. opts may be nil.
//
// An error is returned if the underlying watcher cannot be created.
func OpenVariable(path string, decoder *runtimevar.Decoder, opts *Options) (*runtimevar.Variable, error) {
	drv, err := newWatcher(path, decoder, opts)
	if err != nil {
		return nil, err
	}
	return runtimevar.New(drv), nil
}
// newWatcher validates its arguments, creates an fsnotify watcher, and starts
// the background goroutine that reads the file at path and publishes states.
//
// A nil opts is treated as the zero Options. An empty path or a nil decoder
// is rejected. The returned watcher must be closed to stop the goroutine.
func newWatcher(path string, decoder *runtimevar.Decoder, opts *Options) (*watcher, error) {
	if opts == nil {
		opts = &Options{}
	}
	switch {
	case path == "":
		return nil, errors.New("path is required")
	case decoder == nil:
		return nil, errors.New("decoder is required")
	}

	// Watch and read the file by its absolute path.
	abspath, err := filepath.Abs(path)
	if err != nil {
		return nil, err
	}

	notifier, err := fsnotify.NewWatcher()
	if err != nil {
		return nil, err
	}

	// The background goroutine runs under its own context. Close cancels it
	// via shutdown and then collects the goroutine's result from closeCh.
	ctx, cancel := context.WithCancel(context.Background())
	w := &watcher{
		path:     abspath,
		ch:       make(chan *state, 1), // Buffered; see the watcher struct comments.
		closeCh:  make(chan error),
		shutdown: cancel,
	}
	go w.watch(ctx, notifier, abspath, decoder, driver.WaitDuration(opts.WaitDuration))
	return w, nil
}
// errNotExist wraps an underlying error in cases where the file likely doesn't
// exist. watcher.ErrorCode maps it to gcerrors.NotFound.
type errNotExist struct {
	err error
}

// Error returns the message of the wrapped error unchanged.
func (e *errNotExist) Error() string {
	return e.err.Error()
}

// Unwrap returns the wrapped error so that errors.Is and errors.As can
// inspect the underlying cause (for example, to match os.ErrNotExist).
func (e *errNotExist) Unwrap() error {
	return e.err
}
// state implements driver.State. It is one observation of the watched file:
// either a decoded value together with the raw bytes it came from, or an
// error.
type state struct {
	val        interface{} // decoded value
	updateTime time.Time   // when the value was produced
	raw        []byte      // file contents that val was decoded from
	err        error       // non-nil if this state represents a failure
}

// Value returns the decoded value and the error stored in the state.
func (st *state) Value() (interface{}, error) {
	return st.val, st.err
}

// UpdateTime returns the time recorded when the state was created.
func (st *state) UpdateTime() time.Time {
	return st.updateTime
}

// As always reports false; filevar does not support any types for As.
func (st *state) As(i interface{}) bool {
	return false
}
// watcher implements driver.Watcher for configurations stored in files.
type watcher struct {
	// path is the absolute path of the watched file.
	path string

	// ch carries new *state values from the background goroutine to
	// WatchVariable. It has a buffer of one, and the goroutine drains the
	// buffer before each write, so a write never blocks and the buffered
	// value is always the most recent one. A blocking write could keep the
	// goroutine from reading fsnotify events indefinitely.
	ch chan *state

	// closeCh carries the result of closing the notifier from the
	// background goroutine back to Close.
	closeCh chan error

	// shutdown signals the background goroutine to exit.
	shutdown func()
}
// WatchVariable implements driver.WatchVariable.
//
// It blocks until the background goroutine publishes a state or ctx is done,
// whichever happens first. In the latter case the returned state carries
// ctx.Err(). The returned duration is always 0.
func (w *watcher) WatchVariable(ctx context.Context, _ driver.State) (driver.State, time.Duration) {
	select {
	case next := <-w.ch:
		return next, 0
	case <-ctx.Done():
		return &state{err: ctx.Err()}, 0
	}
}
// updateState publishes s on w.ch unless s merely repeats the error already
// recorded in prev.
//
// s counts as a repeat when both s and prev carry an error and the two errors
// are the same value, render the same message, or both indicate that the file
// does not exist. A repeat is not published, so readers are not woken again
// for an unchanged failure. Otherwise any stale state still buffered in w.ch
// is discarded and s takes its place.
//
// It always returns s.
func (w *watcher) updateState(s, prev *state) *state {
	if prev != nil && sameError(s.err, prev.err) {
		// s represents the same error as prev.
		return s
	}
	// Drain any buffered value on ch; it is now stale.
	select {
	case <-w.ch:
	default:
	}
	// This write can't block, since we're the only writer, ch has a buffer
	// size of 1, and we just read anything that was buffered.
	w.ch <- s
	return s
}

// sameError reports whether a and b are both non-nil and represent the same
// failure.
func sameError(a, b error) bool {
	if a == nil || b == nil {
		return false
	}
	if a == b || a.Error() == b.Error() {
		return true
	}
	return isNotExist(a) && isNotExist(b)
}

// isNotExist reports whether err is a "does not exist" error according to
// os.IsNotExist. os.IsNotExist does not look through custom wrapper types,
// so an *errNotExist is stripped first; otherwise the check could never
// succeed for the wrapped errors produced by the watch loop.
func isNotExist(err error) bool {
	if wrapped, ok := err.(*errNotExist); ok {
		err = wrapped.err
	}
	return os.IsNotExist(err)
}
// watch is the body of the background goroutine started by newWatcher.
//
// It repeatedly registers file with notifier, reads and decodes it, and
// publishes the outcome through w.updateState. It then blocks until notifier
// reports a relevant event for file. While the most recent state is an error
// it pauses for wait before each new attempt.
//
// It returns when ctx is canceled, after sending the result of
// notifier.Close (nil if closing succeeded) on w.closeCh.
func (w *watcher) watch(ctx context.Context, notifier *fsnotify.Watcher, file string, decoder *runtimevar.Decoder, wait time.Duration) {
	// Operations on the file that warrant re-reading it.
	const relevantOps = fsnotify.Create | fsnotify.Write | fsnotify.Remove | fsnotify.Rename

	var last *state
	for {
		// After a failure, back off before trying again so that a
		// persistent problem (typically a missing file) doesn't spin.
		if last != nil && last.err != nil {
			select {
			case <-ctx.Done():
				w.closeCh <- notifier.Close()
				return
			case <-time.After(wait):
			}
		}

		// (Re-)register the file on every pass. Adding it more than once is
		// harmless, and fsnotify is unreliable about whether a rename
		// requires re-adding.
		if addErr := notifier.Add(file); addErr != nil {
			// Most likely the file is missing; retry after the pause.
			last = w.updateState(&state{err: &errNotExist{addErr}}, last)
			continue
		}

		contents, readErr := ioutil.ReadFile(file)
		if readErr != nil {
			// Most likely the file is missing; retry after the pause.
			last = w.updateState(&state{err: &errNotExist{readErr}}, last)
			continue
		}

		// Decode and publish only if this is the first read, a recovery
		// from an error, or the bytes actually changed.
		if last == nil || last.err != nil || !bytes.Equal(last.raw, contents) {
			var next *state
			if decoded, decodeErr := decoder.Decode(ctx, contents); decodeErr != nil {
				next = &state{err: decodeErr}
			} else {
				next = &state{val: decoded, updateTime: time.Now(), raw: contents}
			}
			last = w.updateState(next, last)
		}

		// Sleep until the notifier reports something relevant to our file.
	waitForChange:
		for {
			select {
			case <-ctx.Done():
				w.closeCh <- notifier.Close()
				return
			case ev := <-notifier.Events:
				// Skip events for other files and operations we don't
				// care about.
				if ev.Name == file && ev.Op&relevantOps != 0 {
					break waitForChange
				}
			case watchErr := <-notifier.Errors:
				// Surface the notifier error, but keep waiting for events.
				last = w.updateState(&state{err: watchErr}, last)
			}
		}
	}
}
// Close stops the background goroutine and releases the watcher's channels.
//
// It cancels the goroutine's context, blocks until the goroutine reports the
// result of closing the notifier, closes both channels, and returns that
// result.
func (w *watcher) Close() error {
	// Cancel the background goroutine's context.
	w.shutdown()

	// The goroutine answers with the outcome of notifier.Close.
	closeErr := <-w.closeCh

	// Nothing writes to these channels any more.
	close(w.ch)
	close(w.closeCh)

	return closeErr
}
// ErrorAs implements driver.ErrorAs. It always reports false; filevar does
// not support any types for As.
func (w *watcher) ErrorAs(err error, i interface{}) bool {
	return false
}
// ErrorCode implements driver.ErrorCode.
//
// An error whose dynamic type is *errNotExist maps to gcerrors.NotFound;
// every other error maps to gcerrors.Unknown.
func (*watcher) ErrorCode(err error) gcerrors.ErrorCode {
	switch err.(type) {
	case *errNotExist:
		return gcerrors.NotFound
	default:
		return gcerrors.Unknown
	}
}