-
Notifications
You must be signed in to change notification settings - Fork 1.5k
/
index.go
586 lines (484 loc) · 14.5 KB
/
index.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
package index
import (
"context"
"encoding/json"
"io"
"sync"
"time"
"github.com/restic/restic/internal/crypto"
"github.com/restic/restic/internal/errors"
"github.com/restic/restic/internal/restic"
"github.com/restic/restic/internal/debug"
)
// In large repositories, millions of blobs are stored in the repository
// and restic needs to store an index entry for each blob in memory for
// most operations.
// Hence the index data structure defined here is one of the main contributions
// to the total memory requirements of restic.
//
// We store the index entries in indexMaps. In these maps, entries take 56
// bytes each, plus 8/4 = 2 bytes of unused pointers on average, not counting
// malloc and header struct overhead and ignoring duplicates (those are only
// present in edge cases and are also removed by prune runs).
//
// In the index entries, we need to reference the packID. As one pack may
// contain many blobs the packIDs are saved in a separate array and only the index
// within this array is saved in the indexEntry
//
// We assume on average a minimum of 8 blobs per pack; BP=8.
// (Note that for large files there should be 3 blobs per pack as the average chunk
// size is 1.5 MB and the minimum pack size is 4 MB)
//
// We have the following sizes:
// indexEntry: 56 bytes (on amd64)
// each packID: 32 bytes
//
// To save N index entries, we therefore need:
// N * (56 + 2) bytes + N * 32 bytes / BP = N * 62 bytes,
// i.e., fewer than 64 bytes per blob in an index.
// Index holds lookup tables for id -> pack.
type Index struct {
m sync.Mutex
byType [restic.NumBlobTypes]indexMap
packs restic.IDs
final bool // set to true for all indexes read from the backend ("finalized")
ids restic.IDs // set to the IDs of the contained finalized indexes
supersedes restic.IDs
created time.Time
}
// NewIndex returns a new index.
func NewIndex() *Index {
return &Index{
created: time.Now(),
}
}
// addToPacks saves the given pack ID and return the index.
// This procedere allows to use pack IDs which can be easily garbage collected after.
func (idx *Index) addToPacks(id restic.ID) int {
idx.packs = append(idx.packs, id)
return len(idx.packs) - 1
}
const maxuint32 = 1<<32 - 1
func (idx *Index) store(packIndex int, blob restic.Blob) {
// assert that offset and length fit into uint32!
if blob.Offset > maxuint32 || blob.Length > maxuint32 || blob.UncompressedLength > maxuint32 {
panic("offset or length does not fit in uint32. You have packs > 4GB!")
}
m := &idx.byType[blob.Type]
m.add(blob.ID, packIndex, uint32(blob.Offset), uint32(blob.Length), uint32(blob.UncompressedLength))
}
// Final returns true iff the index is already written to the repository, it is
// finalized.
func (idx *Index) Final() bool {
idx.m.Lock()
defer idx.m.Unlock()
return idx.final
}
const (
indexMaxBlobs = 50000
indexMaxBlobsCompressed = 3 * indexMaxBlobs
indexMaxAge = 10 * time.Minute
)
// IndexFull returns true iff the index is "full enough" to be saved as a preliminary index.
var IndexFull = func(idx *Index, compress bool) bool {
idx.m.Lock()
defer idx.m.Unlock()
debug.Log("checking whether index %p is full", idx)
var blobs uint
for typ := range idx.byType {
blobs += idx.byType[typ].len()
}
age := time.Since(idx.created)
var maxBlobs uint
if compress {
maxBlobs = indexMaxBlobsCompressed
} else {
maxBlobs = indexMaxBlobs
}
switch {
case age >= indexMaxAge:
debug.Log("index %p is old enough", idx, age)
return true
case blobs >= maxBlobs:
debug.Log("index %p has %d blobs", idx, blobs)
return true
}
debug.Log("index %p only has %d blobs and is too young (%v)", idx, blobs, age)
return false
}
// StorePack remembers the ids of all blobs of a given pack
// in the index
func (idx *Index) StorePack(id restic.ID, blobs []restic.Blob) {
idx.m.Lock()
defer idx.m.Unlock()
if idx.final {
panic("store new item in finalized index")
}
debug.Log("%v", blobs)
packIndex := idx.addToPacks(id)
for _, blob := range blobs {
idx.store(packIndex, blob)
}
}
func (idx *Index) toPackedBlob(e *indexEntry, t restic.BlobType) restic.PackedBlob {
return restic.PackedBlob{
Blob: restic.Blob{
BlobHandle: restic.BlobHandle{
ID: e.id,
Type: t},
Length: uint(e.length),
Offset: uint(e.offset),
UncompressedLength: uint(e.uncompressedLength),
},
PackID: idx.packs[e.packIndex],
}
}
// Lookup queries the index for the blob ID and returns all entries including
// duplicates. Adds found entries to blobs and returns the result.
func (idx *Index) Lookup(bh restic.BlobHandle, pbs []restic.PackedBlob) []restic.PackedBlob {
idx.m.Lock()
defer idx.m.Unlock()
idx.byType[bh.Type].foreachWithID(bh.ID, func(e *indexEntry) {
pbs = append(pbs, idx.toPackedBlob(e, bh.Type))
})
return pbs
}
// Has returns true iff the id is listed in the index.
func (idx *Index) Has(bh restic.BlobHandle) bool {
idx.m.Lock()
defer idx.m.Unlock()
return idx.byType[bh.Type].get(bh.ID) != nil
}
// LookupSize returns the length of the plaintext content of the blob with the
// given id.
func (idx *Index) LookupSize(bh restic.BlobHandle) (plaintextLength uint, found bool) {
idx.m.Lock()
defer idx.m.Unlock()
e := idx.byType[bh.Type].get(bh.ID)
if e == nil {
return 0, false
}
if e.uncompressedLength != 0 {
return uint(e.uncompressedLength), true
}
return uint(crypto.PlaintextLength(int(e.length))), true
}
// Supersedes returns the list of indexes this index supersedes, if any.
func (idx *Index) Supersedes() restic.IDs {
return idx.supersedes
}
// AddToSupersedes adds the ids to the list of indexes superseded by this
// index. If the index has already been finalized, an error is returned.
func (idx *Index) AddToSupersedes(ids ...restic.ID) error {
idx.m.Lock()
defer idx.m.Unlock()
if idx.final {
return errors.New("index already finalized")
}
idx.supersedes = append(idx.supersedes, ids...)
return nil
}
// Each passes all blobs known to the index to the callback fn. This blocks any
// modification of the index.
func (idx *Index) Each(ctx context.Context, fn func(restic.PackedBlob)) {
idx.m.Lock()
defer idx.m.Unlock()
for typ := range idx.byType {
m := &idx.byType[typ]
m.foreach(func(e *indexEntry) bool {
if ctx.Err() != nil {
return false
}
fn(idx.toPackedBlob(e, restic.BlobType(typ)))
return true
})
}
}
type EachByPackResult struct {
PackID restic.ID
Blobs []restic.Blob
}
// EachByPack returns a channel that yields all blobs known to the index
// grouped by packID but ignoring blobs with a packID in packPlacklist for
// finalized indexes.
// This filtering is used when rebuilding the index where we need to ignore packs
// from the finalized index which have been re-read into a non-finalized index.
// When the context is cancelled, the background goroutine
// terminates. This blocks any modification of the index.
func (idx *Index) EachByPack(ctx context.Context, packBlacklist restic.IDSet) <-chan EachByPackResult {
idx.m.Lock()
ch := make(chan EachByPackResult)
go func() {
defer idx.m.Unlock()
defer close(ch)
byPack := make(map[restic.ID][restic.NumBlobTypes][]*indexEntry)
for typ := range idx.byType {
m := &idx.byType[typ]
m.foreach(func(e *indexEntry) bool {
packID := idx.packs[e.packIndex]
if !idx.final || !packBlacklist.Has(packID) {
v := byPack[packID]
v[typ] = append(v[typ], e)
byPack[packID] = v
}
return true
})
}
for packID, packByType := range byPack {
var result EachByPackResult
result.PackID = packID
for typ, pack := range packByType {
for _, e := range pack {
result.Blobs = append(result.Blobs, idx.toPackedBlob(e, restic.BlobType(typ)).Blob)
}
}
// allow GC once entry is no longer necessary
delete(byPack, packID)
select {
case <-ctx.Done():
return
case ch <- result:
}
}
}()
return ch
}
// Packs returns all packs in this index
func (idx *Index) Packs() restic.IDSet {
idx.m.Lock()
defer idx.m.Unlock()
packs := restic.NewIDSet()
for _, packID := range idx.packs {
packs.Insert(packID)
}
return packs
}
type packJSON struct {
ID restic.ID `json:"id"`
Blobs []blobJSON `json:"blobs"`
}
type blobJSON struct {
ID restic.ID `json:"id"`
Type restic.BlobType `json:"type"`
Offset uint `json:"offset"`
Length uint `json:"length"`
UncompressedLength uint `json:"uncompressed_length,omitempty"`
}
// generatePackList returns a list of packs.
func (idx *Index) generatePackList() ([]*packJSON, error) {
list := []*packJSON{}
packs := make(map[restic.ID]*packJSON)
for typ := range idx.byType {
m := &idx.byType[typ]
m.foreach(func(e *indexEntry) bool {
packID := idx.packs[e.packIndex]
if packID.IsNull() {
panic("null pack id")
}
debug.Log("handle blob %v", e.id)
// see if pack is already in map
p, ok := packs[packID]
if !ok {
// else create new pack
p = &packJSON{ID: packID}
// and append it to the list and map
list = append(list, p)
packs[p.ID] = p
}
// add blob
p.Blobs = append(p.Blobs, blobJSON{
ID: e.id,
Type: restic.BlobType(typ),
Offset: uint(e.offset),
Length: uint(e.length),
UncompressedLength: uint(e.uncompressedLength),
})
return true
})
}
debug.Log("done")
return list, nil
}
type jsonIndex struct {
Supersedes restic.IDs `json:"supersedes,omitempty"`
Packs []*packJSON `json:"packs"`
}
// Encode writes the JSON serialization of the index to the writer w.
func (idx *Index) Encode(w io.Writer) error {
debug.Log("encoding index")
idx.m.Lock()
defer idx.m.Unlock()
list, err := idx.generatePackList()
if err != nil {
return err
}
enc := json.NewEncoder(w)
idxJSON := jsonIndex{
Supersedes: idx.supersedes,
Packs: list,
}
return enc.Encode(idxJSON)
}
// Finalize sets the index to final.
func (idx *Index) Finalize() {
debug.Log("finalizing index")
idx.m.Lock()
defer idx.m.Unlock()
idx.final = true
}
// IDs returns the IDs of the index, if available. If the index is not yet
// finalized, an error is returned.
func (idx *Index) IDs() (restic.IDs, error) {
idx.m.Lock()
defer idx.m.Unlock()
if !idx.final {
return nil, errors.New("index not finalized")
}
return idx.ids, nil
}
// SetID sets the ID the index has been written to. This requires that
// Finalize() has been called before, otherwise an error is returned.
func (idx *Index) SetID(id restic.ID) error {
idx.m.Lock()
defer idx.m.Unlock()
if !idx.final {
return errors.New("index is not final")
}
if len(idx.ids) > 0 {
return errors.New("ID already set")
}
debug.Log("ID set to %v", id)
idx.ids = append(idx.ids, id)
return nil
}
// Dump writes the pretty-printed JSON representation of the index to w.
func (idx *Index) Dump(w io.Writer) error {
debug.Log("dumping index")
idx.m.Lock()
defer idx.m.Unlock()
list, err := idx.generatePackList()
if err != nil {
return err
}
outer := jsonIndex{
Supersedes: idx.Supersedes(),
Packs: list,
}
buf, err := json.MarshalIndent(outer, "", " ")
if err != nil {
return err
}
_, err = w.Write(append(buf, '\n'))
if err != nil {
return errors.Wrap(err, "Write")
}
debug.Log("done")
return nil
}
// merge() merges indexes, i.e. idx.merge(idx2) merges the contents of idx2 into idx.
// During merging exact duplicates are removed; idx2 is not changed by this method.
func (idx *Index) merge(idx2 *Index) error {
idx.m.Lock()
defer idx.m.Unlock()
idx2.m.Lock()
defer idx2.m.Unlock()
if !idx2.final {
return errors.New("index to merge is not final")
}
packlen := len(idx.packs)
// first append packs as they might be accessed when looking for duplicates below
idx.packs = append(idx.packs, idx2.packs...)
// copy all index entries of idx2 to idx
for typ := range idx2.byType {
m2 := &idx2.byType[typ]
m := &idx.byType[typ]
// helper func to test if identical entry is contained in idx
hasIdenticalEntry := func(e2 *indexEntry) (found bool) {
m.foreachWithID(e2.id, func(e *indexEntry) {
b := idx.toPackedBlob(e, restic.BlobType(typ))
b2 := idx2.toPackedBlob(e2, restic.BlobType(typ))
if b == b2 {
found = true
}
})
return found
}
m2.foreach(func(e2 *indexEntry) bool {
if !hasIdenticalEntry(e2) {
// packIndex needs to be changed as idx2.pack was appended to idx.pack, see above
m.add(e2.id, e2.packIndex+packlen, e2.offset, e2.length, e2.uncompressedLength)
}
return true
})
}
idx.ids = append(idx.ids, idx2.ids...)
idx.supersedes = append(idx.supersedes, idx2.supersedes...)
return nil
}
// isErrOldIndex returns true if the error may be caused by an old index
// format.
func isErrOldIndex(err error) bool {
e, ok := err.(*json.UnmarshalTypeError)
return ok && e.Value == "array"
}
// DecodeIndex unserializes an index from buf.
func DecodeIndex(buf []byte, id restic.ID) (idx *Index, oldFormat bool, err error) {
debug.Log("Start decoding index")
idxJSON := &jsonIndex{}
err = json.Unmarshal(buf, idxJSON)
if err != nil {
debug.Log("Error %v", err)
if isErrOldIndex(err) {
debug.Log("index is probably old format, trying that")
idx, err = decodeOldIndex(buf)
return idx, err == nil, err
}
return nil, false, errors.Wrap(err, "DecodeIndex")
}
idx = NewIndex()
for _, pack := range idxJSON.Packs {
packID := idx.addToPacks(pack.ID)
for _, blob := range pack.Blobs {
idx.store(packID, restic.Blob{
BlobHandle: restic.BlobHandle{
Type: blob.Type,
ID: blob.ID},
Offset: blob.Offset,
Length: blob.Length,
UncompressedLength: blob.UncompressedLength,
})
}
}
idx.supersedes = idxJSON.Supersedes
idx.ids = append(idx.ids, id)
idx.final = true
debug.Log("done")
return idx, false, nil
}
// DecodeOldIndex loads and unserializes an index in the old format from rd.
func decodeOldIndex(buf []byte) (idx *Index, err error) {
debug.Log("Start decoding old index")
list := []*packJSON{}
err = json.Unmarshal(buf, &list)
if err != nil {
debug.Log("Error %#v", err)
return nil, errors.Wrap(err, "Decode")
}
idx = NewIndex()
for _, pack := range list {
packID := idx.addToPacks(pack.ID)
for _, blob := range pack.Blobs {
idx.store(packID, restic.Blob{
BlobHandle: restic.BlobHandle{
Type: blob.Type,
ID: blob.ID},
Offset: blob.Offset,
Length: blob.Length,
// no compressed length in the old index format
})
}
}
idx.final = true
debug.Log("done")
return idx, nil
}