-
Notifications
You must be signed in to change notification settings - Fork 0
/
watcher.go
247 lines (227 loc) · 5.88 KB
/
watcher.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
// Copyright ©2023 Dan Kortschak. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package config
import (
"context"
"crypto/sha1"
"encoding/json"
"errors"
"hash"
"io/fs"
"log/slog"
"os"
"path/filepath"
"time"
"github.com/BurntSushi/toml"
"github.com/fsnotify/fsnotify"
"github.com/kortschak/dex/rpc"
)
// FileDebounce is the default duration we wait for the contents of a changed
// file to stabilise before reading it. The wait works around some editors
// writing an empty file and then the buffer. It is the value used when a
// negative debounce is passed to NewWatcher.
const FileDebounce = 10 * time.Millisecond
// Change is a set of related configuration changes identified by Watch.
type Change struct {
// Event holds the file system events that make up the change. It is
// left empty when a file could not be examined or read during the
// initial directory scan.
Event []fsnotify.Event
// Config is the configuration decoded from the changed file. It may be
// nil, or hold only part of the file's configuration, when Err is not
// nil.
Config *System
// Err is any error encountered while examining, reading, decoding or
// validating the file.
Err error
}
// Op returns the bitwise union of the Op values of every element in the
// receiver's Event field. The result is zero when there are no events.
func (c Change) Op() fsnotify.Op {
	// Folding with OR from the zero value covers the empty and
	// single-event cases without special handling.
	var union fsnotify.Op
	for _, ev := range c.Event {
		union |= ev.Op
	}
	return union
}
// NewWatcher starts an fsnotify.Watcher for the provided directory, sending
// change events on the changes channel. If dir does not exist when NewWatcher
// is called it is created with mode 0o755; its parent must already exist.
// The debounce parameter specifies how long to wait after an fsnotify.Event
// before reading the file to ensure that writes will be reflected in the
// state checksum. If it is less than zero, FileDebounce is used.
//
// An initial scan of dir is started before NewWatcher returns; see init.
// If NewWatcher returns an error, the underlying fsnotify.Watcher has been
// closed and no resources remain held.
//
// NOTE(review): earlier documentation said that dir is recreated and a new
// watcher set if dir is deleted; nothing in this function does that, so
// confirm where that behaviour lives.
func NewWatcher(ctx context.Context, dir string, changes chan<- Change, debounce time.Duration, log *slog.Logger) (*Watcher, error) {
	watcher, err := fsnotify.NewWatcher()
	if err != nil {
		return nil, err
	}
	// fail releases the fsnotify watcher before reporting err so that
	// failed construction does not leak its file descriptors or goroutines.
	fail := func(err error) (*Watcher, error) {
		// The Close error is deliberately dropped: the error that
		// stopped construction is the one the caller needs to see.
		_ = watcher.Close()
		return nil, err
	}

	if _, err = os.Stat(dir); err != nil {
		if !errors.Is(err, fs.ErrNotExist) {
			return fail(err)
		}
		if err = os.Mkdir(dir, 0o755); err != nil {
			return fail(err)
		}
	}
	if err = watcher.Add(dir); err != nil {
		return fail(err)
	}

	w, err := newWatcher(changes, watcher, dir, debounce, log).init(ctx)
	if err != nil {
		return fail(err)
	}
	return w, nil
}
// Watcher collects raw fsnotify.Events and aggregates and filters for
// semantically meaningful configuration changes.
type Watcher struct {
// dir is the directory holding the configuration files.
dir string
// debounce is how long to wait after an event before reading a file.
// It is never negative; newWatcher substitutes FileDebounce.
debounce time.Duration
// watcher is the source of raw file system events.
watcher *fsnotify.Watcher
// done is closed when the initial directory scan started by init has
// finished.
done chan struct{}
// changes is the channel that configuration changes are sent on.
changes chan<- Change
// hash is the hash used to calculate configuration sums. It is left
// reset after each sum is taken.
hash hash.Hash
// hashes holds the sum of the configuration most recently decoded
// from each file, keyed by file path.
hashes map[string]Sum
// log is the logger, tagged with the watcher's component identifier.
log *slog.Logger
}
// watcherUID identifies the configuration watcher. Its string form is
// attached to the Watcher's logger as the "component" attribute.
var watcherUID = rpc.UID{Module: "kernel", Service: "config_watcher"}
// newWatcher returns a new Watcher for dir that reads raw events from w and
// reports on changes. A negative debounce selects FileDebounce. The returned
// Watcher logs to log with a "component" attribute identifying the watcher,
// and starts with a fresh SHA-1 hash and no recorded file sums. No scan is
// started and the done channel is not set; that is left to init.
func newWatcher(changes chan<- Change, w *fsnotify.Watcher, dir string, debounce time.Duration, log *slog.Logger) *Watcher {
	cw := &Watcher{
		dir:      dir,
		debounce: FileDebounce,
		watcher:  w,
		changes:  changes,
		log:      log.With(slog.String("component", watcherUID.String())),
		hash:     sha1.New(),
		hashes:   make(map[string]Sum),
	}
	// Only a non-negative request overrides the default.
	if debounce >= 0 {
		cw.debounce = debounce
	}
	return cw
}
// init performs an initial scan of the Watcher's directory, sending create
// events for all toml files found in the directory.
//
// The directory listing is read synchronously and any error from that is
// returned with a nil Watcher. The files are then processed in a new
// goroutine: for each non-directory entry with a ".toml" extension the file
// is read and decoded, its sum is recorded in w.hashes when a configuration
// was obtained, and a Change with an fsnotify.Create event is sent on
// w.changes. Failures to stat or read a file are sent as a Change holding
// only the error. w.done is closed when the goroutine exits.
//
// Sends on w.changes are abandoned if ctx is cancelled, in which case the
// scan stops early and the remaining files are not reported. Without this a
// consumer that stops receiving after cancellation would leave the
// goroutine blocked forever and w.done would never be closed.
func (w *Watcher) init(ctx context.Context) (*Watcher, error) {
	de, err := os.ReadDir(w.dir)
	if err != nil {
		return nil, err
	}
	w.done = make(chan struct{})
	go func() {
		defer close(w.done)

		// send delivers c on w.changes, reporting false if ctx was
		// cancelled before the change could be delivered.
		send := func(c Change) bool {
			select {
			case w.changes <- c:
				return true
			case <-ctx.Done():
				return false
			}
		}

		for _, e := range de {
			name := e.Name()
			if filepath.Ext(name) != ".toml" {
				continue
			}
			path := filepath.Join(w.dir, name)
			// Stat rather than use the directory entry so that
			// symbolic links are resolved before the IsDir check.
			fi, err := os.Stat(path)
			if err != nil {
				if !send(Change{Err: err}) {
					return
				}
				continue
			}
			if fi.IsDir() {
				continue
			}
			b, err := os.ReadFile(path)
			if err != nil {
				w.log.LogAttrs(ctx, slog.LevelError, "read file", slog.Any("error", err))
				if !send(Change{Err: err}) {
					return
				}
				continue
			}
			// A partial configuration may be returned along with an
			// error; its sum is still recorded and both are reported.
			cfg, sum, err := unmarshalConfigs(w.hash, b)
			if cfg != nil {
				w.hashes[path] = sum
			}
			delivered := send(Change{
				Event:  []fsnotify.Event{{Name: path, Op: fsnotify.Create}},
				Config: cfg,
				Err:    err,
			})
			if !delivered {
				return
			}
		}
	}()
	return w, nil
}
// unmarshalConfigs decodes the TOML data in b and returns the resulting,
// potentially partial, configuration together with its semantic hash.
//
// The decoded configuration is validated against fragmentSchema. If
// validation fails, the components named by the invalid paths are removed
// and the validation error is returned alongside the remaining
// configuration and its sum. A TOML decoding error or a JSON encoding error
// results in a nil configuration.
//
// Sums are calculated by writing the JSON encoding of a value to h. The
// kernel configuration, each module and each service have their own Sum
// field set, and the returned sum is that of the whole configuration
// encoded after those fields have been set. h is reset after each sum is
// taken.
func unmarshalConfigs(h hash.Hash, b []byte) (cfg *System, sum Sum, _ error) {
	parsed := new(System)
	if err := toml.Unmarshal(b, parsed); err != nil {
		return nil, sum, err
	}

	invalid, deferredErr := Validate(fragmentSchema, parsed)
	if deferredErr != nil {
		var err error
		parsed, err = remove(parsed, invalid, false)
		if err != nil {
			// remove only fails in safe mode, so this
			// indicates a programming error.
			panic(err)
		}
	}

	enc := json.NewEncoder(h)
	// digest returns the sum of the JSON encoding of v, leaving h reset
	// and ready for the next value.
	digest := func(v any) ([]byte, error) {
		if err := enc.Encode(v); err != nil {
			return nil, err
		}
		d := h.Sum(nil)
		h.Reset()
		return d, nil
	}

	if parsed.Kernel != nil {
		d, err := digest(parsed.Kernel)
		if err != nil {
			return nil, sum, err
		}
		parsed.Kernel.Sum = (*Sum)(d)
	}
	for name, module := range parsed.Modules {
		d, err := digest(module)
		if err != nil {
			return nil, sum, err
		}
		module.Sum = (*Sum)(d)
		parsed.Modules[name] = module
	}
	for name, service := range parsed.Services {
		d, err := digest(service)
		if err != nil {
			return nil, sum, err
		}
		service.Sum = (*Sum)(d)
		parsed.Services[name] = service
	}

	d, err := digest(parsed)
	if err != nil {
		return nil, sum, err
	}
	sum = ([sha1.Size]byte)(d)
	return parsed, sum, deferredErr
}
// remove deletes the components of cfg that are named by the invalid field
// paths in paths, and returns cfg. It makes a single pass over paths and
// modifies cfg in place.
//
// If safe is true, paths is checked first and cfg is returned unmodified
// with an error if any path is empty or has "kernel" as its first element.
//
// Paths with fewer than two elements are otherwise ignored. For the
// remaining paths the first element selects what is removed: a kernel path
// clears the kernel configuration, and a module or service path deletes the
// module or service named by the second element. The final result may have
// no configured module or service.
//
// NOTE(review): the safe check compares against the literal "kernel" while
// the removal pass uses kernelName; confirm that these agree.
func remove(cfg *System, paths [][]string, safe bool) (*System, error) {
	if safe {
		for _, path := range paths {
			switch {
			case len(path) == 0:
				// Not all cue Errors will have a path,
				// so we may have an empty path here.
				return cfg, errors.New("cannot remove: empty path")
			case path[0] == "kernel":
				return cfg, errors.New("cannot repair kernel config")
			}
		}
	}

	for _, path := range paths {
		if len(path) >= 2 {
			kind, name := path[0], path[1]
			if kind == kernelName {
				cfg.Kernel = nil
			} else if kind == moduleName {
				delete(cfg.Modules, name)
			} else if kind == serviceName {
				delete(cfg.Services, name)
			}
		}
	}
	return cfg, nil
}