Skip to content

Commit

Permalink
Use search attribute type map in visibility archival (#4304)
Browse files Browse the repository at this point in the history
* Use search attribute type map in visibility archival

* pr comments
  • Loading branch information
MichaelSnowden committed May 9, 2023
1 parent d0007b0 commit 9f66510
Show file tree
Hide file tree
Showing 2 changed files with 112 additions and 175 deletions.
53 changes: 43 additions & 10 deletions service/history/archival/archiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
commonpb "go.temporal.io/api/common/v1"
enumspb "go.temporal.io/api/enums/v1"
"go.temporal.io/api/serviceerror"
"go.temporal.io/server/common/persistence/visibility/manager"
"go.uber.org/multierr"

archiverspb "go.temporal.io/server/api/archiver/v1"
Expand Down Expand Up @@ -92,10 +93,12 @@ type (
}

archiver struct {
archiverProvider provider.ArchiverProvider
metricsHandler metrics.Handler
logger log.Logger
rateLimiter quotas.RateLimiter
archiverProvider provider.ArchiverProvider
metricsHandler metrics.Handler
logger log.Logger
rateLimiter quotas.RateLimiter
searchAttributeProvider searchattribute.Provider
visibilityManager manager.VisibilityManager
}
)

Expand All @@ -110,12 +113,16 @@ func NewArchiver(
logger log.Logger,
metricsHandler metrics.Handler,
rateLimiter quotas.RateLimiter,
searchAttributeProvider searchattribute.Provider,
visibilityManger manager.VisibilityManager,
) Archiver {
return &archiver{
archiverProvider: archiverProvider,
metricsHandler: metricsHandler.WithTags(metrics.OperationTag(metrics.ArchiverClientScope)),
logger: logger,
rateLimiter: rateLimiter,
archiverProvider: archiverProvider,
metricsHandler: metricsHandler.WithTags(metrics.OperationTag(metrics.ArchiverClientScope)),
logger: logger,
rateLimiter: rateLimiter,
searchAttributeProvider: searchAttributeProvider,
visibilityManager: visibilityManger,
}
}

Expand All @@ -129,48 +136,64 @@ func (a *archiver) Archive(ctx context.Context, request *Request) (res *Response
tag.ArchivalRequestWorkflowID(request.WorkflowID),
tag.ArchivalRequestRunID(request.RunID),
)

defer func(start time.Time) {
metricsScope := a.metricsHandler

status := "ok"
if err != nil {
status = "err"

var rateLimitExceededErr *serviceerror.ResourceExhausted

if errors.As(err, &rateLimitExceededErr) {
status = "rate_limit_exceeded"
}

logger.Warn("failed to archive workflow", tag.Error(err))
}

metricsScope.Timer(metrics.ArchiverArchiveLatency.GetMetricName()).
Record(time.Since(start), metrics.StringTag("status", status))
}(time.Now())

numTargets := len(request.Targets)
if err := a.rateLimiter.WaitN(ctx, numTargets); err != nil {
return nil, &serviceerror.ResourceExhausted{
Cause: enumspb.RESOURCE_EXHAUSTED_CAUSE_RPS_LIMIT,
Message: fmt.Sprintf("archival rate limited: %s", err.Error()),
}
}

var wg sync.WaitGroup

errs := make([]error, numTargets)

for i, target := range request.Targets {
wg.Add(1)

i := i

switch target {
case TargetHistory:
go func() {
defer wg.Done()

errs[i] = a.archiveHistory(ctx, request, logger)
}()
case TargetVisibility:
go func() {
defer wg.Done()

errs[i] = a.archiveVisibility(ctx, request, logger)
}()
default:
return nil, fmt.Errorf("unknown archival target: %s", target)
}
}

wg.Wait()

return &Response{}, multierr.Combine(errs...)
}

Expand Down Expand Up @@ -212,11 +235,18 @@ func (a *archiver) archiveVisibility(ctx context.Context, request *Request, logg
return err
}

// It is safe to pass nil to typeMap here because search attributes type must be embedded by caller.
searchAttributes, err := searchattribute.Stringify(request.SearchAttributes, nil)
// The types of the search attributes may not be embedded in the request,
// so we fetch them from the search attributes provider here.
saTypeMap, err := a.searchAttributeProvider.GetSearchAttributes(a.visibilityManager.GetIndexName(), false)
if err != nil {
return err
}

searchAttributes, err := searchattribute.Stringify(request.SearchAttributes, &saTypeMap)
if err != nil {
return err
}

return visibilityArchiver.Archive(ctx, request.VisibilityURI, &archiverspb.VisibilityRecord{
NamespaceId: request.NamespaceID,
Namespace: request.Namespace,
Expand All @@ -238,11 +268,14 @@ func (a *archiver) archiveVisibility(ctx context.Context, request *Request, logg
// statement (this would make the err always nil).
func (a *archiver) recordArchiveTargetResult(logger log.Logger, startTime time.Time, target Target, err *error) {
duration := time.Since(startTime)

status := "ok"
if *err != nil {
status = "err"

logger.Error("failed to archive target", tag.NewStringTag("target", string(target)), tag.Error(*err))
}

tags := []metrics.Tag{
metrics.StringTag("target", string(target)),
metrics.StringTag("status", status),
Expand Down
Loading

0 comments on commit 9f66510

Please sign in to comment.