Skip to content

Commit

Permalink
Add an archival queue processor
Browse files Browse the repository at this point in the history
  • Loading branch information
MichaelSnowden committed Dec 1, 2022
1 parent 79a572d commit 0457a6d
Show file tree
Hide file tree
Showing 8 changed files with 732 additions and 101 deletions.
2 changes: 0 additions & 2 deletions common/dynamicconfig/constants.go
Expand Up @@ -531,8 +531,6 @@ const (
ArchivalProcessorMaxPollRPS = "history.archivalProcessorMaxPollRPS"
// ArchivalProcessorMaxPollHostRPS is max poll rate per second for all archivalQueueProcessor on a host
ArchivalProcessorMaxPollHostRPS = "history.archivalProcessorMaxPollHostRPS"
// ArchivalTaskMaxRetryCount is max times of retry for archivalQueueProcessor
ArchivalTaskMaxRetryCount = "history.archivalTaskMaxRetryCount"
// ArchivalProcessorSchedulerWorkerCount is the number of workers in the host level task scheduler for
// archivalQueueProcessor
ArchivalProcessorSchedulerWorkerCount = "history.archivalProcessorSchedulerWorkerCount"
Expand Down
42 changes: 15 additions & 27 deletions service/history/archival/archiver.go
Expand Up @@ -44,7 +44,6 @@ import (
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/log/tag"
"go.temporal.io/server/common/metrics"
"go.temporal.io/server/common/primitives/timestamp"
"go.temporal.io/server/common/quotas"
"go.temporal.io/server/common/searchattribute"
)
Expand All @@ -61,18 +60,18 @@ type (
BranchToken []byte
NextEventID int64
CloseFailoverVersion int64
HistoryURI string
HistoryURI carchiver.URI

// visibility archival
WorkflowTypeName string
StartTime time.Time
ExecutionTime time.Time
CloseTime time.Time
StartTime *time.Time
ExecutionTime *time.Time
CloseTime *time.Time
Status enumspb.WorkflowExecutionStatus
HistoryLength int64
Memo *commonpb.Memo
SearchAttributes *commonpb.SearchAttributes
VisibilityURI string
VisibilityURI carchiver.URI

// archival targets: history and/or visibility
Targets []Target
Expand Down Expand Up @@ -148,7 +147,6 @@ func (a *archiver) Archive(ctx context.Context, request *Request) (res *Response
Message: fmt.Sprintf("archival rate limited: %s", err.Error()),
}
}

var wg sync.WaitGroup
errs := make([]error, len(request.Targets))
for i, target := range request.Targets {
Expand Down Expand Up @@ -178,21 +176,16 @@ func (a *archiver) archiveHistory(ctx context.Context, request *Request, logger
logger,
tag.ArchivalRequestBranchToken(request.BranchToken),
tag.ArchivalRequestCloseFailoverVersion(request.CloseFailoverVersion),
tag.ArchivalURI(request.HistoryURI),
tag.ArchivalURI(request.HistoryURI.String()),
)
defer a.recordArchiveTargetResult(logger, time.Now(), TargetHistory, &err)

URI, err := carchiver.NewURI(request.HistoryURI)
if err != nil {
return err
}

historyArchiver, err := a.archiverProvider.GetHistoryArchiver(URI.Scheme(), request.CallerService)
historyArchiver, err := a.archiverProvider.GetHistoryArchiver(request.HistoryURI.Scheme(), request.CallerService)
if err != nil {
return err
}

return historyArchiver.Archive(ctx, URI, &carchiver.ArchiveHistoryRequest{
return historyArchiver.Archive(ctx, request.HistoryURI, &carchiver.ArchiveHistoryRequest{
ShardID: request.ShardID,
NamespaceID: request.NamespaceID,
Namespace: request.Namespace,
Expand All @@ -207,16 +200,11 @@ func (a *archiver) archiveHistory(ctx context.Context, request *Request, logger
func (a *archiver) archiveVisibility(ctx context.Context, request *Request, logger log.Logger) (err error) {
logger = log.With(
logger,
tag.ArchivalURI(request.VisibilityURI),
tag.ArchivalURI(request.VisibilityURI.String()),
)
defer a.recordArchiveTargetResult(logger, time.Now(), TargetVisibility, &err)

uri, err := carchiver.NewURI(request.VisibilityURI)
if err != nil {
return err
}

visibilityArchiver, err := a.archiverProvider.GetVisibilityArchiver(uri.Scheme(), request.CallerService)
visibilityArchiver, err := a.archiverProvider.GetVisibilityArchiver(request.VisibilityURI.Scheme(), request.CallerService)
if err != nil {
return err
}
Expand All @@ -226,20 +214,20 @@ func (a *archiver) archiveVisibility(ctx context.Context, request *Request, logg
if err != nil {
return err
}
return visibilityArchiver.Archive(ctx, uri, &archiverspb.VisibilityRecord{
return visibilityArchiver.Archive(ctx, request.VisibilityURI, &archiverspb.VisibilityRecord{
NamespaceId: request.NamespaceID,
Namespace: request.Namespace,
WorkflowId: request.WorkflowID,
RunId: request.RunID,
WorkflowTypeName: request.WorkflowTypeName,
StartTime: timestamp.TimePtr(request.StartTime),
ExecutionTime: timestamp.TimePtr(request.ExecutionTime),
CloseTime: timestamp.TimePtr(request.CloseTime),
StartTime: request.StartTime,
ExecutionTime: request.ExecutionTime,
CloseTime: request.CloseTime,
Status: request.Status,
HistoryLength: request.HistoryLength,
Memo: request.Memo,
SearchAttributes: searchAttributes,
HistoryArchivalUri: request.HistoryURI,
HistoryArchivalUri: request.HistoryURI.String(),
})
}

Expand Down
4 changes: 2 additions & 2 deletions service/history/archival/archiver_test.go
Expand Up @@ -282,8 +282,8 @@ func TestArchiver(t *testing.T) {

archiver := NewArchiver(archiverProvider, logRecorder, metricsHandler, rateLimiter)
_, err = archiver.Archive(ctx, &Request{
HistoryURI: historyURI.String(),
VisibilityURI: visibilityURI.String(),
HistoryURI: historyURI,
VisibilityURI: visibilityURI,
Targets: c.Targets,
})

Expand Down
208 changes: 208 additions & 0 deletions service/history/archivalQueueTaskExecutor.go
@@ -0,0 +1,208 @@
// The MIT License
//
// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved.
//
// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package history

import (
"context"
"errors"
"fmt"

enumspb "go.temporal.io/api/enums/v1"

carchiver "go.temporal.io/server/common/archiver"
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/log/tag"
"go.temporal.io/server/common/metrics"
"go.temporal.io/server/common/persistence"
"go.temporal.io/server/common/primitives"
"go.temporal.io/server/service/history/archival"
"go.temporal.io/server/service/history/queues"
"go.temporal.io/server/service/history/shard"
"go.temporal.io/server/service/history/tasks"
"go.temporal.io/server/service/history/workflow"
)

type archivalQueueTaskExecutor struct {
archiver archival.Archiver
shardContext shard.Context
workflowCache workflow.Cache
logger log.Logger
metricsHandler metrics.MetricsHandler
}

func newArchivalQueueTaskExecutor(
archiver archival.Archiver,
shardContext shard.Context,
workflowCache workflow.Cache,
metricsHandler metrics.MetricsHandler,
logger log.Logger,
) *archivalQueueTaskExecutor {
return &archivalQueueTaskExecutor{
archiver: archiver,
shardContext: shardContext,
workflowCache: workflowCache,
logger: logger,
metricsHandler: metricsHandler,
}
}

func (e *archivalQueueTaskExecutor) Execute(
ctx context.Context,
executable queues.Executable,
) (tags []metrics.Tag,
isActive bool, err error) {
task := executable.GetTask()
taskType := queues.GetArchivalTaskTypeTagValue(task)
tags = []metrics.Tag{
getNamespaceTagByID(e.shardContext.GetNamespaceRegistry(), task.GetNamespaceID()),
metrics.TaskTypeTag(taskType),
}
switch task := task.(type) {
case *tasks.ArchiveExecutionTask:
err = e.processArchiveExecutionTask(ctx, task)
default:
err = fmt.Errorf("task with invalid type sent to archival queue: %+v", task)
}
return tags, true, err
}

func (e *archivalQueueTaskExecutor) processArchiveExecutionTask(ctx context.Context, task *tasks.ArchiveExecutionTask) (err error) {
weContext, release, err := getWorkflowExecutionContextForTask(ctx, e.workflowCache, task)
if err != nil {
return err
}
defer func() { release(err) }()

mutableState, err := weContext.LoadMutableState(ctx)
if err != nil {
return err
}
if mutableState == nil || mutableState.IsWorkflowExecutionRunning() {
return nil
}
lastWriteVersion, err := mutableState.GetLastWriteVersion()
if err != nil {
return err
}
namespaceEntry := mutableState.GetNamespaceEntry()
err = CheckTaskVersion(
e.shardContext,
e.logger,
namespaceEntry,
lastWriteVersion,
task.Version,
task,
)
if err != nil {
return err
}

namespaceName := namespaceEntry.Name()
nextEventID := mutableState.GetNextEventID()
executionInfo := mutableState.GetExecutionInfo()
workflowTypeName := executionInfo.GetWorkflowTypeName()

executionState := mutableState.GetExecutionState()
memo := getWorkflowMemo(copyMemo(executionInfo.Memo))
closeTime, err := mutableState.GetWorkflowCloseTime(ctx)
if err != nil {
return err
}
if closeTime == nil {
return errors.New("can't archive workflow with nil close time")
}
branchToken, err := mutableState.GetCurrentBranchToken()
if err != nil {
return err
}

var targets []archival.Target
if e.shardContext.GetArchivalMetadata().GetVisibilityConfig().ClusterConfiguredForArchival() &&
namespaceEntry.VisibilityArchivalState().State == enumspb.ARCHIVAL_STATE_ENABLED {
targets = append(targets, archival.TargetVisibility)
}
if e.shardContext.GetArchivalMetadata().GetHistoryConfig().ClusterConfiguredForArchival() &&
namespaceEntry.HistoryArchivalState().State == enumspb.ARCHIVAL_STATE_ENABLED {
targets = append(targets, archival.TargetHistory)
}

historyURI, err := carchiver.NewURI(namespaceEntry.HistoryArchivalState().URI)
if err != nil {
e.logger.Error(
"Failed to parse history URI.",
tag.ArchivalURI(namespaceEntry.HistoryArchivalState().URI),
tag.Error(err),
)
return nil
}
visibilityURI, err := carchiver.NewURI(namespaceEntry.VisibilityArchivalState().URI)
if err != nil {
e.logger.Error(
"Failed to parse visibility URI.",
tag.ArchivalURI(namespaceEntry.VisibilityArchivalState().URI),
tag.Error(err),
)
return nil
}
_, err = e.archiver.Archive(ctx, &archival.Request{
ShardID: e.shardContext.GetShardID(),
NamespaceID: task.NamespaceID,
Namespace: namespaceName.String(),
WorkflowID: task.WorkflowID,
RunID: task.RunID,
BranchToken: branchToken,
NextEventID: nextEventID,
CloseFailoverVersion: lastWriteVersion,
HistoryURI: historyURI,
VisibilityURI: visibilityURI,
WorkflowTypeName: workflowTypeName,
StartTime: executionInfo.GetStartTime(),
ExecutionTime: executionInfo.GetExecutionTime(),
CloseTime: closeTime,
Status: executionState.Status,
HistoryLength: nextEventID - 1,
Memo: memo,
SearchAttributes: getSearchAttributes(copySearchAttributes(executionInfo.SearchAttributes)),
Targets: targets,
CallerService: primitives.HistoryService,
})
if err != nil {
return err
}
taskGenerator := workflow.NewTaskGenerator(e.shardContext.GetNamespaceRegistry(), mutableState,
e.shardContext.GetConfig())
err = taskGenerator.GenerateDeleteHistoryEventTask(*closeTime, true)
if err != nil {
return err
}
err = e.shardContext.AddTasks(ctx, &persistence.AddHistoryTasksRequest{
ShardID: e.shardContext.GetShardID(),
NamespaceID: namespaceEntry.ID().String(),
WorkflowID: task.WorkflowID,
RunID: task.RunID,
Tasks: mutableState.PopTasks(),
})
return err
}

0 comments on commit 0457a6d

Please sign in to comment.