package mercury_v1

import (
	"context"
	"errors"
	"fmt"
	"math/big"
	"sync"

	pkgerrors "github.com/pkg/errors"

	relaymercury "github.com/smartcontractkit/chainlink-relay/pkg/reportingplugins/mercury"
	relaymercuryv1 "github.com/smartcontractkit/chainlink-relay/pkg/reportingplugins/mercury/v1"
	ocrtypes "github.com/smartcontractkit/libocr/offchainreporting2plus/types"

	evmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types"
	"github.com/smartcontractkit/chainlink/v2/core/logger"
	"github.com/smartcontractkit/chainlink/v2/core/services/job"
	"github.com/smartcontractkit/chainlink/v2/core/services/ocrcommon"
	"github.com/smartcontractkit/chainlink/v2/core/services/pipeline"
	"github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/types"
	"github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/v1/reportcodec"
	"github.com/smartcontractkit/chainlink/v2/core/utils"
)

// Runner executes pipeline runs; it is satisfied by pipeline.Runner.
type Runner interface {
	ExecuteRun(ctx context.Context, spec pipeline.Spec, vars pipeline.Vars, l logger.Logger) (run *pipeline.Run, trrs pipeline.TaskRunResults, err error)
}

// Fetcher fetches data from the Mercury server.
type Fetcher interface {
	// FetchInitialMaxFinalizedBlockNumber should fetch the initial max finalized block number
	FetchInitialMaxFinalizedBlockNumber(context.Context) (*int64, error)
}
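
// A minimal sketch of a Fetcher stub, e.g. for tests. This type is
// hypothetical (not part of this package) and shown for illustration only:
//
//	type staticFetcher struct{ blockNum int64 }
//
//	func (f staticFetcher) FetchInitialMaxFinalizedBlockNumber(ctx context.Context) (*int64, error) {
//		// Always report the same pre-configured block number.
//		return &f.blockNum, nil
//	}
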
type datasource struct {
	pipelineRunner     Runner
	jb                 job.Job
	spec               pipeline.Spec
	lggr               logger.Logger
	runResults         chan<- *pipeline.Run
	orm                types.DataSourceORM
	codec              reportcodec.ReportCodec
	feedID             [32]byte
	mu                 sync.RWMutex
	chEnhancedTelem    chan<- ocrcommon.EnhancedTelemetryMercuryData
	chainHeadTracker   types.ChainHeadTracker
	fetcher            Fetcher
	initialBlockNumber *int64
}

var _ relaymercuryv1.DataSource = &datasource{}

// NewDataSource returns a datasource for the given feed, wiring together the
// pipeline runner, report codec, head tracker, and Mercury fetcher.
func NewDataSource(orm types.DataSourceORM, pr pipeline.Runner, jb job.Job, spec pipeline.Spec, lggr logger.Logger, rr chan *pipeline.Run, enhancedTelemChan chan ocrcommon.EnhancedTelemetryMercuryData, chainHeadTracker types.ChainHeadTracker, fetcher Fetcher, initialBlockNumber *int64, feedID [32]byte) *datasource {
	return &datasource{pr, jb, spec, lggr, rr, orm, reportcodec.ReportCodec{}, feedID, sync.RWMutex{}, enhancedTelemChan, chainHeadTracker, fetcher, initialBlockNumber}
}
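
// A hypothetical construction sketch. The dependency values (orm, runner, jb,
// spec, lggr, headTracker, fetcher, feedID) are assumed to be wired up
// elsewhere, e.g. by the relayer during job startup:
//
//	runResults := make(chan *pipeline.Run, 10)
//	telemCh := make(chan ocrcommon.EnhancedTelemetryMercuryData, 10)
//	ds := NewDataSource(orm, runner, jb, spec, lggr, runResults, telemCh, headTracker, fetcher, nil, feedID)
//
// Passing initialBlockNumber=nil means a brand-new feed falls back to the
// current block number when deriving maxFinalizedBlockNumber (see Observe).
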
// Observe implements relaymercuryv1.DataSource. It concurrently executes the
// pipeline run for prices and (optionally) resolves the max finalized block
// number.
func (ds *datasource) Observe(ctx context.Context, repts ocrtypes.ReportTimestamp, fetchMaxFinalizedBlockNum bool) (obs relaymercuryv1.Observation, err error) {
	// setCurrentBlock must come first, along with observationTimestamp, to
	// avoid front-running
	ds.setCurrentBlock(ctx, &obs)

	var wg sync.WaitGroup
	if fetchMaxFinalizedBlockNum {
		wg.Add(1)
		go func() {
			defer wg.Done()
			latest, dbErr := ds.orm.LatestReport(ctx, ds.feedID)
			if dbErr != nil {
				obs.MaxFinalizedBlockNumber.Err = dbErr
				return
			}
			if latest != nil {
				obs.MaxFinalizedBlockNumber.Val, obs.MaxFinalizedBlockNumber.Err = ds.codec.CurrentBlockNumFromReport(latest)
				return
			}
			val, fetchErr := ds.fetcher.FetchInitialMaxFinalizedBlockNumber(ctx)
			if fetchErr != nil {
				obs.MaxFinalizedBlockNumber.Err = fetchErr
				return
			}
			if val != nil {
				obs.MaxFinalizedBlockNumber.Val = *val
				return
			}
			if ds.initialBlockNumber == nil {
				if obs.CurrentBlockNum.Err != nil {
					obs.MaxFinalizedBlockNumber.Err = fmt.Errorf("FetchInitialMaxFinalizedBlockNumber returned empty LatestReport; this is a new feed. No initialBlockNumber was set, tried to use current block number to determine maxFinalizedBlockNumber but got error: %w", obs.CurrentBlockNum.Err)
				} else {
					// Subtract 1 here because we will later add 1 to the
					// maxFinalizedBlockNumber to get the first validFromBlockNum, which
					// ought to be the same as current block num.
					obs.MaxFinalizedBlockNumber.Val = obs.CurrentBlockNum.Val - 1
					ds.lggr.Infof("FetchInitialMaxFinalizedBlockNumber returned empty LatestReport; this is a new feed so maxFinalizedBlockNumber=%d (initialBlockNumber unset, using currentBlockNum=%d-1)", obs.MaxFinalizedBlockNumber.Val, obs.CurrentBlockNum.Val)
				}
			} else {
				// NOTE: It's important to subtract 1 if the server is missing any past
				// report (brand new feed) since we will add 1 to the
				// maxFinalizedBlockNumber to get the first validFromBlockNum, which
				// ought to be zero.
				//
				// If "initialBlockNumber" is set to zero, this will give a starting block of zero.
				obs.MaxFinalizedBlockNumber.Val = *ds.initialBlockNumber - 1
				ds.lggr.Infof("FetchInitialMaxFinalizedBlockNumber returned empty LatestReport; this is a new feed so maxFinalizedBlockNumber=%d (initialBlockNumber=%d)", obs.MaxFinalizedBlockNumber.Val, *ds.initialBlockNumber)
			}
		}()
	} else {
		obs.MaxFinalizedBlockNumber.Err = errors.New("fetchMaxFinalizedBlockNum=false")
	}

	var trrs pipeline.TaskRunResults
	wg.Add(1)
	go func() {
		defer wg.Done()
		var run *pipeline.Run
		run, trrs, err = ds.executeRun(ctx)
		if err != nil {
			err = fmt.Errorf("Observe failed while executing run: %w", err)
			return
		}
		select {
		case ds.runResults <- run:
		default:
			ds.lggr.Warnf("unable to enqueue run save for job ID %d, buffer full", ds.spec.JobID)
		}

		// NOTE: trrs comes back as _all_ tasks, but we only want the terminal ones
		// They are guaranteed to be sorted by index asc so should be in the correct order
		var finaltrrs []pipeline.TaskRunResult
		for _, trr := range trrs {
			if trr.IsTerminal() {
				finaltrrs = append(finaltrrs, trr)
			}
		}

		var parsed parseOutput
		parsed, err = ds.parse(finaltrrs)
		if err != nil {
			err = fmt.Errorf("Observe failed while parsing run results: %w", err)
			return
		}
		obs.BenchmarkPrice = parsed.benchmarkPrice
		obs.Bid = parsed.bid
		obs.Ask = parsed.ask
	}()

	wg.Wait()

	if ocrcommon.ShouldCollectEnhancedTelemetryMercury(&ds.jb) {
		ocrcommon.EnqueueEnhancedTelem(ds.chEnhancedTelem, ocrcommon.EnhancedTelemetryMercuryData{
			TaskRunResults: trrs,
			Observation:    obs,
			RepTimestamp:   repts,
		})
	}

	return obs, err
}
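
// Note the partial-failure semantics above: each Observation field carries its
// own Val/Err pair, so e.g. a failed block-number lookup does not discard
// otherwise-usable prices. A hypothetical caller (in practice the mercury
// reporting plugin) might inspect the result like:
//
//	obs, err := ds.Observe(ctx, repts, true)
//	if err == nil && obs.BenchmarkPrice.Err == nil {
//		// use obs.BenchmarkPrice.Val
//	}
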
func toBigInt(val interface{}) (*big.Int, error) {
	dec, err := utils.ToDecimal(val)
	if err != nil {
		return nil, err
	}
	return dec.BigInt(), nil
}
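
// For illustration: toBigInt accepts anything utils.ToDecimal can convert
// (numeric strings, integers, floats, decimals), then takes the integer
// component. A sketch, assuming a plain integer string input:
//
//	n, err := toBigInt("123456789") // n is *big.Int 123456789, err is nil
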
type parseOutput struct {
	benchmarkPrice relaymercury.ObsResult[*big.Int]
	bid            relaymercury.ObsResult[*big.Int]
	ask            relaymercury.ObsResult[*big.Int]
}

// parse expects the output of observe to be three values, in the following order:
// 1. benchmark price
// 2. bid
// 3. ask
//
// It returns an error if any of the values cannot be parsed, i.e. has the wrong type.
func (ds *datasource) parse(trrs pipeline.TaskRunResults) (o parseOutput, merr error) {
	var finaltrrs []pipeline.TaskRunResult
	for _, trr := range trrs {
		// executeRun returns all task results; only keep the terminal ones
		if trr.IsTerminal() {
			finaltrrs = append(finaltrrs, trr)
		}
	}

	// pipeline.TaskRunResults comes ordered asc by index, this is guaranteed
	// by the pipeline executor
	if len(finaltrrs) != 3 {
		return o, fmt.Errorf("invalid number of results, expected: 3, got: %d", len(finaltrrs))
	}

	merr = errors.Join(
		setBenchmarkPrice(&o, finaltrrs[0].Result),
		setBid(&o, finaltrrs[1].Result),
		setAsk(&o, finaltrrs[2].Result),
	)
	return o, merr
}
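
// For illustration, a hypothetical pipeline spec whose terminal tasks line up
// with the (benchmark, bid, ask) order that parse expects might end with three
// jsonparse tasks distinguished by their index attribute (task names, paths,
// and shapes are illustrative only):
//
//	benchmark_price [type=jsonparse path="data,mid" index=0];
//	bid_price       [type=jsonparse path="data,bid" index=1];
//	ask_price       [type=jsonparse path="data,ask" index=2];
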
func setBenchmarkPrice(o *parseOutput, res pipeline.Result) error {
	if res.Error != nil {
		o.benchmarkPrice.Err = res.Error
	} else if val, err := toBigInt(res.Value); err != nil {
		return fmt.Errorf("failed to parse BenchmarkPrice: %w", err)
	} else {
		o.benchmarkPrice.Val = val
	}
	return nil
}

func setBid(o *parseOutput, res pipeline.Result) error {
	if res.Error != nil {
		o.bid.Err = res.Error
	} else if val, err := toBigInt(res.Value); err != nil {
		return fmt.Errorf("failed to parse Bid: %w", err)
	} else {
		o.bid.Val = val
	}
	return nil
}

func setAsk(o *parseOutput, res pipeline.Result) error {
	if res.Error != nil {
		o.ask.Err = res.Error
	} else if val, err := toBigInt(res.Value); err != nil {
		return fmt.Errorf("failed to parse Ask: %w", err)
	} else {
		o.ask.Val = val
	}
	return nil
}

// The context passed in here has a timeout of (ObservationTimeout + ObservationGracePeriod).
// Upon context cancellation, it's expected that we return any usable values within ObservationGracePeriod.
func (ds *datasource) executeRun(ctx context.Context) (*pipeline.Run, pipeline.TaskRunResults, error) {
	vars := pipeline.NewVarsFrom(map[string]interface{}{
		"jb": map[string]interface{}{
			"databaseID":    ds.jb.ID,
			"externalJobID": ds.jb.ExternalJobID,
			"name":          ds.jb.Name.ValueOrZero(),
		},
	})

	run, trrs, err := ds.pipelineRunner.ExecuteRun(ctx, ds.spec, vars, ds.lggr)
	if err != nil {
		return nil, nil, pkgerrors.Wrapf(err, "error executing run for spec ID %v", ds.spec.ID)
	}

	return run, trrs, err
}
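
// The "jb" variables set above can be interpolated inside the job's pipeline
// spec. A hypothetical task using them (bridge name and request shape are
// illustrative only):
//
//	fetch [type=bridge name="mercury-bridge" requestData="{\"data\":{\"jobName\": $(jb.name)}}"];
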
func (ds *datasource) setCurrentBlock(ctx context.Context, obs *relaymercuryv1.Observation) {
	latestHead, err := ds.getCurrentBlock(ctx)
	if err != nil {
		obs.CurrentBlockNum.Err = err
		obs.CurrentBlockHash.Err = err
		obs.CurrentBlockTimestamp.Err = err
		return
	}
	obs.CurrentBlockNum.Val = latestHead.Number
	obs.CurrentBlockHash.Val = latestHead.Hash.Bytes()

	if latestHead.Timestamp.IsZero() {
		obs.CurrentBlockTimestamp.Val = 0
	} else {
		obs.CurrentBlockTimestamp.Val = uint64(latestHead.Timestamp.Unix())
	}
}

func (ds *datasource) getCurrentBlock(ctx context.Context) (*evmtypes.Head, error) {
	// Use the headtracker's view of the latest block: this is very fast since
	// it doesn't make any external network requests, and it is the
	// headtracker's job to ensure it has an up-to-date view of the chain based
	// on responses from all available RPC nodes
	latestHead := ds.chainHeadTracker.HeadTracker().LatestChain()
	if latestHead == nil {
		logger.Sugared(ds.lggr).AssumptionViolation("HeadTracker unexpectedly returned nil head, falling back to RPC call")
		var err error
		latestHead, err = ds.chainHeadTracker.Client().HeadByNumber(ctx, nil)
		if err != nil {
			return nil, err
		}
	}
	return latestHead, nil
}