/
disaster_recovery.go
91 lines (78 loc) · 2.38 KB
/
disaster_recovery.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
package localstore
import (
"encoding/binary"
"fmt"
"github.com/redesblock/mop/core/incentives/voucher"
"github.com/redesblock/mop/core/storer/sharky"
"github.com/redesblock/mop/core/storer/shed"
)
const headerSize = 16 + voucher.StampSize
type locOrErr struct {
err error
loc sharky.Location
}
// recovery tries to recover a dirty database.
func recovery(db *DB) (chan locOrErr, error) {
// - go through all retrieval data index entries
// - find all used locations in sharky
// - return them so that sharky can be initialized with them
// first define the index instance
retrievalDataIndex, err := db.shed.NewIndex("Address->StoreTimestamp|BinID|BatchID|BatchIndex|Sig|Location", shed.IndexFuncs{
EncodeKey: func(fields shed.Item) (key []byte, err error) {
return fields.Address, nil
},
DecodeKey: func(key []byte) (e shed.Item, err error) {
e.Address = key
return e, nil
},
EncodeValue: func(fields shed.Item) (value []byte, err error) {
b := make([]byte, headerSize)
binary.BigEndian.PutUint64(b[:8], fields.BinID)
binary.BigEndian.PutUint64(b[8:16], uint64(fields.StoreTimestamp))
stamp, err := voucher.NewStamp(fields.BatchID, fields.Index, fields.Timestamp, fields.Sig).MarshalBinary()
if err != nil {
return nil, err
}
copy(b[16:], stamp)
value = append(b, fields.Location...)
return value, nil
},
DecodeValue: func(keyItem shed.Item, value []byte) (e shed.Item, err error) {
e.StoreTimestamp = int64(binary.BigEndian.Uint64(value[8:16]))
e.BinID = binary.BigEndian.Uint64(value[:8])
stamp := new(voucher.Stamp)
if err = stamp.UnmarshalBinary(value[16:headerSize]); err != nil {
return e, err
}
e.BatchID = stamp.BatchID()
e.Index = stamp.Index()
e.Timestamp = stamp.Timestamp()
e.Sig = stamp.Sig()
e.Location = value[headerSize:]
return e, nil
},
})
if err != nil {
return nil, err
}
usedLocations := make(chan locOrErr)
go func() {
defer close(usedLocations)
err := retrievalDataIndex.Iterate(func(item shed.Item) (stop bool, err error) {
loc, err := sharky.LocationFromBinary(item.Location)
if err != nil {
return true, fmt.Errorf("location from binary: %w", err)
}
usedLocations <- locOrErr{
loc: loc,
}
return false, nil
}, nil)
if err != nil {
usedLocations <- locOrErr{
err: fmt.Errorf("iterate index: %w", err),
}
}
}()
return usedLocations, nil
}