Skip to content

Commit

Permalink
Add a DLQ replication test
Browse files Browse the repository at this point in the history
  • Loading branch information
MichaelSnowden committed Oct 19, 2023
1 parent b8f2123 commit 481061b
Show file tree
Hide file tree
Showing 8 changed files with 399 additions and 31 deletions.
14 changes: 14 additions & 0 deletions service/worker/fx.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@
package worker

import (
"go.temporal.io/server/common/cluster"
"go.temporal.io/server/common/namespace"
"go.temporal.io/server/common/persistence"
"go.uber.org/fx"

"go.temporal.io/server/common/config"
Expand Down Expand Up @@ -67,6 +70,17 @@ var Module = fx.Options(
fx.Provide(PersistenceRateLimitingParamsProvider),
service.PersistenceLazyLoadedServiceResolverModule,
fx.Provide(ServiceResolverProvider),
fx.Provide(func(
clusterMetadata cluster.Metadata,
metadataManager persistence.MetadataManager,
logger log.Logger,
) namespace.ReplicationTaskExecutor {
return namespace.NewReplicationTaskExecutor(
clusterMetadata.GetCurrentClusterName(),
metadataManager,
logger,
)
}),
fx.Provide(NewService),
fx.Provide(fx.Annotate(NewWorkerManager, fx.ParamTags(workercommon.WorkerComponentTag))),
fx.Provide(NewPerNamespaceWorkerManager),
Expand Down
24 changes: 11 additions & 13 deletions service/worker/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,10 +84,11 @@ type (
esClient esclient.Client
config *Config

workerManager *workerManager
perNamespaceWorkerManager *perNamespaceWorkerManager
scanner *scanner.Scanner
matchingClient matchingservice.MatchingServiceClient
workerManager *workerManager
perNamespaceWorkerManager *perNamespaceWorkerManager
scanner *scanner.Scanner
matchingClient matchingservice.MatchingServiceClient
namespaceReplicationTaskExecutor namespace.ReplicationTaskExecutor
}

// Config contains all the service config for worker
Expand Down Expand Up @@ -140,6 +141,7 @@ func NewService(
perNamespaceWorkerManager *perNamespaceWorkerManager,
visibilityManager manager.VisibilityManager,
matchingClient resource.MatchingClient,
namespaceReplicationTaskExecutor namespace.ReplicationTaskExecutor,
) (*Service, error) {
workerServiceResolver, err := membershipMonitor.GetResolver(primitives.WorkerService)
if err != nil {
Expand Down Expand Up @@ -167,9 +169,10 @@ func NewService(
historyClient: historyClient,
visibilityManager: visibilityManager,

workerManager: workerManager,
perNamespaceWorkerManager: perNamespaceWorkerManager,
matchingClient: matchingClient,
workerManager: workerManager,
perNamespaceWorkerManager: perNamespaceWorkerManager,
matchingClient: matchingClient,
namespaceReplicationTaskExecutor: namespaceReplicationTaskExecutor,
}
if err := s.initScanner(); err != nil {
return nil, err
Expand Down Expand Up @@ -460,11 +463,6 @@ func (s *Service) startScanner() {
}

func (s *Service) startReplicator() {
namespaceReplicationTaskExecutor := namespace.NewReplicationTaskExecutor(
s.clusterMetadata.GetCurrentClusterName(),
s.metadataManager,
s.logger,
)
msgReplicator := replicator.NewReplicator(
s.clusterMetadata,
s.clientBean,
Expand All @@ -473,7 +471,7 @@ func (s *Service) startReplicator() {
s.hostInfo,
s.workerServiceResolver,
s.namespaceReplicationQueue,
namespaceReplicationTaskExecutor,
s.namespaceReplicationTaskExecutor,
s.matchingClient,
s.namespaceRegistry,
)
Expand Down
2 changes: 1 addition & 1 deletion tests/dlq_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ func (s *dlqSuite) TestTDBG() {
"dlq",
"--" + tdbg.FlagDLQVersion, "v2",
"read",
"--" + tdbg.FlagDLQType, strconv.Itoa(int(tasks.CategoryTransfer.ID())),
"--" + tdbg.FlagDLQType, strconv.Itoa(tasks.CategoryTransfer.ID()),
"--" + tdbg.FlagCluster, sourceCluster,
"--" + tdbg.FlagPageSize, "1",
"--" + tdbg.FlagMaxMessageCount, tc.maxMessageCount,
Expand Down
32 changes: 18 additions & 14 deletions tests/functional_test_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,12 +80,11 @@ type (
dynamicConfigOverrides map[dynamicconfig.Key]interface{}
hostPort string
}
// suiteParams contains the variables which are used to configure the test suite via the Option argument to
// setupSuite.
suiteParams struct {
fxOptions map[primitives.ServiceName][]fx.Option
// TestClusterParams contains the variables which are used to configure test suites via the Option type.
TestClusterParams struct {
ServiceOptions map[primitives.ServiceName][]fx.Option
}
Option func(params *suiteParams)
Option func(params *TestClusterParams)
)

// WithFxOptionsForService returns an Option which, when passed as an argument to setupSuite, will append the given list
Expand All @@ -100,18 +99,13 @@ type (
// scalable and flexible. The reason we need to do this on a per-service basis is that there are separate fx apps for
// each one.
func WithFxOptionsForService(serviceName primitives.ServiceName, options ...fx.Option) Option {
return func(params *suiteParams) {
params.fxOptions[serviceName] = append(params.fxOptions[serviceName], options...)
return func(params *TestClusterParams) {
params.ServiceOptions[serviceName] = append(params.ServiceOptions[serviceName], options...)
}
}

func (s *FunctionalTestBase) setupSuite(defaultClusterConfigFile string, options ...Option) {
params := suiteParams{
fxOptions: make(map[primitives.ServiceName][]fx.Option),
}
for _, opt := range options {
opt(&params)
}
params := ApplyTestClusterParams(options)

s.hostPort = "127.0.0.1:7134"
if TestFlags.FrontendAddr != "" {
Expand All @@ -122,7 +116,7 @@ func (s *FunctionalTestBase) setupSuite(defaultClusterConfigFile string, options
clusterConfig, err := GetTestClusterConfig(defaultClusterConfigFile)
s.Require().NoError(err)
clusterConfig.DynamicConfigOverrides = s.dynamicConfigOverrides
clusterConfig.ServiceFxOptions = params.fxOptions
clusterConfig.ServiceFxOptions = params.ServiceOptions
s.testClusterConfig = clusterConfig

if clusterConfig.FrontendAddress != "" {
Expand Down Expand Up @@ -164,6 +158,16 @@ func (s *FunctionalTestBase) setupSuite(defaultClusterConfigFile string, options
time.Sleep(2 * NamespaceCacheRefreshInterval)
}

func ApplyTestClusterParams(options []Option) TestClusterParams {
params := TestClusterParams{
ServiceOptions: make(map[primitives.ServiceName][]fx.Option),
}
for _, opt := range options {
opt(&params)
}
return params
}

// setupLogger sets the Logger for the test suite.
// If the Logger is already set, this method does nothing.
// If the Logger is not set, this method creates a new log.TestLogger which logs to stdout and stderr.
Expand Down
4 changes: 3 additions & 1 deletion tests/xdc/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@ func (s *xdcBaseSuite) clusterReplicationConfig() []*replicationpb.ClusterReplic
return config
}

func (s *xdcBaseSuite) setupSuite(clusterNames []string) {
func (s *xdcBaseSuite) setupSuite(clusterNames []string, opts ...tests.Option) {
params := tests.ApplyTestClusterParams(opts)
s.clusterNames = clusterNames
s.logger = log.NewTestLogger()
if s.dynamicConfigOverrides == nil {
Expand Down Expand Up @@ -101,6 +102,7 @@ func (s *xdcBaseSuite) setupSuite(clusterNames []string) {
InitialFailoverVersion: int64(i + 1),
RPCAddress: fmt.Sprintf("127.0.0.1:%d134", 7+i),
}
clusterConfigs[i].ServiceFxOptions = params.ServiceOptions
}

c, err := tests.NewCluster(clusterConfigs[0], log.With(s.logger, tag.ClusterName(s.clusterNames[0])))
Expand Down

0 comments on commit 481061b

Please sign in to comment.