diff --git a/common/dynamicconfig/constants.go b/common/dynamicconfig/constants.go index 98de00089fa..4c996966ff2 100644 --- a/common/dynamicconfig/constants.go +++ b/common/dynamicconfig/constants.go @@ -528,8 +528,6 @@ const ( ArchivalProcessorMaxPollRPS = "history.archivalProcessorMaxPollRPS" // ArchivalProcessorMaxPollHostRPS is max poll rate per second for all archivalQueueProcessor on a host ArchivalProcessorMaxPollHostRPS = "history.archivalProcessorMaxPollHostRPS" - // ArchivalTaskMaxRetryCount is max times of retry for archivalQueueProcessor - ArchivalTaskMaxRetryCount = "history.archivalTaskMaxRetryCount" // ArchivalProcessorSchedulerWorkerCount is the number of workers in the host level task scheduler for // archivalQueueProcessor ArchivalProcessorSchedulerWorkerCount = "history.archivalProcessorSchedulerWorkerCount" diff --git a/service/history/archival/archiver.go b/service/history/archival/archiver.go index ac61c4f8548..adb639cfadd 100644 --- a/service/history/archival/archiver.go +++ b/service/history/archival/archiver.go @@ -44,7 +44,6 @@ import ( "go.temporal.io/server/common/log" "go.temporal.io/server/common/log/tag" "go.temporal.io/server/common/metrics" - "go.temporal.io/server/common/primitives/timestamp" "go.temporal.io/server/common/quotas" "go.temporal.io/server/common/searchattribute" ) @@ -65,9 +64,9 @@ type ( // visibility archival WorkflowTypeName string - StartTime time.Time - ExecutionTime time.Time - CloseTime time.Time + StartTime *time.Time + ExecutionTime *time.Time + CloseTime *time.Time Status enumspb.WorkflowExecutionStatus HistoryLength int64 Memo *commonpb.Memo @@ -148,7 +147,6 @@ func (a *archiver) Archive(ctx context.Context, request *Request) (res *Response Message: fmt.Sprintf("archival rate limited: %s", err.Error()), } } - var wg sync.WaitGroup errs := make([]error, len(request.Targets)) for i, target := range request.Targets { @@ -232,9 +230,9 @@ func (a *archiver) archiveVisibility(ctx context.Context, request *Request, logg WorkflowId: request.WorkflowID, RunId: request.RunID, WorkflowTypeName: request.WorkflowTypeName, - StartTime: timestamp.TimePtr(request.StartTime), - ExecutionTime: timestamp.TimePtr(request.ExecutionTime), - CloseTime: timestamp.TimePtr(request.CloseTime), + StartTime: request.StartTime, + ExecutionTime: request.ExecutionTime, + CloseTime: request.CloseTime, Status: request.Status, HistoryLength: request.HistoryLength, Memo: request.Memo, diff --git a/service/history/archivalQueueTaskExecutor.go b/service/history/archivalQueueTaskExecutor.go new file mode 100644 index 00000000000..3bc6f042751 --- /dev/null +++ b/service/history/archivalQueueTaskExecutor.go @@ -0,0 +1,164 @@ +// The MIT License +// +// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved. +// +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package history + +import ( + "context" + "errors" + "fmt" + "time" + + common2 "go.temporal.io/server/common" + "go.temporal.io/server/common/log" + "go.temporal.io/server/common/metrics" + "go.temporal.io/server/service/history/archival" + "go.temporal.io/server/service/history/queues" + "go.temporal.io/server/service/history/shard" + "go.temporal.io/server/service/history/tasks" + "go.temporal.io/server/service/history/workflow" +) + +type archivalQueueTaskExecutor struct { + archiver archival.Archiver + shardContext shard.Context + workflowCache workflow.Cache + logger log.Logger + metricsHandler metrics.MetricsHandler +} + +func newArchivalQueueTaskExecutor( + archiver archival.Archiver, + shardContext shard.Context, + workflowCache workflow.Cache, + metricsHandler metrics.MetricsHandler, + logger log.Logger, +) *archivalQueueTaskExecutor { + return &archivalQueueTaskExecutor{ + archiver: archiver, + shardContext: shardContext, + workflowCache: workflowCache, + logger: logger, + metricsHandler: metricsHandler, + } +} + +func (e *archivalQueueTaskExecutor) Execute( + ctx context.Context, + executable queues.Executable, +) (tags []metrics.Tag, + isActive bool, err error) { + task := executable.GetTask() + taskType := queues.GetArchivalTaskTypeTagValue(task) + tags = []metrics.Tag{ + getNamespaceTagByID(e.shardContext.GetNamespaceRegistry(), task.GetNamespaceID()), + metrics.TaskTypeTag(taskType), + metrics.OperationTag(taskType), // for backward compatibility + } + switch task := task.(type) { + case *tasks.ArchiveExecutionTask: + err = e.processArchiveExecutionTask(ctx, task) + default: + err = fmt.Errorf("task with invalid type sent to archival queue: %+v", task) + } + return tags, true, err +} + +const defaultRetention = 24 * time.Hour * 30 + +func (e *archivalQueueTaskExecutor) processArchiveExecutionTask(ctx context.Context, + task *tasks.ArchiveExecutionTask) (err error) { + weContext, release, err := getWorkflowExecutionContextForTask(ctx, e.workflowCache, task) + if err != nil { + return err + } + defer func() { release(err) }() + + mutableState, err := weContext.LoadMutableState(ctx) + if err != nil { + return err + } + if mutableState == nil || mutableState.IsWorkflowExecutionRunning() { + return nil + } + branchToken, err := mutableState.GetCurrentBranchToken() + if err != nil { + return err + } + + namespaceEntry := mutableState.GetNamespaceEntry() + namespaceName := namespaceEntry.Name() + nextEventID := mutableState.GetNextEventID() + closeFailoverVersion, err := mutableState.GetLastWriteVersion() + if err != nil { + return err + } + + executionInfo := mutableState.GetExecutionInfo() + workflowTypeName := executionInfo.GetWorkflowTypeName() + + executionState := mutableState.GetExecutionState() + memo := getWorkflowMemo(copyMemo(executionInfo.Memo)) + _, err = e.archiver.Archive(ctx, &archival.Request{ + ShardID: e.shardContext.GetShardID(), + NamespaceID: task.NamespaceID, + Namespace: namespaceName.String(), + WorkflowID: task.WorkflowID, + RunID: task.RunID, + BranchToken: branchToken, + NextEventID: nextEventID, + CloseFailoverVersion: closeFailoverVersion, + HistoryURI: namespaceEntry.HistoryArchivalState().URI, + WorkflowTypeName: workflowTypeName, + StartTime: executionInfo.GetStartTime(), + ExecutionTime: executionInfo.GetExecutionTime(), + CloseTime: executionInfo.GetCloseTime(), + Status: executionState.Status, + HistoryLength: nextEventID - 1, + Memo: memo, + SearchAttributes: getSearchAttributes(copySearchAttributes(executionInfo.SearchAttributes)), + VisibilityURI: namespaceEntry.VisibilityArchivalState().URI, + Targets: []archival.Target{archival.TargetHistory, archival.TargetVisibility}, + CallerService: common2.HistoryServiceName, + }) + if err != nil { + return err + } + retention := namespaceEntry.Retention() + if retention == 0 { + retention = defaultRetention + } + if executionInfo.GetCloseTime() == nil { + return errors.New("can't archive workflow with nil close time") + } + deleteTime := executionInfo.GetCloseTime().Add(retention) + mutableState.AddTasks(&tasks.DeleteHistoryEventTask{ + WorkflowKey: task.WorkflowKey, + VisibilityTimestamp: deleteTime, + Version: task.Version, + BranchToken: branchToken, + WorkflowDataAlreadyArchived: true, + }) + return err +} diff --git a/service/history/archivalQueueTaskExecutor_test.go b/service/history/archivalQueueTaskExecutor_test.go new file mode 100644 index 00000000000..289b7415e03 --- /dev/null +++ b/service/history/archivalQueueTaskExecutor_test.go @@ -0,0 +1,200 @@ +// The MIT License +// +// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved. +// +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package history + +import ( + "context" + "testing" + "time" + + "github.com/golang/mock/gomock" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "go.temporal.io/server/api/persistence/v1" + "go.temporal.io/server/common/clock" + "go.temporal.io/server/common/definition" + "go.temporal.io/server/common/log" + "go.temporal.io/server/common/metrics" + "go.temporal.io/server/common/namespace" + "go.temporal.io/server/service/history/archival" + "go.temporal.io/server/service/history/queues" + shard "go.temporal.io/server/service/history/shard" + "go.temporal.io/server/service/history/tasks" + "go.temporal.io/server/service/history/tests" + "go.temporal.io/server/service/history/workflow" +) + +func TestArchivalQueueTaskExecutor(t *testing.T) { + startTime := time.Unix(1, 0) + executionTime := startTime.Add(time.Second) + closeTime := executionTime.Add(time.Minute) + hourRetention := time.Hour + workflowKey := definition.NewWorkflowKey(tests.NamespaceID.String(), tests.WorkflowID, tests.RunID) + version := 52 + for _, c := range []struct { + Name string + IsWorkflowExecutionRunning bool + Retention *time.Duration + Task tasks.Task + ExpectedDeleteTime time.Time + ExpectedErrorSubstrings []string + ExpectArchive bool + ExpectAddTask bool + }{ + { + Name: "Success", + IsWorkflowExecutionRunning: false, + Retention: &hourRetention, + Task: &tasks.ArchiveExecutionTask{ + WorkflowKey: workflowKey, + Version: int64(version), + }, + ExpectedDeleteTime: closeTime.Add(hourRetention), + ExpectArchive: true, + ExpectAddTask: true, + }, + { + Name: "Running execution", + IsWorkflowExecutionRunning: true, + Retention: &hourRetention, + Task: &tasks.ArchiveExecutionTask{ + WorkflowKey: workflowKey, + Version: int64(version), + }, + ExpectedDeleteTime: closeTime.Add(hourRetention), + ExpectArchive: false, + ExpectAddTask: false, + }, + { + Name: "Default retention", + IsWorkflowExecutionRunning: false, + Retention: nil, + Task: &tasks.ArchiveExecutionTask{ + WorkflowKey: workflowKey, + Version: int64(version), + }, + ExpectedDeleteTime: closeTime.Add(24 * time.Hour * 30), + ExpectArchive: true, + ExpectAddTask: true, + }, + { + Name: "Wrong task type", + Task: &tasks.CloseExecutionTask{ + WorkflowKey: workflowKey, + }, + ExpectedErrorSubstrings: []string{"task with invalid type"}, + }, + } { + c := c // store c in closure to prevent loop from changing it when a parallel task is accessing it + t.Run(c.Name, func(t *testing.T) { + t.Parallel() + controller := gomock.NewController(t) + namespaceRegistry := namespace.NewMockRegistry(controller) + task := c.Task + shardContext := shard.NewMockContext(controller) + workflowCache := workflow.NewMockCache(controller) + workflowContext := workflow.NewMockContext(controller) + mutableState := workflow.NewMockMutableState(controller) + branchToken := []byte{42} + metricsHandler := metrics.NoopMetricsHandler + logger := log.NewNoopLogger() + timeSource := clock.NewRealTimeSource() + archiver := archival.NewMockArchiver(controller) + + namespaceRegistry.EXPECT().GetNamespaceName(tests.NamespaceID).Return(tests.Namespace, nil).AnyTimes() + mutableState.EXPECT().IsWorkflowExecutionRunning().Return(c.IsWorkflowExecutionRunning).AnyTimes() + shardContext.EXPECT().GetNamespaceRegistry().Return(namespaceRegistry).AnyTimes() + workflowContext.EXPECT().LoadMutableState(gomock.Any()).Return(mutableState, nil).AnyTimes() + workflowCache.EXPECT().GetOrCreateWorkflowExecution(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). + Return(workflowContext, workflow.ReleaseCacheFunc(func(err error) {}), nil).AnyTimes() + + if c.ExpectArchive { + namespaceEntry := tests.GlobalNamespaceEntry.Clone(namespace.WithRetention(c.Retention)) + mutableState.EXPECT().GetCurrentBranchToken().Return(branchToken, nil) + mutableState.EXPECT().GetNamespaceEntry().Return(namespaceEntry) + mutableState.EXPECT().GetNextEventID().Return(int64(100)) + mutableState.EXPECT().GetLastWriteVersion().Return(int64(200), nil) + executionInfo := &persistence.WorkflowExecutionInfo{ + StartTime: &startTime, + ExecutionTime: &executionTime, + CloseTime: &closeTime, + } + mutableState.EXPECT().GetExecutionInfo().Return(executionInfo) + executionState := &persistence.WorkflowExecutionState{ + State: 0, + Status: 0, + } + mutableState.EXPECT().GetExecutionState().Return(executionState) + shardContext.EXPECT().GetShardID().Return(int32(1)) + archiver.EXPECT().Archive(gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, + request *archival.Request) (*archival.Response, error) { + assert.Equal(t, startTime, *request.StartTime) + assert.Equal(t, executionTime, *request.ExecutionTime) + assert.Equal(t, closeTime, *request.CloseTime) + + return &archival.Response{}, nil + }) + } + + if c.ExpectAddTask { + mutableState.EXPECT().AddTasks(gomock.Any()).Do(func(task *tasks.DeleteHistoryEventTask) { + assert.Equal(t, workflowKey, task.WorkflowKey) + assert.Equal(t, c.ExpectedDeleteTime, task.VisibilityTimestamp) + assert.Zero(t, task.TaskID) + assert.Equal(t, int64(version), task.Version) + assert.Equal(t, branchToken, task.BranchToken) + assert.True(t, task.WorkflowDataAlreadyArchived) + }) + } + + executor := newArchivalQueueTaskExecutor(archiver, shardContext, workflowCache, metricsHandler, logger) + executable := queues.NewExecutable( + queues.DefaultReaderId, + task, + nil, + executor, + nil, + nil, + queues.NewNoopPriorityAssigner(), + timeSource, + namespaceRegistry, + nil, + metrics.NoopMetricsHandler, + nil, + nil, + ) + err := executable.Execute() + if len(c.ExpectedErrorSubstrings) > 0 { + require.Error(t, err) + for _, s := range c.ExpectedErrorSubstrings { + assert.ErrorContains(t, err, s) + } + } else { + assert.Nil(t, err) + } + }) + } +}