-
Notifications
You must be signed in to change notification settings - Fork 1.7k
/
streams_lookup.go
630 lines (568 loc) · 26.4 KB
/
streams_lookup.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
package evm
import (
"context"
"crypto/hmac"
"crypto/sha256"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"io"
"math/big"
"net/http"
"net/url"
"strconv"
"strings"
"sync"
"time"
"github.com/avast/retry-go/v4"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/patrickmn/go-cache"
ocr2keepers "github.com/smartcontractkit/ocr2keepers/pkg/v3/types"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ocr2keeper/evm21/encoding"
)
// Constants shared by the Mercury request code in this file: revert-data parameter keys, HTTP header
// names, endpoint paths and the settings passed to retry.Do for in-process retries.
const (
applicationJson = "application/json"
// parameter keys found in the StreamsLookup revert data; they are also used to pick the Mercury version
blockNumber = "blockNumber" // valid for v0.2
feedIDs = "feedIDs" // valid for v0.3
feedIdHex = "feedIdHex" // valid for v0.2
// request header names; headerUpkeepId is only set on v0.3 (multi-feed) requests
headerAuthorization = "Authorization"
headerContentType = "Content-Type"
headerTimestamp = "X-Authorization-Timestamp"
headerSignature = "X-Authorization-Signature-SHA256"
headerUpkeepId = "X-Authorization-Upkeep-Id"
// endpoint paths; each ends in '?' because the query string is appended directly to it
mercuryPathV02 = "/client?" // only used to access mercury v0.2 server
mercuryBatchPathV03 = "/api/v1/reports/bulk?" // only used to access mercury v0.3 server
mercuryBatchPathV03BlockNumber = "/api/v1gmx/reports/bulk?" // only used to access mercury v0.3 server with blockNumber
// delay between in-process retry attempts (passed to retry.Delay)
retryDelay = 500 * time.Millisecond
timestamp = "timestamp" // valid for v0.3
// number of attempts per Mercury request (passed to retry.Attempts)
totalAttempt = 3
)
// StreamsLookup pairs a decoded StreamsLookup revert error (feeds, parameter keys, time and extra data)
// with the upkeep that raised it and the block at which the check ran.
type StreamsLookup struct {
*encoding.StreamsLookupError
// upkeepId is the ID of the upkeep whose check reverted with the StreamsLookup error
upkeepId *big.Int
// block is the trigger block of the check result; it is only used as the block for the checkCallback eth_call
block uint64
}
// MercuryV02Response represents a JSON structure used by Mercury v0.2. One response carries the
// report for a single feed.
type MercuryV02Response struct {
// ChainlinkBlob is the hex encoded report; singleFeedRequest decodes it with hexutil.Decode
ChainlinkBlob string `json:"chainlinkBlob"`
}
// MercuryV03Response represents a JSON structure used by Mercury v0.3. One response carries the
// reports for all feeds of a bulk request.
type MercuryV03Response struct {
// Reports holds one entry per returned feed; multiFeedsRequest requires its length to equal the number of requested feeds
Reports []MercuryV03Report `json:"reports"`
}
// MercuryV03Report is a single feed report inside a MercuryV03Response.
type MercuryV03Report struct {
FeedID string `json:"feedID"` // feed id in hex encoded
ValidFromTimestamp uint32 `json:"validFromTimestamp"`
ObservationsTimestamp uint32 `json:"observationsTimestamp"`
FullReport string `json:"fullReport"` // the actual hex encoded mercury report of this feed, can be sent to verifier
}
// MercuryData is the message a request goroutine sends back to doMercuryRequest over a channel.
// Exactly one MercuryData is sent per request goroutine, either with Bytes set or with Error set.
type MercuryData struct {
// Index is the position of the feed in the lookup's Feeds slice for v0.2 requests; v0.3 requests always use 0
Index int
// Error is non-nil when the request failed
Error error
// Retryable reports whether the failed request is worth retrying
Retryable bool
// Bytes holds the decoded reports: a single element for v0.2, one element per report for v0.3
Bytes [][]byte
// State is the pipeline execution state describing the outcome of the request
State encoding.PipelineExecutionState
}
// UpkeepPrivilegeConfig represents the administrative offchain config for each upkeep. It can be set by s_upkeepPrivilegeManager
// role on the registry. Upkeeps allowed to use Mercury server will have this set to true.
// allowedToUseMercury unmarshals the JSON returned by the registry into this struct.
type UpkeepPrivilegeConfig struct {
// MercuryEnabled is the flag returned (and cached) by allowedToUseMercury
MercuryEnabled bool `json:"mercuryEnabled"`
}
// streamsLookup looks through check upkeep results looking for any that need off chain lookup.
//
// Only results whose target check reverted are considered. For each of them the revert data is decoded
// as a StreamsLookup request; results that do not decode are left untouched. Requests with an empty
// feeds array or with parameter keys that match neither Mercury v0.2 nor v0.3 are marked with
// UpkeepFailureReasonInvalidRevertDataInput. For v0.2 requests the registry is queried to confirm the
// upkeep is allowed to use Mercury; v0.3 permission checks are left to the Mercury server.
//
// All accepted lookups are then executed concurrently, one goroutine per lookup, each writing only to
// its own element of checkResults. The (mutated) checkResults slice is returned; errors are recorded on
// the individual results and never surfaced to the caller.
func (r *EvmRegistry) streamsLookup(ctx context.Context, checkResults []ocr2keepers.CheckResult) []ocr2keepers.CheckResult {
	lggr := r.lggr.With("where", "StreamsLookup")
	lookups := map[int]*StreamsLookup{}
	for i, res := range checkResults {
		if res.IneligibilityReason != uint8(encoding.UpkeepFailureReasonTargetCheckReverted) {
			// Streams Lookup only works when upkeep target check reverts
			continue
		}

		block := big.NewInt(int64(res.Trigger.BlockNumber))
		upkeepId := res.UpkeepID

		// Try to decode the revert error into streams lookup format. User upkeeps can revert with any reason, see if they
		// tried to call mercury
		lggr.Infof("at block %d upkeep %s trying to DecodeStreamsLookupRequest performData=%s", block, upkeepId, hexutil.Encode(checkResults[i].PerformData))
		streamsLookupErr, err := r.packer.DecodeStreamsLookupRequest(res.PerformData)
		if err != nil {
			lggr.Debugf("at block %d upkeep %s DecodeStreamsLookupRequest failed: %v", block, upkeepId, err)
			// user contract did not revert with StreamsLookup error
			continue
		}
		l := &StreamsLookup{StreamsLookupError: streamsLookupErr}
		if r.mercury.cred == nil {
			lggr.Errorf("at block %d upkeep %s tries to access mercury server but mercury credential is not configured", block, upkeepId)
			continue
		}

		if len(l.Feeds) == 0 {
			checkResults[i].IneligibilityReason = uint8(encoding.UpkeepFailureReasonInvalidRevertDataInput)
			lggr.Debugf("at block %s upkeep %s has empty feeds array", block, upkeepId)
			continue
		}

		// mercury permission checking for v0.3 is done by mercury server
		if l.FeedParamKey == feedIdHex && l.TimeParamKey == blockNumber {
			// check permission on the registry for mercury v0.2
			opts := r.buildCallOpts(ctx, block)
			state, reason, retryable, allowed, err := r.allowedToUseMercury(opts, upkeepId.BigInt())
			if err != nil {
				lggr.Warnf("at block %s upkeep %s failed to query mercury allow list: %s", block, upkeepId, err)
				checkResults[i].PipelineExecutionState = uint8(state)
				checkResults[i].IneligibilityReason = uint8(reason)
				checkResults[i].Retryable = retryable
				continue
			}

			if !allowed {
				lggr.Debugf("at block %d upkeep %s NOT allowed to query Mercury server", block, upkeepId)
				checkResults[i].IneligibilityReason = uint8(encoding.UpkeepFailureReasonMercuryAccessNotAllowed)
				continue
			}
		} else if l.FeedParamKey != feedIDs {
			// if mercury version cannot be determined, set failure reason. This is an invalid-input
			// failure, not a permission failure, so the log says so explicitly.
			lggr.Debugf("at block %d upkeep %s has invalid revert data input: cannot determine mercury version from feed param key %s and time param key %s", block, upkeepId, l.FeedParamKey, l.TimeParamKey)
			checkResults[i].IneligibilityReason = uint8(encoding.UpkeepFailureReasonInvalidRevertDataInput)
			continue
		}

		l.upkeepId = upkeepId.BigInt()
		// the block here is exclusively used to call checkCallback at this block, not to be confused with the block number
		// in the revert for mercury v0.2, which is denoted by time in the struct bc starting from v0.3, only timestamp will be supported
		l.block = uint64(block.Int64())
		lggr.Infof("at block %d upkeep %s DecodeStreamsLookupRequest feedKey=%s timeKey=%s feeds=%v time=%s extraData=%s", block, upkeepId, l.FeedParamKey, l.TimeParamKey, l.Feeds, l.Time, hexutil.Encode(l.ExtraData))
		lookups[i] = l
	}

	// run every accepted lookup concurrently and wait for all of them before returning
	var wg sync.WaitGroup
	for i, lookup := range lookups {
		wg.Add(1)
		go r.doLookup(ctx, &wg, lookup, i, checkResults, lggr)
	}
	wg.Wait()

	// don't surface error to plugin bc StreamsLookup process should be self-contained.
	return checkResults
}
// doLookup executes a single streams lookup and records the outcome on checkResults[i].
//
// It fetches the Mercury reports for the lookup, passes them to the registry's checkCallback at the
// lookup's block, unpacks the callback result and finally marks the check result as eligible (with the
// returned perform data) or sets the relevant state / ineligibility reason. wg.Done is always called
// on return. Only element i of checkResults is written.
func (r *EvmRegistry) doLookup(ctx context.Context, wg *sync.WaitGroup, lookup *StreamsLookup, i int, checkResults []ocr2keepers.CheckResult, lggr logger.Logger) {
	defer wg.Done()

	result := &checkResults[i]

	// step 1: fetch the reports from Mercury
	retryKey := generatePluginRetryKey(result.WorkID, lookup.block)
	reqState, reqReason, values, reqRetryable, interval, reqErr := r.doMercuryRequest(ctx, lookup, retryKey, lggr)
	if reqErr != nil {
		lggr.Errorf("upkeep %s retryable %v retryInterval %s doMercuryRequest: %s", lookup.upkeepId, reqRetryable, interval, reqErr.Error())
		result.Retryable = reqRetryable
		result.RetryInterval = interval
		result.PipelineExecutionState = uint8(reqState)
		result.IneligibilityReason = uint8(reqReason)
		return
	}
	for idx := range values {
		lggr.Infof("upkeep %s doMercuryRequest values[%d]: %s", lookup.upkeepId, idx, hexutil.Encode(values[idx]))
	}

	// step 2: hand the reports to the user's checkCallback via the registry
	cbState, cbRetryable, mercuryBytes, cbErr := r.checkCallback(ctx, values, lookup)
	if cbErr != nil {
		lggr.Errorf("at block %d upkeep %s checkCallback err: %s", lookup.block, lookup.upkeepId, cbErr.Error())
		result.Retryable = cbRetryable
		result.PipelineExecutionState = uint8(cbState)
		return
	}
	lggr.Infof("checkCallback mercuryBytes=%s", hexutil.Encode(mercuryBytes))

	// step 3: decode the callback result
	unpackState, needed, performData, failureReason, _, unpackErr := r.packer.UnpackCheckCallbackResult(mercuryBytes)
	if unpackErr != nil {
		lggr.Errorf("at block %d upkeep %s UnpackCheckCallbackResult err: %s", lookup.block, lookup.upkeepId, unpackErr.Error())
		result.PipelineExecutionState = uint8(unpackState)
		return
	}

	// step 4: translate the decoded result into the final check result
	switch {
	case failureReason == uint8(encoding.UpkeepFailureReasonMercuryCallbackReverted):
		result.IneligibilityReason = uint8(encoding.UpkeepFailureReasonMercuryCallbackReverted)
		lggr.Debugf("at block %d upkeep %s mercury callback reverts", lookup.block, lookup.upkeepId)
	case !needed:
		result.IneligibilityReason = uint8(encoding.UpkeepFailureReasonUpkeepNotNeeded)
		lggr.Debugf("at block %d upkeep %s callback reports upkeep not needed", lookup.block, lookup.upkeepId)
	default:
		result.IneligibilityReason = uint8(encoding.UpkeepFailureReasonNone)
		result.Eligible = true
		result.PerformData = performData
		lggr.Infof("at block %d upkeep %s successful with perform data: %s", lookup.block, lookup.upkeepId, hexutil.Encode(performData))
	}
}
// allowedToUseMercury retrieves upkeep's administrative offchain config and decodes a mercuryEnabled bool to indicate if
// this upkeep is allowed to use Mercury service.
//
// The answer is looked up in r.mercury.allowListCache first (keyed by the decimal upkeep ID). On a miss
// the registry's getUpkeepPrivilegeConfig is called via eth_call at opts.BlockNumber and the decoded
// answer is cached. An empty config is cached as "not allowed" and reported as an error with
// UpkeepFailureReasonMercuryAccessNotAllowed.
//
// Returns the pipeline state, failure reason, whether the failure is retryable (only RPC failures are),
// the allow flag and an error. Underlying errors are wrapped with %w so callers can inspect them with
// errors.Is / errors.As.
func (r *EvmRegistry) allowedToUseMercury(opts *bind.CallOpts, upkeepId *big.Int) (state encoding.PipelineExecutionState, reason encoding.UpkeepFailureReason, retryable bool, allow bool, err error) {
	cacheKey := upkeepId.String()
	if allowed, ok := r.mercury.allowListCache.Get(cacheKey); ok {
		return encoding.NoPipelineError, encoding.UpkeepFailureReasonNone, false, allowed.(bool), nil
	}

	payload, err := r.packer.PackGetUpkeepPrivilegeConfig(upkeepId)
	if err != nil {
		// pack error, no retryable
		r.lggr.Warnf("failed to pack getUpkeepPrivilegeConfig data for upkeepId %s: %s", upkeepId, err)
		return encoding.PackUnpackDecodeFailed, encoding.UpkeepFailureReasonNone, false, false, fmt.Errorf("failed to pack upkeepId: %w", err)
	}

	var resultBytes hexutil.Bytes
	args := map[string]interface{}{
		"to":   r.addr.Hex(),
		"data": hexutil.Bytes(payload),
	}

	// call getUpkeepPrivilegeConfig on the registry at the block carried by opts
	err = r.client.CallContext(opts.Context, &resultBytes, "eth_call", args, hexutil.EncodeBig(opts.BlockNumber))
	if err != nil {
		// RPC failures are the only retryable outcome of this function
		return encoding.RpcFlakyFailure, encoding.UpkeepFailureReasonNone, true, false, fmt.Errorf("failed to get upkeep privilege config: %w", err)
	}

	cfg, err := r.packer.UnpackGetUpkeepPrivilegeConfig(resultBytes)
	if err != nil {
		return encoding.PackUnpackDecodeFailed, encoding.UpkeepFailureReasonNone, false, false, fmt.Errorf("failed to get upkeep privilege config: %w", err)
	}

	if len(cfg) == 0 {
		// no config set for this upkeep: remember the negative answer so the registry is not queried again
		r.mercury.allowListCache.Set(cacheKey, false, cache.DefaultExpiration)
		return encoding.NoPipelineError, encoding.UpkeepFailureReasonMercuryAccessNotAllowed, false, false, errors.New("upkeep privilege config is empty")
	}

	var privilegeConfig UpkeepPrivilegeConfig
	if err := json.Unmarshal(cfg, &privilegeConfig); err != nil {
		return encoding.MercuryUnmarshalError, encoding.UpkeepFailureReasonNone, false, false, fmt.Errorf("failed to unmarshal privilege config: %w", err)
	}

	r.mercury.allowListCache.Set(cacheKey, privilegeConfig.MercuryEnabled, cache.DefaultExpiration)
	return encoding.NoPipelineError, encoding.UpkeepFailureReasonNone, false, privilegeConfig.MercuryEnabled, nil
}
// checkCallback packs a checkCallback call for the lookup's upkeep with the fetched Mercury values and
// the lookup's extra data, and executes it against the registry via eth_call at the lookup's block.
//
// Returns the pipeline state, whether the failure is retryable (true only for RPC failures), the raw
// bytes returned by the call, and an error.
func (r *EvmRegistry) checkCallback(ctx context.Context, values [][]byte, lookup *StreamsLookup) (encoding.PipelineExecutionState, bool, hexutil.Bytes, error) {
	calldata, packErr := r.abi.Pack("checkCallback", lookup.upkeepId, values, lookup.ExtraData)
	if packErr != nil {
		return encoding.PackUnpackDecodeFailed, false, nil, packErr
	}

	callArgs := map[string]interface{}{
		"to":   r.addr.Hex(),
		"data": hexutil.Bytes(calldata),
	}

	// call checkCallback function at the block which OCR3 has agreed upon
	var result hexutil.Bytes
	if callErr := r.client.CallContext(ctx, &result, "eth_call", callArgs, hexutil.EncodeUint64(lookup.block)); callErr != nil {
		return encoding.RpcFlakyFailure, true, nil, callErr
	}
	return encoding.NoPipelineError, false, result, nil
}
// doMercuryRequest sends requests to Mercury API to retrieve mercury data.
//
// The Mercury version is chosen from the lookup's parameter keys: feedIdHex + blockNumber selects v0.2
// (one request goroutine per feed), feedIDs selects v0.3 (a single bulk request). An empty feeds slice
// or any other key combination is rejected with UpkeepFailureReasonInvalidRevertDataInput.
//
// Returns the pipeline state (the state of the last failed request received, or no error), the failure
// reason, the reports, whether the plugin should retry, the retry interval and the joined request
// errors. A retry is only requested when at least one request failed and every failed request was
// retryable; prk is the plugin retry cache key used to compute the interval.
func (r *EvmRegistry) doMercuryRequest(ctx context.Context, sl *StreamsLookup, prk string, lggr logger.Logger) (encoding.PipelineExecutionState, encoding.UpkeepFailureReason, [][]byte, bool, time.Duration, error) {
	feedCount := len(sl.Feeds)
	useV02 := sl.FeedParamKey == feedIdHex && sl.TimeParamKey == blockNumber
	useV03 := !useV02 && sl.FeedParamKey == feedIDs
	if feedCount == 0 || !(useV02 || useV03) {
		return encoding.NoPipelineError, encoding.UpkeepFailureReasonInvalidRevertDataInput, [][]byte{}, false, 0 * time.Second, fmt.Errorf("invalid revert data input: feed param key %s, time param key %s, feeds %s", sl.FeedParamKey, sl.TimeParamKey, sl.Feeds)
	}

	// v0.3 answers with one message for all feeds, v0.2 with one message per feed
	expected := feedCount
	if useV03 {
		expected = 1
	}
	// buffered so that request goroutines never block on send
	ch := make(chan MercuryData, expected)
	if useV03 {
		go r.multiFeedsRequest(ctx, ch, sl, lggr)
	} else {
		for idx := range sl.Feeds {
			go r.singleFeedRequest(ctx, ch, idx, sl, lggr)
		}
	}

	var (
		joinedErr error
		interval  time.Duration
	)
	reports := make([][]byte, feedCount)
	everyFailureRetryable := true
	anyFailure := false
	// in v0.2, use the last execution error as the state, if no execution errors, state will be no error
	finalState := encoding.NoPipelineError
	for received := 0; received < expected; received++ {
		msg := <-ch
		if msg.Error == nil {
			if useV03 {
				reports = msg.Bytes
			} else {
				reports[msg.Index] = msg.Bytes[0]
			}
			continue
		}
		joinedErr = errors.Join(joinedErr, msg.Error)
		everyFailureRetryable = everyFailureRetryable && msg.Retryable
		anyFailure = true
		if msg.State != encoding.NoPipelineError {
			finalState = msg.State
		}
	}

	// only retry when not all successful AND none are not retryable
	shouldRetry := everyFailureRetryable && anyFailure
	if shouldRetry {
		interval = r.calculateRetryConfig(prk)
	}
	return finalState, encoding.UpkeepFailureReasonNone, reports, shouldRetry, interval, joinedErr
}
// singleFeedRequest sends a v0.2 Mercury request for a single feed report.
//
// The request is a signed GET to the legacy Mercury URL for sl.Feeds[index] at sl.Time. It is attempted
// up to totalAttempt times, retryDelay apart, but only re-attempted when the previous attempt failed
// with one of the status codes listed in RetryIf below. Exactly one MercuryData is sent on ch: either
// the decoded report (from inside the retry closure) or, if no attempt succeeded, an error message
// carrying the retryable flag and state of the last attempt.
func (r *EvmRegistry) singleFeedRequest(ctx context.Context, ch chan<- MercuryData, index int, sl *StreamsLookup, lggr logger.Logger) {
q := url.Values{
sl.FeedParamKey: {sl.Feeds[index]},
sl.TimeParamKey: {sl.Time.String()},
}
mercuryURL := r.mercury.cred.LegacyURL
reqUrl := fmt.Sprintf("%s%s%s", mercuryURL, mercuryPathV02, q.Encode())
lggr.Debugf("request URL for upkeep %s feed %s: %s", sl.upkeepId.String(), sl.Feeds[index], reqUrl)
req, err := http.NewRequestWithContext(ctx, http.MethodGet, reqUrl, nil)
if err != nil {
ch <- MercuryData{Index: index, Error: err, Retryable: false, State: encoding.InvalidMercuryRequest}
return
}
// sign the request: the HMAC covers method, path+query, body hash, username and the millisecond timestamp
ts := time.Now().UTC().UnixMilli()
signature := r.generateHMAC(http.MethodGet, mercuryPathV02+q.Encode(), []byte{}, r.mercury.cred.Username, r.mercury.cred.Password, ts)
req.Header.Set(headerContentType, applicationJson)
req.Header.Set(headerAuthorization, r.mercury.cred.Username)
req.Header.Set(headerTimestamp, strconv.FormatInt(ts, 10))
req.Header.Set(headerSignature, signature)
// in the case of multiple retries here, use the last attempt's data
state := encoding.NoPipelineError
retryable := false
// sent records whether the closure already delivered a successful result on ch
sent := false
retryErr := retry.Do(
func() error {
// reset per attempt so the flag always reflects the latest attempt
retryable = false
resp, err1 := r.hc.Do(req)
if err1 != nil {
// flagged retryable for the caller, but not re-attempted here: RetryIf only matches status-code errors
lggr.Warnf("at block %s upkeep %s GET request fails for feed %s: %v", sl.Time.String(), sl.upkeepId.String(), sl.Feeds[index], err1)
retryable = true
state = encoding.MercuryFlakyFailure
return err1
}
// close the body at the end of every attempt; note this assigns to the enclosing function's err variable
defer func(Body io.ReadCloser) {
err = Body.Close()
if err != nil {
lggr.Warnf("failed to close mercury response Body: %s", err)
}
}(resp.Body)
body, err1 := io.ReadAll(resp.Body)
if err1 != nil {
retryable = false
state = encoding.InvalidMercuryResponse
return err1
}
// the retryable statuses return an error whose text is just the status code, which is what RetryIf matches on
if resp.StatusCode == http.StatusNotFound || resp.StatusCode == http.StatusInternalServerError || resp.StatusCode == http.StatusBadGateway || resp.StatusCode == http.StatusServiceUnavailable || resp.StatusCode == http.StatusGatewayTimeout {
lggr.Warnf("at block %s upkeep %s received status code %d for feed %s", sl.Time.String(), sl.upkeepId.String(), resp.StatusCode, sl.Feeds[index])
retryable = true
state = encoding.MercuryFlakyFailure
return errors.New(strconv.FormatInt(int64(resp.StatusCode), 10))
} else if resp.StatusCode != http.StatusOK {
// any other non-200 status is treated as a permanent failure
retryable = false
state = encoding.InvalidMercuryRequest
return fmt.Errorf("at block %s upkeep %s received status code %d for feed %s", sl.Time.String(), sl.upkeepId.String(), resp.StatusCode, sl.Feeds[index])
}
lggr.Debugf("at block %s upkeep %s received status code %d from mercury v0.2 with BODY=%s", sl.Time.String(), sl.upkeepId.String(), resp.StatusCode, hexutil.Encode(body))
var m MercuryV02Response
err1 = json.Unmarshal(body, &m)
if err1 != nil {
lggr.Warnf("at block %s upkeep %s failed to unmarshal body to MercuryV02Response for feed %s: %v", sl.Time.String(), sl.upkeepId.String(), sl.Feeds[index], err1)
retryable = false
state = encoding.MercuryUnmarshalError
return err1
}
blobBytes, err1 := hexutil.Decode(m.ChainlinkBlob)
if err1 != nil {
lggr.Warnf("at block %s upkeep %s failed to decode chainlinkBlob %s for feed %s: %v", sl.Time.String(), sl.upkeepId.String(), m.ChainlinkBlob, sl.Feeds[index], err1)
retryable = false
state = encoding.InvalidMercuryResponse
return err1
}
// success: deliver the report and stop retrying
ch <- MercuryData{
Index: index,
Bytes: [][]byte{blobBytes},
Retryable: false,
State: encoding.NoPipelineError,
}
sent = true
return nil
},
// only retry when the error is 404 Not Found, 500 Internal Server Error, 502 Bad Gateway, 503 Service Unavailable, 504 Gateway Timeout
retry.RetryIf(func(err error) bool {
return err.Error() == fmt.Sprintf("%d", http.StatusNotFound) || err.Error() == fmt.Sprintf("%d", http.StatusInternalServerError) || err.Error() == fmt.Sprintf("%d", http.StatusBadGateway) || err.Error() == fmt.Sprintf("%d", http.StatusServiceUnavailable) || err.Error() == fmt.Sprintf("%d", http.StatusGatewayTimeout)
}),
retry.Context(ctx),
retry.Delay(retryDelay),
retry.Attempts(totalAttempt))
// no attempt succeeded: report the failure with the last attempt's retryable flag and state
if !sent {
md := MercuryData{
Index: index,
Bytes: [][]byte{},
Retryable: retryable,
Error: fmt.Errorf("failed to request feed for %s: %w", sl.Feeds[index], retryErr),
State: state,
}
ch <- md
}
}
// multiFeedsRequest sends a Mercury v0.3 request for a multi-feed report
//
// A single signed GET requests all of sl.Feeds at sl.Time from the bulk endpoint (the blockNumber
// variant of the path is used when the time parameter key is blockNumber). It is attempted up to
// totalAttempt times, retryDelay apart, but only re-attempted when the previous attempt failed with one
// of the status-code errors listed in RetryIf below. Exactly one MercuryData (Index 0) is sent on ch:
// either all decoded reports, or an error carrying the retryable flag and state of the last attempt.
func (r *EvmRegistry) multiFeedsRequest(ctx context.Context, ch chan<- MercuryData, sl *StreamsLookup, lggr logger.Logger) {
// this won't work bc q.Encode() will encode commas as '%2C' but the server is strictly expecting a comma separated list
//q := url.Values{
// feedIDs: {strings.Join(sl.Feeds, ",")},
// timestamp: {sl.Time.String()},
//}
params := fmt.Sprintf("%s=%s&%s=%s", feedIDs, strings.Join(sl.Feeds, ","), sl.TimeParamKey, sl.Time.String())
batchPathV03 := mercuryBatchPathV03
if sl.TimeParamKey == blockNumber {
batchPathV03 = mercuryBatchPathV03BlockNumber
}
reqUrl := fmt.Sprintf("%s%s%s", r.mercury.cred.URL, batchPathV03, params)
lggr.Debugf("request URL for upkeep %s userId %s: %s", sl.upkeepId.String(), r.mercury.cred.Username, reqUrl)
req, err := http.NewRequestWithContext(ctx, http.MethodGet, reqUrl, nil)
if err != nil {
ch <- MercuryData{Index: 0, Error: err, Retryable: false, State: encoding.InvalidMercuryRequest}
return
}
// sign the request: the HMAC covers method, path+query, body hash, username and the millisecond timestamp
ts := time.Now().UTC().UnixMilli()
signature := r.generateHMAC(http.MethodGet, batchPathV03+params, []byte{}, r.mercury.cred.Username, r.mercury.cred.Password, ts)
req.Header.Set(headerContentType, applicationJson)
// username here is often referred to as user id
req.Header.Set(headerAuthorization, r.mercury.cred.Username)
req.Header.Set(headerTimestamp, strconv.FormatInt(ts, 10))
req.Header.Set(headerSignature, signature)
// mercury will inspect authorization headers above to make sure this user (in automation's context, this node) is eligible to access mercury
// and if it has an automation role. it will then look at this upkeep id to check if it has access to all the requested feeds.
req.Header.Set(headerUpkeepId, sl.upkeepId.String())
// in the case of multiple retries here, use the last attempt's data
state := encoding.NoPipelineError
retryable := false
// sent records whether the closure already delivered a successful result on ch
sent := false
retryErr := retry.Do(
func() error {
// reset per attempt so the flag always reflects the latest attempt
retryable = false
resp, err1 := r.hc.Do(req)
if err1 != nil {
// flagged retryable for the caller, but not re-attempted here: RetryIf only matches status-code errors
lggr.Warnf("at timestamp %s upkeep %s GET request fails from mercury v0.3: %v", sl.Time.String(), sl.upkeepId.String(), err1)
retryable = true
state = encoding.MercuryFlakyFailure
return err1
}
// close the body at the end of every attempt; note this assigns to the enclosing function's err variable
defer func(Body io.ReadCloser) {
err = Body.Close()
if err != nil {
lggr.Warnf("failed to close mercury response Body: %s", err)
}
}(resp.Body)
body, err1 := io.ReadAll(resp.Body)
if err1 != nil {
retryable = false
state = encoding.InvalidMercuryResponse
return err1
}
lggr.Infof("at timestamp %s upkeep %s received status code %d from mercury v0.3", sl.Time.String(), sl.upkeepId.String(), resp.StatusCode)
// map the status code to a state; retryable statuses return an error whose text is just the status code,
// which is what RetryIf matches on
if resp.StatusCode == http.StatusUnauthorized {
retryable = false
state = encoding.UpkeepNotAuthorized
return fmt.Errorf("at timestamp %s upkeep %s received status code %d from mercury v0.3, most likely this is caused by unauthorized upkeep", sl.Time.String(), sl.upkeepId.String(), resp.StatusCode)
} else if resp.StatusCode == http.StatusBadRequest {
retryable = false
state = encoding.InvalidMercuryRequest
return fmt.Errorf("at timestamp %s upkeep %s received status code %d from mercury v0.3 with message: %s", sl.Time.String(), sl.upkeepId.String(), resp.StatusCode, string(body))
} else if resp.StatusCode == http.StatusInternalServerError || resp.StatusCode == http.StatusBadGateway || resp.StatusCode == http.StatusServiceUnavailable || resp.StatusCode == http.StatusGatewayTimeout {
retryable = true
state = encoding.MercuryFlakyFailure
return fmt.Errorf("%d", resp.StatusCode)
} else if resp.StatusCode == http.StatusPartialContent {
// TODO (AUTO-5044): handle response code 206 entirely with errors field parsing
lggr.Warnf("at timestamp %s upkeep %s requested [%s] feeds but mercury v0.3 server returned 206 status, treating it as 404 and retrying", sl.Time.String(), sl.upkeepId.String(), sl.Feeds)
retryable = true
state = encoding.MercuryFlakyFailure
return fmt.Errorf("%d", http.StatusPartialContent)
} else if resp.StatusCode != http.StatusOK {
// any other non-200 status is treated as a permanent failure
retryable = false
state = encoding.InvalidMercuryRequest
return fmt.Errorf("at timestamp %s upkeep %s received status code %d from mercury v0.3", sl.Time.String(), sl.upkeepId.String(), resp.StatusCode)
}
// NOTE(review): this message says "at block" while the rest of this function logs "at timestamp"
lggr.Debugf("at block %s upkeep %s received status code %d from mercury v0.3 with BODY=%s", sl.Time.String(), sl.upkeepId.String(), resp.StatusCode, hexutil.Encode(body))
var response MercuryV03Response
err1 = json.Unmarshal(body, &response)
if err1 != nil {
lggr.Warnf("at timestamp %s upkeep %s failed to unmarshal body to MercuryV03Response from mercury v0.3: %v", sl.Time.String(), sl.upkeepId.String(), err1)
retryable = false
state = encoding.MercuryUnmarshalError
return err1
}
// in v0.3, if some feeds are not available, the server will only return available feeds, but we need to make sure ALL feeds are retrieved before calling user contract
// hence, retry in this case. retry will help when we send a very new timestamp and reports are not yet generated
if len(response.Reports) != len(sl.Feeds) {
var receivedFeeds []string
for _, f := range response.Reports {
receivedFeeds = append(receivedFeeds, f.FeedID)
}
// NOTE(review): the message mentions a 206 status, but this branch is only reached after a 200 response;
// the returned error text is "404" so that RetryIf re-attempts the request
lggr.Warnf("at timestamp %s upkeep %s mercury v0.3 server returned 206 status with [%s] reports while we requested [%s] feeds, retrying", sl.Time.String(), sl.upkeepId.String(), receivedFeeds, sl.Feeds)
retryable = true
state = encoding.MercuryFlakyFailure
return fmt.Errorf("%d", http.StatusNotFound)
}
// decode every report, in the order the server returned them
var reportBytes [][]byte
for _, rsp := range response.Reports {
b, err := hexutil.Decode(rsp.FullReport)
if err != nil {
lggr.Warnf("at timestamp %s upkeep %s failed to decode reportBlob %s: %v", sl.Time.String(), sl.upkeepId.String(), rsp.FullReport, err)
retryable = false
state = encoding.InvalidMercuryResponse
return err
}
reportBytes = append(reportBytes, b)
}
// success: deliver all reports in one message and stop retrying
ch <- MercuryData{
Index: 0,
Bytes: reportBytes,
Retryable: false,
State: encoding.NoPipelineError,
}
sent = true
return nil
},
// only retry when the error is 206 Partial Content, 404 Not Found, 500 Internal Server Error, 502 Bad Gateway, 503 Service Unavailable, 504 Gateway Timeout
retry.RetryIf(func(err error) bool {
return err.Error() == fmt.Sprintf("%d", http.StatusPartialContent) || err.Error() == fmt.Sprintf("%d", http.StatusNotFound) || err.Error() == fmt.Sprintf("%d", http.StatusInternalServerError) || err.Error() == fmt.Sprintf("%d", http.StatusBadGateway) || err.Error() == fmt.Sprintf("%d", http.StatusServiceUnavailable) || err.Error() == fmt.Sprintf("%d", http.StatusGatewayTimeout)
}),
retry.Context(ctx),
retry.Delay(retryDelay),
retry.Attempts(totalAttempt))
// no attempt succeeded: report the failure with the last attempt's retryable flag and state
if !sent {
md := MercuryData{
Index: 0,
Bytes: [][]byte{},
Retryable: retryable,
Error: retryErr,
State: state,
}
ch <- md
}
}
// generateHMAC calculates a user HMAC for Mercury server authentication.
//
// The signed message is "<method> <path> <hex sha256 of body> <clientId> <ts>", joined by single
// spaces. It is authenticated with HMAC-SHA256 keyed by secret and returned as a lowercase hex string.
func (r *EvmRegistry) generateHMAC(method string, path string, body []byte, clientId string, secret string, ts int64) string {
	bodyDigest := sha256.Sum256(body)

	mac := hmac.New(sha256.New, []byte(secret))
	// hash.Hash writes never return an error, so the result of Fprintf is not checked
	fmt.Fprintf(mac, "%s %s %s %s %d",
		method,
		path,
		hex.EncodeToString(bodyDigest[:]),
		clientId,
		ts)
	return hex.EncodeToString(mac.Sum(nil))
}
// calculateRetryConfig returns plugin retry interval based on how many times plugin has retried this work.
//
// The retry count is read from r.mercury.pluginRetryCache under prk. A key that is not cached yet, or a
// count below totalFastPluginRetries, yields 1 second; a count below totalMediumPluginRetries yields
// 5 seconds; beyond that the zero duration is returned so the plugin falls back to its default interval.
// The count stored under prk is incremented on every call.
func (r *EvmRegistry) calculateRetryConfig(prk string) time.Duration {
	attempts := 0
	interval := 1 * time.Second // used for a first-time key and for the fast retry tier

	if cached, found := r.mercury.pluginRetryCache.Get(prk); found {
		attempts = cached.(int)
		switch {
		case attempts < totalFastPluginRetries:
			// fast tier: keep the 1 second interval
		case attempts < totalMediumPluginRetries:
			interval = 5 * time.Second
		default:
			// if the core node has retried totalMediumPluginRetries times, do not set retry interval and plugin will use
			// the default interval
			interval = 0
		}
	}

	r.mercury.pluginRetryCache.Set(prk, attempts+1, cache.DefaultExpiration)
	return interval
}
// generatePluginRetryKey returns a plugin retry cache key of the form "<workID>|<block>", with the
// block number rendered in decimal.
func generatePluginRetryKey(workID string, block uint64) string {
	return fmt.Sprintf("%s|%d", workID, block)
}