/
discover.go
383 lines (306 loc) · 10.5 KB
/
discover.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
/*******************************************************************************
* Copyright (c) 2022 Genome Research Ltd.
*
* Author: Sendu Bala <sb10@sanger.ac.uk>
*
* Permission is hereby granted, free of charge, to any person obtaining
* a copy of this software and associated documentation files (the
* "Software"), to deal in the Software without restriction, including
* without limitation the rights to use, copy, modify, merge, publish,
* distribute, sublicense, and/or sell copies of the Software, and to
* permit persons to whom the Software is furnished to do so, subject to
* the following conditions:
*
* The above copyright notice and this permission notice shall be included
* in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
* IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
* CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
* TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
******************************************************************************/
package server
import (
"context"
"errors"
"io/fs"
"net/http"
"os"
"path/filepath"
"time"
"github.com/VertebrateResequencing/wr/queue"
"github.com/gin-gonic/gin"
"github.com/wtsi-hgi/ibackup/put"
"github.com/wtsi-hgi/ibackup/set"
"github.com/wtsi-ssg/wrstat/v4/walk"
)
// ttr is the TTR set on every queue.ItemDef that enqueueEntries adds to the
// global put queue. NOTE(review): in wr's queue TTR is presumably the
// time-to-release of a reserved item — confirm.
const ttr = time.Minute * 6
// triggerDiscovery handles a GET on /rest/v1/auth/discover/[id]. It looks up
// the set identified by the id URL parameter (via validateSet) and starts file
// discovery for it via discoverSet.
//
// If validateSet reports failure, it returns without writing anything further
// (presumably validateSet has already responded — confirm). If discovery could
// not be started, the request is aborted with 400 Bad Request. Otherwise it
// responds 200 OK.
//
// LoadSetDB() must already have been called.
func (s *Server) triggerDiscovery(c *gin.Context) {
	given, ok := s.validateSet(c)
	if !ok {
		return
	}

	err := s.discoverSet(given)
	if err == nil {
		c.Status(http.StatusOK)

		return
	}

	c.AbortWithError(http.StatusBadRequest, err) //nolint:errcheck
}
// discoverSet starts file discovery for the given set.
//
// It synchronously creates the set's local->remote path transformer. If that
// fails, the failure is logged and recorded as the set's error via
// recordSetError, and the error is returned. Otherwise the actual discovery is
// started asynchronously (see discoverThenEnqueue), followed by adding the
// set's upload requests to the global put queue. In that case nil is returned
// without waiting for it.
//
// This function does not itself record in the db that discovery has started.
// NOTE(review): presumably s.db.Discover, reached from the asynchronous phase,
// takes care of that — confirm.
func (s *Server) discoverSet(given *set.Set) error {
	transformer, err := given.MakeTransformer()
	if err != nil {
		s.recordSetError("making transformer for %s failed: %s", given.ID(), err)

		return err
	}

	// Discovery walks directories and can be slow, so don't block the caller.
	go s.discoverThenEnqueue(given, transformer)

	return nil
}
// discoverThenEnqueue performs the slow part of discovery for the given set,
// in three steps:
//  1. It updates file existence and walks the set's directories (doDiscovery).
//  2. It makes sure the updated set is monitored and tries a db backup
//     (handleNewlyDefinedSets).
//  3. It queues the set's uploadable files using the given transformer
//     (enqueueSetFiles).
//
// A discovery failure is only logged. A queuing failure is logged and also
// recorded against the set. Intended to run in its own goroutine, but it must
// never run more than once at a time for the same set.
func (s *Server) discoverThenEnqueue(given *set.Set, transformer put.PathTransformer) {
	updated, err := s.doDiscovery(given)
	if err != nil {
		s.Logger.Printf("discovery error %s: %s", given.ID(), err)

		return
	}

	s.handleNewlyDefinedSets(updated)

	err = s.enqueueSetFiles(updated, transformer)
	if err == nil {
		return
	}

	s.recordSetError("queuing files for %s failed: %s", updated.ID(), err)
}
// doDiscovery asks the db to run discovery for the given set. It supplies a
// callback that walks the set's directory entries.
//
// The callback starts doSetDirWalks in its own goroutine. It then gathers the
// discovered files, and any warnings, with processSetDirWalkOutput. Returns
// whatever s.db.Discover returns; callers treat the set as the updated set.
func (s *Server) doDiscovery(given *set.Set) (*set.Set, error) {
	walkDirs := func(dirEntries []*set.Entry) ([]*walk.Dirent, error) {
		found := make(chan *walk.Dirent)
		done := make(chan error)
		warnings := make(chan error)

		go s.doSetDirWalks(dirEntries, given, found, done, warnings)

		return s.processSetDirWalkOutput(given, found, done, warnings)
	}

	return s.db.Discover(given.ID(), walkDirs)
}
// processSetDirWalkOutput collects every Dirent sent on entriesCh until that
// channel is closed, then waits for the walk's final error on doneCh.
//
// Meanwhile, warnings arriving on warnCh are accumulated and stored against
// the set by processSetDirWalkWarnings. Once the walk has finished, warnCh is
// closed so that processor can complete, and its result is awaited.
//
// If the walk failed, its error is returned with nil entries, and any error
// from storing warnings is discarded. Otherwise the collected entries are
// returned along with any error from storing warnings.
func (s *Server) processSetDirWalkOutput(given *set.Set, entriesCh chan *walk.Dirent,
	doneCh, warnCh chan error) ([]*walk.Dirent, error) {
	warningsStored := s.processSetDirWalkWarnings(given, warnCh)

	var collected []*walk.Dirent //nolint:prealloc

	for dirent := range entriesCh {
		collected = append(collected, dirent)
	}

	walkErr := <-doneCh

	// All walkers have reported by now, so nothing else will send on warnCh.
	close(warnCh)

	warnErr := <-warningsStored

	if walkErr != nil {
		return nil, walkErr
	}

	return collected, warnErr
}
// processSetDirWalkWarnings starts a goroutine that joins (errors.Join) every
// error received on warnCh until that channel is closed. If any were received,
// the combined message is stored as a warning on the given set via
// s.db.SetWarning.
//
// The returned unbuffered channel receives exactly one value when that is
// done: the error from storing the warning, or nil. The caller must receive
// from it, otherwise the goroutine blocks forever.
func (s *Server) processSetDirWalkWarnings(given *set.Set, warnCh chan error) chan error {
	result := make(chan error)

	go func() {
		var joined error

		for w := range warnCh {
			joined = errors.Join(joined, w)
		}

		var storeErr error
		if joined != nil {
			storeErr = s.db.SetWarning(given.ID(), joined.Error())
		}

		result <- storeErr
	}()

	return result
}
// recordSetError logs msg, then stores err's message as the error on the set
// with id sid via s.db.SetError.
//
// msg must contain exactly two %s verbs, which are filled with sid and err in
// that order. If storing the error fails, that failure is logged too.
func (s *Server) recordSetError(msg, sid string, err error) {
	s.Logger.Printf(msg, sid, err)

	setErr := s.db.SetError(sid, err.Error())
	if setErr == nil {
		return
	}

	s.Logger.Printf("setting error for %s failed: %s", sid, setErr)
}
// handleNewlyDefinedSets should be called once every entry of the given set
// has been discovered and stored in the database. It first makes sure the set
// is monitored as appropriate (monitorSet), then attempts a database backup
// (tryBackup).
func (s *Server) handleNewlyDefinedSets(given *set.Set) {
	s.monitorSet(given)
	s.tryBackup()
}
// doSetDirWalks walks each of the given directory entries of the given set. It
// submits one task per entry to s.dirPool so the walks can run concurrently.
//
// Regular files and symlinks that are found are sent to entriesCh. Permission
// errors hit during a walk are sent to warnChan. Each entry's outcome is passed
// through handleMissingDirectories.
//
// After every task has reported back, entriesCh is closed and exactly one value
// is sent on doneCh. That value is nil if no task reported an error, otherwise
// the last error reported (any earlier ones are dropped).
func (s *Server) doSetDirWalks(entries []*set.Entry, given *set.Set, entriesCh chan *walk.Dirent,
	doneCh, warnChan chan error) {
	// Buffered to len(entries) so no task ever blocks while reporting.
	results := make(chan error, len(entries))

	// The filter callback is stateless, so a single instance is shared by all tasks.
	forwardWanted := onlyRegularAndSymlinks(entriesCh)

	for _, e := range entries {
		current, path := e, e.Path // per-iteration copies captured by the task

		s.dirPool.Submit(func() {
			results <- s.handleMissingDirectories(
				s.checkAndWalkDir(path, forwardWanted, warnChan), current, given)
		})
	}

	var lastErr error

	for range entries {
		if err := <-results; err != nil {
			lastErr = err
		}
	}

	close(entriesCh)

	doneCh <- lastErr
}
// checkAndWalkDir first confirms with os.Lstat that dir can be stat'd, and
// returns that error if not. It then walks dir, passing each entry found to cb,
// and returns the walk's own error.
//
// Per-path errors reported to the walk's error callback are logged. Those that
// are permission errors (fs.ErrPermission) are also sent to warnChan.
func (s *Server) checkAndWalkDir(dir string, cb walk.PathCallback, warnChan chan error) error {
	if _, err := os.Lstat(dir); err != nil {
		return err
	}

	onPathErr := func(path string, err error) {
		s.Logger.Printf("walk found %s, but had error: %s", path, err)

		if errors.Is(err, fs.ErrPermission) {
			warnChan <- err
		}
	}

	return walk.New(cb, false, false).Walk(dir, onPathErr)
}
// onlyRegularAndSymlinks returns a walk callback that forwards each entry to
// entriesCh if it is a regular file or a symlink. Any other kind of entry is
// silently ignored. The returned callback always returns nil.
func onlyRegularAndSymlinks(entriesCh chan *walk.Dirent) func(entry *walk.Dirent) error {
	return func(entry *walk.Dirent) error {
		if entry.IsRegular() || entry.IsSymlink() {
			entriesCh <- entry
		}

		return nil
	}
}
// handleMissingDirectories deals with the outcome of checking/walking one of a
// set's directory entries.
//
// When dirStatErr is nil it does nothing and returns nil. Otherwise it records
// the entry in the db with status put.RequestStatusMissing, using dirStatErr's
// message as the error. It then returns:
//   - any error from recording that status; else
//   - nil if dirStatErr indicates the directory does not exist, so that is not
//     treated as a walk failure; else
//   - dirStatErr itself.
func (s *Server) handleMissingDirectories(dirStatErr error, entry *set.Entry, given *set.Set) error {
	if dirStatErr == nil {
		return nil
	}

	r := &put.Request{
		Local:     entry.Path,
		Requester: given.Requester,
		Set:       given.Name,
		Size:      0,
		Status:    put.RequestStatusMissing,
		Error:     dirStatErr.Error(),
	}

	if _, err := s.db.SetEntryStatus(r); err != nil {
		return err
	}

	// errors.Is (unlike os.IsNotExist) also recognises not-exist errors that
	// have been wrapped, such as any the directory walker might return.
	if errors.Is(dirStatErr, fs.ErrNotExist) {
		return nil
	}

	return dirStatErr
}
// enqueueSetFiles fetches all of the given set's file entries from the db,
// both those specified directly and those found by discovery. It keeps only
// the entries that should be uploaded (see uploadableEntries), turns them
// into put requests, and adds those to the global put queue.
//
// Which entries are kept is decided by Entry.ShouldUpload relative to the
// set's LastDiscovery. The intent is to skip entries that are missing, or that
// have failed or been uploaded since the last discovery.
func (s *Server) enqueueSetFiles(given *set.Set, transformer put.PathTransformer) error {
	all, err := s.db.GetFileEntries(given.ID())
	if err != nil {
		return err
	}

	return s.enqueueEntries(uploadableEntries(all, given), given, transformer)
}
// uploadableEntries returns, in their original order, the entries whose
// ShouldUpload method reports true for the given set's LastDiscovery time.
// These are pending entries and those dealt with before the last discovery.
// Returns nil if none qualify.
func uploadableEntries(entries []*set.Entry, given *set.Set) []*set.Entry {
	var keep []*set.Entry

	for i := range entries {
		if entries[i].ShouldUpload(given.LastDiscovery) {
			keep = append(keep, entries[i])
		}
	}

	return keep
}
// enqueueEntries converts each of the given entries of the given set into a
// put request (see entryToRequest). It then adds them all to the in-memory
// global put queue in a single AddMany call, keyed by request ID and with a
// TTR of ttr.
//
// Conversion stops at the first entry that fails, returning its error with
// nothing queued. Nothing is queued, and nil is returned, when there are no
// entries. If AddMany reports any duplicates, markFailedEntries is called for
// the set. Returns any error from AddMany.
func (s *Server) enqueueEntries(entries []*set.Entry, given *set.Set, transformer put.PathTransformer) error {
	defs := make([]*queue.ItemDef, 0, len(entries))

	for _, entry := range entries {
		r, err := s.entryToRequest(entry, transformer, given)
		if err != nil {
			return err
		}

		defs = append(defs, &queue.ItemDef{Key: r.ID(), Data: r, TTR: ttr})
	}

	if len(defs) == 0 {
		return nil
	}

	_, dups, err := s.queue.AddMany(context.Background(), defs)
	if dups > 0 {
		s.markFailedEntries(given)
	}

	return err
}
// entryToRequest builds a put request for the given entry of the given set.
// The entry's local path is mapped to a remote path with transformer, both
// paths are validated, and the request is tagged with the set's name and
// requester.
//
// Symlink entries carry their destination both in Symlink and under the
// put.MetaKeySymlink metadata key.
//
// Hardlink entries are only annotated when a remote hardlink location is
// configured. Hardlink is then set to that location joined with the entry's
// inode storage path, and the entry's Dest is recorded under
// put.MetaKeyHardlink.
func (s *Server) entryToRequest(entry *set.Entry, transformer put.PathTransformer,
	given *set.Set) (*put.Request, error) {
	r, err := put.NewRequestWithTransformedLocal(entry.Path, transformer)
	if err != nil {
		return nil, err
	}

	if err = r.ValidatePaths(); err != nil {
		return nil, err
	}

	r.Set, r.Requester = given.Name, given.Requester

	switch entry.Type {
	case set.Symlink:
		r.Symlink = entry.Dest
		r.Meta[put.MetaKeySymlink] = entry.Dest
	case set.Hardlink:
		if s.remoteHardlinkLocation != "" {
			r.Hardlink = filepath.Join(s.remoteHardlinkLocation, entry.InodeStoragePath())
			r.Meta[put.MetaKeyHardlink] = entry.Dest
		}
	}

	return r, nil
}
// markFailedEntries visits every buried queue item belonging to the given set
// (matched by requester and set name). For each one, it passes the item's
// *put.Request to s.db.SetEntryStatus jobRetries times.
//
// NOTE(review): the repeated calls presumably each count as one failed attempt,
// driving the entry to its retry limit so it ends up marked as failed. Confirm
// against the db's SetEntryStatus.
//
// The first SetEntryStatus error for an item is logged, and that item's
// remaining calls are skipped.
func (s *Server) markFailedEntries(given *set.Set) {
	filter := &BuriedFilter{
		User: given.Requester,
		Set:  given.Name,
	}

	s.forEachBuriedItem(filter, func(item *queue.Item) {
		request := item.Data().(*put.Request) //nolint:errcheck,forcetypeassert

		for remaining := int(jobRetries); remaining > 0; remaining-- {
			if _, err := s.db.SetEntryStatus(request); err != nil {
				s.Logger.Printf("failed to mark entry as failed for buried item for set %s for %s: %s\n",
					given.Name, given.Requester, err)

				return
			}
		}
	})
}