// Copyright (c) 2021 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package fswatcher
import (
"crypto/sha256"
"fmt"
"io"
"os"
"path"
"path/filepath"
"sync"
"github.com/fsnotify/fsnotify"
"go.uber.org/zap"
)
// FSWatcher tracks a set of files by content hash and invokes a callback
// when an fsnotify event arrives and at least one tracked file's hash differs
// from the previously recorded one.
type FSWatcher struct {
	// watcher is the fsnotify watcher whose Events and Errors channels are consumed by watch.
	watcher *fsnotify.Watcher
	// logger receives event, warning, and error messages.
	logger *zap.Logger
	// fileHashContentMap maps each tracked file path to the last recorded SHA256 hex digest of its content.
	fileHashContentMap map[string]string
	// onChange is called from the watch goroutine after a change has been detected.
	onChange func()
	// mu is held by watch while it reads and updates fileHashContentMap.
	mu sync.RWMutex
}
// New creates an FSWatcher for the given files and starts a goroutine that
// waits for notifications of changes in the directories containing them.
// On every notification all tracked files are re-hashed, and onChange is
// invoked if any hash differs from the one recorded previously.
//
// Empty entries in filepaths are skipped. An error is returned if the
// fsnotify watcher cannot be created, if a file cannot be hashed, or if its
// directory cannot be added to the watcher.
//
// Write and Rename events indicate that some files might have changed and a
// reload might be necessary. A Remove event indicates that a file was deleted,
// in which case a warning is logged.
//
// Reasoning:
//
// A Write event is sent if the file content is rewritten.
//
// Usually files are not rewritten in place; they are updated by swapping them
// with new ones via Rename. That avoids files being read while they are not
// yet completely written, but it also means that inotify on file level will
// not work: the watch is invalidated when the old file is deleted.
//
// When reading from Kubernetes Secret volumes, the target files are symbolic
// links to files in a different directory. That directory is swapped with a
// new one while the symbolic links remain the same. This guarantees an atomic
// swap for all files at once, but it also means that any Rename event in the
// directory might indicate that the files were replaced, even if event.Name
// is not one of the monitored files. The file hashes are therefore checked to
// detect whether the contents really changed.
func New(filepaths []string, onChange func(), logger *zap.Logger) (*FSWatcher, error) {
	notifier, err := fsnotify.NewWatcher()
	if err != nil {
		return nil, err
	}

	fsw := &FSWatcher{
		watcher:            notifier,
		logger:             logger,
		onChange:           onChange,
		fileHashContentMap: make(map[string]string),
	}

	if setupErr := fsw.setupWatchedPaths(filepaths); setupErr != nil {
		// The setup error is the one reported to the caller; the result of
		// closing the half-initialised watcher is intentionally discarded.
		_ = fsw.Close()
		return nil, setupErr
	}

	// The goroutine runs until the watcher's Events or Errors channel is closed.
	go fsw.watch()
	return fsw, nil
}
// setupWatchedPaths records the initial content hash of every non-empty path
// in filepaths and registers each file's parent directory with the fsnotify
// watcher, adding every distinct directory only once.
//
// It returns the first error encountered while hashing a file or adding a
// directory; entries processed before the failure remain recorded.
func (w *FSWatcher) setupWatchedPaths(filepaths []string) error {
	watchedDirs := make(map[string]bool)
	for _, file := range filepaths {
		if file == "" {
			continue
		}

		initialHash, err := hashFile(file)
		if err != nil {
			return err
		}
		w.fileHashContentMap[file] = initialHash

		// NOTE(review): path.Dir only splits on forward slashes; filepath.Dir
		// would be the OS-aware choice if Windows-style paths must be supported.
		parent := path.Dir(file)
		if watchedDirs[parent] {
			continue
		}
		if err := w.watcher.Add(parent); err != nil {
			return err
		}
		watchedDirs[parent] = true
	}
	return nil
}
// watch consumes the fsnotify watcher's Events and Errors channels until
// either of them is closed.
//
// Every event is logged and triggers a re-hash of all tracked files. If the
// content of at least one file changed, onChange is invoked once, after all
// files have been checked and after the lock has been released. Errors
// reported by the watcher are only logged.
func (w *FSWatcher) watch() {
	for {
		select {
		case ev, open := <-w.watcher.Events:
			if !open {
				return
			}
			w.logger.Info("Received event", zap.String("event", ev.String()))
			if w.refreshHashes() {
				w.onChange()
			}
		case watchErr, open := <-w.watcher.Errors:
			if !open {
				return
			}
			w.logger.Error("fsnotifier reported an error", zap.Error(watchErr))
		}
	}
}

// refreshHashes re-checks every tracked file while holding w.mu, stores the
// new hash of each file reported as modified, and reports whether any file
// was modified.
func (w *FSWatcher) refreshHashes() bool {
	w.mu.Lock()
	defer w.mu.Unlock()

	anyChanged := false
	for name, oldHash := range w.fileHashContentMap {
		if modified, newHash := w.isModified(name, oldHash); modified {
			w.fileHashContentMap[name] = newHash
			anyChanged = true
		}
	}
	return anyChanged
}
// Close closes the underlying fsnotify watcher and returns the error it
// reports, if any.
func (w *FSWatcher) Close() error {
	closeErr := w.watcher.Close()
	return closeErr
}
// isModified re-hashes the file and compares the result with previousHash.
//
// It returns whether the hashes differ together with the freshly computed
// hash. If the file cannot be hashed, a warning is logged and the file is
// reported as modified with an empty hash.
func (w *FSWatcher) isModified(filepath string, previousHash string) (bool, string) {
	currentHash, err := hashFile(filepath)
	if err == nil {
		return currentHash != previousHash, currentHash
	}

	w.logger.Warn("Unable to read the file", zap.String("file", filepath), zap.Error(err))
	return true, ""
}
// hashFile streams the content of the file at the cleaned path through
// SHA256 and returns the digest as a lowercase hexadecimal string.
//
// An empty string and the underlying error are returned if the file cannot
// be opened or read.
func hashFile(file string) (string, error) {
	src, openErr := os.Open(filepath.Clean(file))
	if openErr != nil {
		return "", openErr
	}
	defer src.Close()

	digest := sha256.New()
	_, copyErr := io.Copy(digest, src)
	if copyErr != nil {
		return "", copyErr
	}

	return fmt.Sprintf("%x", digest.Sum(nil)), nil
}