Skip to content

Commit

Permalink
Delete old archival code
Browse files Browse the repository at this point in the history
  • Loading branch information
MichaelSnowden committed Mar 9, 2023
1 parent 75facb1 commit a74d762
Show file tree
Hide file tree
Showing 48 changed files with 145 additions and 4,244 deletions.
6 changes: 1 addition & 5 deletions common/archiver/archivalMetadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,7 @@ func (a *archivalConfig) StaticClusterState() ArchivalState {
const (
// ArchivalDisabled means this cluster is not configured to handle archival
ArchivalDisabled ArchivalState = iota
// ArchivalPaused means this cluster is configured to handle archival but is currently not archiving
// This state is not yet implemented, as of now ArchivalPaused is treated the same way as ArchivalDisabled
ArchivalPaused
_ // for deprecated ArchivalPaused state
// ArchivalEnabled means this cluster is currently archiving
ArchivalEnabled
)
Expand Down Expand Up @@ -216,8 +214,6 @@ func getClusterArchivalState(str string) (ArchivalState, error) {
switch str {
case "", config.ArchivalDisabled:
return ArchivalDisabled, nil
case config.ArchivalPaused:
return ArchivalPaused, nil
case config.ArchivalEnabled:
return ArchivalEnabled, nil
}
Expand Down
2 changes: 0 additions & 2 deletions common/dynamicconfig/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -538,8 +538,6 @@ const (
ArchivalProcessorArchiveDelay = "history.archivalProcessorArchiveDelay"
// ArchivalBackendMaxRPS is the maximum rate of requests per second to the archival backend
ArchivalBackendMaxRPS = "history.archivalBackendMaxRPS"
// DurableArchivalEnabled is the flag to enable durable archival
DurableArchivalEnabled = "history.durableArchivalEnabled"

// ReplicatorTaskBatchSize is batch size for ReplicatorProcessor
ReplicatorTaskBatchSize = "history.replicatorTaskBatchSize"
Expand Down
29 changes: 8 additions & 21 deletions common/persistence/serialization/task_serializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -550,33 +550,22 @@ func (s *TaskSerializer) transferCloseTaskToProto(
TaskId: closeTask.TaskID,
VisibilityTime: timestamp.TimePtr(closeTask.VisibilityTimestamp),
DeleteAfterClose: closeTask.DeleteAfterClose,
TaskDetails: &persistencespb.TransferTaskInfo_CloseExecutionTaskDetails_{
CloseExecutionTaskDetails: &persistencespb.TransferTaskInfo_CloseExecutionTaskDetails{
CanSkipVisibilityArchival: closeTask.CanSkipVisibilityArchival,
},
},
}
}

func (s *TaskSerializer) transferCloseTaskFromProto(
closeTask *persistencespb.TransferTaskInfo,
) *tasks.CloseExecutionTask {
canSkipVisibilityArchival := false
closeExecutionTaskDetails := closeTask.GetCloseExecutionTaskDetails()
if closeExecutionTaskDetails != nil {
canSkipVisibilityArchival = closeExecutionTaskDetails.CanSkipVisibilityArchival
}
return &tasks.CloseExecutionTask{
WorkflowKey: definition.NewWorkflowKey(
closeTask.NamespaceId,
closeTask.WorkflowId,
closeTask.RunId,
),
VisibilityTimestamp: *closeTask.VisibilityTime,
TaskID: closeTask.TaskId,
Version: closeTask.Version,
DeleteAfterClose: closeTask.DeleteAfterClose,
CanSkipVisibilityArchival: canSkipVisibilityArchival,
VisibilityTimestamp: *closeTask.VisibilityTime,
TaskID: closeTask.TaskId,
Version: closeTask.Version,
DeleteAfterClose: closeTask.DeleteAfterClose,
// Delete workflow task process stage is not persisted. It is only for in memory retries.
DeleteProcessStage: tasks.DeleteWorkflowExecutionStageNone,
}
Expand Down Expand Up @@ -872,7 +861,6 @@ func (s *TaskSerializer) timerWorkflowCleanupTaskToProto(
TaskId: workflowCleanupTimer.TaskID,
VisibilityTime: &workflowCleanupTimer.VisibilityTimestamp,
BranchToken: workflowCleanupTimer.BranchToken,
AlreadyArchived: workflowCleanupTimer.WorkflowDataAlreadyArchived,
}
}

Expand All @@ -885,11 +873,10 @@ func (s *TaskSerializer) timerWorkflowCleanupTaskFromProto(
workflowCleanupTimer.WorkflowId,
workflowCleanupTimer.RunId,
),
VisibilityTimestamp: *workflowCleanupTimer.VisibilityTime,
TaskID: workflowCleanupTimer.TaskId,
Version: workflowCleanupTimer.Version,
BranchToken: workflowCleanupTimer.BranchToken,
WorkflowDataAlreadyArchived: workflowCleanupTimer.AlreadyArchived,
VisibilityTimestamp: *workflowCleanupTimer.VisibilityTime,
TaskID: workflowCleanupTimer.TaskId,
Version: workflowCleanupTimer.Version,
BranchToken: workflowCleanupTimer.BranchToken,
// Delete workflow task process stage is not persisted. It is only for in memory retries.
ProcessStage: tasks.DeleteWorkflowExecutionStageNone,
}
Expand Down
3 changes: 0 additions & 3 deletions common/persistence/serialization/task_serializer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,6 @@ func (s *taskSerializerSuite) TestTransferCloseTask() {
}
s.assertEqualTasks(closeTask)

closeTask.CanSkipVisibilityArchival = true
s.assertEqualTasks(closeTask)
}

Expand Down Expand Up @@ -258,8 +257,6 @@ func (s *taskSerializerSuite) TestTimerWorkflowCleanupTask() {
BranchToken: []byte{123},
}
s.assertEqualTasks(workflowCleanupTimer)
workflowCleanupTimer.WorkflowDataAlreadyArchived = true
s.assertEqualTasks(workflowCleanupTimer)
}

func (s *taskSerializerSuite) TestVisibilityStartTask() {
Expand Down
2 changes: 1 addition & 1 deletion service/history/archival_queue_task_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ func (e *archivalQueueTaskExecutor) addDeletionTask(
e.shardContext.GetConfig(),
e.shardContext.GetArchivalMetadata(),
)
err = taskGenerator.GenerateDeleteHistoryEventTask(*closeTime, true)
err = taskGenerator.GenerateDeleteHistoryEventTask(*closeTime)
if err != nil {
return err
}
Expand Down
1 change: 0 additions & 1 deletion service/history/archival_queue_task_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -433,7 +433,6 @@ func TestArchivalQueueTaskExecutor(t *testing.T) {
assert.Zero(t, task.TaskID)
assert.Equal(t, p.LastWriteVersionBeforeArchival, task.Version)
assert.Equal(t, branchToken, task.BranchToken)
assert.True(t, task.WorkflowDataAlreadyArchived)
assert.Equal(t, p.ExpectedDeleteTime, task.VisibilityTimestamp)
popTasks := map[tasks.Category][]tasks.Task{
tasks.CategoryTimer: {
Expand Down
2 changes: 0 additions & 2 deletions service/history/configs/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,6 @@ type Config struct {
NumArchiveSystemWorkflows dynamicconfig.IntPropertyFn
ArchiveRequestRPS dynamicconfig.IntPropertyFn
ArchiveSignalTimeout dynamicconfig.DurationPropertyFn
DurableArchivalEnabled dynamicconfig.BoolPropertyFn

// Size limit related settings
BlobSizeLimitError dynamicconfig.IntPropertyFnWithNamespaceFilter
Expand Down Expand Up @@ -417,7 +416,6 @@ func NewConfig(dc *dynamicconfig.Collection, numberOfShards int32, isAdvancedVis
NumArchiveSystemWorkflows: dc.GetIntProperty(dynamicconfig.NumArchiveSystemWorkflows, 1000),
ArchiveRequestRPS: dc.GetIntProperty(dynamicconfig.ArchiveRequestRPS, 300), // should be much smaller than frontend RPS
ArchiveSignalTimeout: dc.GetDurationProperty(dynamicconfig.ArchiveSignalTimeout, 300*time.Millisecond),
DurableArchivalEnabled: dc.GetBoolProperty(dynamicconfig.DurableArchivalEnabled, true),

BlobSizeLimitError: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.BlobSizeLimitError, 2*1024*1024),
BlobSizeLimitWarn: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.BlobSizeLimitWarn, 512*1024),
Expand Down
104 changes: 0 additions & 104 deletions service/history/deletemanager/delete_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,22 +31,18 @@ import (
"time"

commonpb "go.temporal.io/api/common/v1"
enumspb "go.temporal.io/api/enums/v1"

enumsspb "go.temporal.io/server/api/enums/v1"
"go.temporal.io/server/common/clock"
"go.temporal.io/server/common/definition"
"go.temporal.io/server/common/metrics"
"go.temporal.io/server/common/namespace"
"go.temporal.io/server/common/persistence"
"go.temporal.io/server/common/primitives"
"go.temporal.io/server/common/searchattribute"
"go.temporal.io/server/service/history/configs"
"go.temporal.io/server/service/history/shard"
"go.temporal.io/server/service/history/tasks"
"go.temporal.io/server/service/history/workflow"
wcache "go.temporal.io/server/service/history/workflow/cache"
"go.temporal.io/server/service/worker/archiver"
)

type (
Expand All @@ -73,7 +69,6 @@ type (
we commonpb.WorkflowExecution,
weCtx workflow.Context,
ms workflow.MutableState,
archiveIfEnabled bool,
stage *tasks.DeleteWorkflowExecutionStage,
) error
}
Expand All @@ -83,7 +78,6 @@ type (
workflowCache wcache.Cache
config *configs.Config
metricsHandler metrics.Handler
archivalClient archiver.Client
timeSource clock.TimeSource
}
)
Expand All @@ -94,15 +88,13 @@ func NewDeleteManager(
shard shard.Context,
cache wcache.Cache,
config *configs.Config,
archiverClient archiver.Client,
timeSource clock.TimeSource,
) *DeleteManagerImpl {
deleteManager := &DeleteManagerImpl{
shard: shard,
workflowCache: cache,
metricsHandler: shard.GetMetricsHandler(),
config: config,
archivalClient: archiverClient,
timeSource: timeSource,
}

Expand Down Expand Up @@ -155,7 +147,6 @@ func (m *DeleteManagerImpl) DeleteWorkflowExecution(
we,
weCtx,
ms,
false,
forceDeleteFromOpenVisibility,
stage,
m.metricsHandler.WithTags(metrics.OperationTag(metrics.HistoryDeleteWorkflowExecutionScope)),
Expand All @@ -168,7 +159,6 @@ func (m *DeleteManagerImpl) DeleteWorkflowExecutionByRetention(
we commonpb.WorkflowExecution,
weCtx workflow.Context,
ms workflow.MutableState,
archiveIfEnabled bool,
stage *tasks.DeleteWorkflowExecutionStage,
) error {

Expand All @@ -178,7 +168,6 @@ func (m *DeleteManagerImpl) DeleteWorkflowExecutionByRetention(
we,
weCtx,
ms,
archiveIfEnabled,
false, // When retention is fired, workflow execution is always closed.
stage,
m.metricsHandler.WithTags(metrics.OperationTag(metrics.HistoryProcessDeleteHistoryEventScope)),
Expand All @@ -191,7 +180,6 @@ func (m *DeleteManagerImpl) deleteWorkflowExecutionInternal(
we commonpb.WorkflowExecution,
weCtx workflow.Context,
ms workflow.MutableState,
archiveIfEnabled bool,
forceDeleteFromOpenVisibility bool,
stage *tasks.DeleteWorkflowExecutionStage,
metricsHandler metrics.Handler,
Expand All @@ -218,26 +206,6 @@ func (m *DeleteManagerImpl) deleteWorkflowExecutionInternal(
}
}

// NOTE: old versions (before server version 1.17.3) of archival workflow will delete workflow history directly
// after archiving history. But getting workflow close time requires workflow close event (for workflows closed by
// server version before 1.17), so this step needs to be done after getting workflow close time.
if archiveIfEnabled {
deletionPromised, err := m.archiveWorkflowIfEnabled(ctx, namespaceID, we, currentBranchToken, weCtx, ms, metricsHandler)
if err != nil {
return err
}
if deletionPromised {
// Don't delete workflow data. The workflow data will be deleted after history archived.
// if we proceed to delete mutable state, then history scavanger may kick in and
// delete history before history archival is done.

// HOWEVER, when rolling out this change, we don't know if worker is running an old version of the
// archival workflow (before 1.17.3), which will only delete workflow history. To prevent this from
// happening, worker role must be deployed first.
return nil
}
}

if err := m.shard.DeleteWorkflowExecution(
ctx,
definition.WorkflowKey{
Expand All @@ -260,75 +228,3 @@ func (m *DeleteManagerImpl) deleteWorkflowExecutionInternal(
metricsHandler.Counter(metrics.WorkflowCleanupDeleteCount.GetMetricName()).Record(1)
return nil
}

func (m *DeleteManagerImpl) archiveWorkflowIfEnabled(
ctx context.Context,
namespaceID namespace.ID,
workflowExecution commonpb.WorkflowExecution,
currentBranchToken []byte,
weCtx workflow.Context,
ms workflow.MutableState,
metricsHandler metrics.Handler,
) (deletionPromised bool, err error) {

namespaceRegistryEntry := ms.GetNamespaceEntry()

clusterConfiguredForHistoryArchival := m.shard.GetArchivalMetadata().GetHistoryConfig().ClusterConfiguredForArchival()
namespaceConfiguredForHistoryArchival := namespaceRegistryEntry.HistoryArchivalState().State == enumspb.ARCHIVAL_STATE_ENABLED
archiveHistory := clusterConfiguredForHistoryArchival && namespaceConfiguredForHistoryArchival

// TODO: @ycyang once archival backfill is in place cluster:paused && namespace:enabled should be a nop rather than a delete
if !archiveHistory {
return false, nil
}

closeFailoverVersion, err := ms.GetLastWriteVersion()
if err != nil {
return false, err
}

req := &archiver.ClientRequest{
ArchiveRequest: &archiver.ArchiveRequest{
ShardID: m.shard.GetShardID(),
NamespaceID: namespaceID.String(),
WorkflowID: workflowExecution.GetWorkflowId(),
RunID: workflowExecution.GetRunId(),
Namespace: namespaceRegistryEntry.Name().String(),
Targets: []archiver.ArchivalTarget{archiver.ArchiveTargetHistory},
HistoryURI: namespaceRegistryEntry.HistoryArchivalState().URI,
NextEventID: ms.GetNextEventID(),
BranchToken: currentBranchToken,
CloseFailoverVersion: closeFailoverVersion,
},
CallerService: string(primitives.HistoryService),
AttemptArchiveInline: false, // archive in workflow by default
}
executionStats, err := weCtx.LoadExecutionStats(ctx)
if err == nil && executionStats.HistorySize < int64(m.config.TimerProcessorHistoryArchivalSizeLimit()) {
req.AttemptArchiveInline = true
}

saTypeMap, err := m.shard.GetSearchAttributesProvider().GetSearchAttributes(m.config.DefaultVisibilityIndexName, false)
if err != nil {
return false, err
}
// Setting search attributes types here because archival client needs to stringify them,
// and it might not have access to typeMap (i.e. type needs to be embedded).
searchattribute.ApplyTypeMap(req.ArchiveRequest.SearchAttributes, saTypeMap)

ctx, cancel := context.WithTimeout(context.Background(), m.config.TimerProcessorArchivalTimeLimit())
defer cancel()
resp, err := m.archivalClient.Archive(ctx, req)
if err != nil {
return false, err
}
if resp.HistoryArchivedInline {
metricsHandler.Counter(metrics.WorkflowCleanupDeleteHistoryInlineCount.GetMetricName()).Record(1)
} else {
metricsHandler.Counter(metrics.WorkflowCleanupArchiveCount.GetMetricName()).Record(1)
}

// inline archival don't perform deletion
// only archival through archival workflow will
return !resp.HistoryArchivedInline, nil
}
8 changes: 4 additions & 4 deletions service/history/deletemanager/delete_manager_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit a74d762

Please sign in to comment.