-
Notifications
You must be signed in to change notification settings - Fork 67
/
hasher.go
200 lines (161 loc) · 4.7 KB
/
hasher.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
package iomeshage
import (
"fmt"
"io"
"os"
"path/filepath"
"strings"
"time"
log "github.com/sandia-minimega/minimega/v2/pkg/minilog"
"github.com/fsnotify/fsnotify"
"github.com/twmb/murmur3"
)
// ignoredDirectories is a list of substrings to look for in paths. Any path
// that contains one of them (anywhere in the path, see ignoreDirectory) is
// skipped when hashing files and when adding directories to the file system
// event watcher.
//
// transfer_ - don't hash file parts as they're being transferred. Instead, just
// wait for the CREATE event for the final, combined file.
var ignoredDirectories = []string{"transfer_"}
// startHasher generates a Murmur3 hash for all existing files and also watches
// for new or updated files and generates a hash for them as well.
//
// The work is split into three phases:
//
//  1. start a goroutine that consumes fsnotify events and (re)hashes files on
//     CREATE/WRITE, or clears the stored hash on REMOVE;
//  2. list every existing file under "/" and hash each non-empty one in its own
//     goroutine;
//  3. register iom.base and every non-ignored directory below it with the
//     watcher.
//
// Paths matched by ignoreDirectory are skipped in all three phases. Failures
// to create the watcher, list the files, or walk the base directory are
// reported through log.Fatal. This function never returns normally; it blocks
// forever so the deferred watcher.Close does not run while events are still
// being processed.
func (iom *IOMeshage) startHasher() {
	watcher, err := fsnotify.NewWatcher()
	if err != nil {
		log.Fatal("creating file watcher: %v", err)
	}
	defer watcher.Close()

	// hashAsync hashes the file at path in its own goroutine and stores the
	// result, so hashing a large file never stalls event processing or startup.
	hashAsync := func(path string) {
		go func() {
			hash, err := hashFile(path)
			if err != nil {
				log.Error("getting hash for file %s: %v", path, err)
				return
			}

			iom.updateHash(path, hash)
		}()
	}

	// watch registers dir with the watcher. A failure is logged rather than
	// treated as fatal: the remaining directories are still worth watching.
	watch := func(dir string) {
		if err := watcher.Add(dir); err != nil {
			log.Error("adding %s to file system watcher: %v", dir, err)
		}
	}

	// This goroutine continually processes events generated by the fsnotify
	// watcher and hashes files in the background as needed to help speed things
	// up when the hash actually needs to be used.
	go func() {
		for {
			select {
			case event, ok := <-watcher.Events:
				if !ok {
					// Events channel closed: the watcher has been shut down.
					return
				}

				if ignoreDirectory(event.Name) {
					continue
				}

				log.Debug("new file system event: %s", event)

				if event.Has(fsnotify.Create) {
					stat, err := os.Stat(event.Name)
					if err != nil {
						log.Error("getting stats for newly created file %s: %v", event.Name, err)
						continue
					}

					switch {
					case stat.IsDir():
						// New directories must be watched explicitly.
						log.Debug("adding new directory %s to file system watcher", event.Name)
						watch(event.Name)
					case stat.Size() > 0:
						// Empty files are skipped here; they get hashed once a
						// WRITE event reports content.
						log.Debug("getting hash for file %s", event.Name)
						hashAsync(event.Name)
					}
				}

				if event.Has(fsnotify.Write) {
					log.Debug("getting hash for file %s", event.Name)
					hashAsync(event.Name)
				}

				if event.Has(fsnotify.Remove) {
					// An empty hash marks the file's hash as deleted.
					log.Debug("deleting hash for file %s", event.Name)
					iom.updateHash(event.Name, "")
				}

				// TODO (future): figure out best way to handle detection of files being
				// moved locally (renamed). May not happen that often, so not something
				// to worry about right now.
			case err, ok := <-watcher.Errors:
				if !ok {
					return
				}

				log.Error("watching for file events: %v", err)
			}
		}
	}()

	files, err := iom.List("/", true)
	if err != nil {
		log.Fatal("listing files: %v", err)
	}

	// This loop is only run once at startup, and creates a new goroutine for
	// each existing file that needs to be hashed.
	for _, info := range files {
		if ignoreDirectory(info.Path) || info.Size == 0 {
			continue
		}

		hashAsync(info.Path)
	}

	// Don't start watching for fsnotify events until after we've already started
	// hashing all the existing files. There's a very small, very unlikely chance
	// that a new file could be created between the time when we walked all the
	// existing files and we start watching for new files.
	watch(iom.base)

	err = filepath.Walk(iom.base, func(path string, info os.FileInfo, err error) error {
		if err != nil {
			return err
		}

		if !info.IsDir() {
			return nil
		}

		if ignoreDirectory(path) {
			// Everything below an ignored directory also contains the ignored
			// substring in its path, so there is no point descending into it.
			return filepath.SkipDir
		}

		watch(path)
		return nil
	})

	if err != nil {
		log.Fatal("walking %s: %v", iom.base, err)
	}

	// Block forever so this function never returns: returning would run the
	// deferred watcher.Close and stop the event processing goroutine.
	select {}
}
// hashFile streams the file at the given path through a 64-bit Murmur3 hasher
// and returns the resulting digest formatted as lowercase hexadecimal.
//
// An error is returned if the file cannot be opened or if reading it fails
// part way through; in both cases the returned hash is the empty string. The
// time spent hashing (measured after the file has been opened) is logged at
// debug level.
func hashFile(path string) (string, error) {
	f, openErr := os.Open(path)
	if openErr != nil {
		return "", fmt.Errorf("opening file %s for hashing: %w", path, openErr)
	}
	defer f.Close()

	began := time.Now()
	h := murmur3.New64()

	// io.Copy feeds the file to the hasher in chunks, so the whole file is
	// never held in memory at once.
	_, copyErr := io.Copy(h, f)
	if copyErr != nil {
		return "", fmt.Errorf("hashing file: %w", copyErr)
	}

	digest := fmt.Sprintf("%x", h.Sum(nil))
	log.Debug("hashing %s (%s) took %s", path, digest, time.Since(began))

	return digest, nil
}
// ignoreDirectory reports whether the given path should be skipped, i.e.
// whether any entry of ignoredDirectories occurs as a substring anywhere in
// the path.
func ignoreDirectory(path string) bool {
	matched := false

	// Stop scanning as soon as one ignored substring is found.
	for idx := 0; idx < len(ignoredDirectories) && !matched; idx++ {
		matched = strings.Contains(path, ignoredDirectories[idx])
	}

	return matched
}