Skip to content

Commit

Permalink
Skip visibility database validation if advanced visibility is enabled (
Browse files Browse the repository at this point in the history
  • Loading branch information
alexshtin committed Apr 27, 2021
1 parent e67e845 commit 6c23f02
Show file tree
Hide file tree
Showing 7 changed files with 86 additions and 105 deletions.
20 changes: 8 additions & 12 deletions common/persistence/cassandra/version_checker.go
Expand Up @@ -25,10 +25,6 @@
package cassandra

import (
"context"

"golang.org/x/sync/errgroup"

"go.temporal.io/server/common/config"
"go.temporal.io/server/common/persistence/nosql/nosqlplugin/cassandra/gocql"
"go.temporal.io/server/common/persistence/schema"
Expand All @@ -44,16 +40,16 @@ import (
func VerifyCompatibleVersion(
cfg config.Persistence,
r resolver.ServiceResolver,
checkVisibility bool,
) error {
g, _ := errgroup.WithContext(context.Background())
g.Go(func() error {
return checkMainKeyspace(cfg, r)
})
g.Go(func() error {
return checkVisibilityKeyspace(cfg, r)
})

return g.Wait()
if err := checkMainKeyspace(cfg, r); err != nil {
return err
}
if checkVisibility {
return checkVisibilityKeyspace(cfg, r)
}
return nil
}

func checkMainKeyspace(
Expand Down
19 changes: 7 additions & 12 deletions common/persistence/sql/version_checker.go
Expand Up @@ -25,10 +25,6 @@
package sql

import (
"context"

"golang.org/x/sync/errgroup"

"go.temporal.io/server/common/config"
"go.temporal.io/server/common/persistence/sql/sqlplugin"
"go.temporal.io/server/common/resolver"
Expand All @@ -39,17 +35,16 @@ import (
func VerifyCompatibleVersion(
cfg config.Persistence,
r resolver.ServiceResolver,
checkVisibility bool,
) error {

g, _ := errgroup.WithContext(context.Background())
g.Go(func() error {
return checkMainDatabase(cfg, r)
})
g.Go(func() error {
if err := checkMainDatabase(cfg, r); err != nil {
return err
}
if checkVisibility {
return checkVisibilityDatabase(cfg, r)
})

return g.Wait()
}
return nil
}

func checkMainDatabase(
Expand Down
5 changes: 0 additions & 5 deletions config/development_es.yaml
@@ -1,17 +1,12 @@
persistence:
defaultStore: cass-default
visibilityStore: cass-visibility
advancedVisibilityStore: es-visibility
numHistoryShards: 4
datastores:
cass-default:
cassandra:
hosts: "127.0.0.1"
keyspace: "temporal"
cass-visibility:
cassandra:
hosts: "127.0.0.1"
keyspace: "temporal_visibility"
es-visibility:
elasticsearch:
version: "v7"
Expand Down
12 changes: 0 additions & 12 deletions config/development_mysql-es.yaml
@@ -1,6 +1,5 @@
persistence:
defaultStore: mysql-default
visibilityStore: mysql-visibility
advancedVisibilityStore: es-visibility
numHistoryShards: 4
datastores:
Expand All @@ -15,17 +14,6 @@ persistence:
maxConns: 20
maxIdleConns: 20
maxConnLifetime: "1h"
mysql-visibility:
sql:
pluginName: "mysql"
databaseName: "temporal_visibility"
connectAddr: "127.0.0.1:3306"
connectProtocol: "tcp"
user: "temporal"
password: "temporal"
maxConns: 2
maxIdleConns: 2
maxConnLifetime: "1h"
es-visibility:
elasticsearch:
version: "v7"
Expand Down
131 changes: 69 additions & 62 deletions temporal/server.go
Expand Up @@ -33,7 +33,6 @@ import (
"github.com/pborman/uuid"
"github.com/uber-go/tally"
sdkclient "go.temporal.io/sdk/client"

persistencespb "go.temporal.io/server/api/persistence/v1"
"go.temporal.io/server/common"
"go.temporal.io/server/common/archiver"
Expand All @@ -47,7 +46,7 @@ import (
"go.temporal.io/server/common/persistence"
"go.temporal.io/server/common/persistence/cassandra"
persistenceClient "go.temporal.io/server/common/persistence/client"
"go.temporal.io/server/common/persistence/elasticsearch/client"
esclient "go.temporal.io/server/common/persistence/elasticsearch/client"
"go.temporal.io/server/common/persistence/sql"
"go.temporal.io/server/common/pprof"
"go.temporal.io/server/common/primitives"
Expand Down Expand Up @@ -120,7 +119,18 @@ func (s *Server) Start() error {
s.so.persistenceServiceResolver = resolver.NewNoopResolver()
}

err = verifyPersistenceCompatibleVersion(s.so.config.Persistence, s.so.persistenceServiceResolver)
if s.so.dynamicConfigClient == nil {
s.so.dynamicConfigClient, err = dynamicconfig.NewFileBasedClient(&s.so.config.DynamicConfigClient, s.logger, s.stoppedCh)
if err != nil {
s.logger.Info("Error creating file based dynamic config client, use no-op config client instead.", tag.Error(err))
s.so.dynamicConfigClient = dynamicconfig.NewNoopClient()
}
}
dc := dynamicconfig.NewCollection(s.so.dynamicConfigClient, s.logger)

advancedVisibilityWritingMode := dc.GetStringProperty(dynamicconfig.AdvancedVisibilityWritingMode, common.GetDefaultAdvancedVisibilityWritingMode(s.so.config.Persistence.IsAdvancedVisibilityConfigExist()))()

err = verifyPersistenceCompatibleVersion(s.so.config.Persistence, s.so.persistenceServiceResolver, advancedVisibilityWritingMode != common.AdvancedVisibilityWritingModeOn)
if err != nil {
return err
}
Expand All @@ -134,15 +144,6 @@ func (s *Server) Start() error {
return fmt.Errorf("ringpop config validation error: %w", err)
}

if s.so.dynamicConfigClient == nil {
s.so.dynamicConfigClient, err = dynamicconfig.NewFileBasedClient(&s.so.config.DynamicConfigClient, s.logger, s.stoppedCh)
if err != nil {
s.logger.Info("Error creating file based dynamic config client, use no-op config client instead.", tag.Error(err))
s.so.dynamicConfigClient = dynamicconfig.NewNoopClient()
}
}
dc := dynamicconfig.NewCollection(s.so.dynamicConfigClient, s.logger)

err = updateClusterMetadataConfig(s.so.config, s.so.persistenceServiceResolver, s.logger)
if err != nil {
return fmt.Errorf("unable to initialize cluster metadata: %w", err)
Expand Down Expand Up @@ -176,8 +177,13 @@ func (s *Server) Start() error {
}
}

esConfig, esClient, err := s.getESConfigClient(advancedVisibilityWritingMode)
if err != nil {
return err
}

for _, svcName := range s.so.serviceNames {
params, err := s.newBootstrapParams(svcName, dc, s.serverReporter, s.sdkReporter)
params, err := s.newBootstrapParams(svcName, dc, s.serverReporter, s.sdkReporter, esConfig, esClient)
if err != nil {
return err
}
Expand Down Expand Up @@ -249,14 +255,20 @@ func (s *Server) newBootstrapParams(
dc *dynamicconfig.Collection,
serverReporter metrics.Reporter,
sdkReporter metrics.Reporter,
esConfig *config.Elasticsearch,
esClient esclient.Client,
) (*resource.BootstrapParams, error) {

params := resource.BootstrapParams{}
params.Name = svcName
params.Logger = s.logger
params.PersistenceConfig = s.so.config.Persistence
params.DynamicConfigClient = s.so.dynamicConfigClient
params.ClusterMetadataConfig = s.so.config.ClusterMetadata
params := &resource.BootstrapParams{
Name: svcName,
Logger: s.logger,
PersistenceConfig: s.so.config.Persistence,
DynamicConfigClient: s.so.dynamicConfigClient,
ClusterMetadataConfig: s.so.config.ClusterMetadata,
DCRedirectionPolicy: s.so.config.DCRedirectionPolicy,
ESConfig: esConfig,
ESClient: esClient,
}

svcCfg := s.so.config.Services[svcName]
rpcFactory := rpc.NewFactory(&svcCfg.RPC, svcName, s.logger, s.so.tlsConfigProvider)
Expand All @@ -265,8 +277,8 @@ func (s *Server) newBootstrapParams(
// Ringpop uses a different port to register handlers, this map is needed to resolve
// services to correct addresses used by clients through ServiceResolver lookup API
servicePortMap := make(map[string]int)
for svcName, svcCfg := range s.so.config.Services {
servicePortMap[svcName] = svcCfg.RPC.GRPCPort
for sn, sc := range s.so.config.Services {
servicePortMap[sn] = sc.RPC.GRPCPort
}

params.MembershipFactoryInitializer =
Expand All @@ -281,9 +293,7 @@ func (s *Server) newBootstrapParams(
)
}

params.DCRedirectionPolicy = s.so.config.DCRedirectionPolicy

//todo: Replace this hack with actually using sdkReporter, Client or Scope.
// todo: Replace this hack with actually using sdkReporter, Client or Scope.
if serverReporter == nil {
var err error
serverReporter, sdkReporter, err = svcCfg.Metrics.InitMetricReporters(s.logger, nil)
Expand Down Expand Up @@ -329,40 +339,6 @@ func (s *Server) newBootstrapParams(
return nil, fmt.Errorf("unable to create public client: %w", err)
}

advancedVisMode := dc.GetStringProperty(
dynamicconfig.AdvancedVisibilityWritingMode,
common.GetDefaultAdvancedVisibilityWritingMode(s.so.config.Persistence.IsAdvancedVisibilityConfigExist()),
)()

if advancedVisMode != common.AdvancedVisibilityWritingModeOff {
// verify config of advanced visibility store
advancedVisStoreKey := s.so.config.Persistence.AdvancedVisibilityStore
advancedVisStore, ok := s.so.config.Persistence.DataStores[advancedVisStoreKey]
if !ok {
return nil, fmt.Errorf("unable to find advanced visibility store in config for %q key", advancedVisStoreKey)
}

if s.so.elasticseachHttpClient == nil {
s.so.elasticseachHttpClient, err = client.NewAwsHttpClient(advancedVisStore.ElasticSearch.AWSRequestSigning)
if err != nil {
return nil, fmt.Errorf("unable to create AWS HTTP client for Elasticsearch: %w", err)
}
}

esClient, err := client.NewClient(advancedVisStore.ElasticSearch, s.so.elasticseachHttpClient, s.logger)
if err != nil {
return nil, fmt.Errorf("unable to create Elasticsearch client: %w", err)
}
params.ESConfig = advancedVisStore.ElasticSearch
params.ESClient = esClient

// verify index name
indexName := advancedVisStore.ElasticSearch.GetVisibilityIndex()
if len(indexName) == 0 {
return nil, errors.New("visibility index in missing in Elasticsearch config")
}
}

params.ArchivalMetadata = archiver.NewArchivalMetadata(
dc,
s.so.config.Archival.History.State,
Expand All @@ -388,16 +364,47 @@ func (s *Server) newBootstrapParams(

params.PersistenceServiceResolver = s.so.persistenceServiceResolver

return &params, nil
return params, nil
}

func (s *Server) getESConfigClient(advancedVisibilityWritingMode string) (*config.Elasticsearch, esclient.Client, error) {
if advancedVisibilityWritingMode == common.AdvancedVisibilityWritingModeOff {
return nil, nil, nil
}

advancedVisibilityStore, ok := s.so.config.Persistence.DataStores[s.so.config.Persistence.AdvancedVisibilityStore]
if !ok {
return nil, nil, fmt.Errorf("unable to find advanced visibility store in config for %q key", s.so.config.Persistence.AdvancedVisibilityStore)
}

indexName := advancedVisibilityStore.ElasticSearch.GetVisibilityIndex()
if len(indexName) == 0 {
return nil, nil, errors.New("visibility index in missing in Elasticsearch config")
}

if s.so.elasticseachHttpClient == nil {
var err error
s.so.elasticseachHttpClient, err = esclient.NewAwsHttpClient(advancedVisibilityStore.ElasticSearch.AWSRequestSigning)
if err != nil {
return nil, nil, fmt.Errorf("unable to create AWS HTTP client for Elasticsearch: %w", err)
}
}

esClient, err := esclient.NewClient(advancedVisibilityStore.ElasticSearch, s.so.elasticseachHttpClient, s.logger)
if err != nil {
return nil, nil, fmt.Errorf("unable to create Elasticsearch client: %w", err)
}

return advancedVisibilityStore.ElasticSearch, esClient, nil
}

func verifyPersistenceCompatibleVersion(config config.Persistence, persistenceServiceResolver resolver.ServiceResolver) error {
func verifyPersistenceCompatibleVersion(config config.Persistence, persistenceServiceResolver resolver.ServiceResolver, checkVisibility bool) error {
// cassandra schema version validation
if err := cassandra.VerifyCompatibleVersion(config, persistenceServiceResolver); err != nil {
if err := cassandra.VerifyCompatibleVersion(config, persistenceServiceResolver, checkVisibility); err != nil {
return fmt.Errorf("cassandra schema version compatibility check failed: %w", err)
}
// sql schema version validation
if err := sql.VerifyCompatibleVersion(config, persistenceServiceResolver); err != nil {
if err := sql.VerifyCompatibleVersion(config, persistenceServiceResolver, checkVisibility); err != nil {
return fmt.Errorf("sql schema version compatibility check failed: %w", err)
}
return nil
Expand Down
2 changes: 1 addition & 1 deletion tools/cassandra/version_test.go
Expand Up @@ -94,7 +94,7 @@ func (s *VersionTestSuite) TestVerifyCompatibleVersion() {
},
TransactionSizeLimit: dynamicconfig.GetIntPropertyFn(common.DefaultTransactionSizeLimit),
}
s.NoError(cassandra.VerifyCompatibleVersion(cfg, resolver.NewNoopResolver()))
s.NoError(cassandra.VerifyCompatibleVersion(cfg, resolver.NewNoopResolver(), true))
}

func (s *VersionTestSuite) createKeyspace(keyspace string) func() {
Expand Down
2 changes: 1 addition & 1 deletion tools/sql/clitest/versionTest.go
Expand Up @@ -134,7 +134,7 @@ func (s *VersionTestSuite) TestVerifyCompatibleVersion() {
},
TransactionSizeLimit: dynamicconfig.GetIntPropertyFn(common.DefaultTransactionSizeLimit),
}
s.NoError(persistencesql.VerifyCompatibleVersion(cfg, resolver.NewNoopResolver()))
s.NoError(persistencesql.VerifyCompatibleVersion(cfg, resolver.NewNoopResolver(), true))
}

func (s *VersionTestSuite) createDatabase(database string) func() {
Expand Down

0 comments on commit 6c23f02

Please sign in to comment.