Skip to content

Commit

Permalink
switch more EVM components to use sqlutil.DataStore (#12856)
Browse files Browse the repository at this point in the history
  • Loading branch information
jmank88 committed Apr 17, 2024
1 parent 99443c5 commit 0ec9276
Show file tree
Hide file tree
Showing 96 changed files with 1,363 additions and 1,384 deletions.
5 changes: 5 additions & 0 deletions .changeset/soft-hotels-decide.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": patch
---

switch more EVM components to use sqlutil.DataStore #internal
2 changes: 1 addition & 1 deletion common/txmgr/broadcaster.go
Original file line number Diff line number Diff line change
Expand Up @@ -689,7 +689,7 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) save
// is relatively benign and probably nobody will ever run into it in
// practice, but something to be aware of.
if etx.PipelineTaskRunID.Valid && eb.resumeCallback != nil && etx.SignalCallback {
err := eb.resumeCallback(etx.PipelineTaskRunID.UUID, nil, fmt.Errorf("fatal error while sending transaction: %s", etx.Error.String))
err := eb.resumeCallback(ctx, etx.PipelineTaskRunID.UUID, nil, fmt.Errorf("fatal error while sending transaction: %s", etx.Error.String))
if errors.Is(err, sql.ErrNoRows) {
lgr.Debugw("callback missing or already resumed", "etxID", etx.ID)
} else if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion common/txmgr/confirmer.go
Original file line number Diff line number Diff line change
Expand Up @@ -1120,7 +1120,7 @@ func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Res
}

ec.lggr.Debugw("Callback: resuming tx with receipt", "output", output, "taskErr", taskErr, "pipelineTaskRunID", data.ID)
if err := ec.resumeCallback(data.ID, output, taskErr); err != nil {
if err := ec.resumeCallback(ctx, data.ID, output, taskErr); err != nil {
return fmt.Errorf("failed to resume suspended pipeline run: %w", err)
}
// Mark tx as having completed callback
Expand Down
2 changes: 1 addition & 1 deletion common/txmgr/txmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
// https://www.notion.so/chainlink/Txm-Architecture-Overview-9dc62450cd7a443ba9e7dceffa1a8d6b

// ResumeCallback is assumed to be idempotent
type ResumeCallback func(id uuid.UUID, result interface{}, err error) error
type ResumeCallback func(ctx context.Context, id uuid.UUID, result interface{}, err error) error

// TxManager is the main component of the transaction manager.
// It is also the interface to external callers.
Expand Down
5 changes: 2 additions & 3 deletions core/bridges/orm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils/configtest"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/pg"
"github.com/smartcontractkit/chainlink/v2/core/services/pipeline"
"github.com/smartcontractkit/chainlink/v2/core/store/models"
)
Expand Down Expand Up @@ -144,8 +143,8 @@ func TestORM_TestCachedResponse(t *testing.T) {
db := pgtest.NewSqlxDB(t)
orm := bridges.NewORM(db)

trORM := pipeline.NewORM(db, logger.TestLogger(t), cfg.Database(), cfg.JobPipeline().MaxSuccessfulRuns())
specID, err := trORM.CreateSpec(pipeline.Pipeline{}, *models.NewInterval(5 * time.Minute), pg.WithParentCtx(testutils.Context(t)))
trORM := pipeline.NewORM(db, logger.TestLogger(t), cfg.JobPipeline().MaxSuccessfulRuns())
specID, err := trORM.CreateSpec(ctx, nil, pipeline.Pipeline{}, *models.NewInterval(5 * time.Minute))
require.NoError(t, err)

_, err = orm.GetCachedResponse(ctx, "dot", specID, 1*time.Second)
Expand Down
4 changes: 2 additions & 2 deletions core/chains/evm/forwarders/forwarder_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,13 @@ type FwdMgr struct {
wg sync.WaitGroup
}

func NewFwdMgr(db sqlutil.DataSource, client evmclient.Client, logpoller evmlogpoller.LogPoller, l logger.Logger, cfg Config) *FwdMgr {
func NewFwdMgr(ds sqlutil.DataSource, client evmclient.Client, logpoller evmlogpoller.LogPoller, l logger.Logger, cfg Config) *FwdMgr {
lggr := logger.Sugared(logger.Named(l, "EVMForwarderManager"))
fwdMgr := FwdMgr{
logger: lggr,
cfg: cfg,
evmClient: client,
ORM: NewORM(db),
ORM: NewORM(ds),
logpoller: logpoller,
sendersCache: make(map[common.Address][]common.Address),
}
Expand Down
46 changes: 23 additions & 23 deletions core/chains/evm/forwarders/orm.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,50 +23,50 @@ type ORM interface {
FindForwardersInListByChain(ctx context.Context, evmChainId big.Big, addrs []common.Address) ([]Forwarder, error)
}

type DbORM struct {
db sqlutil.DataSource
type DSORM struct {
ds sqlutil.DataSource
}

var _ ORM = &DbORM{}
var _ ORM = &DSORM{}

func NewORM(db sqlutil.DataSource) *DbORM {
return &DbORM{db: db}
func NewORM(ds sqlutil.DataSource) *DSORM {
return &DSORM{ds: ds}
}

func (o *DbORM) Transaction(ctx context.Context, fn func(*DbORM) error) (err error) {
return sqlutil.Transact(ctx, o.new, o.db, nil, fn)
func (o *DSORM) Transact(ctx context.Context, fn func(*DSORM) error) (err error) {
return sqlutil.Transact(ctx, o.new, o.ds, nil, fn)
}

// new returns a NewORM like o, but backed by q.
func (o *DbORM) new(q sqlutil.DataSource) *DbORM { return NewORM(q) }
func (o *DSORM) new(q sqlutil.DataSource) *DSORM { return NewORM(q) }

// CreateForwarder creates the Forwarder address associated with the current EVM chain id.
func (o *DbORM) CreateForwarder(ctx context.Context, addr common.Address, evmChainId big.Big) (fwd Forwarder, err error) {
func (o *DSORM) CreateForwarder(ctx context.Context, addr common.Address, evmChainId big.Big) (fwd Forwarder, err error) {
sql := `INSERT INTO evm.forwarders (address, evm_chain_id, created_at, updated_at) VALUES ($1, $2, now(), now()) RETURNING *`
err = o.db.GetContext(ctx, &fwd, sql, addr, evmChainId)
err = o.ds.GetContext(ctx, &fwd, sql, addr, evmChainId)
return fwd, err
}

// DeleteForwarder removes a forwarder address.
// If cleanup is non-nil, it can be used to perform any chain- or contract-specific cleanup that need to happen atomically
// on forwarder deletion. If cleanup returns an error, forwarder deletion will be aborted.
func (o *DbORM) DeleteForwarder(ctx context.Context, id int64, cleanup func(tx sqlutil.DataSource, evmChainID int64, addr common.Address) error) (err error) {
return o.Transaction(ctx, func(orm *DbORM) error {
func (o *DSORM) DeleteForwarder(ctx context.Context, id int64, cleanup func(tx sqlutil.DataSource, evmChainID int64, addr common.Address) error) (err error) {
return o.Transact(ctx, func(orm *DSORM) error {
var dest struct {
EvmChainId int64
Address common.Address
}
err := orm.db.GetContext(ctx, &dest, `SELECT evm_chain_id, address FROM evm.forwarders WHERE id = $1`, id)
err := orm.ds.GetContext(ctx, &dest, `SELECT evm_chain_id, address FROM evm.forwarders WHERE id = $1`, id)
if err != nil {
return err
}
if cleanup != nil {
if err = cleanup(orm.db, dest.EvmChainId, dest.Address); err != nil {
if err = cleanup(orm.ds, dest.EvmChainId, dest.Address); err != nil {
return err
}
}

result, err := orm.db.ExecContext(ctx, `DELETE FROM evm.forwarders WHERE id = $1`, id)
result, err := orm.ds.ExecContext(ctx, `DELETE FROM evm.forwarders WHERE id = $1`, id)
// If the forwarder wasn't found, we still want to delete the filter.
// In that case, the transaction must return nil, even though DeleteForwarder
// will return sql.ErrNoRows
Expand All @@ -82,27 +82,27 @@ func (o *DbORM) DeleteForwarder(ctx context.Context, id int64, cleanup func(tx s
}

// FindForwarders returns all forwarder addresses from offset up until limit.
func (o *DbORM) FindForwarders(ctx context.Context, offset, limit int) (fwds []Forwarder, count int, err error) {
func (o *DSORM) FindForwarders(ctx context.Context, offset, limit int) (fwds []Forwarder, count int, err error) {
sql := `SELECT count(*) FROM evm.forwarders`
if err = o.db.GetContext(ctx, &count, sql); err != nil {
if err = o.ds.GetContext(ctx, &count, sql); err != nil {
return
}

sql = `SELECT * FROM evm.forwarders ORDER BY created_at DESC, id DESC LIMIT $1 OFFSET $2`
if err = o.db.SelectContext(ctx, &fwds, sql, limit, offset); err != nil {
if err = o.ds.SelectContext(ctx, &fwds, sql, limit, offset); err != nil {
return
}
return
}

// FindForwardersByChain returns all forwarder addresses for a chain.
func (o *DbORM) FindForwardersByChain(ctx context.Context, evmChainId big.Big) (fwds []Forwarder, err error) {
func (o *DSORM) FindForwardersByChain(ctx context.Context, evmChainId big.Big) (fwds []Forwarder, err error) {
sql := `SELECT * FROM evm.forwarders where evm_chain_id = $1 ORDER BY created_at DESC, id DESC`
err = o.db.SelectContext(ctx, &fwds, sql, evmChainId)
err = o.ds.SelectContext(ctx, &fwds, sql, evmChainId)
return
}

func (o *DbORM) FindForwardersInListByChain(ctx context.Context, evmChainId big.Big, addrs []common.Address) ([]Forwarder, error) {
func (o *DSORM) FindForwardersInListByChain(ctx context.Context, evmChainId big.Big, addrs []common.Address) ([]Forwarder, error) {
var fwdrs []Forwarder

arg := map[string]interface{}{
Expand All @@ -127,8 +127,8 @@ func (o *DbORM) FindForwardersInListByChain(ctx context.Context, evmChainId big.
return nil, pkgerrors.Wrap(err, "Failed to run sqlx.IN on query")
}

query = o.db.Rebind(query)
err = o.db.SelectContext(ctx, &fwdrs, query, args...)
query = o.ds.Rebind(query)
err = o.ds.SelectContext(ctx, &fwdrs, query, args...)

if err != nil {
return nil, pkgerrors.Wrap(err, "Failed to execute query")
Expand Down
16 changes: 8 additions & 8 deletions core/chains/evm/headtracker/orm.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,14 @@ var _ ORM = &DbORM{}

type DbORM struct {
chainID ubig.Big
db sqlutil.DataSource
ds sqlutil.DataSource
}

// NewORM creates an ORM scoped to chainID.
func NewORM(chainID big.Int, db sqlutil.DataSource) *DbORM {
func NewORM(chainID big.Int, ds sqlutil.DataSource) *DbORM {
return &DbORM{
chainID: ubig.Big(chainID),
db: db,
ds: ds,
}
}

Expand All @@ -48,19 +48,19 @@ func (orm *DbORM) IdempotentInsertHead(ctx context.Context, head *evmtypes.Head)
INSERT INTO evm.heads (hash, number, parent_hash, created_at, timestamp, l1_block_number, evm_chain_id, base_fee_per_gas) VALUES (
$1, $2, $3, $4, $5, $6, $7, $8)
ON CONFLICT (evm_chain_id, hash) DO NOTHING`
_, err := orm.db.ExecContext(ctx, query, head.Hash, head.Number, head.ParentHash, head.CreatedAt, head.Timestamp, head.L1BlockNumber, orm.chainID, head.BaseFeePerGas)
_, err := orm.ds.ExecContext(ctx, query, head.Hash, head.Number, head.ParentHash, head.CreatedAt, head.Timestamp, head.L1BlockNumber, orm.chainID, head.BaseFeePerGas)
return pkgerrors.Wrap(err, "IdempotentInsertHead failed to insert head")
}

func (orm *DbORM) TrimOldHeads(ctx context.Context, minBlockNumber int64) (err error) {
query := `DELETE FROM evm.heads WHERE evm_chain_id = $1 AND number < $2`
_, err = orm.db.ExecContext(ctx, query, orm.chainID, minBlockNumber)
_, err = orm.ds.ExecContext(ctx, query, orm.chainID, minBlockNumber)
return err
}

func (orm *DbORM) LatestHead(ctx context.Context) (head *evmtypes.Head, err error) {
head = new(evmtypes.Head)
err = orm.db.GetContext(ctx, head, `SELECT * FROM evm.heads WHERE evm_chain_id = $1 ORDER BY number DESC, created_at DESC, id DESC LIMIT 1`, orm.chainID)
err = orm.ds.GetContext(ctx, head, `SELECT * FROM evm.heads WHERE evm_chain_id = $1 ORDER BY number DESC, created_at DESC, id DESC LIMIT 1`, orm.chainID)
if pkgerrors.Is(err, sql.ErrNoRows) {
return nil, nil
}
Expand All @@ -69,14 +69,14 @@ func (orm *DbORM) LatestHead(ctx context.Context) (head *evmtypes.Head, err erro
}

func (orm *DbORM) LatestHeads(ctx context.Context, minBlockNumer int64) (heads []*evmtypes.Head, err error) {
err = orm.db.SelectContext(ctx, &heads, `SELECT * FROM evm.heads WHERE evm_chain_id = $1 AND number >= $2 ORDER BY number DESC, created_at DESC, id DESC`, orm.chainID, minBlockNumer)
err = orm.ds.SelectContext(ctx, &heads, `SELECT * FROM evm.heads WHERE evm_chain_id = $1 AND number >= $2 ORDER BY number DESC, created_at DESC, id DESC`, orm.chainID, minBlockNumer)
err = pkgerrors.Wrap(err, "LatestHeads failed")
return
}

func (orm *DbORM) HeadByHash(ctx context.Context, hash common.Hash) (head *evmtypes.Head, err error) {
head = new(evmtypes.Head)
err = orm.db.GetContext(ctx, head, `SELECT * FROM evm.heads WHERE evm_chain_id = $1 AND hash = $2`, orm.chainID, hash)
err = orm.ds.GetContext(ctx, head, `SELECT * FROM evm.heads WHERE evm_chain_id = $1 AND hash = $2`, orm.chainID, hash)
if pkgerrors.Is(err, sql.ErrNoRows) {
return nil, nil
}
Expand Down
56 changes: 20 additions & 36 deletions core/chains/evm/log/broadcaster.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,13 @@ import (
"sync/atomic"
"time"

"github.com/smartcontractkit/chainlink-common/pkg/sqlutil"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
pkgerrors "github.com/pkg/errors"

"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/services"
"github.com/smartcontractkit/chainlink-common/pkg/sqlutil"
"github.com/smartcontractkit/chainlink-common/pkg/utils"
"github.com/smartcontractkit/chainlink-common/pkg/utils/mailbox"

Expand Down Expand Up @@ -60,12 +59,10 @@ type (
Register(listener Listener, opts ListenerOpts) (unsubscribe func())

WasAlreadyConsumed(ctx context.Context, lb Broadcast) (bool, error)
MarkConsumed(ctx context.Context, lb Broadcast) error

// MarkManyConsumed marks all the provided log broadcasts as consumed.
MarkManyConsumed(ctx context.Context, lbs []Broadcast) error
// ds is optional
MarkConsumed(ctx context.Context, ds sqlutil.DataSource, lb Broadcast) error

// NOTE: WasAlreadyConsumed, MarkConsumed and MarkManyConsumed MUST be used within a single goroutine in order for WasAlreadyConsumed to be accurate
// NOTE: WasAlreadyConsumed, and MarkConsumed MUST be used within a single goroutine in order for WasAlreadyConsumed to be accurate
}

BroadcasterInTest interface {
Expand Down Expand Up @@ -422,12 +419,15 @@ func (b *broadcaster) eventLoop(chRawLogs <-chan types.Log, chErr <-chan error)
debounceResubscribe := time.NewTicker(1 * time.Second)
defer debounceResubscribe.Stop()

ctx, cancel := b.chStop.NewCtx()
defer cancel()

b.logger.Debug("Starting the event loop")
for {
// Replay requests take priority.
select {
case req := <-b.replayChannel:
b.onReplayRequest(req)
b.onReplayRequest(ctx, req)
return true, nil
default:
}
Expand Down Expand Up @@ -456,7 +456,7 @@ func (b *broadcaster) eventLoop(chRawLogs <-chan types.Log, chErr <-chan error)
needsResubscribe = b.onChangeSubscriberStatus() || needsResubscribe

case req := <-b.replayChannel:
b.onReplayRequest(req)
b.onReplayRequest(ctx, req)
return true, nil

case <-debounceResubscribe.C:
Expand All @@ -480,7 +480,7 @@ func (b *broadcaster) eventLoop(chRawLogs <-chan types.Log, chErr <-chan error)
}

// onReplayRequest clears the pool and sets the block backfill number.
func (b *broadcaster) onReplayRequest(replayReq replayRequest) {
func (b *broadcaster) onReplayRequest(ctx context.Context, replayReq replayRequest) {
// notify subscribers that we are about to replay.
for subscriber := range b.registrations.registeredSubs {
if subscriber.opts.ReplayStartedCallback != nil {
Expand All @@ -495,11 +495,11 @@ func (b *broadcaster) onReplayRequest(replayReq replayRequest) {
b.backfillBlockNumber.Int64 = replayReq.fromBlock
b.backfillBlockNumber.Valid = true
if replayReq.forceBroadcast {
ctx, cancel := b.chStop.CtxCancel(context.WithTimeout(context.Background(), time.Minute))
ctx = sqlutil.WithoutDefaultTimeout(ctx)
defer cancel()
// Use a longer timeout in the event that a very large amount of logs need to be marked
// as consumed.
// as unconsumed.
var cancel func()
ctx, cancel = context.WithTimeout(sqlutil.WithoutDefaultTimeout(ctx), time.Minute)
defer cancel()
err := b.orm.MarkBroadcastsUnconsumed(ctx, replayReq.fromBlock)
if err != nil {
b.logger.Errorw("Error marking broadcasts as unconsumed",
Expand Down Expand Up @@ -694,25 +694,12 @@ func (b *broadcaster) WasAlreadyConsumed(ctx context.Context, lb Broadcast) (boo
}

// MarkConsumed marks the log as having been successfully consumed by the subscriber
func (b *broadcaster) MarkConsumed(ctx context.Context, lb Broadcast) error {
return b.orm.MarkBroadcastConsumed(ctx, lb.RawLog().BlockHash, lb.RawLog().BlockNumber, lb.RawLog().Index, lb.JobID())
}

// MarkManyConsumed marks the logs as having been successfully consumed by the subscriber
func (b *broadcaster) MarkManyConsumed(ctx context.Context, lbs []Broadcast) (err error) {
var (
blockHashes = make([]common.Hash, len(lbs))
blockNumbers = make([]uint64, len(lbs))
logIndexes = make([]uint, len(lbs))
jobIDs = make([]int32, len(lbs))
)
for i := range lbs {
blockHashes[i] = lbs[i].RawLog().BlockHash
blockNumbers[i] = lbs[i].RawLog().BlockNumber
logIndexes[i] = lbs[i].RawLog().Index
jobIDs[i] = lbs[i].JobID()
func (b *broadcaster) MarkConsumed(ctx context.Context, ds sqlutil.DataSource, lb Broadcast) error {
orm := b.orm
if ds != nil {
orm = orm.WithDataSource(ds)
}
return b.orm.MarkBroadcastsConsumed(ctx, blockHashes, blockNumbers, logIndexes, jobIDs)
return orm.MarkBroadcastConsumed(ctx, lb.RawLog().BlockHash, lb.RawLog().BlockNumber, lb.RawLog().Index, lb.JobID())
}

// test only
Expand Down Expand Up @@ -779,10 +766,7 @@ func (n *NullBroadcaster) TrackedAddressesCount() uint32 {
func (n *NullBroadcaster) WasAlreadyConsumed(ctx context.Context, lb Broadcast) (bool, error) {
return false, pkgerrors.New(n.ErrMsg)
}
func (n *NullBroadcaster) MarkConsumed(ctx context.Context, lb Broadcast) error {
return pkgerrors.New(n.ErrMsg)
}
func (n *NullBroadcaster) MarkManyConsumed(ctx context.Context, lbs []Broadcast) error {
func (n *NullBroadcaster) MarkConsumed(ctx context.Context, ds sqlutil.DataSource, lb Broadcast) error {
return pkgerrors.New(n.ErrMsg)
}

Expand Down
2 changes: 1 addition & 1 deletion core/chains/evm/log/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ func (listener *simpleLogListener) SkipMarkingConsumed(skip bool) {
listener.skipMarkingConsumed.Store(skip)
}

func (listener *simpleLogListener) HandleLog(lb log.Broadcast) {
func (listener *simpleLogListener) HandleLog(ctx context.Context, lb log.Broadcast) {
listener.received.Lock()
defer listener.received.Unlock()
listener.lggr.Tracef("Listener %v HandleLog for block %v %v received at %v %v", listener.name, lb.RawLog().BlockNumber, lb.RawLog().BlockHash, lb.LatestBlockNumber(), lb.LatestBlockHash())
Expand Down

0 comments on commit 0ec9276

Please sign in to comment.