From 153d613153e1eb3ee2cda7d57e174136b3d0c215 Mon Sep 17 00:00:00 2001 From: Michael Snowden Date: Tue, 9 May 2023 16:04:49 -0700 Subject: [PATCH] pr comments --- service/history/archival/archiver.go | 23 ++++++++++++++++++++++- service/history/archival/archiver_test.go | 14 ++++++++++++-- 2 files changed, 34 insertions(+), 3 deletions(-) diff --git a/service/history/archival/archiver.go b/service/history/archival/archiver.go index 7026fe8e5b2..06ae7129fd9 100644 --- a/service/history/archival/archiver.go +++ b/service/history/archival/archiver.go @@ -136,20 +136,27 @@ func (a *archiver) Archive(ctx context.Context, request *Request) (res *Response tag.ArchivalRequestWorkflowID(request.WorkflowID), tag.ArchivalRequestRunID(request.RunID), ) + defer func(start time.Time) { metricsScope := a.metricsHandler + status := "ok" if err != nil { status = "err" + var rateLimitExceededErr *serviceerror.ResourceExhausted + if errors.As(err, &rateLimitExceededErr) { status = "rate_limit_exceeded" } + logger.Warn("failed to archive workflow", tag.Error(err)) } + metricsScope.Timer(metrics.ArchiverArchiveLatency.GetMetricName()). Record(time.Since(start), metrics.StringTag("status", status)) }(time.Now()) + numTargets := len(request.Targets) if err := a.rateLimiter.WaitN(ctx, numTargets); err != nil { return nil, &serviceerror.ResourceExhausted{ @@ -157,27 +164,36 @@ func (a *archiver) Archive(ctx context.Context, request *Request) (res *Response Message: fmt.Sprintf("archival rate limited: %s", err.Error()), } } + var wg sync.WaitGroup + errs := make([]error, numTargets) + for i, target := range request.Targets { wg.Add(1) + i := i + switch target { case TargetHistory: go func() { defer wg.Done() + errs[i] = a.archiveHistory(ctx, request, logger) }() case TargetVisibility: go func() { defer wg.Done() + errs[i] = a.archiveVisibility(ctx, request, logger) }() default: return nil, fmt.Errorf("unknown archival target: %s", target) } } + wg.Wait() + return &Response{}, multierr.Combine(errs...) } @@ -219,16 +235,18 @@ func (a *archiver) archiveVisibility(ctx context.Context, request *Request, logg return err } + // The types of the search attributes may not be embedded in the request, + // so we fetch them from the search attributes provider here. saTypeMap, err := a.searchAttributeProvider.GetSearchAttributes(a.visibilityManager.GetIndexName(), false) if err != nil { return err } - // It is safe to pass nil to typeMap here because search attributes type must be embedded by caller. searchAttributes, err := searchattribute.Stringify(request.SearchAttributes, &saTypeMap) if err != nil { return err } + return visibilityArchiver.Archive(ctx, request.VisibilityURI, &archiverspb.VisibilityRecord{ NamespaceId: request.NamespaceID, Namespace: request.Namespace, @@ -250,11 +268,14 @@ func (a *archiver) archiveVisibility(ctx context.Context, request *Request, logg // statement (this would make the err always nil). func (a *archiver) recordArchiveTargetResult(logger log.Logger, startTime time.Time, target Target, err *error) { duration := time.Since(startTime) + status := "ok" if *err != nil { status = "err" + logger.Error("failed to archive target", tag.NewStringTag("target", string(target)), tag.Error(*err)) } + tags := []metrics.Tag{ metrics.StringTag("target", string(target)), metrics.StringTag("status", status), diff --git a/service/history/archival/archiver_test.go b/service/history/archival/archiver_test.go index ea19984d0fb..ecee80abd81 100644 --- a/service/history/archival/archiver_test.go +++ b/service/history/archival/archiver_test.go @@ -32,6 +32,9 @@ import ( "github.com/golang/mock/gomock" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.uber.org/fx" + "go.uber.org/multierr" + "go.temporal.io/api/common/v1" carchiver "go.temporal.io/server/common/archiver" "go.temporal.io/server/common/archiver/provider" @@ -44,8 +47,6 @@ import ( "go.temporal.io/server/common/searchattribute" "go.temporal.io/server/common/testing/mocksdk" "go.temporal.io/server/service/history/configs" - "go.uber.org/fx" - "go.uber.org/multierr" ) func TestArchiver(t *testing.T) { @@ -182,24 +183,30 @@ func TestArchiver(t *testing.T) { historyURI, err := carchiver.NewURI("test:///history/archival") require.NoError(t, err) + if c.ExpectArchiveHistory { archiverProvider.EXPECT().GetHistoryArchiver(gomock.Any(), gomock.Any()).Return(historyArchiver, nil) historyArchiver.EXPECT().Archive(gomock.Any(), historyURI, gomock.Any()).Return(c.ArchiveHistoryErr) } + visibilityURI, err := carchiver.NewURI("test:///visibility/archival") require.NoError(t, err) archiverProvider.EXPECT().GetVisibilityArchiver(gomock.Any(), gomock.Any()). Return(visibilityArchiver, nil).AnyTimes() + if c.ExpectArchiveVisibility { visibilityArchiver.EXPECT().Archive(gomock.Any(), visibilityURI, gomock.Any()). Return(c.ArchiveVisibilityErr) } + rateLimiter := quotas.NewMockRateLimiter(controller) rateLimiter.EXPECT().WaitN(gomock.Any(), len(c.Targets)).Return(c.RateLimiterWaitErr) + searchAttributeProvider := searchattribute.NewMockProvider(controller) searchAttributeProvider.EXPECT().GetSearchAttributes(gomock.Any(), gomock.Any()).Return( c.NameTypeMap, c.NameTypeMapErr, ).AnyTimes() + visibilityManager := manager.NewMockVisibilityManager(controller) visibilityManager.EXPECT().GetIndexName().Return("index-name").AnyTimes() @@ -233,9 +240,11 @@ func TestArchiver(t *testing.T) { require.NoError(t, app.Err()) // we need to start the app for fx.Invoke to be called, so that we can get the Archiver require.NoError(t, app.Start(ctx)) + defer func() { require.NoError(t, app.Stop(ctx)) }() + archiver := <-archivers searchAttributes := c.SearchAttributes _, err = archiver.Archive(ctx, &Request{ @@ -248,6 +257,7 @@ func TestArchiver(t *testing.T) { if len(c.ExpectedReturnErrors) > 0 { require.Error(t, err) assert.Len(t, multierr.Errors(err), len(c.ExpectedReturnErrors)) + for _, e := range c.ExpectedReturnErrors { assert.Contains(t, err.Error(), e) }