Skip to content

Commit

Permalink
Add config filter by task type (#3455)
Browse files Browse the repository at this point in the history
  • Loading branch information
yux0 authored and dnr committed Oct 10, 2022
1 parent 082713b commit 1e46237
Show file tree
Hide file tree
Showing 8 changed files with 65 additions and 34 deletions.
1 change: 1 addition & 0 deletions common/dynamicconfig/client.go
Expand Up @@ -95,5 +95,6 @@ type (
TaskQueueName string
TaskQueueType enumspb.TaskQueueType
ShardID int32
TaskType string
}
)
22 changes: 22 additions & 0 deletions common/dynamicconfig/collection.go
Expand Up @@ -32,6 +32,7 @@ import (

enumspb "go.temporal.io/api/enums/v1"

enumsspb "go.temporal.io/server/api/enums/v1"
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/log/tag"
"go.temporal.io/server/common/primitives/timestamp"
Expand Down Expand Up @@ -71,6 +72,7 @@ type (
DurationPropertyFnWithNamespaceIDFilter func(namespaceID string) time.Duration
DurationPropertyFnWithShardIDFilter func(shardID int32) time.Duration
DurationPropertyFnWithTaskQueueInfoFilters func(namespace string, taskQueue string, taskType enumspb.TaskQueueType) time.Duration
DurationPropertyFnWithTaskTypeFilter func(task enumsspb.TaskType) time.Duration
FloatPropertyFn func() float64
FloatPropertyFnWithNamespaceFilter func(namespace string) float64
FloatPropertyFnWithShardIDFilter func(shardID int32) float64
Expand Down Expand Up @@ -282,6 +284,19 @@ func (c *Collection) GetDurationPropertyFilteredByShardID(key Key, defaultValue
}
}

// GetDurationPropertyFilteredByTaskType gets property with task type as filters and asserts that it's a duration
func (c *Collection) GetDurationPropertyFilteredByTaskType(key Key, defaultValue any) DurationPropertyFnWithTaskTypeFilter {
return func(taskType enumsspb.TaskType) time.Duration {
return matchAndConvert(
c,
key,
defaultValue,
taskTypePrecedence(taskType),
convertDuration,
)
}
}

// GetBoolProperty gets property and asserts that it's a bool
func (c *Collection) GetBoolProperty(key Key, defaultValue any) BoolPropertyFn {
return func() bool {
Expand Down Expand Up @@ -491,6 +506,13 @@ func shardIDPrecedence(shardID int32) []Constraints {
}
}

func taskTypePrecedence(taskType enumsspb.TaskType) []Constraints {
return []Constraints{
{TaskType: taskType.String()},
{},
}
}

func convertInt(val any) (int, error) {
if intVal, ok := val.(int); ok {
return intVal, nil
Expand Down
10 changes: 10 additions & 0 deletions common/dynamicconfig/collection_test.go
Expand Up @@ -30,6 +30,7 @@ import (

"github.com/stretchr/testify/suite"

enumsspb "go.temporal.io/server/api/enums/v1"
"go.temporal.io/server/common/log"
)

Expand All @@ -48,6 +49,7 @@ const (
testGetDurationPropertyFilteredByNamespaceKey = "testGetDurationPropertyFilteredByNamespaceKey"
testGetIntPropertyFilteredByTaskQueueInfoKey = "testGetIntPropertyFilteredByTaskQueueInfoKey"
testGetDurationPropertyFilteredByTaskQueueInfoKey = "testGetDurationPropertyFilteredByTaskQueueInfoKey"
testGetDurationPropertyFilteredByTaskTypeKey = "testGetDurationPropertyFilteredByTaskTypeKey"
testGetDurationPropertyStructuredDefaults = "testGetDurationPropertyStructuredDefaults"
testGetBoolPropertyFilteredByNamespaceIDKey = "testGetBoolPropertyFilteredByNamespaceIDKey"
testGetBoolPropertyFilteredByTaskQueueInfoKey = "testGetBoolPropertyFilteredByTaskQueueInfoKey"
Expand Down Expand Up @@ -159,6 +161,14 @@ func (s *collectionSuite) TestGetDurationPropertyFilteredByTaskQueueInfo() {
s.Equal(time.Minute, value(namespace, taskQueue, 0))
}

func (s *collectionSuite) TestGetDurationPropertyFilteredByTaskType() {
taskType := enumsspb.TASK_TYPE_UNSPECIFIED
value := s.cln.GetDurationPropertyFilteredByTaskType(testGetDurationPropertyFilteredByTaskTypeKey, time.Second)
s.Equal(time.Second, value(taskType))
s.client[testGetDurationPropertyFilteredByTaskTypeKey] = time.Minute
s.Equal(time.Minute, value(taskType))
}

func (s *collectionSuite) TestGetDurationPropertyStructuredDefaults() {
defaults := []ConstrainedValue{
{
Expand Down
8 changes: 4 additions & 4 deletions service/history/configs/config.go
Expand Up @@ -79,8 +79,8 @@ type Config struct {

// the artificial delay added to standby cluster's view of active cluster's time
StandbyClusterDelay dynamicconfig.DurationPropertyFn
StandbyTaskMissingEventsResendDelay dynamicconfig.DurationPropertyFn
StandbyTaskMissingEventsDiscardDelay dynamicconfig.DurationPropertyFn
StandbyTaskMissingEventsResendDelay dynamicconfig.DurationPropertyFnWithTaskTypeFilter
StandbyTaskMissingEventsDiscardDelay dynamicconfig.DurationPropertyFnWithTaskTypeFilter

QueuePendingTaskCriticalCount dynamicconfig.IntPropertyFn
QueueReaderStuckCriticalAttempts dynamicconfig.IntPropertyFn
Expand Down Expand Up @@ -314,8 +314,8 @@ func NewConfig(dc *dynamicconfig.Collection, numberOfShards int32, isAdvancedVis
AcquireShardInterval: dc.GetDurationProperty(dynamicconfig.AcquireShardInterval, time.Minute),
AcquireShardConcurrency: dc.GetIntProperty(dynamicconfig.AcquireShardConcurrency, 10),
StandbyClusterDelay: dc.GetDurationProperty(dynamicconfig.StandbyClusterDelay, 5*time.Minute),
StandbyTaskMissingEventsResendDelay: dc.GetDurationProperty(dynamicconfig.StandbyTaskMissingEventsResendDelay, 10*time.Minute),
StandbyTaskMissingEventsDiscardDelay: dc.GetDurationProperty(dynamicconfig.StandbyTaskMissingEventsDiscardDelay, 15*time.Minute),
StandbyTaskMissingEventsResendDelay: dc.GetDurationPropertyFilteredByTaskType(dynamicconfig.StandbyTaskMissingEventsResendDelay, 10*time.Minute),
StandbyTaskMissingEventsDiscardDelay: dc.GetDurationPropertyFilteredByTaskType(dynamicconfig.StandbyTaskMissingEventsDiscardDelay, 15*time.Minute),

QueuePendingTaskCriticalCount: dc.GetIntProperty(dynamicconfig.QueuePendingTaskCriticalCount, 9000),
QueueReaderStuckCriticalAttempts: dc.GetIntProperty(dynamicconfig.QueueReaderStuckCriticalAttempts, 3),
Expand Down
24 changes: 12 additions & 12 deletions service/history/timerQueueStandbyTaskExecutor.go
Expand Up @@ -156,8 +156,8 @@ func (t *timerQueueStandbyTaskExecutor) executeUserTimerTimeoutTask(
getStandbyPostActionFn(
timerTask,
t.getCurrentTime,
t.config.StandbyTaskMissingEventsResendDelay(),
t.config.StandbyTaskMissingEventsDiscardDelay(),
t.config.StandbyTaskMissingEventsResendDelay(timerTask.GetType()),
t.config.StandbyTaskMissingEventsDiscardDelay(timerTask.GetType()),
t.fetchHistoryFromRemote,
standbyTimerTaskPostActionTaskDiscarded,
),
Expand Down Expand Up @@ -256,8 +256,8 @@ func (t *timerQueueStandbyTaskExecutor) executeActivityTimeoutTask(
getStandbyPostActionFn(
timerTask,
t.getCurrentTime,
t.config.StandbyTaskMissingEventsResendDelay(),
t.config.StandbyTaskMissingEventsDiscardDelay(),
t.config.StandbyTaskMissingEventsResendDelay(timerTask.GetType()),
t.config.StandbyTaskMissingEventsDiscardDelay(timerTask.GetType()),
t.fetchHistoryFromRemote,
standbyTimerTaskPostActionTaskDiscarded,
),
Expand Down Expand Up @@ -297,8 +297,8 @@ func (t *timerQueueStandbyTaskExecutor) executeActivityRetryTimerTask(
getStandbyPostActionFn(
task,
t.getCurrentTime,
t.config.StandbyTaskMissingEventsResendDelay(),
t.config.StandbyTaskMissingEventsDiscardDelay(),
t.config.StandbyTaskMissingEventsResendDelay(task.GetType()),
t.config.StandbyTaskMissingEventsDiscardDelay(task.GetType()),
t.fetchHistoryFromRemote,
t.pushActivity,
),
Expand Down Expand Up @@ -337,8 +337,8 @@ func (t *timerQueueStandbyTaskExecutor) executeWorkflowTaskTimeoutTask(
getStandbyPostActionFn(
timerTask,
t.getCurrentTime,
t.config.StandbyTaskMissingEventsResendDelay(),
t.config.StandbyTaskMissingEventsDiscardDelay(),
t.config.StandbyTaskMissingEventsResendDelay(timerTask.GetType()),
t.config.StandbyTaskMissingEventsDiscardDelay(timerTask.GetType()),
t.fetchHistoryFromRemote,
standbyTimerTaskPostActionTaskDiscarded,
),
Expand Down Expand Up @@ -376,8 +376,8 @@ func (t *timerQueueStandbyTaskExecutor) executeWorkflowBackoffTimerTask(
getStandbyPostActionFn(
timerTask,
t.getCurrentTime,
t.config.StandbyTaskMissingEventsResendDelay(),
t.config.StandbyTaskMissingEventsDiscardDelay(),
t.config.StandbyTaskMissingEventsResendDelay(timerTask.GetType()),
t.config.StandbyTaskMissingEventsDiscardDelay(timerTask.GetType()),
t.fetchHistoryFromRemote,
standbyTimerTaskPostActionTaskDiscarded,
),
Expand Down Expand Up @@ -411,8 +411,8 @@ func (t *timerQueueStandbyTaskExecutor) executeWorkflowTimeoutTask(
getStandbyPostActionFn(
timerTask,
t.getCurrentTime,
t.config.StandbyTaskMissingEventsResendDelay(),
t.config.StandbyTaskMissingEventsDiscardDelay(),
t.config.StandbyTaskMissingEventsResendDelay(timerTask.GetType()),
t.config.StandbyTaskMissingEventsDiscardDelay(timerTask.GetType()),
t.fetchHistoryFromRemote,
standbyTimerTaskPostActionTaskDiscarded,
),
Expand Down
5 changes: 2 additions & 3 deletions service/history/timerQueueStandbyTaskExecutor_test.go
Expand Up @@ -117,9 +117,8 @@ func (s *timerQueueStandbyTaskExecutorSuite) SetupTest() {
s.clusterName = cluster.TestAlternativeClusterName
s.now = time.Now().UTC()
s.timeSource = clock.NewEventTimeSource().Update(s.now)
s.fetchHistoryDuration = config.StandbyTaskMissingEventsResendDelay() +
(config.StandbyTaskMissingEventsDiscardDelay()-config.StandbyTaskMissingEventsResendDelay())/2
s.discardDuration = config.StandbyTaskMissingEventsDiscardDelay() * 2
s.fetchHistoryDuration = time.Minute * 12
s.discardDuration = time.Minute * 30

s.controller = gomock.NewController(s.T())
s.mockNDCHistoryResender = xdc.NewMockNDCHistoryResender(s.controller)
Expand Down
24 changes: 12 additions & 12 deletions service/history/transferQueueStandbyTaskExecutor.go
Expand Up @@ -158,8 +158,8 @@ func (t *transferQueueStandbyTaskExecutor) processActivityTask(
getStandbyPostActionFn(
transferTask,
t.getCurrentTime,
t.config.StandbyTaskMissingEventsResendDelay(),
t.config.StandbyTaskMissingEventsDiscardDelay(),
t.config.StandbyTaskMissingEventsResendDelay(transferTask.GetType()),
t.config.StandbyTaskMissingEventsDiscardDelay(transferTask.GetType()),
t.fetchHistoryFromRemote,
t.pushActivity,
),
Expand Down Expand Up @@ -216,8 +216,8 @@ func (t *transferQueueStandbyTaskExecutor) processWorkflowTask(
getStandbyPostActionFn(
transferTask,
t.getCurrentTime,
t.config.StandbyTaskMissingEventsResendDelay(),
t.config.StandbyTaskMissingEventsDiscardDelay(),
t.config.StandbyTaskMissingEventsResendDelay(transferTask.GetType()),
t.config.StandbyTaskMissingEventsDiscardDelay(transferTask.GetType()),
t.fetchHistoryFromRemote,
t.pushWorkflowTask,
),
Expand Down Expand Up @@ -331,8 +331,8 @@ func (t *transferQueueStandbyTaskExecutor) processCloseExecution(
getStandbyPostActionFn(
transferTask,
t.getCurrentTime,
t.config.StandbyTaskMissingEventsResendDelay(),
t.config.StandbyTaskMissingEventsDiscardDelay(),
t.config.StandbyTaskMissingEventsResendDelay(transferTask.GetType()),
t.config.StandbyTaskMissingEventsDiscardDelay(transferTask.GetType()),
standbyTaskPostActionNoOp,
standbyTransferTaskPostActionTaskDiscarded,
),
Expand Down Expand Up @@ -366,8 +366,8 @@ func (t *transferQueueStandbyTaskExecutor) processCancelExecution(
getStandbyPostActionFn(
transferTask,
t.getCurrentTime,
t.config.StandbyTaskMissingEventsResendDelay(),
t.config.StandbyTaskMissingEventsDiscardDelay(),
t.config.StandbyTaskMissingEventsResendDelay(transferTask.GetType()),
t.config.StandbyTaskMissingEventsDiscardDelay(transferTask.GetType()),
t.fetchHistoryFromRemote,
standbyTransferTaskPostActionTaskDiscarded,
),
Expand Down Expand Up @@ -401,8 +401,8 @@ func (t *transferQueueStandbyTaskExecutor) processSignalExecution(
getStandbyPostActionFn(
transferTask,
t.getCurrentTime,
t.config.StandbyTaskMissingEventsResendDelay(),
t.config.StandbyTaskMissingEventsDiscardDelay(),
t.config.StandbyTaskMissingEventsResendDelay(transferTask.GetType()),
t.config.StandbyTaskMissingEventsDiscardDelay(transferTask.GetType()),
t.fetchHistoryFromRemote,
standbyTransferTaskPostActionTaskDiscarded,
),
Expand Down Expand Up @@ -483,8 +483,8 @@ func (t *transferQueueStandbyTaskExecutor) processStartChildExecution(
getStandbyPostActionFn(
transferTask,
t.getCurrentTime,
t.config.StandbyTaskMissingEventsResendDelay(),
t.config.StandbyTaskMissingEventsDiscardDelay(),
t.config.StandbyTaskMissingEventsResendDelay(transferTask.GetType()),
t.config.StandbyTaskMissingEventsDiscardDelay(transferTask.GetType()),
t.startChildExecutionResendPostAction,
standbyTransferTaskPostActionTaskDiscarded,
),
Expand Down
5 changes: 2 additions & 3 deletions service/history/transferQueueStandbyTaskExecutor_test.go
Expand Up @@ -127,9 +127,8 @@ func (s *transferQueueStandbyTaskExecutorSuite) SetupTest() {
s.version = s.namespaceEntry.FailoverVersion()
s.now = time.Now().UTC()
s.timeSource = clock.NewEventTimeSource().Update(s.now)
s.fetchHistoryDuration = config.StandbyTaskMissingEventsResendDelay() +
(config.StandbyTaskMissingEventsDiscardDelay()-config.StandbyTaskMissingEventsResendDelay())/2
s.discardDuration = config.StandbyTaskMissingEventsDiscardDelay() * 2
s.fetchHistoryDuration = time.Minute * 12
s.discardDuration = time.Minute * 30

s.controller = gomock.NewController(s.T())
s.mockNDCHistoryResender = xdc.NewMockNDCHistoryResender(s.controller)
Expand Down

0 comments on commit 1e46237

Please sign in to comment.