Skip to content

Commit

Permalink
pr comments
Browse files Browse the repository at this point in the history
  • Loading branch information
MichaelSnowden committed May 9, 2023
1 parent c789055 commit 153d613
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 3 deletions.
23 changes: 22 additions & 1 deletion service/history/archival/archiver.go
Expand Up @@ -136,48 +136,64 @@ func (a *archiver) Archive(ctx context.Context, request *Request) (res *Response
tag.ArchivalRequestWorkflowID(request.WorkflowID),
tag.ArchivalRequestRunID(request.RunID),
)

defer func(start time.Time) {
metricsScope := a.metricsHandler

status := "ok"
if err != nil {
status = "err"

var rateLimitExceededErr *serviceerror.ResourceExhausted

if errors.As(err, &rateLimitExceededErr) {
status = "rate_limit_exceeded"
}

logger.Warn("failed to archive workflow", tag.Error(err))
}

metricsScope.Timer(metrics.ArchiverArchiveLatency.GetMetricName()).
Record(time.Since(start), metrics.StringTag("status", status))
}(time.Now())

numTargets := len(request.Targets)
if err := a.rateLimiter.WaitN(ctx, numTargets); err != nil {
return nil, &serviceerror.ResourceExhausted{
Cause: enumspb.RESOURCE_EXHAUSTED_CAUSE_RPS_LIMIT,
Message: fmt.Sprintf("archival rate limited: %s", err.Error()),
}
}

var wg sync.WaitGroup

errs := make([]error, numTargets)

for i, target := range request.Targets {
wg.Add(1)

i := i

switch target {
case TargetHistory:
go func() {
defer wg.Done()

errs[i] = a.archiveHistory(ctx, request, logger)
}()
case TargetVisibility:
go func() {
defer wg.Done()

errs[i] = a.archiveVisibility(ctx, request, logger)
}()
default:
return nil, fmt.Errorf("unknown archival target: %s", target)
}
}

wg.Wait()

return &Response{}, multierr.Combine(errs...)
}

Expand Down Expand Up @@ -219,16 +235,18 @@ func (a *archiver) archiveVisibility(ctx context.Context, request *Request, logg
return err
}

// The types of the search attributes may not be embedded in the request,
// so we fetch them from the search attributes provider here.
saTypeMap, err := a.searchAttributeProvider.GetSearchAttributes(a.visibilityManager.GetIndexName(), false)
if err != nil {
return err
}

// It is safe to pass nil to typeMap here because search attributes type must be embedded by caller.
searchAttributes, err := searchattribute.Stringify(request.SearchAttributes, &saTypeMap)
if err != nil {
return err
}

return visibilityArchiver.Archive(ctx, request.VisibilityURI, &archiverspb.VisibilityRecord{
NamespaceId: request.NamespaceID,
Namespace: request.Namespace,
Expand All @@ -250,11 +268,14 @@ func (a *archiver) archiveVisibility(ctx context.Context, request *Request, logg
// statement (this would make the err always nil).
func (a *archiver) recordArchiveTargetResult(logger log.Logger, startTime time.Time, target Target, err *error) {
duration := time.Since(startTime)

status := "ok"
if *err != nil {
status = "err"

logger.Error("failed to archive target", tag.NewStringTag("target", string(target)), tag.Error(*err))
}

tags := []metrics.Tag{
metrics.StringTag("target", string(target)),
metrics.StringTag("status", status),
Expand Down
14 changes: 12 additions & 2 deletions service/history/archival/archiver_test.go
Expand Up @@ -32,6 +32,9 @@ import (
"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/fx"
"go.uber.org/multierr"

"go.temporal.io/api/common/v1"
carchiver "go.temporal.io/server/common/archiver"
"go.temporal.io/server/common/archiver/provider"
Expand All @@ -44,8 +47,6 @@ import (
"go.temporal.io/server/common/searchattribute"
"go.temporal.io/server/common/testing/mocksdk"
"go.temporal.io/server/service/history/configs"
"go.uber.org/fx"
"go.uber.org/multierr"
)

func TestArchiver(t *testing.T) {
Expand Down Expand Up @@ -182,24 +183,30 @@ func TestArchiver(t *testing.T) {

historyURI, err := carchiver.NewURI("test:///history/archival")
require.NoError(t, err)

if c.ExpectArchiveHistory {
archiverProvider.EXPECT().GetHistoryArchiver(gomock.Any(), gomock.Any()).Return(historyArchiver, nil)
historyArchiver.EXPECT().Archive(gomock.Any(), historyURI, gomock.Any()).Return(c.ArchiveHistoryErr)
}

visibilityURI, err := carchiver.NewURI("test:///visibility/archival")
require.NoError(t, err)
archiverProvider.EXPECT().GetVisibilityArchiver(gomock.Any(), gomock.Any()).
Return(visibilityArchiver, nil).AnyTimes()

if c.ExpectArchiveVisibility {
visibilityArchiver.EXPECT().Archive(gomock.Any(), visibilityURI, gomock.Any()).
Return(c.ArchiveVisibilityErr)
}

rateLimiter := quotas.NewMockRateLimiter(controller)
rateLimiter.EXPECT().WaitN(gomock.Any(), len(c.Targets)).Return(c.RateLimiterWaitErr)

searchAttributeProvider := searchattribute.NewMockProvider(controller)
searchAttributeProvider.EXPECT().GetSearchAttributes(gomock.Any(), gomock.Any()).Return(
c.NameTypeMap, c.NameTypeMapErr,
).AnyTimes()

visibilityManager := manager.NewMockVisibilityManager(controller)
visibilityManager.EXPECT().GetIndexName().Return("index-name").AnyTimes()

Expand Down Expand Up @@ -233,9 +240,11 @@ func TestArchiver(t *testing.T) {
require.NoError(t, app.Err())
// we need to start the app for fx.Invoke to be called, so that we can get the Archiver
require.NoError(t, app.Start(ctx))

defer func() {
require.NoError(t, app.Stop(ctx))
}()

archiver := <-archivers
searchAttributes := c.SearchAttributes
_, err = archiver.Archive(ctx, &Request{
Expand All @@ -248,6 +257,7 @@ func TestArchiver(t *testing.T) {
if len(c.ExpectedReturnErrors) > 0 {
require.Error(t, err)
assert.Len(t, multierr.Errors(err), len(c.ExpectedReturnErrors))

for _, e := range c.ExpectedReturnErrors {
assert.Contains(t, err.Error(), e)
}
Expand Down

0 comments on commit 153d613

Please sign in to comment.