-
Notifications
You must be signed in to change notification settings - Fork 265
/
file_watcher.go
312 lines (267 loc) · 7.81 KB
/
file_watcher.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
package utils
import (
"fmt"
"log"
"os"
"sync"
"time"
"github.com/fsnotify/fsnotify"
filehelpers "github.com/turbot/go-kit/files"
"github.com/turbot/steampipe/pkg/error_helpers"
)
// allow a short delay before starting handler
// - this allows multiple events to be gathered for editors (such as vim) which make multiple file
// operations when saving a file
const handlerDelay = 100 * time.Millisecond
type FileWatcher struct {
watch *fsnotify.Watcher
// directories to watch
directories map[string]bool
// fnmatch format inclusions/exclusions
include []string
exclude []string
listFlag filehelpers.ListFlag
onChange func([]fsnotify.Event)
onError func(error)
closeChan chan bool
pollInterval time.Duration
watches map[string]bool
dirLock sync.Mutex
handlerLock sync.Mutex
// when did the handler last run
lastHandlerTime time.Time
// events to be handled at the next handler execution
events []fsnotify.Event
}
type WatcherOptions struct {
Directories []string
Include []string
Exclude []string
OnChange func([]fsnotify.Event)
OnError func(error)
ListFlag filehelpers.ListFlag
}
func NewWatcher(opts *WatcherOptions) (*FileWatcher, error) {
if len(opts.Directories) == 0 {
return nil, fmt.Errorf("WatcherOptions must include at least one directory")
}
// Create an fsnotify watcher object
watch, err := fsnotify.NewWatcher()
if err != nil {
return nil, err
}
var baseExclude []string
// create the watcher
watcher := &FileWatcher{
watch: watch,
directories: make(map[string]bool),
include: opts.Include,
exclude: append(baseExclude, opts.Exclude...),
listFlag: opts.ListFlag,
onChange: opts.OnChange,
onError: opts.OnError,
closeChan: make(chan bool),
pollInterval: 4 * time.Second,
watches: make(map[string]bool),
lastHandlerTime: time.Now(),
}
// we store directories as a map to simplify removing and checking for dupes
for _, d := range opts.Directories {
watcher.addDirectory(d)
}
return watcher, nil
}
func (w *FileWatcher) Close() {
if w.watch != nil {
w.watch.Close()
}
w.closeChan <- true
}
func (w *FileWatcher) Start() {
// make an initial call to addWatches to add watches on existing files matching our criteria
w.addWatches()
// start a goroutine to poll for file changes, and handle file events
go func() {
for {
select {
case <-time.After(w.pollInterval):
// every poll interval, enumerate files to watch in all watched folders and add watches for any new files
w.addWatches()
case ev := <-w.watch.Events:
err := w.handleEvent(ev)
log.Printf("[TRACE] handleEvent error %v", err)
if w.onError != nil {
// leave it to the client to decide what to do after an error - it can close us if it wants
w.onError(err)
}
case err := <-w.watch.Errors:
if err == nil {
continue
}
log.Printf("[TRACE] file watcher error %v", err)
if w.onError != nil {
// leave it to the client to decide what to do after an error - it can close us if it wants
w.onError(err)
}
case <-w.closeChan:
return
}
}
}()
}
func (w *FileWatcher) addWatches() {
w.dirLock.Lock()
defer w.dirLock.Unlock()
// enumerate all files meeting inclusions and exclusions in each watched directory and add a watch
opts := &filehelpers.ListOptions{
Flags: filehelpers.FilesFlat,
Exclude: w.exclude,
Include: w.include,
}
var errors []error
for directory := range w.directories {
sourcePaths, err := filehelpers.ListFiles(directory, opts)
if err != nil {
errors = append(errors, err)
continue
}
// add watches for all files we find (if we are not already watching)
for _, p := range sourcePaths {
if !w.watches[p] {
if err := w.addWatch(p); err != nil {
errors = append(errors, err)
}
}
}
}
// just log errors
if len(errors) > 0 {
for _, err := range errors {
log.Printf("[TRACE] error occurred setting watches: %v", err)
}
}
}
func (w *FileWatcher) addWatch(path string) error {
// raise a create event for this file
w.scheduleHandler(fsnotify.Event{
Name: path,
Op: fsnotify.Create,
})
// add the watch
if err := w.watch.Add(path); err != nil {
return err
}
// successfully added watch - mark in our map
w.watches[path] = true
return nil
}
func (w *FileWatcher) handleEvent(ev fsnotify.Event) error {
log.Printf("[TRACE] file watcher event %v", ev)
// is this an event for a folder
if w.isFolder(ev) {
err := w.handleFolderEvent(ev)
if err != nil {
return err
}
} else {
w.handleFileEvent(ev)
}
return nil
}
func (w *FileWatcher) handleFileEvent(ev fsnotify.Event) {
log.Printf("[TRACE] file watcher event %v", ev)
// check whether file name meets file inclusion/exclusions
if filehelpers.ShouldIncludePath(ev.Name, w.include, w.exclude) {
log.Printf("[TRACE] notify file change")
// schedule a handler run - maintain a minimum interval since the last run
w.scheduleHandler(ev)
// if this was a deletion or rename event, remove our local watch flag
if ev.Op == fsnotify.Remove || ev.Op == fsnotify.Rename {
w.watches[ev.Name] = false
}
} else {
log.Printf("[TRACE] ignore file change %v", ev)
}
}
// if we are watching recursively, add or remove the folder from our list of watched folders
func (w *FileWatcher) handleFolderEvent(ev fsnotify.Event) error {
// if we are not watching recursively, we do not care about folder events
if w.recursive() {
return nil
}
// check whether dirname meets directory inclusions/exclusions
if filehelpers.ShouldIncludePath(ev.Name, w.include, w.exclude) {
// if it a create event, add to our list of watched folders
if ev.Op == fsnotify.Create {
log.Printf("[TRACE] new directory created: '%s' - add watch", ev.Name)
w.addDirectory(ev.Name)
// we will just wait until nex scheduled poll to add files in this directory
}
// it is a deletion, remove watch
if ev.Op == fsnotify.Remove {
log.Printf("[TRACE] new directory deleted: '%s' - remove watch", ev.Name)
w.removeDirectory(ev.Name)
}
} else {
log.Printf("[TRACE] ignore directory change %v", ev)
}
return nil
}
func (w *FileWatcher) isFolder(ev fsnotify.Event) bool {
info, err := os.Stat(ev.Name)
if err != nil {
return false
}
return info.IsDir()
}
func (w *FileWatcher) addDirectory(name string) {
w.dirLock.Lock()
defer w.dirLock.Unlock()
directories := []string{name}
// if we are watching recursively, add child directories
if w.recursive() {
opts := &filehelpers.ListOptions{
Flags: filehelpers.DirectoriesRecursive,
Exclude: w.exclude,
}
childDirectories, err := filehelpers.ListFiles(name, opts)
if err != nil {
error_helpers.ShowWarning(fmt.Sprintf("failed to add recursive watch on directory '%s': %v", name, err))
}
directories = append(directories, childDirectories...)
}
for _, d := range directories {
w.directories[d] = true
}
}
func (w *FileWatcher) removeDirectory(name string) {
w.dirLock.Lock()
defer w.dirLock.Unlock()
delete(w.directories, name)
}
func (w *FileWatcher) recursive() bool {
return w.listFlag&filehelpers.Recursive != 0
}
func (w *FileWatcher) scheduleHandler(ev fsnotify.Event) {
w.handlerLock.Lock()
defer w.handlerLock.Unlock()
// we can tell if a handler is scheduled by looking at the events array
handlerScheduled := len(w.events) > 0
// now add our event to the array
w.events = append(w.events, ev)
// if handler scheduled, there is nothing to do, it will process this event
if handlerScheduled {
return
}
// so no handler is scheduled - schedule handler to run - with a short pause
go func() {
time.Sleep(handlerDelay)
// lock the handlerLock AFTER the delay
w.handlerLock.Lock()
defer w.handlerLock.Unlock()
w.lastHandlerTime = time.Now()
w.onChange(w.events)
// clear events - this indicates no handler is scheduled
w.events = nil
}()
}