Skip to content

Commit

Permalink
Allow search attributes when Elasticsearch is not configured (#1556)
Browse files Browse the repository at this point in the history
  • Loading branch information
alexshtin committed May 18, 2021
1 parent 37583eb commit 71f073b
Show file tree
Hide file tree
Showing 17 changed files with 204 additions and 404 deletions.
5 changes: 3 additions & 2 deletions common/config/elasticsearch.go
Expand Up @@ -73,9 +73,10 @@ type (
}
)

// GetVisibilityIndex return visibility index name
// GetVisibilityIndex return visibility index name from Elasticsearch config or empty string if it is not defined.
func (cfg *Elasticsearch) GetVisibilityIndex() string {
if cfg == nil {
// Empty string is be used as default index name when Elasticsearch is not configured.
return ""
}
return cfg.Indices[VisibilityAppName]
Expand All @@ -86,7 +87,7 @@ func (cfg *Elasticsearch) validate(storeName string) error {
return fmt.Errorf("persistence config: advanced visibility datastore %q: missing indices", storeName)

}
if cfg.GetVisibilityIndex() == "" {
if cfg.Indices[VisibilityAppName] == "" {
return fmt.Errorf("persistence config: advanced visibility datastore %q: missing %q key", storeName, VisibilityAppName)
}
return nil
Expand Down
26 changes: 5 additions & 21 deletions common/persistence/search_attributes_manager.go
Expand Up @@ -25,7 +25,6 @@
package persistence

import (
"errors"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -61,10 +60,6 @@ type (

var _ searchattribute.Manager = (*SearchAttributesManager)(nil)

var (
ErrEmptyIndexName = errors.New("indexName is empty")
)

func NewSearchAttributesManager(
timeSource clock.TimeSource,
clusterMetadataManager ClusterMetadataManager,
Expand All @@ -85,16 +80,12 @@ func NewSearchAttributesManager(
}

// GetSearchAttributes returns all search attributes (including system and build-in) for specified index.
// indexName can be an empty string when Elasticsearch is not configured.
func (m *SearchAttributesManager) GetSearchAttributes(
indexName string,
forceRefreshCache bool,
) (searchattribute.NameTypeMap, error) {

// Empty indexName means advanced visibility is not enabled.
if indexName == "" {
return searchattribute.NameTypeMap{}, nil
}

now := m.timeSource.Now()
saCache := m.cache.Load().(searchAttributesCache)

Expand Down Expand Up @@ -150,16 +141,12 @@ func (m *SearchAttributesManager) refreshCache(saCache searchAttributesCache, no
}

// SaveSearchAttributes saves search attributes to cluster metadata.
// indexName can be an empty string when Elasticsearch is not configured.
func (m *SearchAttributesManager) SaveSearchAttributes(
indexName string,
newCustomSearchAttributes map[string]enumspb.IndexedValueType,
) error {

// Empty indexName means advanced visibility is not enabled.
if indexName == "" {
return ErrEmptyIndexName
}

clusterMetadataResponse, err := m.clusterMetadataManager.GetClusterMetadata()
if err != nil {
return err
Expand All @@ -174,11 +161,8 @@ func (m *SearchAttributesManager) SaveSearchAttributes(
ClusterMetadata: clusterMetadata,
Version: clusterMetadataResponse.Version,
})
if err != nil {
return err
}

// Flush local cache.
// Flush local cache, even if there was an error, which is most likely version mismatch (=stale cache).
m.cache.Store(searchAttributesCache{})
return nil

return err
}
42 changes: 39 additions & 3 deletions common/persistence/search_attributes_manager_test.go
Expand Up @@ -162,9 +162,20 @@ func (s *searchAttributesManagerSuite) TestGetSearchAttributesCache_NotFoundErro
}

func (s *searchAttributesManagerSuite) TestGetSearchAttributesCache_EmptyIndex() {
s.mockClusterMetadataManager.EXPECT().GetClusterMetadata().Return(&GetClusterMetadataResponse{
ClusterMetadata: persistencespb.ClusterMetadata{
IndexSearchAttributes: map[string]*persistencespb.IndexSearchAttributes{
"": {
CustomSearchAttributes: map[string]enumspb.IndexedValueType{
"OrderId": enumspb.INDEXED_VALUE_TYPE_KEYWORD,
}}},
},
Version: 1,
}, nil)

searchAttributes, err := s.manager.GetSearchAttributes("", false)
s.NoError(err)
s.Len(searchAttributes.Custom(), 0)
s.Len(searchAttributes.Custom(), 1)
}

func (s *searchAttributesManagerSuite) TestSaveSearchAttributes_UpdateIndex() {
Expand Down Expand Up @@ -229,9 +240,34 @@ func (s *searchAttributesManagerSuite) TestSaveSearchAttributes_NewIndex() {
}

func (s *searchAttributesManagerSuite) TestSaveSearchAttributesCache_EmptyIndex() {
s.mockClusterMetadataManager.EXPECT().GetClusterMetadata().Return(&GetClusterMetadataResponse{
ClusterMetadata: persistencespb.ClusterMetadata{
IndexSearchAttributes: map[string]*persistencespb.IndexSearchAttributes{
"index-name-2": {
CustomSearchAttributes: map[string]enumspb.IndexedValueType{
"OrderId2": enumspb.INDEXED_VALUE_TYPE_KEYWORD,
}}},
},
Version: 1,
}, nil)

s.mockClusterMetadataManager.EXPECT().SaveClusterMetadata(&SaveClusterMetadataRequest{
ClusterMetadata: persistencespb.ClusterMetadata{
IndexSearchAttributes: map[string]*persistencespb.IndexSearchAttributes{
"index-name-2": {
CustomSearchAttributes: map[string]enumspb.IndexedValueType{
"OrderId2": enumspb.INDEXED_VALUE_TYPE_KEYWORD,
}},
"": {
CustomSearchAttributes: map[string]enumspb.IndexedValueType{
"OrderId": enumspb.INDEXED_VALUE_TYPE_KEYWORD,
}}},
},
Version: 1,
}).Return(false, nil)

err := s.manager.SaveSearchAttributes("", map[string]enumspb.IndexedValueType{
"OrderId": enumspb.INDEXED_VALUE_TYPE_KEYWORD,
})
s.Error(err)
s.ErrorIs(err, ErrEmptyIndexName)
s.NoError(err)
}
23 changes: 23 additions & 0 deletions config/dynamicconfig/development_es.yaml
Expand Up @@ -34,3 +34,26 @@ system.advancedVisibilityWritingMode:
system.enableReadVisibilityFromES:
- value: true
constraints: {}
frontend.validSearchAttributes:
- value:
NamespaceId: "Keyword"
WorkflowId: "Keyword"
RunId: "Keyword"
WorkflowType: "Keyword"
StartTime: "Int"
ExecutionTime: "Int"
CloseTime: "Int"
ExecutionStatus: "Int"
TaskQueue: "Keyword"
CustomStringField: "String"
CustomKeywordField: "Keyword"
CustomIntField: "Int"
CustomDoubleField: "Double"
CustomBoolField: "Bool"
CustomDatetimeField: "Datetime"
TemporalChangeVersion: "Keyword"
BinaryChecksums: "Keyword"
CustomNamespace: "Keyword"
Operator: "Keyword"


9 changes: 9 additions & 0 deletions host/continueasnew_test.go
Expand Up @@ -64,6 +64,12 @@ func (s *integrationSuite) TestContinueAsNewWorkflow() {
memo := &commonpb.Memo{
Fields: map[string]*commonpb.Payload{"memoKey": payload.EncodeString("memoVal")},
}
searchAttrPayload := payload.EncodeString("random keyword")
searchAttr := &commonpb.SearchAttributes{
IndexedFields: map[string]*commonpb.Payload{
"CustomKeywordField": searchAttrPayload,
},
}

request := &workflowservice.StartWorkflowExecutionRequest{
RequestId: uuid.New(),
Expand All @@ -74,6 +80,7 @@ func (s *integrationSuite) TestContinueAsNewWorkflow() {
Input: nil,
Header: header,
Memo: memo,
SearchAttributes: searchAttr,
WorkflowRunTimeout: timestamp.DurationPtr(100 * time.Second),
WorkflowTaskTimeout: timestamp.DurationPtr(10 * time.Second),
Identity: identity,
Expand Down Expand Up @@ -105,6 +112,7 @@ func (s *integrationSuite) TestContinueAsNewWorkflow() {
Input: payloads.EncodeBytes(buf.Bytes()),
Header: header,
Memo: memo,
SearchAttributes: searchAttr,
WorkflowRunTimeout: timestamp.DurationPtr(100 * time.Second),
WorkflowTaskTimeout: timestamp.DurationPtr(10 * time.Second),
}},
Expand Down Expand Up @@ -144,6 +152,7 @@ func (s *integrationSuite) TestContinueAsNewWorkflow() {
s.Equal(previousRunID, lastRunStartedEvent.GetWorkflowExecutionStartedEventAttributes().GetContinuedExecutionRunId())
s.Equal(header, lastRunStartedEvent.GetWorkflowExecutionStartedEventAttributes().Header)
s.Equal(memo, lastRunStartedEvent.GetWorkflowExecutionStartedEventAttributes().Memo)
s.Equal(searchAttr, lastRunStartedEvent.GetWorkflowExecutionStartedEventAttributes().SearchAttributes)
}

func (s *integrationSuite) TestContinueAsNewRun_Timeout() {
Expand Down

0 comments on commit 71f073b

Please sign in to comment.