Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Skip visibility database validation if advanced visibility is enabled #1497

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
20 changes: 8 additions & 12 deletions common/persistence/cassandra/version_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,6 @@
package cassandra

import (
"context"

"golang.org/x/sync/errgroup"

"go.temporal.io/server/common/config"
"go.temporal.io/server/common/persistence/nosql/nosqlplugin/cassandra/gocql"
"go.temporal.io/server/common/persistence/schema"
Expand All @@ -44,16 +40,16 @@ import (
func VerifyCompatibleVersion(
cfg config.Persistence,
r resolver.ServiceResolver,
checkVisibility bool,
) error {
g, _ := errgroup.WithContext(context.Background())
g.Go(func() error {
return checkMainKeyspace(cfg, r)
})
g.Go(func() error {
return checkVisibilityKeyspace(cfg, r)
})

return g.Wait()
if err := checkMainKeyspace(cfg, r); err != nil {
return err
}
if checkVisibility {
return checkVisibilityKeyspace(cfg, r)
}
return nil
}

func checkMainKeyspace(
Expand Down
19 changes: 7 additions & 12 deletions common/persistence/sql/version_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,6 @@
package sql

import (
"context"

"golang.org/x/sync/errgroup"

"go.temporal.io/server/common/config"
"go.temporal.io/server/common/persistence/sql/sqlplugin"
"go.temporal.io/server/common/resolver"
Expand All @@ -39,17 +35,16 @@ import (
func VerifyCompatibleVersion(
cfg config.Persistence,
r resolver.ServiceResolver,
checkVisibility bool,
) error {

g, _ := errgroup.WithContext(context.Background())
g.Go(func() error {
return checkMainDatabase(cfg, r)
})
g.Go(func() error {
if err := checkMainDatabase(cfg, r); err != nil {
return err
}
if checkVisibility {
return checkVisibilityDatabase(cfg, r)
})

return g.Wait()
}
return nil
}

func checkMainDatabase(
Expand Down
5 changes: 0 additions & 5 deletions config/development_es.yaml
Original file line number Diff line number Diff line change
@@ -1,17 +1,12 @@
persistence:
defaultStore: cass-default
visibilityStore: cass-visibility
advancedVisibilityStore: es-visibility
numHistoryShards: 4
datastores:
cass-default:
cassandra:
hosts: "127.0.0.1"
keyspace: "temporal"
cass-visibility:
cassandra:
hosts: "127.0.0.1"
keyspace: "temporal_visibility"
es-visibility:
elasticsearch:
version: "v7"
Expand Down
12 changes: 0 additions & 12 deletions config/development_mysql-es.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
persistence:
defaultStore: mysql-default
visibilityStore: mysql-visibility
advancedVisibilityStore: es-visibility
numHistoryShards: 4
datastores:
Expand All @@ -15,17 +14,6 @@ persistence:
maxConns: 20
maxIdleConns: 20
maxConnLifetime: "1h"
mysql-visibility:
sql:
pluginName: "mysql"
databaseName: "temporal_visibility"
connectAddr: "127.0.0.1:3306"
connectProtocol: "tcp"
user: "temporal"
password: "temporal"
maxConns: 2
maxIdleConns: 2
maxConnLifetime: "1h"
es-visibility:
elasticsearch:
version: "v7"
Expand Down
131 changes: 69 additions & 62 deletions temporal/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ import (
"github.com/pborman/uuid"
"github.com/uber-go/tally"
sdkclient "go.temporal.io/sdk/client"

persistencespb "go.temporal.io/server/api/persistence/v1"
"go.temporal.io/server/common"
"go.temporal.io/server/common/archiver"
Expand All @@ -47,7 +46,7 @@ import (
"go.temporal.io/server/common/persistence"
"go.temporal.io/server/common/persistence/cassandra"
persistenceClient "go.temporal.io/server/common/persistence/client"
"go.temporal.io/server/common/persistence/elasticsearch/client"
esclient "go.temporal.io/server/common/persistence/elasticsearch/client"
"go.temporal.io/server/common/persistence/sql"
"go.temporal.io/server/common/pprof"
"go.temporal.io/server/common/primitives"
Expand Down Expand Up @@ -120,7 +119,18 @@ func (s *Server) Start() error {
s.so.persistenceServiceResolver = resolver.NewNoopResolver()
}

err = verifyPersistenceCompatibleVersion(s.so.config.Persistence, s.so.persistenceServiceResolver)
if s.so.dynamicConfigClient == nil {
s.so.dynamicConfigClient, err = dynamicconfig.NewFileBasedClient(&s.so.config.DynamicConfigClient, s.logger, s.stoppedCh)
if err != nil {
s.logger.Info("Error creating file based dynamic config client, use no-op config client instead.", tag.Error(err))
s.so.dynamicConfigClient = dynamicconfig.NewNoopClient()
}
}
dc := dynamicconfig.NewCollection(s.so.dynamicConfigClient, s.logger)

advancedVisibilityWritingMode := dc.GetStringProperty(dynamicconfig.AdvancedVisibilityWritingMode, common.GetDefaultAdvancedVisibilityWritingMode(s.so.config.Persistence.IsAdvancedVisibilityConfigExist()))()

err = verifyPersistenceCompatibleVersion(s.so.config.Persistence, s.so.persistenceServiceResolver, advancedVisibilityWritingMode != common.AdvancedVisibilityWritingModeOn)
if err != nil {
return err
}
Expand All @@ -134,15 +144,6 @@ func (s *Server) Start() error {
return fmt.Errorf("ringpop config validation error: %w", err)
}

if s.so.dynamicConfigClient == nil {
s.so.dynamicConfigClient, err = dynamicconfig.NewFileBasedClient(&s.so.config.DynamicConfigClient, s.logger, s.stoppedCh)
if err != nil {
s.logger.Info("Error creating file based dynamic config client, use no-op config client instead.", tag.Error(err))
s.so.dynamicConfigClient = dynamicconfig.NewNoopClient()
}
}
dc := dynamicconfig.NewCollection(s.so.dynamicConfigClient, s.logger)

err = updateClusterMetadataConfig(s.so.config, s.so.persistenceServiceResolver, s.logger)
if err != nil {
return fmt.Errorf("unable to initialize cluster metadata: %w", err)
Expand Down Expand Up @@ -176,8 +177,13 @@ func (s *Server) Start() error {
}
}

esConfig, esClient, err := s.getESConfigClient(advancedVisibilityWritingMode)
if err != nil {
return err
}

for _, svcName := range s.so.serviceNames {
params, err := s.newBootstrapParams(svcName, dc, s.serverReporter, s.sdkReporter)
params, err := s.newBootstrapParams(svcName, dc, s.serverReporter, s.sdkReporter, esConfig, esClient)
if err != nil {
return err
}
Expand Down Expand Up @@ -249,14 +255,20 @@ func (s *Server) newBootstrapParams(
dc *dynamicconfig.Collection,
serverReporter metrics.Reporter,
sdkReporter metrics.Reporter,
esConfig *config.Elasticsearch,
esClient esclient.Client,
) (*resource.BootstrapParams, error) {

params := resource.BootstrapParams{}
params.Name = svcName
params.Logger = s.logger
params.PersistenceConfig = s.so.config.Persistence
params.DynamicConfigClient = s.so.dynamicConfigClient
params.ClusterMetadataConfig = s.so.config.ClusterMetadata
params := &resource.BootstrapParams{
Name: svcName,
Logger: s.logger,
PersistenceConfig: s.so.config.Persistence,
DynamicConfigClient: s.so.dynamicConfigClient,
ClusterMetadataConfig: s.so.config.ClusterMetadata,
DCRedirectionPolicy: s.so.config.DCRedirectionPolicy,
ESConfig: esConfig,
ESClient: esClient,
}

svcCfg := s.so.config.Services[svcName]
rpcFactory := rpc.NewFactory(&svcCfg.RPC, svcName, s.logger, s.so.tlsConfigProvider)
Expand All @@ -265,8 +277,8 @@ func (s *Server) newBootstrapParams(
// Ringpop uses a different port to register handlers, this map is needed to resolve
// services to correct addresses used by clients through ServiceResolver lookup API
servicePortMap := make(map[string]int)
for svcName, svcCfg := range s.so.config.Services {
servicePortMap[svcName] = svcCfg.RPC.GRPCPort
for sn, sc := range s.so.config.Services {
servicePortMap[sn] = sc.RPC.GRPCPort
}

params.MembershipFactoryInitializer =
Expand All @@ -281,9 +293,7 @@ func (s *Server) newBootstrapParams(
)
}

params.DCRedirectionPolicy = s.so.config.DCRedirectionPolicy

//todo: Replace this hack with actually using sdkReporter, Client or Scope.
// todo: Replace this hack with actually using sdkReporter, Client or Scope.
if serverReporter == nil {
var err error
serverReporter, sdkReporter, err = svcCfg.Metrics.InitMetricReporters(s.logger, nil)
Expand Down Expand Up @@ -329,40 +339,6 @@ func (s *Server) newBootstrapParams(
return nil, fmt.Errorf("unable to create public client: %w", err)
}

advancedVisMode := dc.GetStringProperty(
dynamicconfig.AdvancedVisibilityWritingMode,
common.GetDefaultAdvancedVisibilityWritingMode(s.so.config.Persistence.IsAdvancedVisibilityConfigExist()),
)()

if advancedVisMode != common.AdvancedVisibilityWritingModeOff {
// verify config of advanced visibility store
advancedVisStoreKey := s.so.config.Persistence.AdvancedVisibilityStore
advancedVisStore, ok := s.so.config.Persistence.DataStores[advancedVisStoreKey]
if !ok {
return nil, fmt.Errorf("unable to find advanced visibility store in config for %q key", advancedVisStoreKey)
}

if s.so.elasticseachHttpClient == nil {
s.so.elasticseachHttpClient, err = client.NewAwsHttpClient(advancedVisStore.ElasticSearch.AWSRequestSigning)
if err != nil {
return nil, fmt.Errorf("unable to create AWS HTTP client for Elasticsearch: %w", err)
}
}

esClient, err := client.NewClient(advancedVisStore.ElasticSearch, s.so.elasticseachHttpClient, s.logger)
if err != nil {
return nil, fmt.Errorf("unable to create Elasticsearch client: %w", err)
}
params.ESConfig = advancedVisStore.ElasticSearch
params.ESClient = esClient

// verify index name
indexName := advancedVisStore.ElasticSearch.GetVisibilityIndex()
if len(indexName) == 0 {
return nil, errors.New("visibility index in missing in Elasticsearch config")
}
}

params.ArchivalMetadata = archiver.NewArchivalMetadata(
dc,
s.so.config.Archival.History.State,
Expand All @@ -388,16 +364,47 @@ func (s *Server) newBootstrapParams(

params.PersistenceServiceResolver = s.so.persistenceServiceResolver

return &params, nil
return params, nil
}

func (s *Server) getESConfigClient(advancedVisibilityWritingMode string) (*config.Elasticsearch, esclient.Client, error) {
if advancedVisibilityWritingMode == common.AdvancedVisibilityWritingModeOff {
return nil, nil, nil
}

advancedVisibilityStore, ok := s.so.config.Persistence.DataStores[s.so.config.Persistence.AdvancedVisibilityStore]
if !ok {
return nil, nil, fmt.Errorf("unable to find advanced visibility store in config for %q key", s.so.config.Persistence.AdvancedVisibilityStore)
}

indexName := advancedVisibilityStore.ElasticSearch.GetVisibilityIndex()
if len(indexName) == 0 {
return nil, nil, errors.New("visibility index in missing in Elasticsearch config")
}

if s.so.elasticseachHttpClient == nil {
var err error
s.so.elasticseachHttpClient, err = esclient.NewAwsHttpClient(advancedVisibilityStore.ElasticSearch.AWSRequestSigning)
if err != nil {
return nil, nil, fmt.Errorf("unable to create AWS HTTP client for Elasticsearch: %w", err)
}
}

esClient, err := esclient.NewClient(advancedVisibilityStore.ElasticSearch, s.so.elasticseachHttpClient, s.logger)
if err != nil {
return nil, nil, fmt.Errorf("unable to create Elasticsearch client: %w", err)
}

return advancedVisibilityStore.ElasticSearch, esClient, nil
}

func verifyPersistenceCompatibleVersion(config config.Persistence, persistenceServiceResolver resolver.ServiceResolver) error {
func verifyPersistenceCompatibleVersion(config config.Persistence, persistenceServiceResolver resolver.ServiceResolver, checkVisibility bool) error {
// cassandra schema version validation
if err := cassandra.VerifyCompatibleVersion(config, persistenceServiceResolver); err != nil {
if err := cassandra.VerifyCompatibleVersion(config, persistenceServiceResolver, checkVisibility); err != nil {
return fmt.Errorf("cassandra schema version compatibility check failed: %w", err)
}
// sql schema version validation
if err := sql.VerifyCompatibleVersion(config, persistenceServiceResolver); err != nil {
if err := sql.VerifyCompatibleVersion(config, persistenceServiceResolver, checkVisibility); err != nil {
return fmt.Errorf("sql schema version compatibility check failed: %w", err)
}
return nil
Expand Down
2 changes: 1 addition & 1 deletion tools/cassandra/version_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func (s *VersionTestSuite) TestVerifyCompatibleVersion() {
},
TransactionSizeLimit: dynamicconfig.GetIntPropertyFn(common.DefaultTransactionSizeLimit),
}
s.NoError(cassandra.VerifyCompatibleVersion(cfg, resolver.NewNoopResolver()))
s.NoError(cassandra.VerifyCompatibleVersion(cfg, resolver.NewNoopResolver(), true))
}

func (s *VersionTestSuite) createKeyspace(keyspace string) func() {
Expand Down
2 changes: 1 addition & 1 deletion tools/sql/clitest/versionTest.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ func (s *VersionTestSuite) TestVerifyCompatibleVersion() {
},
TransactionSizeLimit: dynamicconfig.GetIntPropertyFn(common.DefaultTransactionSizeLimit),
}
s.NoError(persistencesql.VerifyCompatibleVersion(cfg, resolver.NewNoopResolver()))
s.NoError(persistencesql.VerifyCompatibleVersion(cfg, resolver.NewNoopResolver(), true))
}

func (s *VersionTestSuite) createDatabase(database string) func() {
Expand Down