-
Notifications
You must be signed in to change notification settings - Fork 178
/
backend_events.go
221 lines (186 loc) · 6.96 KB
/
backend_events.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
package backend
import (
"context"
"encoding/hex"
"errors"
"fmt"
"github.com/hashicorp/go-multierror"
execproto "github.com/onflow/flow/protobuf/go/flow/execution"
"github.com/rs/zerolog"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"github.com/onflow/flow-go/engine/common/rpc/convert"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/state/protocol"
"github.com/onflow/flow-go/storage"
)
// backendEvents groups the dependencies needed to serve event queries: local block storage
// and protocol state to resolve the requested blocks, and execution node clients to fetch
// the events themselves.
type backendEvents struct {
// staticExecutionRPC is the fallback execution API client, used when no execution nodes
// could be determined for the requested block; it may be nil.
staticExecutionRPC execproto.ExecutionAPIClient
// blocks is used to look up blocks by height and by ID.
blocks storage.Blocks
// executionReceipts is handed to executionNodesForBlockID when selecting execution nodes.
executionReceipts storage.ExecutionReceipts
// state provides the latest sealed header and is also handed to executionNodesForBlockID.
state protocol.State
// connFactory creates an execution API client for a given execution node address.
connFactory ConnectionFactory
// log receives a trace entry naming the execution node that served a request.
log zerolog.Logger
}
// GetEventsForHeightRange retrieves events of the given type for all sealed blocks between
// startHeight and endHeight (inclusive).
//
// endHeight is clamped to the height of the latest sealed block. An InvalidArgument status is
// returned when endHeight < startHeight. A start height beyond the latest sealed block, a
// failure to read the sealed head, and a failed block lookup are all reported as Internal.
func (b *backendEvents) GetEventsForHeightRange(
	ctx context.Context,
	eventType string,
	startHeight, endHeight uint64,
) ([]flow.BlockEvents, error) {
	if endHeight < startHeight {
		return nil, status.Error(codes.InvalidArgument, "invalid start or end height")
	}

	// get the latest sealed block header
	head, err := b.state.Sealed().Head()
	if err != nil {
		return nil, status.Errorf(codes.Internal, "failed to get events: %v", err)
	}

	// start height should not be beyond the last sealed height
	// NOTE(review): this is a client-supplied value, so codes.OutOfRange may describe it better
	// than codes.Internal; the code is left unchanged here to keep the caller-visible contract.
	if head.Height < startHeight {
		return nil, status.Errorf(codes.Internal,
			"start height %d is greater than the last sealed block height %d", startHeight, head.Height)
	}

	// limit max height to last sealed block in the chain
	if head.Height < endHeight {
		endHeight = head.Height
	}

	// find the block headers for all the blocks between min and max height (inclusive)
	blockHeaders := make([]*flow.Header, 0)
	for i := startHeight; i <= endHeight; i++ {
		block, err := b.blocks.ByHeight(i)
		if err != nil {
			return nil, status.Errorf(codes.Internal, "failed to get events: %v", err)
		}
		blockHeaders = append(blockHeaders, block.Header)
	}

	// forward the request to the execution node
	return b.getBlockEventsFromExecutionNode(ctx, blockHeaders, eventType)
}
// GetEventsForBlockIDs retrieves events of the given type for all the specified block IDs.
//
// Every ID is first resolved to its block through local storage; if any lookup fails the whole
// request fails with an Internal status. The resolved headers are then forwarded to an
// execution node, which supplies the events.
func (b *backendEvents) GetEventsForBlockIDs(
	ctx context.Context,
	eventType string,
	blockIDs []flow.Identifier,
) ([]flow.BlockEvents, error) {
	// find the block headers for all the block IDs; the final length is known, so size the
	// slice once instead of letting append grow it repeatedly
	blockHeaders := make([]*flow.Header, 0, len(blockIDs))
	for _, blockID := range blockIDs {
		block, err := b.blocks.ByID(blockID)
		if err != nil {
			return nil, status.Errorf(codes.Internal, "failed to get events: %v", err)
		}
		blockHeaders = append(blockHeaders, block.Header)
	}

	// forward the request to the execution node
	return b.getBlockEventsFromExecutionNode(ctx, blockHeaders, eventType)
}
// getBlockEventsFromExecutionNode asks an execution node for the events of the given type in the
// given blocks and converts the answer into access-API block events.
//
// Execution nodes are selected based on the last requested block. When none are found, the
// statically configured execution client is used if there is one. An empty header list yields an
// empty result without contacting any node. All failures are reported as Internal statuses.
func (b *backendEvents) getBlockEventsFromExecutionNode(
	ctx context.Context,
	blockHeaders []*flow.Header,
	eventType string,
) ([]flow.BlockEvents, error) {
	// nothing requested, so there is nothing to fetch
	if len(blockHeaders) == 0 {
		return []flow.BlockEvents{}, nil
	}

	// build the execution API request from the IDs of the requested blocks
	ids := make([]flow.Identifier, 0, len(blockHeaders))
	for _, header := range blockHeaders {
		ids = append(ids, header.ID())
	}
	request := execproto.GetEventsForBlockIDsRequest{
		Type:     eventType,
		BlockIds: convert.IdentifiersToMessages(ids),
	}

	// the last block ID decides which execution nodes are asked
	tailID := ids[len(ids)-1]
	candidates, err := executionNodesForBlockID(tailID, b.executionReceipts, b.state)
	if err != nil {
		return nil, status.Errorf(codes.Internal, "failed to retrieve events from execution node: %v", err)
	}

	var response *execproto.GetEventsForBlockIDsResponse
	switch {
	case len(candidates) > 0:
		// try the selected execution nodes until one of them answers
		var served *flow.Identity
		response, served, err = b.getEventsFromAnyExeNode(ctx, candidates, request)
		if err != nil {
			return nil, status.Errorf(codes.Internal, "failed to retrieve events from execution nodes %s: %v", candidates, err)
		}
		b.log.Trace().
			Str("execution_id", served.String()).
			Str("last_block_id", tailID.String()).
			Msg("successfully got events")

	case b.staticExecutionRPC == nil:
		// no execution node found and no static fallback configured
		return nil, status.Errorf(codes.Internal, "failed to retrieve events from execution node")

	default:
		// fall back to the statically configured execution node
		response, err = b.staticExecutionRPC.GetEventsForBlockIDs(ctx, &request)
		if err != nil {
			return nil, status.Errorf(codes.Internal, "failed to retrieve events from execution node: %v", err)
		}
	}

	// check the execution node's answer against the request and convert it to the access API form
	blockEvents, err := verifyAndConvertToAccessEvents(response.GetResults(), blockHeaders)
	if err != nil {
		return nil, status.Errorf(codes.Internal, "failed to verify retrieved events from execution node: %v", err)
	}
	return blockEvents, nil
}
// verifyAndConvertToAccessEvents converts an execution node API result into an access node API
// result, and verifies that the result covers exactly the blocks that were requested.
//
// An error is returned when:
//   - the number of results differs from the number of requested headers,
//   - a result refers to a block that was not requested,
//   - a block appears in the results more often than it was requested, or
//   - a result reports a height that differs from the requested header's height.
//
// The returned slice follows the order of execEvents. Block ID, height and timestamp are taken
// from the locally known header rather than from the execution node's response.
func verifyAndConvertToAccessEvents(execEvents []*execproto.GetEventsForBlockIDsResponse_Result, requestedBlockHeaders []*flow.Header) ([]flow.BlockEvents, error) {
	if len(execEvents) != len(requestedBlockHeaders) {
		return nil, errors.New("number of results does not match number of blocks requested")
	}

	// requestedBlock tracks how many results are still owed for one requested block. A plain
	// membership set would accept a response that repeats one block in place of another,
	// because the length check alone cannot tell those apart.
	type requestedBlock struct {
		header    *flow.Header
		remaining int
	}

	// keyed by the hex form of the block ID, matching the hex encoding of the raw IDs below
	requestedBlocks := make(map[string]*requestedBlock, len(requestedBlockHeaders))
	for _, header := range requestedBlockHeaders {
		key := header.ID().String()
		entry, seen := requestedBlocks[key]
		if !seen {
			entry = &requestedBlock{}
			requestedBlocks[key] = entry
		}
		entry.header = header
		entry.remaining++ // the same block may legitimately be requested more than once
	}

	results := make([]flow.BlockEvents, len(execEvents))
	for i, result := range execEvents {
		entry, expected := requestedBlocks[hex.EncodeToString(result.GetBlockId())]
		if !expected {
			return nil, fmt.Errorf("unexpected blockID from exe node %x", result.GetBlockId())
		}
		if entry.remaining == 0 {
			return nil, fmt.Errorf("duplicate result for block %x from exe node", result.GetBlockId())
		}
		entry.remaining--

		header := entry.header
		if result.GetBlockHeight() != header.Height {
			return nil, fmt.Errorf("unexpected block height %d for block %x from exe node",
				result.GetBlockHeight(),
				result.GetBlockId())
		}

		results[i] = flow.BlockEvents{
			BlockID:        header.ID(),
			BlockHeight:    header.Height,
			BlockTimestamp: header.Timestamp,
			Events:         convert.MessagesToEvents(result.GetEvents()),
		}
	}

	return results, nil
}
// getEventsFromAnyExeNode sends req to the given execution nodes one at a time, in list order,
// and returns the first successful response together with the identity of the node that
// produced it.
//
// If every node fails, the individual errors are combined into a single error. An empty
// execNodes list is reported as an error, so a nil error always comes with a non-nil identity.
func (b *backendEvents) getEventsFromAnyExeNode(ctx context.Context,
	execNodes flow.IdentityList,
	req execproto.GetEventsForBlockIDsRequest) (*execproto.GetEventsForBlockIDsResponse, *flow.Identity, error) {
	if len(execNodes) == 0 {
		return nil, nil, errors.New("no execution nodes to query for events")
	}

	// named errs rather than errors so that the standard library package is not shadowed
	var errs *multierror.Error

	// try to get events from one of the execution nodes
	for _, execNode := range execNodes {
		resp, err := b.tryGetEvents(ctx, execNode, req)
		if err == nil {
			return resp, execNode, nil
		}
		errs = multierror.Append(errs, err)
	}

	return nil, nil, errs.ErrorOrNil()
}
// tryGetEvents performs a single GetEventsForBlockIDs call against one execution node.
//
// A client for the node's address is obtained from the connection factory and its closer is
// invoked when the call returns. Errors from creating the client or from the RPC itself are
// returned as-is, with a nil response.
func (b *backendEvents) tryGetEvents(ctx context.Context,
	execNode *flow.Identity,
	req execproto.GetEventsForBlockIDsRequest) (*execproto.GetEventsForBlockIDsResponse, error) {
	client, conn, err := b.connFactory.GetExecutionAPIClient(execNode.Address)
	if err != nil {
		return nil, err
	}
	defer conn.Close()

	events, err := client.GetEventsForBlockIDs(ctx, &req)
	if err != nil {
		return nil, err
	}
	return events, nil
}