-
Notifications
You must be signed in to change notification settings - Fork 17
/
traversal.go
87 lines (74 loc) · 2.08 KB
/
traversal.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
package traversal
import (
"io/fs"
"path/filepath"
"sync"
"k8s.io/klog/v2"
)
// Traverser is the abstraction for anything that can run a traversal.
// In this file it is satisfied by *FileWalker.
type Traverser interface {
// Traverse runs the traversal. It takes no arguments and returns nothing,
// so failures cannot be handed back to the caller through this method.
Traverse()
}
// FileWalker walks a directory tree and hands every non-directory entry to a
// set of QueueProcessor workers through a shared channel (see Traverse).
type FileWalker struct {
// inputPath is the root passed to filepath.WalkDir; queued paths are made
// relative to it when possible.
inputPath string
// workerCount is the number of workers started by Traverse and also the
// buffer size of the queue and error channels.
workerCount int
// workers holds the processors created by Traverse; it is not set by
// NewParallelFileWalker.
workers []QueueProcessor
// workerFactory creates one worker; Traverse calls it with IDs starting at 1.
workerFactory func(int) QueueProcessor
}
// Traverse should be called to start processing the must-gather directory.
//
// It creates the workers through w.workerFactory, walks w.inputPath and sends
// every non-directory entry to the workers over a shared queue, using the path
// relative to w.inputPath whenever one can be computed. It returns after the
// queue has been closed and every worker has returned from ProcessQueue.
//
// This method will exit the CLI (through klog.Exitf) if an error is
// encountered, either while walking the directory tree or when a worker
// reports one on the error channel.
//
// A workerCount below 1 is treated as 1. With zero workers nothing would ever
// receive from the queue and the walk would block forever on the first file,
// and a negative count would panic when the channels are created.
func (w *FileWalker) Traverse() {
	workerCount := w.workerCount
	if workerCount < 1 {
		workerCount = 1
	}

	errorCh := make(chan error, workerCount)
	queue := make(chan workerInput, workerCount)

	var workerWg sync.WaitGroup
	w.workers = make([]QueueProcessor, workerCount)
	for i := range w.workers {
		w.workers[i] = w.workerFactory(i + 1) // worker IDs are 1-based
		workerWg.Add(1)
		go func(worker QueueProcessor) {
			defer workerWg.Done()
			// NOTE(review): workers are expected to return once queue is closed
			// and drained; otherwise workerWg.Wait below never completes.
			worker.ProcessQueue(queue, errorCh)
		}(w.workers[i])
	}

	// A single goroutine consumes reported errors; the first one terminates the
	// process, so the loop only finishes normally when errorCh is closed.
	var errorWg sync.WaitGroup
	errorWg.Add(1)
	go func() {
		defer errorWg.Done()
		for err := range errorCh {
			switch e := err.(type) {
			case *fileProcessingError:
				klog.Exitf("failed to process %s due to %v", e.path, e.cause)
			default:
				klog.Exitf("unexpected error: %v", err)
			}
		}
	}()

	walkErr := filepath.WalkDir(w.inputPath, func(path string, dirEntry fs.DirEntry, err error) error {
		if err != nil {
			return err
		}
		if dirEntry.IsDir() {
			return nil
		}
		// the rest of the logic expects the path to be relative to the input dir
		// root, if that fails we assume it is already relative
		relPath, relErr := filepath.Rel(w.inputPath, path)
		if relErr != nil {
			queue <- workerInput(path)
			return nil
		}
		queue <- workerInput(relPath)
		return nil
	})
	if walkErr != nil {
		klog.Exitf("failed to traverse the directory structure due to: %v", walkErr)
	}

	// No more input: let the workers drain the queue and return.
	close(queue)
	workerWg.Wait()
	// once all the workers have exited close the error channel and wait for the
	// exit goroutine to complete.
	close(errorCh)
	errorWg.Wait()
}
// NewParallelFileWalker returns a FileWalker rooted at inputPath that is
// configured with workerCount workers, each built by workerFactory.
// No workers are created here: the workers slice stays nil and workerFactory
// is only stored for later use.
func NewParallelFileWalker(inputPath string, workerCount int, workerFactory func(id int) QueueProcessor) *FileWalker {
	walker := new(FileWalker)
	walker.inputPath = inputPath
	walker.workerCount = workerCount
	walker.workerFactory = workerFactory
	return walker
}