-
Notifications
You must be signed in to change notification settings - Fork 0
/
watch.go
191 lines (159 loc) · 5.48 KB
/
watch.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
package watch
import (
"context"
"sync"
"time"
"github.com/jakewright/home-automation/libraries/go/errors"
"github.com/jakewright/home-automation/libraries/go/slog"
"github.com/jakewright/home-automation/service.log/domain"
"github.com/jakewright/home-automation/service.log/repository"
"github.com/fsnotify/fsnotify"
)
// Watcher notifies subscribers of new events whenever the log file is written to
type Watcher struct {
	// LogRepository provides access to the log events
	LogRepository *repository.LogRepository

	// subscribers maps each subscriber's channel to the query that selects
	// which events it receives; the query's SinceUUID is advanced as
	// events are delivered so each pass only fetches newer events
	subscribers map[chan<- *domain.Event]*repository.LogQuery
	mux         sync.Mutex        // Concurrent map access
	notify      chan struct{}     // Triggers reading new events from the log files
	ticker      *time.Ticker      // Used as a rate limiter
	watcher     *fsnotify.Watcher // Internal file watcher
}
// GetName returns the name "watcher".
func (w *Watcher) GetName() string {
	const name = "watcher"
	return name
}
// Start begins watching for log file changes and notifies subscribers
// accordingly. It blocks until the watcher is closed by Stop (returning
// nil) or until the file watcher reports an error. Start is the sole
// sender on w.notify and is therefore the goroutine that closes it.
func (w *Watcher) Start() error {
	// Make sure the receiver struct has been initialised properly
	if w.LogRepository == nil {
		return errors.InternalService("LogRepository is not set")
	}
	if w.LogRepository.LogDirectory == "" {
		return errors.InternalService("Log directory is not set")
	}

	// Create an fsnotify watcher and attach to w so that the Stop
	// method can call Close() on it. The deferred Close here covers
	// the error paths; fsnotify's Close is safe to call more than once,
	// so Stop closing it as well is fine.
	watcher, err := fsnotify.NewWatcher()
	if err != nil {
		return errors.Wrap(err, nil)
	}
	defer watcher.Close()
	w.watcher = watcher

	// Start watching the log file directory so we
	// are notified when new log files are created
	if err = watcher.Add(w.LogRepository.LogDirectory); err != nil {
		return errors.Wrap(err, nil)
	}
	slog.Info("Watching %s for changes", w.LogRepository.LogDirectory)

	// Create a notification channel with a buffer of 1 so
	// that we can always queue a new event while the current
	// one is in process. If the channel was unbuffered, we would
	// risk missing events if the notifier were not ready to
	// receive when the file write happened.
	w.notify = make(chan struct{}, 1)

	// This goroutine is the only sender on w.notify, so it must be the
	// one that closes it (when the event loop below exits). Closing it
	// from Stop instead would race with the send below and could panic
	// with "send on closed channel" — a send inside a select panics on
	// a closed channel just like a bare send does.
	defer close(w.notify)

	// Create a ticker to act as the rate limiter when notifying
	// subscribers. Without this then we risk thrashing the disk.
	w.ticker = time.NewTicker(time.Second * 2)

	go w.notifySubscribers()

	for {
		select {
		case fileEvent, ok := <-watcher.Events:
			if !ok {
				// The channel is closed, which means Stop() closed the
				// watcher; exit silently. The deferred close of w.notify
				// lets the notifySubscribers goroutine terminate.
				return nil
			}

			// We'll get a write event if any file inside the directory is written to.
			// If the file isn't actually a log file we'll waste some work
			// trying to read new events but it's safe to do.
			if fileEvent.Op&fsnotify.Write != fsnotify.Write {
				continue
			}

			// Write to the notify channel but do not block.
			// If the channel is not ready to receive then skip.
			select {
			case w.notify <- struct{}{}:
			default:
			}

		case err, ok := <-watcher.Errors:
			if !ok {
				// If the channel is closed then just exit silently
				// because Stop() was probably called
				return nil
			}

			// It's unclear what state the watcher will be in if we receive
			// any errors so just return, which will trigger Close()
			return errors.Wrap(err, nil)
		}
	}
}

// Stop stops watching for log file changes. It is safe to call even if
// Start was never called or returned early, in which case the fields
// guarded below are still nil.
func (w *Watcher) Stop(ctx context.Context) error {
	if w.ticker != nil {
		// NOTE(review): if notifySubscribers is currently blocked on
		// ticker.C, stopping the ticker can strand that goroutine until
		// process exit — pre-existing limitation, window is small.
		w.ticker.Stop()
	}

	// Closing the watcher closes its Events channel, which makes Start
	// return and close w.notify, which in turn stops the
	// notifySubscribers goroutine. Start owns w.notify; closing it here
	// would race with Start's send.
	if w.watcher != nil {
		return w.watcher.Close()
	}
	return nil
}
// Subscribe starts sending all events that match the query over the given
// channel. The query will be updated with a new SinceUUID value whenever
// events are published to the channel.
func (w *Watcher) Subscribe(c chan<- *domain.Event, q *repository.LogQuery) error {
	// Guard the subscriber map against concurrent access
	w.mux.Lock()
	defer w.mux.Unlock()

	// Lazily create the map on first subscription
	if w.subscribers == nil {
		w.subscribers = map[chan<- *domain.Event]*repository.LogQuery{}
	}

	// Channels are comparable, so the channel itself serves as the key
	w.subscribers[c] = q

	return nil
}
// Unsubscribe stops publishing events to the channel but does not close the channel
func (w *Watcher) Unsubscribe(c chan<- *domain.Event) {
	// Take the lock so the delete cannot race with Subscribe or with
	// findAndSendEvents iterating the map
	w.mux.Lock()
	delete(w.subscribers, c)
	w.mux.Unlock()
}
// notifySubscribers finds and sends new events to all subscribers
// whenever the notify channel is written to, rate limited by w.ticker.
func (w *Watcher) notifySubscribers() {
	for {
		// Wait for the next notification; return once the channel is closed
		if _, ok := <-w.notify; !ok {
			return
		}

		// Wait for a tick so we don't hit the disk more than
		// once per ticker interval
		<-w.ticker.C

		w.findAndSendEvents()
	}
}
// findAndSendEvents will find all new events for each subscriber
// and send them over the subscribers' channels
func (w *Watcher) findAndSendEvents() {
	// Hold the lock for the whole pass so that a concurrent call cannot
	// read the same SinceUUID and deliver duplicate events
	w.mux.Lock()
	defer w.mux.Unlock()

	for ch, query := range w.subscribers {
		// Events must always be published oldest-first
		query.Reverse = false

		// Fetch everything newer than the subscriber's last-seen event
		events, err := w.LogRepository.Find(query)
		if err != nil {
			slog.Error("Failed to get events for subscriber: %v", err)
			continue
		}
		if len(events) == 0 {
			continue
		}

		for _, e := range events {
			// Non-blocking send: if the subscriber isn't ready, the event
			// is dropped. Deliberately unlogged — logging here would
			// itself generate events and create a cycle.
			select {
			case ch <- e:
			default:
			}
		}

		// Events are in order, so the last one's UUID marks how far this
		// subscriber has been advanced
		query.SinceUUID = events[len(events)-1].UUID
	}
}