/
registers.go
173 lines (149 loc) · 5.17 KB
/
registers.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
package pebble
import (
"encoding/binary"
"fmt"
"github.com/cockroachdb/pebble"
"github.com/pkg/errors"
"go.uber.org/atomic"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/storage"
)
// Registers library that implements pebble storage for registers
// given a pebble instance with root block and root height populated
type Registers struct {
db *pebble.DB
firstHeight uint64
latestHeight *atomic.Uint64
}
var _ storage.RegisterIndex = (*Registers)(nil)
// NewRegisters takes a populated pebble instance with LatestHeight and FirstHeight set.
// return storage.ErrNotBootstrapped if they those two keys are unavailable as it implies a uninitialized state
// return other error if database is in a corrupted state
func NewRegisters(db *pebble.DB) (*Registers, error) {
// check height keys and populate cache. These two variables will have been set
firstHeight, latestHeight, err := ReadHeightsFromBootstrappedDB(db)
if err != nil {
// first height is found, but latest height is not found, this means that the DB is in a corrupted state
return nil, fmt.Errorf("unable to initialize register storage, latest height unavailable in db: %w", err)
}
// All registers between firstHeight and lastHeight have been indexed
return &Registers{
db: db,
firstHeight: firstHeight,
latestHeight: atomic.NewUint64(latestHeight),
}, nil
}
// Get returns the most recent updated payload for the given RegisterID.
// "most recent" means the updates happens most recent up the given height.
//
// For example, if there are 2 values stored for register A at height 6 and 11, then
// GetPayload(13, A) would return the value at height 11.
//
// - storage.ErrNotFound if no register values are found
// - storage.ErrHeightNotIndexed if the requested height is out of the range of stored heights
func (s *Registers) Get(
reg flow.RegisterID,
height uint64,
) (flow.RegisterValue, error) {
latestHeight := s.latestHeight.Load()
if height > latestHeight || height < s.firstHeight {
return nil, errors.Wrap(
storage.ErrHeightNotIndexed,
fmt.Sprintf("height %d not indexed, indexed range is [%d-%d]", height, s.firstHeight, latestHeight),
)
}
key := newLookupKey(height, reg)
return s.lookupRegister(key.Bytes())
}
func (s *Registers) lookupRegister(key []byte) (flow.RegisterValue, error) {
iter, err := s.db.NewIter(&pebble.IterOptions{
UseL6Filters: true,
})
if err != nil {
return nil, err
}
defer iter.Close()
ok := iter.SeekPrefixGE(key)
if !ok {
// no such register found
return nil, storage.ErrNotFound
}
binaryValue, err := iter.ValueAndErr()
if err != nil {
return nil, fmt.Errorf("failed to get value: %w", err)
}
// preventing caller from modifying the iterator's value slices
valueCopy := make([]byte, len(binaryValue))
copy(valueCopy, binaryValue)
return valueCopy, nil
}
// Store sets the given entries in a batch.
// This function is expected to be called at one batch per height, sequentially. Under normal conditions,
// it should be called wth the value of height set to LatestHeight + 1
// CAUTION: This function is not safe for concurrent use.
func (s *Registers) Store(
entries flow.RegisterEntries,
height uint64,
) error {
latestHeight := s.latestHeight.Load()
// This check is for a special case for the execution node.
// Upon restart, it may be in a state where registers are indexed in pebble for the latest height
// but the remaining execution data in badger is not, so we skip the indexing step without throwing an error
if height == latestHeight {
// already updated
return nil
}
nextHeight := latestHeight + 1
if height != nextHeight {
return fmt.Errorf("must store registers with the next height %v, but got %v", nextHeight, height)
}
batch := s.db.NewBatch()
defer batch.Close()
for _, entry := range entries {
encoded := newLookupKey(height, entry.Key).Bytes()
err := batch.Set(encoded, entry.Value, nil)
if err != nil {
return fmt.Errorf("failed to set key: %w", err)
}
}
// increment height and commit
err := batch.Set(latestHeightKey, encodedUint64(height), nil)
if err != nil {
return fmt.Errorf("failed to update latest height %d", height)
}
err = batch.Commit(pebble.Sync)
if err != nil {
return fmt.Errorf("failed to commit batch: %w", err)
}
s.latestHeight.Store(height)
return nil
}
// LatestHeight Gets the latest height of complete registers available
func (s *Registers) LatestHeight() uint64 {
return s.latestHeight.Load()
}
// FirstHeight first indexed height found in the store, typically root block for the spork
func (s *Registers) FirstHeight() uint64 {
return s.firstHeight
}
func firstStoredHeight(db *pebble.DB) (uint64, error) {
return heightLookup(db, firstHeightKey)
}
func latestStoredHeight(db *pebble.DB) (uint64, error) {
return heightLookup(db, latestHeightKey)
}
func heightLookup(db *pebble.DB, key []byte) (uint64, error) {
res, closer, err := db.Get(key)
if err != nil {
return 0, convertNotFoundError(err)
}
defer closer.Close()
return binary.BigEndian.Uint64(res), nil
}
// convert pebble NotFound error to storage NotFound error
func convertNotFoundError(err error) error {
if errors.Is(err, pebble.ErrNotFound) {
return storage.ErrNotFound
}
return err
}