-
Notifications
You must be signed in to change notification settings - Fork 174
/
transaction_results_indexer.go
147 lines (125 loc) · 6.1 KB
/
transaction_results_indexer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
package backend
import (
"fmt"
"go.uber.org/atomic"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/module/state_synchronization"
"github.com/onflow/flow-go/module/state_synchronization/indexer"
"github.com/onflow/flow-go/storage"
)
// TransactionResultsIndex implements a wrapper around `storage.LightTransactionResult` ensuring that needed data has been synced and is available to the client.
// Note: `TransactionResultsIndex` is created with empty report due to the next reasoning:
// When the index is initially bootstrapped, the indexer needs to load an execution state checkpoint from
// disk and index all the data. This process can take more than 1 hour on some systems. Consequently, the Initialize
// pattern is implemented to enable the Access API to start up and serve queries before the index is fully ready. During
// the initialization phase, all calls to retrieve data from this struct should return indexer.ErrIndexNotInitialized.
// The caller is responsible for handling this error appropriately for the method.
type TransactionResultsIndex struct {
results storage.LightTransactionResults
reporter *atomic.Pointer[state_synchronization.IndexReporter]
}
var _ state_synchronization.IndexReporter = (*TransactionResultsIndex)(nil)
func NewTransactionResultsIndex(results storage.LightTransactionResults) *TransactionResultsIndex {
return &TransactionResultsIndex{
results: results,
reporter: atomic.NewPointer[state_synchronization.IndexReporter](nil),
}
}
// Initialize replaces a previously non-initialized reporter. Can be called once.
// No errors are expected during normal operations.
func (t *TransactionResultsIndex) Initialize(indexReporter state_synchronization.IndexReporter) error {
if t.reporter.CompareAndSwap(nil, &indexReporter) {
return nil
}
return fmt.Errorf("index reporter already initialized")
}
// ByBlockID checks data availability and returns all transaction results for a block
// Expected errors:
// - indexer.ErrIndexNotInitialized if the `TransactionResultsIndex` has not been initialized
// - storage.ErrHeightNotIndexed when data is unavailable
// - codes.NotFound if result cannot be provided by storage due to the absence of data.
func (t *TransactionResultsIndex) ByBlockID(blockID flow.Identifier, height uint64) ([]flow.LightTransactionResult, error) {
if err := t.checkDataAvailability(height); err != nil {
return nil, err
}
return t.results.ByBlockID(blockID)
}
// ByBlockIDTransactionID checks data availability and return the transaction result for the given block ID and transaction ID
// Expected errors:
// - indexer.ErrIndexNotInitialized if the `TransactionResultsIndex` has not been initialized
// - storage.ErrHeightNotIndexed when data is unavailable
// - codes.NotFound if result cannot be provided by storage due to the absence of data.
func (t *TransactionResultsIndex) ByBlockIDTransactionID(blockID flow.Identifier, height uint64, txID flow.Identifier) (*flow.LightTransactionResult, error) {
if err := t.checkDataAvailability(height); err != nil {
return nil, err
}
return t.results.ByBlockIDTransactionID(blockID, txID)
}
// ByBlockIDTransactionIndex checks data availability and return the transaction result for the given blockID and transaction index
// Expected errors:
// - indexer.ErrIndexNotInitialized if the `TransactionResultsIndex` has not been initialized
// - storage.ErrHeightNotIndexed when data is unavailable
// - codes.NotFound when result cannot be provided by storage due to the absence of data.
func (t *TransactionResultsIndex) ByBlockIDTransactionIndex(blockID flow.Identifier, height uint64, index uint32) (*flow.LightTransactionResult, error) {
if err := t.checkDataAvailability(height); err != nil {
return nil, err
}
return t.results.ByBlockIDTransactionIndex(blockID, index)
}
// LowestIndexedHeight returns the lowest height indexed by the execution state indexer.
// Expected errors:
// - indexer.ErrIndexNotInitialized if the `TransactionResultsIndex` has not been initialized
func (t *TransactionResultsIndex) LowestIndexedHeight() (uint64, error) {
reporter, err := t.getReporter()
if err != nil {
return 0, err
}
return reporter.LowestIndexedHeight()
}
// HighestIndexedHeight returns the highest height indexed by the execution state indexer.
// Expected errors:
// - indexer.ErrIndexNotInitialized if the `TransactionResultsIndex` has not been initialized
func (t *TransactionResultsIndex) HighestIndexedHeight() (uint64, error) {
reporter, err := t.getReporter()
if err != nil {
return 0, err
}
return reporter.HighestIndexedHeight()
}
// checkDataAvailability checks the availability of data at the given height by comparing it with the highest and lowest
// indexed heights. If the height is beyond the indexed range, an error is returned.
// Expected errors:
// - indexer.ErrIndexNotInitialized if the `TransactionResultsIndex` has not been initialized
// - storage.ErrHeightNotIndexed if the block at the provided height is not indexed yet
// - fmt.Errorf if the highest or lowest indexed heights cannot be retrieved from the reporter
func (t *TransactionResultsIndex) checkDataAvailability(height uint64) error {
reporter, err := t.getReporter()
if err != nil {
return err
}
highestHeight, err := reporter.HighestIndexedHeight()
if err != nil {
return fmt.Errorf("could not get highest indexed height: %w", err)
}
if height > highestHeight {
return fmt.Errorf("%w: block not indexed yet", storage.ErrHeightNotIndexed)
}
lowestHeight, err := reporter.LowestIndexedHeight()
if err != nil {
return fmt.Errorf("could not get lowest indexed height: %w", err)
}
if height < lowestHeight {
return fmt.Errorf("%w: block is before lowest indexed height", storage.ErrHeightNotIndexed)
}
return nil
}
// getReporter retrieves the current index reporter instance from the atomic pointer.
// Expected errors:
// - indexer.ErrIndexNotInitialized if the reporter is not initialized
func (t *TransactionResultsIndex) getReporter() (state_synchronization.IndexReporter, error) {
reporter := t.reporter.Load()
if reporter == nil {
return nil, indexer.ErrIndexNotInitialized
}
return *reporter, nil
}