Skip to content
This repository was archived by the owner on Mar 14, 2025. It is now read-only.
10 changes: 3 additions & 7 deletions core/services/ocr2/plugins/ccip/ccipcommit/ocr2.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/smartcontractkit/libocr/offchainreporting2plus/types"

"github.com/smartcontractkit/chainlink-common/pkg/utils/mathutil"

"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/cciptypes"

"github.com/smartcontractkit/chainlink/v2/core/logger"
Expand Down Expand Up @@ -98,13 +99,8 @@ func (r *CommitReportingPlugin) Query(context.Context, types.ReportTimestamp) (t
// the observation will be considered invalid and rejected.
func (r *CommitReportingPlugin) Observation(ctx context.Context, epochAndRound types.ReportTimestamp, _ types.Query) (types.Observation, error) {
lggr := r.lggr.Named("CommitObservation")
// If the commit store is down the protocol should halt.
down, err := r.commitStoreReader.IsDown(ctx)
if err != nil {
return nil, errors.Wrap(err, "isDown check errored")
}
if down {
return nil, ccip.ErrCommitStoreIsDown
if err := ccipcommon.VerifyNotDown(ctx, r.lggr, r.commitStoreReader, r.onRampReader); err != nil {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Simplified. One check, instead of two (bool + error)

return nil, err
}
r.inflightReports.expire(lggr)

Expand Down
35 changes: 22 additions & 13 deletions core/services/ocr2/plugins/ccip/ccipcommit/ocr2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,14 +51,15 @@ func TestCommitReportingPlugin_Observation(t *testing.T) {
someTokenAddr := ccipcalc.HexToAddress("2000")

testCases := []struct {
name string
epochAndRound types.ReportTimestamp
commitStoreIsPaused bool
commitStoreSeqNum uint64
tokenPrices map[cciptypes.Address]*big.Int
sendReqs []cciptypes.EVM2EVMMessageWithTxMeta
tokenDecimals map[cciptypes.Address]uint8
fee *big.Int
name string
epochAndRound types.ReportTimestamp
commitStorePaused bool
sourceChainCursed bool
commitStoreSeqNum uint64
tokenPrices map[cciptypes.Address]*big.Int
sendReqs []cciptypes.EVM2EVMMessageWithTxMeta
tokenDecimals map[cciptypes.Address]uint8
fee *big.Int

expErr bool
expObs ccip.CommitObservation
Expand Down Expand Up @@ -90,22 +91,30 @@ func TestCommitReportingPlugin_Observation(t *testing.T) {
},
},
{
name: "commit store is down",
commitStoreIsPaused: true,
expErr: true,
name: "commit store is down",
commitStorePaused: true,
sourceChainCursed: false,
expErr: true,
},
{
name: "source chain is cursed",
commitStorePaused: false,
sourceChainCursed: true,
expErr: true,
},
}

ctx := testutils.Context(t)
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
commitStoreReader := ccipdatamocks.NewCommitStoreReader(t)
commitStoreReader.On("IsDown", ctx).Return(tc.commitStoreIsPaused, nil)
if !tc.commitStoreIsPaused {
commitStoreReader.On("IsDown", ctx).Return(tc.commitStorePaused, nil)
if !tc.commitStorePaused && !tc.sourceChainCursed {
commitStoreReader.On("GetExpectedNextSequenceNumber", ctx).Return(tc.commitStoreSeqNum, nil)
}

onRampReader := ccipdatamocks.NewOnRampReader(t)
onRampReader.On("IsSourceCursed", ctx).Return(tc.sourceChainCursed, nil)
if len(tc.sendReqs) > 0 {
onRampReader.On("GetSendRequestsBetweenSeqNums", ctx, tc.commitStoreSeqNum, tc.commitStoreSeqNum+OnRampMessagesScanLimit, true).
Return(tc.sendReqs, nil)
Expand Down
8 changes: 2 additions & 6 deletions core/services/ocr2/plugins/ccip/ccipexec/ocr2.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,12 +97,8 @@ func (r *ExecutionReportingPlugin) Query(context.Context, types.ReportTimestamp)

func (r *ExecutionReportingPlugin) Observation(ctx context.Context, timestamp types.ReportTimestamp, query types.Query) (types.Observation, error) {
lggr := r.lggr.Named("ExecutionObservation")
down, err := r.commitStoreReader.IsDown(ctx)
if err != nil {
return nil, errors.Wrap(err, "isDown check errored")
}
if down {
return nil, ccip.ErrCommitStoreIsDown
if err := ccipcommon.VerifyNotDown(ctx, r.lggr, r.commitStoreReader, r.onRampReader); err != nil {
return nil, err
}
// Expire any inflight reports.
r.inflightReports.expire(lggr)
Expand Down
10 changes: 10 additions & 0 deletions core/services/ocr2/plugins/ccip/ccipexec/ocr2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ func TestExecutionReportingPlugin_Observation(t *testing.T) {
testCases := []struct {
name string
commitStorePaused bool
sourceChainCursed bool
inflightReports []InflightInternalExecutionReport
unexpiredReports []cciptypes.CommitStoreReportWithTxMeta
sendRequests []cciptypes.EVM2EVMMessageWithTxMeta
Expand All @@ -61,11 +62,19 @@ func TestExecutionReportingPlugin_Observation(t *testing.T) {
{
name: "commit store is down",
commitStorePaused: true,
sourceChainCursed: false,
expErr: true,
},
{
name: "source chain is cursed",
commitStorePaused: false,
sourceChainCursed: true,
expErr: true,
},
{
name: "happy flow",
commitStorePaused: false,
sourceChainCursed: false,
inflightReports: []InflightInternalExecutionReport{},
unexpiredReports: []cciptypes.CommitStoreReportWithTxMeta{
{
Expand Down Expand Up @@ -149,6 +158,7 @@ func TestExecutionReportingPlugin_Observation(t *testing.T) {
p.offRampReader = mockOffRampReader

mockOnRampReader := ccipdatamocks.NewOnRampReader(t)
mockOnRampReader.On("IsSourceCursed", ctx).Return(tc.sourceChainCursed, nil).Maybe()
mockOnRampReader.On("GetSendRequestsBetweenSeqNums", ctx, mock.Anything, mock.Anything, false).
Return(tc.sendRequests, nil).Maybe()
sourcePriceRegistryAddress := cciptypes.Address(utils.RandomAddress().String())
Expand Down
4 changes: 3 additions & 1 deletion core/services/ocr2/plugins/ccip/cciptypes/onramp.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,9 @@ type OnRampReader interface {
// If some requests do not exist in the provided sequence numbers range they will not be part of the response.
// It's the responsibility of the caller to validate whether all the requests exist or not.
GetSendRequestsBetweenSeqNums(ctx context.Context, seqNumMin, seqNumMax uint64, finalized bool) ([]EVM2EVMMessageWithTxMeta, error)

// IsSourceCursed returns true if the source chain is cursed. OnRamp communicates with the underlying RMN
// to verify if source chain was cursed or not.
IsSourceCursed(ctx context.Context) (bool, error)
// RouterAddress returns the router address that is configured on the onRamp
RouterAddress() (Address, error)

Expand Down
38 changes: 38 additions & 0 deletions core/services/ocr2/plugins/ccip/internal/cache/once.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package cache

import (
"context"
"sync"
)

type OnceCtxFunction[T any] func(ctx context.Context) (T, error)

// CallOnceOnNoError returns a new function that wraps the given function f with caching capabilities.
// If f returns an error, the result is not cached, allowing f to be retried on subsequent calls.
// Use case for that is to avoid caching an error forever in case of transient errors (e.g. flaky RPC)
func CallOnceOnNoError[T any](f OnceCtxFunction[T]) OnceCtxFunction[T] {
var (
mu sync.Mutex
value T
err error
called bool
)

return func(ctx context.Context) (T, error) {
mu.Lock()
defer mu.Unlock()

// If the function has been called successfully before, return the cached result.
if called && err == nil {
return value, nil
}

// Call the function and cache the result only if there is no error.
value, err = f(ctx)
if err == nil {
called = true
}

return value, err
}
}
83 changes: 83 additions & 0 deletions core/services/ocr2/plugins/ccip/internal/cache/once_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package cache

import (
"context"
"errors"
"sync"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/smartcontractkit/chainlink-common/pkg/utils/tests"
)

// TestCallOnceOnNoErrorCachingSuccess tests caching behavior when the function succeeds.
func TestCallOnceOnNoErrorCachingSuccess(t *testing.T) {
callCount := 0
testFunc := func(ctx context.Context) (string, error) {
callCount++
return "test result", nil
}

cachedFunc := CallOnceOnNoError(testFunc)

// Call the function twice.
_, err := cachedFunc(tests.Context(t))
assert.NoError(t, err, "Expected no error on the first call")

_, err = cachedFunc(tests.Context(t))
assert.NoError(t, err, "Expected no error on the second call")

assert.Equal(t, 1, callCount, "Function should be called exactly once")
}

// TestCallOnceOnNoErrorCachingError tests that the function is retried after an error.
func TestCallOnceOnNoErrorCachingError(t *testing.T) {
callCount := 0
testFunc := func(ctx context.Context) (string, error) {
callCount++
if callCount == 1 {
return "", errors.New("test error")
}
return "test result", nil
}

cachedFunc := CallOnceOnNoError(testFunc)

// First call should fail.
_, err := cachedFunc(tests.Context(t))
require.Error(t, err, "Expected an error on the first call")

// Second call should succeed.
r, err := cachedFunc(tests.Context(t))
assert.NoError(t, err, "Expected no error on the second call")
assert.Equal(t, "test result", r)
assert.Equal(t, 2, callCount, "Function should be called exactly twice")
}

// TestCallOnceOnNoErrorCachingConcurrency tests that the function works correctly under concurrent access.
func TestCallOnceOnNoErrorCachingConcurrency(t *testing.T) {
var wg sync.WaitGroup
callCount := 0
testFunc := func(ctx context.Context) (string, error) {
callCount++
return "test result", nil
}

cachedFunc := CallOnceOnNoError(testFunc)

// Simulate concurrent calls.
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
defer wg.Done()
_, err := cachedFunc(tests.Context(t))
assert.NoError(t, err, "Expected no error in concurrent execution")
}()
}

wg.Wait()

assert.Equal(t, 1, callCount, "Function should be called exactly once despite concurrent calls")
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,12 @@ import (
"encoding/hex"
"fmt"

"github.com/pkg/errors"
"golang.org/x/sync/errgroup"

"github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/cciptypes"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/ccipdata"
)
Expand Down Expand Up @@ -73,3 +76,41 @@ func FlattenUniqueSlice[T comparable](slices ...[]T) []T {
}
return flattened
}

// VerifyNotDown returns error if the commitStore is down (paused or destination cursed) or if the source chain is cursed
// Both RPCs are called in parallel to save some time. These calls cannot be batched because they target different chains.
func VerifyNotDown(ctx context.Context, lggr logger.Logger, commitStore ccipdata.CommitStoreReader, onRamp ccipdata.OnRampReader) error {
var (
eg = new(errgroup.Group)
isDown bool
isCursed bool
)

eg.Go(func() error {
var err error
isDown, err = commitStore.IsDown(ctx)
if err != nil {
return errors.Wrap(err, "commitStore isDown check errored")
}
return nil
})

eg.Go(func() error {
var err error
isCursed, err = onRamp.IsSourceCursed(ctx)
if err != nil {
return errors.Wrap(err, "onRamp isSourceCursed errored")
}
return nil
})

if err := eg.Wait(); err != nil {
return err
}

if isDown || isCursed {
lggr.Errorf("Source chain is cursed or CommitStore is down", "isDown", isDown, "isCursed", isCursed)
return ccip.ErrChainPausedOrCursed
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we care to be more specific here in this error or is the above log enough?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, good question, maybe having distinction between source and dest curse could be useful. Do you think that separate errors per source/dest would be enough?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On the other hand, to be very clear it would require three different errors

  • CommitStoreDown
  • SourceCursed
  • Both
    Maybe let's keep it simple and rely on an error log for now

}
return nil
}
Loading