forked from openshift/library-go
-
Notifications
You must be signed in to change notification settings - Fork 0
/
observer_polling.go
190 lines (168 loc) · 5.29 KB
/
observer_polling.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
package fileobserver
import (
"bytes"
"crypto/sha256"
"encoding/hex"
"fmt"
"io"
"os"
"sync"
"time"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/klog"
)
type pollingObserver struct {
interval time.Duration
reactors map[string][]ReactorFn
files map[string]fileHashAndState
reactorsMutex sync.RWMutex
syncedMutex sync.RWMutex
hasSynced bool
}
// HasSynced indicates that the observer synced all observed files at least once.
func (o *pollingObserver) HasSynced() bool {
o.syncedMutex.RLock()
defer o.syncedMutex.RUnlock()
return o.hasSynced
}
// AddReactor will add new reactor to this observer.
func (o *pollingObserver) AddReactor(reaction ReactorFn, startingFileContent map[string][]byte, files ...string) Observer {
o.reactorsMutex.Lock()
defer o.reactorsMutex.Unlock()
for _, f := range files {
if len(f) == 0 {
panic(fmt.Sprintf("observed file name must not be empty (%#v)", files))
}
// Do not rehash existing files
if _, exists := o.files[f]; exists {
continue
}
var err error
if startingContent, ok := startingFileContent[f]; ok {
klog.V(3).Infof("Starting from specified content for file %q", f)
// if empty starting content is specified, do not hash the empty string but just return it the same
// way as calculateFileHash() does in that case.
// in case the file exists and is empty, we don't care about the initial content anyway, because we
// are only going to react when the file content change.
// in case the file does not exists but empty string is specified as initial content, without this
// the content will be hashed and reaction will trigger as if the content changed.
if len(startingContent) == 0 {
o.files[f] = fileHashAndState{exists: true}
o.reactors[f] = append(o.reactors[f], reaction)
continue
}
currentHash, emptyFile, err := calculateHash(bytes.NewBuffer(startingContent))
if err != nil {
panic(fmt.Sprintf("unexpected error while adding reactor for %#v: %v", files, err))
}
o.files[f] = fileHashAndState{exists: true, hash: currentHash, isEmpty: emptyFile}
} else {
klog.V(3).Infof("Adding reactor for file %q", f)
o.files[f], err = calculateFileHash(f)
if err != nil && !os.IsNotExist(err) {
panic(fmt.Sprintf("unexpected error while adding reactor for %#v: %v", files, err))
}
}
o.reactors[f] = append(o.reactors[f], reaction)
}
return o
}
func (o *pollingObserver) processReactors(stopCh <-chan struct{}) {
err := wait.PollImmediateInfinite(o.interval, func() (bool, error) {
select {
case <-stopCh:
return true, nil
default:
}
o.reactorsMutex.RLock()
defer o.reactorsMutex.RUnlock()
for filename, reactors := range o.reactors {
currentFileState, err := calculateFileHash(filename)
if err != nil && !os.IsNotExist(err) {
return false, err
}
lastKnownFileState := o.files[filename]
o.files[filename] = currentFileState
klog.Infof("Observed change: file:%s (current: %q, lastKnown: %q)", filename, currentFileState.hash, lastKnownFileState.hash)
for i := range reactors {
var action ActionType
switch {
case !lastKnownFileState.exists && !currentFileState.exists:
// skip non-existing file
continue
case !lastKnownFileState.exists && currentFileState.exists && (len(currentFileState.hash) > 0 || currentFileState.isEmpty):
// if we see a new file created that has content or its empty, trigger FileCreate action
action = FileCreated
case lastKnownFileState.exists && !currentFileState.exists:
action = FileDeleted
case lastKnownFileState.hash == currentFileState.hash:
// skip if the hashes are the same
continue
case lastKnownFileState.hash != currentFileState.hash:
action = FileModified
}
if err := reactors[i](filename, action); err != nil {
klog.Errorf("Reactor for %q failed: %v", filename, err)
}
}
}
if !o.HasSynced() {
o.syncedMutex.Lock()
o.hasSynced = true
o.syncedMutex.Unlock()
klog.V(3).Info("File observer successfully synced")
}
return false, nil
})
if err != nil {
klog.Fatalf("file observer failed: %v", err)
}
}
// Run will start a new observer.
func (o *pollingObserver) Run(stopChan <-chan struct{}) {
klog.Info("Starting file observer")
defer klog.Infof("Shutting down file observer")
o.processReactors(stopChan)
}
type fileHashAndState struct {
hash string
exists bool
isEmpty bool
}
func calculateFileHash(path string) (fileHashAndState, error) {
result := fileHashAndState{}
stat, err := os.Stat(path)
if err != nil {
return result, err
}
// this is fatal
if stat.IsDir() {
return result, fmt.Errorf("you can watch only files, %s is a directory", path)
}
f, err := os.Open(path)
if err != nil {
return result, err
}
defer f.Close()
// at this point we know for sure the file exists and we can read its content even if that content is empty
result.exists = true
hash, empty, err := calculateHash(f)
if err != nil {
return result, err
}
result.hash = hash
result.isEmpty = empty
return result, nil
}
func calculateHash(content io.Reader) (string, bool, error) {
hasher := sha256.New()
written, err := io.Copy(hasher, content)
if err != nil {
return "", false, err
}
// written == 0 means the content is empty
if written == 0 {
return "", true, nil
}
return hex.EncodeToString(hasher.Sum(nil)), false, nil
}