-
Notifications
You must be signed in to change notification settings - Fork 1.7k
/
mercury.go
180 lines (152 loc) · 6.04 KB
/
mercury.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
package mercury
import (
"context"
"crypto/hmac"
"crypto/sha256"
"encoding/hex"
"fmt"
"math/big"
"net/http"
"time"
"github.com/smartcontractkit/chainlink-common/pkg/types"
"github.com/ethereum/go-ethereum/accounts/abi"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/patrickmn/go-cache"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/core"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/encoding"
)
const (
FeedIDs = "feedIDs" // valid for v0.3
FeedIdHex = "feedIdHex" // valid for v0.2
BlockNumber = "blockNumber" // valid for v0.2
Timestamp = "timestamp" // valid for v0.3
totalFastPluginRetries = 5
totalMediumPluginRetries = 10
)
var GenerateHMACFn = func(method string, path string, body []byte, clientId string, secret string, ts int64) string {
bodyHash := sha256.New()
bodyHash.Write(body)
hashString := fmt.Sprintf("%s %s %s %s %d",
method,
path,
hex.EncodeToString(bodyHash.Sum(nil)),
clientId,
ts)
signedMessage := hmac.New(sha256.New, []byte(secret))
signedMessage.Write([]byte(hashString))
userHmac := hex.EncodeToString(signedMessage.Sum(nil))
return userHmac
}
// CalculateRetryConfig returns plugin retry interval based on how many times plugin has retried this work
var CalculateRetryConfigFn = func(prk string, mercuryConfig MercuryConfigProvider) time.Duration {
var retryInterval time.Duration
var retries int
totalAttempts, ok := mercuryConfig.GetPluginRetry(prk)
if ok {
retries = totalAttempts.(int)
if retries < totalFastPluginRetries {
retryInterval = 1 * time.Second
} else if retries < totalMediumPluginRetries {
retryInterval = 5 * time.Second
}
// if the core node has retried totalMediumPluginRetries times, do not set retry interval and plugin will use
// the default interval
} else {
retryInterval = 1 * time.Second
}
mercuryConfig.SetPluginRetry(prk, retries+1, cache.DefaultExpiration)
return retryInterval
}
type MercuryData struct {
Index int
Error error
Retryable bool
Bytes [][]byte
State encoding.PipelineExecutionState
}
type MercuryConfigProvider interface {
Credentials() *types.MercuryCredentials
IsUpkeepAllowed(string) (interface{}, bool)
SetUpkeepAllowed(string, interface{}, time.Duration)
GetPluginRetry(string) (interface{}, bool)
SetPluginRetry(string, interface{}, time.Duration)
}
type HttpClient interface {
Do(req *http.Request) (*http.Response, error)
}
type MercuryClient interface {
DoRequest(ctx context.Context, streamsLookup *StreamsLookup, pluginRetryKey string) (encoding.PipelineExecutionState, encoding.UpkeepFailureReason, [][]byte, bool, time.Duration, error)
}
type StreamsLookupError struct {
FeedParamKey string
Feeds []string
TimeParamKey string
Time *big.Int
ExtraData []byte
}
type StreamsLookup struct {
*StreamsLookupError
UpkeepId *big.Int
Block uint64
}
func (l *StreamsLookup) IsMercuryV02() bool {
return l.FeedParamKey == FeedIdHex && l.TimeParamKey == BlockNumber
}
func (l *StreamsLookup) IsMercuryV03() bool {
return l.FeedParamKey == FeedIDs
}
// IsMercuryV03UsingBlockNumber is used to distinguish the batch path. It is used for Mercury V03 only
func (l *StreamsLookup) IsMercuryV03UsingBlockNumber() bool {
return l.TimeParamKey == BlockNumber
}
type Packer interface {
UnpackCheckCallbackResult(callbackResp []byte) (encoding.PipelineExecutionState, bool, []byte, encoding.UpkeepFailureReason, *big.Int, error)
PackGetUpkeepPrivilegeConfig(upkeepId *big.Int) ([]byte, error)
UnpackGetUpkeepPrivilegeConfig(resp []byte) ([]byte, error)
DecodeStreamsLookupRequest(data []byte) (*StreamsLookupError, error)
}
type abiPacker struct {
registryABI abi.ABI
streamsABI abi.ABI
}
func NewAbiPacker() *abiPacker {
return &abiPacker{registryABI: core.RegistryABI, streamsABI: core.StreamsCompatibleABI}
}
// DecodeStreamsLookupRequest decodes the revert error StreamsLookup(string feedParamKey, string[] feeds, string feedParamKey, uint256 time, byte[] extraData)
func (p *abiPacker) DecodeStreamsLookupRequest(data []byte) (*StreamsLookupError, error) {
e := p.streamsABI.Errors["StreamsLookup"]
unpack, err := e.Unpack(data)
if err != nil {
return nil, fmt.Errorf("unpack error: %w", err)
}
errorParameters := unpack.([]interface{})
return &StreamsLookupError{
FeedParamKey: *abi.ConvertType(errorParameters[0], new(string)).(*string),
Feeds: *abi.ConvertType(errorParameters[1], new([]string)).(*[]string),
TimeParamKey: *abi.ConvertType(errorParameters[2], new(string)).(*string),
Time: *abi.ConvertType(errorParameters[3], new(*big.Int)).(**big.Int),
ExtraData: *abi.ConvertType(errorParameters[4], new([]byte)).(*[]byte),
}, nil
}
func (p *abiPacker) UnpackCheckCallbackResult(callbackResp []byte) (encoding.PipelineExecutionState, bool, []byte, encoding.UpkeepFailureReason, *big.Int, error) {
out, err := p.registryABI.Methods["checkCallback"].Outputs.UnpackValues(callbackResp)
if err != nil {
return encoding.PackUnpackDecodeFailed, false, nil, 0, nil, fmt.Errorf("%w: unpack checkUpkeep return: %s", err, hexutil.Encode(callbackResp))
}
upkeepNeeded := *abi.ConvertType(out[0], new(bool)).(*bool)
rawPerformData := *abi.ConvertType(out[1], new([]byte)).(*[]byte)
failureReason := encoding.UpkeepFailureReason(*abi.ConvertType(out[2], new(uint8)).(*uint8))
gasUsed := *abi.ConvertType(out[3], new(*big.Int)).(**big.Int)
return encoding.NoPipelineError, upkeepNeeded, rawPerformData, failureReason, gasUsed, nil
}
func (p *abiPacker) UnpackGetUpkeepPrivilegeConfig(resp []byte) ([]byte, error) {
out, err := p.registryABI.Methods["getUpkeepPrivilegeConfig"].Outputs.UnpackValues(resp)
if err != nil {
return nil, fmt.Errorf("%w: unpack getUpkeepPrivilegeConfig return", err)
}
bts := *abi.ConvertType(out[0], new([]byte)).(*[]byte)
return bts, nil
}
func (p *abiPacker) PackGetUpkeepPrivilegeConfig(upkeepId *big.Int) ([]byte, error) {
return p.registryABI.Pack("getUpkeepPrivilegeConfig", upkeepId)
}