Skip to content

Commit

Permalink
core/services/keeper: switch to sqlutil.DataSource (#12820)
Browse files Browse the repository at this point in the history
  • Loading branch information
jmank88 committed Apr 16, 2024
1 parent 0af4aca commit e523aa0
Show file tree
Hide file tree
Showing 18 changed files with 214 additions and 190 deletions.
5 changes: 5 additions & 0 deletions .changeset/many-pillows-reflect.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": patch
---

core/services/keeper: switch to sqlutil.DataSource #internal
22 changes: 12 additions & 10 deletions core/internal/cltest/factories.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/services/keeper"
"github.com/smartcontractkit/chainlink/v2/core/services/keystore"
"github.com/smartcontractkit/chainlink/v2/core/services/keystore/keys/ethkey"
"github.com/smartcontractkit/chainlink/v2/core/services/pg"
"github.com/smartcontractkit/chainlink/v2/core/services/pipeline"
"github.com/smartcontractkit/chainlink/v2/core/store/models"
"github.com/smartcontractkit/chainlink/v2/core/utils"
Expand Down Expand Up @@ -379,15 +378,16 @@ func MakeDirectRequestJobSpec(t *testing.T) *job.Job {
return spec
}

func MustInsertKeeperJob(t *testing.T, db *sqlx.DB, korm keeper.ORM, from evmtypes.EIP55Address, contract evmtypes.EIP55Address) job.Job {
func MustInsertKeeperJob(t *testing.T, db *sqlx.DB, korm *keeper.ORM, from evmtypes.EIP55Address, contract evmtypes.EIP55Address) job.Job {
t.Helper()
ctx := testutils.Context(t)

var keeperSpec job.KeeperSpec
err := korm.Q().Get(&keeperSpec, `INSERT INTO keeper_specs (contract_address, from_address, created_at, updated_at,evm_chain_id) VALUES ($1, $2, NOW(), NOW(), $3) RETURNING *`, contract, from, testutils.SimulatedChainID.Int64())
err := korm.DataSource().GetContext(ctx, &keeperSpec, `INSERT INTO keeper_specs (contract_address, from_address, created_at, updated_at,evm_chain_id) VALUES ($1, $2, NOW(), NOW(), $3) RETURNING *`, contract, from, testutils.SimulatedChainID.Int64())
require.NoError(t, err)

var pipelineSpec pipeline.Spec
err = korm.Q().Get(&pipelineSpec, `INSERT INTO pipeline_specs (dot_dag_source,created_at) VALUES ('',NOW()) RETURNING *`)
err = korm.DataSource().GetContext(ctx, &pipelineSpec, `INSERT INTO pipeline_specs (dot_dag_source,created_at) VALUES ('',NOW()) RETURNING *`)
require.NoError(t, err)

jb := job.Job{
Expand All @@ -411,10 +411,11 @@ func MustInsertKeeperJob(t *testing.T, db *sqlx.DB, korm keeper.ORM, from evmtyp
return jb
}

func MustInsertKeeperRegistry(t *testing.T, db *sqlx.DB, korm keeper.ORM, ethKeyStore keystore.Eth, keeperIndex, numKeepers, blockCountPerTurn int32) (keeper.Registry, job.Job) {
func MustInsertKeeperRegistry(t *testing.T, db *sqlx.DB, korm *keeper.ORM, ethKeyStore keystore.Eth, keeperIndex, numKeepers, blockCountPerTurn int32) (keeper.Registry, job.Job) {
t.Helper()
ctx := testutils.Context(t)
key, _ := MustInsertRandomKey(t, ethKeyStore, *ubig.New(testutils.SimulatedChainID))
from := key.EIP55Address
t.Helper()
contractAddress := NewEIP55Address()
jb := MustInsertKeeperJob(t, db, korm, from, contractAddress)
registry := keeper.Registry{
Expand All @@ -429,13 +430,14 @@ func MustInsertKeeperRegistry(t *testing.T, db *sqlx.DB, korm keeper.ORM, ethKey
from: keeperIndex,
},
}
err := korm.UpsertRegistry(&registry)
err := korm.UpsertRegistry(ctx, &registry)
require.NoError(t, err)
return registry, jb
}

func MustInsertUpkeepForRegistry(t *testing.T, db *sqlx.DB, cfg pg.QConfig, registry keeper.Registry) keeper.UpkeepRegistration {
korm := keeper.NewORM(db, logger.TestLogger(t), cfg)
func MustInsertUpkeepForRegistry(t *testing.T, db *sqlx.DB, registry keeper.Registry) keeper.UpkeepRegistration {
ctx := testutils.Context(t)
korm := keeper.NewORM(db, logger.TestLogger(t))
upkeepID := ubig.NewI(int64(mathrand.Uint32()))
upkeep := keeper.UpkeepRegistration{
UpkeepID: upkeepID,
Expand All @@ -447,7 +449,7 @@ func MustInsertUpkeepForRegistry(t *testing.T, db *sqlx.DB, cfg pg.QConfig, regi
positioningConstant, err := keeper.CalcPositioningConstant(upkeepID, registry.ContractAddress)
require.NoError(t, err)
upkeep.PositioningConstant = positioningConstant
err = korm.UpsertUpkeep(&upkeep)
err = korm.UpsertUpkeep(ctx, &upkeep)
require.NoError(t, err)
return upkeep
}
Expand Down
6 changes: 2 additions & 4 deletions core/services/job/job_orm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,8 +349,7 @@ func TestORM_DeleteJob_DeletesAssociatedRecords(t *testing.T) {
pipelineORM := pipeline.NewORM(db, lggr, config.Database(), config.JobPipeline().MaxSuccessfulRuns())
bridgesORM := bridges.NewORM(db)
jobORM := NewTestORM(t, db, pipelineORM, bridgesORM, keyStore, config.Database())
scopedConfig := evmtest.NewChainScopedConfig(t, config)
korm := keeper.NewORM(db, logger.TestLogger(t), scopedConfig.Database())
korm := keeper.NewORM(db, logger.TestLogger(t))

t.Run("it deletes records for offchainreporting jobs", func(t *testing.T) {
_, bridge := cltest.MustCreateBridge(t, db, cltest.BridgeOpts{})
Expand Down Expand Up @@ -381,8 +380,7 @@ func TestORM_DeleteJob_DeletesAssociatedRecords(t *testing.T) {

t.Run("it deletes records for keeper jobs", func(t *testing.T) {
registry, keeperJob := cltest.MustInsertKeeperRegistry(t, db, korm, keyStore.Eth(), 0, 1, 20)
scoped := evmtest.NewChainScopedConfig(t, config)
cltest.MustInsertUpkeepForRegistry(t, db, scoped.Database(), registry)
cltest.MustInsertUpkeepForRegistry(t, db, registry)

cltest.AssertCount(t, db, "keeper_specs", 1)
cltest.AssertCount(t, db, "keeper_registries", 1)
Expand Down
2 changes: 1 addition & 1 deletion core/services/keeper/delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func (d *Delegate) ServicesForSpec(ctx context.Context, spec job.Job) (services
return nil, err
}
registryAddress := spec.KeeperSpec.ContractAddress
orm := NewORM(d.db, d.logger, chain.Config().Database())
orm := NewORM(d.db, d.logger)
svcLogger := d.logger.With(
"jobID", spec.ID,
"registryAddress", registryAddress.Hex(),
Expand Down
5 changes: 3 additions & 2 deletions core/services/keeper/helpers_test.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
package keeper

import (
"context"
"math/big"

"github.com/ethereum/go-ethereum/core/types"
"github.com/pkg/errors"
)

func (rs *RegistrySynchronizer) ExportedFullSync() {
rs.fullSync()
func (rs *RegistrySynchronizer) ExportedFullSync(ctx context.Context) {
rs.fullSync(ctx)
}

func (rw *RegistryWrapper) GetUpkeepIdFromRawRegistrationLog(rawLog types.Log) (*big.Int, error) {
Expand Down
19 changes: 8 additions & 11 deletions core/services/keeper/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/internal/cltest"
"github.com/smartcontractkit/chainlink/v2/core/internal/cltest/heavyweight"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils/evmtest"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/chainlink"
"github.com/smartcontractkit/chainlink/v2/core/services/job"
Expand Down Expand Up @@ -247,8 +246,7 @@ func TestKeeperEthIntegration(t *testing.T) {
c.EVM[0].MinIncomingConfirmations = ptr[uint32](1) // disable reorg protection for this test
c.EVM[0].HeadTracker.MaxBufferSize = ptr[uint32](100) // helps prevent missed heads
})
scopedConfig := evmtest.NewChainScopedConfig(t, config)
korm := keeper.NewORM(db, logger.TestLogger(t), scopedConfig.Database())
korm := keeper.NewORM(db, logger.TestLogger(t))

app := cltest.NewApplicationWithConfigV2AndKeyOnSimulatedBlockchain(t, config, backend.Backend(), nodeKey)
require.NoError(t, app.Start(testutils.Context(t)))
Expand Down Expand Up @@ -328,6 +326,7 @@ func TestKeeperEthIntegration(t *testing.T) {
func TestKeeperForwarderEthIntegration(t *testing.T) {
t.Parallel()
t.Run("keeper_forwarder_flow", func(t *testing.T) {
ctx := testutils.Context(t)
g := gomega.NewWithT(t)

// setup node key
Expand Down Expand Up @@ -407,15 +406,14 @@ func TestKeeperForwarderEthIntegration(t *testing.T) {
c.EVM[0].Transactions.ForwardersEnabled = ptr(true) // Enable Operator Forwarder flow
c.EVM[0].ChainID = (*ubig.Big)(testutils.SimulatedChainID)
})
scopedConfig := evmtest.NewChainScopedConfig(t, config)
korm := keeper.NewORM(db, logger.TestLogger(t), scopedConfig.Database())
korm := keeper.NewORM(db, logger.TestLogger(t))

app := cltest.NewApplicationWithConfigV2AndKeyOnSimulatedBlockchain(t, config, backend.Backend(), nodeKey)
require.NoError(t, app.Start(testutils.Context(t)))
require.NoError(t, app.Start(ctx))

forwarderORM := forwarders.NewORM(db)
chainID := ubig.Big(*backend.ConfiguredChainID())
_, err = forwarderORM.CreateForwarder(testutils.Context(t), fwdrAddress, chainID)
_, err = forwarderORM.CreateForwarder(ctx, fwdrAddress, chainID)
require.NoError(t, err)

addr, err := app.GetRelayers().LegacyEVMChains().Slice()[0].TxManager().GetForwarderForEOA(nodeAddress)
Expand Down Expand Up @@ -452,7 +450,7 @@ func TestKeeperForwarderEthIntegration(t *testing.T) {
evmtypes.EIP55AddressFromAddress(nelly.From): 1,
},
}
err = korm.UpsertRegistry(&registry)
err = korm.UpsertRegistry(ctx, &registry)
require.NoError(t, err)

callOpts := bind.CallOpts{From: nodeAddress}
Expand All @@ -464,7 +462,7 @@ func TestKeeperForwarderEthIntegration(t *testing.T) {
}
require.Equal(t, lastKeeper(), common.Address{})

err = app.JobSpawner().StartService(testutils.Context(t), jb)
err = app.JobSpawner().StartService(ctx, jb)
require.NoError(t, err)

// keeper job is triggered and payload is received
Expand Down Expand Up @@ -551,8 +549,7 @@ func TestMaxPerformDataSize(t *testing.T) {
c.EVM[0].MinIncomingConfirmations = ptr[uint32](1) // disable reorg protection for this test
c.EVM[0].HeadTracker.MaxBufferSize = ptr[uint32](100) // helps prevent missed heads
})
scopedConfig := evmtest.NewChainScopedConfig(t, config)
korm := keeper.NewORM(db, logger.TestLogger(t), scopedConfig.Database())
korm := keeper.NewORM(db, logger.TestLogger(t))

app := cltest.NewApplicationWithConfigV2AndKeyOnSimulatedBlockchain(t, config, backend.Backend(), nodeKey)
require.NoError(t, app.Start(testutils.Context(t)))
Expand Down
70 changes: 39 additions & 31 deletions core/services/keeper/orm.go
Original file line number Diff line number Diff line change
@@ -1,60 +1,60 @@
package keeper

import (
"context"
"math/rand"

"github.com/jmoiron/sqlx"
"github.com/lib/pq"
"github.com/pkg/errors"

"github.com/smartcontractkit/chainlink-common/pkg/sqlutil"
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/types"
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils/big"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/pg"
)

// ORM implements ORM layer using PostgreSQL
type ORM struct {
q pg.Q
ds sqlutil.DataSource
logger logger.Logger
}

// NewORM is the constructor of postgresORM
func NewORM(db *sqlx.DB, lggr logger.Logger, config pg.QConfig) ORM {
func NewORM(ds sqlutil.DataSource, lggr logger.Logger) *ORM {
lggr = lggr.Named("KeeperORM")
return ORM{
q: pg.NewQ(db, lggr, config),
return &ORM{
ds: ds,
logger: lggr,
}
}

func (korm ORM) Q() pg.Q {
return korm.q
func (o *ORM) DataSource() sqlutil.DataSource {
return o.ds
}

// Registries returns all registries
func (korm ORM) Registries() ([]Registry, error) {
func (o *ORM) Registries(ctx context.Context) ([]Registry, error) {
var registries []Registry
err := korm.q.Select(&registries, `SELECT * FROM keeper_registries ORDER BY id ASC`)
err := o.ds.SelectContext(ctx, &registries, `SELECT * FROM keeper_registries ORDER BY id ASC`)
return registries, errors.Wrap(err, "failed to get registries")
}

// RegistryByContractAddress returns a single registry based on provided address
func (korm ORM) RegistryByContractAddress(registryAddress types.EIP55Address) (Registry, error) {
func (o *ORM) RegistryByContractAddress(ctx context.Context, registryAddress types.EIP55Address) (Registry, error) {
var registry Registry
err := korm.q.Get(&registry, `SELECT * FROM keeper_registries WHERE keeper_registries.contract_address = $1`, registryAddress)
err := o.ds.GetContext(ctx, &registry, `SELECT * FROM keeper_registries WHERE keeper_registries.contract_address = $1`, registryAddress)
return registry, errors.Wrap(err, "failed to get registry")
}

// RegistryForJob returns a specific registry for a job with the given ID
func (korm ORM) RegistryForJob(jobID int32) (Registry, error) {
func (o *ORM) RegistryForJob(ctx context.Context, jobID int32) (Registry, error) {
var registry Registry
err := korm.q.Get(&registry, `SELECT * FROM keeper_registries WHERE job_id = $1 LIMIT 1`, jobID)
err := o.ds.GetContext(ctx, &registry, `SELECT * FROM keeper_registries WHERE job_id = $1 LIMIT 1`, jobID)
return registry, errors.Wrapf(err, "failed to get registry with job_id %d", jobID)
}

// UpsertRegistry upserts registry by the given input
func (korm ORM) UpsertRegistry(registry *Registry) error {
func (o *ORM) UpsertRegistry(ctx context.Context, registry *Registry) error {
stmt := `
INSERT INTO keeper_registries (job_id, keeper_index, contract_address, from_address, check_gas, block_count_per_turn, num_keepers, keeper_index_map) VALUES (
:job_id, :keeper_index, :contract_address, :from_address, :check_gas, :block_count_per_turn, :num_keepers, :keeper_index_map
Expand All @@ -66,12 +66,16 @@ INSERT INTO keeper_registries (job_id, keeper_index, contract_address, from_addr
keeper_index_map = :keeper_index_map
RETURNING *
`
err := korm.q.GetNamed(stmt, registry, registry)
query, args, err := o.ds.BindNamed(stmt, registry)
if err != nil {
return errors.Wrap(err, "failed to upsert registry")
}
err = o.ds.GetContext(ctx, registry, query, args...)
return errors.Wrap(err, "failed to upsert registry")
}

// UpsertUpkeep upserts upkeep by the given input
func (korm ORM) UpsertUpkeep(registration *UpkeepRegistration) error {
func (o *ORM) UpsertUpkeep(ctx context.Context, registration *UpkeepRegistration) error {
stmt := `
INSERT INTO upkeep_registrations (registry_id, execute_gas, check_data, upkeep_id, positioning_constant, last_run_block_height) VALUES (
:registry_id, :execute_gas, :check_data, :upkeep_id, :positioning_constant, :last_run_block_height
Expand All @@ -81,13 +85,17 @@ INSERT INTO upkeep_registrations (registry_id, execute_gas, check_data, upkeep_i
positioning_constant = :positioning_constant
RETURNING *
`
err := korm.q.GetNamed(stmt, registration, registration)
query, args, err := o.ds.BindNamed(stmt, registration)
if err != nil {
return errors.Wrap(err, "failed to upsert upkeep")
}
err = o.ds.GetContext(ctx, registration, query, args...)
return errors.Wrap(err, "failed to upsert upkeep")
}

// UpdateUpkeepLastKeeperIndex updates the last keeper index for an upkeep
func (korm ORM) UpdateUpkeepLastKeeperIndex(jobID int32, upkeepID *big.Big, fromAddress types.EIP55Address) error {
_, err := korm.q.Exec(`
func (o *ORM) UpdateUpkeepLastKeeperIndex(ctx context.Context, jobID int32, upkeepID *big.Big, fromAddress types.EIP55Address) error {
_, err := o.ds.ExecContext(ctx, `
UPDATE upkeep_registrations
SET
last_keeper_index = CAST((SELECT keeper_index_map -> $3 FROM keeper_registries WHERE job_id = $1) AS int)
Expand All @@ -98,12 +106,12 @@ func (korm ORM) UpdateUpkeepLastKeeperIndex(jobID int32, upkeepID *big.Big, from
}

// BatchDeleteUpkeepsForJob deletes all upkeeps by the given IDs for the job with the given ID
func (korm ORM) BatchDeleteUpkeepsForJob(jobID int32, upkeepIDs []big.Big) (int64, error) {
func (o *ORM) BatchDeleteUpkeepsForJob(ctx context.Context, jobID int32, upkeepIDs []big.Big) (int64, error) {
strIds := []string{}
for _, upkeepID := range upkeepIDs {
strIds = append(strIds, upkeepID.String())
}
res, err := korm.q.Exec(`
res, err := o.ds.ExecContext(ctx, `
DELETE FROM upkeep_registrations WHERE registry_id IN (
SELECT id FROM keeper_registries WHERE job_id = $1
) AND upkeep_id = ANY($2)
Expand All @@ -125,7 +133,7 @@ DELETE FROM upkeep_registrations WHERE registry_id IN (
// -- OR is it my buddy's turn AND they were the last keeper to do the perform for this upkeep
// DEV: note we cast upkeep_id and binaryHash as 32 bits, even though both are 256 bit numbers when performing XOR. This is enough information
// to distribute the upkeeps over the keepers so long as num keepers < 4294967296
func (korm ORM) EligibleUpkeepsForRegistry(registryAddress types.EIP55Address, blockNumber int64, gracePeriod int64, binaryHash string) (upkeeps []UpkeepRegistration, err error) {
func (o *ORM) EligibleUpkeepsForRegistry(ctx context.Context, registryAddress types.EIP55Address, blockNumber int64, gracePeriod int64, binaryHash string) (upkeeps []UpkeepRegistration, err error) {
stmt := `
SELECT upkeep_registrations.*
FROM upkeep_registrations
Expand Down Expand Up @@ -165,10 +173,10 @@ WHERE keeper_registries.contract_address = $1
)
)
`
if err = korm.q.Select(&upkeeps, stmt, registryAddress, gracePeriod, blockNumber, binaryHash); err != nil {
if err = o.ds.SelectContext(ctx, &upkeeps, stmt, registryAddress, gracePeriod, blockNumber, binaryHash); err != nil {
return upkeeps, errors.Wrap(err, "EligibleUpkeepsForRegistry failed to get upkeep_registrations")
}
if err = loadUpkeepsRegistry(korm.q, upkeeps); err != nil {
if err = o.loadUpkeepsRegistry(ctx, upkeeps); err != nil {
return upkeeps, errors.Wrap(err, "EligibleUpkeepsForRegistry failed to load Registry on upkeeps")
}

Expand All @@ -179,7 +187,7 @@ WHERE keeper_registries.contract_address = $1
return upkeeps, err
}

func loadUpkeepsRegistry(q pg.Queryer, upkeeps []UpkeepRegistration) error {
func (o *ORM) loadUpkeepsRegistry(ctx context.Context, upkeeps []UpkeepRegistration) error {
registryIDM := make(map[int64]*Registry)
var registryIDs []int64
for _, upkeep := range upkeeps {
Expand All @@ -189,7 +197,7 @@ func loadUpkeepsRegistry(q pg.Queryer, upkeeps []UpkeepRegistration) error {
}
}
var registries []*Registry
err := q.Select(&registries, `SELECT * FROM keeper_registries WHERE id = ANY($1)`, pq.Array(registryIDs))
err := o.ds.SelectContext(ctx, &registries, `SELECT * FROM keeper_registries WHERE id = ANY($1)`, pq.Array(registryIDs))
if err != nil {
return errors.Wrap(err, "loadUpkeepsRegistry failed")
}
Expand All @@ -202,8 +210,8 @@ func loadUpkeepsRegistry(q pg.Queryer, upkeeps []UpkeepRegistration) error {
return nil
}

func (korm ORM) AllUpkeepIDsForRegistry(regID int64) (upkeeps []big.Big, err error) {
err = korm.q.Select(&upkeeps, `
func (o *ORM) AllUpkeepIDsForRegistry(ctx context.Context, regID int64) (upkeeps []big.Big, err error) {
err = o.ds.SelectContext(ctx, &upkeeps, `
SELECT upkeep_id
FROM upkeep_registrations
WHERE registry_id = $1
Expand All @@ -212,8 +220,8 @@ WHERE registry_id = $1
}

// SetLastRunInfoForUpkeepOnJob sets the last run block height and the associated keeper index only if the new block height is greater than the previous.
func (korm ORM) SetLastRunInfoForUpkeepOnJob(jobID int32, upkeepID *big.Big, height int64, fromAddress types.EIP55Address, qopts ...pg.QOpt) (int64, error) {
res, err := korm.q.WithOpts(qopts...).Exec(`
func (o *ORM) SetLastRunInfoForUpkeepOnJob(ctx context.Context, jobID int32, upkeepID *big.Big, height int64, fromAddress types.EIP55Address) (int64, error) {
res, err := o.ds.ExecContext(ctx, `
UPDATE upkeep_registrations
SET last_run_block_height = $1,
last_keeper_index = CAST((SELECT keeper_index_map -> $4 FROM keeper_registries WHERE job_id = $3) AS int)
Expand Down

0 comments on commit e523aa0

Please sign in to comment.