Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Offchain usdc #112

Merged
merged 26 commits into from
Sep 21, 2023
Merged
Show file tree
Hide file tree
Changes from 24 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
f478a01
basic attestation service
RensR Sep 5, 2023
7a883b7
add hard coded LP subscription logic
RensR Sep 5, 2023
5468713
check usdc messges attestation in batch building (mock)
RensR Sep 5, 2023
1a7b5a9
add logpoller IndexedLogsByTxHash
RensR Sep 6, 2023
1577e56
refactor USDC service to be self contained
RensR Sep 6, 2023
2ac5879
make offchain token data providers generic
RensR Sep 7, 2023
8c2250f
add all usdc token and messageTransmitter addresses
RensR Sep 7, 2023
0f2e9eb
minor code improvements
RensR Sep 13, 2023
bb5b50e
Merge branch 'ccip-develop' into offchain-usdc
RensR Sep 13, 2023
f738579
Merge branch 'ccip-develop' into offchain-usdc
RensR Sep 13, 2023
a523911
Merge branch 'ccip-develop' into offchain-usdc
RensR Sep 14, 2023
256c29f
fix evm logs reference
RensR Sep 14, 2023
68a70d5
review comments
RensR Sep 14, 2023
addbdbb
renames and passing in uri
RensR Sep 19, 2023
b7f2f81
Merge branch 'ccip-develop' into offchain-usdc
RensR Sep 19, 2023
0ef66b3
re-use logindex and tx hash from exec plugin
RensR Sep 19, 2023
239b7ca
cached reader + ReadTokenData rename
RensR Sep 19, 2023
8029d4c
fix test, rm unused code
RensR Sep 20, 2023
6e31124
pass in internal.EVM2EVMOnRampCCIPSendRequestedWithMeta
RensR Sep 20, 2023
0e80c67
extract chains logic and fix linting
RensR Sep 20, 2023
7be78f6
add cached reader test & reader mock
RensR Sep 20, 2023
142a6d1
more tests, real context and extract into method
RensR Sep 20, 2023
50a59b2
Merge branch 'ccip-develop' into offchain-usdc
RensR Sep 20, 2023
acfeef5
read usdc config from jobspec
RensR Sep 20, 2023
d00da8c
improve logging, reduce exported usdc types
RensR Sep 20, 2023
e4f6b07
Merge branch 'ccip-develop' into offchain-usdc
RensR Sep 21, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 4 additions & 0 deletions core/chains/evm/logpoller/disabled.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,3 +104,7 @@ func (d disabled) LatestBlockByEventSigsAddrsWithConfs(fromBlock int64, eventSig
func (d disabled) LogsUntilBlockHashDataWordGreaterThan(eventSig common.Hash, address common.Address, wordIndex int, wordValueMin common.Hash, untilBlockHash common.Hash, qopts ...pg.QOpt) ([]Log, error) {
return nil, ErrDisabled
}

func (d disabled) IndexedLogsByTxHash(eventSig common.Hash, txHash common.Hash, qopts ...pg.QOpt) ([]Log, error) {
return nil, ErrDisabled
}
5 changes: 5 additions & 0 deletions core/chains/evm/logpoller/log_poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ type LogPoller interface {
// Content based querying
IndexedLogs(eventSig common.Hash, address common.Address, topicIndex int, topicValues []common.Hash, confs int, qopts ...pg.QOpt) ([]Log, error)
IndexedLogsByBlockRange(start, end int64, eventSig common.Hash, address common.Address, topicIndex int, topicValues []common.Hash, qopts ...pg.QOpt) ([]Log, error)
IndexedLogsByTxHash(eventSig common.Hash, txHash common.Hash, qopts ...pg.QOpt) ([]Log, error)
IndexedLogsCreatedAfter(eventSig common.Hash, address common.Address, topicIndex int, topicValues []common.Hash, after time.Time, confs int, qopts ...pg.QOpt) ([]Log, error)
IndexedLogsTopicGreaterThan(eventSig common.Hash, address common.Address, topicIndex int, topicValueMin common.Hash, confs int, qopts ...pg.QOpt) ([]Log, error)
IndexedLogsTopicRange(eventSig common.Hash, address common.Address, topicIndex int, topicValueMin common.Hash, topicValueMax common.Hash, confs int, qopts ...pg.QOpt) ([]Log, error)
Expand Down Expand Up @@ -1015,6 +1016,10 @@ func (lp *logPoller) LatestBlockByEventSigsAddrsWithConfs(fromBlock int64, event
return lp.orm.SelectLatestBlockNumberEventSigsAddrsWithConfs(fromBlock, eventSigs, addresses, confs, qopts...)
}

func (lp *logPoller) IndexedLogsByTxHash(eventSig common.Hash, txHash common.Hash, qopts ...pg.QOpt) ([]Log, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Make sure to backport these changes to the main repo

return lp.orm.SelectIndexedLogsByTxHash(eventSig, txHash, qopts...)
}

// GetBlocksRange tries to get the specified block numbers from the log pollers
// blocks table. It falls back to the RPC for any unfulfilled requested blocks.
func (lp *logPoller) GetBlocksRange(ctx context.Context, numbers []uint64, qopts ...pg.QOpt) ([]LogPollerBlock, error) {
Expand Down
33 changes: 33 additions & 0 deletions core/chains/evm/logpoller/mocks/log_poller.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions core/chains/evm/logpoller/observability.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,12 @@ func (o *ObservedLogPoller) LatestBlockByEventSigsAddrsWithConfs(fromBlock int64
})
}

func (o *ObservedLogPoller) IndexedLogsByTxHash(eventSig common.Hash, txHash common.Hash, qopts ...pg.QOpt) ([]Log, error) {
return withObservedQueryAndResults(o, "IndexedLogsByTxHash", func() ([]Log, error) {
return o.LogPoller.IndexedLogsByTxHash(eventSig, txHash, qopts...)
})
}

func (o *ObservedLogPoller) IndexedLogs(eventSig common.Hash, address common.Address, topicIndex int, topicValues []common.Hash, confs int, qopts ...pg.QOpt) ([]Log, error) {
return withObservedQueryAndResults(o, "IndexedLogs", func() ([]Log, error) {
return o.LogPoller.IndexedLogs(eventSig, address, topicIndex, topicValues, confs, qopts...)
Expand Down
16 changes: 16 additions & 0 deletions core/chains/evm/logpoller/orm.go
Original file line number Diff line number Diff line change
Expand Up @@ -489,6 +489,22 @@ func (o *ORM) SelectIndexedLogsByBlockRangeFilter(start, end int64, address comm
return logs, nil
}

func (o *ORM) SelectIndexedLogsByTxHash(eventSig common.Hash, txHash common.Hash, qopts ...pg.QOpt) ([]Log, error) {
q := o.q.WithOpts(qopts...)
var logs []Log
err := q.Select(&logs, `
SELECT * FROM evm.logs
WHERE evm.logs.evm_chain_id = $1
AND tx_hash = $2
AND event_sig = $3
ORDER BY (evm.logs.block_number, evm.logs.log_index)`,
utils.NewBig(o.chainID), txHash.Bytes(), eventSig.Bytes())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note: Make sure query is properly hitting indexes.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, please make sure it doesn't hurt db performance, I described how to create a simple test on a local machine and test it

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably we miss the index for tx_hash. Postgres will be able to performantly pick up data using index only for chain_id and event_sig. Then will have to perform filtering on the remaining part using tx_hash

Example on my local machine

 Sort  (cost=575.91..575.92 rows=1 width=297) (actual time=213.647..259.292 rows=100000 loops=1)
   Sort Key: (ROW(block_number, log_index))
   Sort Method: external merge  Disk: 31240kB
   ->  Index Scan using idx_evm_logs_ordered_by_block_and_created_at on logs  (cost=0.55..575.90 rows=1 width=297) (actual time=0.070..89.148 rows=100000 loops=1)
         Index Cond: ((evm_chain_id = '1'::numeric) AND (event_sig = '\xa32c5f1d88735034143171f18fbb4d447fbbe0fbf4c98733d54092a081ba5d2a'::bytea))
         Filter: (tx_hash = '\xbb0f07700587fee22c5863ed75a5e05b68fcac4763449a582987fa4e8e9a2d0f'::bytea)

if err != nil {
return nil, err
}
return logs, nil
}

func validateTopicIndex(index int) error {
// Only topicIndex 1 through 3 is valid. 0 is the event sig and only 4 total topics are allowed
if !(index == 1 || index == 2 || index == 3) {
Expand Down
4 changes: 4 additions & 0 deletions core/services/ocr2/plugins/ccip/abihelpers/abi_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ var EventSignatures struct {
FeeTokenAdded common.Hash
FeeTokenRemoved common.Hash

USDCMessageSent common.Hash

// offset || sourceChainID || seqNum || ...
SendRequestedSequenceNumberWord int
// offset || priceUpdatesOffset || minSeqNum || maxSeqNum || merkleRoot
Expand Down Expand Up @@ -128,6 +130,8 @@ func init() {
panic("missing event 'manuallyExecute'")
}
ExecutionReportArgs = manuallyExecuteMethod.Inputs[:1]

EventSignatures.USDCMessageSent = utils.Keccak256Fixed([]byte("MessageSent(bytes)"))
}

func MessagesFromExecutionReport(report types.Report) ([]evm_2_evm_offramp.InternalEVM2EVMMessage, error) {
Expand Down
32 changes: 32 additions & 0 deletions core/services/ocr2/plugins/ccip/config/config.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
package config

import (
"github.com/ethereum/go-ethereum/common"
"github.com/pkg/errors"

"github.com/smartcontractkit/chainlink/v2/core/utils"
)

// CommitPluginJobSpecConfig contains the plugin specific variables for the ccip.CCIPCommit plugin.
// We use ID here to keep it as general as possible, e.g. abstracting for chains which don't have an address concept.
type CommitPluginJobSpecConfig struct {
Expand All @@ -14,4 +21,29 @@ type CommitPluginJobSpecConfig struct {
// ExecutionPluginJobSpecConfig contains the plugin specific variables for the ccip.CCIPExecution plugin.
type ExecutionPluginJobSpecConfig struct {
SourceStartBlock, DestStartBlock int64 // Only for first time job add.
USDCConfig USDCConfig
}

type USDCConfig struct {
SourceTokenAddress common.Address
SourceMessageTransmitterAddress common.Address
AttestationAPI string
}

func (uc *USDCConfig) ValidateUSDCConfig() error {
if uc.AttestationAPI == "" && uc.SourceTokenAddress == utils.ZeroAddress && uc.SourceMessageTransmitterAddress == utils.ZeroAddress {
return nil
}

if uc.AttestationAPI == "" {
return errors.New("AttestationAPI is required")
}
if uc.SourceTokenAddress == utils.ZeroAddress {
return errors.New("SourceTokenAddress is required")
}
if uc.SourceMessageTransmitterAddress == utils.ZeroAddress {
return errors.New("SourceMessageTransmitterAddress is required")
}

return nil
}
66 changes: 59 additions & 7 deletions core/services/ocr2/plugins/ccip/execution_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"fmt"
"net/url"
"strconv"

"github.com/ethereum/go-ethereum/accounts/abi/bind"
Expand All @@ -15,6 +16,7 @@ import (
libocr2 "github.com/smartcontractkit/libocr/offchainreporting2plus"

relaylogger "github.com/smartcontractkit/chainlink-relay/pkg/logger"

"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/ccipdata"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/contractutil"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/hashlib"
Expand All @@ -34,6 +36,8 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/abihelpers"
ccipconfig "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/config"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/observability"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/tokendata"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/tokendata/usdc"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/promwrapper"
"github.com/smartcontractkit/chainlink/v2/core/services/pg"
)
Expand Down Expand Up @@ -110,12 +114,19 @@ func NewExecutionServices(lggr logger.Logger, jb job.Job, chainSet evm.LegacyCha
"sourceChain", ChainName(int64(chainId)),
"destChain", ChainName(destChainID))

sourceChainEventClient := ccipdata.NewLogPollerReader(sourceChain.LogPoller(), execLggr, sourceChain.Client())

tokenDataProviders, err := getTokenDataProviders(pluginConfig, offRampConfig.OnRamp, sourceChainEventClient)
if err != nil {
return nil, errors.Wrap(err, "could not get token data providers")
}

wrappedPluginFactory := NewExecutionReportingPluginFactory(
ExecutionPluginConfig{
lggr: execLggr,
sourceLP: sourceChain.LogPoller(),
destLP: destChain.LogPoller(),
sourceReader: ccipdata.NewLogPollerReader(sourceChain.LogPoller(), execLggr, sourceChain.Client()),
sourceReader: sourceChainEventClient,
destReader: ccipdata.NewLogPollerReader(destChain.LogPoller(), execLggr, destChain.Client()),
onRamp: onRamp,
offRamp: offRamp,
Expand All @@ -126,6 +137,7 @@ func NewExecutionServices(lggr logger.Logger, jb job.Job, chainSet evm.LegacyCha
sourceClient: sourceChain.Client(),
destGasEstimator: destChain.GasEstimator(),
leafHasher: hashlib.NewLeafHasher(offRampConfig.SourceChainSelector, offRampConfig.ChainSelector, onRamp.Address(), hashlib.NewKeccakCtx()),
tokenDataProviders: tokenDataProviders,
})

err = wrappedPluginFactory.UpdateLogPollerFilters(utils.ZeroAddress, qopts...)
Expand Down Expand Up @@ -161,8 +173,41 @@ func NewExecutionServices(lggr logger.Logger, jb job.Job, chainSet evm.LegacyCha
return []job.ServiceCtx{job.NewServiceAdapter(oracle)}, nil
}

func getExecutionPluginSourceLpChainFilters(onRamp, priceRegistry common.Address) []logpoller.Filter {
return []logpoller.Filter{
func getTokenDataProviders(pluginConfig ccipconfig.ExecutionPluginJobSpecConfig, onRampAddress common.Address, sourceChainEventClient *ccipdata.LogPollerReader) (map[common.Address]tokendata.Reader, error) {
tokenDataProviders := make(map[common.Address]tokendata.Reader)

if pluginConfig.USDCConfig.AttestationAPI != "" {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit could invert this to avoid nesting i.e. if == "" early return. Maybe also want an info log indicating we're proceeding without USDC config to speed up debugging?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was imagining this function to do setup for all future token data providers, therefore having an if for each. With just usdc in mind it makes sense flipping

err := pluginConfig.USDCConfig.ValidateUSDCConfig()
if err != nil {
return nil, err
}

attestationURI, err2 := url.ParseRequestURI(pluginConfig.USDCConfig.AttestationAPI)
if err2 != nil {
return nil, errors.Wrap(err2, "failed to parse USDC attestation API")
}

tokenDataProviders[pluginConfig.USDCConfig.SourceTokenAddress] = tokendata.NewCachedReader(
usdc.NewUSDCTokenDataReader(
sourceChainEventClient,
pluginConfig.USDCConfig.SourceTokenAddress,
pluginConfig.USDCConfig.SourceMessageTransmitterAddress,
onRampAddress,
attestationURI,
),
)
}

return tokenDataProviders, nil
}

func getExecutionPluginSourceLpChainFilters(onRamp, priceRegistry common.Address, tokenDataProviders map[common.Address]tokendata.Reader) []logpoller.Filter {
var filters []logpoller.Filter
for _, provider := range tokenDataProviders {
filters = append(filters, provider.GetSourceLogPollerFilters()...)
}

return append(filters, []logpoller.Filter{
{
Name: logpoller.FilterName(EXEC_CCIP_SENDS, onRamp.String()),
EventSigs: []common.Hash{abihelpers.EventSignatures.SendRequested},
Expand All @@ -178,7 +223,7 @@ func getExecutionPluginSourceLpChainFilters(onRamp, priceRegistry common.Address
EventSigs: []common.Hash{abihelpers.EventSignatures.FeeTokenRemoved},
Addresses: []common.Address{priceRegistry},
},
}
}...)
}

func getExecutionPluginDestLpChainFilters(commitStore, offRamp, priceRegistry common.Address) []logpoller.Filter {
Expand Down Expand Up @@ -264,7 +309,7 @@ func UnregisterExecPluginLpFilters(ctx context.Context, spec *job.OCR2OracleSpec
return errors.Wrap(err, "failed loading onRamp")
}

return unregisterExecutionPluginLpFilters(ctx, sourceChain.LogPoller(), destChain.LogPoller(), offRamp, offRampConfig, sourceOnRamp, sourceChain.Client(), qopts...)
return unregisterExecutionPluginLpFilters(ctx, sourceChain.LogPoller(), destChain.LogPoller(), offRamp, offRampConfig, sourceOnRamp, sourceChain.Client(), pluginConfig, qopts...)
}

func unregisterExecutionPluginLpFilters(
Expand All @@ -275,6 +320,7 @@ func unregisterExecutionPluginLpFilters(
destOffRampConfig evm_2_evm_offramp.EVM2EVMOffRampStaticConfig,
sourceOnRamp evm_2_evm_onramp.EVM2EVMOnRampInterface,
sourceChainClient client.Client,
pluginConfig ccipconfig.ExecutionPluginJobSpecConfig,
qopts ...pg.QOpt) error {
destOffRampDynCfg, err := destOffRamp.GetDynamicConfig(&bind.CallOpts{Context: ctx})
if err != nil {
Expand All @@ -286,9 +332,15 @@ func unregisterExecutionPluginLpFilters(
return err
}

if err := logpollerutil.UnregisterLpFilters(
// SourceChainEventClient can be nil because it is not used in unregisterExecutionPluginLpFilters
tokenDataProviders, err := getTokenDataProviders(pluginConfig, destOffRampConfig.OnRamp, nil)
if err != nil {
return err
}

if err = logpollerutil.UnregisterLpFilters(
sourceLP,
getExecutionPluginSourceLpChainFilters(destOffRampConfig.OnRamp, onRampDynCfg.PriceRegistry),
getExecutionPluginSourceLpChainFilters(destOffRampConfig.OnRamp, onRampDynCfg.PriceRegistry, tokenDataProviders),
qopts...,
); err != nil {
return err
Expand Down
18 changes: 16 additions & 2 deletions core/services/ocr2/plugins/ccip/execution_plugin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,10 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/gethwrappers/ccip/generated/evm_2_evm_offramp"
"github.com/smartcontractkit/chainlink/v2/core/gethwrappers/ccip/generated/evm_2_evm_onramp"
"github.com/smartcontractkit/chainlink/v2/core/services/job"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/config"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/testhelpers"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/tokendata/usdc"
"github.com/smartcontractkit/chainlink/v2/core/utils"
)

func TestGetExecutionPluginFilterNamesFromSpec(t *testing.T) {
Expand Down Expand Up @@ -74,11 +77,20 @@ func TestGetExecutionPluginFilterNames(t *testing.T) {
mockOnRamp, onRampAddr := testhelpers.NewFakeOnRamp(t)
mockOnRamp.SetDynamicCfg(evm_2_evm_onramp.EVM2EVMOnRampDynamicConfig{PriceRegistry: srcPriceRegAddr})

pluginConfig := config.ExecutionPluginJobSpecConfig{
USDCConfig: config.USDCConfig{
SourceTokenAddress: utils.RandomAddress(),
SourceMessageTransmitterAddress: utils.RandomAddress(),
AttestationAPI: "http://localhost:8080",
},
}

srcLP := mocklp.NewLogPoller(t)
srcFilters := []string{
"Exec ccip sends - " + onRampAddr.String(),
"Fee token added - 0xdAFea492D9c6733aE3d56B7ed1ADb60692c98bC9",
"Fee token removed - 0xdAFea492D9c6733aE3d56B7ed1ADb60692c98bC9",
usdc.MESSAGE_SENT_FILTER_NAME + " - " + pluginConfig.USDCConfig.SourceMessageTransmitterAddress.Hex(),
}
for _, f := range srcFilters {
srcLP.On("UnregisterFilter", f, mock.Anything).Return(nil)
Expand All @@ -103,11 +115,13 @@ func TestGetExecutionPluginFilterNames(t *testing.T) {
dstLP,
mockOffRamp,
evm_2_evm_offramp.EVM2EVMOffRampStaticConfig{
CommitStore: commitStoreAddr,
OnRamp: onRampAddr,
CommitStore: commitStoreAddr,
OnRamp: onRampAddr,
SourceChainSelector: 5009297550715157269,
},
mockOnRamp,
nil,
pluginConfig,
)
assert.NoError(t, err)

Expand Down