Skip to content

Commit

Permalink
Fix history scavenger bug on delete mutable state (#3588)
Browse files Browse the repository at this point in the history
* Check workflow state in scavenger

* Log deleted workflow as record
  • Loading branch information
yux0 authored and alexshtin committed Nov 15, 2022
1 parent ec332ef commit f103b19
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 1 deletion.
2 changes: 1 addition & 1 deletion service/worker/scanner/executions/task.go
Expand Up @@ -269,7 +269,7 @@ func printValidationResult(
metrics.FailureTag(result.failureType),
).IncCounter(metrics.ScavengerValidationFailuresCount)

logger.Error(
logger.Info(
"validation failed for execution.",
tag.WorkflowNamespaceID(mutableState.GetExecutionInfo().GetNamespaceId()),
tag.WorkflowID(mutableState.GetExecutionInfo().GetWorkflowId()),
Expand Down
12 changes: 12 additions & 0 deletions service/worker/scanner/history/scavenger.go
Expand Up @@ -34,6 +34,7 @@ import (
"go.temporal.io/sdk/activity"

"go.temporal.io/server/api/adminservice/v1"
"go.temporal.io/server/api/enums/v1"
"go.temporal.io/server/api/historyservice/v1"
persistencepb "go.temporal.io/server/api/persistence/v1"
"go.temporal.io/server/common"
Expand Down Expand Up @@ -348,6 +349,11 @@ func (s *Scavenger) cleanUpWorkflowPastRetention(
ctx context.Context,
mutableState *persistencepb.WorkflowMutableState,
) error {
if mutableState.GetExecutionState().GetState() != enums.WORKFLOW_EXECUTION_STATE_COMPLETED {
// Skip running workflow
return nil
}

executionInfo := mutableState.GetExecutionInfo()
ns, err := s.registry.GetNamespaceByID(namespace.ID(executionInfo.GetNamespaceId()))
switch err.(type) {
Expand Down Expand Up @@ -379,7 +385,13 @@ func (s *Scavenger) cleanUpWorkflowPastRetention(
tag.WorkflowID(executionInfo.GetWorkflowId()),
tag.WorkflowRunID(mutableState.GetExecutionState().GetRunId()),
)
return nil
}
s.logger.Info("Delete workflow data past retention via history scavenger",
tag.WorkflowNamespace(ns.Name().String()),
tag.WorkflowID(executionInfo.GetWorkflowId()),
tag.WorkflowRunID(mutableState.GetExecutionState().GetRunId()),
)
}
return nil
}
Expand Down
3 changes: 3 additions & 0 deletions service/worker/scanner/history/scavenger_test.go
Expand Up @@ -38,6 +38,7 @@ import (

"go.temporal.io/server/api/adminservice/v1"
"go.temporal.io/server/api/adminservicemock/v1"
enumsspb "go.temporal.io/server/api/enums/v1"
"go.temporal.io/server/api/historyservice/v1"
"go.temporal.io/server/api/historyservicemock/v1"
persistencepb "go.temporal.io/server/api/persistence/v1"
Expand Down Expand Up @@ -598,6 +599,7 @@ func (s *ScavengerTestSuite) TestDeleteWorkflowAfterRetention() {
},
ExecutionState: &persistencepb.WorkflowExecutionState{
RunId: "runID2",
State: enumsspb.WORKFLOW_EXECUTION_STATE_COMPLETED,
},
},
}
Expand All @@ -610,6 +612,7 @@ func (s *ScavengerTestSuite) TestDeleteWorkflowAfterRetention() {
},
ExecutionState: &persistencepb.WorkflowExecutionState{
RunId: "runID4",
State: enumsspb.WORKFLOW_EXECUTION_STATE_COMPLETED,
},
},
}
Expand Down

0 comments on commit f103b19

Please sign in to comment.