From 1e462372a36598d3850bac2de99938951b0be9e9 Mon Sep 17 00:00:00 2001 From: Yu Xia Date: Thu, 6 Oct 2022 10:54:22 -0700 Subject: [PATCH] Add config filter by task type (#3455) --- common/dynamicconfig/client.go | 1 + common/dynamicconfig/collection.go | 22 +++++++++++++++++ common/dynamicconfig/collection_test.go | 10 ++++++++ service/history/configs/config.go | 8 +++---- .../history/timerQueueStandbyTaskExecutor.go | 24 +++++++++---------- .../timerQueueStandbyTaskExecutor_test.go | 5 ++-- .../transferQueueStandbyTaskExecutor.go | 24 +++++++++---------- .../transferQueueStandbyTaskExecutor_test.go | 5 ++-- 8 files changed, 65 insertions(+), 34 deletions(-) diff --git a/common/dynamicconfig/client.go b/common/dynamicconfig/client.go index bca11a841f8..989b7d91955 100644 --- a/common/dynamicconfig/client.go +++ b/common/dynamicconfig/client.go @@ -95,5 +95,6 @@ type ( TaskQueueName string TaskQueueType enumspb.TaskQueueType ShardID int32 + TaskType string } ) diff --git a/common/dynamicconfig/collection.go b/common/dynamicconfig/collection.go index fd2dcac8de0..ba0eb175e57 100644 --- a/common/dynamicconfig/collection.go +++ b/common/dynamicconfig/collection.go @@ -32,6 +32,7 @@ import ( enumspb "go.temporal.io/api/enums/v1" + enumsspb "go.temporal.io/server/api/enums/v1" "go.temporal.io/server/common/log" "go.temporal.io/server/common/log/tag" "go.temporal.io/server/common/primitives/timestamp" @@ -71,6 +72,7 @@ type ( DurationPropertyFnWithNamespaceIDFilter func(namespaceID string) time.Duration DurationPropertyFnWithShardIDFilter func(shardID int32) time.Duration DurationPropertyFnWithTaskQueueInfoFilters func(namespace string, taskQueue string, taskType enumspb.TaskQueueType) time.Duration + DurationPropertyFnWithTaskTypeFilter func(task enumsspb.TaskType) time.Duration FloatPropertyFn func() float64 FloatPropertyFnWithNamespaceFilter func(namespace string) float64 FloatPropertyFnWithShardIDFilter func(shardID int32) float64 @@ -282,6 +284,19 @@ func (c *Collection) GetDurationPropertyFilteredByShardID(key Key, defaultValue } } +// GetDurationPropertyFilteredByTaskType gets property with task type as filters and asserts that it's a duration +func (c *Collection) GetDurationPropertyFilteredByTaskType(key Key, defaultValue any) DurationPropertyFnWithTaskTypeFilter { + return func(taskType enumsspb.TaskType) time.Duration { + return matchAndConvert( + c, + key, + defaultValue, + taskTypePrecedence(taskType), + convertDuration, + ) + } +} + // GetBoolProperty gets property and asserts that it's a bool func (c *Collection) GetBoolProperty(key Key, defaultValue any) BoolPropertyFn { return func() bool { @@ -491,6 +506,13 @@ func shardIDPrecedence(shardID int32) []Constraints { } } +func taskTypePrecedence(taskType enumsspb.TaskType) []Constraints { + return []Constraints{ + {TaskType: taskType.String()}, + {}, + } +} + func convertInt(val any) (int, error) { if intVal, ok := val.(int); ok { return intVal, nil diff --git a/common/dynamicconfig/collection_test.go b/common/dynamicconfig/collection_test.go index 6da4787a109..8db8e3e2148 100644 --- a/common/dynamicconfig/collection_test.go +++ b/common/dynamicconfig/collection_test.go @@ -30,6 +30,7 @@ import ( "github.com/stretchr/testify/suite" + enumsspb "go.temporal.io/server/api/enums/v1" "go.temporal.io/server/common/log" ) @@ -48,6 +49,7 @@ const ( testGetDurationPropertyFilteredByNamespaceKey = "testGetDurationPropertyFilteredByNamespaceKey" testGetIntPropertyFilteredByTaskQueueInfoKey = "testGetIntPropertyFilteredByTaskQueueInfoKey" testGetDurationPropertyFilteredByTaskQueueInfoKey = "testGetDurationPropertyFilteredByTaskQueueInfoKey" + testGetDurationPropertyFilteredByTaskTypeKey = "testGetDurationPropertyFilteredByTaskTypeKey" testGetDurationPropertyStructuredDefaults = "testGetDurationPropertyStructuredDefaults" testGetBoolPropertyFilteredByNamespaceIDKey = "testGetBoolPropertyFilteredByNamespaceIDKey" testGetBoolPropertyFilteredByTaskQueueInfoKey = "testGetBoolPropertyFilteredByTaskQueueInfoKey" @@ -159,6 +161,14 @@ func (s *collectionSuite) TestGetDurationPropertyFilteredByTaskQueueInfo() { s.Equal(time.Minute, value(namespace, taskQueue, 0)) } +func (s *collectionSuite) TestGetDurationPropertyFilteredByTaskType() { + taskType := enumsspb.TASK_TYPE_UNSPECIFIED + value := s.cln.GetDurationPropertyFilteredByTaskType(testGetDurationPropertyFilteredByTaskTypeKey, time.Second) + s.Equal(time.Second, value(taskType)) + s.client[testGetDurationPropertyFilteredByTaskTypeKey] = time.Minute + s.Equal(time.Minute, value(taskType)) +} + func (s *collectionSuite) TestGetDurationPropertyStructuredDefaults() { defaults := []ConstrainedValue{ { diff --git a/service/history/configs/config.go b/service/history/configs/config.go index e60fa508b1f..276f6284bf5 100644 --- a/service/history/configs/config.go +++ b/service/history/configs/config.go @@ -79,8 +79,8 @@ type Config struct { // the artificial delay added to standby cluster's view of active cluster's time StandbyClusterDelay dynamicconfig.DurationPropertyFn - StandbyTaskMissingEventsResendDelay dynamicconfig.DurationPropertyFn - StandbyTaskMissingEventsDiscardDelay dynamicconfig.DurationPropertyFn + StandbyTaskMissingEventsResendDelay dynamicconfig.DurationPropertyFnWithTaskTypeFilter + StandbyTaskMissingEventsDiscardDelay dynamicconfig.DurationPropertyFnWithTaskTypeFilter QueuePendingTaskCriticalCount dynamicconfig.IntPropertyFn QueueReaderStuckCriticalAttempts dynamicconfig.IntPropertyFn @@ -314,8 +314,8 @@ func NewConfig(dc *dynamicconfig.Collection, numberOfShards int32, isAdvancedVis AcquireShardInterval: dc.GetDurationProperty(dynamicconfig.AcquireShardInterval, time.Minute), AcquireShardConcurrency: dc.GetIntProperty(dynamicconfig.AcquireShardConcurrency, 10), StandbyClusterDelay: dc.GetDurationProperty(dynamicconfig.StandbyClusterDelay, 5*time.Minute), - StandbyTaskMissingEventsResendDelay: dc.GetDurationProperty(dynamicconfig.StandbyTaskMissingEventsResendDelay, 10*time.Minute), - StandbyTaskMissingEventsDiscardDelay: dc.GetDurationProperty(dynamicconfig.StandbyTaskMissingEventsDiscardDelay, 15*time.Minute), + StandbyTaskMissingEventsResendDelay: dc.GetDurationPropertyFilteredByTaskType(dynamicconfig.StandbyTaskMissingEventsResendDelay, 10*time.Minute), + StandbyTaskMissingEventsDiscardDelay: dc.GetDurationPropertyFilteredByTaskType(dynamicconfig.StandbyTaskMissingEventsDiscardDelay, 15*time.Minute), QueuePendingTaskCriticalCount: dc.GetIntProperty(dynamicconfig.QueuePendingTaskCriticalCount, 9000), QueueReaderStuckCriticalAttempts: dc.GetIntProperty(dynamicconfig.QueueReaderStuckCriticalAttempts, 3), diff --git a/service/history/timerQueueStandbyTaskExecutor.go b/service/history/timerQueueStandbyTaskExecutor.go index e3c0d8248b0..4e98484629f 100644 --- a/service/history/timerQueueStandbyTaskExecutor.go +++ b/service/history/timerQueueStandbyTaskExecutor.go @@ -156,8 +156,8 @@ func (t *timerQueueStandbyTaskExecutor) executeUserTimerTimeoutTask( getStandbyPostActionFn( timerTask, t.getCurrentTime, - t.config.StandbyTaskMissingEventsResendDelay(), - t.config.StandbyTaskMissingEventsDiscardDelay(), + t.config.StandbyTaskMissingEventsResendDelay(timerTask.GetType()), + t.config.StandbyTaskMissingEventsDiscardDelay(timerTask.GetType()), t.fetchHistoryFromRemote, standbyTimerTaskPostActionTaskDiscarded, ), @@ -256,8 +256,8 @@ func (t *timerQueueStandbyTaskExecutor) executeActivityTimeoutTask( getStandbyPostActionFn( timerTask, t.getCurrentTime, - t.config.StandbyTaskMissingEventsResendDelay(), - t.config.StandbyTaskMissingEventsDiscardDelay(), + t.config.StandbyTaskMissingEventsResendDelay(timerTask.GetType()), + t.config.StandbyTaskMissingEventsDiscardDelay(timerTask.GetType()), t.fetchHistoryFromRemote, standbyTimerTaskPostActionTaskDiscarded, ), @@ -297,8 +297,8 @@ func (t *timerQueueStandbyTaskExecutor) executeActivityRetryTimerTask( getStandbyPostActionFn( task, t.getCurrentTime, - t.config.StandbyTaskMissingEventsResendDelay(), - t.config.StandbyTaskMissingEventsDiscardDelay(), + t.config.StandbyTaskMissingEventsResendDelay(task.GetType()), + t.config.StandbyTaskMissingEventsDiscardDelay(task.GetType()), t.fetchHistoryFromRemote, t.pushActivity, ), @@ -337,8 +337,8 @@ func (t *timerQueueStandbyTaskExecutor) executeWorkflowTaskTimeoutTask( getStandbyPostActionFn( timerTask, t.getCurrentTime, - t.config.StandbyTaskMissingEventsResendDelay(), - t.config.StandbyTaskMissingEventsDiscardDelay(), + t.config.StandbyTaskMissingEventsResendDelay(timerTask.GetType()), + t.config.StandbyTaskMissingEventsDiscardDelay(timerTask.GetType()), t.fetchHistoryFromRemote, standbyTimerTaskPostActionTaskDiscarded, ), @@ -376,8 +376,8 @@ func (t *timerQueueStandbyTaskExecutor) executeWorkflowBackoffTimerTask( getStandbyPostActionFn( timerTask, t.getCurrentTime, - t.config.StandbyTaskMissingEventsResendDelay(), - t.config.StandbyTaskMissingEventsDiscardDelay(), + t.config.StandbyTaskMissingEventsResendDelay(timerTask.GetType()), + t.config.StandbyTaskMissingEventsDiscardDelay(timerTask.GetType()), t.fetchHistoryFromRemote, standbyTimerTaskPostActionTaskDiscarded, ), @@ -411,8 +411,8 @@ func (t *timerQueueStandbyTaskExecutor) executeWorkflowTimeoutTask( getStandbyPostActionFn( timerTask, t.getCurrentTime, - t.config.StandbyTaskMissingEventsResendDelay(), - t.config.StandbyTaskMissingEventsDiscardDelay(), + t.config.StandbyTaskMissingEventsResendDelay(timerTask.GetType()), + t.config.StandbyTaskMissingEventsDiscardDelay(timerTask.GetType()), t.fetchHistoryFromRemote, standbyTimerTaskPostActionTaskDiscarded, ), diff --git a/service/history/timerQueueStandbyTaskExecutor_test.go b/service/history/timerQueueStandbyTaskExecutor_test.go index 720aacfd037..7b6aca3e3fd 100644 --- a/service/history/timerQueueStandbyTaskExecutor_test.go +++ b/service/history/timerQueueStandbyTaskExecutor_test.go @@ -117,9 +117,8 @@ func (s *timerQueueStandbyTaskExecutorSuite) SetupTest() { s.clusterName = cluster.TestAlternativeClusterName s.now = time.Now().UTC() s.timeSource = clock.NewEventTimeSource().Update(s.now) - s.fetchHistoryDuration = config.StandbyTaskMissingEventsResendDelay() + - (config.StandbyTaskMissingEventsDiscardDelay()-config.StandbyTaskMissingEventsResendDelay())/2 - s.discardDuration = config.StandbyTaskMissingEventsDiscardDelay() * 2 + s.fetchHistoryDuration = time.Minute * 12 + s.discardDuration = time.Minute * 30 s.controller = gomock.NewController(s.T()) s.mockNDCHistoryResender = xdc.NewMockNDCHistoryResender(s.controller) diff --git a/service/history/transferQueueStandbyTaskExecutor.go b/service/history/transferQueueStandbyTaskExecutor.go index ef69bc33be5..d81ef896e07 100644 --- a/service/history/transferQueueStandbyTaskExecutor.go +++ b/service/history/transferQueueStandbyTaskExecutor.go @@ -158,8 +158,8 @@ func (t *transferQueueStandbyTaskExecutor) processActivityTask( getStandbyPostActionFn( transferTask, t.getCurrentTime, - t.config.StandbyTaskMissingEventsResendDelay(), - t.config.StandbyTaskMissingEventsDiscardDelay(), + t.config.StandbyTaskMissingEventsResendDelay(transferTask.GetType()), + t.config.StandbyTaskMissingEventsDiscardDelay(transferTask.GetType()), t.fetchHistoryFromRemote, t.pushActivity, ), @@ -216,8 +216,8 @@ func (t *transferQueueStandbyTaskExecutor) processWorkflowTask( getStandbyPostActionFn( transferTask, t.getCurrentTime, - t.config.StandbyTaskMissingEventsResendDelay(), - t.config.StandbyTaskMissingEventsDiscardDelay(), + t.config.StandbyTaskMissingEventsResendDelay(transferTask.GetType()), + t.config.StandbyTaskMissingEventsDiscardDelay(transferTask.GetType()), t.fetchHistoryFromRemote, t.pushWorkflowTask, ), @@ -331,8 +331,8 @@ func (t *transferQueueStandbyTaskExecutor) processCloseExecution( getStandbyPostActionFn( transferTask, t.getCurrentTime, - t.config.StandbyTaskMissingEventsResendDelay(), - t.config.StandbyTaskMissingEventsDiscardDelay(), + t.config.StandbyTaskMissingEventsResendDelay(transferTask.GetType()), + t.config.StandbyTaskMissingEventsDiscardDelay(transferTask.GetType()), standbyTaskPostActionNoOp, standbyTransferTaskPostActionTaskDiscarded, ), @@ -366,8 +366,8 @@ func (t *transferQueueStandbyTaskExecutor) processCancelExecution( getStandbyPostActionFn( transferTask, t.getCurrentTime, - t.config.StandbyTaskMissingEventsResendDelay(), - t.config.StandbyTaskMissingEventsDiscardDelay(), + t.config.StandbyTaskMissingEventsResendDelay(transferTask.GetType()), + t.config.StandbyTaskMissingEventsDiscardDelay(transferTask.GetType()), t.fetchHistoryFromRemote, standbyTransferTaskPostActionTaskDiscarded, ), @@ -401,8 +401,8 @@ func (t *transferQueueStandbyTaskExecutor) processSignalExecution( getStandbyPostActionFn( transferTask, t.getCurrentTime, - t.config.StandbyTaskMissingEventsResendDelay(), - t.config.StandbyTaskMissingEventsDiscardDelay(), + t.config.StandbyTaskMissingEventsResendDelay(transferTask.GetType()), + t.config.StandbyTaskMissingEventsDiscardDelay(transferTask.GetType()), t.fetchHistoryFromRemote, standbyTransferTaskPostActionTaskDiscarded, ), @@ -483,8 +483,8 @@ func (t *transferQueueStandbyTaskExecutor) processStartChildExecution( getStandbyPostActionFn( transferTask, t.getCurrentTime, - t.config.StandbyTaskMissingEventsResendDelay(), - t.config.StandbyTaskMissingEventsDiscardDelay(), + t.config.StandbyTaskMissingEventsResendDelay(transferTask.GetType()), + t.config.StandbyTaskMissingEventsDiscardDelay(transferTask.GetType()), t.startChildExecutionResendPostAction, standbyTransferTaskPostActionTaskDiscarded, ), diff --git a/service/history/transferQueueStandbyTaskExecutor_test.go b/service/history/transferQueueStandbyTaskExecutor_test.go index 7698768e72c..511a273e5e6 100644 --- a/service/history/transferQueueStandbyTaskExecutor_test.go +++ b/service/history/transferQueueStandbyTaskExecutor_test.go @@ -127,9 +127,8 @@ func (s *transferQueueStandbyTaskExecutorSuite) SetupTest() { s.version = s.namespaceEntry.FailoverVersion() s.now = time.Now().UTC() s.timeSource = clock.NewEventTimeSource().Update(s.now) - s.fetchHistoryDuration = config.StandbyTaskMissingEventsResendDelay() + - (config.StandbyTaskMissingEventsDiscardDelay()-config.StandbyTaskMissingEventsResendDelay())/2 - s.discardDuration = config.StandbyTaskMissingEventsDiscardDelay() * 2 + s.fetchHistoryDuration = time.Minute * 12 + s.discardDuration = time.Minute * 30 s.controller = gomock.NewController(s.T()) s.mockNDCHistoryResender = xdc.NewMockNDCHistoryResender(s.controller)