/
folder_list.go
151 lines (132 loc) · 3.34 KB
/
folder_list.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
package vaku
import (
"context"
"errors"
"sync"
"golang.org/x/sync/errgroup"
)
var (
// ErrFolderList is the sentinel error FolderList passes to newWrapErr when the recursive
// listing reports an error.
ErrFolderList = errors.New("folder list")
// ErrFolderListChan is the sentinel error passed to newWrapErr when listing a single folder
// fails while FolderListChan is walking the tree.
ErrFolderListChan = errors.New("folder list chan")
)
// FolderList recursively lists the provided path and all subpaths, collecting everything that
// FolderListChan streams into a single slice.
//
// It returns the collected paths once the result channel is closed. If the error channel delivers
// a non-nil error first, the partial results are discarded and the error is returned wrapped with
// ErrFolderList.
func (c *Client) FolderList(ctx context.Context, p string) ([]string, error) {
	results, errs := c.FolderListChan(ctx, p)

	var listed []string
	for {
		select {
		case err := <-errs:
			// A nil error only signals that the work is finished; keep looping until the
			// result channel is closed.
			if err == nil {
				continue
			}
			return nil, newWrapErr(p, ErrFolderList, err)
		case entry, open := <-results:
			if open {
				listed = append(listed, entry)
				continue
			}
			return listed, nil
		}
	}
}
// FolderListChan recursively lists the provided path and all subpaths. Returns an unbuffered
// channel that can be read until close and an error channel that sends either the first error or
// nil when the work is done. Both channels are closed after that value has been received.
//
// The send on the error channel is unconditional, so callers must receive from it. Results are
// sent by the workers, which give up on a send only once the context is done.
func (c *Client) FolderListChan(ctx context.Context, p string) (<-chan string, <-chan error) {
	// input must be a folder (end in "/")
	root := EnsureFolder(p)

	// eg manages workers reading from the paths channel
	eg, ctx := errgroup.WithContext(ctx)

	// wg counts paths that are queued or being processed; once it reaches zero nothing can
	// send on pathC any more and it is safe to close
	var wg sync.WaitGroup

	// pathC is paths to be processed
	pathC := make(chan string)
	// resC is processed paths
	resC := make(chan string)
	// errC for the first error seen
	errC := make(chan error)

	// add root path to paths
	wg.Add(1)
	go func() { pathC <- root }()

	// fan out and process paths
	for n := 0; n < c.workers; n++ {
		eg.Go(func() error {
			return c.folderListWork(&folderListWorkInput{
				ctx:   ctx,
				root:  root,
				wg:    &wg,
				pathC: pathC,
				resC:  resC,
			})
		})
	}

	// Close pathC after all paths have been accounted for
	go func() {
		wg.Wait()
		close(pathC)
	}()

	// Wait until finished (success or not) and clean up
	go func() {
		// Wait for all workers to return
		err := eg.Wait()

		// Workers that stop early (error or done context) can leave paths unreceived, most
		// notably the root path, whose sender does not watch the context. Receive and
		// discard them so wg reaches zero and pathC gets closed; without this the result
		// below would never be sent. On success pathC is already closed and empty here,
		// so the loop is a no-op.
		for range pathC {
			wg.Done()
		}

		// Report the error (or nil) to errC
		errC <- err

		// Clean up
		close(resC)
		close(errC)
	}()

	return resC, errC
}
// folderListWorkInput is the pieces needed to list a folder. FolderListChan builds one per worker;
// every worker is given the same context, WaitGroup and channels.
type folderListWorkInput struct {
// ctx is watched by workers and path senders so they stop once it is done.
ctx context.Context
// root is the folder the listing started from; it is used when building output paths and
// when wrapping errors.
root string
// wg is incremented for every path queued on pathC and decremented once that path has been
// handled or its send abandoned.
wg *sync.WaitGroup
// pathC carries paths waiting to be processed. Workers receive from it and listed items are
// sent back into it.
pathC chan string
// resC receives the non-folder paths that were found.
resC chan<- string
}
// folderListWork is the worker loop: it receives paths from pathC and hands each one to
// pathListWork, which either expands it (folder) or reports it as a result (non-folder).
//
// It returns nil when pathC is closed, the context error (via ctxErr) when the context is done,
// or the first error returned by pathListWork.
func (c *Client) folderListWork(i *folderListWorkInput) error {
	for {
		select {
		case next, open := <-i.pathC:
			// A closed pathC means there is nothing left to process.
			if !open {
				return nil
			}
			if err := c.pathListWork(next, i); err != nil {
				return err
			}
		case <-i.ctx.Done():
			return ctxErr(i.ctx.Err())
		}
	}
}
// pathListWork handles a single path received from pathC and always marks it done on the
// WaitGroup when it returns.
//
// A non-folder path is converted with outputPath and sent on resC; if the context is done before
// the send goes through, the context error is returned instead. A folder is listed with PathList
// and every listed item is queued back onto pathC from its own goroutine, so the worker is never
// blocked by a full channel. A PathList failure is returned wrapped with ErrFolderListChan.
func (c *Client) pathListWork(path string, i *folderListWorkInput) error {
	defer i.wg.Done()

	// Leaf: report it as a result and finish.
	if !IsFolder(path) {
		select {
		case i.resC <- c.outputPath(path, i.root):
			return nil
		case <-i.ctx.Done():
			return ctxErr(i.ctx.Err())
		}
	}

	// Folder: list it and queue each child for processing.
	children, err := c.PathList(path)
	if err != nil {
		return newWrapErr(i.root, ErrFolderListChan, err)
	}
	for _, child := range children {
		// Count the child before its sender starts so the WaitGroup cannot hit zero early.
		i.wg.Add(1)
		go func(child string) {
			select {
			case i.pathC <- c.inputPath(child, path):
				// Whoever receives the path is responsible for wg.Done.
			case <-i.ctx.Done():
				// Nobody will process this path; release its count here.
				i.wg.Done()
			}
		}(child)
	}
	return nil
}