// Package dupescout walks directory trees, generates a key for every regular
// non-empty file and reports the paths of files whose keys collide.
package dupescout
import (
"errors"
"fmt"
"log"
"os"
"os/signal"
"path/filepath"
"syscall"
"github.com/puzpuzpuz/xsync/v2"
"golang.org/x/sync/errgroup"
)
// pair couples a file path with the key that was generated for it. Two pairs
// carrying the same key are treated as duplicates by the consumer.
type pair struct {
	key  string // value returned by the KeyGeneratorFunc for path; never empty when sent by producePair
	path string // path of the file as reported by the directory walk
}
// dupescout holds the shared state of one duplicate search run.
type dupescout struct {
	g        *errgroup.Group // "wait group" whose limit caps the number of concurrent goroutines (directory walkers and pair producers)
	pairs    chan *pair      // producers send pairs here; consumePairs reads them and forwards duplicates to the caller
	shutdown chan os.Signal  // receives SIGINT/SIGTERM; closed by gracefulShutdown after the first signal
}
// newDupeScout builds the state for one search run. The errgroup is limited to
// workers concurrent goroutines, the pairs channel is buffered to the same
// size and the shutdown channel has room for a single pending signal.
func newDupeScout(workers int) *dupescout {
	var group errgroup.Group
	group.SetLimit(workers)

	pairs := make(chan *pair, workers)
	shutdown := make(chan os.Signal, 1)

	return &dupescout{g: &group, pairs: pairs, shutdown: shutdown}
}
// run starts the search for duplicates, customized by the provided Cfg.
//
// It applies the config defaults, starts the pair consumer and the signal
// handler in their own goroutines, schedules one directory walk per configured
// path and waits for every worker of the group to finish. The pairs channel is
// closed afterwards so the consumer can drain it and close dupesChan.
// The returned error is whatever the errgroup's Wait reports.
//
// NOTE(review): the gracefulShutdown goroutine is not stopped when run returns
// without a signal having arrived — confirm whether that is intended.
func run(c Cfg, dupesChan chan []string, stream bool) error {
	c.defaults()

	dup := newDupeScout(c.Workers)
	go dup.consumePairs(dupesChan, stream)
	go gracefulShutdown(dup.shutdown)

	cfg := &c
	for _, root := range c.Paths {
		root := root // per-iteration copy for the closure
		dup.g.Go(func() error { return dup.search(root, cfg) })
	}

	waitErr := dup.g.Wait()

	// All producers are done; closing pairs lets the consumer finish.
	close(dup.pairs)

	return waitErr
}
// GetResults runs the duplicate search in non-streaming mode and returns the
// slice received from the result channel together with the error reported by
// the search. The slice is nil when the channel is closed without a value.
func GetResults(c Cfg) ([]string, error) {
	results := make(chan []string, 1)

	runErr := run(c, results, false)
	dupes := <-results

	return dupes, runErr
}
// StreamResults runs the duplicate search in streaming mode: duplicate paths
// are sent to dupesChan in chunks as they are found, and the channel is closed
// by the consumer once all pairs have been processed. The returned error is
// the one reported by the search itself.
func StreamResults(c Cfg, dupesChan chan []string) error {
	const stream = true
	err := run(c, dupesChan, stream)
	return err
}
// consumePairs processes the produced pairs and sends the duplicate paths to
// dupesChan, which it closes once the pairs channel has been closed and drained.
//
// When stream is true, results are sent in chunks as they are found: the first
// time a key repeats, both the remembered path and the current path are sent;
// every further path with that key is sent on its own.
//
// When stream is false, all duplicate paths are collected locally and sent as
// a single slice after the pairs channel is drained. Nothing is sent when no
// duplicates were found. Sending only at the end means a receiver never sees a
// partial result while pairs are still waiting to be processed.
func (dup *dupescout) consumePairs(dupesChan chan []string, stream bool) {
	defer close(dupesChan)

	// key -> first path seen for that key, or "" once that path has been reported.
	seen := xsync.NewMapOf[string]()

	// Duplicate paths gathered in non-streaming mode.
	var collected []string

	for p := range dup.pairs {
		storedPath, ok := seen.Load(p.key)
		if !ok {
			seen.Store(p.key, p.path)
			continue
		}

		var found []string
		if storedPath != "" {
			// First duplicate for this key: report the remembered path as well
			// and mark the key so the remembered path is not reported again.
			seen.Store(p.key, "")
			found = []string{storedPath, p.path}
		} else {
			// Earlier paths for this key were already reported.
			found = []string{p.path}
		}

		if stream {
			dupesChan <- found
			continue
		}
		collected = append(collected, found...)
	}

	if !stream && len(collected) > 0 {
		dupesChan <- collected
	}
}
// producePair generates the key for path with keyGen and sends the resulting
// pair to the pairs channel.
//
// It returns nil without doing anything when a shutdown is in progress, and
// nil when keyGen reports ErrSkipFile. Any other keyGen error is returned
// unchanged, and an empty key is reported as an error.
func (dup *dupescout) producePair(path string, keyGen KeyGeneratorFunc) error {
	// Stop pair production if shutdown is in progress.
	if dup.shuttingDown() {
		return nil
	}

	key, err := keyGen(path)
	switch {
	case err == nil:
		// Key generated; validated below.
	case errors.Is(err, ErrSkipFile):
		// Don't collect ErrSkipFile errors.
		return nil
	default:
		return err
	}

	if len(key) == 0 {
		return fmt.Errorf("\nkey generator returned an empty key for path: %s", path)
	}

	dup.pairs <- &pair{key: key, path: path}
	return nil
}
// search walks the tree rooted at dir and triggers the production of a pair
// for every regular, non-empty file that is not excluded by the config.
//
// Directories for which c.skipDir reports true are not descended into. Files
// whose info cannot be read are skipped silently. Once a shutdown is in
// progress every remaining entry is ignored.
//
// search itself runs as a worker of the limited errgroup, so it must not block
// waiting for a free slot in that same group: if every slot were held by a
// walker, none could ever be released. Pair production is therefore handed to
// the group only when a slot is free (TryGo); otherwise the pair is produced
// inline by the walker.
//
// It returns the error that aborted the walk, if any, otherwise the first
// error of an inline pair production. As with errors from group workers, an
// inline error does not stop the walk.
func (dup *dupescout) search(dir string, c *Cfg) error {
	var inlineErr error

	walkErr := filepath.WalkDir(dir, func(path string, de os.DirEntry, err error) error {
		if dup.shuttingDown() {
			return nil
		}
		if err != nil {
			return err
		}
		if de.IsDir() && c.skipDir(path) {
			return filepath.SkipDir
		}
		if !de.Type().IsRegular() || c.skipFile(path) {
			return nil
		}

		fi, err := de.Info()
		if err != nil || fi.Size() == 0 {
			return nil // unreadable or empty files are not candidates
		}

		produce := func() error {
			return dup.producePair(path, c.KeyGenerator)
		}
		if dup.g.TryGo(produce) {
			return nil
		}
		// No free worker slot: do the work here instead of blocking on the group.
		if err := produce(); err != nil && inlineErr == nil {
			inlineErr = err
		}
		return nil
	})
	if walkErr != nil {
		return walkErr
	}
	return inlineErr
}
// shuttingDown reports, without blocking, whether a receive on the shutdown
// channel is currently possible. That is the case permanently once the channel
// has been closed.
//
// NOTE(review): a successful receive also consumes a pending value from the
// channel — confirm this cannot take a signal away from gracefulShutdown.
func (dup *dupescout) shuttingDown() bool {
	stopping := false
	select {
	case <-dup.shutdown:
		stopping = true
	default:
		// Nothing to receive: no shutdown observed.
	}
	return stopping
}
// gracefulShutdown sets up a signal handler worker for graceful shutdown.
//
// It registers shutdown for SIGINT and SIGTERM, blocks until a value arrives
// on the channel, logs a message and then closes the channel so that every
// later receive on it succeeds immediately. It is meant to run in its own
// goroutine.
//
// The channel is unregistered with signal.Stop before it is closed: otherwise
// a second signal would make the signal package send on a closed channel,
// which panics.
func gracefulShutdown(shutdown chan os.Signal) {
	signal.Notify(shutdown, syscall.SIGINT, syscall.SIGTERM)
	<-shutdown
	log.Println("\nReceived signal, shutting down after current workers are done...")

	// After Stop returns no more signals are delivered to shutdown.
	signal.Stop(shutdown)
	close(shutdown)
}