Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions core/capabilities/remote/executable/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,10 @@ func (c *client) Execute(ctx context.Context, capReq commoncap.CapabilityRequest
return commoncap.CapabilityResponse{}, fmt.Errorf("failed to unmarshal capability response: %w", err)
}

if c.cfg.Load() != nil {
capabilityResponse.Metadata.CapDON_N = uint32(len(c.cfg.Load().localDONInfo.Members)) //nolint:gosec // G115
}

return capabilityResponse, nil
}

Expand Down
2 changes: 1 addition & 1 deletion core/scripts/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -491,7 +491,7 @@ require (
github.com/smartcontractkit/chainlink-protos/rmn/v1.6/go v0.0.0-20250131130834-15e0d4cde2a6 // indirect
github.com/smartcontractkit/chainlink-protos/storage-service v0.3.0 // indirect
github.com/smartcontractkit/chainlink-protos/svr v1.1.0 // indirect
github.com/smartcontractkit/chainlink-protos/workflows/go v0.0.0-20250912140847-cbfb1710ac76 // indirect
github.com/smartcontractkit/chainlink-protos/workflows/go v0.0.0-20251006153136-770c21cddddc // indirect
github.com/smartcontractkit/chainlink-solana v1.1.2-0.20251007010318-c9a7b2d44524 // indirect
github.com/smartcontractkit/chainlink-sui v0.0.0-20250916193659-4becc28a467f // indirect
github.com/smartcontractkit/chainlink-testing-framework/framework/components/fake v0.10.0 // indirect
Expand Down
4 changes: 2 additions & 2 deletions core/scripts/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1639,8 +1639,8 @@ github.com/smartcontractkit/chainlink-protos/storage-service v0.3.0 h1:B7itmjy+C
github.com/smartcontractkit/chainlink-protos/storage-service v0.3.0/go.mod h1:h6kqaGajbNRrezm56zhx03p0mVmmA2xxj7E/M4ytLUA=
github.com/smartcontractkit/chainlink-protos/svr v1.1.0 h1:79Z9N9dMbMVRGaLoDPAQ+vOwbM+Hnx8tIN2xCPG8H4o=
github.com/smartcontractkit/chainlink-protos/svr v1.1.0/go.mod h1:TcOliTQU6r59DwG4lo3U+mFM9WWyBHGuFkkxQpvSujo=
github.com/smartcontractkit/chainlink-protos/workflows/go v0.0.0-20250912140847-cbfb1710ac76 h1:IVaB9Nbaqteno+kW64J45q0yS8xJjnzLWO1P7kibolk=
github.com/smartcontractkit/chainlink-protos/workflows/go v0.0.0-20250912140847-cbfb1710ac76/go.mod h1:HIpGvF6nKCdtZ30xhdkKWGM9+4Z4CVqJH8ZBL1FTEiY=
github.com/smartcontractkit/chainlink-protos/workflows/go v0.0.0-20251006153136-770c21cddddc h1:lfHeaWtYLiCID91RQNBj/TYlFN1Rsz7yNe17Gx/Uoy8=
github.com/smartcontractkit/chainlink-protos/workflows/go v0.0.0-20251006153136-770c21cddddc/go.mod h1:HIpGvF6nKCdtZ30xhdkKWGM9+4Z4CVqJH8ZBL1FTEiY=
github.com/smartcontractkit/chainlink-solana v1.1.2-0.20251007010318-c9a7b2d44524 h1:QgjF+S64bGDyaNcz11zDg7GC7FwNmYrsHN6jiJPRVkk=
github.com/smartcontractkit/chainlink-solana v1.1.2-0.20251007010318-c9a7b2d44524/go.mod h1:vcms/UPnfg7LZ2txinn59yJR6rXZ31XOk5++03LOeys=
github.com/smartcontractkit/chainlink-sui v0.0.0-20250916193659-4becc28a467f h1:7saUNbu+edzDgRPedNFfTsx5+5RL40r1r0pgISoh8Hs=
Expand Down
2 changes: 1 addition & 1 deletion core/services/workflows/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -837,7 +837,7 @@ func (e *Engine) workerForStepRequest(ctx context.Context, msg stepRequest) {
}

if meteringOK {
err := meteringReport.Settle(stepState.Ref, response.Metadata.Metering)
err := meteringReport.Settle(stepState.Ref, response.Metadata)
if err != nil {
l.Error(fmt.Sprintf("failed to set metering report step for ref %s: %s", stepState.Ref, err))
}
Expand Down
42 changes: 38 additions & 4 deletions core/services/workflows/metering/metering.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ type ProtoDetail struct {
type ReportStep struct {
// The ID of the capability being used in this step
CapabilityID string
// CapDONN is the total number of nodes in a capability DON.
CapdonN uint32
// The maximum amount of universal credits that should be used in this step
Deduction decimal.Decimal
// The actual resource spend that each node used for this step
Expand Down Expand Up @@ -367,7 +369,7 @@ func (r *Report) Deduct(ref string, opt DeductOpt) ([]capabilities.SpendLimit, e
// by returning earmarked local balance to the available to use pool and adding the spend to the metering report.
// The Deduct method must be called before Settle.
// We expect to only set this value once - an error is returned if a step would be overwritten.
func (r *Report) Settle(ref string, spendsByNode []capabilities.MeteringNodeDetail) error {
func (r *Report) Settle(ref string, metadata capabilities.ResponseMetadata) error {
r.mu.Lock()
defer r.mu.Unlock()

Expand All @@ -388,7 +390,7 @@ func (r *Report) Settle(ref string, spendsByNode []capabilities.MeteringNodeDeta
resourceSpends := make(map[string][]ReportStepDetail)

// Group by resource dimension
for _, nodeDetail := range spendsByNode {
for _, nodeDetail := range metadata.Metering {
resourceSpends[nodeDetail.SpendUnit] = append(resourceSpends[nodeDetail.SpendUnit], ReportStepDetail{
Peer2PeerID: nodeDetail.Peer2PeerID,
SpendValue: nodeDetail.SpendValue,
Expand Down Expand Up @@ -424,6 +426,10 @@ func (r *Report) Settle(ref string, spendsByNode []capabilities.MeteringNodeDeta
}

deciVals = append(deciVals, value)

if isGasSpendType(unit) && len(deciVals) > 1 {
r.switchToMeteringMode(fmt.Errorf("multiple executions for single execution unit [%s]: %w", unit, err))
}
}

// TODO: explicitly ignore RPC_EVM spend types for now -
Expand All @@ -435,7 +441,20 @@ func (r *Report) Settle(ref string, spendsByNode []capabilities.MeteringNodeDeta
}

aggregated.SpendValue = medianSpend(deciVals)
bal, err := r.balance.ConvertToBalance(unit, aggregated.SpendValue)
value := aggregated.SpendValue

// if N is not set, assume 1
if metadata.CapDON_N == 0 {
metadata.CapDON_N = 1
}

// TODO: indicate in the registry config that a capability is single execution or not
// https://smartcontract-it.atlassian.net/browse/CRE-1037
if !isGasSpendType(unit) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

/nit add ticket num where we handle this more flexibly by injecting whether or not its a single execution cap into the wf registry config?

value = value.Mul(decimal.NewFromUint64(uint64(metadata.CapDON_N)))
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we cast to uint32 prior, now uint64 here - should the type in the proto be set to the equivalent of a unit64 to streamline all these conversions?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Many of the libraries in Go tend to use the 64 bit variants. It might make sense to streamline as 64 bit if memory size isn't an issue.

}

bal, err := r.balance.ConvertToBalance(unit, value)

if err != nil {
r.switchToMeteringMode(fmt.Errorf("attempted to Settle [%s]: %w", unit, err))
Expand All @@ -448,6 +467,7 @@ func (r *Report) Settle(ref string, spendsByNode []capabilities.MeteringNodeDeta
}

step.Spends = resourceSpends
step.CapdonN = metadata.CapDON_N
r.steps[ref] = step

// if in metering mode, exit early without modifying local balance
Expand Down Expand Up @@ -508,7 +528,20 @@ func (r *Report) FormatReport() *protoEvents.MeteringReport {
nodeDetails := []*protoEvents.MeteringReportNodeDetail{}
r.stepRefLookup = append(r.stepRefLookup, ref+":"+step.CapabilityID)

for unit, details := range step.Spends {
// since map key order is non-deterministic, order the keys tohelp make tests deterministic
// until per-unit aggregation is fixed
orderedUnits := make([]string, 0, len(step.Spends))
for unit := range step.Spends {
orderedUnits = append(orderedUnits, unit)
}

sort.Slice(orderedUnits, func(i, j int) bool {
return orderedUnits[i] > orderedUnits[j]
})

for _, unit := range orderedUnits {
details := step.Spends[unit]

for _, detail := range details {
nodeDetails = append(nodeDetails, &protoEvents.MeteringReportNodeDetail{
Peer_2PeerId: detail.Peer2PeerID,
Expand All @@ -526,6 +559,7 @@ func (r *Report) FormatReport() *protoEvents.MeteringReport {
}

stepDetails.Nodes = nodeDetails
stepDetails.CapdonN = step.CapdonN
protoReport.Steps[ref] = stepDetails
}

Expand Down
Loading
Loading