Skip to content

Commit

Permalink
Add an archival queue processor
Browse files Browse the repository at this point in the history
  • Loading branch information
MichaelSnowden committed Dec 6, 2022
1 parent 34e256a commit 3ecd20b
Show file tree
Hide file tree
Showing 9 changed files with 823 additions and 109 deletions.
2 changes: 0 additions & 2 deletions common/dynamicconfig/constants.go
Expand Up @@ -531,8 +531,6 @@ const (
ArchivalProcessorMaxPollRPS = "history.archivalProcessorMaxPollRPS"
// ArchivalProcessorMaxPollHostRPS is max poll rate per second for all archivalQueueProcessor on a host
ArchivalProcessorMaxPollHostRPS = "history.archivalProcessorMaxPollHostRPS"
// ArchivalTaskMaxRetryCount is max times of retry for archivalQueueProcessor
ArchivalTaskMaxRetryCount = "history.archivalTaskMaxRetryCount"
// ArchivalProcessorSchedulerWorkerCount is the number of workers in the host level task scheduler for
// archivalQueueProcessor
ArchivalProcessorSchedulerWorkerCount = "history.archivalProcessorSchedulerWorkerCount"
Expand Down
20 changes: 12 additions & 8 deletions common/metrics/metric_defs.go
Expand Up @@ -1361,14 +1361,18 @@ var (
VersionCheckLatency = NewTimerDef("version_check_latency")

// History
CacheRequests = NewCounterDef("cache_requests")
CacheFailures = NewCounterDef("cache_errors")
CacheLatency = NewTimerDef("cache_latency")
CacheMissCounter = NewCounterDef("cache_miss")
HistoryEventNotificationQueueingLatency = NewTimerDef("history_event_notification_queueing_latency")
HistoryEventNotificationFanoutLatency = NewTimerDef("history_event_notification_fanout_latency")
HistoryEventNotificationInFlightMessageGauge = NewGaugeDef("history_event_notification_inflight_message_gauge")
HistoryEventNotificationFailDeliveryCount = NewCounterDef("history_event_notification_fail_delivery_count")
CacheRequests = NewCounterDef("cache_requests")
CacheFailures = NewCounterDef("cache_errors")
CacheLatency = NewTimerDef("cache_latency")
CacheMissCounter = NewCounterDef("cache_miss")
HistoryEventNotificationQueueingLatency = NewTimerDef("history_event_notification_queueing_latency")
HistoryEventNotificationFanoutLatency = NewTimerDef("history_event_notification_fanout_latency")
HistoryEventNotificationInFlightMessageGauge = NewGaugeDef("history_event_notification_inflight_message_gauge")
HistoryEventNotificationFailDeliveryCount = NewCounterDef("history_event_notification_fail_delivery_count")
// ArchivalTaskInvalidURI is emitted by the archival queue task executor when the history or visibility URI for an
// archival task is not a valid URI.
// We may emit this metric several times for a single task if the task is retried.
ArchivalTaskInvalidURI = NewCounterDef("archival_task_invalid_uri")
ArchiverClientSendSignalCount = NewCounterDef("archiver_client_sent_signal")
ArchiverClientSendSignalFailureCount = NewCounterDef("archiver_client_send_signal_error")
ArchiverClientHistoryRequestCount = NewCounterDef("archiver_client_history_request")
Expand Down
42 changes: 15 additions & 27 deletions service/history/archival/archiver.go
Expand Up @@ -44,7 +44,6 @@ import (
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/log/tag"
"go.temporal.io/server/common/metrics"
"go.temporal.io/server/common/primitives/timestamp"
"go.temporal.io/server/common/quotas"
"go.temporal.io/server/common/searchattribute"
)
Expand All @@ -61,18 +60,18 @@ type (
BranchToken []byte
NextEventID int64
CloseFailoverVersion int64
HistoryURI string
HistoryURI carchiver.URI

// visibility archival
WorkflowTypeName string
StartTime time.Time
ExecutionTime time.Time
CloseTime time.Time
StartTime *time.Time
ExecutionTime *time.Time
CloseTime *time.Time
Status enumspb.WorkflowExecutionStatus
HistoryLength int64
Memo *commonpb.Memo
SearchAttributes *commonpb.SearchAttributes
VisibilityURI string
VisibilityURI carchiver.URI

// archival targets: history and/or visibility
Targets []Target
Expand Down Expand Up @@ -148,7 +147,6 @@ func (a *archiver) Archive(ctx context.Context, request *Request) (res *Response
Message: fmt.Sprintf("archival rate limited: %s", err.Error()),
}
}

var wg sync.WaitGroup
errs := make([]error, len(request.Targets))
for i, target := range request.Targets {
Expand Down Expand Up @@ -178,21 +176,16 @@ func (a *archiver) archiveHistory(ctx context.Context, request *Request, logger
logger,
tag.ArchivalRequestBranchToken(request.BranchToken),
tag.ArchivalRequestCloseFailoverVersion(request.CloseFailoverVersion),
tag.ArchivalURI(request.HistoryURI),
tag.ArchivalURI(request.HistoryURI.String()),
)
defer a.recordArchiveTargetResult(logger, time.Now(), TargetHistory, &err)

URI, err := carchiver.NewURI(request.HistoryURI)
if err != nil {
return err
}

historyArchiver, err := a.archiverProvider.GetHistoryArchiver(URI.Scheme(), request.CallerService)
historyArchiver, err := a.archiverProvider.GetHistoryArchiver(request.HistoryURI.Scheme(), request.CallerService)
if err != nil {
return err
}

return historyArchiver.Archive(ctx, URI, &carchiver.ArchiveHistoryRequest{
return historyArchiver.Archive(ctx, request.HistoryURI, &carchiver.ArchiveHistoryRequest{
ShardID: request.ShardID,
NamespaceID: request.NamespaceID,
Namespace: request.Namespace,
Expand All @@ -207,16 +200,11 @@ func (a *archiver) archiveHistory(ctx context.Context, request *Request, logger
func (a *archiver) archiveVisibility(ctx context.Context, request *Request, logger log.Logger) (err error) {
logger = log.With(
logger,
tag.ArchivalURI(request.VisibilityURI),
tag.ArchivalURI(request.VisibilityURI.String()),
)
defer a.recordArchiveTargetResult(logger, time.Now(), TargetVisibility, &err)

uri, err := carchiver.NewURI(request.VisibilityURI)
if err != nil {
return err
}

visibilityArchiver, err := a.archiverProvider.GetVisibilityArchiver(uri.Scheme(), request.CallerService)
visibilityArchiver, err := a.archiverProvider.GetVisibilityArchiver(request.VisibilityURI.Scheme(), request.CallerService)
if err != nil {
return err
}
Expand All @@ -226,20 +214,20 @@ func (a *archiver) archiveVisibility(ctx context.Context, request *Request, logg
if err != nil {
return err
}
return visibilityArchiver.Archive(ctx, uri, &archiverspb.VisibilityRecord{
return visibilityArchiver.Archive(ctx, request.VisibilityURI, &archiverspb.VisibilityRecord{
NamespaceId: request.NamespaceID,
Namespace: request.Namespace,
WorkflowId: request.WorkflowID,
RunId: request.RunID,
WorkflowTypeName: request.WorkflowTypeName,
StartTime: timestamp.TimePtr(request.StartTime),
ExecutionTime: timestamp.TimePtr(request.ExecutionTime),
CloseTime: timestamp.TimePtr(request.CloseTime),
StartTime: request.StartTime,
ExecutionTime: request.ExecutionTime,
CloseTime: request.CloseTime,
Status: request.Status,
HistoryLength: request.HistoryLength,
Memo: request.Memo,
SearchAttributes: searchAttributes,
HistoryArchivalUri: request.HistoryURI,
HistoryArchivalUri: request.HistoryURI.String(),
})
}

Expand Down
4 changes: 2 additions & 2 deletions service/history/archival/archiver_test.go
Expand Up @@ -282,8 +282,8 @@ func TestArchiver(t *testing.T) {

archiver := NewArchiver(archiverProvider, logRecorder, metricsHandler, rateLimiter)
_, err = archiver.Archive(ctx, &Request{
HistoryURI: historyURI.String(),
VisibilityURI: visibilityURI.String(),
HistoryURI: historyURI,
VisibilityURI: visibilityURI,
Targets: c.Targets,
})

Expand Down
235 changes: 235 additions & 0 deletions service/history/archivalQueueTaskExecutor.go
@@ -0,0 +1,235 @@
// The MIT License
//
// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved.
//
// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package history

import (
"context"
"fmt"

enumspb "go.temporal.io/api/enums/v1"

carchiver "go.temporal.io/server/common/archiver"
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/log/tag"
"go.temporal.io/server/common/metrics"
"go.temporal.io/server/common/persistence"
"go.temporal.io/server/common/primitives"
"go.temporal.io/server/service/history/archival"
"go.temporal.io/server/service/history/queues"
"go.temporal.io/server/service/history/shard"
"go.temporal.io/server/service/history/tasks"
"go.temporal.io/server/service/history/workflow"
"go.temporal.io/server/service/history/workflow/cache"
)

// NewArchivalQueueTaskExecutor creates a new queue task executor for the archival queue.
// If you use this executor, you must monitor for the metrics.ArchivalTaskInvalidURI.
// If this metric is emitted, it means that an archival URI is invalid and the task will never succeed.
func NewArchivalQueueTaskExecutor(
archiver archival.Archiver,
shardContext shard.Context,
workflowCache cache.Cache,
metricsHandler metrics.MetricsHandler,
logger log.Logger,
) *archivalQueueTaskExecutor {
return &archivalQueueTaskExecutor{
archiver: archiver,
shardContext: shardContext,
workflowCache: workflowCache,
metricsHandler: metricsHandler,
logger: logger,
}
}

// Execute executes a task from the archival queue.
func (e *archivalQueueTaskExecutor) Execute(
ctx context.Context,
executable queues.Executable,
) (tags []metrics.Tag, isActive bool, err error) {
task := executable.GetTask()
taskType := queues.GetArchivalTaskTypeTagValue(task)
tags = []metrics.Tag{
getNamespaceTagByID(e.shardContext.GetNamespaceRegistry(), task.GetNamespaceID()),
metrics.TaskTypeTag(taskType),
}
switch task := task.(type) {
case *tasks.ArchiveExecutionTask:
err = e.processArchiveExecutionTask(ctx, task)
default:
err = fmt.Errorf("task with invalid type sent to archival queue: %+v", task)
}
return tags, true, err
}

// archivalQueueTaskExecutor is an implementation of queues.Executor for archival queue.
type archivalQueueTaskExecutor struct {
archiver archival.Archiver
shardContext shard.Context
workflowCache cache.Cache
metricsHandler metrics.MetricsHandler
logger log.Logger
}

// processArchiveExecutionTask processes a task to archive a workflow execution.
func (e *archivalQueueTaskExecutor) processArchiveExecutionTask(ctx context.Context, task *tasks.ArchiveExecutionTask) (err error) {
weContext, release, err := getWorkflowExecutionContextForTask(ctx, e.workflowCache, task)
if err != nil {
return err
}
defer func() { release(err) }()

mutableState, err := weContext.LoadMutableState(ctx)
if err != nil {
return err
}
if mutableState == nil || mutableState.IsWorkflowExecutionRunning() {
return nil
}
lastWriteVersion, err := mutableState.GetLastWriteVersion()
if err != nil {
return err
}
namespaceEntry := mutableState.GetNamespaceEntry()
err = CheckTaskVersion(
e.shardContext,
e.logger,
namespaceEntry,
lastWriteVersion,
task.Version,
task,
)
if err != nil {
return err
}

namespaceName := namespaceEntry.Name()
nextEventID := mutableState.GetNextEventID()
executionInfo := mutableState.GetExecutionInfo()
workflowTypeName := executionInfo.GetWorkflowTypeName()

executionState := mutableState.GetExecutionState()
memo := getWorkflowMemo(copyMemo(executionInfo.Memo))
closeTime, err := mutableState.GetWorkflowCloseTime(ctx)
if err != nil {
return err
}
branchToken, err := mutableState.GetCurrentBranchToken()
if err != nil {
return err
}

var targets []archival.Target
if e.shardContext.GetArchivalMetadata().GetVisibilityConfig().ClusterConfiguredForArchival() &&
namespaceEntry.VisibilityArchivalState().State == enumspb.ARCHIVAL_STATE_ENABLED {
targets = append(targets, archival.TargetVisibility)
}
if e.shardContext.GetArchivalMetadata().GetHistoryConfig().ClusterConfiguredForArchival() &&
namespaceEntry.HistoryArchivalState().State == enumspb.ARCHIVAL_STATE_ENABLED {
targets = append(targets, archival.TargetHistory)
}
if len(targets) == 0 {
return nil
}

logger := log.With(e.logger,
tag.WorkflowNamespace(namespaceName.String()),
tag.WorkflowID(executionInfo.WorkflowId),
tag.WorkflowRunID(executionState.RunId),
tag.Task(task),
)

historyURIString := namespaceEntry.HistoryArchivalState().URI
historyURI, err := carchiver.NewURI(historyURIString)
if err != nil {
e.metricsHandler.Counter(metrics.ArchivalTaskInvalidURI.GetMetricName()).Record(
1,
metrics.NamespaceTag(namespaceName.String()),
metrics.FailureTag("invalid_history_uri"),
)
logger.Error(
"Failed to parse history URI.",
tag.ArchivalURI(historyURIString),
tag.Error(err),
)
return nil
}
visibilityURIString := namespaceEntry.VisibilityArchivalState().URI
visibilityURI, err := carchiver.NewURI(visibilityURIString)
if err != nil {
e.metricsHandler.Counter(metrics.ArchivalTaskInvalidURI.GetMetricName()).Record(
1,
metrics.NamespaceTag(namespaceName.String()),
metrics.FailureTag("invalid_visibility_uri"),
)
logger.Error(
"Failed to parse visibility URI.",
tag.ArchivalURI(visibilityURIString),
tag.Error(err),
)
return nil
}
_, err = e.archiver.Archive(ctx, &archival.Request{
ShardID: e.shardContext.GetShardID(),
NamespaceID: task.NamespaceID,
Namespace: namespaceName.String(),
WorkflowID: task.WorkflowID,
RunID: task.RunID,
BranchToken: branchToken,
NextEventID: nextEventID,
CloseFailoverVersion: lastWriteVersion,
HistoryURI: historyURI,
VisibilityURI: visibilityURI,
WorkflowTypeName: workflowTypeName,
StartTime: executionInfo.GetStartTime(),
ExecutionTime: executionInfo.GetExecutionTime(),
CloseTime: closeTime,
Status: executionState.Status,
HistoryLength: nextEventID - 1,
Memo: memo,
SearchAttributes: getSearchAttributes(copySearchAttributes(executionInfo.SearchAttributes)),
Targets: targets,
CallerService: primitives.HistoryService,
})
if err != nil {
return err
}
taskGenerator := workflow.NewTaskGenerator(
e.shardContext.GetNamespaceRegistry(),
mutableState,
e.shardContext.GetConfig(),
)
err = taskGenerator.GenerateDeleteHistoryEventTask(*closeTime, true)
if err != nil {
return err
}
err = e.shardContext.AddTasks(ctx, &persistence.AddHistoryTasksRequest{
ShardID: e.shardContext.GetShardID(),
NamespaceID: namespaceEntry.ID().String(),
WorkflowID: task.WorkflowID,
RunID: task.RunID,
Tasks: mutableState.PopTasks(),
})
return err
}

0 comments on commit 3ecd20b

Please sign in to comment.