-
Notifications
You must be signed in to change notification settings - Fork 178
/
transactions_local_data_provider.go
413 lines (358 loc) · 16.3 KB
/
transactions_local_data_provider.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
package backend
import (
"context"
"errors"
"fmt"
"google.golang.org/grpc/status"
"github.com/onflow/flow/protobuf/go/flow/entities"
"google.golang.org/grpc/codes"
"github.com/onflow/flow-go/access"
"github.com/onflow/flow-go/engine/access/index"
"github.com/onflow/flow-go/engine/common/rpc"
"github.com/onflow/flow-go/engine/common/rpc/convert"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/module/counters"
"github.com/onflow/flow-go/module/irrecoverable"
"github.com/onflow/flow-go/state"
"github.com/onflow/flow-go/state/protocol"
"github.com/onflow/flow-go/storage"
)
// ErrTransactionNotInBlock is the sentinel error indicating that a transaction is not found in a block.
// It is returned by LookupCollectionIDInBlock when none of the collections guaranteed by the block
// lists the requested transaction ID. Compare against it with errors.Is.
var ErrTransactionNotInBlock = errors.New("transaction not in block")
// TransactionErrorMessage declares the methods for looking up transaction error messages by different
// input parameters (single transaction by ID, single transaction by index, or all transactions of a block).
type TransactionErrorMessage interface {
// LookupErrorMessageByTransactionID returns the error message of the transaction identified by the
// given block ID and transaction ID.
// Expected errors during normal operation:
// - InsufficientExecutionReceipts - found insufficient receipts for given block ID.
// - status.Error - remote GRPC call to EN has failed.
LookupErrorMessageByTransactionID(ctx context.Context, blockID flow.Identifier, transactionID flow.Identifier) (string, error)
// LookupErrorMessageByIndex returns the error message of the transaction at the given index within
// the block identified by blockID; height is the height of that block.
// Expected errors during normal operation:
// - status.Error[codes.NotFound] - transaction result for given block ID and tx index is not available.
// - InsufficientExecutionReceipts - found insufficient receipts for given block ID.
// - status.Error - remote GRPC call to EN has failed.
LookupErrorMessageByIndex(ctx context.Context, blockID flow.Identifier, height uint64, index uint32) (string, error)
// LookupErrorMessagesByBlockID returns the error messages of the transactions in the block identified
// by blockID, keyed by transaction ID; height is the height of that block.
// Expected errors during normal operation:
// - status.Error[codes.NotFound] - transaction results for given block ID are not available.
// - InsufficientExecutionReceipts - found insufficient receipts for given block ID.
// - status.Error - remote GRPC call to EN has failed.
LookupErrorMessagesByBlockID(ctx context.Context, blockID flow.Identifier, height uint64) (map[flow.Identifier]string, error)
}
// TransactionsLocalDataProvider provides functionality for retrieving transaction results and error messages from local storages.
// It also derives transaction statuses from the protocol state and resolves which collection of a
// block contains a given transaction.
type TransactionsLocalDataProvider struct {
// state is the protocol state; it is queried for the reference block header, the latest finalized
// header and the latest sealed header when deriving transaction statuses.
state protocol.State
// collections is used to load light collections (lists of transaction IDs) by collection ID.
collections storage.Collections
// blocks is the block storage.
// NOTE(review): not referenced by the methods visible in this part of the file - confirm usage elsewhere.
blocks storage.Blocks
// eventsIndex provides the events of a transaction, looked up by block ID, height and transaction ID or index.
eventsIndex *index.EventsIndex
// txResultsIndex provides the indexed transaction results, looked up by block, transaction ID or index.
txResultsIndex *index.TransactionResultsIndex
// txErrorMessages resolves the error messages of failed transactions.
txErrorMessages TransactionErrorMessage
// systemTxID is the transaction ID that buildTxIDToCollectionIDMapping maps to flow.ZeroID
// (presumably the system transaction, which is not part of any guaranteed collection - confirm).
systemTxID flow.Identifier
// lastFullBlockHeight is read by DeriveUnknownTransactionStatus as the height up to which all
// collections have been received.
lastFullBlockHeight *counters.PersistentStrictMonotonicCounter
}
// GetTransactionResultFromStorage assembles the result of a single transaction from the local indexes.
// The transaction is looked up by its ID within the provided block. The returned result carries the
// derived status, the status code, the events (in the requested encoding) and, for failed
// transactions, the error message. The CollectionID field of the result is not populated here.
//
// Expected errors during normal operation:
//   - codes.NotFound when result cannot be provided by storage due to the absence of data.
//   - codes.Internal if event payload conversion failed.
//   - indexer.ErrIndexNotInitialized when txResultsIndex not initialized
//   - storage.ErrHeightNotIndexed when data is unavailable
//
// All other errors are considered as state corruption (fatal) or internal errors in the transaction error message
// getter or when deriving transaction status.
func (t *TransactionsLocalDataProvider) GetTransactionResultFromStorage(
	ctx context.Context,
	block *flow.Block,
	transactionID flow.Identifier,
	requiredEventEncodingVersion entities.EventEncodingVersion,
) (*access.TransactionResult, error) {
	blockID := block.ID()
	height := block.Header.Height

	storedResult, err := t.txResultsIndex.ByBlockIDTransactionID(blockID, height, transactionID)
	if err != nil {
		return nil, rpc.ConvertIndexError(err, height, "failed to get transaction result")
	}

	// statusCode of 1 indicates an error and 0 indicates no error, the same as on EN
	var (
		statusCode   uint
		errorMessage string
	)
	if storedResult.Failed {
		errorMessage, err = t.txErrorMessages.LookupErrorMessageByTransactionID(ctx, blockID, transactionID)
		if err != nil {
			return nil, err
		}
		// a failed transaction must always come with a non-empty error message
		if errorMessage == "" {
			return nil, status.Errorf(codes.Internal, "transaction failed but error message is empty for tx ID: %s block ID: %s", storedResult.TransactionID, blockID)
		}
		statusCode = 1
	}

	// a result exists in the index, so the transaction is known to be executed
	derivedStatus, err := t.DeriveTransactionStatus(height, true)
	if err != nil {
		// only an unknown snapshot reference is tolerated; anything else is escalated as irrecoverable
		if !errors.Is(err, state.ErrUnknownSnapshotReference) {
			irrecoverable.Throw(ctx, err)
		}
		return nil, rpc.ConvertStorageError(err)
	}

	txEvents, err := t.eventsIndex.ByBlockIDTransactionID(blockID, height, transactionID)
	if err != nil {
		return nil, rpc.ConvertIndexError(err, height, "failed to get events")
	}

	// storage holds events in CCF format; re-encode them only when JSON-CDC was requested
	if requiredEventEncodingVersion == entities.EventEncodingVersion_JSON_CDC_V0 {
		txEvents, err = convert.CcfEventsToJsonEvents(txEvents)
		if err != nil {
			return nil, rpc.ConvertError(err, "failed to convert event payload", codes.Internal)
		}
	}

	result := &access.TransactionResult{
		TransactionID: storedResult.TransactionID,
		BlockID:       blockID,
		BlockHeight:   height,
		Status:        derivedStatus,
		StatusCode:    statusCode,
		ErrorMessage:  errorMessage,
		Events:        txEvents,
	}
	return result, nil
}
// GetTransactionResultsByBlockIDFromStorage retrieves the results of all indexed transactions of the
// given block from local storage. Each result carries the derived status, the status code, the events
// (in the requested encoding), the error message for failed transactions, and the ID of the
// collection containing the transaction.
//
// The transaction status depends only on the block height, so it is derived once for the whole block
// rather than once per transaction. This avoids repeated sealed-header lookups and guarantees that
// all results of one response report the same status.
//
// Expected errors during normal operation:
//   - codes.NotFound if result cannot be provided by storage due to the absence of data.
//   - codes.Internal when event payload conversion failed.
//   - indexer.ErrIndexNotInitialized when txResultsIndex not initialized
//   - storage.ErrHeightNotIndexed when data is unavailable
//
// All other errors are considered as state corruption (fatal) or internal errors in the transaction error message
// getter or when deriving transaction status.
func (t *TransactionsLocalDataProvider) GetTransactionResultsByBlockIDFromStorage(
	ctx context.Context,
	block *flow.Block,
	requiredEventEncodingVersion entities.EventEncodingVersion,
) ([]*access.TransactionResult, error) {
	blockID := block.ID()
	height := block.Header.Height

	txResults, err := t.txResultsIndex.ByBlockID(blockID, height)
	if err != nil {
		return nil, rpc.ConvertIndexError(err, height, "failed to get transaction result")
	}

	txErrors, err := t.txErrorMessages.LookupErrorMessagesByBlockID(ctx, blockID, height)
	if err != nil {
		return nil, err
	}

	// cache the tx to collectionID mapping to avoid repeated lookups
	txToCollectionID, err := t.buildTxIDToCollectionIDMapping(block)
	if err != nil {
		// this indicates that one or more of the collections for the block are not indexed. Since
		// lookups are gated on the indexer signaling it has finished processing all data for the
		// block, all data must be available in storage, otherwise there is an inconsistency in the
		// state.
		irrecoverable.Throw(ctx, fmt.Errorf("inconsistent index state: %w", err))
		return nil, status.Errorf(codes.Internal, "failed to map tx to collection ID: %v", err)
	}

	results := make([]*access.TransactionResult, 0, len(txResults))
	if len(txResults) == 0 {
		// nothing to report; skip the protocol state lookup entirely
		return results, nil
	}

	// The status is identical for every transaction of the block (it is a function of the block
	// height only), so derive it once up front instead of once per transaction.
	txStatus, err := t.DeriveTransactionStatus(height, true)
	if err != nil {
		// only an unknown snapshot reference is tolerated; anything else is escalated as irrecoverable
		if !errors.Is(err, state.ErrUnknownSnapshotReference) {
			irrecoverable.Throw(ctx, err)
		}
		return nil, rpc.ConvertStorageError(err)
	}

	for _, txResult := range txResults {
		txID := txResult.TransactionID

		// statusCode of 1 indicates an error and 0 indicates no error, the same as on EN
		var txErrorMessage string
		var txStatusCode uint
		if txResult.Failed {
			txErrorMessage = txErrors[txID]
			// a failed transaction must always come with a non-empty error message
			if len(txErrorMessage) == 0 {
				return nil, status.Errorf(codes.Internal, "transaction failed but error message is empty for tx ID: %s block ID: %s", txID, blockID)
			}
			txStatusCode = 1
		}

		events, err := t.eventsIndex.ByBlockIDTransactionID(blockID, height, txID)
		if err != nil {
			return nil, rpc.ConvertIndexError(err, height, "failed to get events")
		}

		// events are encoded in CCF format in storage. convert to JSON-CDC if requested
		if requiredEventEncodingVersion == entities.EventEncodingVersion_JSON_CDC_V0 {
			events, err = convert.CcfEventsToJsonEvents(events)
			if err != nil {
				return nil, rpc.ConvertError(err, "failed to convert event payload", codes.Internal)
			}
		}

		collectionID, ok := txToCollectionID[txID]
		if !ok {
			return nil, status.Errorf(codes.Internal, "transaction %s not found in block %s", txID, blockID)
		}

		results = append(results, &access.TransactionResult{
			Status:        txStatus,
			StatusCode:    txStatusCode,
			Events:        events,
			ErrorMessage:  txErrorMessage,
			BlockID:       blockID,
			TransactionID: txID,
			CollectionID:  collectionID,
			BlockHeight:   height,
		})
	}

	return results, nil
}
// GetTransactionResultByIndexFromStorage retrieves the result of the transaction at the given index
// within the block from local storage. The result carries the derived status, the status code, the
// events (in the requested encoding), the error message for failed transactions, and the ID of the
// collection containing the transaction.
//
// For the transaction identified by systemTxID the collection ID is reported as flow.ZeroID, the same
// value buildTxIDToCollectionIDMapping assigns to it, instead of searching the block's guaranteed
// collections (where it would not be found and the lookup would fail with ErrTransactionNotInBlock).
//
// Expected errors during normal operation:
//   - codes.NotFound if result cannot be provided by storage due to the absence of data.
//   - codes.Internal when event payload conversion failed.
//   - indexer.ErrIndexNotInitialized when txResultsIndex not initialized
//   - storage.ErrHeightNotIndexed when data is unavailable
//
// All other errors are considered as state corruption (fatal) or internal errors in the transaction error message
// getter or when deriving transaction status.
func (t *TransactionsLocalDataProvider) GetTransactionResultByIndexFromStorage(
	ctx context.Context,
	block *flow.Block,
	index uint32,
	requiredEventEncodingVersion entities.EventEncodingVersion,
) (*access.TransactionResult, error) {
	blockID := block.ID()
	height := block.Header.Height

	txResult, err := t.txResultsIndex.ByBlockIDTransactionIndex(blockID, height, index)
	if err != nil {
		return nil, rpc.ConvertIndexError(err, height, "failed to get transaction result")
	}

	// statusCode of 1 indicates an error and 0 indicates no error, the same as on EN
	var txErrorMessage string
	var txStatusCode uint
	if txResult.Failed {
		txErrorMessage, err = t.txErrorMessages.LookupErrorMessageByIndex(ctx, blockID, height, index)
		if err != nil {
			return nil, err
		}
		// a failed transaction must always come with a non-empty error message
		if len(txErrorMessage) == 0 {
			return nil, status.Errorf(codes.Internal, "transaction failed but error message is empty for tx ID: %s block ID: %s", txResult.TransactionID, blockID)
		}
		txStatusCode = 1
	}

	// a result exists in the index, so the transaction is known to be executed
	txStatus, err := t.DeriveTransactionStatus(height, true)
	if err != nil {
		// only an unknown snapshot reference is tolerated; anything else is escalated as irrecoverable
		if !errors.Is(err, state.ErrUnknownSnapshotReference) {
			irrecoverable.Throw(ctx, err)
		}
		return nil, rpc.ConvertStorageError(err)
	}

	events, err := t.eventsIndex.ByBlockIDTransactionIndex(blockID, height, index)
	if err != nil {
		return nil, rpc.ConvertIndexError(err, height, "failed to get events")
	}

	// events are encoded in CCF format in storage. convert to JSON-CDC if requested
	if requiredEventEncodingVersion == entities.EventEncodingVersion_JSON_CDC_V0 {
		events, err = convert.CcfEventsToJsonEvents(events)
		if err != nil {
			return nil, rpc.ConvertError(err, "failed to convert event payload", codes.Internal)
		}
	}

	// Keep the by-index path consistent with the by-block path: systemTxID is mapped to flow.ZeroID
	// there, so it must not be searched for in the block's guaranteed collections here.
	collectionID := flow.ZeroID
	if txResult.TransactionID != t.systemTxID {
		collectionID, err = t.LookupCollectionIDInBlock(block, txResult.TransactionID)
		if err != nil {
			return nil, err
		}
	}

	return &access.TransactionResult{
		TransactionID: txResult.TransactionID,
		Status:        txStatus,
		StatusCode:    txStatusCode,
		Events:        events,
		ErrorMessage:  txErrorMessage,
		BlockID:       blockID,
		BlockHeight:   height,
		CollectionID:  collectionID,
	}, nil
}
// DeriveUnknownTransactionStatus determines the status of a transaction that is not yet known to be
// included in a block, based on the ID of the transaction's reference block.
//
// The transaction is reported as:
//   - pending, while the finalized height has not passed the expiry window of the reference block;
//   - expired, once additionally all collections up to the expiry height have been received
//     (tracked by lastFullBlockHeight), since then the transaction can no longer be included;
//   - pending otherwise.
//
// An error from looking up the reference block is returned unchanged; a failure to look up the
// finalized header is returned as an irrecoverable exception. In both cases the status is
// flow.TransactionStatusUnknown.
func (t *TransactionsLocalDataProvider) DeriveUnknownTransactionStatus(refBlockID flow.Identifier) (flow.TransactionStatus, error) {
	refHeader, err := t.state.AtBlockID(refBlockID).Head()
	if err != nil {
		return flow.TransactionStatusUnknown, err
	}

	// latest finalized header according to the protocol state
	finalHeader, err := t.state.Final().Head()
	if err != nil {
		return flow.TransactionStatusUnknown, irrecoverable.NewExceptionf("failed to lookup final header: %w", err)
	}

	refHeight := refHeader.Height
	switch {
	case !isExpired(refHeight, finalHeader.Height):
		// the expiry block for this transaction has not been seen yet, so it cannot be expired
		return flow.TransactionStatusPending, nil

	case isExpired(refHeight, t.lastFullBlockHeight.Value()):
		// The expiry block has been seen, and collections for all blocks up to it have been received
		// (the last full height is the height where all collections for all lower blocks are
		// available). As none of them contained the transaction, it can never be included.
		return flow.TransactionStatusExpired, nil

	default:
		// tx found in transaction storage and collection storage but not in block storage
		// However, this will not happen as of now since the ingestion engine doesn't subscribe
		// for collections
		return flow.TransactionStatusPending, nil
	}
}
// DeriveTransactionStatus determines the status of a transaction that is included in a block at the
// given height, taking into account whether the block has been executed:
//   - not executed: flow.TransactionStatusFinalized
//   - executed, block height above the latest sealed height: flow.TransactionStatusExecuted
//   - executed, block height at or below the latest sealed height: flow.TransactionStatusSealed
//
// No errors expected during normal operations. A failure to look up the sealed header is returned as
// an irrecoverable exception together with flow.TransactionStatusUnknown.
func (t *TransactionsLocalDataProvider) DeriveTransactionStatus(blockHeight uint64, executed bool) (flow.TransactionStatus, error) {
	// Without execution the most that can be claimed is finalization.
	if !executed {
		return flow.TransactionStatusFinalized, nil
	}

	// The transaction has been executed; compare against the latest sealed header from the state to
	// decide between executed and sealed.
	sealedHeader, err := t.state.Sealed().Head()
	if err != nil {
		return flow.TransactionStatusUnknown, irrecoverable.NewExceptionf("failed to lookup sealed header: %w", err)
	}

	if blockHeight <= sealedHeader.Height {
		// executed and sealed
		return flow.TransactionStatusSealed, nil
	}

	// executed, but the block is not sealed yet
	return flow.TransactionStatusExecuted, nil
}
// isExpired reports whether a transaction with the given reference block height is expired when
// compared against compareToHeight, i.e. whether compareToHeight lies more than
// flow.DefaultTransactionExpiry blocks above refHeight.
func isExpired(refHeight, compareToHeight uint64) bool {
	// the first operand guards the unsigned subtraction against underflow
	return compareToHeight > refHeight &&
		compareToHeight-refHeight > flow.DefaultTransactionExpiry
}
// LookupCollectionIDInBlock finds the collection of the given block that contains the transaction
// txID and returns that collection's ID. The block's collection guarantees are scanned in order and
// each collection is loaded in its light form from collection storage.
//
// Returns ErrTransactionNotInBlock if no collection of the block lists txID, and a wrapped storage
// error if one of the block's collections cannot be loaded. In both cases the returned ID is
// flow.ZeroID.
func (t *TransactionsLocalDataProvider) LookupCollectionIDInBlock(
	block *flow.Block,
	txID flow.Identifier,
) (flow.Identifier, error) {
	for _, guarantee := range block.Payload.Guarantees {
		collectionID := guarantee.ID()

		light, err := t.collections.LightByID(collectionID)
		if err != nil {
			return flow.ZeroID, fmt.Errorf("failed to get collection %s in indexed block: %w", collectionID, err)
		}

		for i := range light.Transactions {
			if light.Transactions[i] == txID {
				return collectionID, nil
			}
		}
	}

	return flow.ZeroID, ErrTransactionNotInBlock
}
// buildTxIDToCollectionIDMapping builds a lookup table from transaction ID to the ID of the
// collection containing it, covering every collection guaranteed by the provided block. The
// transaction identified by systemTxID is additionally mapped to flow.ZeroID.
//
// No errors expected during normal operations. A wrapped storage error is returned if one of the
// block's collections cannot be loaded.
func (t *TransactionsLocalDataProvider) buildTxIDToCollectionIDMapping(block *flow.Block) (map[flow.Identifier]flow.Identifier, error) {
	mapping := make(map[flow.Identifier]flow.Identifier)

	for _, guarantee := range block.Payload.Guarantees {
		collectionID := guarantee.ID()

		light, err := t.collections.LightByID(collectionID)
		if err != nil {
			// if the tx result is in storage, the collection must be too.
			return nil, fmt.Errorf("failed to get collection %s in indexed block: %w", collectionID, err)
		}

		for _, txID := range light.Transactions {
			mapping[txID] = collectionID
		}
	}

	// added last, after all guaranteed collections have been processed
	mapping[t.systemTxID] = flow.ZeroID

	return mapping, nil
}