-
Notifications
You must be signed in to change notification settings - Fork 1.7k
/
log_events.go
486 lines (423 loc) · 15.4 KB
/
log_events.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
package models
import (
"bytes"
"encoding/binary"
"encoding/hex"
"encoding/json"
"fmt"
"math/big"
"github.com/smartcontractkit/chainlink/core/assets"
"github.com/smartcontractkit/chainlink/core/logger"
"github.com/smartcontractkit/chainlink/core/utils"
ethereum "github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common"
"github.com/pkg/errors"
)
// Descriptive indices of a RunLog's Topic array.
// The values are assigned by iota, so the declaration order below fixes each
// index: signature = 0, job ID = 1, requester = 2, payment = 3.
const (
RequestLogTopicSignature = iota
RequestLogTopicJobID
RequestLogTopicRequester
RequestLogTopicPayment
)
// Byte widths used to compute field offsets inside a run log's data payload.
// Every field is declared as exactly one EVM word (evmWordSize, which is
// common.HashLength); the separate names exist so that the offset arithmetic
// in the parsers reads as a list of the fields being skipped.
const (
evmWordSize = common.HashLength
requesterSize = evmWordSize
idSize = evmWordSize
paymentSize = evmWordSize
versionSize = evmWordSize
callbackAddrSize = evmWordSize
callbackFuncSize = evmWordSize
expirationSize = evmWordSize
dataLocationSize = evmWordSize
dataLengthSize = evmWordSize
)
// Log topics and fulfillment function selectors, derived by hashing the
// Solidity event / function signatures.
var (
// RunLogTopic20190207withoutIndexes was the new RunRequest filter topic as of 2019-01-28,
// after renaming Solidity variables, moving data version, and removing the cast of requestId to uint256
RunLogTopic20190207withoutIndexes = utils.MustHash("OracleRequest(bytes32,address,bytes32,uint256,address,bytes4,uint256,uint256,bytes)")
// RandomnessRequestLogTopic is the signature for the event log
// VRFCoordinator.RandomnessRequest.
RandomnessRequestLogTopic = VRFRandomnessRequestLogTopic()
// OracleFulfillmentFunctionID20190128withoutCast is the function selector for fulfilling Ethereum requests,
// as updated on 2019-01-28, removing the cast to uint256 for the requestId.
// It is the first 10 characters of the signature hash's hex string
// (presumably the "0x" prefix plus the 4-byte selector — confirm against Hex()).
OracleFulfillmentFunctionID20190128withoutCast = utils.MustHash("fulfillOracleRequest(bytes32,uint256,address,bytes4,uint256,bytes32)").Hex()[:10]
// OracleFulfillmentFunctionID2020 is the function selector for
// fulfillOracleRequest2, whose final argument is `bytes` rather than
// `bytes32`. It is truncated the same way as the selector above.
OracleFulfillmentFunctionID2020 = utils.MustHash("fulfillOracleRequest2(bytes32,uint256,address,bytes4,uint256,bytes)").Hex()[:10]
)
// logRequestParser extracts request information from a Log. Implementations
// are selected by the log's first topic (see parserFromLog).
type logRequestParser interface {
// parseJSON returns the JSON produced from the given log.
parseJSON(Log) (JSON, error)
// parseRequestID returns the request ID carried by the given log.
parseRequestID(Log) (common.Hash, error)
}
// topicFactoryMap maps a log's event topic to the logRequestParser that
// understands that event's payload. The concrete implementations are
// polymorphic and can have different behaviors for parseJSON and
// parseRequestID.
var topicFactoryMap = map[common.Hash]logRequestParser{
RunLogTopic20190207withoutIndexes: parseRunLog20190207withoutIndexes{},
RandomnessRequestLogTopic: parseRandomnessRequest{},
}
// LogBasedChainlinkJobInitiators are the initiator types which kick off a
// user-specified chainlink job when an appropriate ethereum log is received:
// run logs, eth logs and randomness logs.
// (InitiatorFluxMonitor kicks off work, but not a user-specified job.)
var LogBasedChainlinkJobInitiators = []string{InitiatorRunLog, InitiatorEthLog, InitiatorRandomnessLog}
// TopicsForInitiatorsWhichRequireJobSpecIDTopic are the log topics which kick
// off a user job with the given type of initiator, keyed by initiator type.
// If chainlink has any jobs with these initiators, it subscribes on startup to
// logs which match both these topics and some representation of the job spec
// ID.
var TopicsForInitiatorsWhichRequireJobSpecIDTopic = map[string][]common.Hash{
InitiatorRunLog: {RunLogTopic20190207withoutIndexes},
InitiatorRandomnessLog: {RandomnessRequestLogTopic},
}
// initiationRequiresJobSpecID reports whether jobs initiated by the given
// initiator type require that their initiating logs match their JobSpecIDs,
// i.e. whether the type has an entry in
// TopicsForInitiatorsWhichRequireJobSpecIDTopic.
func initiationRequiresJobSpecID(initiatorType string) bool {
	_, required := TopicsForInitiatorsWhichRequireJobSpecIDTopic[initiatorType]
	return required
}
// JobSpecIDTopics lists the ways jsID could be represented as a log topic.
// This allows log subscriptions to respond to all possible representations.
//
// The returned slice always holds two entries, in this order:
// IDToTopic(jsID) and IDToHexTopic(jsID).
func JobSpecIDTopics(jsID JobID) []common.Hash {
	// The job to be initiated can be encoded in a log topic in two ways:
	rawBytesTopic := IDToTopic(jsID)   // the 16 UUID bytes, right-padded to 32 bytes
	hexTextTopic := IDToHexTopic(jsID) // 32 ASCII hex chars representing the 16 bytes
	return []common.Hash{rawBytesTopic, hexTextTopic}
}
// FilterQueryFactory returns the ethereum FilterQuery for this initiator.
//
// The query always listens on the initiator's own address plus any extra
// addresses, with zero addresses removed, and starts at `from` unless the
// initiator type overrides it:
//
//   - Eth log initiators take their block range and topics from the
//     initiator's params. The start block is the initiator's FromBlock when
//     `from` is nil, otherwise the larger of the two when the initiator
//     specifies one. An error is returned when the resulting start block is
//     not strictly below the end block.
//   - Initiators that require a job spec ID topic (see
//     TopicsForInitiatorsWhichRequireJobSpecIDTopic) filter on the event
//     topics for their type in position 0 and on every representation of the
//     job spec ID in position 1.
//   - Any other initiator type is rejected with an error naming the type.
func FilterQueryFactory(i Initiator, from *big.Int, addresses ...common.Address) (q ethereum.FilterQuery, err error) {
	q.FromBlock = from

	filterAddresses := append([]common.Address{i.Address}, addresses...)
	q.Addresses = utils.WithoutZeroAddresses(filterAddresses)

	switch {
	case i.Type == InitiatorEthLog:
		if from == nil {
			q.FromBlock = i.InitiatorParams.FromBlock.ToInt()
		} else if i.InitiatorParams.FromBlock != nil {
			q.FromBlock = utils.MaxBigs(from, i.InitiatorParams.FromBlock.ToInt())
		}
		q.ToBlock = i.InitiatorParams.ToBlock.ToInt()

		if q.FromBlock != nil && q.ToBlock != nil && q.FromBlock.Cmp(q.ToBlock) >= 0 {
			return ethereum.FilterQuery{}, fmt.Errorf(
				"cannot generate a FilterQuery with fromBlock >= toBlock")
		}

		// Copying the topics across (instead of coercing i.Topics to a
		// [][]common.Hash) clarifies their type for reflect.DeepEqual
		q.Topics = make([][]common.Hash, len(i.Topics))
		copy(q.Topics, i.Topics)
	case initiationRequiresJobSpecID(i.Type):
		q.Topics = [][]common.Hash{
			TopicsForInitiatorsWhichRequireJobSpecIDTopic[i.Type],
			JobSpecIDTopics(i.JobSpecID),
		}
	default:
		// Report the initiator's Type field: formatting the struct itself
		// with %T only ever prints its static Go type, which tells the
		// caller nothing about which initiator was rejected.
		return ethereum.FilterQuery{},
			fmt.Errorf("cannot generate a FilterQuery for initiator of type %q", i.Type)
	}
	return q, nil
}
// LogRequest is the interface to allow polymorphic functionality of different
// types of LogEvents,
// e.g. EthLogEvent, RunLogEvent, RandomnessLogEvent.
type LogRequest interface {
// GetLog returns the underlying log.
GetLog() Log
// GetJobSpecID returns the ID of the job spec associated with the event.
GetJobSpecID() JobID
// GetInitiator returns the initiator associated with the event.
GetInitiator() Initiator
// Validate reports whether the log is acceptable for its event type.
Validate() bool
// JSON returns the event's JSON representation.
JSON() (JSON, error)
// ToDebug writes the event to the debug log.
ToDebug()
// ForLogger returns kvs followed by key/value pairs describing the event.
ForLogger(kvs ...interface{}) []interface{}
// ValidateRequester returns a non-nil error when the requester is rejected.
ValidateRequester() error
// BlockNumber returns the block number of the underlying log.
BlockNumber() *big.Int
// RunRequest builds the RunRequest for the event.
RunRequest() (RunRequest, error)
}
// InitiatorLogEvent encapsulates all information as a result of a received log from an
// InitiatorSubscription, and acts as a base struct for other log-initiated events
// (it is embedded by EthLogEvent and RunLogEvent).
type InitiatorLogEvent struct {
// Log is the received log.
Log Log
// Initiator is the initiator the log was received for.
Initiator Initiator
}
// Compile-time check that InitiatorLogEvent implements LogRequest.
var _ LogRequest = InitiatorLogEvent{} // InitiatorLogEvent implements LogRequest
// LogRequest is a factory method that coerces this log event to the correct
// concrete type based on Initiator.Type, exposed through the LogRequest
// interface. An unrecognised initiator type is logged as a warning and
// treated as an eth log event.
func (le InitiatorLogEvent) LogRequest() LogRequest {
	switch le.Initiator.Type {
	case InitiatorRunLog:
		return RunLogEvent{le}
	case InitiatorRandomnessLog:
		return RandomnessLogEvent{le}
	case InitiatorEthLog:
		// Known type: falls through to the shared EthLogEvent return below
		// without emitting the warning.
	default:
		logger.Warnw("LogRequest: Unable to discern initiator type for log request", le.ForLogger()...)
	}
	return EthLogEvent{InitiatorLogEvent: le}
}
// GetLog returns the log this event was created from.
func (le InitiatorLogEvent) GetLog() Log {
	return le.Log
}
// GetJobSpecID returns the JobSpecID of the event's initiator.
func (le InitiatorLogEvent) GetJobSpecID() JobID {
	initiator := le.Initiator
	return initiator.JobSpecID
}
// GetInitiator returns the initiator this event was received for.
func (le InitiatorLogEvent) GetInitiator() Initiator {
	return le.Initiator
}
// ForLogger formats the InitiatorLogEvent for easy common formatting in logs
// (trace statements, not ethereum events).
//
// The result is kvs followed by the pairs "job" (job spec ID string), "log"
// (the log's block number), "initiator", and one "topicN" pair per log topic
// holding that topic's hex string.
func (le InitiatorLogEvent) ForLogger(kvs ...interface{}) []interface{} {
	fields := []interface{}{
		"job", le.Initiator.JobSpecID.String(),
		"log", le.Log.BlockNumber,
		"initiator", le.Initiator,
	}
	for i := range le.Log.Topics {
		key := fmt.Sprintf("topic%d", i)
		fields = append(fields, key, le.Log.Topics[i].Hex())
	}
	return append(kvs, fields...)
}
// ToDebug prints this event via logger.Debugw: a message naming the log's
// block number and the initiator's listening address, plus the key/value
// pairs from ForLogger.
func (le InitiatorLogEvent) ToDebug() {
	msg := fmt.Sprintf(
		"Received log from block #%v for address %v",
		le.Log.BlockNumber,
		utils.LogListeningAddress(le.Initiator.Address),
	)
	logger.Debugw(msg, le.ForLogger()...)
}
// BlockNumber returns the block number of the event's log as a freshly
// allocated *big.Int.
func (le InitiatorLogEvent) BlockNumber() *big.Int {
	blockNum := new(big.Int)
	blockNum.SetUint64(le.Log.BlockNumber)
	return blockNum
}
// RunRequest returns a run request instance carrying the block hash and
// transaction hash, present on all log initiated runs, with the event's JSON
// as the request parameters. A failure to produce that JSON is returned
// unchanged alongside an empty RunRequest.
func (le InitiatorLogEvent) RunRequest() (RunRequest, error) {
	params, err := le.JSON()
	if err != nil {
		return RunRequest{}, err
	}

	request := RunRequest{
		BlockHash:     &le.Log.BlockHash,
		TxHash:        &le.Log.TxHash,
		RequestParams: params,
	}
	return request, nil
}
// Validate always returns true: the base log event type performs no
// validation.
func (le InitiatorLogEvent) Validate() bool {
	const alwaysValid = true
	return alwaysValid
}
// ValidateRequester always returns nil, since every requester is acceptable
// for base initiator log events.
func (le InitiatorLogEvent) ValidateRequester() error {
	var noError error
	return noError
}
// JSON returns the eth log as JSON by marshalling the log and unmarshalling
// the bytes into a JSON value.
//
// On a marshalling error the zero JSON value is returned with that error. An
// unmarshalling error is returned together with whatever state the JSON value
// was left in.
func (le InitiatorLogEvent) JSON() (JSON, error) {
	var out JSON

	encoded, err := json.Marshal(le.Log)
	if err != nil {
		return out, err
	}

	// Decode in its own statement before returning. Writing
	// `return out, json.Unmarshal(encoded, &out)` leaves it unspecified
	// whether `out` is read before or after the call that fills it in.
	err = json.Unmarshal(encoded, &out)
	return out, err
}
// EthLogEvent provides functionality specific to a log event emitted
// for an eth log initiator. It embeds InitiatorLogEvent and so inherits that
// type's methods.
type EthLogEvent struct {
InitiatorLogEvent
}
// RunLogEvent provides functionality specific to a log event emitted
// for a run log initiator. It embeds InitiatorLogEvent and overrides
// Validate, ValidateRequester, RunRequest and JSON.
type RunLogEvent struct {
InitiatorLogEvent
}
// Validate returns whether or not the contained log has a properly encoded
// job id.
//
// The job ID topic must equal one of the two encodings of the initiator's
// JobSpecID (IDToTopic or IDToHexTopic). A log with too few topics to carry a
// job ID, or whose job ID topic matches neither encoding, is logged as an
// error and reported as invalid.
func (le RunLogEvent) Validate() bool {
	jobSpecID := le.Initiator.JobSpecID

	// Guard the index below: a log without a job ID topic must be rejected,
	// not panic with an out-of-range access.
	if len(le.Log.Topics) <= RequestLogTopicJobID {
		logger.Errorw("Run Log didn't have a job ID topic", le.ForLogger("id", jobSpecID.String())...)
		return false
	}

	topic := le.Log.Topics[RequestLogTopicJobID]
	if IDToTopic(jobSpecID) != topic && IDToHexTopic(jobSpecID) != topic {
		logger.Errorw("Run Log didn't have matching job ID", le.ForLogger("id", jobSpecID.String())...)
		return false
	}
	return true
}
// contractPayment returns the amount attached to a contract to pay the Oracle
// upon fulfillment.
//
// The amount is the word of log.Data that follows the requester and ID
// words. An error is returned when the data is too short to contain it or
// when the word cannot be parsed into a Link amount.
func contractPayment(log Log) (*assets.Link, error) {
	offset := requesterSize + idSize
	raw, err := UntrustedBytes(log.Data).SafeByteSlice(offset, offset+paymentSize)
	if err != nil {
		return nil, err
	}

	amountHex := common.BytesToHash(raw).Hex()
	// Base 0 lets SetString pick the base from the string's prefix.
	payment, ok := new(assets.Link).SetString(amountHex, 0)
	if !ok {
		return payment, fmt.Errorf("unable to decoded amount from RunLog: %s", amountHex)
	}
	return payment, nil
}
// ValidateRequester returns nil if the log's requester is one of the
// requesters associated with the initiator, or if the initiator lists no
// requesters at all. Otherwise it returns an error naming the rejected
// requester, or the error from extracting the requester.
func (le RunLogEvent) ValidateRequester() error {
	allowed := le.Initiator.Requesters
	if len(allowed) == 0 {
		return nil
	}

	requester, err := le.Requester()
	if err != nil {
		return err
	}

	for i := range allowed {
		if allowed[i] == requester {
			return nil
		}
	}
	return fmt.Errorf("run Log didn't have have a valid requester: %v", requester.Hex())
}
// Requester pulls the requesting address out of the first word of the
// LogEvent's data. It returns the zero address and an error when the data is
// too short to contain that word.
func (le RunLogEvent) Requester() (common.Address, error) {
	word, err := UntrustedBytes(le.Log.Data).SafeByteSlice(0, requesterSize)
	if err != nil {
		return common.Address{}, err
	}
	address := common.BytesToAddress(word)
	return address, nil
}
// RunRequest returns a RunRequest instance with all parameters from a run
// log: request ID, transaction and block hashes, requester, payment and the
// decoded request parameters.
//
// The pieces are extracted in a fixed order (JSON, parser lookup, payment,
// request ID, requester) and the first failure is returned with an empty
// RunRequest. Only a JSON decoding failure is additionally logged here.
func (le RunLogEvent) RunRequest() (RunRequest, error) {
	var none RunRequest

	params, err := le.JSON()
	if err != nil {
		logger.Errorw(err.Error(), le.ForLogger()...)
		return none, err
	}

	parser, err := parserFromLog(le.Log)
	if err != nil {
		return none, err
	}

	payment, err := contractPayment(le.Log)
	if err != nil {
		return none, err
	}

	requestID, err := parser.parseRequestID(le.Log)
	if err != nil {
		return none, err
	}

	requester, err := le.Requester()
	if err != nil {
		return none, err
	}

	request := RunRequest{
		RequestID:     &requestID,
		TxHash:        &le.Log.TxHash,
		BlockHash:     &le.Log.BlockHash,
		Requester:     &requester,
		Payment:       payment,
		RequestParams: params,
	}
	return request, nil
}
// JSON decodes the RunLogEvent's data and converts it to a JSON object by
// delegating to ParseRunLog.
func (le RunLogEvent) JSON() (JSON, error) {
	parsed, err := ParseRunLog(le.Log)
	return parsed, err
}
// parserFromLog selects the logRequestParser registered in topicFactoryMap
// for the log's first topic. It returns an error when the log has no topics
// or when no parser is registered for that topic.
func parserFromLog(log Log) (logRequestParser, error) {
	if len(log.Topics) == 0 {
		return nil, errors.New("log has no topics")
	}

	signature := log.Topics[RequestLogTopicSignature]
	if parser, found := topicFactoryMap[signature]; found {
		return parser, nil
	}
	return nil, fmt.Errorf("no parser for the RunLogEvent topic %s", signature.String())
}
// ParseRunLog decodes the CBOR in the ABI of the log event, using the parser
// registered for the log's first topic. When no parser can be found an empty
// JSON value is returned with the lookup error.
func ParseRunLog(log Log) (JSON, error) {
	parser, lookupErr := parserFromLog(log)
	if lookupErr != nil {
		return JSON{}, lookupErr
	}
	return parser.parseJSON(log)
}
// parseRunLog20190207withoutIndexes parses the OracleRequest log format after
// the sender and payment amount indexes were removed.
// Additionally, the version field for the data payload was moved next to the
// data that it corresponds to. The fulfillment is made up of the request ID,
// payment amount, callback, expiration, and data.
//
// The type carries no state; it exists to implement logRequestParser for
// RunLogTopic20190207withoutIndexes.
type parseRunLog20190207withoutIndexes struct{}
// parseJSON decodes the data payload of an OracleRequest log into the JSON
// request parameters.
//
// The payload is read as consecutive EVM words: requester, request ID,
// payment, callback address, callback function, expiration, data version,
// data location and data length, followed by the CBOR-encoded request data.
// The decoded CBOR is extended with three keys:
//
//   - "address": the address the log was emitted from,
//   - "dataPrefix": the hex of the words from request ID through expiration,
//   - "functionSelector": the fulfillment selector for the data version
//     (1 or 2; any other version is an error).
//
// Every slice of the payload is bounds-checked, so a short or inconsistent
// payload yields an error.
func (parseRunLog20190207withoutIndexes) parseJSON(log Log) (JSON, error) {
	payload := log.Data

	// Byte offsets of the fields inside the payload.
	idStart := requesterSize
	expirationEnd := idStart + idSize + paymentSize + callbackAddrSize + callbackFuncSize + expirationSize
	versionEnd := expirationEnd + versionSize
	dataLengthStart := versionEnd + dataLocationSize
	cborStart := dataLengthStart + dataLengthSize

	if len(payload) < dataLengthStart+32 {
		return JSON{}, errors.New("malformed data")
	}
	lengthWord, err := UntrustedBytes(payload).SafeByteSlice(dataLengthStart, dataLengthStart+32)
	if err != nil {
		return JSON{}, err
	}

	// The length word says how many bytes of CBOR follow it.
	cborEnd := cborStart + int(utils.EVMBytesToUint64(lengthWord))
	if len(payload) < cborEnd {
		return JSON{}, errors.New("cbor too short")
	}
	cborBytes, err := UntrustedBytes(payload).SafeByteSlice(cborStart, cborEnd)
	if err != nil {
		return JSON{}, err
	}

	js, err := ParseCBOR(cborBytes)
	if err != nil {
		return js, fmt.Errorf("Error parsing CBOR: %v", err)
	}

	prefixBytes, err := UntrustedBytes(payload).SafeByteSlice(idStart, expirationEnd)
	if err != nil {
		return JSON{}, err
	}

	// The operator contract restricts us to 256 versions, so
	// 8 bytes worth of data versions is plenty.
	versionBytes, err := UntrustedBytes(payload).SafeByteSlice(versionEnd-8, versionEnd)
	if err != nil {
		return JSON{}, err
	}
	var dataVersion uint64
	if err = binary.Read(bytes.NewBuffer(versionBytes), binary.BigEndian, &dataVersion); err != nil {
		return JSON{}, err
	}

	var fnSelector string
	switch dataVersion {
	case 1:
		fnSelector = OracleFulfillmentFunctionID20190128withoutCast
	case 2:
		fnSelector = OracleFulfillmentFunctionID2020
	default:
		return JSON{}, errors.Errorf("unsupported data version %d", dataVersion)
	}

	extras := KV{
		"address":          log.Address.String(),
		"dataPrefix":       bytesToHex(prefixBytes),
		"functionSelector": fnSelector,
	}
	return js.MultiAdd(extras)
}
// parseRequestID returns the request ID, which is the word of log.Data that
// immediately follows the requester word. It returns the zero hash and an
// error when the data is too short to contain it.
func (parseRunLog20190207withoutIndexes) parseRequestID(log Log) (common.Hash, error) {
	idStart := requesterSize
	idEnd := idStart + idSize

	word, err := UntrustedBytes(log.Data).SafeByteSlice(idStart, idEnd)
	if err != nil {
		return common.Hash{}, err
	}
	return common.BytesToHash(word), nil
}
// bytesToHex hex-encodes data and passes the result through
// utils.AddHexPrefix.
func bytesToHex(data []byte) string {
	encoded := hex.EncodeToString(data)
	return utils.AddHexPrefix(encoded)
}
// IDToTopic encodes the bytes representation of the JobID, right-padded (via
// common.RightPadBytes) to utils.EVMWordByteLen so that it fits into a
// bytes32.
func IDToTopic(id JobID) common.Hash {
	padded := common.RightPadBytes(id.UUID().Bytes(), utils.EVMWordByteLen)
	return common.BytesToHash(padded)
}
// IDToHexTopic encodes the string representation of the JobID: the bytes of
// id.String() are used directly as the topic hash.
func IDToHexTopic(id JobID) common.Hash {
	text := id.String()
	return common.BytesToHash([]byte(text))
}
// LogCursor is a gorm-mapped record, keyed by Name, that stores an
// Initialized flag together with a block index and a log index.
// NOTE(review): presumably this records how far a named log consumer has
// progressed through the chain — confirm against the code that reads and
// writes it.
// NOTE(review): gorm tag options are normally written `default:value`; the
// space-separated `default true` / `default 0` forms below may not be picked
// up as defaults — verify before relying on them.
type LogCursor struct {
Name string `gorm:"primary_key"`
Initialized bool `gorm:"not null;default true"`
BlockIndex int64 `gorm:"not null;default 0"`
LogIndex int64 `gorm:"not null;default 0"`
}