-
Notifications
You must be signed in to change notification settings - Fork 178
/
results.go
160 lines (132 loc) · 5.15 KB
/
results.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
package badger
import (
"errors"
"fmt"
"github.com/dgraph-io/badger/v2"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/module"
"github.com/onflow/flow-go/module/metrics"
"github.com/onflow/flow-go/storage"
"github.com/onflow/flow-go/storage/badger/operation"
"github.com/onflow/flow-go/storage/badger/transaction"
)
// ExecutionResults implements persistent storage for execution results.
// Reads by result ID go through the cache; writes go either through the cache
// (Store) or directly through a batch writer (BatchStore / BatchIndex).
type ExecutionResults struct {
// db is the badger database handle used to open transactions.
db *badger.DB
// cache is the Cache constructed in NewExecutionResults; it is keyed by
// result ID (flow.Identifier) and holds *flow.ExecutionResult values.
cache *Cache
}
// Compile-time check that *ExecutionResults satisfies storage.ExecutionResults.
var _ storage.ExecutionResults = (*ExecutionResults)(nil)
// NewExecutionResults builds an ExecutionResults store on top of db, with a
// cache that reports to collector under metrics.ResourceResult.
//
// The cache is configured with:
//   - withLimit(flow.DefaultTransactionExpiry+100);
//   - a store callback that inserts the result with
//     operation.InsertExecutionResult wrapped in operation.SkipDuplicates;
//   - a retrieve callback that loads the result with
//     operation.RetrieveExecutionResult.
func NewExecutionResults(collector module.CacheMetrics, db *badger.DB) *ExecutionResults {
	// persist is handed to the cache via withStore. The key is not needed
	// because the insert operation takes the result itself. The value must be
	// a *flow.ExecutionResult; any other type makes the assertion panic.
	persist := func(_ interface{}, val interface{}) func(*transaction.Tx) error {
		insert := operation.InsertExecutionResult(val.(*flow.ExecutionResult))
		return transaction.WithTx(operation.SkipDuplicates(insert))
	}

	// load is handed to the cache via withRetrieve. The key must be a
	// flow.Identifier; any other type makes the assertion panic.
	load := func(key interface{}) func(tx *badger.Txn) (interface{}, error) {
		id := key.(flow.Identifier)
		var loaded flow.ExecutionResult
		return func(tx *badger.Txn) (interface{}, error) {
			err := operation.RetrieveExecutionResult(id, &loaded)(tx)
			// The pointer is returned alongside err unchanged; callers are
			// expected to check err before using the value.
			return &loaded, err
		}
	}

	cache := newCache(collector, metrics.ResourceResult,
		withLimit(flow.DefaultTransactionExpiry+100),
		withStore(persist),
		withRetrieve(load))

	return &ExecutionResults{db: db, cache: cache}
}
// store returns a function that, given a transaction, puts the result into
// the cache under the result's own ID by delegating to the cache's PutTx.
func (r *ExecutionResults) store(result *flow.ExecutionResult) func(*transaction.Tx) error {
	resultID := result.ID()
	return r.cache.PutTx(resultID, result)
}
// byID returns a function that, given a badger transaction, fetches the
// execution result with the given ID through the cache. Any error from the
// cache is returned unchanged together with a nil result.
func (r *ExecutionResults) byID(resultID flow.Identifier) func(*badger.Txn) (*flow.ExecutionResult, error) {
	return func(tx *badger.Txn) (*flow.ExecutionResult, error) {
		cached, err := r.cache.Get(resultID)(tx)
		if err == nil {
			// The cache stores values as interface{}; convert back to the concrete type.
			return cached.(*flow.ExecutionResult), nil
		}
		return nil, err
	}
}
// byBlockID returns a function that, given a badger transaction, resolves the
// result ID indexed for blockID and then fetches that result via byID.
// A failed index lookup is wrapped with context; errors from byID are
// returned as-is.
func (r *ExecutionResults) byBlockID(blockID flow.Identifier) func(*badger.Txn) (*flow.ExecutionResult, error) {
	return func(tx *badger.Txn) (*flow.ExecutionResult, error) {
		var indexedID flow.Identifier
		if err := operation.LookupExecutionResult(blockID, &indexedID)(tx); err != nil {
			return nil, fmt.Errorf("could not lookup execution result ID: %w", err)
		}
		return r.byID(indexedID)(tx)
	}
}
// index returns a function that, given a transaction, indexes resultID as the
// execution result for blockID.
//
// Outcomes:
//   - the index is written successfully: nil;
//   - indexing fails with anything other than storage.ErrAlreadyExists: that
//     error is returned unchanged;
//   - an entry already exists and force is true: the entry is overwritten via
//     operation.ReindexExecutionResult;
//   - an entry already exists and force is false: the stored ID is looked up;
//     nil if it equals resultID, otherwise an error wrapping
//     storage.ErrDataMismatch.
func (r *ExecutionResults) index(blockID, resultID flow.Identifier, force bool) func(*transaction.Tx) error {
	return func(tx *transaction.Tx) error {
		err := transaction.WithTx(operation.IndexExecutionResult(blockID, resultID))(tx)
		switch {
		case err == nil:
			return nil
		case !errors.Is(err, storage.ErrAlreadyExists):
			return err
		case force:
			return transaction.WithTx(operation.ReindexExecutionResult(blockID, resultID))(tx)
		}

		// A result is already indexed for this block and we may not overwrite
		// it: accept the call only if it is the very same result.
		var existingID flow.Identifier
		lookupErr := transaction.WithTx(operation.LookupExecutionResult(blockID, &existingID))(tx)
		if lookupErr != nil {
			return fmt.Errorf("there is a result stored already, but cannot retrieve it: %w", lookupErr)
		}
		if existingID == resultID {
			return nil
		}
		return fmt.Errorf("storing result that is different from the already stored one for block: %v, storing result: %v, stored result: %v. %w",
			blockID, resultID, existingID, storage.ErrDataMismatch)
	}
}
// Store persists the execution result by running the store operation in an
// update transaction via operation.RetryOnConflictTx.
func (r *ExecutionResults) Store(result *flow.ExecutionResult) error {
	storeOp := r.store(result)
	return operation.RetryOnConflictTx(r.db, transaction.Update, storeOp)
}
// BatchStore adds an insert of the execution result to the given batch.
// The write goes straight to the batch writer; this method does not touch
// the cache.
func (r *ExecutionResults) BatchStore(result *flow.ExecutionResult, batch storage.BatchStorage) error {
	w := batch.GetWriter()
	insert := operation.BatchInsertExecutionResult(result)
	return insert(w)
}
// BatchIndex adds a blockID -> resultID index entry to the given batch.
// Unlike Index, this performs no check against an already indexed result
// within this method.
func (r *ExecutionResults) BatchIndex(blockID flow.Identifier, resultID flow.Identifier, batch storage.BatchStorage) error {
	w := batch.GetWriter()
	indexOp := operation.BatchIndexExecutionResult(blockID, resultID)
	return indexOp(w)
}
// ByID returns the execution result with the given ID, read within a
// non-update badger transaction that is discarded on return.
func (r *ExecutionResults) ByID(resultID flow.Identifier) (*flow.ExecutionResult, error) {
	txn := r.db.NewTransaction(false)
	defer txn.Discard()

	fetch := r.byID(resultID)
	return fetch(txn)
}
// ByIDTx returns a function that fetches the execution result with the given
// ID using the badger transaction carried by the supplied transaction.Tx.
func (r *ExecutionResults) ByIDTx(resultID flow.Identifier) func(*transaction.Tx) (*flow.ExecutionResult, error) {
	return func(tx *transaction.Tx) (*flow.ExecutionResult, error) {
		return r.byID(resultID)(tx.DBTxn)
	}
}
// Index records resultID as the execution result for blockID without
// overwriting: if a different result is already indexed for the block, the
// returned error wraps storage.ErrDataMismatch. All errors are wrapped with
// context.
func (r *ExecutionResults) Index(blockID flow.Identifier, resultID flow.Identifier) error {
	if err := operation.RetryOnConflictTx(r.db, transaction.Update, r.index(blockID, resultID, false)); err != nil {
		return fmt.Errorf("could not index execution result: %w", err)
	}
	return nil
}
// ForceIndex records resultID as the execution result for blockID, replacing
// any result that is already indexed for that block. Errors are wrapped with
// context.
func (r *ExecutionResults) ForceIndex(blockID flow.Identifier, resultID flow.Identifier) error {
	err := operation.RetryOnConflictTx(r.db, transaction.Update, r.index(blockID, resultID, true))
	if err == nil {
		return nil
	}
	return fmt.Errorf("could not index execution result: %w", err)
}
// ByBlockID returns the execution result indexed for the given block, read
// within a non-update badger transaction that is discarded on return.
func (r *ExecutionResults) ByBlockID(blockID flow.Identifier) (*flow.ExecutionResult, error) {
	txn := r.db.NewTransaction(false)
	defer txn.Discard()

	fetch := r.byBlockID(blockID)
	return fetch(txn)
}
// RemoveIndexByBlockID removes the block -> result index entry for blockID in
// a single badger update transaction. The removal is wrapped in
// operation.SkipNonExist (presumably so that a missing entry is not reported
// as an error — confirm against the operation package).
func (r *ExecutionResults) RemoveIndexByBlockID(blockID flow.Identifier) error {
	removeIndex := operation.RemoveExecutionResultIndex(blockID)
	return r.db.Update(operation.SkipNonExist(removeIndex))
}