Skip to content
This repository was archived by the owner on Mar 14, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 3 additions & 9 deletions core/services/ocr2/plugins/ccip/commit_reporting_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,15 +159,9 @@ func (rf *CommitReportingPluginFactory) NewReportingPlugin(config types.Reportin
lggr: rf.config.lggr.Named("CommitReportingPlugin"),
inflightReports: newInflightCommitReportsContainer(rf.config.commitStore.OffchainConfig().InflightCacheExpiry),
destPriceRegistryReader: rf.destPriceRegReader,
tokenDecimalsCache: cache.NewTokenToDecimals(
rf.config.lggr,
rf.config.destLP,
rf.config.offRamp,
rf.destPriceRegReader,
int64(rf.config.commitStore.OffchainConfig().DestFinalityDepth),
),
gasPriceEstimator: rf.config.commitStore.GasPriceEstimator(),
offchainConfig: pluginOffChainConfig,
tokenDecimalsCache: cache.NewTokenToDecimals(rf.config.lggr, rf.config.destLP, rf.config.offRamp, rf.destPriceRegReader),
gasPriceEstimator: rf.config.commitStore.GasPriceEstimator(),
offchainConfig: pluginOffChainConfig,
},
types.ReportingPluginInfo{
Name: "CCIPCommit",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,10 +138,10 @@ func (rf *ExecutionReportingPluginFactory) NewReportingPlugin(config types.Repor
}

offchainConfig := rf.config.offRampReader.OffchainConfig()
cachedSourceFeeTokens := cache.NewCachedFeeTokens(rf.config.sourceLP, rf.config.sourcePriceRegistry, int64(offchainConfig.SourceFinalityDepth))
cachedDestTokens := cache.NewCachedSupportedTokens(rf.config.destLP, rf.config.offRampReader, rf.destPriceRegReader, int64(offchainConfig.DestOptimisticConfirmations))
cachedSourceFeeTokens := cache.NewCachedFeeTokens(rf.config.sourceLP, rf.config.sourcePriceRegistry)
cachedDestTokens := cache.NewCachedSupportedTokens(rf.config.destLP, rf.config.offRampReader, rf.destPriceRegReader)

cachedTokenPools := cache.NewTokenPools(rf.config.lggr, rf.config.destLP, rf.config.offRampReader, int64(offchainConfig.DestOptimisticConfirmations), 5)
cachedTokenPools := cache.NewTokenPools(rf.config.lggr, rf.config.destLP, rf.config.offRampReader, 5)

return &ExecutionReportingPlugin{
config: rf.config,
Expand Down
84 changes: 62 additions & 22 deletions core/services/ocr2/plugins/ccip/internal/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,17 @@ type AutoSync[T any] interface {
// If we discover that change occurred since last update, we perform RPC to the chain using ContractOrigin.CallOrigin function.
// Purpose of this struct is handle common logic in a single place, you only need to override methods from ContractOrigin
// and Get function (behaving as orchestrator) will take care of the rest.
// IMPORTANT: Cache refresh relies on the events that are finalized. This introduces some delay between the event onchain occurrence
// and cache refreshing. This is intentional, because we want to prevent handling reorgs within the cache.
//
// That being said, adding caching layer to the new contract is as simple as:
// * implementing ContractOrigin interface
// * registering proper events in log poller
type CachedChain[T any] struct {
// Static configuration
observedEvents []common.Hash
logPoller logpoller.LogPoller
address []common.Address
optimisticConfirmations int64
observedEvents []common.Hash
logPoller logpoller.LogPoller
address []common.Address

// Cache
lock *sync.RWMutex
Expand All @@ -53,23 +54,33 @@ type ContractOrigin[T any] interface {
func (c *CachedChain[T]) Get(ctx context.Context) (T, error) {
var empty T

lastChangeBlock := c.readLastChangeBlock()

cachedLastChangeBlock := c.readLastChangeBlock()
// Handles first call, because cache is not eagerly populated
if lastChangeBlock == 0 {
if cachedLastChangeBlock == 0 {
return c.initializeCache(ctx)
}

currentBlockNumber, err := c.logPoller.LatestBlockByEventSigsAddrsWithConfs(lastChangeBlock, c.observedEvents, c.address, logpoller.Confirmations(c.optimisticConfirmations), pg.WithParentCtx(ctx))

// Ordering matters here, we need to do operations in the following order:
// * get LatestBlock
// * get LatestBlockByEventSigsAddrsWithConfs
// * fetch data from Origin
// It's because LogPoller keep progressing in the background, and we want to prevent missing data.
// If we do it in the opposite order, we might store in cache block that after logs that
// were not scanned by LatestBlockByEventSigsAddrsWithConfs. And therefore ignore them and not update the cache.
// (this will ignore logs produced between LatestBlockByEventSigsAddrsWithConfs and LatestBlock calls).
// Calling LatestBlock first gives us guarantee that we never miss anything.
latestBlock, err := c.logPoller.LatestBlock(pg.WithParentCtx(ctx))
if err != nil {
return empty, err
// Intentionally ignore the error here
latestBlock = logpoller.LogPollerBlock{}
}

// In case of new updates, fetch fresh data from the origin
if currentBlockNumber > lastChangeBlock {
return c.fetchFromOrigin(ctx, currentBlockNumber)
if err1 := c.maybeRefreshCache(ctx, cachedLastChangeBlock); err1 != nil {
return empty, err1
}

// This is performance improvement that will prevent for large db scans, by updating the lower bound of the search query
c.maybeCacheLatestFinalizedBlock(cachedLastChangeBlock, latestBlock.FinalizedBlockNumber)
return c.copyCachedValue(), nil
}

Expand All @@ -91,22 +102,36 @@ func (c *CachedChain[T]) initializeCache(ctx context.Context) (T, error) {
return empty, err
}

c.updateCache(value, latestBlock.BlockNumber-c.optimisticConfirmations)
c.updateCache(value, latestBlock.FinalizedBlockNumber)
return c.copyCachedValue(), nil

}

// fetchFromOrigin fetches data from origin. This action is performed when logpoller.LogPoller says there were events
// emitted since last update.
func (c *CachedChain[T]) fetchFromOrigin(ctx context.Context, currentBlockNumber int64) (T, error) {
var empty T
value, err := c.origin.CallOrigin(ctx)
// maybeRefreshCache checks whether cache is fresh or needs to be updated.
// We fetch the last changed block from the log poller and compare that with the last change block stored within cache.
// If the last changed block is greater than the one stored within cache, we need to update the cache by fetching data from the origin.
func (c *CachedChain[T]) maybeRefreshCache(ctx context.Context, cachedLastChangeBlock int64) error {
chainLastChangeBlock, err := c.logPoller.LatestBlockByEventSigsAddrsWithConfs(
cachedLastChangeBlock,
c.observedEvents,
c.address,
logpoller.Finalized,
pg.WithParentCtx(ctx),
)
if err != nil {
return empty, err
return err
}
c.updateCache(value, currentBlockNumber)

return c.copyCachedValue(), nil
// In case of new updates, fetch fresh data from the origin
if chainLastChangeBlock > cachedLastChangeBlock {
// Return error when cache cannot be fetched, don't return stale values
value, err1 := c.origin.CallOrigin(ctx)
if err1 != nil {
return err1
}
c.updateCache(value, chainLastChangeBlock)
}
return nil
}

// updateCache performs updating two critical variables for cache to work properly:
Expand Down Expand Up @@ -136,3 +161,18 @@ func (c *CachedChain[T]) copyCachedValue() T {
defer c.lock.RUnlock()
return c.origin.Copy(c.value)
}

func (c *CachedChain[T]) maybeCacheLatestFinalizedBlock(cachedLastBlock int64, latestFinalizedBlock int64) {
// Check if applicable to prevent unnecessary locking
if cachedLastBlock >= latestFinalizedBlock {
return
}

c.lock.Lock()
defer c.lock.Unlock()
// Double-lock checking. No need to update if other goroutine was faster
if latestFinalizedBlock <= c.lastChangeBlock {
return
}
c.lastChangeBlock = latestFinalizedBlock
}
117 changes: 103 additions & 14 deletions core/services/ocr2/plugins/ccip/internal/cache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"testing"

"github.com/ethereum/go-ethereum/common"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"

Expand All @@ -22,7 +23,7 @@ const (

func TestGet_InitDataForTheFirstTime(t *testing.T) {
lp := lpMocks.NewLogPoller(t)
lp.On("LatestBlock", mock.Anything).Maybe().Return(logpoller.LogPollerBlock{BlockNumber: 100}, nil)
lp.On("LatestBlock", mock.Anything).Maybe().Return(logpoller.LogPollerBlock{BlockNumber: 100, FinalizedBlockNumber: 80}, nil)

contract := newCachedContract(lp, "", []string{"value1"}, 0)

Expand All @@ -32,15 +33,96 @@ func TestGet_InitDataForTheFirstTime(t *testing.T) {
}

func TestGet_ReturnDataFromCacheIfNoNewEvents(t *testing.T) {
latestBlock := int64(100)
lp := lpMocks.NewLogPoller(t)
mockLogPollerQuery(lp, latestBlock)
tests := []struct {
name string
lastFinalizedBlock int64
lastChangeBlock int64
lastBlockError error
expectedLastChangeBlock int64
}{
{
name: "last finalized block is 0",
lastFinalizedBlock: 10,
lastChangeBlock: 100,
expectedLastChangeBlock: 100,
},
{
name: "last finalized block is lower than last change block",
lastFinalizedBlock: 10,
lastChangeBlock: 100,
expectedLastChangeBlock: 100,
},
{
name: "last finalized block is higher than last change block",
lastFinalizedBlock: 200,
lastChangeBlock: 100,
expectedLastChangeBlock: 200,
},
{
name: "should ignore error when fetching latest block",
lastFinalizedBlock: 5000,
lastChangeBlock: 100,
lastBlockError: assert.AnError,
expectedLastChangeBlock: 100,
},
}

contract := newCachedContract(lp, cachedValue, []string{"value1"}, latestBlock)
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
lp := lpMocks.NewLogPoller(t)
lp.On("LatestBlockByEventSigsAddrsWithConfs", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(tt.lastChangeBlock, nil)
lp.On("LatestBlock", mock.Anything).Return(logpoller.LogPollerBlock{FinalizedBlockNumber: tt.lastFinalizedBlock}, tt.lastBlockError)

value, err := contract.Get(testutils.Context(t))
require.NoError(t, err)
require.Equal(t, cachedValue, value)
contract := newCachedContract(lp, cachedValue, []string{"value1"}, tt.lastChangeBlock)

value, err := contract.Get(testutils.Context(t))
require.NoError(t, err)
assert.Equal(t, cachedValue, value)
assert.Equal(t, tt.expectedLastChangeBlock, contract.lastChangeBlock)
})
}
}

func TestGet_DifferentOriginBehaviour(t *testing.T) {
lp := lpMocks.NewLogPoller(t)
mockLogPollerQuery(lp, 100)

tests := []struct {
name string
originResponse func() (string, error)
wantErr bool
expectedValue string
}{
{
name: "origin returns error",
originResponse: func() (string, error) { return "", assert.AnError },
wantErr: true,
},
{
name: "origin returns value",
originResponse: func() (string, error) { return "success", nil },
expectedValue: "success",
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
originValue, originErr := tt.originResponse()
cache := &CachedChain[string]{
logPoller: lp,
lock: &sync.RWMutex{},
lastChangeBlock: 1,
origin: &FakeContractOrigin{values: []string{originValue}, err: originErr},
}

value, err := cache.Get(testutils.Context(t))
if tt.wantErr {
assert.NotNil(t, err)
} else {
assert.Equal(t, tt.expectedValue, value)
}
})
}
}

func TestGet_CallOriginForNewEvents(t *testing.T) {
Expand Down Expand Up @@ -124,11 +206,7 @@ func TestGet_ConcurrentAccess(t *testing.T) {

func newCachedContract(lp logpoller.LogPoller, cacheValue string, originValue []string, lastChangeBlock int64) *CachedChain[string] {
return &CachedChain[string]{
observedEvents: []common.Hash{{}},
logPoller: lp,
address: []common.Address{{}},
optimisticConfirmations: 0,

logPoller: lp,
lock: &sync.RWMutex{},
value: cacheValue,
lastChangeBlock: lastChangeBlock,
Expand All @@ -137,7 +215,8 @@ func newCachedContract(lp logpoller.LogPoller, cacheValue string, originValue []
}

func mockLogPollerQuery(lp *lpMocks.LogPoller, latestBlock int64) *mock.Call {
return lp.On("LatestBlockByEventSigsAddrsWithConfs", mock.Anything, []common.Hash{{}}, []common.Address{{}}, logpoller.Confirmations(0), mock.Anything).
lp.On("LatestBlock", mock.Anything).Maybe().Return(logpoller.LogPollerBlock{}, nil)
return lp.On("LatestBlockByEventSigsAddrsWithConfs", mock.Anything, mock.Anything, mock.Anything, logpoller.Finalized, mock.Anything).
Maybe().Return(latestBlock, nil)
}

Expand All @@ -154,8 +233,15 @@ func (lp *ProgressingLogPoller) LatestBlockByEventSigsAddrsWithConfs(int64, []co
return lp.latestBlock, nil
}

func (lp *ProgressingLogPoller) LatestBlock(...pg.QOpt) (logpoller.LogPollerBlock, error) {
lp.lock.Lock()
defer lp.lock.Unlock()
return logpoller.LogPollerBlock{BlockNumber: lp.latestBlock}, nil
}

type FakeContractOrigin struct {
values []string
err error
counter int
lock sync.Mutex
}
Expand All @@ -166,6 +252,9 @@ func (f *FakeContractOrigin) CallOrigin(context.Context) (string, error) {
f.counter++
f.lock.Unlock()
}()
if f.err != nil {
return "", f.err
}
return f.values[f.counter], nil
}

Expand Down
16 changes: 7 additions & 9 deletions core/services/ocr2/plugins/ccip/internal/cache/tokenpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,16 @@ func NewTokenPools(
lggr logger.Logger,
lp logpoller.LogPoller,
offRamp ccipdata.OffRampReader,
optimisticConfirmations int64,
numWorkers int,
) *CachedChain[map[common.Address]common.Address] {
return &CachedChain[map[common.Address]common.Address]{
observedEvents: offRamp.TokenEvents(),
logPoller: lp,
address: []common.Address{offRamp.Address()},
optimisticConfirmations: optimisticConfirmations,
lock: &sync.RWMutex{},
value: make(map[common.Address]common.Address),
lastChangeBlock: 0,
origin: newTokenPoolsOrigin(lggr, offRamp, numWorkers),
observedEvents: offRamp.TokenEvents(),
logPoller: lp,
address: []common.Address{offRamp.Address()},
lock: &sync.RWMutex{},
value: make(map[common.Address]common.Address),
lastChangeBlock: 0,
origin: newTokenPoolsOrigin(lggr, offRamp, numWorkers),
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func TestNewTokenPools(t *testing.T) {
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
mockLp := mocks.NewLogPoller(t)
mockLp.On("LatestBlock", mock.Anything).Return(logpoller.LogPollerBlock{BlockNumber: 100}, nil)
mockLp.On("LatestBlock", mock.Anything).Return(logpoller.LogPollerBlock{BlockNumber: 100, FinalizedBlockNumber: 80}, nil)

offRamp := ccipdatamocks.NewOffRampReader(t)
offRamp.On("TokenEvents").Return([]common.Hash{})
Expand All @@ -94,7 +94,7 @@ func TestNewTokenPools(t *testing.T) {
}
offRamp.On("GetDestinationTokens", mock.Anything).Return(destTokens, nil)

c := NewTokenPools(logger.TestLogger(t), mockLp, offRamp, 0, 5)
c := NewTokenPools(logger.TestLogger(t), mockLp, offRamp, 5)

res, err := c.Get(ctx)
if tc.expErr {
Expand Down
Loading