diff --git a/common/archiver/s3store/query_parser.go b/common/archiver/s3store/query_parser.go index df9b1e63b09..f3f6f21fba0 100644 --- a/common/archiver/s3store/query_parser.go +++ b/common/archiver/s3store/query_parser.go @@ -49,6 +49,7 @@ type ( parsedQuery struct { workflowTypeName *string workflowID *string + runID *string startTime *time.Time closeTime *time.Time searchPrecision *string @@ -59,6 +60,7 @@ type ( const ( WorkflowTypeName = "WorkflowTypeName" WorkflowID = "WorkflowId" + RunID = "RunId" StartTime = "StartTime" CloseTime = "CloseTime" SearchPrecision = "SearchPrecision" @@ -91,8 +93,8 @@ func (p *queryParser) Parse(query string) (*parsedQuery, error) { if err := p.convertWhereExpr(whereExpr, parsedQuery); err != nil { return nil, err } - if parsedQuery.workflowID == nil && parsedQuery.workflowTypeName == nil { - return nil, errors.New("WorkflowId or WorkflowTypeName is required in query") + if parsedQuery.workflowID == nil && parsedQuery.workflowTypeName == nil && parsedQuery.runID == nil { + return nil, errors.New("WorkflowId or WorkflowTypeName or RunId is required in query") } if parsedQuery.workflowID != nil && parsedQuery.workflowTypeName != nil { return nil, errors.New("only one of WorkflowId or WorkflowTypeName can be specified in a query") @@ -176,6 +178,18 @@ func (p *queryParser) convertComparisonExpr(compExpr *sqlparser.ComparisonExpr, return fmt.Errorf("can not query %s multiple times", WorkflowID) } parsedQuery.workflowID = convert.StringPtr(val) + case RunID: + val, err := extractStringValue(valStr) + if err != nil { + return err + } + if op != "=" { + return fmt.Errorf("only operation = is support for %s", RunID) + } + if parsedQuery.runID != nil { + return fmt.Errorf("can not query %s multiple times", RunID) + } + parsedQuery.runID = convert.StringPtr(val) case CloseTime: timestamp, err := convertToTime(valStr) if err != nil { diff --git a/common/archiver/s3store/query_parser_test.go b/common/archiver/s3store/query_parser_test.go index 5222107c735..919611c8ea2 100644 --- a/common/archiver/s3store/query_parser_test.go +++ b/common/archiver/s3store/query_parser_test.go @@ -81,7 +81,10 @@ func (s *queryParserSuite) TestParseWorkflowIDAndWorkflowTypeName() { }, { query: "RunId = \"random runID\"", - expectErr: true, + expectErr: false, + parsedQuery: &parsedQuery{ + runID: convert.StringPtr("random runID"), + }, }, { query: "WorkflowId = 'random workflowID'", @@ -128,6 +131,7 @@ func (s *queryParserSuite) TestParseWorkflowIDAndWorkflowTypeName() { s.NoError(err) s.Equal(tc.parsedQuery.workflowID, parsedQuery.workflowID) s.Equal(tc.parsedQuery.workflowTypeName, parsedQuery.workflowTypeName) + s.Equal(tc.parsedQuery.runID, parsedQuery.runID) } } diff --git a/common/archiver/s3store/visibility_archiver.go b/common/archiver/s3store/visibility_archiver.go index f028852c781..9e62f50f37e 100644 --- a/common/archiver/s3store/visibility_archiver.go +++ b/common/archiver/s3store/visibility_archiver.go @@ -74,6 +74,7 @@ const ( secondaryIndexKeyCloseTimeout = "closeTimeout" primaryIndexKeyWorkflowTypeName = "workflowTypeName" primaryIndexKeyWorkflowID = "workflowID" + primaryIndexKeyRunID = "runID" ) // NewVisibilityArchiver creates a new archiver.VisibilityArchiver based on s3 @@ -165,6 +166,8 @@ func createIndexesToArchive(request *archiverspb.VisibilityRecord) []indexToArch {primaryIndexKeyWorkflowTypeName, request.WorkflowTypeName, secondaryIndexKeyStartTimeout, timestamp.TimeValue(request.StartTime)}, {primaryIndexKeyWorkflowID, request.GetWorkflowId(), secondaryIndexKeyCloseTimeout, timestamp.TimeValue(request.CloseTime)}, {primaryIndexKeyWorkflowID, request.GetWorkflowId(), secondaryIndexKeyStartTimeout, timestamp.TimeValue(request.StartTime)}, + {primaryIndexKeyRunID, request.GetRunId(), secondaryIndexKeyCloseTimeout, timestamp.TimeValue(request.CloseTime)}, + {primaryIndexKeyRunID, request.GetRunId(), secondaryIndexKeyStartTimeout, timestamp.TimeValue(request.StartTime)}, } } @@ -278,6 +281,10 @@ func (v *visibilityArchiver) query( primaryIndex = primaryIndexKeyWorkflowID primaryIndexValue = request.parsedQuery.workflowID } + if request.parsedQuery.runID != nil { + primaryIndex = primaryIndexKeyRunID + primaryIndexValue = request.parsedQuery.runID + } prefix := constructIndexedVisibilitySearchPrefix( URI.Path(), diff --git a/service/frontend/errors.go b/service/frontend/errors.go index 9f6dbe53003..ae5e10d41ab 100644 --- a/service/frontend/errors.go +++ b/service/frontend/errors.go @@ -112,6 +112,7 @@ var ( errUnableToGetNamespaceInfoMessage = "Unable to get namespace info with error: %v" errUnableToCreateFrontendClientMessage = "Unable to create frontend client with error: %v." errTooManySearchAttributesMessage = "Unable to create search attributes: cannot have more than %d search attribute of type %s." + errUnableToListArchivedWorkflowExecutionMessage = "Unable To list archived workflow execution: WorkflowID %s RunID %s" errListNotAllowed = serviceerror.NewPermissionDenied("List is disabled on this namespace.", "") errSchedulesNotAllowed = serviceerror.NewPermissionDenied("Schedules are disabled on this namespace.", "") diff --git a/service/frontend/workflow_handler.go b/service/frontend/workflow_handler.go index 729f55c2d6c..560bb0929a2 100644 --- a/service/frontend/workflow_handler.go +++ b/service/frontend/workflow_handler.go @@ -491,6 +491,12 @@ func (wh *WorkflowHandler) GetWorkflowExecutionHistoryReverse(ctx context.Contex request.MaximumPageSize = common.GetHistoryMaxPageSize } + enableArchivalRead := wh.archivalMetadata.GetHistoryConfig().ReadEnabled() + workflowExecutionArchived := wh.workflowExecutionArchived(ctx, request.Execution, namespaceID) + if enableArchivalRead && workflowExecutionArchived { + return wh.getArchivedHistoryReverse(ctx, request, namespaceID) + } + if dynamicconfig.AccessHistory(wh.config.AccessHistoryFraction, wh.metricsScope(ctx).WithTags(metrics.OperationTag(metrics.FrontendGetWorkflowExecutionHistoryReverseTag))) { response, err := wh.historyClient.GetWorkflowExecutionHistoryReverse(ctx, &historyservice.GetWorkflowExecutionHistoryReverseRequest{ @@ -2356,6 +2362,12 @@ func (wh *WorkflowHandler) DescribeWorkflowExecution(ctx context.Context, reques return nil, err } + enableArchivalRead := wh.archivalMetadata.GetVisibilityConfig().ReadEnabled() + workflowArchived := wh.workflowExecutionArchived(ctx, request.Execution, namespaceID) + if enableArchivalRead && workflowArchived { + return wh.describeArchivedWorkflowExecution(ctx, request) + } + response, err := wh.historyClient.DescribeWorkflowExecution(ctx, &historyservice.DescribeWorkflowExecutionRequest{ NamespaceId: namespaceID.String(), Request: request, @@ -3830,12 +3842,16 @@ func (wh *WorkflowHandler) validateBuildIdCompatibilityUpdate( } func (wh *WorkflowHandler) historyArchived(ctx context.Context, request *workflowservice.GetWorkflowExecutionHistoryRequest, namespaceID namespace.ID) bool { - if request.GetExecution() == nil || request.GetExecution().GetRunId() == "" { + return wh.workflowExecutionArchived(ctx, request.Execution, namespaceID) +} + +func (wh *WorkflowHandler) workflowExecutionArchived(ctx context.Context, execution *commonpb.WorkflowExecution, namespaceID namespace.ID) bool { + if execution == nil || execution.GetRunId() == "" { return false } getMutableStateRequest := &historyservice.GetMutableStateRequest{ NamespaceId: namespaceID.String(), - Execution: request.Execution, + Execution: execution, } _, err := wh.historyClient.GetMutableState(ctx, getMutableStateRequest) if err == nil { @@ -3843,13 +3859,49 @@ func (wh *WorkflowHandler) historyArchived(ctx context.Context, request *workflo } switch err.(type) { case *serviceerror.NotFound: - // the only case in which history is assumed to be archived is if getting mutable state returns entity not found error + // the only case in which workflow or workflow's event history is assumed to be archived is if getting mutable state returns entity not found error return true } return false } +func (wh *WorkflowHandler) describeArchivedWorkflowExecution(ctx context.Context, request *workflowservice.DescribeWorkflowExecutionRequest) (*workflowservice.DescribeWorkflowExecutionResponse, error) { + r := &workflowservice.ListArchivedWorkflowExecutionsRequest{ + Namespace: request.Namespace, + PageSize: 1, + Query: fmt.Sprintf( + "WorkflowId = '%s' and RunId = '%s'", + request.Execution.GetWorkflowId(), + request.Execution.GetRunId(), + ), + } + archivedWorkflowExecutionsResponse, err := wh.ListArchivedWorkflowExecutions(ctx, r) + if err != nil { + return nil, serviceerror.NewNotFound(err.Error()) + } + if len(archivedWorkflowExecutionsResponse.Executions) == 0 { + return nil, serviceerror.NewNotFound(fmt.Sprintf(errUnableToListArchivedWorkflowExecutionMessage, request.Execution.GetWorkflowId(), request.Execution.GetRunId())) + } + //get execution info + executionInfo := archivedWorkflowExecutionsResponse.Executions[0] + if executionInfo.TaskQueue == "" { + //todo: support display task queue + executionInfo.TaskQueue = "Null" + } + result := &workflowservice.DescribeWorkflowExecutionResponse{ + ExecutionConfig: &workflowpb.WorkflowExecutionConfig{ + TaskQueue: &taskqueuepb.TaskQueue{ + Name: executionInfo.TaskQueue, + Kind: enumspb.TASK_QUEUE_KIND_NORMAL, + }, + }, + WorkflowExecutionInfo: executionInfo, + } + + return result, nil +} + func (wh *WorkflowHandler) getArchivedHistory( ctx context.Context, request *workflowservice.GetWorkflowExecutionHistoryRequest, @@ -3900,6 +3952,59 @@ func (wh *WorkflowHandler) getArchivedHistory( }, nil } +func (wh *WorkflowHandler) getArchivedHistoryReverse( + ctx context.Context, + request *workflowservice.GetWorkflowExecutionHistoryReverseRequest, + namespaceID namespace.ID, +) (*workflowservice.GetWorkflowExecutionHistoryReverseResponse, error) { + entry, err := wh.namespaceRegistry.GetNamespaceByID(namespaceID) + if err != nil { + return nil, err + } + + URIString := entry.HistoryArchivalState().URI + if URIString == "" { + // if URI is empty, it means the namespace has never enabled for archival. + // the error is not "workflow has passed retention period", because + // we have no way to tell if the requested workflow exists or not. + return nil, errHistoryNotFound + } + + URI, err := archiver.NewURI(URIString) + if err != nil { + return nil, err + } + + historyArchiver, err := wh.archiverProvider.GetHistoryArchiver(URI.Scheme(), string(primitives.FrontendService)) + if err != nil { + return nil, err + } + + resp, err := historyArchiver.Get(ctx, URI, &archiver.GetHistoryRequest{ + NamespaceID: namespaceID.String(), + WorkflowID: request.GetExecution().GetWorkflowId(), + RunID: request.GetExecution().GetRunId(), + NextPageToken: request.GetNextPageToken(), + PageSize: int(request.GetMaximumPageSize()), + }) + if err != nil { + return nil, err + } + + history := &historypb.History{} + for _, batch := range resp.HistoryBatches { + history.Events = append(history.Events, batch.Events...) + } + // reverse the events + for i, j := 0, len(history.Events)-1; i < j; i, j = i+1, j-1 { + history.Events[i], history.Events[j] = history.Events[j], history.Events[i] + } + return &workflowservice.GetWorkflowExecutionHistoryReverseResponse{ + History: history, + NextPageToken: resp.NextPageToken, + }, nil +} + // cancelOutstandingPoll cancel outstanding poll if context was canceled and returns true. Otherwise returns false. func (wh *WorkflowHandler) cancelOutstandingPoll(ctx context.Context, namespaceID namespace.ID, taskQueueType enumspb.TaskQueueType, taskQueue *taskqueuepb.TaskQueue, pollerID string) bool { diff --git a/service/frontend/workflow_handler_test.go b/service/frontend/workflow_handler_test.go index 5e7b088b9ba..8358b70874a 100644 --- a/service/frontend/workflow_handler_test.go +++ b/service/frontend/workflow_handler_test.go @@ -81,6 +81,7 @@ const ( testWorkflowID = "test-workflow-id" testRunID = "test-run-id" + testWorkflowTypeName = "test-workflow-type" testHistoryArchivalURI = "testScheme://history/URI" testVisibilityArchivalURI = "testScheme://visibility/URI" ) @@ -1484,6 +1485,52 @@ func (s *workflowHandlerSuite) TestDescribeWorkflowExecution_CompletedStatus() { s.NoError(err) } +func (s *workflowHandlerSuite) TestDescribeWorkflowExecution_ArchivedStatus() { + s.mockNamespaceCache.EXPECT().GetNamespace(gomock.Any()).Return(namespace.NewLocalNamespaceForTest( + &persistencespb.NamespaceInfo{Name: "test-namespace"}, + &persistencespb.NamespaceConfig{ + VisibilityArchivalState: enumspb.ARCHIVAL_STATE_ENABLED, + VisibilityArchivalUri: testVisibilityArchivalURI, + }, + "", + ), nil).AnyTimes() + s.mockNamespaceCache.EXPECT().GetNamespaceID(gomock.Any()).Return( + s.testNamespaceID, + nil, + ).AnyTimes() + now := timestamp.TimePtr(time.Now()) + mockWorkflowExecution := &commonpb.WorkflowExecution{ + WorkflowId: uuid.New(), + RunId: uuid.New(), + } + mockWorkflowExecutionInfo := &workflowpb.WorkflowExecutionInfo{ + Execution: mockWorkflowExecution, + Type: &commonpb.WorkflowType{Name: testWorkflowTypeName}, + StartTime: now, + CloseTime: now, + Status: enumspb.WORKFLOW_EXECUTION_STATUS_COMPLETED, + ExecutionTime: now, + Memo: nil, + SearchAttributes: nil, + } + s.mockArchivalMetadata.EXPECT().GetVisibilityConfig().Return(archiver.NewArchivalConfig("enabled", dc.GetStringPropertyFn("enabled"), dc.GetBoolPropertyFn(true), "disabled", "random URI")).AnyTimes() + s.mockVisibilityArchiver.EXPECT().Query(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(&archiver.QueryVisibilityResponse{Executions: []*workflowpb.WorkflowExecutionInfo{mockWorkflowExecutionInfo}}, nil) + s.mockArchiverProvider.EXPECT().GetVisibilityArchiver(gomock.Any(), gomock.Any()).Return(s.mockVisibilityArchiver, nil) + s.mockSearchAttributesProvider.EXPECT().GetSearchAttributes("", false) + + s.mockHistoryClient.EXPECT().GetMutableState(gomock.Any(), gomock.Any()).Return(nil, &serviceerror.NotFound{}) + wh := s.getWorkflowHandler(s.newConfig()) + + request := &workflowservice.DescribeWorkflowExecutionRequest{ + Namespace: s.testNamespace.String(), + Execution: mockWorkflowExecution, + } + resp, err := wh.DescribeWorkflowExecution(context.Background(), request) + s.NoError(err) + s.NotNil(resp) + s.Equal(resp.WorkflowExecutionInfo, mockWorkflowExecutionInfo) +} + func (s *workflowHandlerSuite) TestListWorkflowExecutions() { config := s.newConfig() wh := s.getWorkflowHandler(config)