Skip to content

Commit

Permalink
Workflow Update: return NotFound error to the pollers of outcome of a…
Browse files Browse the repository at this point in the history
…ccepted updates of completed workflows (#5846)

## What changed?
<!-- Describe what has changed in this PR -->
Return `NotFound` error to the pollers of outcome of accepted updates of
completed workflows.

## Why?
<!-- Tell your future self why have you made these changes -->
If workflow is completed and update was left in `Accepted` stage it will
never go `Completed`, therefore there is no reason to wait for update
result.

## How did you test it?
<!-- How have you verified this change? Tested locally? Added a unit
test? Checked in staging env? -->
Added unit test.

## Potential risks
<!-- Assuming the worst case, what can be broken when deploying this
change to production? -->
No risks.

## Documentation
<!-- Have you made sure this change doesn't falsify anything currently
stated in `docs/`? If significant
new behavior is added, have you described that in `docs/`? -->
No.

## Is hotfix candidate?
<!-- Is this PR a hotfix candidate or does it require a notification to
be sent to the broader community? (Yes/No) -->
No.
  • Loading branch information
alexshtin committed May 3, 2024
1 parent 5087766 commit f87ba6d
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 4 deletions.
9 changes: 8 additions & 1 deletion service/history/workflow/update/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ type (
VisitUpdates(visitor func(updID string, updInfo *updatespb.UpdateInfo))
GetUpdateOutcome(ctx context.Context, updateID string) (*updatepb.Outcome, error)
GetCurrentVersion() int64
IsWorkflowExecutionRunning() bool
}

registry struct {
Expand Down Expand Up @@ -186,12 +187,18 @@ func NewRegistry(
withInstrumentation(&r.instrumentation),
)
} else if acc := updInfo.GetAcceptance(); acc != nil {
r.updates[updID] = newAccepted(
u := newAccepted(
updID,
acc.EventId,
r.remover(updID),
withInstrumentation(&r.instrumentation),
)
if !r.store.IsWorkflowExecutionRunning() {
// If workflow is completed, accepted update will never be completed.
// This will return "workflow completed" error to the pollers of outcome of accepted updates.
u.abort(AbortReasonWorkflowCompleted)
}
r.updates[updID] = u
} else if updInfo.GetCompletion() != nil {
r.completedCount++
}
Expand Down
62 changes: 59 additions & 3 deletions service/history/workflow/update/registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,10 @@ import (

type mockUpdateStore struct {
update.Store
VisitUpdatesFunc func(visitor func(updID string, updInfo *updatespb.UpdateInfo))
GetUpdateOutcomeFunc func(context.Context, string) (*updatepb.Outcome, error)
GetCurrentVersionFunc func() int64
VisitUpdatesFunc func(visitor func(updID string, updInfo *updatespb.UpdateInfo))
GetUpdateOutcomeFunc func(context.Context, string) (*updatepb.Outcome, error)
GetCurrentVersionFunc func() int64
IsWorkflowExecutionRunningFunc func() bool
}

func (m mockUpdateStore) VisitUpdates(
Expand All @@ -70,6 +71,13 @@ func (m mockUpdateStore) GetCurrentVersion() int64 {
return m.GetCurrentVersionFunc()
}

func (m mockUpdateStore) IsWorkflowExecutionRunning() bool {
if m.IsWorkflowExecutionRunningFunc == nil {
return true
}
return m.IsWorkflowExecutionRunningFunc()
}

var emptyUpdateStore = mockUpdateStore{
VisitUpdatesFunc: func(func(updID string, updInfo *updatespb.UpdateInfo)) {
},
Expand Down Expand Up @@ -268,6 +276,54 @@ func TestUpdateRemovalFromRegistry(t *testing.T) {
require.Equal(t, 0, reg.Len(), "update should have been removed")
}

func TestUpdateAccepted_WorkflowCompleted(t *testing.T) {
t.Parallel()
var (
ctx = context.Background()
storedAcceptedUpdateID = t.Name() + "-accepted-update-id"
regStore = mockUpdateStore{
VisitUpdatesFunc: func(visitor func(updID string, updInfo *updatespb.UpdateInfo)) {
storedAcceptedUpdateInfo := &updatespb.UpdateInfo{
Value: &updatespb.UpdateInfo_Acceptance{
Acceptance: &updatespb.AcceptanceInfo{
EventId: 22,
},
},
}
visitor(storedAcceptedUpdateID, storedAcceptedUpdateInfo)
},
IsWorkflowExecutionRunningFunc: func() bool {
return false
},
}
reg = update.NewRegistry(regStore)
effects = effect.Buffer{}
evStore = mockEventStore{Controller: &effects}
)

upd, found, err := reg.FindOrCreate(ctx, storedAcceptedUpdateID)
require.NoError(t, err)
require.True(t, found)

// Even timeout is very short, it won't fire but error will be returned right away.
s, err := upd.WaitLifecycleStage(ctx, enumspb.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_COMPLETED, 100*time.Millisecond)
require.Error(t, err)
require.Nil(t, s)
var notFound *serviceerror.NotFound
require.ErrorAs(t, err, &notFound)
require.Equal(t, "workflow execution already completed", err.Error())

meta := updatepb.Meta{UpdateId: storedAcceptedUpdateID}
outcome := successOutcome(t, "success!")
err = upd.OnProtocolMessage(
ctx,
&protocolpb.Message{Body: mustMarshalAny(t, &updatepb.Response{Meta: &meta, Outcome: outcome})},
evStore,
)
require.Error(t, err, "should not be able to completed update for completed workflow")
require.Equal(t, 1, reg.Len(), "update should still be present in map")
}

func TestSendMessageGathering(t *testing.T) {
t.Parallel()
var (
Expand Down

0 comments on commit f87ba6d

Please sign in to comment.