/
watcher.go
430 lines (361 loc) · 9.25 KB
/
watcher.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
// Copyright (c) 2016 Paul Jolly <paul@myitcv.org.uk>, all rights reserved.
// Use of this document is governed by a license found in the LICENSE document.
// watcher is a Linux-based directory watcher for triggering commands
package main
import (
"crypto/sha256"
"flag"
"fmt"
"io"
"log"
"os"
"os/exec"
"path/filepath"
"strings"
"time"
fsnotify "gopkg.in/fsnotify/fsnotify.v1"
"github.com/kr/fs"
)
// TODO
// Warning: this code is pretty messy; some of my first Go code
//
// * implement timeout for killing long-running process
var (
// fIgnorePaths accumulates the values of the repeatable -I flag; it is
// registered with the flag package in init.
fIgnorePaths ignorePaths
// Command-line flags. A "[*]" in the help text marks the options that
// showUsage describes as valid in die mode.
fDebug = flag.Bool("debug", false, "give debug output")
fQuiet = flag.Duration("q", 100*time.Millisecond, "the duration of the 'quiet' window; format is 1s, 10us etc. Min 1 millisecond")
fPath = flag.String("p", "", "the path to watch; default is CWD [*]")
fFollow = flag.Bool("f", false, "whether to follow symlinks or not (recursively) [*]")
fDie = flag.Bool("d", false, "die on first notification; only consider -p and -f flags")
fDontClearScreen = flag.Bool("c", false, "do not clear the screen before running the command")
fNotInitial = flag.Bool("i", false, "don't run command at time zero; only applies when -d not supplied")
fTimeout = flag.Duration("t", 0, "the timeout after which a process is killed; not valid with -k")
fDontKill = flag.Bool("k", false, "don't kill the running command on a new notification")
// hashCache maps a path to the most recent value of hash(path) recorded
// for it; the event loop uses it to drop notifications that did not change
// the hashed content.
hashCache = make(map[string]string)
)
const (
// GitDir is the directory name that is ignored by default (see
// GloballyIgnoredDirs).
GitDir = ".git"
)
// GloballyIgnoredDirs lists the directory names that main always appends to
// the user-supplied -I ignore paths.
var GloballyIgnoredDirs = []string{GitDir}
// init registers the repeatable -I flag, whose values are collected in
// fIgnorePaths.
func init() {
	const usage = "Paths to ignore. Absolute paths are absolute to the path; relative paths can match anywhere in the tree"
	flag.Var(&fIgnorePaths, "I", usage)
}
// ignorePaths is a flag.Value that collects every occurrence of a repeatable
// string flag, in the order given on the command line.
type ignorePaths []string

// Set appends value to the list. It never fails.
func (i *ignorePaths) Set(value string) error {
	*i = append(*i, value)
	return nil
}

// String renders the collected values using fmt's default slice formatting,
// e.g. "[a b]". The flag package documents that String may be called on a
// zero-valued receiver such as a nil pointer, so a nil receiver is rendered
// like an empty list instead of being dereferenced.
func (i *ignorePaths) String() string {
	if i == nil {
		return fmt.Sprint(ignorePaths(nil))
	}
	return fmt.Sprint(*i)
}
// showUsage writes the two invocation synopses (command mode and die mode),
// the registered flag defaults and a closing note about die mode to standard
// error.
func showUsage() {
	prog := os.Args[0]
	out := os.Stderr
	fmt.Fprintf(out, "Command mode:\n\t%v [-q duration] [-p /path/to/watch] [-i] [-f] [-c] [-k] CMD ARG1 ARG2...\n\nDie mode:\n\t%v -d [-p /path/to/watch] [-f]\n\n", prog, prog)
	flag.PrintDefaults()
	fmt.Fprint(out, "\nOnly options marked with [*] are valid in die mode\n")
}
//go:generate gobin -m -run myitcv.io/cmd/pkgconcat -out gen_cliflag.go myitcv.io/_tmpls/cliflag
// main parses the command line, resolves and checks the path to watch,
// validates the timing options, configures a watcher and then either waits
// for a single notification (-d, die mode) or enters the command loop.
func main() {
	setupAndParseFlags("")
	if *fDebug {
		// Debug mode forces screen clearing off (presumably so the debug
		// output is not wiped).
		*fDontClearScreen = true
	}

	path := *fPath
	if path == "" {
		path = "."
	}
	path, err := filepath.Abs(path)
	if err != nil {
		panic(err)
	}
	if _, err := os.Stat(path); err != nil {
		fatalf("Could not stat -p supplied path [%v]: %v\n", path, err)
	}

	// The -d help text states that die mode only considers -p and -f, so the
	// timing options are validated in command mode, where they are used.
	// Previously these checks were guarded by *fDie and therefore never ran
	// in command mode.
	if !*fDie {
		if *fQuiet < 0 {
			fatalf("Quiet window duration [%v] must be positive\n", *fQuiet)
		}
		if *fTimeout < 0 {
			fatalf("Command timeout duration [%v] must be positive\n", *fTimeout)
		}
		if *fQuiet < time.Millisecond {
			log.Fatalln("Quiet time period must be at least 1 millisecond")
		}
	}

	w, err := newWatcher()
	if err != nil {
		fatalf("Could not create a watcher: %v\n", err)
	}
	defer w.close()

	w.kill = !*fDontKill
	w.timeout = *fTimeout
	w.quiet = *fQuiet
	w.initial = !*fNotInitial
	w.command = flag.Args()
	w.clearScreen = !*fDontClearScreen
	// The globally ignored directories always apply, in addition to any -I
	// values supplied by the user.
	w.ignorePaths = append(fIgnorePaths, GloballyIgnoredDirs...)
	w.absPath = path

	if *fDie {
		w.watchOnce(path)
	} else {
		w.watchLoop(path)
	}
}
// watcher bundles the underlying fsnotify watcher with the options that main
// derives from the command line.
type watcher struct {
// iwatcher is the fsnotify watcher created by newWatcher.
iwatcher *fsnotify.Watcher
// kill is set by main to the negation of the -k flag.
kill bool
// clearScreen controls whether commandLoop emits the "\033[2J" escape
// before running the command.
clearScreen bool
// command holds the non-flag arguments; commandLoop joins them with spaces
// and hands the result to bash -c.
command []string
// ignorePaths holds the -I values followed by GloballyIgnoredDirs;
// recursiveWatchAdd consults it for directories only.
ignorePaths []string
// absPath is the absolute form of the watched path; ignore paths are
// matched against paths made relative to it.
absPath string
// initial, when true, makes watchLoop queue a dummy event so the command
// runs once at startup.
initial bool
// timeout is set by main from the -t flag.
timeout time.Duration
// quiet is the delay watchLoop waits after an event before handing it to
// the command loop.
quiet time.Duration
}
// newWatcher returns a watcher wrapping a freshly created fsnotify watcher;
// all option fields are left at their zero values. An error is returned when
// the fsnotify watcher cannot be created.
func newWatcher() (*watcher, error) {
	inner, err := fsnotify.NewWatcher()
	if err != nil {
		return nil, fmt.Errorf("could not create a watcher: %v", err)
	}
	return &watcher{iwatcher: inner}, nil
}
// close shuts down the underlying fsnotify watcher, wrapping any error it
// reports.
func (w *watcher) close() error {
	if err := w.iwatcher.Close(); err != nil {
		return fmt.Errorf("could not close watcher: %v", err)
	}
	return nil
}
// recursiveWatchAdd starts watching p. When p is a regular file only that
// file is watched; when it is a directory the whole tree below it is walked
// and every file and non-ignored directory is watched. The hash of every
// visited path is recorded in hashCache. All failures are best-effort: they
// are reported through debugf and otherwise ignored.
func (w *watcher) recursiveWatchAdd(p string) {
	fi, err := os.Stat(p)
	if err != nil {
		debugf("** recursiveWatchAdd: os.Stat(%v): %v\n", p, err)
		return
	}
	if !fi.IsDir() {
		hashCache[p] = hash(p)
		if err := w.iwatcher.Add(p); err != nil {
			debugf("** recursiveWatchAdd: watcher add to %v: %v\n", p, err)
		}
		return
	}

	walker := fs.Walk(p)
	for walker.Step() {
		if err := walker.Err(); err != nil {
			debugf("** recursiveWatchAdd: walker.Err: %v\n", err)
			continue
		}
		cur := walker.Path()
		// The hash is recorded before the ignore check on purpose: the
		// event loop compares against this entry, so it must also exist for
		// ignored directories.
		hashCache[cur] = hash(cur)

		isDir := walker.Stat().IsDir()
		if isDir && w.ignoredDir(cur) {
			walker.SkipDir()
			continue
		}
		if err := w.iwatcher.Add(cur); err != nil {
			if isDir {
				debugf("** recursiveWatchAdd: walker add watch to dir member %v: %v\n", cur, err)
			} else {
				debugf("** recursiveWatchAdd: walker add watch to %v: %v\n", cur, err)
			}
		}
	}
}

// ignoredDir reports whether the directory dir matches one of w.ignorePaths.
// An absolute pattern is taken relative to w.absPath and must equal dir's
// relative path exactly. A relative pattern matches when it equals the
// relative path or forms its trailing path components, so ".git" matches
// "a/.git" but not "a/repo.git".
func (w *watcher) ignoredDir(dir string) bool {
	// As in the original code, a failure to relativise leaves rel empty.
	rel, _ := filepath.Rel(w.absPath, dir)
	for _, pattern := range w.ignorePaths {
		if filepath.IsAbs(pattern) {
			if strings.TrimPrefix(pattern, "/") == rel {
				return true
			}
			continue
		}
		if rel == pattern || strings.HasSuffix(rel, string(filepath.Separator)+pattern) {
			return true
		}
	}
	return false
}
// recursiveWatchRemove stops watching p. Removal is best-effort: a failure is
// reported through debugf (presumably the watch is often already gone once
// the path has been deleted or renamed) and the function always returns nil.
func (w *watcher) recursiveWatchRemove(p string) error {
	// TODO make this recursive if needs be?
	if err := w.iwatcher.Remove(p); err != nil {
		debugf("** recursiveWatchRemove: watcher remove of %v: %v\n", p, err)
	}
	return nil
}
// watchOnce watches p recursively and terminates the process on the first
// message from the underlying watcher: exit status 0 for an event, 1 for an
// error.
func (w *watcher) watchOnce(p string) {
	w.recursiveWatchAdd(p)
	select {
	case <-w.iwatcher.Events:
		// TODO handle the queue overflow? probably not needed
		os.Exit(0)
	case <-w.iwatcher.Errors:
		// TODO handle the queue overflow
		os.Exit(1)
	}
}
// hash returns the raw SHA-256 digest, as a string, of the thing named by fn:
// for a directory the digest covers the names of its entries (in the order
// the OS returns them), for anything else it covers the file's contents.
//
// In case of any error it simply returns "" because we are probably racing
// with another process.
func hash(fn string) string {
	f, err := os.Open(fn)
	if err != nil {
		return ""
	}
	defer f.Close()

	// Stat the open handle rather than the path so the kind of object that is
	// inspected is the one that is actually read below.
	fi, err := f.Stat()
	if err != nil {
		return ""
	}

	h := sha256.New()
	if fi.IsDir() {
		names, err := f.Readdirnames(0)
		if err != nil {
			return ""
		}
		for _, name := range names {
			// Terminate every name with NUL (which cannot occur inside a
			// file name) so that e.g. {"ab", "c"} and {"a", "bc"} do not
			// feed identical bytes to the digest.
			io.WriteString(h, name)
			h.Write([]byte{0})
		}
	} else if _, err := io.Copy(h, f); err != nil {
		return ""
	}
	return string(h.Sum(nil))
}
// watchLoop watches p recursively and never returns. It runs three
// cooperating loops:
//
//   - the command loop (w.commandLoop), which runs the command whenever it
//     receives from workBus;
//   - a buffering goroutine, which holds each event for the quiet window
//     (w.quiet) and then offers one unit of work per event on workBus,
//     dropping events identical to one that is already pending;
//   - the event loop below, which maintains the watches, filters out
//     notifications whose hash is unchanged and forwards the rest.
func (w *watcher) watchLoop(p string) {
	w.recursiveWatchAdd(p)
	workBus := make(chan struct{})
	eventBus := make(chan fsnotify.Event)
	go w.commandLoop(workBus)

	// buffer
	go func() {
		// buffer[i] is an event still inside its quiet window and timers[i]
		// is the channel that fires when that window ends; backlog holds
		// events whose window has elapsed and that still need a run.
		var buffer []fsnotify.Event
		var backlog []fsnotify.Event
		var timers []<-chan time.Time
		debugf("buffer loop> initial: %v\n", w.initial)
		if w.initial {
			// dummy event
			buffer = append(buffer, fsnotify.Event{})
			timers = append(timers, time.After(0))
		}
	Buffer:
		for {
			debugln("buffer loop> start")
			// A nil channel never becomes ready, so the timeout and send
			// cases below are only armed when there is something to do.
			var timeout <-chan time.Time
			if len(timers) > 0 {
				debugln("buffer loop> can timeout")
				timeout = timers[0]
			}
			var doWork chan struct{}
			if len(backlog) > 0 {
				debugln("buffer loop> have backlog")
				doWork = workBus
			}
			select {
			case e := <-eventBus:
				for _, b := range buffer {
					if b == e {
						continue Buffer
					}
				}
				for _, b := range backlog {
					if b == e {
						continue Buffer
					}
				}
				buffer = append(buffer, e)
				timers = append(timers, time.After(w.quiet))
			case <-timeout:
				e := buffer[0]
				buffer = buffer[1:]
				timers = timers[1:]
				backlog = append(backlog, e)
			case doWork <- struct{}{}:
				debugln("buffer loop> sent work")
				backlog = backlog[1:]
			}
		}
	}()

	for {
		select {
		case e := <-w.iwatcher.Events:
			// TODO handle the queue overflow... this could happen
			// if we do get queue overflow, might need to look at putting
			// subscriptions on another goroutine, buffering the adds/
			// removes somehow

			// Op is a set of bits, so test membership with a mask rather
			// than comparing for equality, which misses combined operations.
			switch {
			case e.Op&fsnotify.Create != 0:
				w.recursiveWatchAdd(e.Name)
			case e.Op&(fsnotify.Remove|fsnotify.Rename) != 0:
				w.recursiveWatchRemove(e.Name)
			}
			debugf("event loop> %v for %v\n", e.Op, e.Name)
			// Only forward the event when the hashed content differs from
			// what was last recorded for this path.
			hs := hash(e.Name)
			ce := hashCache[e.Name]
			if ce != hs {
				debugln("event loop> cache miss")
				hashCache[e.Name] = hs
				eventBus <- e
			}
		case <-w.iwatcher.Errors:
			// TODO handle the queue overflow
		}
	}
}
// commandLoop runs the configured command through "bash -O globstar -c"
// every time a unit of work arrives on workBus, and never returns.
//
// In kill mode a still-running command is killed before the next one is
// started. In don't-kill mode no further work is accepted until the running
// command has finished. The screen is cleared once at startup and before
// every run when w.clearScreen is set. Failure to start the command is
// fatal.
func (w *watcher) commandLoop(workBus chan struct{}) {
	outWorkBus := workBus
	args := []string{"-O", "globstar", "-c", "--", strings.Join(w.command, " ")}
	var command *exec.Cmd
	// Completions carry the command that finished so that the exit of an old,
	// killed command cannot be mistaken for the exit of its replacement.
	cmdDone := make(chan *exec.Cmd)
	if w.clearScreen {
		fmt.Printf("\033[2J")
	}

	runCmd := func() {
		if command != nil {
			_ = command.Process.Kill()
		}
		if w.clearScreen {
			fmt.Printf("\033[2J")
		}
		command = exec.Command("bash", args...)
		command.Stdout = os.Stdout
		command.Stderr = os.Stderr
		if !w.kill {
			// Receiving from a nil channel blocks forever, which pauses the
			// intake of work until the command is done.
			outWorkBus = nil
		}
		debugf("work loop> starting %q\n", strings.Join(args, " "))
		if err := command.Start(); err != nil {
			fatalf("We could not run the command provided: %v\n", err)
		}
		go func(c *exec.Cmd) {
			_ = c.Wait()
			debugln("work loop> work done")
			cmdDone <- c
		}(command)
	}

	for {
		select {
		case <-outWorkBus:
			debugln("work loop> got work")
			runCmd()
		case finished := <-cmdDone:
			if finished != command {
				// A command that was killed and already replaced; the
				// current one is still running and must stay killable.
				continue
			}
			outWorkBus = workBus
			command = nil
		}
	}
}
// debugf writes a formatted message to standard error, but only when the
// -debug flag is set.
func debugf(format string, args ...interface{}) {
	if !*fDebug {
		return
	}
	fmt.Fprintf(os.Stderr, format, args...)
}
// debugln writes args followed by a newline to standard error, but only when
// the -debug flag is set. Operands are formatted with fmt.Fprint, so a space
// is inserted between two operands only when neither of them is a string.
func debugln(args ...interface{}) {
	if !*fDebug {
		return
	}
	// Build the operand list in a fresh slice: appending to args directly
	// could write into the backing array of a slice the caller passed with
	// "...", and would allocate even when debugging is disabled.
	out := make([]interface{}, 0, len(args)+1)
	out = append(out, args...)
	out = append(out, "\n")
	fmt.Fprint(os.Stderr, out...)
}