-
Notifications
You must be signed in to change notification settings - Fork 178
/
approvals.go
138 lines (117 loc) · 4.69 KB
/
approvals.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
package badger
import (
"errors"
"fmt"
"github.com/dgraph-io/badger/v2"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/module"
"github.com/onflow/flow-go/module/metrics"
"github.com/onflow/flow-go/storage"
"github.com/onflow/flow-go/storage/badger/operation"
"github.com/onflow/flow-go/storage/badger/transaction"
)
// ResultApprovals implements persistent storage for result approvals.
type ResultApprovals struct {
db *badger.DB
cache *Cache
}
func NewResultApprovals(collector module.CacheMetrics, db *badger.DB) *ResultApprovals {
store := func(key interface{}, val interface{}) func(*transaction.Tx) error {
approval := val.(*flow.ResultApproval)
return transaction.WithTx(operation.SkipDuplicates(operation.InsertResultApproval(approval)))
}
retrieve := func(key interface{}) func(tx *badger.Txn) (interface{}, error) {
approvalID := key.(flow.Identifier)
var approval flow.ResultApproval
return func(tx *badger.Txn) (interface{}, error) {
err := operation.RetrieveResultApproval(approvalID, &approval)(tx)
return &approval, err
}
}
res := &ResultApprovals{
db: db,
cache: newCache(collector, metrics.ResourceResultApprovals,
withLimit(flow.DefaultTransactionExpiry+100),
withStore(store),
withRetrieve(retrieve)),
}
return res
}
func (r *ResultApprovals) store(approval *flow.ResultApproval) func(*transaction.Tx) error {
return r.cache.PutTx(approval.ID(), approval)
}
func (r *ResultApprovals) byID(approvalID flow.Identifier) func(*badger.Txn) (*flow.ResultApproval, error) {
return func(tx *badger.Txn) (*flow.ResultApproval, error) {
val, err := r.cache.Get(approvalID)(tx)
if err != nil {
return nil, err
}
return val.(*flow.ResultApproval), nil
}
}
func (r *ResultApprovals) byChunk(resultID flow.Identifier, chunkIndex uint64) func(*badger.Txn) (*flow.ResultApproval, error) {
return func(tx *badger.Txn) (*flow.ResultApproval, error) {
var approvalID flow.Identifier
err := operation.LookupResultApproval(resultID, chunkIndex, &approvalID)(tx)
if err != nil {
return nil, fmt.Errorf("could not lookup result approval ID: %w", err)
}
return r.byID(approvalID)(tx)
}
}
func (r *ResultApprovals) index(resultID flow.Identifier, chunkIndex uint64, approvalID flow.Identifier) func(*badger.Txn) error {
return func(tx *badger.Txn) error {
err := operation.IndexResultApproval(resultID, chunkIndex, approvalID)(tx)
if err == nil {
return nil
}
if !errors.Is(err, storage.ErrAlreadyExists) {
return err
}
// When trying to index an approval for a result, and there is already
// an approval for the result, double check if the indexed approval is
// the same.
// We don't allow indexing multiple approvals per chunk because the
// store is only used within Verification nodes, and it is impossible
// for a Verification node to compute different approvals for the same
// chunk.
var storedApprovalID flow.Identifier
err = operation.LookupResultApproval(resultID, chunkIndex, &storedApprovalID)(tx)
if err != nil {
return fmt.Errorf("there is an approval stored already, but cannot retrieve it: %w", err)
}
if storedApprovalID != approvalID {
return fmt.Errorf("attempting to store conflicting approval (result: %v, chunk index: %d): storing: %v, stored: %v. %w",
resultID, chunkIndex, approvalID, storedApprovalID, storage.ErrDataMismatch)
}
return nil
}
}
// Store stores a ResultApproval
func (r *ResultApprovals) Store(approval *flow.ResultApproval) error {
return operation.RetryOnConflictTx(r.db, transaction.Update, r.store(approval))
}
// Index indexes a ResultApproval by chunk (ResultID + chunk index).
// operation is idempotent (repeated calls with the same value are equivalent to
// just calling the method once; still the method succeeds on each call).
func (r *ResultApprovals) Index(resultID flow.Identifier, chunkIndex uint64, approvalID flow.Identifier) error {
err := operation.RetryOnConflict(r.db.Update, r.index(resultID, chunkIndex, approvalID))
if err != nil {
return fmt.Errorf("could not index result approval: %w", err)
}
return nil
}
// ByID retrieves a ResultApproval by its ID
func (r *ResultApprovals) ByID(approvalID flow.Identifier) (*flow.ResultApproval, error) {
tx := r.db.NewTransaction(false)
defer tx.Discard()
return r.byID(approvalID)(tx)
}
// ByChunk retrieves a ResultApproval by result ID and chunk index. The
// ResultApprovals store is only used within a verification node, where it is
// assumed that there is never more than one approval per chunk.
func (r *ResultApprovals) ByChunk(resultID flow.Identifier, chunkIndex uint64) (*flow.ResultApproval, error) {
tx := r.db.NewTransaction(false)
defer tx.Discard()
return r.byChunk(resultID, chunkIndex)(tx)
}