-
Notifications
You must be signed in to change notification settings - Fork 12
/
storage.go
103 lines (87 loc) · 3.15 KB
/
storage.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
package slotattestation
import (
"github.com/iotaledger/hive.go/ads"
"github.com/iotaledger/hive.go/ierrors"
"github.com/iotaledger/hive.go/kvstore"
iotago "github.com/iotaledger/iota.go/v4"
)
func (m *Manager) RestoreFromDisk() error {
m.commitmentMutex.Lock()
defer m.commitmentMutex.Unlock()
cutoffIndex, isValid := m.computeAttestationCommitmentOffset(m.lastCommittedSlot)
for i := cutoffIndex; isValid && i <= m.lastCommittedSlot; i++ {
storage, err := m.trackerStorage(i)
if err != nil {
return ierrors.Wrapf(err, "failed to get storage for slot %d", i)
}
//nolint:revive
err = storage.Iterate(kvstore.EmptyPrefix, func(key iotago.AccountID, value *iotago.Attestation) bool {
m.applyToPendingAttestations(value, cutoffIndex)
return true
})
if err != nil {
return ierrors.Wrapf(err, "failed to iterate over attestations of slot %d", i)
}
if err = storage.Clear(); err != nil {
return ierrors.Wrapf(err, "failed to clear tracker attestations of slot %d", i)
}
}
m.InitializedEvent().Trigger()
return nil
}
func (m *Manager) writeToDisk() error {
m.commitmentMutex.RLock()
defer m.commitmentMutex.RUnlock()
cutoffIndex, isValid := m.computeAttestationCommitmentOffset(m.lastCommittedSlot)
if !isValid {
return nil
}
for i := cutoffIndex; i <= m.lastCommittedSlot; i++ {
storage, err := m.trackerStorage(i)
if err != nil {
return ierrors.Wrapf(err, "failed to get storage for slot %d", i)
}
attestations := m.determineAttestationsFromWindow(i)
for _, a := range attestations {
if err := storage.Set(a.Header.IssuerID, a); err != nil {
return ierrors.Wrapf(err, "failed to set attestation %v", a)
}
}
}
return nil
}
func (m *Manager) trackerStorage(index iotago.SlotIndex) (*kvstore.TypedStore[iotago.AccountID, *iotago.Attestation], error) {
trackerStorage, err := m.bucketedStorage(index)
if err != nil {
return nil, ierrors.Errorf("failed to access storage for tracker of slot %d", index)
}
trackerStorage, err = trackerStorage.WithExtendedRealm(kvstore.Realm{prefixAttestationsTracker})
if err != nil {
return nil, ierrors.Wrapf(err, "failed to get extended realm for tracker of slot %d", index)
}
return kvstore.NewTypedStore[iotago.AccountID, *iotago.Attestation](trackerStorage,
iotago.AccountID.Bytes,
iotago.AccountIDFromBytes,
(*iotago.Attestation).Bytes,
iotago.AttestationFromBytes(m.apiProvider),
), nil
}
func (m *Manager) attestationsForSlot(index iotago.SlotIndex) (ads.Map[iotago.Identifier, iotago.AccountID, *iotago.Attestation], error) {
attestationsStorage, err := m.bucketedStorage(index)
if err != nil {
return nil, ierrors.Errorf("failed to access storage for attestors of slot %d", index)
}
attestationsStorage, err = attestationsStorage.WithExtendedRealm(kvstore.Realm{prefixAttestationsADSMap})
if err != nil {
return nil, ierrors.Wrapf(err, "failed to get extended realm for attestations of slot %d", index)
}
return ads.NewMap[iotago.Identifier](
attestationsStorage,
iotago.Identifier.Bytes,
iotago.IdentifierFromBytes,
iotago.AccountID.Bytes,
iotago.AccountIDFromBytes,
(*iotago.Attestation).Bytes,
iotago.AttestationFromBytes(m.apiProvider),
), nil
}