diff --git a/core/services/ocr2/plugins/ccip/commit_reporting_plugin.go b/core/services/ocr2/plugins/ccip/commit_reporting_plugin.go index 9de8a133a4..2c7aea1fe8 100644 --- a/core/services/ocr2/plugins/ccip/commit_reporting_plugin.go +++ b/core/services/ocr2/plugins/ccip/commit_reporting_plugin.go @@ -159,15 +159,9 @@ func (rf *CommitReportingPluginFactory) NewReportingPlugin(config types.Reportin lggr: rf.config.lggr.Named("CommitReportingPlugin"), inflightReports: newInflightCommitReportsContainer(rf.config.commitStore.OffchainConfig().InflightCacheExpiry), destPriceRegistryReader: rf.destPriceRegReader, - tokenDecimalsCache: cache.NewTokenToDecimals( - rf.config.lggr, - rf.config.destLP, - rf.config.offRamp, - rf.destPriceRegReader, - int64(rf.config.commitStore.OffchainConfig().DestFinalityDepth), - ), - gasPriceEstimator: rf.config.commitStore.GasPriceEstimator(), - offchainConfig: pluginOffChainConfig, + tokenDecimalsCache: cache.NewTokenToDecimals(rf.config.lggr, rf.config.destLP, rf.config.offRamp, rf.destPriceRegReader), + gasPriceEstimator: rf.config.commitStore.GasPriceEstimator(), + offchainConfig: pluginOffChainConfig, }, types.ReportingPluginInfo{ Name: "CCIPCommit", diff --git a/core/services/ocr2/plugins/ccip/execution_reporting_plugin.go b/core/services/ocr2/plugins/ccip/execution_reporting_plugin.go index 8a383f9f88..bf6107b14c 100644 --- a/core/services/ocr2/plugins/ccip/execution_reporting_plugin.go +++ b/core/services/ocr2/plugins/ccip/execution_reporting_plugin.go @@ -138,10 +138,10 @@ func (rf *ExecutionReportingPluginFactory) NewReportingPlugin(config types.Repor } offchainConfig := rf.config.offRampReader.OffchainConfig() - cachedSourceFeeTokens := cache.NewCachedFeeTokens(rf.config.sourceLP, rf.config.sourcePriceRegistry, int64(offchainConfig.SourceFinalityDepth)) - cachedDestTokens := cache.NewCachedSupportedTokens(rf.config.destLP, rf.config.offRampReader, rf.destPriceRegReader, int64(offchainConfig.DestOptimisticConfirmations)) + cachedSourceFeeTokens := cache.NewCachedFeeTokens(rf.config.sourceLP, rf.config.sourcePriceRegistry) + cachedDestTokens := cache.NewCachedSupportedTokens(rf.config.destLP, rf.config.offRampReader, rf.destPriceRegReader) - cachedTokenPools := cache.NewTokenPools(rf.config.lggr, rf.config.destLP, rf.config.offRampReader, int64(offchainConfig.DestOptimisticConfirmations), 5) + cachedTokenPools := cache.NewTokenPools(rf.config.lggr, rf.config.destLP, rf.config.offRampReader, 5) return &ExecutionReportingPlugin{ config: rf.config, diff --git a/core/services/ocr2/plugins/ccip/internal/cache/cache.go b/core/services/ocr2/plugins/ccip/internal/cache/cache.go index 1363aa35b6..3705b19015 100644 --- a/core/services/ocr2/plugins/ccip/internal/cache/cache.go +++ b/core/services/ocr2/plugins/ccip/internal/cache/cache.go @@ -22,16 +22,17 @@ type AutoSync[T any] interface { // If we discover that change occurred since last update, we perform RPC to the chain using ContractOrigin.CallOrigin function. // Purpose of this struct is handle common logic in a single place, you only need to override methods from ContractOrigin // and Get function (behaving as orchestrator) will take care of the rest. +// IMPORTANT: Cache refresh relies on the events that are finalized. This introduces some delay between the event onchain occurrence +// and cache refreshing. This is intentional, because we want to prevent handling reorgs within the cache. // // That being said, adding caching layer to the new contract is as simple as: // * implementing ContractOrigin interface // * registering proper events in log poller type CachedChain[T any] struct { // Static configuration - observedEvents []common.Hash - logPoller logpoller.LogPoller - address []common.Address - optimisticConfirmations int64 + observedEvents []common.Hash + logPoller logpoller.LogPoller + address []common.Address // Cache lock *sync.RWMutex @@ -53,23 +54,33 @@ type ContractOrigin[T any] interface { func (c *CachedChain[T]) Get(ctx context.Context) (T, error) { var empty T - lastChangeBlock := c.readLastChangeBlock() - + cachedLastChangeBlock := c.readLastChangeBlock() // Handles first call, because cache is not eagerly populated - if lastChangeBlock == 0 { + if cachedLastChangeBlock == 0 { return c.initializeCache(ctx) } - currentBlockNumber, err := c.logPoller.LatestBlockByEventSigsAddrsWithConfs(lastChangeBlock, c.observedEvents, c.address, logpoller.Confirmations(c.optimisticConfirmations), pg.WithParentCtx(ctx)) - + // Ordering matters here, we need to do operations in the following order: + // * get LatestBlock + // * get LatestBlockByEventSigsAddrsWithConfs + // * fetch data from Origin + // It's because LogPoller keep progressing in the background, and we want to prevent missing data. + // If we do it in the opposite order, we might store in cache block that after logs that + // were not scanned by LatestBlockByEventSigsAddrsWithConfs. And therefore ignore them and not update the cache. + // (this will ignore logs produced between LatestBlockByEventSigsAddrsWithConfs and LatestBlock calls). + // Calling LatestBlock first gives us guarantee that we never miss anything. + latestBlock, err := c.logPoller.LatestBlock(pg.WithParentCtx(ctx)) if err != nil { - return empty, err + // Intentionally ignore the error here + latestBlock = logpoller.LogPollerBlock{} } - // In case of new updates, fetch fresh data from the origin - if currentBlockNumber > lastChangeBlock { - return c.fetchFromOrigin(ctx, currentBlockNumber) + if err1 := c.maybeRefreshCache(ctx, cachedLastChangeBlock); err1 != nil { + return empty, err1 } + + // This is performance improvement that will prevent for large db scans, by updating the lower bound of the search query + c.maybeCacheLatestFinalizedBlock(cachedLastChangeBlock, latestBlock.FinalizedBlockNumber) return c.copyCachedValue(), nil } @@ -91,22 +102,36 @@ func (c *CachedChain[T]) initializeCache(ctx context.Context) (T, error) { return empty, err } - c.updateCache(value, latestBlock.BlockNumber-c.optimisticConfirmations) + c.updateCache(value, latestBlock.FinalizedBlockNumber) return c.copyCachedValue(), nil } -// fetchFromOrigin fetches data from origin. This action is performed when logpoller.LogPoller says there were events -// emitted since last update. -func (c *CachedChain[T]) fetchFromOrigin(ctx context.Context, currentBlockNumber int64) (T, error) { - var empty T - value, err := c.origin.CallOrigin(ctx) +// maybeRefreshCache checks whether cache is fresh or needs to be updated. +// We fetch the last changed block from the log poller and compare that with the last change block stored within cache. +// If the last changed block is greater than the one stored within cache, we need to update the cache by fetching data from the origin. +func (c *CachedChain[T]) maybeRefreshCache(ctx context.Context, cachedLastChangeBlock int64) error { + chainLastChangeBlock, err := c.logPoller.LatestBlockByEventSigsAddrsWithConfs( + cachedLastChangeBlock, + c.observedEvents, + c.address, + logpoller.Finalized, + pg.WithParentCtx(ctx), + ) if err != nil { - return empty, err + return err } - c.updateCache(value, currentBlockNumber) - return c.copyCachedValue(), nil + // In case of new updates, fetch fresh data from the origin + if chainLastChangeBlock > cachedLastChangeBlock { + // Return error when cache cannot be fetched, don't return stale values + value, err1 := c.origin.CallOrigin(ctx) + if err1 != nil { + return err1 + } + c.updateCache(value, chainLastChangeBlock) + } + return nil } // updateCache performs updating two critical variables for cache to work properly: @@ -136,3 +161,18 @@ func (c *CachedChain[T]) copyCachedValue() T { defer c.lock.RUnlock() return c.origin.Copy(c.value) } + +func (c *CachedChain[T]) maybeCacheLatestFinalizedBlock(cachedLastBlock int64, latestFinalizedBlock int64) { + // Check if applicable to prevent unnecessary locking + if cachedLastBlock >= latestFinalizedBlock { + return + } + + c.lock.Lock() + defer c.lock.Unlock() + // Double-lock checking. No need to update if other goroutine was faster + if latestFinalizedBlock <= c.lastChangeBlock { + return + } + c.lastChangeBlock = latestFinalizedBlock +} diff --git a/core/services/ocr2/plugins/ccip/internal/cache/cache_test.go b/core/services/ocr2/plugins/ccip/internal/cache/cache_test.go index 7d52944adb..de9b21e9dc 100644 --- a/core/services/ocr2/plugins/ccip/internal/cache/cache_test.go +++ b/core/services/ocr2/plugins/ccip/internal/cache/cache_test.go @@ -7,6 +7,7 @@ import ( "testing" "github.com/ethereum/go-ethereum/common" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" @@ -22,7 +23,7 @@ const ( func TestGet_InitDataForTheFirstTime(t *testing.T) { lp := lpMocks.NewLogPoller(t) - lp.On("LatestBlock", mock.Anything).Maybe().Return(logpoller.LogPollerBlock{BlockNumber: 100}, nil) + lp.On("LatestBlock", mock.Anything).Maybe().Return(logpoller.LogPollerBlock{BlockNumber: 100, FinalizedBlockNumber: 80}, nil) contract := newCachedContract(lp, "", []string{"value1"}, 0) @@ -32,15 +33,96 @@ func TestGet_InitDataForTheFirstTime(t *testing.T) { } func TestGet_ReturnDataFromCacheIfNoNewEvents(t *testing.T) { - latestBlock := int64(100) - lp := lpMocks.NewLogPoller(t) - mockLogPollerQuery(lp, latestBlock) + tests := []struct { + name string + lastFinalizedBlock int64 + lastChangeBlock int64 + lastBlockError error + expectedLastChangeBlock int64 + }{ + { + name: "last finalized block is 0", + lastFinalizedBlock: 10, + lastChangeBlock: 100, + expectedLastChangeBlock: 100, + }, + { + name: "last finalized block is lower than last change block", + lastFinalizedBlock: 10, + lastChangeBlock: 100, + expectedLastChangeBlock: 100, + }, + { + name: "last finalized block is higher than last change block", + lastFinalizedBlock: 200, + lastChangeBlock: 100, + expectedLastChangeBlock: 200, + }, + { + name: "should ignore error when fetching latest block", + lastFinalizedBlock: 5000, + lastChangeBlock: 100, + lastBlockError: assert.AnError, + expectedLastChangeBlock: 100, + }, + } - contract := newCachedContract(lp, cachedValue, []string{"value1"}, latestBlock) + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + lp := lpMocks.NewLogPoller(t) + lp.On("LatestBlockByEventSigsAddrsWithConfs", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(tt.lastChangeBlock, nil) + lp.On("LatestBlock", mock.Anything).Return(logpoller.LogPollerBlock{FinalizedBlockNumber: tt.lastFinalizedBlock}, tt.lastBlockError) - value, err := contract.Get(testutils.Context(t)) - require.NoError(t, err) - require.Equal(t, cachedValue, value) + contract := newCachedContract(lp, cachedValue, []string{"value1"}, tt.lastChangeBlock) + + value, err := contract.Get(testutils.Context(t)) + require.NoError(t, err) + assert.Equal(t, cachedValue, value) + assert.Equal(t, tt.expectedLastChangeBlock, contract.lastChangeBlock) + }) + } +} + +func TestGet_DifferentOriginBehaviour(t *testing.T) { + lp := lpMocks.NewLogPoller(t) + mockLogPollerQuery(lp, 100) + + tests := []struct { + name string + originResponse func() (string, error) + wantErr bool + expectedValue string + }{ + { + name: "origin returns error", + originResponse: func() (string, error) { return "", assert.AnError }, + wantErr: true, + }, + { + name: "origin returns value", + originResponse: func() (string, error) { return "success", nil }, + expectedValue: "success", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + originValue, originErr := tt.originResponse() + cache := &CachedChain[string]{ + logPoller: lp, + lock: &sync.RWMutex{}, + lastChangeBlock: 1, + origin: &FakeContractOrigin{values: []string{originValue}, err: originErr}, + } + + value, err := cache.Get(testutils.Context(t)) + if tt.wantErr { + assert.NotNil(t, err) + } else { + assert.Equal(t, tt.expectedValue, value) + } + }) + } } func TestGet_CallOriginForNewEvents(t *testing.T) { @@ -124,11 +206,7 @@ func TestGet_ConcurrentAccess(t *testing.T) { func newCachedContract(lp logpoller.LogPoller, cacheValue string, originValue []string, lastChangeBlock int64) *CachedChain[string] { return &CachedChain[string]{ - observedEvents: []common.Hash{{}}, - logPoller: lp, - address: []common.Address{{}}, - optimisticConfirmations: 0, - + logPoller: lp, lock: &sync.RWMutex{}, value: cacheValue, lastChangeBlock: lastChangeBlock, @@ -137,7 +215,8 @@ func newCachedContract(lp logpoller.LogPoller, cacheValue string, originValue [] } func mockLogPollerQuery(lp *lpMocks.LogPoller, latestBlock int64) *mock.Call { - return lp.On("LatestBlockByEventSigsAddrsWithConfs", mock.Anything, []common.Hash{{}}, []common.Address{{}}, logpoller.Confirmations(0), mock.Anything). + lp.On("LatestBlock", mock.Anything).Maybe().Return(logpoller.LogPollerBlock{}, nil) + return lp.On("LatestBlockByEventSigsAddrsWithConfs", mock.Anything, mock.Anything, mock.Anything, logpoller.Finalized, mock.Anything). Maybe().Return(latestBlock, nil) } @@ -154,8 +233,15 @@ func (lp *ProgressingLogPoller) LatestBlockByEventSigsAddrsWithConfs(int64, []co return lp.latestBlock, nil } +func (lp *ProgressingLogPoller) LatestBlock(...pg.QOpt) (logpoller.LogPollerBlock, error) { + lp.lock.Lock() + defer lp.lock.Unlock() + return logpoller.LogPollerBlock{BlockNumber: lp.latestBlock}, nil +} + type FakeContractOrigin struct { values []string + err error counter int lock sync.Mutex } @@ -166,6 +252,9 @@ func (f *FakeContractOrigin) CallOrigin(context.Context) (string, error) { f.counter++ f.lock.Unlock() }() + if f.err != nil { + return "", f.err + } return f.values[f.counter], nil } diff --git a/core/services/ocr2/plugins/ccip/internal/cache/tokenpool.go b/core/services/ocr2/plugins/ccip/internal/cache/tokenpool.go index a9bf1dabea..7deec4ec87 100644 --- a/core/services/ocr2/plugins/ccip/internal/cache/tokenpool.go +++ b/core/services/ocr2/plugins/ccip/internal/cache/tokenpool.go @@ -17,18 +17,16 @@ func NewTokenPools( lggr logger.Logger, lp logpoller.LogPoller, offRamp ccipdata.OffRampReader, - optimisticConfirmations int64, numWorkers int, ) *CachedChain[map[common.Address]common.Address] { return &CachedChain[map[common.Address]common.Address]{ - observedEvents: offRamp.TokenEvents(), - logPoller: lp, - address: []common.Address{offRamp.Address()}, - optimisticConfirmations: optimisticConfirmations, - lock: &sync.RWMutex{}, - value: make(map[common.Address]common.Address), - lastChangeBlock: 0, - origin: newTokenPoolsOrigin(lggr, offRamp, numWorkers), + observedEvents: offRamp.TokenEvents(), + logPoller: lp, + address: []common.Address{offRamp.Address()}, + lock: &sync.RWMutex{}, + value: make(map[common.Address]common.Address), + lastChangeBlock: 0, + origin: newTokenPoolsOrigin(lggr, offRamp, numWorkers), } } diff --git a/core/services/ocr2/plugins/ccip/internal/cache/tokenpool_test.go b/core/services/ocr2/plugins/ccip/internal/cache/tokenpool_test.go index a89755e61c..4dbb529503 100644 --- a/core/services/ocr2/plugins/ccip/internal/cache/tokenpool_test.go +++ b/core/services/ocr2/plugins/ccip/internal/cache/tokenpool_test.go @@ -75,7 +75,7 @@ func TestNewTokenPools(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { mockLp := mocks.NewLogPoller(t) - mockLp.On("LatestBlock", mock.Anything).Return(logpoller.LogPollerBlock{BlockNumber: 100}, nil) + mockLp.On("LatestBlock", mock.Anything).Return(logpoller.LogPollerBlock{BlockNumber: 100, FinalizedBlockNumber: 80}, nil) offRamp := ccipdatamocks.NewOffRampReader(t) offRamp.On("TokenEvents").Return([]common.Hash{}) @@ -94,7 +94,7 @@ func TestNewTokenPools(t *testing.T) { } offRamp.On("GetDestinationTokens", mock.Anything).Return(destTokens, nil) - c := NewTokenPools(logger.TestLogger(t), mockLp, offRamp, 0, 5) + c := NewTokenPools(logger.TestLogger(t), mockLp, offRamp, 5) res, err := c.Get(ctx) if tc.expErr { diff --git a/core/services/ocr2/plugins/ccip/internal/cache/tokens.go b/core/services/ocr2/plugins/ccip/internal/cache/tokens.go index 15025a67f0..359efd9ad4 100644 --- a/core/services/ocr2/plugins/ccip/internal/cache/tokens.go +++ b/core/services/ocr2/plugins/ccip/internal/cache/tokens.go @@ -17,17 +17,15 @@ import ( func NewCachedFeeTokens( lp logpoller.LogPoller, priceRegistry ccipdata.PriceRegistryReader, - optimisticConfirmations int64, ) *CachedChain[[]common.Address] { return &CachedChain[[]common.Address]{ - observedEvents: priceRegistry.FeeTokenEvents(), - logPoller: lp, - address: []common.Address{priceRegistry.Address()}, - optimisticConfirmations: optimisticConfirmations, - lock: &sync.RWMutex{}, - value: []common.Address{}, - lastChangeBlock: 0, - origin: &feeTokensOrigin{priceRegistry: priceRegistry}, + observedEvents: priceRegistry.FeeTokenEvents(), + logPoller: lp, + address: []common.Address{priceRegistry.Address()}, + lock: &sync.RWMutex{}, + value: []common.Address{}, + lastChangeBlock: 0, + origin: &feeTokensOrigin{priceRegistry: priceRegistry}, } } @@ -42,16 +40,14 @@ func NewCachedSupportedTokens( lp logpoller.LogPoller, offRamp ccipdata.OffRampReader, priceRegistry ccipdata.PriceRegistryReader, - optimisticConfirmations int64, ) *CachedChain[CachedTokens] { return &CachedChain[CachedTokens]{ - observedEvents: append(priceRegistry.FeeTokenEvents(), offRamp.TokenEvents()...), - logPoller: lp, - address: []common.Address{priceRegistry.Address(), offRamp.Address()}, - optimisticConfirmations: optimisticConfirmations, - lock: &sync.RWMutex{}, - value: CachedTokens{}, - lastChangeBlock: 0, + observedEvents: append(priceRegistry.FeeTokenEvents(), offRamp.TokenEvents()...), + logPoller: lp, + address: []common.Address{priceRegistry.Address(), offRamp.Address()}, + lock: &sync.RWMutex{}, + value: CachedTokens{}, + lastChangeBlock: 0, origin: &feeAndSupportedTokensOrigin{ feeTokensOrigin: feeTokensOrigin{priceRegistry: priceRegistry}, supportedTokensOrigin: supportedTokensOrigin{offRamp: offRamp}}, @@ -63,16 +59,14 @@ func NewTokenToDecimals( lp logpoller.LogPoller, offRamp ccipdata.OffRampReader, priceRegistryReader ccipdata.PriceRegistryReader, - optimisticConfirmations int64, ) *CachedChain[map[common.Address]uint8] { return &CachedChain[map[common.Address]uint8]{ - observedEvents: append(priceRegistryReader.FeeTokenEvents(), offRamp.TokenEvents()...), - logPoller: lp, - address: []common.Address{priceRegistryReader.Address(), offRamp.Address()}, - optimisticConfirmations: optimisticConfirmations, - lock: &sync.RWMutex{}, - value: make(map[common.Address]uint8), - lastChangeBlock: 0, + observedEvents: append(priceRegistryReader.FeeTokenEvents(), offRamp.TokenEvents()...), + logPoller: lp, + address: []common.Address{priceRegistryReader.Address(), offRamp.Address()}, + lock: &sync.RWMutex{}, + value: make(map[common.Address]uint8), + lastChangeBlock: 0, origin: &tokenToDecimals{ lggr: lggr, priceRegistryReader: priceRegistryReader,