diff --git a/common/dynamicconfig/constants.go b/common/dynamicconfig/constants.go index 6dc69cfff6c..a13b78efebe 100644 --- a/common/dynamicconfig/constants.go +++ b/common/dynamicconfig/constants.go @@ -531,8 +531,6 @@ const ( ArchivalProcessorMaxPollRPS = "history.archivalProcessorMaxPollRPS" // ArchivalProcessorMaxPollHostRPS is max poll rate per second for all archivalQueueProcessor on a host ArchivalProcessorMaxPollHostRPS = "history.archivalProcessorMaxPollHostRPS" - // ArchivalTaskMaxRetryCount is max times of retry for archivalQueueProcessor - ArchivalTaskMaxRetryCount = "history.archivalTaskMaxRetryCount" // ArchivalProcessorSchedulerWorkerCount is the number of workers in the host level task scheduler for // archivalQueueProcessor ArchivalProcessorSchedulerWorkerCount = "history.archivalProcessorSchedulerWorkerCount" diff --git a/service/history/archival/archiver.go b/service/history/archival/archiver.go index ac61c4f8548..6f44fe99b38 100644 --- a/service/history/archival/archiver.go +++ b/service/history/archival/archiver.go @@ -44,7 +44,6 @@ import ( "go.temporal.io/server/common/log" "go.temporal.io/server/common/log/tag" "go.temporal.io/server/common/metrics" - "go.temporal.io/server/common/primitives/timestamp" "go.temporal.io/server/common/quotas" "go.temporal.io/server/common/searchattribute" ) @@ -61,18 +60,18 @@ type ( BranchToken []byte NextEventID int64 CloseFailoverVersion int64 - HistoryURI string + HistoryURI carchiver.URI // visibility archival WorkflowTypeName string - StartTime time.Time - ExecutionTime time.Time - CloseTime time.Time + StartTime *time.Time + ExecutionTime *time.Time + CloseTime *time.Time Status enumspb.WorkflowExecutionStatus HistoryLength int64 Memo *commonpb.Memo SearchAttributes *commonpb.SearchAttributes - VisibilityURI string + VisibilityURI carchiver.URI // archival targets: history and/or visibility Targets []Target @@ -148,7 +147,6 @@ func (a *archiver) Archive(ctx context.Context, request *Request) (res *Response Message: fmt.Sprintf("archival rate limited: %s", err.Error()), } } - var wg sync.WaitGroup errs := make([]error, len(request.Targets)) for i, target := range request.Targets { @@ -178,21 +176,16 @@ func (a *archiver) archiveHistory(ctx context.Context, request *Request, logger logger, tag.ArchivalRequestBranchToken(request.BranchToken), tag.ArchivalRequestCloseFailoverVersion(request.CloseFailoverVersion), - tag.ArchivalURI(request.HistoryURI), + tag.ArchivalURI(request.HistoryURI.String()), ) defer a.recordArchiveTargetResult(logger, time.Now(), TargetHistory, &err) - URI, err := carchiver.NewURI(request.HistoryURI) - if err != nil { - return err - } - - historyArchiver, err := a.archiverProvider.GetHistoryArchiver(URI.Scheme(), request.CallerService) + historyArchiver, err := a.archiverProvider.GetHistoryArchiver(request.HistoryURI.Scheme(), request.CallerService) if err != nil { return err } - return historyArchiver.Archive(ctx, URI, &carchiver.ArchiveHistoryRequest{ + return historyArchiver.Archive(ctx, request.HistoryURI, &carchiver.ArchiveHistoryRequest{ ShardID: request.ShardID, NamespaceID: request.NamespaceID, Namespace: request.Namespace, @@ -207,16 +200,11 @@ func (a *archiver) archiveHistory(ctx context.Context, request *Request, logger func (a *archiver) archiveVisibility(ctx context.Context, request *Request, logger log.Logger) (err error) { logger = log.With( logger, - tag.ArchivalURI(request.VisibilityURI), + tag.ArchivalURI(request.VisibilityURI.String()), ) defer a.recordArchiveTargetResult(logger, time.Now(), TargetVisibility, &err) - uri, err := carchiver.NewURI(request.VisibilityURI) - if err != nil { - return err - } - - visibilityArchiver, err := a.archiverProvider.GetVisibilityArchiver(uri.Scheme(), request.CallerService) + visibilityArchiver, err := a.archiverProvider.GetVisibilityArchiver(request.VisibilityURI.Scheme(), request.CallerService) if err != nil { return err } @@ -226,20 +214,20 @@ func (a *archiver) archiveVisibility(ctx context.Context, request *Request, logg if err != nil { return err } - return visibilityArchiver.Archive(ctx, uri, &archiverspb.VisibilityRecord{ + return visibilityArchiver.Archive(ctx, request.VisibilityURI, &archiverspb.VisibilityRecord{ NamespaceId: request.NamespaceID, Namespace: request.Namespace, WorkflowId: request.WorkflowID, RunId: request.RunID, WorkflowTypeName: request.WorkflowTypeName, - StartTime: timestamp.TimePtr(request.StartTime), - ExecutionTime: timestamp.TimePtr(request.ExecutionTime), - CloseTime: timestamp.TimePtr(request.CloseTime), + StartTime: request.StartTime, + ExecutionTime: request.ExecutionTime, + CloseTime: request.CloseTime, Status: request.Status, HistoryLength: request.HistoryLength, Memo: request.Memo, SearchAttributes: searchAttributes, - HistoryArchivalUri: request.HistoryURI, + HistoryArchivalUri: request.HistoryURI.String(), }) } diff --git a/service/history/archival/archiver_test.go b/service/history/archival/archiver_test.go index ee9f7334b40..77d70ff2afe 100644 --- a/service/history/archival/archiver_test.go +++ b/service/history/archival/archiver_test.go @@ -282,8 +282,8 @@ func TestArchiver(t *testing.T) { archiver := NewArchiver(archiverProvider, logRecorder, metricsHandler, rateLimiter) _, err = archiver.Archive(ctx, &Request{ - HistoryURI: historyURI.String(), - VisibilityURI: visibilityURI.String(), + HistoryURI: historyURI, + VisibilityURI: visibilityURI, Targets: c.Targets, }) diff --git a/service/history/archivalQueueTaskExecutor.go b/service/history/archivalQueueTaskExecutor.go new file mode 100644 index 00000000000..0ca6d24f979 --- /dev/null +++ b/service/history/archivalQueueTaskExecutor.go @@ -0,0 +1,208 @@ +// The MIT License +// +// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved. +// +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package history + +import ( + "context" + "errors" + "fmt" + + enumspb "go.temporal.io/api/enums/v1" + + carchiver "go.temporal.io/server/common/archiver" + "go.temporal.io/server/common/log" + "go.temporal.io/server/common/log/tag" + "go.temporal.io/server/common/metrics" + "go.temporal.io/server/common/persistence" + "go.temporal.io/server/common/primitives" + "go.temporal.io/server/service/history/archival" + "go.temporal.io/server/service/history/queues" + "go.temporal.io/server/service/history/shard" + "go.temporal.io/server/service/history/tasks" + "go.temporal.io/server/service/history/workflow" +) + +type archivalQueueTaskExecutor struct { + archiver archival.Archiver + shardContext shard.Context + workflowCache workflow.Cache + logger log.Logger + metricsHandler metrics.MetricsHandler +} + +func newArchivalQueueTaskExecutor( + archiver archival.Archiver, + shardContext shard.Context, + workflowCache workflow.Cache, + metricsHandler metrics.MetricsHandler, + logger log.Logger, +) *archivalQueueTaskExecutor { + return &archivalQueueTaskExecutor{ + archiver: archiver, + shardContext: shardContext, + workflowCache: workflowCache, + logger: logger, + metricsHandler: metricsHandler, + } +} + +func (e *archivalQueueTaskExecutor) Execute( + ctx context.Context, + executable queues.Executable, +) (tags []metrics.Tag, + isActive bool, err error) { + task := executable.GetTask() + taskType := queues.GetArchivalTaskTypeTagValue(task) + tags = []metrics.Tag{ + getNamespaceTagByID(e.shardContext.GetNamespaceRegistry(), task.GetNamespaceID()), + metrics.TaskTypeTag(taskType), + } + switch task := task.(type) { + case *tasks.ArchiveExecutionTask: + err = e.processArchiveExecutionTask(ctx, task) + default: + err = fmt.Errorf("task with invalid type sent to archival queue: %+v", task) + } + return tags, true, err +} + +func (e *archivalQueueTaskExecutor) processArchiveExecutionTask(ctx context.Context, task *tasks.ArchiveExecutionTask) (err error) { + weContext, release, err := getWorkflowExecutionContextForTask(ctx, e.workflowCache, task) + if err != nil { + return err + } + defer func() { release(err) }() + + mutableState, err := weContext.LoadMutableState(ctx) + if err != nil { + return err + } + if mutableState == nil || mutableState.IsWorkflowExecutionRunning() { + return nil + } + lastWriteVersion, err := mutableState.GetLastWriteVersion() + if err != nil { + return err + } + namespaceEntry := mutableState.GetNamespaceEntry() + err = CheckTaskVersion( + e.shardContext, + e.logger, + namespaceEntry, + lastWriteVersion, + task.Version, + task, + ) + if err != nil { + return err + } + + namespaceName := namespaceEntry.Name() + nextEventID := mutableState.GetNextEventID() + executionInfo := mutableState.GetExecutionInfo() + workflowTypeName := executionInfo.GetWorkflowTypeName() + + executionState := mutableState.GetExecutionState() + memo := getWorkflowMemo(copyMemo(executionInfo.Memo)) + closeTime, err := mutableState.GetWorkflowCloseTime(ctx) + if err != nil { + return err + } + if closeTime == nil { + return errors.New("can't archive workflow with nil close time") + } + branchToken, err := mutableState.GetCurrentBranchToken() + if err != nil { + return err + } + + var targets []archival.Target + if e.shardContext.GetArchivalMetadata().GetVisibilityConfig().ClusterConfiguredForArchival() && + namespaceEntry.VisibilityArchivalState().State == enumspb.ARCHIVAL_STATE_ENABLED { + targets = append(targets, archival.TargetVisibility) + } + if e.shardContext.GetArchivalMetadata().GetHistoryConfig().ClusterConfiguredForArchival() && + namespaceEntry.HistoryArchivalState().State == enumspb.ARCHIVAL_STATE_ENABLED { + targets = append(targets, archival.TargetHistory) + } + + historyURI, err := carchiver.NewURI(namespaceEntry.HistoryArchivalState().URI) + if err != nil { + e.logger.Error( + "Failed to parse history URI.", + tag.ArchivalURI(namespaceEntry.HistoryArchivalState().URI), + tag.Error(err), + ) + return nil + } + visibilityURI, err := carchiver.NewURI(namespaceEntry.VisibilityArchivalState().URI) + if err != nil { + e.logger.Error( + "Failed to parse visibility URI.", + tag.ArchivalURI(namespaceEntry.VisibilityArchivalState().URI), + tag.Error(err), + ) + return nil + } + _, err = e.archiver.Archive(ctx, &archival.Request{ + ShardID: e.shardContext.GetShardID(), + NamespaceID: task.NamespaceID, + Namespace: namespaceName.String(), + WorkflowID: task.WorkflowID, + RunID: task.RunID, + BranchToken: branchToken, + NextEventID: nextEventID, + CloseFailoverVersion: lastWriteVersion, + HistoryURI: historyURI, + VisibilityURI: visibilityURI, + WorkflowTypeName: workflowTypeName, + StartTime: executionInfo.GetStartTime(), + ExecutionTime: executionInfo.GetExecutionTime(), + CloseTime: closeTime, + Status: executionState.Status, + HistoryLength: nextEventID - 1, + Memo: memo, + SearchAttributes: getSearchAttributes(copySearchAttributes(executionInfo.SearchAttributes)), + Targets: targets, + CallerService: primitives.HistoryService, + }) + if err != nil { + return err + } + taskGenerator := workflow.NewTaskGenerator(e.shardContext.GetNamespaceRegistry(), mutableState, + e.shardContext.GetConfig()) + err = taskGenerator.GenerateDeleteHistoryEventTask(*closeTime, true) + if err != nil { + return err + } + err = e.shardContext.AddTasks(ctx, &persistence.AddHistoryTasksRequest{ + ShardID: e.shardContext.GetShardID(), + NamespaceID: namespaceEntry.ID().String(), + WorkflowID: task.WorkflowID, + RunID: task.RunID, + Tasks: mutableState.PopTasks(), + }) + return err +} diff --git a/service/history/archivalQueueTaskExecutor_test.go b/service/history/archivalQueueTaskExecutor_test.go new file mode 100644 index 00000000000..63e57d0beaa --- /dev/null +++ b/service/history/archivalQueueTaskExecutor_test.go @@ -0,0 +1,341 @@ +// The MIT License +// +// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved. +// +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package history + +import ( + "context" + "testing" + "time" + + "github.com/golang/mock/gomock" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + enumspb "go.temporal.io/api/enums/v1" + "go.temporal.io/api/serviceerror" + + "go.temporal.io/server/api/persistence/v1" + carchiver "go.temporal.io/server/common/archiver" + "go.temporal.io/server/common/clock" + "go.temporal.io/server/common/cluster" + "go.temporal.io/server/common/definition" + "go.temporal.io/server/common/log" + "go.temporal.io/server/common/metrics" + "go.temporal.io/server/common/namespace" + persistence2 "go.temporal.io/server/common/persistence" + "go.temporal.io/server/common/primitives/timestamp" + "go.temporal.io/server/service/history/archival" + "go.temporal.io/server/service/history/queues" + "go.temporal.io/server/service/history/shard" + "go.temporal.io/server/service/history/tasks" + "go.temporal.io/server/service/history/tests" + "go.temporal.io/server/service/history/workflow" +) + +type archivalConfig struct { + ClusterEnabled bool + NamespaceArchivalState carchiver.ArchivalState +} + +type params struct { + IsWorkflowExecutionRunning bool + Retention *time.Duration + Task tasks.Task + ExpectedDeleteTime time.Time + ExpectedErrorSubstrings []string + ExpectArchive bool + ExpectAddTask bool + ExpectedTargets []archival.Target + HistoryConfig archivalConfig + VisibilityConfig archivalConfig + WorkflowKey definition.WorkflowKey + StartTime time.Time + ExecutionTime time.Time + CloseTime time.Time + Version int64 + GetNamespaceByIDError error + HistoryURI string + VisibilityURI string +} + +type testCase struct { + params + Name string +} + +func newTestCase(name string, configure func(params *params)) testCase { + var p params + p.HistoryConfig.NamespaceArchivalState = carchiver.ArchivalEnabled + p.VisibilityConfig.NamespaceArchivalState = carchiver.ArchivalEnabled + p.HistoryConfig.ClusterEnabled = true + p.VisibilityConfig.ClusterEnabled = true + p.WorkflowKey = definition.NewWorkflowKey( + tests.NamespaceID.String(), + tests.WorkflowID, + tests.RunID, + ) + p.StartTime = time.Unix(0, 0) + p.ExecutionTime = time.Unix(0, 0) + p.CloseTime = time.Unix(0, 0).Add(time.Minute * 2) + p.Retention = timestamp.DurationPtr(time.Hour) + // delete time = close time + retention + // delete time = 2 minutes + 1 hour = 1 hour 2 minutes + p.ExpectedDeleteTime = time.Unix(0, 0).Add(time.Minute * 2).Add(time.Hour) + p.Version = 52 + p.Task = &tasks.ArchiveExecutionTask{ + WorkflowKey: p.WorkflowKey, + Version: p.Version, + } + p.HistoryURI = "test://history/archival" + p.VisibilityURI = "test://visibility/archival" + + p.ExpectedTargets = []archival.Target{ + archival.TargetHistory, + archival.TargetVisibility, + } + p.ExpectArchive = true + p.ExpectAddTask = true + configure(&p) + return testCase{ + Name: name, + params: p, + } +} + +func TestArchivalQueueTaskExecutor(t *testing.T) { + for _, c := range []testCase{ + newTestCase("success", func(p *params) { + }), + newTestCase("history archival disabled for cluster", func(p *params) { + p.HistoryConfig.ClusterEnabled = false + p.ExpectedTargets = []archival.Target{ + archival.TargetVisibility, + } + }), + newTestCase("history archival disabled for namespace", func(p *params) { + p.HistoryConfig.NamespaceArchivalState = carchiver.ArchivalDisabled + p.ExpectedTargets = []archival.Target{ + archival.TargetVisibility, + } + }), + newTestCase("visibility archival disabled for cluster", func(p *params) { + p.VisibilityConfig.ClusterEnabled = false + p.ExpectedTargets = []archival.Target{ + archival.TargetHistory, + } + }), + newTestCase("visibility archival disabled for namespace", func(p *params) { + p.VisibilityConfig.NamespaceArchivalState = carchiver.ArchivalDisabled + p.ExpectedTargets = []archival.Target{ + archival.TargetHistory, + } + }), + newTestCase("both history and visibility archival disabled", func(p *params) { + p.VisibilityConfig.NamespaceArchivalState = carchiver.ArchivalDisabled + p.HistoryConfig.NamespaceArchivalState = carchiver.ArchivalDisabled + p.ExpectedTargets = []archival.Target{} + }), + newTestCase("running execution", func(p *params) { + p.IsWorkflowExecutionRunning = true + p.ExpectArchive = false + p.ExpectAddTask = false + }), + newTestCase("namespace not found", func(p *params) { + p.Retention = nil + p.GetNamespaceByIDError = &serviceerror.NamespaceNotFound{} + p.ExpectedDeleteTime = p.CloseTime.Add(24 * time.Hour) + }), + newTestCase("wrong task type", func(p *params) { + p.Task = &tasks.DeleteExecutionTask{ + WorkflowKey: p.WorkflowKey, + Version: p.Version, + } + p.ExpectArchive = false + p.ExpectAddTask = false + p.ExpectedErrorSubstrings = []string{"invalid type"} + }), + newTestCase("invalid history URI", func(p *params) { + p.HistoryURI = "invalid_uri" + p.ExpectArchive = false + p.ExpectAddTask = false + }), + newTestCase("invalid visibility URI", func(p *params) { + p.VisibilityURI = "invalid_uri" + p.ExpectArchive = false + p.ExpectAddTask = false + }), + } { + c := c // store c in closure to prevent loop from changing it when a parallel task is accessing it + t.Run(c.Name, func(t *testing.T) { + t.Parallel() + controller := gomock.NewController(t) + namespaceRegistry := namespace.NewMockRegistry(controller) + task := c.Task + shardContext := shard.NewMockContext(controller) + workflowCache := workflow.NewMockCache(controller) + workflowContext := workflow.NewMockContext(controller) + mutableState := workflow.NewMockMutableState(controller) + branchToken := []byte{42} + metricsHandler := metrics.NoopMetricsHandler + logger := log.NewNoopLogger() + timeSource := clock.NewRealTimeSource() + a := archival.NewMockArchiver(controller) + + mutableState.EXPECT().IsWorkflowExecutionRunning().Return(c.IsWorkflowExecutionRunning).AnyTimes() + mutableState.EXPECT().GetCurrentVersion().Return(c.Version).AnyTimes() + mutableState.EXPECT().GetWorkflowKey().Return(c.WorkflowKey).AnyTimes() + shardContext.EXPECT().GetNamespaceRegistry().Return(namespaceRegistry).AnyTimes() + cfg := tests.NewDynamicConfig() + cfg.RetentionTimerJitterDuration = func() time.Duration { + return 0 + } + shardContext.EXPECT().GetConfig().Return(cfg).AnyTimes() + mockMetadata := cluster.NewMockMetadata(controller) + mockMetadata.EXPECT().IsGlobalNamespaceEnabled().Return(true).AnyTimes() + shardContext.EXPECT().GetClusterMetadata().Return(mockMetadata).AnyTimes() + workflowContext.EXPECT().LoadMutableState(gomock.Any()).Return(mutableState, nil).AnyTimes() + workflowCache.EXPECT().GetOrCreateWorkflowExecution(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). + Return(workflowContext, workflow.ReleaseCacheFunc(func(err error) {}), nil).AnyTimes() + + historyArchivalState := c.HistoryConfig.NamespaceArchivalState + visibilityArchivalState := c.VisibilityConfig.NamespaceArchivalState + namespaceEntry := namespace.NewGlobalNamespaceForTest( + &persistence.NamespaceInfo{ + Id: tests.NamespaceID.String(), + Name: tests.Namespace.String(), + }, + &persistence.NamespaceConfig{ + Retention: c.Retention, + HistoryArchivalState: enumspb.ArchivalState(historyArchivalState), + HistoryArchivalUri: c.HistoryURI, + VisibilityArchivalState: enumspb.ArchivalState(visibilityArchivalState), + VisibilityArchivalUri: c.VisibilityURI, + }, + &persistence.NamespaceReplicationConfig{ + ActiveClusterName: cluster.TestCurrentClusterName, + Clusters: []string{ + cluster.TestCurrentClusterName, + }, + }, + 52, + ) + namespaceRegistry.EXPECT().GetNamespaceName(namespaceEntry.ID()). + Return(namespaceEntry.Name(), nil).AnyTimes() + namespaceRegistry.EXPECT().GetNamespaceByID(namespaceEntry.ID()). + Return(namespaceEntry, c.GetNamespaceByIDError).AnyTimes() + mutableState.EXPECT().GetCurrentBranchToken().Return(branchToken, nil).AnyTimes() + mutableState.EXPECT().GetNamespaceEntry().Return(namespaceEntry).AnyTimes() + mutableState.EXPECT().GetNextEventID().Return(int64(100)).AnyTimes() + mutableState.EXPECT().GetLastWriteVersion().Return(int64(52), nil).AnyTimes() + mutableState.EXPECT().GetWorkflowCloseTime(gomock.Any()).Return(&c.CloseTime, nil).AnyTimes() + executionInfo := &persistence.WorkflowExecutionInfo{ + NamespaceId: tests.NamespaceID.String(), + StartTime: &c.StartTime, + ExecutionTime: &c.ExecutionTime, + CloseTime: &c.CloseTime, + } + mutableState.EXPECT().GetExecutionInfo().Return(executionInfo).AnyTimes() + archivalMetadata := carchiver.NewMockArchivalMetadata(controller) + historyConfig := carchiver.NewMockArchivalConfig(controller) + historyConfig.EXPECT().ClusterConfiguredForArchival().Return(c.HistoryConfig.ClusterEnabled).AnyTimes() + archivalMetadata.EXPECT().GetHistoryConfig().Return(historyConfig).AnyTimes() + visibilityConfig := carchiver.NewMockArchivalConfig(controller) + visibilityConfig.EXPECT().ClusterConfiguredForArchival().Return(c.VisibilityConfig.ClusterEnabled).AnyTimes() + archivalMetadata.EXPECT().GetVisibilityConfig().Return(visibilityConfig).AnyTimes() + shardContext.EXPECT().GetArchivalMetadata().Return(archivalMetadata).AnyTimes() + shardID := int32(1) + shardContext.EXPECT().GetShardID().Return(shardID).AnyTimes() + executionState := &persistence.WorkflowExecutionState{ + State: 0, + Status: 0, + } + mutableState.EXPECT().GetExecutionState().Return(executionState).AnyTimes() + + if c.ExpectArchive { + a.EXPECT().Archive(gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, + request *archival.Request) (*archival.Response, error) { + assert.Equal(t, c.StartTime, *request.StartTime) + assert.Equal(t, c.ExecutionTime, *request.ExecutionTime) + assert.Equal(t, c.CloseTime, *request.CloseTime) + assert.ElementsMatch(t, c.ExpectedTargets, request.Targets) + + return &archival.Response{}, nil + }) + } + + if c.ExpectAddTask { + mutableState.EXPECT().AddTasks(gomock.Any()).Do(func(ts ...*tasks. + DeleteHistoryEventTask) { + require.Len(t, ts, 1) + task := ts[0] + assert.Equal(t, c.WorkflowKey, task.WorkflowKey) + assert.Zero(t, task.TaskID) + assert.Equal(t, c.Version, task.Version) + assert.Equal(t, branchToken, task.BranchToken) + assert.True(t, task.WorkflowDataAlreadyArchived) + assert.Equal(t, c.ExpectedDeleteTime, task.VisibilityTimestamp) + popTasks := map[tasks.Category][]tasks.Task{ + tasks.CategoryTimer: { + task, + }, + } + mutableState.EXPECT().PopTasks().Return(popTasks) + shardContext.EXPECT().AddTasks(gomock.Any(), &persistence2.AddHistoryTasksRequest{ + ShardID: shardID, + NamespaceID: tests.NamespaceID.String(), + WorkflowID: task.WorkflowID, + RunID: task.RunID, + Tasks: popTasks, + }) + }) + } + + executor := newArchivalQueueTaskExecutor(a, shardContext, workflowCache, metricsHandler, logger) + executable := queues.NewExecutable( + queues.DefaultReaderId, + task, + nil, + executor, + nil, + nil, + queues.NewNoopPriorityAssigner(), + timeSource, + namespaceRegistry, + nil, + metrics.NoopMetricsHandler, + nil, + nil, + ) + err := executable.Execute() + if len(c.ExpectedErrorSubstrings) > 0 { + require.Error(t, err) + for _, s := range c.ExpectedErrorSubstrings { + assert.ErrorContains(t, err, s) + } + } else { + assert.Nil(t, err) + } + }) + } +} diff --git a/service/history/workflow/task_generator.go b/service/history/workflow/task_generator.go index 1eb8229dcff..77302c5348b 100644 --- a/service/history/workflow/task_generator.go +++ b/service/history/workflow/task_generator.go @@ -55,6 +55,7 @@ type ( closeEvent *historypb.HistoryEvent, deleteAfterClose bool, ) error + GenerateDeleteHistoryEventTask(closeTime time.Time, workflowDataAlreadyArchived bool) error GenerateDeleteExecutionTask() (*tasks.DeleteExecutionTask, error) GenerateRecordWorkflowStartedTasks( startEvent *historypb.HistoryEvent, @@ -148,22 +149,6 @@ func (r *TaskGeneratorImpl) GenerateWorkflowCloseTasks( ) error { currentVersion := r.mutableState.GetCurrentVersion() - executionInfo := r.mutableState.GetExecutionInfo() - - retention := defaultWorkflowRetention - namespaceEntry, err := r.namespaceRegistry.GetNamespaceByID(namespace.ID(executionInfo.NamespaceId)) - switch err.(type) { - case nil: - retention = namespaceEntry.Retention() - case *serviceerror.NamespaceNotFound: - // namespace is not accessible, use default value above - default: - return err - } - branchToken, err := r.mutableState.GetCurrentBranchToken() - if err != nil { - return err - } closeTasks := []tasks.Task{ &tasks.CloseExecutionTask{ @@ -205,14 +190,9 @@ func (r *TaskGeneratorImpl) GenerateWorkflowCloseTasks( }) } else { closeTime := timestamp.TimeValue(closeEvent.GetEventTime()) - retentionJitterDuration := backoff.JitDuration(r.config.RetentionTimerJitterDuration(), 1) / 2 - closeTasks = append(closeTasks, &tasks.DeleteHistoryEventTask{ - // TaskID is set by shard - WorkflowKey: r.mutableState.GetWorkflowKey(), - VisibilityTimestamp: closeTime.Add(retention).Add(retentionJitterDuration), - Version: currentVersion, - BranchToken: branchToken, - }) + if err := r.GenerateDeleteHistoryEventTask(closeTime, false); err != nil { + return err + } } } @@ -220,6 +200,37 @@ func (r *TaskGeneratorImpl) GenerateWorkflowCloseTasks( return nil } +func (r *TaskGeneratorImpl) GenerateDeleteHistoryEventTask(closeTime time.Time, workflowDataAlreadyArchived bool) error { + retention := defaultWorkflowRetention + currentVersion := r.mutableState.GetCurrentVersion() + executionInfo := r.mutableState.GetExecutionInfo() + namespaceEntry, err := r.namespaceRegistry.GetNamespaceByID(namespace.ID(executionInfo.NamespaceId)) + switch err.(type) { + case nil: + retention = namespaceEntry.Retention() + case *serviceerror.NamespaceNotFound: + // namespace is not accessible, use default value above + default: + return err + } + branchToken, err := r.mutableState.GetCurrentBranchToken() + if err != nil { + return err + } + + retentionJitterDuration := backoff.JitDuration(r.config.RetentionTimerJitterDuration(), 1) / 2 + deleteTime := closeTime.Add(retention).Add(retentionJitterDuration) + r.mutableState.AddTasks(&tasks.DeleteHistoryEventTask{ + // TaskID is set by shard + WorkflowKey: r.mutableState.GetWorkflowKey(), + VisibilityTimestamp: deleteTime, + Version: currentVersion, + BranchToken: branchToken, + WorkflowDataAlreadyArchived: workflowDataAlreadyArchived, + }) + return nil +} + func (r *TaskGeneratorImpl) GenerateDeleteExecutionTask() (*tasks.DeleteExecutionTask, error) { return &tasks.DeleteExecutionTask{ // TaskID, VisibilityTimestamp is set by shard diff --git a/service/history/workflow/task_generator_mock.go b/service/history/workflow/task_generator_mock.go index 7f20a71707e..f528ee3acf8 100644 --- a/service/history/workflow/task_generator_mock.go +++ b/service/history/workflow/task_generator_mock.go @@ -30,6 +30,7 @@ package workflow import ( reflect "reflect" + time "time" gomock "github.com/golang/mock/gomock" history "go.temporal.io/api/history/v1" @@ -144,6 +145,20 @@ func (mr *MockTaskGeneratorMockRecorder) GenerateDeleteExecutionTask() *gomock.C return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GenerateDeleteExecutionTask", reflect.TypeOf((*MockTaskGenerator)(nil).GenerateDeleteExecutionTask)) } +// GenerateDeleteHistoryEventTask mocks base method. +func (m *MockTaskGenerator) GenerateDeleteHistoryEventTask(closeTime time.Time, workflowDataAlreadyArchived bool) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GenerateDeleteHistoryEventTask", closeTime, workflowDataAlreadyArchived) + ret0, _ := ret[0].(error) + return ret0 +} + +// GenerateDeleteHistoryEventTask indicates an expected call of GenerateDeleteHistoryEventTask. +func (mr *MockTaskGeneratorMockRecorder) GenerateDeleteHistoryEventTask(closeTime, workflowDataAlreadyArchived interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GenerateDeleteHistoryEventTask", reflect.TypeOf((*MockTaskGenerator)(nil).GenerateDeleteHistoryEventTask), closeTime, workflowDataAlreadyArchived) +} + // GenerateHistoryReplicationTasks mocks base method. func (m *MockTaskGenerator) GenerateHistoryReplicationTasks(branchToken []byte, events []*history.HistoryEvent) error { m.ctrl.T.Helper() diff --git a/service/history/workflow/task_generator_test.go b/service/history/workflow/task_generator_test.go index 7801871d491..80fac60f816 100644 --- a/service/history/workflow/task_generator_test.go +++ b/service/history/workflow/task_generator_test.go @@ -130,7 +130,7 @@ func TestTaskGeneratorImpl_GenerateWorkflowCloseTasks(t *testing.T) { mutableState.EXPECT().GetWorkflowKey().Return(definition.NewWorkflowKey( namespaceEntry.ID().String(), tests.WorkflowID, tests.RunID, )).AnyTimes() - mutableState.EXPECT().GetCurrentBranchToken().Return(nil, nil) + mutableState.EXPECT().GetCurrentBranchToken().Return(nil, nil).AnyTimes() taskGenerator := NewTaskGenerator(namespaceRegistry, mutableState, &configs.Config{ DurableArchivalEnabled: func() bool { return c.DurableArchivalEnabled @@ -142,52 +142,10 @@ func TestTaskGeneratorImpl_GenerateWorkflowCloseTasks(t *testing.T) { closeTime := time.Unix(0, 0) + var allTasks []tasks.Task mutableState.EXPECT().AddTasks(gomock.Any()).Do(func(ts ...tasks.Task) { - var ( - closeExecutionTask *tasks.CloseExecutionTask - deleteHistoryEventTask *tasks.DeleteHistoryEventTask - closeExecutionVisibilityTask *tasks.CloseExecutionVisibilityTask - archiveExecutionTask *tasks.ArchiveExecutionTask - ) - for _, task := range ts { - switch t := task.(type) { - case *tasks.CloseExecutionTask: - closeExecutionTask = t - case *tasks.DeleteHistoryEventTask: - deleteHistoryEventTask = t - case *tasks.CloseExecutionVisibilityTask: - closeExecutionVisibilityTask = t - case *tasks.ArchiveExecutionTask: - archiveExecutionTask = t - } - } - require.NotNil(t, closeExecutionTask) - assert.Equal(t, c.DeleteAfterClose, closeExecutionTask.DeleteAfterClose) - - if c.ExpectCloseExecutionVisibilityTask { - assert.NotNil(t, closeExecutionVisibilityTask) - } else { - assert.Nil(t, closeExecutionVisibilityTask) - } - if c.ExpectArchiveExecutionTask { - require.NotNil(t, archiveExecutionTask) - assert.Equal(t, archiveExecutionTask.NamespaceID, namespaceEntry.ID().String()) - assert.Equal(t, archiveExecutionTask.WorkflowID, tests.WorkflowID) - assert.Equal(t, archiveExecutionTask.RunID, tests.RunID) - } else { - assert.Nil(t, archiveExecutionTask) - } - if c.ExpectDeleteHistoryEventTask { - require.NotNil(t, deleteHistoryEventTask) - assert.Equal(t, deleteHistoryEventTask.NamespaceID, namespaceEntry.ID().String()) - assert.Equal(t, deleteHistoryEventTask.WorkflowID, tests.WorkflowID) - assert.Equal(t, deleteHistoryEventTask.RunID, tests.RunID) - assert.True(t, deleteHistoryEventTask.VisibilityTimestamp.After(closeTime.Add(retention))) - assert.True(t, deleteHistoryEventTask.VisibilityTimestamp.Before(closeTime.Add(retention).Add(time.Second*2))) - } else { - assert.Nil(t, deleteHistoryEventTask) - } - }) + allTasks = append(allTasks, ts...) + }).MinTimes(1) err := taskGenerator.GenerateWorkflowCloseTasks(&historypb.HistoryEvent{ Attributes: &historypb.HistoryEvent_WorkflowExecutionCompletedEventAttributes{ @@ -196,6 +154,118 @@ func TestTaskGeneratorImpl_GenerateWorkflowCloseTasks(t *testing.T) { EventTime: timestamp.TimePtr(closeTime), }, c.DeleteAfterClose) require.NoError(t, err) + + var ( + closeExecutionTask *tasks.CloseExecutionTask + deleteHistoryEventTask *tasks.DeleteHistoryEventTask + closeExecutionVisibilityTask *tasks.CloseExecutionVisibilityTask + archiveExecutionTask *tasks.ArchiveExecutionTask + ) + for _, task := range allTasks { + switch t := task.(type) { + case *tasks.CloseExecutionTask: + closeExecutionTask = t + case *tasks.DeleteHistoryEventTask: + deleteHistoryEventTask = t + case *tasks.CloseExecutionVisibilityTask: + closeExecutionVisibilityTask = t + case *tasks.ArchiveExecutionTask: + archiveExecutionTask = t + } + } + require.NotNil(t, closeExecutionTask) + assert.Equal(t, c.DeleteAfterClose, closeExecutionTask.DeleteAfterClose) + + if c.ExpectCloseExecutionVisibilityTask { + assert.NotNil(t, closeExecutionVisibilityTask) + } else { + assert.Nil(t, closeExecutionVisibilityTask) + } + if c.ExpectArchiveExecutionTask { + require.NotNil(t, archiveExecutionTask) + assert.Equal(t, archiveExecutionTask.NamespaceID, namespaceEntry.ID().String()) + assert.Equal(t, archiveExecutionTask.WorkflowID, tests.WorkflowID) + assert.Equal(t, archiveExecutionTask.RunID, tests.RunID) + } else { + assert.Nil(t, archiveExecutionTask) + } + if c.ExpectDeleteHistoryEventTask { + require.NotNil(t, deleteHistoryEventTask) + assert.Equal(t, deleteHistoryEventTask.NamespaceID, namespaceEntry.ID().String()) + assert.Equal(t, deleteHistoryEventTask.WorkflowID, tests.WorkflowID) + assert.Equal(t, deleteHistoryEventTask.RunID, tests.RunID) + } else { + assert.Nil(t, deleteHistoryEventTask) + } + }) + } +} + +func TestTaskGeneratorImpl_GenerateDeleteHistoryEventTask(t *testing.T) { + for _, c := range []struct { + Name string + AlreadyArchived bool + }{ + { + Name: "AlreadyArchived", + AlreadyArchived: true, + }, + { + Name: "NotAlreadyArchived", + AlreadyArchived: false, + }, + } { + c := c + t.Run(c.Name, func(t *testing.T) { + t.Parallel() + ctrl := gomock.NewController(t) + namespaceRegistry := namespace.NewMockRegistry(ctrl) + retention := 24 * time.Hour + namespaceEntry := tests.GlobalNamespaceEntry.Clone(namespace.WithRetention(&retention)) + namespaceRegistry.EXPECT().GetNamespaceID(gomock.Any()).Return(namespaceEntry.ID(), nil).AnyTimes() + namespaceRegistry.EXPECT().GetNamespaceByID(namespaceEntry.ID()).Return(namespaceEntry, nil).AnyTimes() + + mutableState := NewMockMutableState(ctrl) + mutableState.EXPECT().GetCurrentVersion().Return(int64(42)).AnyTimes() + mutableState.EXPECT().GetExecutionInfo().Return(&persistence.WorkflowExecutionInfo{ + NamespaceId: namespaceEntry.ID().String(), + }).AnyTimes() + workflowKey := definition.NewWorkflowKey( + namespaceEntry.ID().String(), tests.WorkflowID, tests.RunID, + ) + mutableState.EXPECT().GetWorkflowKey().Return(workflowKey).AnyTimes() + mutableState.EXPECT().GetCurrentBranchToken().Return([]byte{0xd, 0xe, 0xa, 0xd, 0xb, 0xe, 0xe, 0xf}, nil) + closeTime := time.Unix(3600, 0) + mutableState.EXPECT().GetWorkflowCloseTime(gomock.Any()).Return(&closeTime, nil).AnyTimes() + mutableState.EXPECT().AddTasks(gomock.Any()).Do(func(task *tasks.DeleteHistoryEventTask) { + assert.Equal(t, workflowKey, task.WorkflowKey) + // before jitter + // closeTime = 1 hour + // retention = 24 hours + // visibilityTimestamp = closeTime + retention + // visibilityTimestamp = 1 hour + 24 hours = 25 hours + // visibilityTimestamp = 25 hours * 3600 seconds = 90,000 seconds + // + // after jitter + // jitter between 0 and 2 seconds + // visibilityTimestamp between 90,000 to 90,002 seconds + assert.GreaterOrEqual(t, task.VisibilityTimestamp, time.Unix(90000, 0)) + assert.LessOrEqual(t, task.VisibilityTimestamp, time.Unix(90002, 0)) + + assert.Zero(t, task.TaskID) + assert.Equal(t, int64(42), task.Version) + assert.Equal(t, []byte{0xd, 0xe, 0xa, 0xd, 0xb, 0xe, 0xe, 0xf}, task.BranchToken) + assert.Equal(t, c.AlreadyArchived, task.WorkflowDataAlreadyArchived) + }) + + taskGenerator := NewTaskGenerator(namespaceRegistry, mutableState, &configs.Config{ + RetentionTimerJitterDuration: func() time.Duration { + return time.Second + }, + }) + + err := taskGenerator.GenerateDeleteHistoryEventTask(closeTime, c.AlreadyArchived) + require.NoError(t, err) }) } }