Skip to content

Commit

Permalink
Add an archival queue processor
Browse files Browse the repository at this point in the history
  • Loading branch information
MichaelSnowden committed Dec 9, 2022
1 parent b1943f7 commit 9ee1de8
Show file tree
Hide file tree
Showing 8 changed files with 863 additions and 66 deletions.
2 changes: 0 additions & 2 deletions common/dynamicconfig/constants.go
Expand Up @@ -531,8 +531,6 @@ const (
ArchivalProcessorMaxPollRPS = "history.archivalProcessorMaxPollRPS"
// ArchivalProcessorMaxPollHostRPS is max poll rate per second for all archivalQueueProcessor on a host
ArchivalProcessorMaxPollHostRPS = "history.archivalProcessorMaxPollHostRPS"
// ArchivalTaskMaxRetryCount is max times of retry for archivalQueueProcessor
ArchivalTaskMaxRetryCount = "history.archivalTaskMaxRetryCount"
// ArchivalProcessorSchedulerWorkerCount is the number of workers in the host level task scheduler for
// archivalQueueProcessor
ArchivalProcessorSchedulerWorkerCount = "history.archivalProcessorSchedulerWorkerCount"
Expand Down
20 changes: 12 additions & 8 deletions common/metrics/metric_defs.go
Expand Up @@ -1369,14 +1369,18 @@ var (
VersionCheckLatency = NewTimerDef("version_check_latency")

// History
CacheRequests = NewCounterDef("cache_requests")
CacheFailures = NewCounterDef("cache_errors")
CacheLatency = NewTimerDef("cache_latency")
CacheMissCounter = NewCounterDef("cache_miss")
HistoryEventNotificationQueueingLatency = NewTimerDef("history_event_notification_queueing_latency")
HistoryEventNotificationFanoutLatency = NewTimerDef("history_event_notification_fanout_latency")
HistoryEventNotificationInFlightMessageGauge = NewGaugeDef("history_event_notification_inflight_message_gauge")
HistoryEventNotificationFailDeliveryCount = NewCounterDef("history_event_notification_fail_delivery_count")
CacheRequests = NewCounterDef("cache_requests")
CacheFailures = NewCounterDef("cache_errors")
CacheLatency = NewTimerDef("cache_latency")
CacheMissCounter = NewCounterDef("cache_miss")
HistoryEventNotificationQueueingLatency = NewTimerDef("history_event_notification_queueing_latency")
HistoryEventNotificationFanoutLatency = NewTimerDef("history_event_notification_fanout_latency")
HistoryEventNotificationInFlightMessageGauge = NewGaugeDef("history_event_notification_inflight_message_gauge")
HistoryEventNotificationFailDeliveryCount = NewCounterDef("history_event_notification_fail_delivery_count")
// ArchivalTaskInvalidURI is emitted by the archival queue task executor when the history or visibility URI for an
// archival task is not a valid URI.
// We may emit this metric several times for a single task if the task is retried.
ArchivalTaskInvalidURI = NewCounterDef("archival_task_invalid_uri")
ArchiverClientSendSignalCount = NewCounterDef("archiver_client_sent_signal")
ArchiverClientSendSignalFailureCount = NewCounterDef("archiver_client_send_signal_error")
ArchiverClientHistoryRequestCount = NewCounterDef("archiver_client_history_request")
Expand Down
44 changes: 17 additions & 27 deletions service/history/archival/archiver.go
Expand Up @@ -44,7 +44,6 @@ import (
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/log/tag"
"go.temporal.io/server/common/metrics"
"go.temporal.io/server/common/primitives/timestamp"
"go.temporal.io/server/common/quotas"
"go.temporal.io/server/common/searchattribute"
)
Expand All @@ -61,18 +60,20 @@ type (
BranchToken []byte
NextEventID int64
CloseFailoverVersion int64
HistoryURI string
// HistoryURI is the URI of the history archival backend.
HistoryURI carchiver.URI

// visibility archival
WorkflowTypeName string
StartTime time.Time
ExecutionTime time.Time
CloseTime time.Time
StartTime *time.Time
ExecutionTime *time.Time
CloseTime *time.Time
Status enumspb.WorkflowExecutionStatus
HistoryLength int64
Memo *commonpb.Memo
SearchAttributes *commonpb.SearchAttributes
VisibilityURI string
// VisibilityURI is the URI of the visibility archival backend.
VisibilityURI carchiver.URI

// archival targets: history and/or visibility
Targets []Target
Expand Down Expand Up @@ -148,7 +149,6 @@ func (a *archiver) Archive(ctx context.Context, request *Request) (res *Response
Message: fmt.Sprintf("archival rate limited: %s", err.Error()),
}
}

var wg sync.WaitGroup
errs := make([]error, len(request.Targets))
for i, target := range request.Targets {
Expand Down Expand Up @@ -178,21 +178,16 @@ func (a *archiver) archiveHistory(ctx context.Context, request *Request, logger
logger,
tag.ArchivalRequestBranchToken(request.BranchToken),
tag.ArchivalRequestCloseFailoverVersion(request.CloseFailoverVersion),
tag.ArchivalURI(request.HistoryURI),
tag.ArchivalURI(request.HistoryURI.String()),
)
defer a.recordArchiveTargetResult(logger, time.Now(), TargetHistory, &err)

URI, err := carchiver.NewURI(request.HistoryURI)
if err != nil {
return err
}

historyArchiver, err := a.archiverProvider.GetHistoryArchiver(URI.Scheme(), request.CallerService)
historyArchiver, err := a.archiverProvider.GetHistoryArchiver(request.HistoryURI.Scheme(), request.CallerService)
if err != nil {
return err
}

return historyArchiver.Archive(ctx, URI, &carchiver.ArchiveHistoryRequest{
return historyArchiver.Archive(ctx, request.HistoryURI, &carchiver.ArchiveHistoryRequest{
ShardID: request.ShardID,
NamespaceID: request.NamespaceID,
Namespace: request.Namespace,
Expand All @@ -207,16 +202,11 @@ func (a *archiver) archiveHistory(ctx context.Context, request *Request, logger
func (a *archiver) archiveVisibility(ctx context.Context, request *Request, logger log.Logger) (err error) {
logger = log.With(
logger,
tag.ArchivalURI(request.VisibilityURI),
tag.ArchivalURI(request.VisibilityURI.String()),
)
defer a.recordArchiveTargetResult(logger, time.Now(), TargetVisibility, &err)

uri, err := carchiver.NewURI(request.VisibilityURI)
if err != nil {
return err
}

visibilityArchiver, err := a.archiverProvider.GetVisibilityArchiver(uri.Scheme(), request.CallerService)
visibilityArchiver, err := a.archiverProvider.GetVisibilityArchiver(request.VisibilityURI.Scheme(), request.CallerService)
if err != nil {
return err
}
Expand All @@ -226,20 +216,20 @@ func (a *archiver) archiveVisibility(ctx context.Context, request *Request, logg
if err != nil {
return err
}
return visibilityArchiver.Archive(ctx, uri, &archiverspb.VisibilityRecord{
return visibilityArchiver.Archive(ctx, request.VisibilityURI, &archiverspb.VisibilityRecord{
NamespaceId: request.NamespaceID,
Namespace: request.Namespace,
WorkflowId: request.WorkflowID,
RunId: request.RunID,
WorkflowTypeName: request.WorkflowTypeName,
StartTime: timestamp.TimePtr(request.StartTime),
ExecutionTime: timestamp.TimePtr(request.ExecutionTime),
CloseTime: timestamp.TimePtr(request.CloseTime),
StartTime: request.StartTime,
ExecutionTime: request.ExecutionTime,
CloseTime: request.CloseTime,
Status: request.Status,
HistoryLength: request.HistoryLength,
Memo: request.Memo,
SearchAttributes: searchAttributes,
HistoryArchivalUri: request.HistoryURI,
HistoryArchivalUri: request.HistoryURI.String(),
})
}

Expand Down
4 changes: 2 additions & 2 deletions service/history/archival/archiver_test.go
Expand Up @@ -282,8 +282,8 @@ func TestArchiver(t *testing.T) {

archiver := NewArchiver(archiverProvider, logRecorder, metricsHandler, rateLimiter)
_, err = archiver.Archive(ctx, &Request{
HistoryURI: historyURI.String(),
VisibilityURI: visibilityURI.String(),
HistoryURI: historyURI,
VisibilityURI: visibilityURI,
Targets: c.Targets,
})

Expand Down

0 comments on commit 9ee1de8

Please sign in to comment.