Skip to content

Commit

Permalink
Add close failover version to history blobs and support indexing of t…
Browse files Browse the repository at this point in the history
…hese versions (#1919)
  • Loading branch information
andrewjdawson2016 committed May 30, 2019
1 parent 251a5cc commit 29b0fb1
Show file tree
Hide file tree
Showing 13 changed files with 725 additions and 297 deletions.
2 changes: 1 addition & 1 deletion common/blobstore/blob/key.go
Expand Up @@ -40,7 +40,7 @@ const (

var (
// allowedRegex indicates the allowed format of both key name pieces and extension
allowedRegex = regexp.MustCompile(`^[a-zA-Z0-9]+$`)
allowedRegex = regexp.MustCompile(`^[a-zA-Z0-9-]+$`)
)

type (
Expand Down
13 changes: 13 additions & 0 deletions common/blobstore/blob/key_test.go
Expand Up @@ -87,6 +87,12 @@ func (s *KeySuite) TestNewKey() {
expectError: false,
expectBuiltKey: "valid_set_of_pieces.ext",
},
{
extension: "ext",
pieces: []string{"valid", "-10"},
expectError: false,
expectBuiltKey: "valid_-10.ext",
},
}

for _, tc := range testCases {
Expand Down Expand Up @@ -149,6 +155,13 @@ func (s *KeySuite) TestNewKeyFromString() {
expectExtension: "ext",
expectNamePieces: []string{"foo1", "bar2", "3baz"},
},
{
inputStr: "foo1_bar2_-10.ext",
expectError: false,
expectBuiltKey: "foo1_bar2_-10.ext",
expectExtension: "ext",
expectNamePieces: []string{"foo1", "bar2", "-10"},
},
}

for _, tc := range testCases {
Expand Down
5 changes: 5 additions & 0 deletions common/log/tag/tags.go
Expand Up @@ -642,6 +642,11 @@ func ArchivalRequestDomainID(requestDomainID string) Tag {
return newStringTag("archival-request-domain-id", requestDomainID)
}

// ArchivalRequestDomainName returns tag for RequestDomainName
func ArchivalRequestDomainName(requestDomainName string) Tag {
return newStringTag("archival-request-domain-name", requestDomainName)
}

// ArchivalRequestWorkflowID returns tag for RequestWorkflowID
func ArchivalRequestWorkflowID(requestWorkflowID string) Tag {
return newStringTag("archival-request-workflow-id", requestWorkflowID)
Expand Down
25 changes: 18 additions & 7 deletions service/frontend/workflowHandler.go
Expand Up @@ -96,7 +96,8 @@ type (
}

getHistoryContinuationTokenArchival struct {
BlobstorePageToken int
BlobstorePageToken int
CloseFailoverVersion int64
}
)

Expand Down Expand Up @@ -3216,7 +3217,6 @@ func (wh *WorkflowHandler) getArchivedHistory(
domainID string,
scope metrics.Scope,
) (*gen.GetWorkflowExecutionHistoryResponse, error) {

entry, err := wh.domainCache.GetDomainByID(domainID)
if err != nil {
return nil, wh.error(err, scope)
Expand All @@ -3232,11 +3232,24 @@ func (wh *WorkflowHandler) getArchivedHistory(
return nil, wh.error(errInvalidNextArchivalPageToken, scope)
}
} else {
indexKey, err := archiver.NewHistoryIndexBlobKey(domainID, request.Execution.GetWorkflowId(), request.Execution.GetRunId())
if err != nil {
return nil, wh.error(err, scope)
}
indexTags, err := wh.blobstoreClient.GetTags(ctx, archivalBucket, indexKey)
if err != nil {
return nil, wh.error(err, scope)
}
highestVersion, err := archiver.GetHighestVersion(indexTags)
if err != nil {
return nil, wh.error(err, scope)
}
token = &getHistoryContinuationTokenArchival{
BlobstorePageToken: common.FirstBlobPageToken,
BlobstorePageToken: common.FirstBlobPageToken,
CloseFailoverVersion: *highestVersion,
}
}
key, err := archiver.NewHistoryBlobKey(domainID, request.Execution.GetWorkflowId(), request.Execution.GetRunId(), token.BlobstorePageToken)
key, err := archiver.NewHistoryBlobKey(domainID, request.Execution.GetWorkflowId(), request.Execution.GetRunId(), token.CloseFailoverVersion, token.BlobstorePageToken)
if err != nil {
return nil, wh.error(err, scope)
}
Expand All @@ -3255,9 +3268,7 @@ func (wh *WorkflowHandler) getArchivedHistory(
return nil, wh.error(err, scope)
}
}
token = &getHistoryContinuationTokenArchival{
BlobstorePageToken: *historyBlob.Header.NextPageToken,
}
token.BlobstorePageToken = *historyBlob.Header.NextPageToken
if *historyBlob.Header.IsLast {
token = nil
}
Expand Down
123 changes: 99 additions & 24 deletions service/frontend/workflowHandler_test.go
Expand Up @@ -54,6 +54,10 @@ import (

const (
numHistoryShards = 10

testArchivalBucket = "test-bucket"
testWorkflowID = "test-workflow-id"
testRunID = "test-run-id"
)

type (
Expand Down Expand Up @@ -465,7 +469,8 @@ func (s *workflowHandlerSuite) TestStartWorkflowExecution_Failed_InvalidTaskStar
}

func (s *workflowHandlerSuite) getWorkflowHandlerWithParams(mService cs.Service, config *Config,
mMetadataManager persistence.MetadataManager, blobStore blobstore.Client) *WorkflowHandler {
mMetadataManager persistence.MetadataManager, blobStore *mocks.BlobstoreClient) *WorkflowHandler {
s.mockBlobstoreClient = blobStore
return NewWorkflowHandler(mService, config, mMetadataManager, s.mockHistoryMgr, s.mockHistoryV2Mgr,
s.mockVisibilityMgr, s.mockProducer, blobStore)
}
Expand Down Expand Up @@ -909,8 +914,8 @@ func (s *workflowHandlerSuite) TestHistoryArchived() {
}
getHistoryRequest = &shared.GetWorkflowExecutionHistoryRequest{
Execution: &shared.WorkflowExecution{
WorkflowId: common.StringPtr("test-workflow-id"),
RunId: common.StringPtr("test-run-id"),
WorkflowId: common.StringPtr(testWorkflowID),
RunId: common.StringPtr(testRunID),
},
}
s.False(wh.historyArchived(context.Background(), getHistoryRequest, "test-domain"))
Expand All @@ -921,8 +926,8 @@ func (s *workflowHandlerSuite) TestHistoryArchived() {
}
getHistoryRequest = &shared.GetWorkflowExecutionHistoryRequest{
Execution: &shared.WorkflowExecution{
WorkflowId: common.StringPtr("test-workflow-id"),
RunId: common.StringPtr("test-run-id"),
WorkflowId: common.StringPtr(testWorkflowID),
RunId: common.StringPtr(testRunID),
},
}
s.True(wh.historyArchived(context.Background(), getHistoryRequest, "test-domain"))
Expand All @@ -933,8 +938,8 @@ func (s *workflowHandlerSuite) TestHistoryArchived() {
}
getHistoryRequest = &shared.GetWorkflowExecutionHistoryRequest{
Execution: &shared.WorkflowExecution{
WorkflowId: common.StringPtr("test-workflow-id"),
RunId: common.StringPtr("test-run-id"),
WorkflowId: common.StringPtr(testWorkflowID),
RunId: common.StringPtr(testRunID),
},
}
s.False(wh.historyArchived(context.Background(), getHistoryRequest, "test-domain"))
Expand All @@ -947,7 +952,7 @@ func (s *workflowHandlerSuite) TestGetArchivedHistory_Failure_DomainCacheEntryEr
wh := s.getWorkflowHandlerWithParams(s.mockService, config, mMetadataManager, s.mockBlobstoreClient)
wh.metricsClient = wh.Service.GetMetricsClient()
wh.startWG.Done()
resp, err := wh.getArchivedHistory(context.Background(), getHistoryRequest(nil), "test-domain-id", metrics.NoopScope(metrics.Frontend))
resp, err := wh.getArchivedHistory(context.Background(), getHistoryRequest(nil), s.testDomainID, metrics.NoopScope(metrics.Frontend))
s.Nil(resp)
s.Error(err)
}
Expand All @@ -962,22 +967,22 @@ func (s *workflowHandlerSuite) TestGetArchivedHistory_Failure_ArchivalBucketEmpt
wh := s.getWorkflowHandlerWithParams(mService, config, mMetadataManager, s.mockBlobstoreClient)
wh.metricsClient = wh.Service.GetMetricsClient()
wh.startWG.Done()
resp, err := wh.getArchivedHistory(context.Background(), getHistoryRequest(nil), "test-domain-id", metrics.NoopScope(metrics.Frontend))
resp, err := wh.getArchivedHistory(context.Background(), getHistoryRequest(nil), s.testDomainID, metrics.NoopScope(metrics.Frontend))
s.Nil(resp)
s.Error(err)
}

func (s *workflowHandlerSuite) TestGetArchivedHistory_Failure_InvalidPageToken() {
config := s.newConfig()
mMetadataManager := &mocks.MetadataManager{}
mMetadataManager.On("GetDomain", mock.Anything).Return(persistenceGetDomainResponse("test-bucket", shared.ArchivalStatusEnabled), nil)
mMetadataManager.On("GetDomain", mock.Anything).Return(persistenceGetDomainResponse(testArchivalBucket, shared.ArchivalStatusEnabled), nil)
clusterMetadata := &mocks.ClusterMetadata{}
clusterMetadata.On("IsGlobalDomainEnabled").Return(false)
mService := cs.NewTestService(clusterMetadata, s.mockMessagingClient, s.mockMetricClient, s.mockClientBean)
wh := s.getWorkflowHandlerWithParams(mService, config, mMetadataManager, s.mockBlobstoreClient)
wh.metricsClient = wh.Service.GetMetricsClient()
wh.startWG.Done()
resp, err := wh.getArchivedHistory(context.Background(), getHistoryRequest([]byte{3, 4, 5, 1}), "test-domain-id", metrics.NoopScope(metrics.Frontend))
resp, err := wh.getArchivedHistory(context.Background(), getHistoryRequest([]byte{3, 4, 5, 1}), s.testDomainID, metrics.NoopScope(metrics.Frontend))
s.Nil(resp)
s.Error(err)
s.Equal(errInvalidNextArchivalPageToken, err)
Expand All @@ -986,7 +991,7 @@ func (s *workflowHandlerSuite) TestGetArchivedHistory_Failure_InvalidPageToken()
func (s *workflowHandlerSuite) TestGetArchivedHistory_Failure_InvalidBlobKey() {
config := s.newConfig()
mMetadataManager := &mocks.MetadataManager{}
mMetadataManager.On("GetDomain", mock.Anything).Return(persistenceGetDomainResponse("test-bucket", shared.ArchivalStatusEnabled), nil)
mMetadataManager.On("GetDomain", mock.Anything).Return(persistenceGetDomainResponse(testArchivalBucket, shared.ArchivalStatusEnabled), nil)
clusterMetadata := &mocks.ClusterMetadata{}
clusterMetadata.On("IsGlobalDomainEnabled").Return(false)
mService := cs.NewTestService(clusterMetadata, s.mockMessagingClient, s.mockMetricClient, s.mockClientBean)
Expand All @@ -995,32 +1000,50 @@ func (s *workflowHandlerSuite) TestGetArchivedHistory_Failure_InvalidBlobKey() {
wh.startWG.Done()
getHistoryRequest := getHistoryRequest(nil)
getHistoryRequest.Execution.WorkflowId = common.StringPtr("")
resp, err := wh.getArchivedHistory(context.Background(), getHistoryRequest, "test-domain-id", metrics.NoopScope(metrics.Frontend))
resp, err := wh.getArchivedHistory(context.Background(), getHistoryRequest, s.testDomainID, metrics.NoopScope(metrics.Frontend))
s.Nil(resp)
s.Error(err)
}

func (s *workflowHandlerSuite) TestGetArchivedHistory_Failure_FailedToGetVersions() {
config := s.newConfig()
mMetadataManager := &mocks.MetadataManager{}
mMetadataManager.On("GetDomain", mock.Anything).Return(persistenceGetDomainResponse(testArchivalBucket, shared.ArchivalStatusEnabled), nil)
clusterMetadata := &mocks.ClusterMetadata{}
clusterMetadata.On("IsGlobalDomainEnabled").Return(false)
mService := cs.NewTestService(clusterMetadata, s.mockMessagingClient, s.mockMetricClient, s.mockClientBean)
mBlobstore := &mocks.BlobstoreClient{}
mBlobstore.On("GetTags", mock.Anything, mock.Anything, mock.Anything).Return(nil, errors.New("failed to get tags for index blob"))
wh := s.getWorkflowHandlerWithParams(mService, config, mMetadataManager, mBlobstore)
wh.metricsClient = wh.Service.GetMetricsClient()
wh.startWG.Done()
resp, err := wh.getArchivedHistory(context.Background(), getHistoryRequest(nil), s.testDomainID, metrics.NoopScope(metrics.Frontend))
s.Nil(resp)
s.Error(err)
}

func (s *workflowHandlerSuite) TestGetArchivedHistory_Failure_FailedToDownload() {
config := s.newConfig()
mMetadataManager := &mocks.MetadataManager{}
mMetadataManager.On("GetDomain", mock.Anything).Return(persistenceGetDomainResponse("test-bucket", shared.ArchivalStatusEnabled), nil)
mMetadataManager.On("GetDomain", mock.Anything).Return(persistenceGetDomainResponse(testArchivalBucket, shared.ArchivalStatusEnabled), nil)
clusterMetadata := &mocks.ClusterMetadata{}
clusterMetadata.On("IsGlobalDomainEnabled").Return(false)
mService := cs.NewTestService(clusterMetadata, s.mockMessagingClient, s.mockMetricClient, s.mockClientBean)
mBlobstore := &mocks.BlobstoreClient{}
mBlobstore.On("GetTags", mock.Anything, mock.Anything, mock.Anything).Return(map[string]string{"10": ""}, nil)
mBlobstore.On("Download", mock.Anything, mock.Anything, mock.Anything).Return(nil, errors.New("failed to download blob"))
wh := s.getWorkflowHandlerWithParams(mService, config, mMetadataManager, mBlobstore)
wh.metricsClient = wh.Service.GetMetricsClient()
wh.startWG.Done()
resp, err := wh.getArchivedHistory(context.Background(), getHistoryRequest(nil), "test-domain-id", metrics.NoopScope(metrics.Frontend))
resp, err := wh.getArchivedHistory(context.Background(), getHistoryRequest(nil), s.testDomainID, metrics.NoopScope(metrics.Frontend))
s.Nil(resp)
s.Error(err)
}

func (s *workflowHandlerSuite) TestGetArchivedHistory_Success_GetFirstPage() {
config := s.newConfig()
mMetadataManager := &mocks.MetadataManager{}
mMetadataManager.On("GetDomain", mock.Anything).Return(persistenceGetDomainResponse("test-bucket", shared.ArchivalStatusEnabled), nil)
mMetadataManager.On("GetDomain", mock.Anything).Return(persistenceGetDomainResponse(testArchivalBucket, shared.ArchivalStatusEnabled), nil)
clusterMetadata := &mocks.ClusterMetadata{}
clusterMetadata.On("IsGlobalDomainEnabled").Return(false)
mService := cs.NewTestService(clusterMetadata, s.mockMessagingClient, s.mockMetricClient, s.mockClientBean)
Expand All @@ -1037,17 +1060,64 @@ func (s *workflowHandlerSuite) TestGetArchivedHistory_Success_GetFirstPage() {
s.NoError(err)
historyBlob, err := blob.Wrap(blob.NewBlob(bytes, map[string]string{}), blob.JSONEncoded())
s.NoError(err)
mBlobstore.On("Download", mock.Anything, mock.Anything, mock.Anything).Return(historyBlob, nil)
mBlobstore.On("GetTags", mock.Anything, mock.Anything, mock.Anything).Return(map[string]string{"10": ""}, nil)
historyKey, _ := archiver.NewHistoryBlobKey(s.testDomainID, testWorkflowID, testRunID, 10, common.FirstBlobPageToken)
mBlobstore.On("Download", mock.Anything, mock.Anything, historyKey).Return(historyBlob, nil)
wh := s.getWorkflowHandlerWithParams(mService, config, mMetadataManager, mBlobstore)
wh.metricsClient = wh.Service.GetMetricsClient()
wh.startWG.Done()
resp, err := wh.getArchivedHistory(context.Background(), getHistoryRequest(nil), "test-domain-id", metrics.NoopScope(metrics.Frontend))
resp, err := wh.getArchivedHistory(context.Background(), getHistoryRequest(nil), s.testDomainID, metrics.NoopScope(metrics.Frontend))
s.NoError(err)
s.NotNil(resp)
s.NotNil(resp.History)
s.True(resp.GetArchived())
expectedNextPageToken := &getHistoryContinuationTokenArchival{
BlobstorePageToken: 2,
CloseFailoverVersion: 10,
}
expectedSerializedNextPageToken, err := serializeHistoryTokenArchival(expectedNextPageToken)
s.NoError(err)
s.Equal(expectedSerializedNextPageToken, resp.NextPageToken)
}

func (s *workflowHandlerSuite) TestGetArchivedHistory_Success_SecondPageIndexNotUsed() {
config := s.newConfig()
mMetadataManager := &mocks.MetadataManager{}
mMetadataManager.On("GetDomain", mock.Anything).Return(persistenceGetDomainResponse(testArchivalBucket, shared.ArchivalStatusEnabled), nil)
clusterMetadata := &mocks.ClusterMetadata{}
clusterMetadata.On("IsGlobalDomainEnabled").Return(false)
mService := cs.NewTestService(clusterMetadata, s.mockMessagingClient, s.mockMetricClient, s.mockClientBean)
mBlobstore := &mocks.BlobstoreClient{}
unwrappedBlob := &archiver.HistoryBlob{
Header: &archiver.HistoryBlobHeader{
CurrentPageToken: common.IntPtr(common.FirstBlobPageToken + 1),
NextPageToken: common.IntPtr(common.FirstBlobPageToken + 2),
IsLast: common.BoolPtr(false),
},
Body: &shared.History{},
}
bytes, err := json.Marshal(unwrappedBlob)
s.NoError(err)
historyBlob, err := blob.Wrap(blob.NewBlob(bytes, map[string]string{}), blob.JSONEncoded())
s.NoError(err)
historyKey, _ := archiver.NewHistoryBlobKey(s.testDomainID, testWorkflowID, testRunID, 10, common.FirstBlobPageToken+1)
mBlobstore.On("Download", mock.Anything, mock.Anything, historyKey).Return(historyBlob, nil)
wh := s.getWorkflowHandlerWithParams(mService, config, mMetadataManager, mBlobstore)
wh.metricsClient = wh.Service.GetMetricsClient()
wh.startWG.Done()
pageToken, err := serializeHistoryTokenArchival(&getHistoryContinuationTokenArchival{
BlobstorePageToken: common.FirstBlobPageToken + 1,
CloseFailoverVersion: 10,
})
s.NoError(err)
resp, err := wh.getArchivedHistory(context.Background(), getHistoryRequest(pageToken), s.testDomainID, metrics.NoopScope(metrics.Frontend))
s.NoError(err)
s.NotNil(resp)
s.NotNil(resp.History)
s.True(resp.GetArchived())
expectedNextPageToken := &getHistoryContinuationTokenArchival{
BlobstorePageToken: 2,
BlobstorePageToken: common.FirstBlobPageToken + 2,
CloseFailoverVersion: 10,
}
expectedSerializedNextPageToken, err := serializeHistoryTokenArchival(expectedNextPageToken)
s.NoError(err)
Expand All @@ -1057,7 +1127,7 @@ func (s *workflowHandlerSuite) TestGetArchivedHistory_Success_GetFirstPage() {
func (s *workflowHandlerSuite) TestGetArchivedHistory_Success_GetLastPage() {
config := s.newConfig()
mMetadataManager := &mocks.MetadataManager{}
mMetadataManager.On("GetDomain", mock.Anything).Return(persistenceGetDomainResponse("test-bucket", shared.ArchivalStatusEnabled), nil)
mMetadataManager.On("GetDomain", mock.Anything).Return(persistenceGetDomainResponse(testArchivalBucket, shared.ArchivalStatusEnabled), nil)
clusterMetadata := &mocks.ClusterMetadata{}
clusterMetadata.On("IsGlobalDomainEnabled").Return(false)
mService := cs.NewTestService(clusterMetadata, s.mockMessagingClient, s.mockMetricClient, s.mockClientBean)
Expand All @@ -1074,11 +1144,16 @@ func (s *workflowHandlerSuite) TestGetArchivedHistory_Success_GetLastPage() {
s.NoError(err)
historyBlob, err := blob.Wrap(blob.NewBlob(bytes, map[string]string{}), blob.JSONEncoded())
s.NoError(err)
mBlobstore.On("Download", mock.Anything, mock.Anything, mock.Anything).Return(historyBlob, nil)
historyKey, _ := archiver.NewHistoryBlobKey(s.testDomainID, testWorkflowID, testRunID, 10, 5)
mBlobstore.On("Download", mock.Anything, mock.Anything, historyKey).Return(historyBlob, nil)
wh := s.getWorkflowHandlerWithParams(mService, config, mMetadataManager, mBlobstore)
wh.metricsClient = wh.Service.GetMetricsClient()
wh.startWG.Done()
resp, err := wh.getArchivedHistory(context.Background(), getHistoryRequest(nil), "test-domain-id", metrics.NoopScope(metrics.Frontend))
pageToken, err := serializeHistoryTokenArchival(&getHistoryContinuationTokenArchival{
BlobstorePageToken: 5,
CloseFailoverVersion: 10,
})
resp, err := wh.getArchivedHistory(context.Background(), getHistoryRequest(pageToken), s.testDomainID, metrics.NoopScope(metrics.Frontend))
s.NoError(err)
s.NotNil(resp)
s.NotNil(resp.History)
Expand Down Expand Up @@ -1292,8 +1367,8 @@ func registerDomainRequest(archivalStatus *shared.ArchivalStatus, bucketName *st
func getHistoryRequest(nextPageToken []byte) *shared.GetWorkflowExecutionHistoryRequest {
return &shared.GetWorkflowExecutionHistoryRequest{
Execution: &shared.WorkflowExecution{
WorkflowId: common.StringPtr("test-workflow-id"),
RunId: common.StringPtr("test-run-id"),
WorkflowId: common.StringPtr(testWorkflowID),
RunId: common.StringPtr(testRunID),
},
NextPageToken: nextPageToken,
}
Expand Down

0 comments on commit 29b0fb1

Please sign in to comment.