Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 32 additions & 2 deletions core/capabilities/remote/executable/request/client_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (

"github.com/smartcontractkit/chainlink-common/pkg/beholder"
commoncap "github.com/smartcontractkit/chainlink-common/pkg/capabilities"
caperrors "github.com/smartcontractkit/chainlink-common/pkg/capabilities/errors"
"github.com/smartcontractkit/chainlink-common/pkg/capabilities/pb"
"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-protos/workflows/go/events"
Expand All @@ -27,6 +28,32 @@ import (
p2ptypes "github.com/smartcontractkit/chainlink/v2/core/services/p2p/types"
)

// errRemoteCapabilityExecuteError preserves the legacy "TRANSPORT : ErrorMsg" string from the
// remote executable client while wrapping a deserialized caperrors.Error so callers can
// errors.As into caperrors.Error after RPC (see capability_executor metrics).
type errRemoteCapabilityExecuteError struct {
s string
wrap caperrors.Error
}

func (e *errRemoteCapabilityExecuteError) Error() string { return e.s }

func (e *errRemoteCapabilityExecuteError) Unwrap() error { return e.wrap }

func newRemoteCapabilityExecuteError(transportErr types.Error, errMsg string) error {
return &errRemoteCapabilityExecuteError{
s: fmt.Sprintf("%s : %s", transportErr, errMsg),
wrap: caperrors.DeserializeErrorFromString(errMsg),
}
}

func newRemoteCapabilityExecuteErrorWithMessage(display string, errMsg string) error {
return &errRemoteCapabilityExecuteError{
s: display,
wrap: caperrors.DeserializeErrorFromString(errMsg),
}
}

type clientResponse struct {
Result []byte
Err error
Expand Down Expand Up @@ -351,9 +378,12 @@ func (c *ClientRequest) OnMessage(_ context.Context, msg *types.MessageBody) err
}

if c.errorCount[msg.ErrorMsg] == c.requiredIdenticalResponses {
c.sendResponse(clientResponse{Err: fmt.Errorf("%s : %s", msg.Error, msg.ErrorMsg)})
c.sendResponse(clientResponse{Err: newRemoteCapabilityExecuteError(msg.Error, msg.ErrorMsg)})
} else if c.totalErrorCount == c.remoteNodeCount-c.requiredIdenticalResponses+1 {
c.sendResponse(clientResponse{Err: fmt.Errorf("received %d errors, last error %s : %s", c.totalErrorCount, msg.Error, msg.ErrorMsg)})
c.sendResponse(clientResponse{Err: newRemoteCapabilityExecuteErrorWithMessage(
fmt.Sprintf("received %d errors, last error %s : %s", c.totalErrorCount, msg.Error, msg.ErrorMsg),
msg.ErrorMsg,
)})
}
}
return nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (

"github.com/smartcontractkit/chainlink-common/pkg/beholder/beholdertest"
commoncap "github.com/smartcontractkit/chainlink-common/pkg/capabilities"
caperrors "github.com/smartcontractkit/chainlink-common/pkg/capabilities/errors"
"github.com/smartcontractkit/chainlink-common/pkg/capabilities/pb"
"github.com/smartcontractkit/chainlink-protos/cre/go/values"
"github.com/smartcontractkit/chainlink-protos/workflows/go/events"
Expand Down Expand Up @@ -229,6 +230,58 @@ func Test_ClientRequest_MessageValidation(t *testing.T) {
response := <-req.ResponseChan()

assert.Equal(t, fmt.Sprintf("%s : %s", types.Error_INTERNAL_ERROR, assert.AnError.Error()), response.Err.Error())

var capErr caperrors.Error
require.ErrorAs(t, response.Err, &capErr)
assert.Equal(t, caperrors.OriginSystem, capErr.Origin(), "non-serialized ErrorMsg falls back to private system capability error")
assert.Equal(t, caperrors.VisibilityPrivate, capErr.Visibility())
assert.Equal(t, caperrors.Unknown, capErr.Code())
})

t.Run("Error response with serialized caperrors unwraps correctly as usererror", func(t *testing.T) {
ctx := t.Context()
capabilityPeers, capDonInfo, capInfo := capabilityDon(t, 4, 1)

dispatcher := &clientRequestTestDispatcher{msgs: make(chan *types.MessageBody, 100)}
req, err := request.NewClientExecuteRequest(ctx, logger.Test(t), capabilityRequest, capInfo,
workflowDonInfo, dispatcher, 10*time.Minute, nil, "")
require.NoError(t, err)
defer req.Cancel(errors.New("test end"))

<-dispatcher.msgs
<-dispatcher.msgs
assert.Empty(t, dispatcher.msgs)

serialized := caperrors.NewPublicUserError(errors.New("rpc error: EVM error invalid argument"), caperrors.FailedPrecondition).SerializeToRemoteString()
msgWithError := &types.MessageBody{
CapabilityId: capInfo.ID,
CapabilityDonId: capDonInfo.ID,
CallerDonId: workflowDonInfo.ID,
Method: types.MethodExecute,
Payload: rawResponse,
MessageId: []byte("messageID"),
Error: types.Error_INTERNAL_ERROR,
ErrorMsg: serialized,
}

msgWithError.Sender = capabilityPeers[0][:]
err = req.OnMessage(ctx, msgWithError)
require.NoError(t, err)

msgWithError.Sender = capabilityPeers[1][:]
err = req.OnMessage(ctx, msgWithError)
require.NoError(t, err)

response := <-req.ResponseChan()

wantDisplay := fmt.Sprintf("%s : %s", types.Error_INTERNAL_ERROR, serialized)
assert.Equal(t, wantDisplay, response.Err.Error(), "It should be equal to 'Public:User:FailedPrecondition:rpc error: EVM error invalid argument'")

var capErr caperrors.Error
require.ErrorAs(t, response.Err, &capErr)
assert.Equal(t, caperrors.OriginUser, capErr.Origin())
assert.Equal(t, caperrors.VisibilityPublic, capErr.Visibility())
assert.Equal(t, caperrors.FailedPrecondition, capErr.Code())
})

t.Run("Send three messages with different errors", func(t *testing.T) {
Expand Down
3 changes: 2 additions & 1 deletion core/services/workflows/v2/capability_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,8 @@ func (c *ExecutionHelper) callCapability(ctx context.Context, request *sdkpb.Cap

execLogger.Debugw("Capability execution failed", "err", err)
_ = events.EmitCapabilityFinishedEvent(ctx, loggerLabels, c.WorkflowExecutionID, request.Id, meteringRef, store.StatusErrored, request.Method, err)
c.metrics.With(platform.KeyCapabilityID, request.Id, platform.KeyCapabilityErrorCode, caperrors.Unknown.String()).IncrementCapabilityFailureCounter(ctx)
// TODO shouldn't all capabilities *always* return a typed error, and if so shouldn't the following metric alert us there's a bug we need to fix?
c.metrics.With(platform.KeyCapabilityID, request.Id, platform.KeyCapabilityErrorCode, "BUG").IncrementCapabilityFailureCounter(ctx)
Copy link

Copilot AI Mar 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The metric label value "BUG" for capabilityErrorCode deviates from the established caperrors code set and will change metric series unexpectedly. Consider keeping the label within caperrors (e.g., Unknown) and separately logging/recording that the returned error did not implement caperrors.Error (or add a dedicated metric for untyped errors).

Suggested change
c.metrics.With(platform.KeyCapabilityID, request.Id, platform.KeyCapabilityErrorCode, "BUG").IncrementCapabilityFailureCounter(ctx)
c.metrics.With(platform.KeyCapabilityID, request.Id, platform.KeyCapabilityErrorCode, "Unknown").IncrementCapabilityFailureCounter(ctx)

Copilot uses AI. Check for mistakes.
c.metrics.IncrementTotalWorkflowStepErrorsCounter(ctx)
return nil, fmt.Errorf("failed to execute capability: %w", err)
}
Expand Down
Loading