-
Notifications
You must be signed in to change notification settings - Fork 231
/
notify.go
268 lines (242 loc) · 6.66 KB
/
notify.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
package main
// this code is imported from moby, unfortunately i can't import it directly as dependencies from its repo,
// cause there was a problem between moby vendor and fsnotify
// i have just added only walk methods and some little changes to polling interval, originally set as static.
import (
"errors"
"fmt"
"github.com/fsnotify/fsnotify"
"github.com/sirupsen/logrus"
"os"
"sync"
"time"
)
var (
// errPollerClosed is returned when the poller is closed
errPollerClosed = errors.New("poller is closed")
// errNoSuchWatch is returned when trying to remove a watch that doesn't exist
errNoSuchWatch = errors.New("watch does not exist")
)
type (
// FileWatcher is an interface for implementing file notification watchers
FileWatcher interface {
Close() error
Add(string) error
Walk(string, bool) string
Remove(string) error
Errors() <-chan error
Events() <-chan fsnotify.Event
}
// fsNotifyWatcher wraps the fsnotify package to satisfy the FileNotifier interface
fsNotifyWatcher struct {
*fsnotify.Watcher
}
// filePoller is used to poll files for changes, especially in cases where fsnotify
// can't be run (e.g. when inotify handles are exhausted)
// filePoller satisfies the FileWatcher interface
filePoller struct {
// watches is the list of files currently being polled, close the associated channel to stop the watch
watches map[string]chan struct{}
// events is the channel to listen to for watch events
events chan fsnotify.Event
// errors is the channel to listen to for watch errors
errors chan error
// mu locks the poller for modification
mu sync.Mutex
// closed is used to specify when the poller has already closed
closed bool
// polling interval
interval time.Duration
}
)
// PollingWatcher returns a poll-based file watcher
func PollingWatcher(interval time.Duration) FileWatcher {
if interval == 0 {
interval = 100 * time.Millisecond
}
return &filePoller{
interval: interval,
events: make(chan fsnotify.Event),
errors: make(chan error),
}
}
// Watcher tries to use an fs-event watcher, and falls back to the poller if there is an error
func Watcher(force bool, interval time.Duration) (FileWatcher, error) {
if !force {
if w, err := EventWatcher(); err == nil {
return w, nil
}
}
return PollingWatcher(interval), nil
}
// EventWatcher returns an fs-event based file watcher
func EventWatcher() (FileWatcher, error) {
w, err := fsnotify.NewWatcher()
if err != nil {
return nil, err
}
return &fsNotifyWatcher{Watcher: w}, nil
}
// Errors returns the fsnotify error channel receiver
func (w *fsNotifyWatcher) Errors() <-chan error {
return w.Watcher.Errors
}
// Events returns the fsnotify event channel receiver
func (w *fsNotifyWatcher) Events() <-chan fsnotify.Event {
return w.Watcher.Events
}
// Walk fsnotify
func (w *fsNotifyWatcher) Walk(path string, init bool) string {
if err := w.Add(path); err != nil {
return ""
}
return path
}
// Close closes the poller
// All watches are stopped, removed, and the poller cannot be added to
func (w *filePoller) Close() error {
w.mu.Lock()
if w.closed {
w.mu.Unlock()
return nil
}
w.closed = true
for name := range w.watches {
w.remove(name)
delete(w.watches, name)
}
w.mu.Unlock()
return nil
}
// Errors returns the errors channel
// This is used for notifications about errors on watched files
func (w *filePoller) Errors() <-chan error {
return w.errors
}
// Add adds a filename to the list of watches
// once added the file is polled for changes in a separate goroutine
func (w *filePoller) Add(name string) error {
w.mu.Lock()
defer w.mu.Unlock()
if w.closed {
return errPollerClosed
}
f, err := os.Open(name)
if err != nil {
return err
}
fi, err := os.Stat(name)
if err != nil {
return err
}
if w.watches == nil {
w.watches = make(map[string]chan struct{})
}
if _, exists := w.watches[name]; exists {
return fmt.Errorf("watch exists")
}
chClose := make(chan struct{})
w.watches[name] = chClose
go w.watch(f, fi, chClose)
return nil
}
// Remove poller
func (w *filePoller) remove(name string) error {
if w.closed {
return errPollerClosed
}
chClose, exists := w.watches[name]
if !exists {
return errNoSuchWatch
}
close(chClose)
delete(w.watches, name)
return nil
}
// Remove stops and removes watch with the specified name
func (w *filePoller) Remove(name string) error {
w.mu.Lock()
defer w.mu.Unlock()
return w.remove(name)
}
// Events returns the event channel
// This is used for notifications on events about watched files
func (w *filePoller) Events() <-chan fsnotify.Event {
return w.events
}
// Walk poller
func (w *filePoller) Walk(path string, init bool) string {
check := w.watches[path]
if err := w.Add(path); err != nil {
return ""
}
if check == nil && init {
_, err := os.Stat(path)
if err == nil {
go w.sendEvent(fsnotify.Event{Op: fsnotify.Create, Name: path}, w.watches[path])
}
}
return path
}
// sendErr publishes the specified error to the errors channel
func (w *filePoller) sendErr(e error, chClose <-chan struct{}) error {
select {
case w.errors <- e:
case <-chClose:
return fmt.Errorf("closed")
}
return nil
}
// sendEvent publishes the specified event to the events channel
func (w *filePoller) sendEvent(e fsnotify.Event, chClose <-chan struct{}) error {
select {
case w.events <- e:
case <-chClose:
return fmt.Errorf("closed")
}
return nil
}
// watch is responsible for polling the specified file for changes
// upon finding changes to a file or errors, sendEvent/sendErr is called
func (w *filePoller) watch(f *os.File, lastFi os.FileInfo, chClose chan struct{}) {
defer f.Close()
for {
time.Sleep(w.interval)
select {
case <-chClose:
logrus.Debugf("watch for %s closed", f.Name())
return
default:
}
fi, err := os.Stat(f.Name())
switch {
case err != nil && lastFi != nil:
// If it doesn't exist at this point, it must have been removed
// no need to send the error here since this is a valid operation
if os.IsNotExist(err) {
if err := w.sendEvent(fsnotify.Event{Op: fsnotify.Remove, Name: f.Name()}, chClose); err != nil {
return
}
lastFi = nil
}
// at this point, send the error
w.sendErr(err, chClose)
return
case lastFi == nil:
if err := w.sendEvent(fsnotify.Event{Op: fsnotify.Create, Name: f.Name()}, chClose); err != nil {
return
}
lastFi = fi
case fi.Mode() != lastFi.Mode():
if err := w.sendEvent(fsnotify.Event{Op: fsnotify.Chmod, Name: f.Name()}, chClose); err != nil {
return
}
lastFi = fi
case fi.ModTime() != lastFi.ModTime() || fi.Size() != lastFi.Size():
if err := w.sendEvent(fsnotify.Event{Op: fsnotify.Write, Name: f.Name()}, chClose); err != nil {
return
}
lastFi = fi
}
}
}