-
Notifications
You must be signed in to change notification settings - Fork 1.5k
/
master_index.go
445 lines (372 loc) · 10.9 KB
/
master_index.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
package index
import (
"bytes"
"context"
"fmt"
"runtime"
"sync"
"github.com/restic/restic/internal/debug"
"github.com/restic/restic/internal/restic"
"github.com/restic/restic/internal/ui/progress"
"golang.org/x/sync/errgroup"
)
// MasterIndex is a collection of indexes and IDs of chunks that are in the process of being saved.
type MasterIndex struct {
idx []*Index
pendingBlobs restic.BlobSet
idxMutex sync.RWMutex
compress bool
}
// NewMasterIndex creates a new master index.
func NewMasterIndex() *MasterIndex {
// Always add an empty final index, such that MergeFinalIndexes can merge into this.
// Note that removing this index could lead to a race condition in the rare
// sitation that only two indexes exist which are saved and merged concurrently.
idx := []*Index{NewIndex()}
idx[0].Finalize()
return &MasterIndex{idx: idx, pendingBlobs: restic.NewBlobSet()}
}
func (mi *MasterIndex) MarkCompressed() {
mi.compress = true
}
// Lookup queries all known Indexes for the ID and returns all matches.
func (mi *MasterIndex) Lookup(bh restic.BlobHandle) (pbs []restic.PackedBlob) {
mi.idxMutex.RLock()
defer mi.idxMutex.RUnlock()
for _, idx := range mi.idx {
pbs = idx.Lookup(bh, pbs)
}
return pbs
}
// LookupSize queries all known Indexes for the ID and returns the first match.
func (mi *MasterIndex) LookupSize(bh restic.BlobHandle) (uint, bool) {
mi.idxMutex.RLock()
defer mi.idxMutex.RUnlock()
for _, idx := range mi.idx {
if size, found := idx.LookupSize(bh); found {
return size, found
}
}
return 0, false
}
// AddPending adds a given blob to list of pending Blobs
// Before doing so it checks if this blob is already known.
// Returns true if adding was successful and false if the blob
// was already known
func (mi *MasterIndex) AddPending(bh restic.BlobHandle) bool {
mi.idxMutex.Lock()
defer mi.idxMutex.Unlock()
// Check if blob is pending or in index
if mi.pendingBlobs.Has(bh) {
return false
}
for _, idx := range mi.idx {
if idx.Has(bh) {
return false
}
}
// really not known -> insert
mi.pendingBlobs.Insert(bh)
return true
}
// Has queries all known Indexes for the ID and returns the first match.
// Also returns true if the ID is pending.
func (mi *MasterIndex) Has(bh restic.BlobHandle) bool {
mi.idxMutex.RLock()
defer mi.idxMutex.RUnlock()
// also return true if blob is pending
if mi.pendingBlobs.Has(bh) {
return true
}
for _, idx := range mi.idx {
if idx.Has(bh) {
return true
}
}
return false
}
// IDs returns the IDs of all indexes contained in the index.
func (mi *MasterIndex) IDs() restic.IDSet {
mi.idxMutex.RLock()
defer mi.idxMutex.RUnlock()
ids := restic.NewIDSet()
for _, idx := range mi.idx {
if !idx.Final() {
continue
}
indexIDs, err := idx.IDs()
if err != nil {
debug.Log("not using index, ID() returned error %v", err)
continue
}
for _, id := range indexIDs {
ids.Insert(id)
}
}
return ids
}
// Packs returns all packs that are covered by the index.
// If packBlacklist is given, those packs are only contained in the
// resulting IDSet if they are contained in a non-final (newly written) index.
func (mi *MasterIndex) Packs(packBlacklist restic.IDSet) restic.IDSet {
mi.idxMutex.RLock()
defer mi.idxMutex.RUnlock()
packs := restic.NewIDSet()
for _, idx := range mi.idx {
idxPacks := idx.Packs()
if idx.final && len(packBlacklist) > 0 {
idxPacks = idxPacks.Sub(packBlacklist)
}
packs.Merge(idxPacks)
}
return packs
}
// Insert adds a new index to the MasterIndex.
func (mi *MasterIndex) Insert(idx *Index) {
mi.idxMutex.Lock()
defer mi.idxMutex.Unlock()
mi.idx = append(mi.idx, idx)
}
// StorePack remembers the id and pack in the index.
func (mi *MasterIndex) StorePack(id restic.ID, blobs []restic.Blob) {
mi.idxMutex.Lock()
defer mi.idxMutex.Unlock()
// delete blobs from pending
for _, blob := range blobs {
mi.pendingBlobs.Delete(restic.BlobHandle{Type: blob.Type, ID: blob.ID})
}
for _, idx := range mi.idx {
if !idx.Final() {
idx.StorePack(id, blobs)
return
}
}
newIdx := NewIndex()
newIdx.StorePack(id, blobs)
mi.idx = append(mi.idx, newIdx)
}
// finalizeNotFinalIndexes finalizes all indexes that
// have not yet been saved and returns that list
func (mi *MasterIndex) finalizeNotFinalIndexes() []*Index {
mi.idxMutex.Lock()
defer mi.idxMutex.Unlock()
var list []*Index
for _, idx := range mi.idx {
if !idx.Final() {
idx.Finalize()
list = append(list, idx)
}
}
debug.Log("return %d indexes", len(list))
return list
}
// finalizeFullIndexes finalizes all indexes that are full and returns that list.
func (mi *MasterIndex) finalizeFullIndexes() []*Index {
mi.idxMutex.Lock()
defer mi.idxMutex.Unlock()
var list []*Index
debug.Log("checking %d indexes", len(mi.idx))
for _, idx := range mi.idx {
if idx.Final() {
continue
}
if IndexFull(idx, mi.compress) {
debug.Log("index %p is full", idx)
idx.Finalize()
list = append(list, idx)
} else {
debug.Log("index %p not full", idx)
}
}
debug.Log("return %d indexes", len(list))
return list
}
// Each runs fn on all blobs known to the index. When the context is cancelled,
// the index iteration return immediately. This blocks any modification of the index.
func (mi *MasterIndex) Each(ctx context.Context, fn func(restic.PackedBlob)) {
mi.idxMutex.RLock()
defer mi.idxMutex.RUnlock()
for _, idx := range mi.idx {
idx.Each(ctx, fn)
}
}
// MergeFinalIndexes merges all final indexes together.
// After calling, there will be only one big final index in MasterIndex
// containing all final index contents.
// Indexes that are not final are left untouched.
// This merging can only be called after all index files are loaded - as
// removing of superseded index contents is only possible for unmerged indexes.
func (mi *MasterIndex) MergeFinalIndexes() error {
mi.idxMutex.Lock()
defer mi.idxMutex.Unlock()
// The first index is always final and the one to merge into
newIdx := mi.idx[:1]
for i := 1; i < len(mi.idx); i++ {
idx := mi.idx[i]
// clear reference in masterindex as it may become stale
mi.idx[i] = nil
// do not merge indexes that have no id set
ids, _ := idx.IDs()
if !idx.Final() || len(ids) == 0 {
newIdx = append(newIdx, idx)
} else {
err := mi.idx[0].merge(idx)
if err != nil {
return fmt.Errorf("MergeFinalIndexes: %w", err)
}
}
}
mi.idx = newIdx
return nil
}
// Save saves all known indexes to index files, leaving out any
// packs whose ID is contained in packBlacklist from finalized indexes.
// The new index contains the IDs of all known indexes in the "supersedes"
// field. The IDs are also returned in the IDSet obsolete.
// After calling this function, you should remove the obsolete index files.
func (mi *MasterIndex) Save(ctx context.Context, repo restic.SaverUnpacked, packBlacklist restic.IDSet, extraObsolete restic.IDs, p *progress.Counter) (obsolete restic.IDSet, err error) {
p.SetMax(uint64(len(mi.Packs(packBlacklist))))
mi.idxMutex.Lock()
defer mi.idxMutex.Unlock()
debug.Log("start rebuilding index of %d indexes, pack blacklist: %v", len(mi.idx), packBlacklist)
newIndex := NewIndex()
obsolete = restic.NewIDSet()
// track spawned goroutines using wg, create a new context which is
// cancelled as soon as an error occurs.
wg, ctx := errgroup.WithContext(ctx)
ch := make(chan *Index)
wg.Go(func() error {
defer close(ch)
for i, idx := range mi.idx {
if idx.Final() {
ids, err := idx.IDs()
if err != nil {
debug.Log("index %d does not have an ID: %v", err)
return err
}
debug.Log("adding index ids %v to supersedes field", ids)
err = newIndex.AddToSupersedes(ids...)
if err != nil {
return err
}
obsolete.Merge(restic.NewIDSet(ids...))
} else {
debug.Log("index %d isn't final, don't add to supersedes field", i)
}
debug.Log("adding index %d", i)
for pbs := range idx.EachByPack(ctx, packBlacklist) {
newIndex.StorePack(pbs.PackID, pbs.Blobs)
p.Add(1)
if IndexFull(newIndex, mi.compress) {
select {
case ch <- newIndex:
case <-ctx.Done():
return ctx.Err()
}
newIndex = NewIndex()
}
}
}
err = newIndex.AddToSupersedes(extraObsolete...)
if err != nil {
return err
}
obsolete.Merge(restic.NewIDSet(extraObsolete...))
select {
case ch <- newIndex:
case <-ctx.Done():
}
return nil
})
// a worker receives an index from ch, and saves the index
worker := func() error {
for idx := range ch {
idx.Finalize()
if _, err := SaveIndex(ctx, repo, idx); err != nil {
return err
}
}
return nil
}
// encoding an index can take quite some time such that this can be both CPU- or IO-bound
workerCount := int(repo.Connections()) + runtime.GOMAXPROCS(0)
// run workers on ch
for i := 0; i < workerCount; i++ {
wg.Go(worker)
}
err = wg.Wait()
return obsolete, err
}
// SaveIndex saves an index in the repository.
func SaveIndex(ctx context.Context, repo restic.SaverUnpacked, index *Index) (restic.ID, error) {
buf := bytes.NewBuffer(nil)
err := index.Encode(buf)
if err != nil {
return restic.ID{}, err
}
id, err := repo.SaveUnpacked(ctx, restic.IndexFile, buf.Bytes())
ierr := index.SetID(id)
if ierr != nil {
// logic bug
panic(ierr)
}
return id, err
}
// saveIndex saves all indexes in the backend.
func (mi *MasterIndex) saveIndex(ctx context.Context, r restic.SaverUnpacked, indexes ...*Index) error {
for i, idx := range indexes {
debug.Log("Saving index %d", i)
sid, err := SaveIndex(ctx, r, idx)
if err != nil {
return err
}
debug.Log("Saved index %d as %v", i, sid)
}
return mi.MergeFinalIndexes()
}
// SaveIndex saves all new indexes in the backend.
func (mi *MasterIndex) SaveIndex(ctx context.Context, r restic.SaverUnpacked) error {
return mi.saveIndex(ctx, r, mi.finalizeNotFinalIndexes()...)
}
// SaveFullIndex saves all full indexes in the backend.
func (mi *MasterIndex) SaveFullIndex(ctx context.Context, r restic.SaverUnpacked) error {
return mi.saveIndex(ctx, r, mi.finalizeFullIndexes()...)
}
// ListPacks returns the blobs of the specified pack files grouped by pack file.
func (mi *MasterIndex) ListPacks(ctx context.Context, packs restic.IDSet) <-chan restic.PackBlobs {
out := make(chan restic.PackBlobs)
go func() {
defer close(out)
// only resort a part of the index to keep the memory overhead bounded
for i := byte(0); i < 16; i++ {
if ctx.Err() != nil {
return
}
packBlob := make(map[restic.ID][]restic.Blob)
for pack := range packs {
if pack[0]&0xf == i {
packBlob[pack] = nil
}
}
if len(packBlob) == 0 {
continue
}
mi.Each(ctx, func(pb restic.PackedBlob) {
if packs.Has(pb.PackID) && pb.PackID[0]&0xf == i {
packBlob[pb.PackID] = append(packBlob[pb.PackID], pb.Blob)
}
})
// pass on packs
for packID, pbs := range packBlob {
// allow GC
packBlob[packID] = nil
select {
case out <- restic.PackBlobs{PackID: packID, Blobs: pbs}:
case <-ctx.Done():
return
}
}
}
}()
return out
}