Skip to content

Commit be8d099

Browse files
authored
execute: pseudo-delete already executed messages (#888)
* execute: pseudo-delete already executed messages The plugin was getting stuck on large commit reports because it kept fully including already executed messages in the observation when it should have pseudo-deleted those and included messages that are not executed. * fix lint
1 parent 1b2c84c commit be8d099

File tree

2 files changed

+57
-3
lines changed

2 files changed

+57
-3
lines changed

execute/observation.go

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package execute
33
import (
44
"context"
55
"fmt"
6+
"slices"
67
"sort"
78
"time"
89

@@ -339,6 +340,8 @@ func createEmptyMessageWithIDAndSeqNum(msg cciptypes.Message) cciptypes.Message
339340
//
340341
// Returns:
341342
// - Updated observation containing commit reports, messages, and hashes
343+
//
344+
//nolint:gocyclo // TODO: pull out appropriate helpers.
342345
func (p *Plugin) getMessagesObservation(
343346
ctx context.Context,
344347
lggr logger.Logger,
@@ -349,7 +352,7 @@ func (p *Plugin) getMessagesObservation(
349352
// These messages will not be executed in the current round, but may be executed in future rounds
350353
// (e.g. if gas prices decrease).
351354
if len(previousOutcome.CommitReports) == 0 {
352-
lggr.Debug("TODO: No reports to execute. This is expected after a cold start.")
355+
lggr.Info("No reports to execute. This is expected after a cold start.")
353356
// No reports to execute.
354357
// This is expected after a cold start.
355358
return observation, nil
@@ -421,10 +424,15 @@ func (p *Plugin) getMessagesObservation(
421424
// Process each message in the report and override the empty message and token data if everything fits within
422425
// the size limits
423426
for _, msg := range msgs {
424-
// If msg is not already inflight, add it
427+
// If a message is inflight or already executed, don't include it fully in the observation
428+
// because its already been transmitted in a previous report or executed onchain.
425429
if p.inflightMessageCache.IsInflight(srcChain, msg.Header.MessageID) {
426430
continue
427431
}
432+
if slices.Contains(report.ExecutedMessages, msg.Header.SequenceNumber) {
433+
continue
434+
}
435+
428436
seqNum := msg.Header.SequenceNumber
429437
messageObs[srcChain][seqNum] = msg
430438
tkData[srcChain][seqNum] = p.observeTokenDataForMessage(ctx, lggr, msg)

execute/observation_test.go

Lines changed: 47 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -127,11 +127,17 @@ func Test_getMessagesObservation(t *testing.T) {
127127
oneByte := make([]byte, 1)
128128

129129
// Helper functions
130-
createCommitData := func(srcChain cciptypes.ChainSelector, from, to cciptypes.SeqNum) exectypes.CommitData {
130+
createCommitData := func(
131+
srcChain cciptypes.ChainSelector,
132+
from,
133+
to cciptypes.SeqNum,
134+
executedMessages ...cciptypes.SeqNum,
135+
) exectypes.CommitData {
131136
return exectypes.CommitData{
132137
SourceChain: srcChain,
133138
SequenceNumberRange: cciptypes.NewSeqNumRange(from, to),
134139
Timestamp: timestamp,
140+
ExecutedMessages: executedMessages,
135141
}
136142
}
137143

@@ -431,6 +437,46 @@ func Test_getMessagesObservation(t *testing.T) {
431437
},
432438
expectedError: false,
433439
},
440+
{
441+
name: "executed messages are skipped but hashed",
442+
commitData: []exectypes.CommitData{
443+
createCommitData(src1, 1, 3, 1, 2), // 1 and 2 are already executed
444+
},
445+
setupMocks: func(ccipReader *readerpkg_mock.MockCCIPReader,
446+
estimateProvider *ccipocr3.MockEstimateProvider,
447+
inflightCache *cache.InflightMessageCache,
448+
codec *codec_mock.MockExecCodec,
449+
) {
450+
// Any small size that fits within the max observation size
451+
codec.EXPECT().EncodeObservation(mock.Anything).Return(oneByte, nil).Maybe()
452+
messages := createMessages(src1, dest, 1, 3)
453+
454+
ccipReader.On("MsgsBetweenSeqNums", ctx, src1, cciptypes.NewSeqNumRange(1, 3)).
455+
Return(messages, nil)
456+
},
457+
expectedObs: exectypes.Observation{
458+
Messages: exectypes.MessageObservations{
459+
src1: {
460+
// 1 and 2 are pseudo deleted because they are already executed
461+
1: NewMessage(1, 1, 0, 0),
462+
2: NewMessage(2, 2, 0, 0),
463+
3: NewMessage(3, 3, int(src1), int(dest)),
464+
},
465+
},
466+
CommitReports: exectypes.CommitObservations{
467+
src1: []exectypes.CommitData{
468+
createCommitData(src1, 1, 3, 1, 2), // 1 and 2 are already executed
469+
},
470+
},
471+
Hashes: exectypes.MessageHashes{
472+
src1: createHashesMap(1, 3),
473+
},
474+
TokenData: exectypes.TokenDataObservations{
475+
src1: createTokenData(1, 3),
476+
},
477+
},
478+
expectedError: false,
479+
},
434480
{
435481
name: "encoding size exceeded mid report",
436482
commitData: []exectypes.CommitData{

0 commit comments

Comments
 (0)