-
Notifications
You must be signed in to change notification settings - Fork 1.7k
/
data_source.go
79 lines (65 loc) · 1.96 KB
/
data_source.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
package medianpoc
import (
"context"
"errors"
"fmt"
"math/big"
"sync"
"time"
ocrtypes "github.com/smartcontractkit/libocr/offchainreporting2plus/types"
"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/types"
"github.com/smartcontractkit/chainlink/v2/core/bridges"
"github.com/smartcontractkit/chainlink/v2/core/utils"
)
// DataSource produces observations by executing a pipeline spec through a
// pipeline runner service, and remembers the most recent answer it produced
// so that it can be supplied as metadata to the following run.
type DataSource struct {
	// pipelineRunner executes spec each time Observe is called.
	pipelineRunner types.PipelineRunnerService
	// spec is the pipeline spec handed to pipelineRunner.ExecuteRun.
	spec string
	// lggr receives a warning when the bridge metadata cannot be marshalled.
	lggr logger.Logger
	// current holds the latest answer and the time it was recorded; it is
	// read by currentAnswer and replaced by updateAnswer.
	current bridges.BridgeMetaData
	// mu guards current.
	mu sync.RWMutex
}
// Observe executes the configured pipeline spec once and returns the first of
// its final results as a *big.Int.
//
// The previously recorded answer and its timestamp are marshalled with
// bridges.MarshalBridgeMetaData and passed to the run as the "jobRun"
// variable. On success the new value is stored via updateAnswer so that the
// next call can attach it in turn.
//
// reportTimestamp is not used by this implementation.
//
// An error is returned when the run itself fails, when it yields no final
// results, when the first final result carries an error, or when its value
// cannot be converted to a decimal. The latter two wrap the underlying error
// so callers can inspect it with errors.Is / errors.As.
func (d *DataSource) Observe(ctx context.Context, reportTimestamp ocrtypes.ReportTimestamp) (*big.Int, error) {
	md, err := bridges.MarshalBridgeMetaData(d.currentAnswer())
	if err != nil {
		// Best effort: a metadata failure is logged and the run still
		// proceeds with whatever md was returned.
		d.lggr.Warnw("unable to attach metadata for run", "err", err)
	}

	// NOTE: job metadata is automatically attached by the pipeline runner service
	vars := types.Vars{
		Vars: map[string]interface{}{
			"jobRun": md,
		},
	}

	results, err := d.pipelineRunner.ExecuteRun(ctx, d.spec, vars, types.Options{})
	if err != nil {
		return nil, err
	}

	finalResults := results.FinalResults()
	if len(finalResults) == 0 {
		return nil, errors.New("pipeline execution failed: not enough results")
	}

	// Only the first final result is used as the observation.
	finalResult := finalResults[0]
	if finalResult.Error != nil {
		return nil, fmt.Errorf("pipeline execution failed: %w", finalResult.Error)
	}

	asDecimal, err := utils.ToDecimal(finalResult.Value)
	if err != nil {
		// Keep the conversion failure in the chain instead of discarding it.
		return nil, fmt.Errorf("cannot convert observation to decimal: %w", err)
	}

	resultAsBigInt := asDecimal.BigInt()
	d.updateAnswer(resultAsBigInt)
	return resultAsBigInt, nil
}
// currentAnswer returns the most recently stored answer together with the
// timestamp recorded for it. Both values are read while holding the read lock.
func (d *DataSource) currentAnswer() (*big.Int, *big.Int) {
	d.mu.RLock()
	answer, updatedAt := d.current.LatestAnswer, d.current.UpdatedAt
	d.mu.RUnlock()

	return answer, updatedAt
}
// updateAnswer replaces the stored metadata with latestAnswer, stamped with
// the current Unix time in seconds. The write happens under the write lock.
func (d *DataSource) updateAnswer(latestAnswer *big.Int) {
	d.mu.Lock()

	// The timestamp is taken while the lock is held, as part of the update.
	stamp := big.NewInt(time.Now().Unix())
	record := bridges.BridgeMetaData{
		LatestAnswer: latestAnswer,
		UpdatedAt:    stamp,
	}
	d.current = record

	d.mu.Unlock()
}