diff --git a/common/dynamicconfig/constants.go b/common/dynamicconfig/constants.go index dc01d21063a..a29e6a369dc 100644 --- a/common/dynamicconfig/constants.go +++ b/common/dynamicconfig/constants.go @@ -524,6 +524,8 @@ const ( ArchivalProcessorRetryWarningLimit = "history.archivalProcessorRetryLimitWarning" // ArchivalBackendMaxRPS is the maximum rate of requests per second to the archival backend ArchivalBackendMaxRPS = "history.archivalBackendMaxRPS" + // DurableArchivalEnabled is the flag to enable durable archival + DurableArchivalEnabled = "history.durableArchivalEnabled" // ReplicatorTaskBatchSize is batch size for ReplicatorProcessor ReplicatorTaskBatchSize = "history.replicatorTaskBatchSize" diff --git a/host/archival_test.go b/host/archival_test.go index 4f86670427e..ada7bab297b 100644 --- a/host/archival_test.go +++ b/host/archival_test.go @@ -68,9 +68,6 @@ type archivalSuite struct { } func (s *archivalSuite) SetupSuite() { - s.dynamicConfigOverrides = map[dynamicconfig.Key]interface{}{ - dynamicconfig.RetentionTimerJitterDuration: time.Second, - } s.setupSuite("testdata/integration_test_cluster.yaml") } @@ -85,7 +82,30 @@ func (s *archivalSuite) SetupTest() { func TestArchivalSuite(t *testing.T) { flag.Parse() - suite.Run(t, new(archivalSuite)) + for _, c := range []struct { + Name string + DurableArchivalIsEnabled bool + }{ + { + Name: "DurableArchivalIsDisabled", + DurableArchivalIsEnabled: false, + }, + { + Name: "DurableArchivalIsEnabled", + DurableArchivalIsEnabled: true, + }, + } { + c := c + t.Run(c.Name, func(t *testing.T) { + s := new(archivalSuite) + s.dynamicConfigOverrides = map[dynamicconfig.Key]interface{}{ + dynamicconfig.RetentionTimerJitterDuration: time.Second, + dynamicconfig.ArchivalProcessorArchiveDelay: time.Duration(0), + dynamicconfig.DurableArchivalEnabled: c.DurableArchivalIsEnabled, + } + suite.Run(t, s) + }) + } } func (s *archivalSuite) TestArchival_TimerQueueProcessor() { diff --git a/service/history/configs/config.go b/service/history/configs/config.go index 9e9dd91c736..bd04e16da97 100644 --- a/service/history/configs/config.go +++ b/service/history/configs/config.go @@ -425,10 +425,7 @@ func NewConfig(dc *dynamicconfig.Collection, numberOfShards int32, isAdvancedVis NumArchiveSystemWorkflows: dc.GetIntProperty(dynamicconfig.NumArchiveSystemWorkflows, 1000), ArchiveRequestRPS: dc.GetIntProperty(dynamicconfig.ArchiveRequestRPS, 300), // should be much smaller than frontend RPS ArchiveSignalTimeout: dc.GetDurationProperty(dynamicconfig.ArchiveSignalTimeout, 300*time.Millisecond), - DurableArchivalEnabled: func() bool { - // Always return false for now until durable archival is tested end-to-end - return false - }, + DurableArchivalEnabled: dc.GetBoolProperty(dynamicconfig.DurableArchivalEnabled, false), BlobSizeLimitError: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.BlobSizeLimitError, 2*1024*1024), BlobSizeLimitWarn: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.BlobSizeLimitWarn, 512*1024), diff --git a/service/history/queueFactoryBase.go b/service/history/queueFactoryBase.go index f14aa9efce7..4c11d5f0509 100644 --- a/service/history/queueFactoryBase.go +++ b/service/history/queueFactoryBase.go @@ -109,6 +109,10 @@ var QueueModule = fx.Options( Group: QueueFactoryFxGroup, Target: NewVisibilityQueueFactory, }, + fx.Annotated{ + Group: QueueFactoryFxGroup, + Target: NewArchivalQueueFactory, + }, ), fx.Invoke(QueueFactoryLifetimeHooks), )