Skip to content

Commit

Permalink
Update worker replication config (#3373)
Browse files Browse the repository at this point in the history
  • Loading branch information
yux0 authored and alexshtin committed Sep 26, 2020
1 parent 1b00d51 commit d516b40
Show file tree
Hide file tree
Showing 3 changed files with 6 additions and 1 deletion.
3 changes: 3 additions & 0 deletions common/service/dynamicconfig/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,7 @@ var keys = map[Key]string{
WorkerReplicationTaskContextDuration: "worker.replicationTaskContextDuration",
WorkerReReplicationContextTimeout: "worker.workerReReplicationContextTimeout",
WorkerEnableRPCReplication: "worker.enableWorkerRPCReplication",
WorkerEnableKafkaReplication: "worker.enableKafkaReplication",
WorkerIndexerConcurrency: "worker.indexerConcurrency",
WorkerESProcessorNumOfWorkers: "worker.ESProcessorNumOfWorkers",
WorkerESProcessorBulkActions: "worker.ESProcessorBulkActions",
Expand Down Expand Up @@ -701,6 +702,8 @@ const (
WorkerReReplicationContextTimeout
// WorkerEnableRPCReplication is the feature flag for RPC replication
WorkerEnableRPCReplication
// WorkerEnableKafkaReplication is the feature flag for kafka replication
WorkerEnableKafkaReplication
// WorkerIndexerConcurrency is the max concurrent messages to be processed at any given time
WorkerIndexerConcurrency
// WorkerESProcessorNumOfWorkers is num of workers for esProcessor
Expand Down
1 change: 1 addition & 0 deletions service/worker/replicator/replicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ type (
ReplicationTaskContextTimeout dynamicconfig.DurationPropertyFn
ReReplicationContextTimeout dynamicconfig.DurationPropertyFnWithNamespaceIDFilter
EnableRPCReplication dynamicconfig.BoolPropertyFn
EnableKafkaReplication dynamicconfig.BoolPropertyFn
}
)

Expand Down
3 changes: 2 additions & 1 deletion service/worker/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ func NewConfig(params *resource.BootstrapParams) *Config {
ReplicationTaskContextTimeout: dc.GetDurationProperty(dynamicconfig.WorkerReplicationTaskContextDuration, 30*time.Second),
ReReplicationContextTimeout: dc.GetDurationPropertyFilteredByNamespaceID(dynamicconfig.WorkerReReplicationContextTimeout, 0*time.Second),
EnableRPCReplication: dc.GetBoolProperty(dynamicconfig.WorkerEnableRPCReplication, false),
EnableKafkaReplication: dc.GetBoolProperty(dynamicconfig.WorkerEnableKafkaReplication, false),
},
ArchiverConfig: &archiver.Config{
ArchiverConcurrency: dc.GetIntProperty(dynamicconfig.WorkerArchiverConcurrency, 50),
Expand Down Expand Up @@ -181,7 +182,7 @@ func (s *Service) Start() {
s.startIndexer()
}

if s.GetClusterMetadata().IsGlobalNamespaceEnabled() {
if s.GetClusterMetadata().IsGlobalNamespaceEnabled() && s.config.ReplicationCfg.EnableKafkaReplication() {
s.startReplicator()
}
if s.GetArchivalMetadata().GetHistoryConfig().ClusterConfiguredForArchival() {
Expand Down

0 comments on commit d516b40

Please sign in to comment.