Skip to content

Commit

Permalink
Add an archival queue processor
Browse files Browse the repository at this point in the history
  • Loading branch information
MichaelSnowden committed Nov 25, 2022
1 parent 306797b commit 9427f41
Show file tree
Hide file tree
Showing 4 changed files with 370 additions and 10 deletions.
2 changes: 0 additions & 2 deletions common/dynamicconfig/constants.go
Expand Up @@ -528,8 +528,6 @@ const (
ArchivalProcessorMaxPollRPS = "history.archivalProcessorMaxPollRPS"
// ArchivalProcessorMaxPollHostRPS is max poll rate per second for all archivalQueueProcessor on a host
ArchivalProcessorMaxPollHostRPS = "history.archivalProcessorMaxPollHostRPS"
// ArchivalTaskMaxRetryCount is max times of retry for archivalQueueProcessor
ArchivalTaskMaxRetryCount = "history.archivalTaskMaxRetryCount"
// ArchivalProcessorSchedulerWorkerCount is the number of workers in the host level task scheduler for
// archivalQueueProcessor
ArchivalProcessorSchedulerWorkerCount = "history.archivalProcessorSchedulerWorkerCount"
Expand Down
14 changes: 6 additions & 8 deletions service/history/archival/archiver.go
Expand Up @@ -44,7 +44,6 @@ import (
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/log/tag"
"go.temporal.io/server/common/metrics"
"go.temporal.io/server/common/primitives/timestamp"
"go.temporal.io/server/common/quotas"
"go.temporal.io/server/common/searchattribute"
)
Expand All @@ -65,9 +64,9 @@ type (

// visibility archival
WorkflowTypeName string
StartTime time.Time
ExecutionTime time.Time
CloseTime time.Time
StartTime *time.Time
ExecutionTime *time.Time
CloseTime *time.Time
Status enumspb.WorkflowExecutionStatus
HistoryLength int64
Memo *commonpb.Memo
Expand Down Expand Up @@ -148,7 +147,6 @@ func (a *archiver) Archive(ctx context.Context, request *Request) (res *Response
Message: fmt.Sprintf("archival rate limited: %s", err.Error()),
}
}

var wg sync.WaitGroup
errs := make([]error, len(request.Targets))
for i, target := range request.Targets {
Expand Down Expand Up @@ -232,9 +230,9 @@ func (a *archiver) archiveVisibility(ctx context.Context, request *Request, logg
WorkflowId: request.WorkflowID,
RunId: request.RunID,
WorkflowTypeName: request.WorkflowTypeName,
StartTime: timestamp.TimePtr(request.StartTime),
ExecutionTime: timestamp.TimePtr(request.ExecutionTime),
CloseTime: timestamp.TimePtr(request.CloseTime),
StartTime: request.StartTime,
ExecutionTime: request.ExecutionTime,
CloseTime: request.CloseTime,
Status: request.Status,
HistoryLength: request.HistoryLength,
Memo: request.Memo,
Expand Down
164 changes: 164 additions & 0 deletions service/history/archivalQueueTaskExecutor.go
@@ -0,0 +1,164 @@
// The MIT License
//
// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved.
//
// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package history

import (
"context"
"errors"
"fmt"
"time"

common2 "go.temporal.io/server/common"
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/metrics"
"go.temporal.io/server/service/history/archival"
"go.temporal.io/server/service/history/queues"
"go.temporal.io/server/service/history/shard"
"go.temporal.io/server/service/history/tasks"
"go.temporal.io/server/service/history/workflow"
)

type archivalQueueTaskExecutor struct {
archiver archival.Archiver
shardContext shard.Context
workflowCache workflow.Cache
logger log.Logger
metricsHandler metrics.MetricsHandler
}

func newArchivalQueueTaskExecutor(
archiver archival.Archiver,
shardContext shard.Context,
workflowCache workflow.Cache,
metricsHandler metrics.MetricsHandler,
logger log.Logger,
) *archivalQueueTaskExecutor {
return &archivalQueueTaskExecutor{
archiver: archiver,
shardContext: shardContext,
workflowCache: workflowCache,
logger: logger,
metricsHandler: metricsHandler,
}
}

func (e *archivalQueueTaskExecutor) Execute(
ctx context.Context,
executable queues.Executable,
) (tags []metrics.Tag,
isActive bool, err error) {
task := executable.GetTask()
taskType := queues.GetArchivalTaskTypeTagValue(task)
tags = []metrics.Tag{
getNamespaceTagByID(e.shardContext.GetNamespaceRegistry(), task.GetNamespaceID()),
metrics.TaskTypeTag(taskType),
metrics.OperationTag(taskType), // for backward compatibility
}
switch task := task.(type) {
case *tasks.ArchiveExecutionTask:
err = e.processArchiveExecutionTask(ctx, task)
default:
err = fmt.Errorf("task with invalid type sent to archival queue: %+v", task)
}
return tags, true, err
}

const defaultRetention = 24 * time.Hour * 30

func (e *archivalQueueTaskExecutor) processArchiveExecutionTask(ctx context.Context,
task *tasks.ArchiveExecutionTask) (err error) {
weContext, release, err := getWorkflowExecutionContextForTask(ctx, e.workflowCache, task)
if err != nil {
return err
}
defer func() { release(err) }()

mutableState, err := weContext.LoadMutableState(ctx)
if err != nil {
return err
}
if mutableState == nil || mutableState.IsWorkflowExecutionRunning() {
return nil
}
branchToken, err := mutableState.GetCurrentBranchToken()
if err != nil {
return err
}

namespaceEntry := mutableState.GetNamespaceEntry()
namespaceName := namespaceEntry.Name()
nextEventID := mutableState.GetNextEventID()
closeFailoverVersion, err := mutableState.GetLastWriteVersion()
if err != nil {
return err
}

executionInfo := mutableState.GetExecutionInfo()
workflowTypeName := executionInfo.GetWorkflowTypeName()

executionState := mutableState.GetExecutionState()
memo := getWorkflowMemo(copyMemo(executionInfo.Memo))
_, err = e.archiver.Archive(ctx, &archival.Request{
ShardID: e.shardContext.GetShardID(),
NamespaceID: task.NamespaceID,
Namespace: namespaceName.String(),
WorkflowID: task.WorkflowID,
RunID: task.RunID,
BranchToken: branchToken,
NextEventID: nextEventID,
CloseFailoverVersion: closeFailoverVersion,
HistoryURI: namespaceEntry.HistoryArchivalState().URI,
WorkflowTypeName: workflowTypeName,
StartTime: executionInfo.GetStartTime(),
ExecutionTime: executionInfo.GetExecutionTime(),
CloseTime: executionInfo.GetCloseTime(),
Status: executionState.Status,
HistoryLength: nextEventID - 1,
Memo: memo,
SearchAttributes: getSearchAttributes(copySearchAttributes(executionInfo.SearchAttributes)),
VisibilityURI: namespaceEntry.VisibilityArchivalState().URI,
Targets: []archival.Target{archival.TargetHistory, archival.TargetVisibility},
CallerService: common2.HistoryServiceName,
})
if err != nil {
return err
}
retention := namespaceEntry.Retention()
if retention == 0 {
retention = defaultRetention
}
if executionInfo.GetCloseTime() == nil {
return errors.New("can't archive workflow with nil close time")
}
deleteTime := executionInfo.GetCloseTime().Add(retention)
mutableState.AddTasks(&tasks.DeleteHistoryEventTask{
WorkflowKey: task.WorkflowKey,
VisibilityTimestamp: deleteTime,
Version: task.Version,
BranchToken: branchToken,
WorkflowDataAlreadyArchived: true,
})
return err
}

0 comments on commit 9427f41

Please sign in to comment.