From d9878e548e12f9a35c67cb943852a6e519085625 Mon Sep 17 00:00:00 2001 From: Andrew Wormald Date: Tue, 10 Mar 2026 09:28:33 +0000 Subject: [PATCH 1/2] Add stopped record check to callback processing Co-Authored-By: Claude Opus 4.6 --- callback.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/callback.go b/callback.go index 0ad378d..f80008c 100644 --- a/callback.go +++ b/callback.go @@ -63,6 +63,11 @@ func processCallback[Type any, Status StatusType]( return nil } + if wr.RunState.Stopped() { + // Skip processing of stopped workflow records to match step consumer behaviour. + return nil + } + run, err := buildRun[Type, Status](w.newRunObj(), store, wr) if err != nil { return err From 90fd568f397a4925031ed0eb7684da7b5cfe7ef2 Mon Sep 17 00:00:00 2001 From: Andrew Wormald Date: Tue, 10 Mar 2026 12:04:39 +0000 Subject: [PATCH 2/2] Add tests for stopped record check in callback processing Co-Authored-By: Claude Opus 4.6 --- callback_internal_test.go | 50 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 50 insertions(+) diff --git a/callback_internal_test.go b/callback_internal_test.go index ca2a941..93b469b 100644 --- a/callback_internal_test.go +++ b/callback_internal_test.go @@ -201,4 +201,54 @@ func TestProcessCallback(t *testing.T) { err := processCallback(ctx, w, statusStart, nil, current.ForeignID, nil, latestLookup, nil, nil) require.NoError(t, err) }) + + t.Run("Skip if record is stopped (paused)", func(t *testing.T) { + stoppedRecord := &Record{ + WorkflowName: "example", + ForeignID: "32948623984623", + RunID: "JHFJDS-LSFKHJSLD-KSJDBLSL", + RunState: RunStatePaused, + Status: int(statusStart), + Object: b, + } + + callbackCalled := false + callbackFn := CallbackFunc[string, testStatus](func(ctx context.Context, r *Run[string, testStatus], reader io.Reader) (testStatus, error) { + callbackCalled = true + return statusEnd, nil + }) + + latestLookup := func(ctx context.Context, workflowName, foreignID string) (*Record, error) { + return stoppedRecord, nil + } + + err := processCallback(ctx, w, statusStart, callbackFn, current.ForeignID, nil, latestLookup, nil, nil) + require.NoError(t, err) + require.False(t, callbackCalled, "callback should not be called for stopped records") + }) + + t.Run("Skip if record is stopped (cancelled)", func(t *testing.T) { + cancelledRecord := &Record{ + WorkflowName: "example", + ForeignID: "32948623984623", + RunID: "JHFJDS-LSFKHJSLD-KSJDBLSL", + RunState: RunStateCancelled, + Status: int(statusStart), + Object: b, + } + + callbackCalled := false + callbackFn := CallbackFunc[string, testStatus](func(ctx context.Context, r *Run[string, testStatus], reader io.Reader) (testStatus, error) { + callbackCalled = true + return statusEnd, nil + }) + + latestLookup := func(ctx context.Context, workflowName, foreignID string) (*Record, error) { + return cancelledRecord, nil + } + + err := processCallback(ctx, w, statusStart, callbackFn, current.ForeignID, nil, latestLookup, nil, nil) + require.NoError(t, err) + require.False(t, callbackCalled, "callback should not be called for cancelled records") + }) }