-
Notifications
You must be signed in to change notification settings - Fork 1.5k
/
pipe.go
292 lines (242 loc) · 6.78 KB
/
pipe.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
package pipe
import (
"context"
"fmt"
"os"
"path/filepath"
"sort"
"restic/errors"
"restic/debug"
"restic/fs"
)
// Result is the element type of the channels returned by Job.Result and
// stored in Dir.Entries. It is an empty interface, so a value of any type can
// be sent.
type Result interface{}
// Job is the item type of the jobs channel that Walk sends on. Entry and Dir
// both implement it.
type Job interface {
// Path returns the job's path relative to its base directory.
Path() string
// Fullpath returns the base directory joined with the relative path.
Fullpath() string
// Error returns the error stored in the job, or nil if there is none.
Error() error
// Info returns the file information stored in the job. It can be nil,
// e.g. for the root Dir that Walk sends last.
Info() os.FileInfo
// Result returns the send-only channel associated with the job.
Result() chan<- Result
}
// Entry is the Job that walk sends for a path which is not a directory, and
// for a directory member whose Lstat call failed.
type Entry struct {
	basedir string
	path    string
	info    os.FileInfo
	error   error
	result  chan<- Result

	// Node refers to the old node when one is available. It is declared as
	// interface{} to avoid a circular import.
	Node interface{}
}

// Path returns the entry's path relative to its base directory.
func (ent Entry) Path() string {
	return ent.path
}

// Fullpath returns the base directory joined with the relative path.
func (ent Entry) Fullpath() string {
	full := filepath.Join(ent.basedir, ent.path)
	return full
}

// Error returns the error stored in the entry, or nil if there is none.
func (ent Entry) Error() error {
	return ent.error
}

// Info returns the file information stored in the entry.
func (ent Entry) Info() os.FileInfo {
	return ent.info
}

// Result returns the send-only channel associated with the entry.
func (ent Entry) Result() chan<- Result {
	return ent.result
}
// Dir is the Job that walk sends for a directory once all of its members
// have been handled, and for a path whose Lstat or directory read failed.
// Walk also sends one Dir without path or info as the final root job.
type Dir struct {
	basedir string
	path    string
	error   error
	info    os.FileInfo

	// Entries holds one receive-only result channel per member that was
	// handed on for processing.
	Entries [](<-chan Result)
	result  chan<- Result
}

// Path returns the directory's path relative to its base directory.
func (d Dir) Path() string {
	return d.path
}

// Fullpath returns the base directory joined with the relative path.
func (d Dir) Fullpath() string {
	full := filepath.Join(d.basedir, d.path)
	return full
}

// Error returns the error stored in the job, or nil if there is none.
func (d Dir) Error() error {
	return d.error
}

// Info returns the file information stored in the job.
func (d Dir) Info() os.FileInfo {
	return d.info
}

// Result returns the send-only channel associated with the job.
func (d Dir) Result() chan<- Result {
	return d.result
}
// readDirNames reads the directory named by dirname and returns a sorted
// list of the names of its entries. It is adapted from readDirNames in
// path/filepath/path.go.
//
// Failures while opening, reading or closing the directory are returned
// wrapped with the name of the failing operation ("Open", "Readdirnames" or
// "Close"); in that case the returned slice is nil.
func readDirNames(dirname string) ([]string, error) {
	f, err := fs.Open(dirname)
	if err != nil {
		return nil, errors.Wrap(err, "Open")
	}

	names, err := f.Readdirnames(-1)
	if err != nil {
		// The read error is the one worth reporting; the handle is still
		// released, but a secondary Close error is deliberately dropped.
		_ = f.Close()
		return nil, errors.Wrap(err, "Readdirnames")
	}

	// Do not silently discard a failing Close.
	if err := f.Close(); err != nil {
		return nil, errors.Wrap(err, "Close")
	}

	sort.Strings(names)
	return names, nil
}
// SelectFunc returns true for all items that should be included (files and
// dirs). If false is returned, files are ignored and dirs are not even walked.
//
// item is the path that was handed to Lstat and fi is the FileInfo returned
// by that call. For directory members walk invokes the function before it
// looks at the Lstat error, so fi can stem from a failed Lstat (presumably
// nil in that case; confirm against the fs package).
type SelectFunc func(item string, fi os.FileInfo) bool
// walk processes dir and sends the resulting Job on jobs, using res as the
// result channel of that job. Paths stored in the jobs are relative to
// basedir.
//
// selectFunc is consulted for dir itself and for every member of a
// directory; rejected items produce no job. A path that is not a directory
// yields an Entry. For a directory the members are visited in the order
// returned by readDirNames: every accepted member gets its own buffered
// result channel and is walked recursively, and afterwards a Dir job carrying
// those channels in Entries is sent. Lstat and directory read failures are
// not returned but reported through the error field of the job that is sent.
//
// The return value excluded is true only when selectFunc rejected dir
// itself, in which case no job referring to res was sent. Every send is
// abandoned once ctx is cancelled. walk panics if filepath.Rel cannot express
// dir relative to basedir.
func walk(ctx context.Context, basedir, dir string, selectFunc SelectFunc, jobs chan<- Job, res chan<- Result) (excluded bool) {
	debug.Log("start on %q, basedir %q", dir, basedir)

	relpath, err := filepath.Rel(basedir, dir)
	if err != nil {
		panic(err)
	}

	info, err := fs.Lstat(dir)
	if err != nil {
		err = errors.Wrap(err, "Lstat")
		debug.Log("error for %v: %v, res %p", dir, err, res)
		select {
		case jobs <- Dir{basedir: basedir, path: relpath, info: info, error: err, result: res}:
		case <-ctx.Done():
		}
		return false
	}

	if !selectFunc(dir, info) {
		debug.Log("file %v excluded by filter, res %p", dir, res)
		return true
	}

	if !info.IsDir() {
		debug.Log("sending file job for %v, res %p", dir, res)
		select {
		case jobs <- Entry{info: info, basedir: basedir, path: relpath, result: res}:
		case <-ctx.Done():
		}
		return false
	}

	debug.RunHook("pipe.readdirnames", dir)
	names, err := readDirNames(dir)
	if err != nil {
		debug.Log("Readdirnames(%v) returned error: %v, res %p", dir, err, res)
		select {
		case <-ctx.Done():
		case jobs <- Dir{basedir: basedir, path: relpath, info: info, error: err, result: res}:
		}
		return false
	}

	// Insert breakpoint to allow testing behaviour with vanishing files
	// between Readdir() and lstat()
	debug.RunHook("pipe.walk1", relpath)

	entries := make([]<-chan Result, 0, len(names))

	for _, name := range names {
		subpath := filepath.Join(dir, name)

		// The filter is asked before statErr is inspected, so it may be
		// handed the FileInfo of a failed Lstat.
		fi, statErr := fs.Lstat(subpath)
		if !selectFunc(subpath, fi) {
			debug.Log("file %v excluded by filter", subpath)
			continue
		}

		ch := make(chan Result, 1)

		if statErr != nil {
			statErr = errors.Wrap(statErr, "Lstat")
			// Log the error and the channel that belong to this entry.
			debug.Log("sending file job for %v, err %v, res %p", subpath, statErr, ch)
			entries = append(entries, ch)
			select {
			case jobs <- Entry{info: fi, error: statErr, basedir: basedir, path: filepath.Join(relpath, name), result: ch}:
			case <-ctx.Done():
				return false
			}
			continue
		}

		// Insert breakpoint to allow testing behaviour with vanishing files
		// between walk and open
		debug.RunHook("pipe.walk2", filepath.Join(relpath, name))

		// The recursive call runs Lstat and selectFunc again. If the item is
		// rejected this time, no job was sent for ch, so ch must not be
		// listed in Entries: nothing would ever be delivered on it.
		if walk(ctx, basedir, subpath, selectFunc, jobs, ch) {
			continue
		}
		entries = append(entries, ch)
	}

	debug.Log("sending dirjob for %q, basedir %q, res %p", dir, basedir, res)
	select {
	case jobs <- Dir{basedir: basedir, path: relpath, info: info, Entries: entries, result: res}:
	case <-ctx.Done():
	}

	return false
}
// cleanupPath normalises path with filepath.Clean. An ordinary path is
// returned as a one-element slice. A path that is its own parent directory,
// such as "." or "/", is replaced by the names found inside it, each joined
// with the cleaned path. An error from reading that directory is returned
// unchanged.
func cleanupPath(path string) ([]string, error) {
	cleaned := filepath.Clean(path)

	if parent := filepath.Dir(cleaned); parent == cleaned {
		// Special case: expand the directory into its members.
		members, err := readDirNames(cleaned)
		if err != nil {
			return nil, err
		}

		for i := range members {
			members[i] = filepath.Join(cleaned, members[i])
		}
		return members, nil
	}

	return []string{cleaned}, nil
}
// Walk sends a Job for each file and directory it finds below walkPaths,
// followed by one final Dir job whose Entries are the result channels of the
// top-level paths and whose result channel is res.
//
// Each element of walkPaths is expanded with cleanupPath first; a path for
// which that fails is reported on stderr and skipped. Top-level paths
// rejected by selectFunc do not appear in the final Dir. The jobs channel is
// closed when Walk returns, and pending sends are abandoned once ctx is
// cancelled.
func Walk(ctx context.Context, walkPaths []string, selectFunc SelectFunc, jobs chan<- Job, res chan<- Result) {
	var roots []string
	for _, walkPath := range walkPaths {
		expanded, err := cleanupPath(walkPath)
		if err == nil {
			roots = append(roots, expanded...)
			continue
		}

		fmt.Fprintf(os.Stderr, "Readdirnames(%v): %v, skipping\n", walkPath, err)
		debug.Log("Readdirnames(%v) returned error: %v, skipping", walkPath, err)
	}

	debug.Log("start on %v", roots)

	// Whatever happens below, receivers must see the end of the job stream.
	defer func() {
		debug.Log("output channel closed")
		close(jobs)
	}()

	topLevel := make([]<-chan Result, 0, len(roots))
	for _, root := range roots {
		debug.Log("start walker for %v", root)

		ch := make(chan Result, 1)
		if walk(ctx, filepath.Dir(root), root, selectFunc, jobs, ch) {
			// Nothing was sent for ch, so it is not recorded.
			debug.Log("walker for %v done, it was excluded by the filter", root)
		} else {
			topLevel = append(topLevel, ch)
			debug.Log("walker for %v done", root)
		}
	}

	debug.Log("sending root node, res %p", res)
	select {
	case jobs <- Dir{Entries: topLevel, result: res}:
		debug.Log("walker done")
	case <-ctx.Done():
	}
}
// Split reads jobs from inChan and forwards each one to dirChan or entChan,
// depending on whether its concrete type is Dir or Entry. A job is forwarded
// before the next one is read. Split returns when inChan is closed, and it
// panics when it receives a nil job or a job of any other type.
func Split(inChan <-chan Job, dirChan chan<- Dir, entChan chan<- Entry) {
	debug.Log("start")
	defer debug.Log("done")

	var (
		pendingDir Dir
		pendingEnt Entry

		// A nil channel is never ready in a select. Exactly one of the
		// three is non-nil at any time: either the next job is awaited or
		// the pending one is being delivered.
		recv    = inChan
		sendDir chan<- Dir
		sendEnt chan<- Entry
	)

	for {
		select {
		case job, open := <-recv:
			if !open {
				// input exhausted
				return
			}
			if job == nil {
				panic("nil job received")
			}

			switch typed := job.(type) {
			case Dir:
				pendingDir, sendDir = typed, dirChan
			case Entry:
				pendingEnt, sendEnt = typed, entChan
			default:
				panic(fmt.Sprintf("unknown job type %v", typed))
			}

			// stop receiving until the pending job has been delivered
			recv = nil

		case sendDir <- pendingDir:
			sendDir, recv = nil, inChan

		case sendEnt <- pendingEnt:
			sendEnt, recv = nil, inChan
		}
	}
}