Skip to content

Commit

Permalink
Support visibility dual write to different ES indices (#2359)
Browse files Browse the repository at this point in the history
* Change order by clauses in query and update V1 index template. Index created by the old template will continue to function, but won't see the query performance improvement.
* Support visibility dual write to different ES indices.
* Add additional dynamic configs to control visibility dual write behavior.
  • Loading branch information
meiliang86 authored and yiminc committed Jan 12, 2022
1 parent 1a3cf49 commit 319861d
Show file tree
Hide file tree
Showing 17 changed files with 280 additions and 128 deletions.
20 changes: 13 additions & 7 deletions common/dynamicconfig/constants.go
Expand Up @@ -65,12 +65,14 @@ var Keys = map[Key]string{
AdminMatchingNamespaceTaskqueueToPartitionDispatchRate: "admin.matchingNamespaceTaskqueueToPartitionDispatchRate",

// system settings
StandardVisibilityPersistenceMaxReadQPS: "system.standardVisibilityPersistenceMaxReadQPS",
StandardVisibilityPersistenceMaxWriteQPS: "system.standardVisibilityPersistenceMaxWriteQPS",
AdvancedVisibilityPersistenceMaxReadQPS: "system.advancedVisibilityPersistenceMaxReadQPS",
AdvancedVisibilityPersistenceMaxWriteQPS: "system.advancedVisibilityPersistenceMaxWriteQPS",
AdvancedVisibilityWritingMode: "system.advancedVisibilityWritingMode",
EnableReadVisibilityFromES: "system.enableReadVisibilityFromES",
StandardVisibilityPersistenceMaxReadQPS: "system.standardVisibilityPersistenceMaxReadQPS",
StandardVisibilityPersistenceMaxWriteQPS: "system.standardVisibilityPersistenceMaxWriteQPS",
AdvancedVisibilityPersistenceMaxReadQPS: "system.advancedVisibilityPersistenceMaxReadQPS",
AdvancedVisibilityPersistenceMaxWriteQPS: "system.advancedVisibilityPersistenceMaxWriteQPS",
AdvancedVisibilityWritingMode: "system.advancedVisibilityWritingMode",
EnableWriteToSecondaryAdvancedVisibility: "system.enableWriteToSecondaryAdvancedVisibility",
EnableReadVisibilityFromES: "system.enableReadVisibilityFromES",
EnableReadFromSecondaryAdvancedVisibility: "system.enableReadFromSecondaryAdvancedVisibility",

HistoryArchivalState: "system.historyArchivalState",
EnableReadFromHistoryArchival: "system.enableReadFromHistoryArchival",
Expand Down Expand Up @@ -383,8 +385,12 @@ const (
AdvancedVisibilityPersistenceMaxWriteQPS
// AdvancedVisibilityWritingMode is key for how to write to advanced visibility
AdvancedVisibilityWritingMode
// EnableReadVisibilityFromES is key for enable read from elastic search
// EnableWriteToSecondaryAdvancedVisibility is the config to enable write to secondary visibility for Elasticsearch
EnableWriteToSecondaryAdvancedVisibility
// EnableReadVisibilityFromES is key for enable read from Elasticsearch
EnableReadVisibilityFromES
// EnableReadFromSecondaryAdvancedVisibility is the config to enable read from secondary Elasticsearch
EnableReadFromSecondaryAdvancedVisibility
// DisableListVisibilityByFilter is config to disable list open/close workflow using filter
DisableListVisibilityByFilter

Expand Down
54 changes: 50 additions & 4 deletions common/persistence/visibility/factory.go
Expand Up @@ -45,6 +45,7 @@ func NewManager(
persistenceResolver resolver.ServiceResolver,

defaultIndexName string,
secondaryVisibilityIndexName string,
esClient esclient.Client,
esProcessorConfig *elasticsearch.ProcessorConfig,
searchAttributesProvider searchattribute.Provider,
Expand All @@ -56,6 +57,8 @@ func NewManager(
advancedVisibilityPersistenceMaxWriteQPS dynamicconfig.IntPropertyFn,
enableAdvancedVisibilityRead dynamicconfig.BoolPropertyFnWithNamespaceFilter,
advancedVisibilityWritingMode dynamicconfig.StringPropertyFn,
enableReadFromSecondaryAdvancedVisibility dynamicconfig.BoolPropertyFnWithNamespaceFilter,
enableWriteToSecondaryAdvancedVisibility dynamicconfig.BoolPropertyFn,

metricsClient metrics.Client,
logger log.Logger,
Expand Down Expand Up @@ -87,25 +90,64 @@ func NewManager(
return nil, err
}

secondaryVisibilityManager, err := NewAdvancedManager(
secondaryVisibilityIndexName,
esClient,
esProcessorConfig,
searchAttributesProvider,
searchAttributesMapper,
advancedVisibilityPersistenceMaxReadQPS,
advancedVisibilityPersistenceMaxWriteQPS,
metricsClient,
logger,
)
if err != nil {
return nil, err
}

if stdVisibilityManager == nil && advVisibilityManager == nil {
logger.Fatal("invalid config: one of standard or advanced visibility must be configured")
return nil, nil
}

if stdVisibilityManager != nil && secondaryVisibilityManager != nil {
logger.Fatal("invalid config: secondary visibility store cannot be used with standard visibility")
return nil, nil
}

if stdVisibilityManager != nil && advVisibilityManager == nil {
return stdVisibilityManager, nil
}

if stdVisibilityManager == nil && advVisibilityManager != nil {
return advVisibilityManager, nil
if secondaryVisibilityManager == nil {
return advVisibilityManager, nil
}

// Dual write to primary and secondary ES indices.
managerSelector := NewESManagerSelector(
advVisibilityManager,
secondaryVisibilityManager,
enableReadFromSecondaryAdvancedVisibility,
enableWriteToSecondaryAdvancedVisibility)

return NewVisibilityManagerDual(
advVisibilityManager,
secondaryVisibilityManager,
managerSelector,
), nil
}

// If both visibilities are configured use dual write.
return NewVisibilityManagerDual(
// Dual write to standard and advanced visibility.
managerSelector := NewSQLToESManagerSelector(
stdVisibilityManager,
advVisibilityManager,
enableAdvancedVisibilityRead,
advancedVisibilityWritingMode,
advancedVisibilityWritingMode)
return NewVisibilityManagerDual(
stdVisibilityManager,
advVisibilityManager,
managerSelector,
), nil
}

Expand Down Expand Up @@ -150,6 +192,10 @@ func NewAdvancedManager(
metricsClient metrics.Client,
logger log.Logger,
) (manager.VisibilityManager, error) {
if defaultIndexName == "" {
return nil, nil
}

advVisibilityStore := newAdvancedVisibilityStore(
defaultIndexName,
esClient,
Expand Down
122 changes: 122 additions & 0 deletions common/persistence/visibility/manager_selector.go
@@ -0,0 +1,122 @@
// The MIT License
//
// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved.
//
// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package visibility

import (
"fmt"

"go.temporal.io/api/serviceerror"
"go.temporal.io/server/common/dynamicconfig"
"go.temporal.io/server/common/namespace"
"go.temporal.io/server/common/persistence/visibility/manager"
)

type (
managerSelector interface {
readManager(namespace namespace.Name) manager.VisibilityManager
writeManagers() ([]manager.VisibilityManager, error)
}

sqlToESManagerSelector struct {
enableAdvancedVisibilityRead dynamicconfig.BoolPropertyFnWithNamespaceFilter
advancedVisibilityWritingMode dynamicconfig.StringPropertyFn
stdVisibilityManager manager.VisibilityManager
advVisibilityManager manager.VisibilityManager
}

esManagerSelector struct {
enableReadFromSecondaryVisibility dynamicconfig.BoolPropertyFnWithNamespaceFilter
enableWriteToSecondaryVisibility dynamicconfig.BoolPropertyFn
visibilityManager manager.VisibilityManager
secondaryVisibilityManager manager.VisibilityManager
}
)

var _ managerSelector = (*sqlToESManagerSelector)(nil)
var _ managerSelector = (*esManagerSelector)(nil)

func NewSQLToESManagerSelector(
stdVisibilityManager manager.VisibilityManager,
advVisibilityManager manager.VisibilityManager,
enableAdvancedVisibilityRead dynamicconfig.BoolPropertyFnWithNamespaceFilter,
advancedVisibilityWritingMode dynamicconfig.StringPropertyFn,
) *sqlToESManagerSelector {
return &sqlToESManagerSelector{
stdVisibilityManager: stdVisibilityManager,
advVisibilityManager: advVisibilityManager,
enableAdvancedVisibilityRead: enableAdvancedVisibilityRead,
advancedVisibilityWritingMode: advancedVisibilityWritingMode,
}
}

func NewESManagerSelector(
visibilityManager manager.VisibilityManager,
secondaryVisibilityManager manager.VisibilityManager,
enableReadFromSecondaryVisibility dynamicconfig.BoolPropertyFnWithNamespaceFilter,
enableWriteToSecondaryVisibility dynamicconfig.BoolPropertyFn,
) *esManagerSelector {
return &esManagerSelector{
visibilityManager: visibilityManager,
secondaryVisibilityManager: secondaryVisibilityManager,
enableReadFromSecondaryVisibility: enableReadFromSecondaryVisibility,
enableWriteToSecondaryVisibility: enableWriteToSecondaryVisibility,
}
}

func (v *sqlToESManagerSelector) writeManagers() ([]manager.VisibilityManager, error) {
switch v.advancedVisibilityWritingMode() {
case AdvancedVisibilityWritingModeOff:
return []manager.VisibilityManager{v.stdVisibilityManager}, nil
case AdvancedVisibilityWritingModeOn:
return []manager.VisibilityManager{v.advVisibilityManager}, nil
case AdvancedVisibilityWritingModeDual:
return []manager.VisibilityManager{v.stdVisibilityManager, v.advVisibilityManager}, nil
default:
return nil, serviceerror.NewInternal(fmt.Sprintf("Unknown advanced visibility writing mode: %s", v.advancedVisibilityWritingMode()))
}
}

func (v *sqlToESManagerSelector) readManager(namespace namespace.Name) manager.VisibilityManager {
if v.enableAdvancedVisibilityRead(namespace.String()) {
return v.advVisibilityManager
}
return v.stdVisibilityManager
}

func (v *esManagerSelector) writeManagers() ([]manager.VisibilityManager, error) {
managers := []manager.VisibilityManager{v.visibilityManager}
if v.enableWriteToSecondaryVisibility() {
managers = append(managers, v.secondaryVisibilityManager)
}

return managers, nil
}

func (v *esManagerSelector) readManager(namespace namespace.Name) manager.VisibilityManager {
if v.enableReadFromSecondaryVisibility(namespace.String()) {
return v.secondaryVisibilityManager
}
return v.visibilityManager
}
Expand Up @@ -32,7 +32,8 @@ import (

const (
// VisibilityAppName is used to find ES indexName for visibility
VisibilityAppName = "visibility"
VisibilityAppName = "visibility"
SecondaryVisibilityAppName = "secondary_visibility"
)

// Config for connecting to Elasticsearch
Expand Down Expand Up @@ -86,6 +87,13 @@ func (cfg *Config) GetVisibilityIndex() string {
return cfg.Indices[VisibilityAppName]
}

func (cfg *Config) GetSecondaryVisibilityIndex() string {
if cfg == nil {
return ""
}
return cfg.Indices[SecondaryVisibilityAppName]
}

func (cfg *Config) Validate(storeName string) error {
if cfg == nil {
return fmt.Errorf("persistence config: advanced visibility datastore %q: must provide config for \"elasticsearch\"", storeName)
Expand Down
Expand Up @@ -48,7 +48,7 @@ import (
)

type (
// Processor is interface for elastic search bulk processor
// Processor is interface for Elasticsearch bulk processor
Processor interface {
common.Daemon

Expand Down
Expand Up @@ -60,6 +60,14 @@ const (
readTimeout = 16 * time.Second
)

// Default sort by uses the sorting order defined in the index template, so no
// additional sorting is needed during query.
var defaultSorter = []elastic.Sorter{
elastic.NewFieldSort(searchattribute.CloseTime).Desc().Missing("_first"),
elastic.NewFieldSort(searchattribute.StartTime).Desc().Missing("_first"),
elastic.NewFieldSort(searchattribute.RunID).Desc().Missing("_first"),
}

type (
visibilityStore struct {
esClient client.Client
Expand Down Expand Up @@ -236,7 +244,7 @@ func (s *visibilityStore) addBulkRequestAndWait(bulkRequest *client.BulkableRequ
func (s *visibilityStore) checkProcessor() {
if s.processor == nil {
// must be bug, check history setup
panic("elastic search processor is nil")
panic("Elasticsearch processor is nil")
}
if s.processorAckTimeout == nil {
// must be bug, check history setup
Expand Down Expand Up @@ -592,21 +600,13 @@ func (s *visibilityStore) buildSearchParameters(
Index: s.index,
Query: boolQuery,
PageSize: request.PageSize,
Sorter: defaultSorter,
}

if token != nil && len(token.SearchAfter) > 0 {
params.SearchAfter = token.SearchAfter
}

if overStartTime {
params.Sorter = append(params.Sorter, elastic.NewFieldSort(searchattribute.StartTime).Desc())
} else {
params.Sorter = append(params.Sorter, elastic.NewFieldSort(searchattribute.CloseTime).Desc())
}

// RunID is explicit tiebreaker.
params.Sorter = append(params.Sorter, elastic.NewFieldSort(searchattribute.RunID).Desc())

return params, nil
}

Expand Down Expand Up @@ -658,11 +658,7 @@ func (s *visibilityStore) convertQuery(namespace namespace.Name, namespaceID nam

func (s *visibilityStore) setDefaultFieldSort(fieldSorts []*elastic.FieldSort) []elastic.Sorter {
if len(fieldSorts) == 0 {
// Set default sorting by StartTime desc and RunID as tiebreaker.
return []elastic.Sorter{
elastic.NewFieldSort(searchattribute.StartTime).Desc(),
elastic.NewFieldSort(searchattribute.RunID).Desc(),
}
return defaultSorter
}

res := make([]elastic.Sorter, len(fieldSorts)+1)
Expand Down

0 comments on commit 319861d

Please sign in to comment.