Skip to content

Commit

Permalink
Add a utility for reliably fetching search attributes and memo
Browse files Browse the repository at this point in the history
  • Loading branch information
MichaelSnowden committed Dec 8, 2022
1 parent eb88af4 commit c3bf450
Show file tree
Hide file tree
Showing 3 changed files with 267 additions and 17 deletions.
31 changes: 14 additions & 17 deletions service/history/api/describeworkflow/api.go
Expand Up @@ -29,18 +29,21 @@ import (

commonpb "go.temporal.io/api/common/v1"
enumspb "go.temporal.io/api/enums/v1"
"go.temporal.io/api/serviceerror"
taskqueuepb "go.temporal.io/api/taskqueue/v1"
workflowpb "go.temporal.io/api/workflow/v1"

enumsspb "go.temporal.io/server/api/enums/v1"
"go.temporal.io/server/api/historyservice/v1"
"go.temporal.io/server/common"
"go.temporal.io/server/common/definition"
"go.temporal.io/server/common/log/tag"
"go.temporal.io/server/common/namespace"
"go.temporal.io/server/common/persistence/visibility/manager"
"go.temporal.io/server/common/primitives/timestamp"
"go.temporal.io/server/service/history/api"
"go.temporal.io/server/service/history/shard"
"go.temporal.io/server/service/history/workflow"
)

func Invoke(
Expand Down Expand Up @@ -192,25 +195,19 @@ func Invoke(
}
}

if executionInfo.CloseVisibilityTaskCompleted {
// If close visibility task has completed, then search attributes and memo
// were removed from mutable state, and we need to fetch from visibility.
visResponse, err := persistenceVisibilityMgr.GetWorkflowExecution(
ctx,
&manager.GetWorkflowExecutionRequest{
NamespaceID: namespaceID,
Namespace: namespace.Name(req.Request.GetNamespace()),
RunID: executionState.RunId,
WorkflowID: executionInfo.WorkflowId,
CloseTime: executionInfo.CloseTime,
},
relocatableAttributes, err := workflow.NewRelocatableAttributesFetcher(persistenceVisibilityMgr).Fetch(ctx, mutableState)
if err != nil {
shard.GetLogger().Error(
"Failed to fetch relocatable attributes",
tag.WorkflowNamespaceID(namespaceID.String()),
tag.WorkflowID(executionInfo.WorkflowId),
tag.WorkflowRunID(executionState.RunId),
tag.Error(err),
)
if err != nil {
return nil, err
}
result.WorkflowExecutionInfo.SearchAttributes = visResponse.Execution.SearchAttributes
result.WorkflowExecutionInfo.Memo = visResponse.Execution.Memo
return nil, serviceerror.NewInternal("Failed to fetch memo and search attributes")
}
result.WorkflowExecutionInfo.Memo = relocatableAttributes.Memo
result.WorkflowExecutionInfo.SearchAttributes = relocatableAttributes.SearchAttributes

return result, nil
}
112 changes: 112 additions & 0 deletions service/history/workflow/relocatable_attributes_fetcher.go
@@ -0,0 +1,112 @@
// The MIT License
//
// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved.
//
// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package workflow

import (
"context"

commonpb "go.temporal.io/api/common/v1"

"go.temporal.io/server/common/persistence/visibility/manager"
)

// RelocatableAttributesFetcher is used to fetch the relocatable attributes from the mutable state.
// Relocatable attributes are attributes that can be moved from the mutable state to the persistence backend.
type RelocatableAttributesFetcher interface {
Fetch(
ctx context.Context,
mutableState MutableState,
) (*RelocatableAttributes, error)
}

// NewRelocatableAttributesFetcher creates a new instance of a RelocatableAttributesFetcher.
// The manager.VisibilityManager parameter is used to fetch the relocatable attributes from the persistence backend iff
// we already moved them there out from the mutable state.
// The visibility manager is not used if the relocatable attributes are still in the mutable state.
// We detect that the fields have moved by checking if the CloseExecutionVisibilityTask for this workflow execution is
// marked as complete in the mutable state.
// Because the relocatable fields that we push to persistence are never updated thereafter,
// we may cache them on a per-workflow execution basis.
// Currently, there is no cache, but you may provide a manager.VisibilityManager that supports caching to this function
// safely.
// TODO: Add a cache around the visibility manager for the relocatable attributes.
func NewRelocatableAttributesFetcher(
visibilityManager manager.VisibilityManager,
) RelocatableAttributesFetcher {
return &relocatableAttributesFetcher{
visibilityManager: visibilityManager,
}
}

// RelocatableAttributes contains workflow attributes that can be moved from the mutable state to the persistence
// backend.
type RelocatableAttributes struct {
Memo *commonpb.Memo
SearchAttributes *commonpb.SearchAttributes
}

// relocatableAttributesFetcher is the default implementation of RelocatableAttributesFetcher.
type relocatableAttributesFetcher struct {
visibilityManager manager.VisibilityManager
}

// Fetch fetches the relocatable attributes from the mutable state or the persistence backend.
// First, it checks if the close visibility task is completed. If it is completed, then the relocatable attributes
// are fetched from the persistence backend. Otherwise, the relocatable attributes are fetched from the mutable state.
func (f *relocatableAttributesFetcher) Fetch(
ctx context.Context,
mutableState MutableState,
) (*RelocatableAttributes, error) {
executionInfo := mutableState.GetExecutionInfo()
// If we haven't processed close visibility task yet, then we can fetch the search attributes and memo from the
// mutable state.
if !executionInfo.GetCloseVisibilityTaskCompleted() {
return &RelocatableAttributes{
Memo: &commonpb.Memo{Fields: executionInfo.Memo},
SearchAttributes: &commonpb.SearchAttributes{IndexedFields: executionInfo.SearchAttributes},
}, nil
}

// If we have processed close visibility task, then we need to fetch the search attributes and memo from the
// persistence backend because we have already deleted them from the mutable state.
executionState := mutableState.GetExecutionState()
visResponse, err := f.visibilityManager.GetWorkflowExecution(
ctx,
&manager.GetWorkflowExecutionRequest{
NamespaceID: mutableState.GetNamespaceEntry().ID(),
Namespace: mutableState.GetNamespaceEntry().Name(),
RunID: executionState.GetRunId(),
WorkflowID: executionInfo.GetWorkflowId(),
CloseTime: executionInfo.CloseTime,
},
)
if err != nil {
return nil, err
}
return &RelocatableAttributes{
Memo: visResponse.Execution.Memo,
SearchAttributes: visResponse.Execution.SearchAttributes,
}, nil
}
141 changes: 141 additions & 0 deletions service/history/workflow/relocatable_attributes_fetcher_test.go
@@ -0,0 +1,141 @@
// The MIT License
//
// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved.
//
// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package workflow

import (
"context"
"errors"
"testing"
"time"

"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.temporal.io/api/common/v1"
"go.temporal.io/api/workflow/v1"

"go.temporal.io/server/api/persistence/v1"
"go.temporal.io/server/common/persistence/visibility/manager"
"go.temporal.io/server/service/history/tests"
)

func TestRelocatableAttributesFetcher_Fetch(t *testing.T) {
mutableStateAttributes := &RelocatableAttributes{
Memo: &common.Memo{Fields: map[string]*common.Payload{
"memoLocation": {Data: []byte("mutableState")},
}},
SearchAttributes: &common.SearchAttributes{IndexedFields: map[string]*common.Payload{
"searchAttributesLocation": {Data: []byte("mutableState")},
}},
}
persistenceAttributes := &RelocatableAttributes{
Memo: &common.Memo{Fields: map[string]*common.Payload{
"memoLocation": {Data: []byte("persistence")},
}},
SearchAttributes: &common.SearchAttributes{IndexedFields: map[string]*common.Payload{
"searchAttributesLocation": {Data: []byte("persistence")},
}},
}
require.NotEqual(t, mutableStateAttributes.Memo, persistenceAttributes.Memo)
require.NotEqual(t, mutableStateAttributes.SearchAttributes, persistenceAttributes.SearchAttributes)
testErr := errors.New("test error")
for _, c := range []*struct {
Name string
CloseVisibilityTaskCompleted bool
GetWorkflowExecutionErr error

ExpectedInfo *RelocatableAttributes
ExpectedErr error
}{
{
Name: "CloseVisibilityTaskNotComplete",
CloseVisibilityTaskCompleted: false,

ExpectedInfo: mutableStateAttributes,
},
{
Name: "CloseVisibilityTaskCompleted",
CloseVisibilityTaskCompleted: true,

ExpectedInfo: persistenceAttributes,
},
{
Name: "GetWorkflowExecutionErr",
CloseVisibilityTaskCompleted: true,
GetWorkflowExecutionErr: testErr,

ExpectedErr: testErr,
},
} {
c := c
t.Run(c.Name, func(t *testing.T) {
t.Parallel()
closeTime := time.Unix(100, 0)
executionInfo := &persistence.WorkflowExecutionInfo{
Memo: mutableStateAttributes.Memo.Fields,
SearchAttributes: mutableStateAttributes.SearchAttributes.IndexedFields,
CloseVisibilityTaskCompleted: c.CloseVisibilityTaskCompleted,
CloseTime: &closeTime,
WorkflowId: tests.WorkflowID,
}
executionState := &persistence.WorkflowExecutionState{
RunId: tests.RunID,
}
namespaceEntry := tests.GlobalNamespaceEntry
ctrl := gomock.NewController(t)
visibilityManager := manager.NewMockVisibilityManager(ctrl)
mutableState := NewMockMutableState(ctrl)
mutableState.EXPECT().GetExecutionInfo().Return(executionInfo).AnyTimes()
mutableState.EXPECT().GetNamespaceEntry().Return(namespaceEntry).AnyTimes()
mutableState.EXPECT().GetExecutionState().Return(executionState).AnyTimes()
if c.CloseVisibilityTaskCompleted {
visibilityManager.EXPECT().GetWorkflowExecution(gomock.Any(), &manager.GetWorkflowExecutionRequest{
NamespaceID: namespaceEntry.ID(),
Namespace: namespaceEntry.Name(),
RunID: tests.RunID,
WorkflowID: tests.WorkflowID,
CloseTime: &closeTime,
}).Return(&manager.GetWorkflowExecutionResponse{
Execution: &workflow.WorkflowExecutionInfo{
Memo: persistenceAttributes.Memo,
SearchAttributes: persistenceAttributes.SearchAttributes,
},
}, c.GetWorkflowExecutionErr)
}
ctx := context.Background()

fetcher := NewRelocatableAttributesFetcher(visibilityManager)
info, err := fetcher.Fetch(ctx, mutableState)

if c.ExpectedErr != nil {
require.Error(t, err)
assert.ErrorIs(t, err, c.ExpectedErr)
} else {
require.NoError(t, err)
assert.Equal(t, c.ExpectedInfo, info)
}
})
}
}

0 comments on commit c3bf450

Please sign in to comment.