Skip to content

Commit

Permalink
Fix history scavenger bug on delete mutable state (#3588)
Browse files Browse the repository at this point in the history
* Check workflow state in scavenger

* Log deleted workflow as record
  • Loading branch information
yux0 committed Nov 15, 2022
1 parent 4f2b98e commit e7f99d0
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 2 deletions.
2 changes: 1 addition & 1 deletion service/worker/scanner/executions/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ func printValidationResult(
handler.Counter(metrics.ScavengerValidationFailuresCount.GetMetricName()).Record(1)
for _, result := range results {
handler.Counter(metrics.ScavengerValidationFailuresCount.GetMetricName()).Record(1, metrics.FailureTag(result.failureType))
logger.Error(
logger.Info(
"validation failed for execution.",
tag.WorkflowNamespaceID(mutableState.GetExecutionInfo().GetNamespaceId()),
tag.WorkflowID(mutableState.GetExecutionInfo().GetWorkflowId()),
Expand Down
12 changes: 12 additions & 0 deletions service/worker/scanner/history/scavenger.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"go.temporal.io/sdk/activity"

"go.temporal.io/server/api/adminservice/v1"
"go.temporal.io/server/api/enums/v1"
"go.temporal.io/server/api/historyservice/v1"
persistencepb "go.temporal.io/server/api/persistence/v1"
"go.temporal.io/server/common"
Expand Down Expand Up @@ -339,6 +340,11 @@ func (s *Scavenger) cleanUpWorkflowPastRetention(
ctx context.Context,
mutableState *persistencepb.WorkflowMutableState,
) error {
if mutableState.GetExecutionState().GetState() != enums.WORKFLOW_EXECUTION_STATE_COMPLETED {
// Skip running workflow
return nil
}

executionInfo := mutableState.GetExecutionInfo()
ns, err := s.registry.GetNamespaceByID(namespace.ID(executionInfo.GetNamespaceId()))
switch err.(type) {
Expand Down Expand Up @@ -370,7 +376,13 @@ func (s *Scavenger) cleanUpWorkflowPastRetention(
tag.WorkflowID(executionInfo.GetWorkflowId()),
tag.WorkflowRunID(mutableState.GetExecutionState().GetRunId()),
)
return nil
}
s.logger.Info("Delete workflow data past retention via history scavenger",
tag.WorkflowNamespace(ns.Name().String()),
tag.WorkflowID(executionInfo.GetWorkflowId()),
tag.WorkflowRunID(mutableState.GetExecutionState().GetRunId()),
)
}
return nil
}
Expand Down
30 changes: 29 additions & 1 deletion service/worker/scanner/history/scavenger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (

"go.temporal.io/server/api/adminservice/v1"
"go.temporal.io/server/api/adminservicemock/v1"
enumsspb "go.temporal.io/server/api/enums/v1"
"go.temporal.io/server/api/historyservice/v1"
"go.temporal.io/server/api/historyservicemock/v1"
persistencepb "go.temporal.io/server/api/persistence/v1"
Expand Down Expand Up @@ -554,6 +555,11 @@ func (s *ScavengerTestSuite) TestDeleteWorkflowAfterRetention() {
ForkTime: timestamp.TimeNowPtrUtcAddDuration(-retention * 2),
Info: persistence.BuildHistoryGarbageCleanupInfo("namespaceID4", "workflowID4", "runID4"),
},
{
BranchToken: s.toBranchToken("treeID5", "branchID5"),
ForkTime: timestamp.TimeNowPtrUtcAddDuration(-retention * 2),
Info: persistence.BuildHistoryGarbageCleanupInfo("namespaceID5", "workflowID5", "runID5"),
},
},
}, nil)
mockedNamespace := namespace.NewNamespaceForTest(
Expand All @@ -579,6 +585,7 @@ func (s *ScavengerTestSuite) TestDeleteWorkflowAfterRetention() {
},
ExecutionState: &persistencepb.WorkflowExecutionState{
RunId: "runID2",
State: enumsspb.WORKFLOW_EXECUTION_STATE_COMPLETED,
},
},
}
Expand All @@ -591,6 +598,20 @@ func (s *ScavengerTestSuite) TestDeleteWorkflowAfterRetention() {
},
ExecutionState: &persistencepb.WorkflowExecutionState{
RunId: "runID4",
State: enumsspb.WORKFLOW_EXECUTION_STATE_COMPLETED,
},
},
}
runningWorkflow5 := &historyservice.DescribeMutableStateResponse{
DatabaseMutableState: &persistencepb.WorkflowMutableState{
ExecutionInfo: &persistencepb.WorkflowExecutionInfo{
WorkflowId: "workflowID5",
NamespaceId: "namespaceID5",
LastUpdateTime: timestamp.TimePtr(time.Now().UTC().Add(-time.Hour * 24)),
},
ExecutionState: &persistencepb.WorkflowExecutionState{
RunId: "runID5",
State: enumsspb.WORKFLOW_EXECUTION_STATE_RUNNING,
},
},
}
Expand Down Expand Up @@ -623,6 +644,13 @@ func (s *ScavengerTestSuite) TestDeleteWorkflowAfterRetention() {
RunId: "runID4",
},
}).Return(workflowPastRetention4, nil)
s.mockHistoryClient.EXPECT().DescribeMutableState(gomock.Any(), &historyservice.DescribeMutableStateRequest{
NamespaceId: "namespaceID5",
Execution: &commonpb.WorkflowExecution{
WorkflowId: "workflowID5",
RunId: "runID5",
},
}).Return(runningWorkflow5, nil)
s.mockAdminClient.EXPECT().DeleteWorkflowExecution(gomock.Any(), &adminservice.DeleteWorkflowExecutionRequest{
Execution: &commonpb.WorkflowExecution{
WorkflowId: "workflowID2",
Expand All @@ -639,7 +667,7 @@ func (s *ScavengerTestSuite) TestDeleteWorkflowAfterRetention() {
hbd, err := s.scavenger.Run(context.Background())
s.Nil(err)
s.Equal(0, hbd.SkipCount)
s.Equal(4, hbd.SuccessCount)
s.Equal(5, hbd.SuccessCount)
s.Equal(0, hbd.ErrorCount)
s.Equal(2, hbd.CurrentPage)
s.Equal(0, len(hbd.NextPageToken))
Expand Down

0 comments on commit e7f99d0

Please sign in to comment.