Skip to content

Commit

Permalink
Add an archival queue processor
Browse files Browse the repository at this point in the history
  • Loading branch information
MichaelSnowden committed Nov 7, 2022
1 parent b300010 commit f826cf1
Show file tree
Hide file tree
Showing 3 changed files with 366 additions and 0 deletions.
162 changes: 162 additions & 0 deletions service/history/archivalQueueTaskExecutor.go
@@ -0,0 +1,162 @@
// The MIT License
//
// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved.
//
// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package history

import (
"context"
"errors"
"fmt"
"time"

common2 "go.temporal.io/server/common"
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/metrics"
"go.temporal.io/server/service/history/archival"
"go.temporal.io/server/service/history/queues"
"go.temporal.io/server/service/history/shard"
"go.temporal.io/server/service/history/tasks"
"go.temporal.io/server/service/history/workflow"
)

type archivalQueueTaskExecutor struct {
archiver archival.Archiver
shardContext shard.Context
workflowCache workflow.Cache
logger log.Logger
metricsClient metrics.MetricsHandler
}

// newArchivalQueueTaskExecutor builds an archivalQueueTaskExecutor from its
// dependencies. The metrics handler is stored in the metricsClient field.
func newArchivalQueueTaskExecutor(
	archiver archival.Archiver,
	shardContext shard.Context,
	workflowCache workflow.Cache,
	metricsHandler metrics.MetricsHandler,
	logger log.Logger,
) *archivalQueueTaskExecutor {
	executor := new(archivalQueueTaskExecutor)
	executor.archiver = archiver
	executor.shardContext = shardContext
	executor.workflowCache = workflowCache
	executor.metricsClient = metricsHandler
	executor.logger = logger
	return executor
}

// Execute runs the task carried by executable.
//
// It always returns the metric tags for the task (namespace, task type and
// operation) and reports isActive as true. Only *tasks.ArchiveExecutionTask is
// supported; any other task type yields an error and is not processed.
func (e *archivalQueueTaskExecutor) Execute(
	ctx context.Context,
	executable queues.Executable,
) (tags []metrics.Tag, isActive bool, err error) {
	queueTask := executable.GetTask()
	typeTagValue := queues.GetArchivalTaskTypeTagValue(queueTask)
	tags = []metrics.Tag{
		getNamespaceTagByID(e.shardContext.GetNamespaceRegistry(), queueTask.GetNamespaceID()),
		metrics.TaskTypeTag(typeTagValue),
		metrics.OperationTag(typeTagValue), // for backward compatibility
	}

	// Guard clause: reject anything that is not an archive-execution task.
	archiveTask, ok := queueTask.(*tasks.ArchiveExecutionTask)
	if !ok {
		return tags, true, fmt.Errorf("task with invalid type sent to archival queue: %+v", queueTask)
	}
	return tags, true, e.processArchiveExecutionTask(ctx, archiveTask)
}

// processArchiveExecutionTask archives a closed workflow execution and then
// adds a task that deletes its history.
//
// It loads the workflow's mutable state and returns an error when the state is
// missing, the workflow is still running, or any of the start, execution or
// close timestamps is nil. Otherwise it sends one request, targeting both
// history and visibility, to the archiver. After a successful archive it adds
// a DeleteHistoryEventTask to the mutable state whose visibility timestamp is
// the close time plus the namespace retention (seven days when the retention
// is zero).
//
// The workflow context obtained from the cache is released with the error this
// function returns.
func (e *archivalQueueTaskExecutor) processArchiveExecutionTask(
	ctx context.Context,
	task *tasks.ArchiveExecutionTask,
) (err error) {
	// Retention applied when the namespace reports a zero retention.
	const defaultRetention = 7 * 24 * time.Hour

	weContext, release, err := getWorkflowExecutionContextForTask(ctx, e.workflowCache, task)
	if err != nil {
		return err
	}
	// err is the named result, so release observes the final return value.
	defer func() { release(err) }()

	mutableState, err := weContext.LoadMutableState(ctx)
	if err != nil {
		return err
	}
	// Report a missing mutable state separately from a running workflow so the
	// error message describes the actual condition.
	if mutableState == nil {
		return errors.New("cannot archive workflow with nil mutable state")
	}
	if mutableState.IsWorkflowExecutionRunning() {
		return errors.New("cannot archive workflow which is running")
	}
	branchToken, err := mutableState.GetCurrentBranchToken()
	if err != nil {
		return err
	}

	namespaceEntry := mutableState.GetNamespaceEntry()
	namespaceName := namespaceEntry.Name()
	nextEventID := mutableState.GetNextEventID()
	closeFailoverVersion, err := mutableState.GetLastWriteVersion()
	if err != nil {
		return err
	}

	executionInfo := mutableState.GetExecutionInfo()
	workflowTypeName := executionInfo.GetWorkflowTypeName()
	// The timestamps are dereferenced when building the request, so each one
	// must be checked for nil first.
	startTime := executionInfo.GetStartTime()
	if startTime == nil {
		return errors.New("can't archive workflow with nil start time")
	}
	executionTime := executionInfo.GetExecutionTime()
	if executionTime == nil {
		return errors.New("can't archive workflow with nil execution time")
	}
	closeTime := executionInfo.GetCloseTime()
	if closeTime == nil {
		return errors.New("can't archive workflow with nil close time")
	}
	executionState := mutableState.GetExecutionState()
	memo := getWorkflowMemo(copyMemo(executionInfo.Memo))
	searchAttributes := getSearchAttributes(copySearchAttributes(executionInfo.SearchAttributes))

	// The archiver's response is not needed; only its error matters here.
	_, err = e.archiver.Archive(ctx, &archival.Request{
		ShardID:              e.shardContext.GetShardID(),
		NamespaceID:          task.NamespaceID,
		Namespace:            namespaceName.String(),
		WorkflowID:           task.WorkflowID,
		RunID:                task.RunID,
		BranchToken:          branchToken,
		NextEventID:          nextEventID,
		CloseFailoverVersion: closeFailoverVersion,
		HistoryURI:           namespaceEntry.HistoryArchivalState().URI,
		WorkflowTypeName:     workflowTypeName,
		StartTime:            *startTime,
		ExecutionTime:        *executionTime,
		CloseTime:            *closeTime,
		Status:               executionState.Status,
		HistoryLength:        nextEventID - 1,
		Memo:                 memo,
		SearchAttributes:     searchAttributes,
		VisibilityURI:        namespaceEntry.VisibilityArchivalState().URI,
		Targets:              []archival.Target{archival.TargetHistory, archival.TargetVisibility},
		CallerService:        common2.HistoryServiceName,
	})
	if err != nil {
		return err
	}

	retention := namespaceEntry.Retention()
	if retention == 0 {
		retention = defaultRetention
	}
	deleteTime := closeTime.Add(retention)
	// NOTE(review): the task is only added to the loaded mutable state here;
	// nothing in this function persists it — confirm that the release path or
	// a caller writes it out.
	mutableState.AddTasks(&tasks.DeleteHistoryEventTask{
		WorkflowKey:                 task.WorkflowKey,
		VisibilityTimestamp:         deleteTime,
		Version:                     task.Version,
		BranchToken:                 branchToken,
		WorkflowDataAlreadyArchived: true,
	})
	return nil
}
201 changes: 201 additions & 0 deletions service/history/archivalQueueTaskExecutor_test.go
@@ -0,0 +1,201 @@
// The MIT License
//
// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved.
//
// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package history

import (
"context"
"testing"
"time"

"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"go.temporal.io/server/api/persistence/v1"
"go.temporal.io/server/common/clock"
"go.temporal.io/server/common/definition"
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/metrics"
"go.temporal.io/server/common/namespace"
"go.temporal.io/server/service/history/archival"
"go.temporal.io/server/service/history/queues"
shard "go.temporal.io/server/service/history/shard"
"go.temporal.io/server/service/history/tasks"
"go.temporal.io/server/service/history/tests"
"go.temporal.io/server/service/history/workflow"
)

// TestArchivalQueueTaskExecutor is a table-driven test of the archival queue
// task executor. Each case configures mocks for the shard, workflow cache,
// mutable state and archiver, runs one task through a queues.Executable, and
// checks the returned error along with the expected Archive and AddTasks
// calls.
func TestArchivalQueueTaskExecutor(t *testing.T) {
	// Typed so it can be used directly wherever an int64 version is required.
	const version int64 = 52

	startTime := time.Unix(1, 0)
	executionTime := startTime.Add(time.Second)
	closeTime := executionTime.Add(time.Minute)
	hourRetention := time.Hour
	workflowKey := definition.NewWorkflowKey(tests.NamespaceID.String(), tests.WorkflowID, tests.RunID)
	for _, c := range []struct {
		Name                       string
		IsWorkflowExecutionRunning bool
		Retention                  *time.Duration
		Task                       tasks.Task
		ExpectedDeleteTime         time.Time
		ExpectedErrorSubstrings    []string
		ExpectArchive              bool
		ExpectAddTask              bool
	}{
		{
			Name:                       "Success",
			IsWorkflowExecutionRunning: false,
			Retention:                  &hourRetention,
			Task: &tasks.ArchiveExecutionTask{
				WorkflowKey: workflowKey,
				Version:     version,
			},
			ExpectedDeleteTime: closeTime.Add(hourRetention),
			ExpectArchive:      true,
			ExpectAddTask:      true,
		},
		{
			Name:                       "Running execution",
			IsWorkflowExecutionRunning: true,
			Retention:                  &hourRetention,
			Task: &tasks.ArchiveExecutionTask{
				WorkflowKey: workflowKey,
				Version:     version,
			},
			ExpectedDeleteTime:      closeTime.Add(hourRetention),
			ExpectedErrorSubstrings: []string{"cannot archive workflow which is running"},
			ExpectArchive:           false,
			ExpectAddTask:           false,
		},
		{
			Name:                       "Default retention",
			IsWorkflowExecutionRunning: false,
			Retention:                  nil,
			Task: &tasks.ArchiveExecutionTask{
				WorkflowKey: workflowKey,
				Version:     version,
			},
			// With no retention configured the executor falls back to seven days.
			ExpectedDeleteTime: closeTime.Add(24 * time.Hour * 7),
			ExpectArchive:      true,
			ExpectAddTask:      true,
		},
		{
			Name: "Wrong task type",
			Task: &tasks.CloseExecutionTask{
				WorkflowKey: workflowKey,
			},
			ExpectedErrorSubstrings: []string{"task with invalid type"},
		},
	} {
		c := c // store c in closure to prevent loop from changing it when a parallel task is accessing it
		t.Run(c.Name, func(t *testing.T) {
			t.Parallel()
			controller := gomock.NewController(t)
			namespaceRegistry := namespace.NewMockRegistry(controller)
			task := c.Task
			shardContext := shard.NewMockContext(controller)
			workflowCache := workflow.NewMockCache(controller)
			workflowContext := workflow.NewMockContext(controller)
			mutableState := workflow.NewMockMutableState(controller)
			branchToken := []byte{42}
			metricsHandler := metrics.NoopMetricsHandler
			logger := log.NewNoopLogger()
			timeSource := clock.NewRealTimeSource()
			archiver := archival.NewMockArchiver(controller)

			// Expectations shared by every case; AnyTimes because some cases
			// fail before these collaborators are reached.
			namespaceRegistry.EXPECT().GetNamespaceName(tests.NamespaceID).Return(tests.Namespace, nil).AnyTimes()
			mutableState.EXPECT().IsWorkflowExecutionRunning().Return(c.IsWorkflowExecutionRunning).AnyTimes()
			shardContext.EXPECT().GetNamespaceRegistry().Return(namespaceRegistry).AnyTimes()
			workflowContext.EXPECT().LoadMutableState(gomock.Any()).Return(mutableState, nil).AnyTimes()
			workflowCache.EXPECT().GetOrCreateWorkflowExecution(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
				Return(workflowContext, workflow.ReleaseCacheFunc(func(err error) {}), nil).AnyTimes()

			if c.ExpectArchive {
				namespaceEntry := tests.GlobalNamespaceEntry.Clone(namespace.WithRetention(c.Retention))
				mutableState.EXPECT().GetCurrentBranchToken().Return(branchToken, nil)
				mutableState.EXPECT().GetNamespaceEntry().Return(namespaceEntry)
				mutableState.EXPECT().GetNextEventID().Return(int64(100))
				mutableState.EXPECT().GetLastWriteVersion().Return(int64(200), nil)
				executionInfo := &persistence.WorkflowExecutionInfo{
					StartTime:     &startTime,
					ExecutionTime: &executionTime,
					CloseTime:     &closeTime,
				}
				mutableState.EXPECT().GetExecutionInfo().Return(executionInfo)
				executionState := &persistence.WorkflowExecutionState{
					State:  0,
					Status: 0,
				}
				mutableState.EXPECT().GetExecutionState().Return(executionState)
				shardContext.EXPECT().GetShardID().Return(int32(1))
				archiver.EXPECT().Archive(gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context,
					request *archival.Request) (*archival.Response, error) {
					assert.Equal(t, startTime, request.StartTime)
					assert.Equal(t, executionTime, request.ExecutionTime)
					assert.Equal(t, closeTime, request.CloseTime)

					return &archival.Response{}, nil
				})
			}

			if c.ExpectAddTask {
				mutableState.EXPECT().AddTasks(&tasks.DeleteHistoryEventTask{
					WorkflowKey:                 workflowKey,
					VisibilityTimestamp:         c.ExpectedDeleteTime,
					TaskID:                      0,
					Version:                     version,
					BranchToken:                 branchToken,
					WorkflowDataAlreadyArchived: true,
				})
			}

			executor := newArchivalQueueTaskExecutor(archiver, shardContext, workflowCache, metricsHandler, logger)
			executable := queues.NewExecutable(
				queues.DefaultReaderId,
				task,
				nil,
				executor,
				nil,
				nil,
				queues.NewNoopPriorityAssigner(),
				timeSource,
				namespaceRegistry,
				nil,
				metrics.NoopMetricsHandler,
				nil,
				nil,
			)
			err := executable.Execute()
			if len(c.ExpectedErrorSubstrings) > 0 {
				require.Error(t, err)
				for _, s := range c.ExpectedErrorSubstrings {
					assert.ErrorContains(t, err, s)
				}
			} else {
				// NoError reports the error's message on failure, unlike Nil.
				assert.NoError(t, err)
			}
		})
	}
}
3 changes: 3 additions & 0 deletions service/history/configs/config.go
Expand Up @@ -25,6 +25,7 @@
package configs

import (
"math"
"time"

enumspb "go.temporal.io/api/enums/v1"
Expand Down Expand Up @@ -284,6 +285,7 @@ type Config struct {
ArchivalProcessorSchedulerRoundRobinWeights dynamicconfig.MapPropertyFnWithNamespaceFilter
ArchivalProcessorMaxPollHostRPS dynamicconfig.IntPropertyFn
ArchivalTaskBatchSize dynamicconfig.IntPropertyFn
ArchivalTaskMaxRetryCount dynamicconfig.IntPropertyFn
ArchivalProcessorPollBackoffInterval dynamicconfig.DurationPropertyFn
ArchivalProcessorMaxPollRPS dynamicconfig.IntPropertyFn
ArchivalProcessorMaxPollInterval dynamicconfig.DurationPropertyFn
Expand Down Expand Up @@ -506,6 +508,7 @@ func NewConfig(dc *dynamicconfig.Collection, numberOfShards int32, isAdvancedVis

// Archival related
ArchivalTaskBatchSize: dc.GetIntProperty(dynamicconfig.ArchivalTaskBatchSize, 100),
ArchivalTaskMaxRetryCount: dc.GetIntProperty(dynamicconfig.ArchivalTaskMaxRetryCount, math.MaxInt32),
ArchivalProcessorMaxPollRPS: dc.GetIntProperty(dynamicconfig.ArchivalProcessorMaxPollRPS, 20),
ArchivalProcessorMaxPollHostRPS: dc.GetIntProperty(dynamicconfig.ArchivalProcessorMaxPollHostRPS, 0),
ArchivalProcessorSchedulerWorkerCount: dc.GetIntProperty(dynamicconfig.ArchivalProcessorSchedulerWorkerCount, 512),
Expand Down

0 comments on commit f826cf1

Please sign in to comment.