Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions callback.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,11 @@ func processCallback[Type any, Status StatusType](
return nil
}

if wr.RunState.Stopped() {
// Skip processing of stopped workflow records to match step consumer behaviour.
return nil
}

run, err := buildRun[Type, Status](w.newRunObj(), store, wr)
if err != nil {
return err
Expand Down
50 changes: 50 additions & 0 deletions callback_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,4 +201,54 @@ func TestProcessCallback(t *testing.T) {
err := processCallback(ctx, w, statusStart, nil, current.ForeignID, nil, latestLookup, nil, nil)
require.NoError(t, err)
})

t.Run("Skip if record is stopped (paused)", func(t *testing.T) {
stoppedRecord := &Record{
WorkflowName: "example",
ForeignID: "32948623984623",
RunID: "JHFJDS-LSFKHJSLD-KSJDBLSL",
RunState: RunStatePaused,
Status: int(statusStart),
Object: b,
}

callbackCalled := false
callbackFn := CallbackFunc[string, testStatus](func(ctx context.Context, r *Run[string, testStatus], reader io.Reader) (testStatus, error) {
callbackCalled = true
return statusEnd, nil
})

latestLookup := func(ctx context.Context, workflowName, foreignID string) (*Record, error) {
return stoppedRecord, nil
}

err := processCallback(ctx, w, statusStart, callbackFn, current.ForeignID, nil, latestLookup, nil, nil)
require.NoError(t, err)
require.False(t, callbackCalled, "callback should not be called for stopped records")
})

t.Run("Skip if record is stopped (cancelled)", func(t *testing.T) {
cancelledRecord := &Record{
WorkflowName: "example",
ForeignID: "32948623984623",
RunID: "JHFJDS-LSFKHJSLD-KSJDBLSL",
RunState: RunStateCancelled,
Status: int(statusStart),
Object: b,
}

callbackCalled := false
callbackFn := CallbackFunc[string, testStatus](func(ctx context.Context, r *Run[string, testStatus], reader io.Reader) (testStatus, error) {
callbackCalled = true
return statusEnd, nil
})

latestLookup := func(ctx context.Context, workflowName, foreignID string) (*Record, error) {
return cancelledRecord, nil
}

err := processCallback(ctx, w, statusStart, callbackFn, current.ForeignID, nil, latestLookup, nil, nil)
require.NoError(t, err)
require.False(t, callbackCalled, "callback should not be called for cancelled records")
})
}
Loading