-
Notifications
You must be signed in to change notification settings - Fork 166
/
backend_scripts.go
356 lines (301 loc) · 11.8 KB
/
backend_scripts.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
package backend
import (
"context"
"crypto/md5" //nolint:gosec
"time"
lru "github.com/hashicorp/golang-lru/v2"
execproto "github.com/onflow/flow/protobuf/go/flow/execution"
"github.com/rs/zerolog"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"github.com/onflow/flow-go/engine/access/rpc/connection"
"github.com/onflow/flow-go/engine/common/rpc"
fvmerrors "github.com/onflow/flow-go/fvm/errors"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/module"
"github.com/onflow/flow-go/module/execution"
"github.com/onflow/flow-go/module/irrecoverable"
"github.com/onflow/flow-go/state/protocol"
"github.com/onflow/flow-go/storage"
"github.com/onflow/flow-go/utils/logging"
)
// uniqueScriptLoggingTimeWindow is the minimum time that must have elapsed since a script
// (identified by its hash) was last logged before shouldLogScript allows it to be logged again.
const uniqueScriptLoggingTimeWindow = 10 * time.Minute
// backendScripts bundles the dependencies needed to execute scripts, either with the
// local script executor or by calling execution nodes, depending on scriptExecMode.
type backendScripts struct {
// log is the base logger; per-request loggers are derived from it.
log zerolog.Logger
// headers is used to resolve block headers by block ID and by height.
headers storage.Headers
// executionReceipts is passed to executionNodesForBlockID when selecting execution nodes.
executionReceipts storage.ExecutionReceipts
// state provides the latest sealed header and is passed to executionNodesForBlockID.
state protocol.State
// connFactory creates the execution API clients used to call execution nodes.
connFactory connection.ConnectionFactory
// metrics records script execution durations and execution errors.
metrics module.BackendScriptsMetrics
// loggedScripts maps a script's MD5 hash to the time it was last logged; it is used
// only to de-duplicate debug log entries.
loggedScripts *lru.Cache[[md5.Size]byte, time.Time]
// nodeCommunicator drives the calls to the selected execution nodes.
nodeCommunicator Communicator
// scriptExecutor executes scripts locally at a given block height.
scriptExecutor execution.ScriptExecutor
// scriptExecMode selects the execution strategy used by executeScript.
scriptExecMode IndexQueryMode
}
// scriptExecutionRequest encapsulates the data needed to execute a script to make it easier
// to pass around between the various methods involved in script execution.
type scriptExecutionRequest struct {
// blockID identifies the block the script is executed at; it is used for requests to execution nodes.
blockID flow.Identifier
// height is the height of that block; it is used for local execution.
height uint64
// script is the raw script source.
script []byte
// arguments are the encoded script arguments, forwarded unchanged to the executor.
arguments [][]byte
// insecureScriptHash is the MD5 hash of script. It is used only for logging and for
// de-duplicating log entries and must not be used for anything security related.
insecureScriptHash [md5.Size]byte
}
// newScriptExecutionRequest builds a scriptExecutionRequest from the given block ID, block height,
// script and arguments, and precomputes the MD5 hash of the script.
func newScriptExecutionRequest(blockID flow.Identifier, height uint64, script []byte, arguments [][]byte) *scriptExecutionRequest {
	// MD5 serves as a cheap (low compute/memory) lookup key.
	// CAUTION: md5 is cryptographically insecure. It is used here only to de-duplicate logs.
	// *DO NOT* use this hash for any protocol-related or cryptographic functions.
	scriptHash := md5.Sum(script) //nolint:gosec

	request := new(scriptExecutionRequest)
	request.blockID = blockID
	request.height = height
	request.script = script
	request.arguments = arguments
	request.insecureScriptHash = scriptHash
	return request
}
// ExecuteScriptAtLatestBlock runs the given script with the given arguments against the
// latest sealed block.
//
// If the latest sealed header cannot be looked up, the failure is wrapped in an exception,
// thrown via irrecoverable.Throw, and also returned to the caller.
func (b *backendScripts) ExecuteScriptAtLatestBlock(
	ctx context.Context,
	script []byte,
	arguments [][]byte,
) ([]byte, error) {
	sealedHead, headErr := b.state.Sealed().Head()
	if headErr != nil {
		// the latest sealed header MUST be available
		exception := irrecoverable.NewExceptionf("failed to lookup sealed header: %w", headErr)
		irrecoverable.Throw(ctx, exception)
		return nil, exception
	}

	request := newScriptExecutionRequest(sealedHead.ID(), sealedHead.Height, script, arguments)
	return b.executeScript(ctx, request)
}
// ExecuteScriptAtBlockID runs the given script with the given arguments against the block
// with the given ID.
//
// The block's header is looked up to obtain its height; a lookup failure is returned after
// conversion with rpc.ConvertStorageError.
func (b *backendScripts) ExecuteScriptAtBlockID(
	ctx context.Context,
	blockID flow.Identifier,
	script []byte,
	arguments [][]byte,
) ([]byte, error) {
	blockHeader, lookupErr := b.headers.ByBlockID(blockID)
	if lookupErr != nil {
		return nil, rpc.ConvertStorageError(lookupErr)
	}

	request := newScriptExecutionRequest(blockID, blockHeader.Height, script, arguments)
	return b.executeScript(ctx, request)
}
// ExecuteScriptAtBlockHeight runs the given script with the given arguments against the block
// at the given height.
//
// The header at that height is looked up to obtain the block ID; a lookup failure is returned
// after conversion with rpc.ConvertStorageError.
func (b *backendScripts) ExecuteScriptAtBlockHeight(
	ctx context.Context,
	blockHeight uint64,
	script []byte,
	arguments [][]byte,
) ([]byte, error) {
	blockHeader, lookupErr := b.headers.ByHeight(blockHeight)
	if lookupErr != nil {
		return nil, rpc.ConvertStorageError(lookupErr)
	}

	request := newScriptExecutionRequest(blockHeader.ID(), blockHeight, script, arguments)
	return b.executeScript(ctx, request)
}
// executeScript dispatches the request according to the configured scriptExecMode:
//   - IndexQueryModeExecutionNodesOnly: execute on execution nodes only.
//   - IndexQueryModeLocalOnly: execute locally only.
//   - IndexQueryModeFailover: execute locally first. The local outcome is returned when it
//     succeeded, or failed with InvalidArgument or Canceled. Otherwise the script is retried
//     on execution nodes, both outcomes are compared, and the execution node outcome is returned.
//   - IndexQueryModeCompare: execute on execution nodes, then locally, compare both outcomes
//     and return the execution node outcome.
//
// Any other mode results in a codes.Internal error.
func (b *backendScripts) executeScript(
	ctx context.Context,
	scriptRequest *scriptExecutionRequest,
) ([]byte, error) {
	mode := b.scriptExecMode

	switch mode {
	case IndexQueryModeExecutionNodesOnly:
		enResult, _, enErr := b.executeScriptOnAvailableExecutionNodes(ctx, scriptRequest)
		return enResult, enErr

	case IndexQueryModeLocalOnly:
		localResult, _, localErr := b.executeScriptLocally(ctx, scriptRequest)
		return localResult, localErr

	case IndexQueryModeFailover:
		localResult, localDuration, localErr := b.executeScriptLocally(ctx, scriptRequest)

		// Only fall back to the execution nodes when the local run failed for a reason other
		// than an invalid argument or a cancellation.
		// Note: scripts that timeout are retried on the execution nodes since ANs may have
		// performance issues for some scripts.
		needsFallback := localErr != nil &&
			!isInvalidArgumentError(localErr) &&
			status.Code(localErr) != codes.Canceled
		if !needsFallback {
			return localResult, localErr
		}

		enResult, enDuration, enErr := b.executeScriptOnAvailableExecutionNodes(ctx, scriptRequest)

		comparison := newScriptResultComparison(b.log, b.metrics, scriptRequest)
		enOutcome := newScriptResult(enResult, enDuration, enErr)
		localOutcome := newScriptResult(localResult, localDuration, localErr)
		// the comparison verdict is deliberately ignored here
		_ = comparison.compare(enOutcome, localOutcome)

		return enResult, enErr

	case IndexQueryModeCompare:
		enResult, enDuration, enErr := b.executeScriptOnAvailableExecutionNodes(ctx, scriptRequest)

		// Results are only comparable when the EN call either succeeded or failed with an
		// invalid argument error. For any other EN error we cannot tell whether it was caused
		// by the block being pruned or by something else, in which case the local run may
		// produce a valid output while the EN produced an error.
		if enErr != nil && !isInvalidArgumentError(enErr) {
			return nil, enErr
		}

		localResult, localDuration, localErr := b.executeScriptLocally(ctx, scriptRequest)

		comparison := newScriptResultComparison(b.log, b.metrics, scriptRequest)
		enOutcome := newScriptResult(enResult, enDuration, enErr)
		localOutcome := newScriptResult(localResult, localDuration, localErr)
		// the comparison verdict is deliberately ignored here
		_ = comparison.compare(enOutcome, localOutcome)

		// always return EN results
		return enResult, enErr
	}

	return nil, status.Errorf(codes.Internal, "unknown script execution mode: %v", mode)
}
// executeScriptLocally runs the script with the local script executor at the request's height.
//
// It returns the script result, the measured execution duration, and - on failure - the
// error converted by convertScriptExecutionError. Failures whose converted code is
// InvalidArgument, Canceled or DeadlineExceeded are logged at debug level; all other failures
// are logged at error level and counted via ScriptExecutionErrorLocal.
func (b *backendScripts) executeScriptLocally(
	ctx context.Context,
	r *scriptExecutionRequest,
) ([]byte, time.Duration, error) {
	startedAt := time.Now()
	output, execErr := b.scriptExecutor.ExecuteAtBlockHeight(ctx, r.script, r.arguments, r.height)
	finishedAt := time.Now()
	elapsed := finishedAt.Sub(startedAt)

	logCtx := b.log.With().
		Str("script_executor_addr", "localhost").
		Hex("block_id", logging.ID(r.blockID)).
		Uint64("height", r.height).
		Hex("script_hash", r.insecureScriptHash[:]).
		Dur("execution_dur_ms", elapsed)
	lg := logCtx.Logger()

	if execErr == nil {
		// Log the full script only at debug level, and only when shouldLogScript allows it.
		if b.log.GetLevel() == zerolog.DebugLevel && b.shouldLogScript(finishedAt, r.insecureScriptHash) {
			lg.Debug().
				Str("script", string(r.script)).
				Msg("Successfully executed script")
			b.loggedScripts.Add(r.insecureScriptHash, finishedAt)
		}

		// record execution time
		b.metrics.ScriptExecuted(elapsed, len(r.script))

		return output, elapsed, nil
	}

	grpcErr := convertScriptExecutionError(execErr, r.height)
	code := status.Code(grpcErr)
	if code == codes.InvalidArgument || code == codes.Canceled || code == codes.DeadlineExceeded {
		lg.Debug().Err(execErr).
			Str("script", string(r.script)).
			Msg("script failed to execute locally")
	} else {
		lg.Error().Err(execErr).Msg("script execution failed")
		b.metrics.ScriptExecutionErrorLocal()
	}

	return nil, elapsed, grpcErr
}
// executeScriptOnAvailableExecutionNodes executes the provided script on the execution nodes
// selected for the request's block by executionNodesForBlockID.
//
// The nodes are tried through nodeCommunicator.CallAvailableNode. The second callback passed
// to it reports true for InvalidArgument errors.
// NOTE(review): presumably returning true stops further nodes from being tried - confirm
// against the Communicator implementation.
//
// Returns:
//   - the script result, the duration of the last attempted call, and nil on success;
//   - nil, 0 and a codes.Internal error if no executors could be determined;
//   - nil, the duration of the last attempted call, and the error converted by
//     rpc.ConvertError otherwise. Errors other than InvalidArgument are additionally logged
//     at error level and counted via ScriptExecutionErrorOnExecutionNode.
func (b *backendScripts) executeScriptOnAvailableExecutionNodes(
	ctx context.Context,
	r *scriptExecutionRequest,
) ([]byte, time.Duration, error) {
	// find few execution nodes which have executed the block earlier and provided an execution receipt for it
	executors, err := executionNodesForBlockID(ctx, r.blockID, b.executionReceipts, b.state, b.log)
	if err != nil {
		return nil, 0, status.Errorf(codes.Internal, "failed to find script executors at blockId %v: %v", r.blockID.String(), err)
	}

	lg := b.log.With().
		Hex("block_id", logging.ID(r.blockID)).
		Hex("script_hash", r.insecureScriptHash[:]).
		Logger()

	var result []byte
	var execDuration time.Duration
	errToReturn := b.nodeCommunicator.CallAvailableNode(
		executors,
		func(node *flow.IdentitySkeleton) error {
			// The error of a single attempt stays local to this callback instead of
			// overwriting the enclosing function's err variable.
			var execErr error

			execStartTime := time.Now()
			result, execErr = b.tryExecuteScriptOnExecutionNode(ctx, node.Address, r)
			executionTime := time.Now()
			execDuration = executionTime.Sub(execStartTime)

			if execErr != nil {
				return execErr
			}

			if b.log.GetLevel() == zerolog.DebugLevel {
				if b.shouldLogScript(executionTime, r.insecureScriptHash) {
					lg.Debug().
						Str("script_executor_addr", node.Address).
						Str("script", string(r.script)).
						Dur("execution_dur_ms", execDuration).
						Msg("Successfully executed script")
					b.loggedScripts.Add(r.insecureScriptHash, executionTime)
				}
			}

			// Record the same duration that is logged and returned, so that the metric does
			// not include the time spent on debug logging (matches the local execution path).
			b.metrics.ScriptExecuted(execDuration, len(r.script))

			return nil
		},
		func(node *flow.IdentitySkeleton, err error) bool {
			if status.Code(err) == codes.InvalidArgument {
				lg.Debug().Err(err).
					Str("script_executor_addr", node.Address).
					Str("script", string(r.script)).
					Msg("script failed to execute on the execution node")
				return true
			}
			return false
		},
	)

	if errToReturn != nil {
		if status.Code(errToReturn) != codes.InvalidArgument {
			b.metrics.ScriptExecutionErrorOnExecutionNode()
			// use the request-scoped logger so the entry carries block_id and script_hash
			lg.Error().Err(errToReturn).Msg("script execution failed for execution node internal reasons")
		}
		return nil, execDuration, rpc.ConvertError(errToReturn, "failed to execute script on execution nodes", codes.Internal)
	}

	return result, execDuration, nil
}
// tryExecuteScriptOnExecutionNode sends the script to the execution node at executorAddress
// via ExecuteScriptAtBlockID and returns the value of the response.
//
// A failure to obtain a client is reported as codes.Internal. A failed call is re-wrapped
// in a status error that keeps the status code of the original error.
func (b *backendScripts) tryExecuteScriptOnExecutionNode(
	ctx context.Context,
	executorAddress string,
	r *scriptExecutionRequest,
) ([]byte, error) {
	client, closer, connErr := b.connFactory.GetExecutionAPIClient(executorAddress)
	if connErr != nil {
		return nil, status.Errorf(codes.Internal, "failed to create client for execution node %s: %v",
			executorAddress, connErr)
	}
	defer closer.Close()

	request := &execproto.ExecuteScriptAtBlockIDRequest{
		BlockId:   r.blockID[:],
		Script:    r.script,
		Arguments: r.arguments,
	}

	response, callErr := client.ExecuteScriptAtBlockID(ctx, request)
	if callErr != nil {
		return nil, status.Errorf(status.Code(callErr), "failed to execute the script on the execution node %s: %v", executorAddress, callErr)
	}

	return response.GetValue(), nil
}
// isInvalidArgumentError reports whether the gRPC status code of the given error is
// codes.InvalidArgument.
func isInvalidArgumentError(scriptExecutionErr error) bool {
	code := status.Code(scriptExecutionErr)
	return code == codes.InvalidArgument
}
// shouldLogScript reports whether a script with the given hash should be logged at execTime.
// It returns true when the hash is not present in loggedScripts, or when at least
// uniqueScriptLoggingTimeWindow has passed since the recorded timestamp.
func (b *backendScripts) shouldLogScript(execTime time.Time, scriptHash [md5.Size]byte) bool {
	lastLogged, found := b.loggedScripts.Get(scriptHash)
	if !found {
		return true
	}

	sinceLastLog := execTime.Sub(lastLogged)
	return sinceLastLog >= uniqueScriptLoggingTimeWindow
}
// convertScriptExecutionError maps an error returned by local script execution to a gRPC
// status error:
//   - nil stays nil;
//   - a coded failure is passed to rpc.ConvertError with codes.Internal;
//   - a coded error becomes Canceled or DeadlineExceeded for the cancelled / timed-out error
//     codes, and InvalidArgument for every other code;
//   - anything else is passed to rpc.ConvertIndexError together with the height.
func convertScriptExecutionError(err error, height uint64) error {
	if err == nil {
		return nil
	}

	var failure fvmerrors.CodedFailure
	if fvmerrors.As(err, &failure) {
		return rpc.ConvertError(err, "failed to execute script", codes.Internal)
	}

	// general FVM/ledger errors
	var coded fvmerrors.CodedError
	if !fvmerrors.As(err, &coded) {
		return rpc.ConvertIndexError(err, height, "failed to execute script")
	}

	errCode := coded.Code()
	if errCode == fvmerrors.ErrCodeScriptExecutionCancelledError {
		return status.Errorf(codes.Canceled, "script execution canceled: %v", err)
	}
	if errCode == fvmerrors.ErrCodeScriptExecutionTimedOutError {
		return status.Errorf(codes.DeadlineExceeded, "script execution timed out: %v", err)
	}

	// runtime errors
	return status.Errorf(codes.InvalidArgument, "failed to execute script: %v", err)
}