Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for display archived workflow on web ui from s3 archival store #5117

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
18 changes: 16 additions & 2 deletions common/archiver/s3store/query_parser.go
Expand Up @@ -49,6 +49,7 @@ type (
parsedQuery struct {
workflowTypeName *string
workflowID *string
runID *string
startTime *time.Time
closeTime *time.Time
searchPrecision *string
Expand All @@ -59,6 +60,7 @@ type (
const (
WorkflowTypeName = "WorkflowTypeName"
WorkflowID = "WorkflowId"
RunID = "RunId"
StartTime = "StartTime"
CloseTime = "CloseTime"
SearchPrecision = "SearchPrecision"
Expand Down Expand Up @@ -91,8 +93,8 @@ func (p *queryParser) Parse(query string) (*parsedQuery, error) {
if err := p.convertWhereExpr(whereExpr, parsedQuery); err != nil {
return nil, err
}
if parsedQuery.workflowID == nil && parsedQuery.workflowTypeName == nil {
return nil, errors.New("WorkflowId or WorkflowTypeName is required in query")
if parsedQuery.workflowID == nil && parsedQuery.workflowTypeName == nil && parsedQuery.runID == nil {
return nil, errors.New("WorkflowId or WorkflowTypeName or RunId is required in query")
}
if parsedQuery.workflowID != nil && parsedQuery.workflowTypeName != nil {
return nil, errors.New("only one of WorkflowId or WorkflowTypeName can be specified in a query")
Expand Down Expand Up @@ -176,6 +178,18 @@ func (p *queryParser) convertComparisonExpr(compExpr *sqlparser.ComparisonExpr,
return fmt.Errorf("can not query %s multiple times", WorkflowID)
}
parsedQuery.workflowID = convert.StringPtr(val)
case RunID:
val, err := extractStringValue(valStr)
if err != nil {
return err
}
if op != "=" {
return fmt.Errorf("only operation = is support for %s", RunID)
}
if parsedQuery.runID != nil {
return fmt.Errorf("can not query %s multiple times", RunID)
}
parsedQuery.runID = convert.StringPtr(val)
case CloseTime:
timestamp, err := convertToTime(valStr)
if err != nil {
Expand Down
6 changes: 5 additions & 1 deletion common/archiver/s3store/query_parser_test.go
Expand Up @@ -81,7 +81,10 @@ func (s *queryParserSuite) TestParseWorkflowIDAndWorkflowTypeName() {
},
{
query: "RunId = \"random runID\"",
expectErr: true,
expectErr: false,
parsedQuery: &parsedQuery{
runID: convert.StringPtr("random runID"),
},
},
{
query: "WorkflowId = 'random workflowID'",
Expand Down Expand Up @@ -128,6 +131,7 @@ func (s *queryParserSuite) TestParseWorkflowIDAndWorkflowTypeName() {
s.NoError(err)
s.Equal(tc.parsedQuery.workflowID, parsedQuery.workflowID)
s.Equal(tc.parsedQuery.workflowTypeName, parsedQuery.workflowTypeName)
s.Equal(tc.parsedQuery.runID, parsedQuery.runID)

}
}
Expand Down
7 changes: 7 additions & 0 deletions common/archiver/s3store/visibility_archiver.go
Expand Up @@ -74,6 +74,7 @@ const (
secondaryIndexKeyCloseTimeout = "closeTimeout"
primaryIndexKeyWorkflowTypeName = "workflowTypeName"
primaryIndexKeyWorkflowID = "workflowID"
primaryIndexKeyRunID = "runID"
)

// NewVisibilityArchiver creates a new archiver.VisibilityArchiver based on s3
Expand Down Expand Up @@ -165,6 +166,8 @@ func createIndexesToArchive(request *archiverspb.VisibilityRecord) []indexToArch
{primaryIndexKeyWorkflowTypeName, request.WorkflowTypeName, secondaryIndexKeyStartTimeout, timestamp.TimeValue(request.StartTime)},
{primaryIndexKeyWorkflowID, request.GetWorkflowId(), secondaryIndexKeyCloseTimeout, timestamp.TimeValue(request.CloseTime)},
{primaryIndexKeyWorkflowID, request.GetWorkflowId(), secondaryIndexKeyStartTimeout, timestamp.TimeValue(request.StartTime)},
{primaryIndexKeyRunID, request.GetRunId(), secondaryIndexKeyCloseTimeout, timestamp.TimeValue(request.CloseTime)},
{primaryIndexKeyRunID, request.GetRunId(), secondaryIndexKeyStartTimeout, timestamp.TimeValue(request.StartTime)},
}
}

Expand Down Expand Up @@ -278,6 +281,10 @@ func (v *visibilityArchiver) query(
primaryIndex = primaryIndexKeyWorkflowID
primaryIndexValue = request.parsedQuery.workflowID
}
if request.parsedQuery.runID != nil {
primaryIndex = primaryIndexKeyRunID
primaryIndexValue = request.parsedQuery.runID
}

prefix := constructIndexedVisibilitySearchPrefix(
URI.Path(),
Expand Down
1 change: 1 addition & 0 deletions service/frontend/errors.go
Expand Up @@ -112,6 +112,7 @@ var (
errUnableToGetNamespaceInfoMessage = "Unable to get namespace info with error: %v"
errUnableToCreateFrontendClientMessage = "Unable to create frontend client with error: %v."
errTooManySearchAttributesMessage = "Unable to create search attributes: cannot have more than %d search attribute of type %s."
errUnableToListArchivedWorkflowExecutionMessage = "Unable To list archived workflow execution: WorkflowID %s RunID %s"

errListNotAllowed = serviceerror.NewPermissionDenied("List is disabled on this namespace.", "")
errSchedulesNotAllowed = serviceerror.NewPermissionDenied("Schedules are disabled on this namespace.", "")
Expand Down
111 changes: 108 additions & 3 deletions service/frontend/workflow_handler.go
Expand Up @@ -491,6 +491,12 @@ func (wh *WorkflowHandler) GetWorkflowExecutionHistoryReverse(ctx context.Contex
request.MaximumPageSize = common.GetHistoryMaxPageSize
}

enableArchivalRead := wh.archivalMetadata.GetHistoryConfig().ReadEnabled()
workflowExecutionArchived := wh.workflowExecutionArchived(ctx, request.Execution, namespaceID)
if enableArchivalRead && workflowExecutionArchived {
return wh.getArchivedHistoryReverse(ctx, request, namespaceID)
}

if dynamicconfig.AccessHistory(wh.config.AccessHistoryFraction, wh.metricsScope(ctx).WithTags(metrics.OperationTag(metrics.FrontendGetWorkflowExecutionHistoryReverseTag))) {
response, err := wh.historyClient.GetWorkflowExecutionHistoryReverse(ctx,
&historyservice.GetWorkflowExecutionHistoryReverseRequest{
Expand Down Expand Up @@ -2356,6 +2362,12 @@ func (wh *WorkflowHandler) DescribeWorkflowExecution(ctx context.Context, reques
return nil, err
}

enableArchivalRead := wh.archivalMetadata.GetVisibilityConfig().ReadEnabled()
workflowArchived := wh.workflowExecutionArchived(ctx, request.Execution, namespaceID)
if enableArchivalRead && workflowArchived {
return wh.describeArchivedWorkflowExecution(ctx, request)
}

response, err := wh.historyClient.DescribeWorkflowExecution(ctx, &historyservice.DescribeWorkflowExecutionRequest{
NamespaceId: namespaceID.String(),
Request: request,
Expand Down Expand Up @@ -3830,26 +3842,66 @@ func (wh *WorkflowHandler) validateBuildIdCompatibilityUpdate(
}

func (wh *WorkflowHandler) historyArchived(ctx context.Context, request *workflowservice.GetWorkflowExecutionHistoryRequest, namespaceID namespace.ID) bool {
if request.GetExecution() == nil || request.GetExecution().GetRunId() == "" {
return wh.workflowExecutionArchived(ctx, request.Execution, namespaceID)
}

func (wh *WorkflowHandler) workflowExecutionArchived(ctx context.Context, execution *commonpb.WorkflowExecution, namespaceID namespace.ID) bool {
if execution == nil || execution.GetRunId() == "" {
return false
}
getMutableStateRequest := &historyservice.GetMutableStateRequest{
NamespaceId: namespaceID.String(),
Execution: request.Execution,
Execution: execution,
}
_, err := wh.historyClient.GetMutableState(ctx, getMutableStateRequest)
if err == nil {
return false
}
switch err.(type) {
case *serviceerror.NotFound:
// the only case in which history is assumed to be archived is if getting mutable state returns entity not found error
// the only case in which workflow or workflow's event history is assumed to be archived is if getting mutable state returns entity not found error
return true
}

return false
}

func (wh *WorkflowHandler) describeArchivedWorkflowExecution(ctx context.Context, request *workflowservice.DescribeWorkflowExecutionRequest) (*workflowservice.DescribeWorkflowExecutionResponse, error) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think archived workflow can be described based on visibility records. Fields like pending activities or pending children can't be populated by vis records.

If we really want to do it, the right way should be rebuild workflow mutable state from history events, then describe the rebuilt mutable states so that all fields in the describe response can be populated.

Alternatively, a separate API can be created for describing archived workflow and defined a new response that returns only limited information.

cc @yiminc

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think archived workflow can be described based on visibility records. Fields like pending activities or pending children can't be populated by vis records.

It seems a little weird if a archived workflow still has these pending things. Should we consider a archived wf is totally completed(without any changeable things)?

rebuild workflow mutable state from history events, then describe the rebuilt mutable states so that all fields in the describe response can be populated.

I'm not familiar with the internals of mutable state.. According to the previous experience issue, an archived workflow which has past the retention is considered deleted from mutable states. If we want to describe it based on mutable state, there maybe a lot of change to do.

A separate API is good. We need more discuss about the limited information of an archived wf and how to get it..

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

workflow can still have pending things even after closed, for example when workflow is force terminated before activity finishes. Archived or not is a concept of where the data is stored, so they are orthogonal.

there maybe a lot of change to do.
Yes it's a lot of work and change to rehydrate archived workflow back into the system. But, to me, that seems the right thing to do, and not causing more confusion to the caller of DescribeWorkflowExecution.

r := &workflowservice.ListArchivedWorkflowExecutionsRequest{
Namespace: request.Namespace,
PageSize: 1,
Query: fmt.Sprintf(
"WorkflowId = '%s' and RunId = '%s'",
request.Execution.GetWorkflowId(),
request.Execution.GetRunId(),
),
}
archivedWorkflowExecutionsResponse, err := wh.ListArchivedWorkflowExecutions(ctx, r)
if err != nil {
return nil, serviceerror.NewNotFound(err.Error())
}
if len(archivedWorkflowExecutionsResponse.Executions) == 0 {
return nil, serviceerror.NewNotFound(fmt.Sprintf(errUnableToListArchivedWorkflowExecutionMessage, request.Execution.GetWorkflowId(), request.Execution.GetRunId()))
}
//get execution info
executionInfo := archivedWorkflowExecutionsResponse.Executions[0]
if executionInfo.TaskQueue == "" {
//todo: support display task queue
executionInfo.TaskQueue = "Null"
}
result := &workflowservice.DescribeWorkflowExecutionResponse{
ExecutionConfig: &workflowpb.WorkflowExecutionConfig{
TaskQueue: &taskqueuepb.TaskQueue{
Name: executionInfo.TaskQueue,
Kind: enumspb.TASK_QUEUE_KIND_NORMAL,
},
},
WorkflowExecutionInfo: executionInfo,
}

return result, nil
}

func (wh *WorkflowHandler) getArchivedHistory(
ctx context.Context,
request *workflowservice.GetWorkflowExecutionHistoryRequest,
Expand Down Expand Up @@ -3900,6 +3952,59 @@ func (wh *WorkflowHandler) getArchivedHistory(
}, nil
}

func (wh *WorkflowHandler) getArchivedHistoryReverse(
ctx context.Context,
request *workflowservice.GetWorkflowExecutionHistoryReverseRequest,
namespaceID namespace.ID,
) (*workflowservice.GetWorkflowExecutionHistoryReverseResponse, error) {
entry, err := wh.namespaceRegistry.GetNamespaceByID(namespaceID)
if err != nil {
return nil, err
}

URIString := entry.HistoryArchivalState().URI
if URIString == "" {
// if URI is empty, it means the namespace has never enabled for archival.
// the error is not "workflow has passed retention period", because
// we have no way to tell if the requested workflow exists or not.
return nil, errHistoryNotFound
}

URI, err := archiver.NewURI(URIString)
if err != nil {
return nil, err
}

historyArchiver, err := wh.archiverProvider.GetHistoryArchiver(URI.Scheme(), string(primitives.FrontendService))
if err != nil {
return nil, err
}

resp, err := historyArchiver.Get(ctx, URI, &archiver.GetHistoryRequest{
NamespaceID: namespaceID.String(),
WorkflowID: request.GetExecution().GetWorkflowId(),
RunID: request.GetExecution().GetRunId(),
NextPageToken: request.GetNextPageToken(),
PageSize: int(request.GetMaximumPageSize()),
})
if err != nil {
return nil, err
}

history := &historypb.History{}
for _, batch := range resp.HistoryBatches {
history.Events = append(history.Events, batch.Events...)
}
// reverse the events
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am a bit confused here. Looks like this is only reversing events within a page. When there are multiple pages, the returned history will look like something like:
event 100, 99, 98, ..., 1, event 200, 199, 198, ..., 101.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep.There will be a mistake. Mark it on my todo list.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure how you plan to resolve this though. It seems like a n^2 alg. if the underlying storage doesn't support reading backward.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure whether if the underlying storage support it. I'll check it out. If not, maybe there's no smarter way.
I think I'll focus on this and learning the internals of mutable state for now. Since the other changes require more design discussion。 :)

for i, j := 0, len(history.Events)-1; i < j; i, j = i+1, j-1 {
history.Events[i], history.Events[j] = history.Events[j], history.Events[i]
}
return &workflowservice.GetWorkflowExecutionHistoryReverseResponse{
History: history,
NextPageToken: resp.NextPageToken,
}, nil
}

// cancelOutstandingPoll cancel outstanding poll if context was canceled and returns true. Otherwise returns false.
func (wh *WorkflowHandler) cancelOutstandingPoll(ctx context.Context, namespaceID namespace.ID, taskQueueType enumspb.TaskQueueType,
taskQueue *taskqueuepb.TaskQueue, pollerID string) bool {
Expand Down
47 changes: 47 additions & 0 deletions service/frontend/workflow_handler_test.go
Expand Up @@ -81,6 +81,7 @@ const (

testWorkflowID = "test-workflow-id"
testRunID = "test-run-id"
testWorkflowTypeName = "test-workflow-type"
testHistoryArchivalURI = "testScheme://history/URI"
testVisibilityArchivalURI = "testScheme://visibility/URI"
)
Expand Down Expand Up @@ -1484,6 +1485,52 @@ func (s *workflowHandlerSuite) TestDescribeWorkflowExecution_CompletedStatus() {
s.NoError(err)
}

func (s *workflowHandlerSuite) TestDescribeWorkflowExecution_ArchivedStatus() {
s.mockNamespaceCache.EXPECT().GetNamespace(gomock.Any()).Return(namespace.NewLocalNamespaceForTest(
&persistencespb.NamespaceInfo{Name: "test-namespace"},
&persistencespb.NamespaceConfig{
VisibilityArchivalState: enumspb.ARCHIVAL_STATE_ENABLED,
VisibilityArchivalUri: testVisibilityArchivalURI,
},
"",
), nil).AnyTimes()
s.mockNamespaceCache.EXPECT().GetNamespaceID(gomock.Any()).Return(
s.testNamespaceID,
nil,
).AnyTimes()
now := timestamp.TimePtr(time.Now())
mockWorkflowExecution := &commonpb.WorkflowExecution{
WorkflowId: uuid.New(),
RunId: uuid.New(),
}
mockWorkflowExecutionInfo := &workflowpb.WorkflowExecutionInfo{
Execution: mockWorkflowExecution,
Type: &commonpb.WorkflowType{Name: testWorkflowTypeName},
StartTime: now,
CloseTime: now,
Status: enumspb.WORKFLOW_EXECUTION_STATUS_COMPLETED,
ExecutionTime: now,
Memo: nil,
SearchAttributes: nil,
}
s.mockArchivalMetadata.EXPECT().GetVisibilityConfig().Return(archiver.NewArchivalConfig("enabled", dc.GetStringPropertyFn("enabled"), dc.GetBoolPropertyFn(true), "disabled", "random URI")).AnyTimes()
s.mockVisibilityArchiver.EXPECT().Query(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(&archiver.QueryVisibilityResponse{Executions: []*workflowpb.WorkflowExecutionInfo{mockWorkflowExecutionInfo}}, nil)
s.mockArchiverProvider.EXPECT().GetVisibilityArchiver(gomock.Any(), gomock.Any()).Return(s.mockVisibilityArchiver, nil)
s.mockSearchAttributesProvider.EXPECT().GetSearchAttributes("", false)

s.mockHistoryClient.EXPECT().GetMutableState(gomock.Any(), gomock.Any()).Return(nil, &serviceerror.NotFound{})
wh := s.getWorkflowHandler(s.newConfig())

request := &workflowservice.DescribeWorkflowExecutionRequest{
Namespace: s.testNamespace.String(),
Execution: mockWorkflowExecution,
}
resp, err := wh.DescribeWorkflowExecution(context.Background(), request)
s.NoError(err)
s.NotNil(resp)
s.Equal(resp.WorkflowExecutionInfo, mockWorkflowExecutionInfo)
}

func (s *workflowHandlerSuite) TestListWorkflowExecutions() {
config := s.newConfig()
wh := s.getWorkflowHandler(config)
Expand Down