Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add config filter by task type #3455

Merged
merged 2 commits into from
Oct 6, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions common/dynamicconfig/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,5 +95,6 @@ type (
TaskQueueName string
TaskQueueType enumspb.TaskQueueType
ShardID int32
TaskType string
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not make this enumsspb.TaskType? That will make comparisons faster.

}
)
22 changes: 22 additions & 0 deletions common/dynamicconfig/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (

enumspb "go.temporal.io/api/enums/v1"

enumsspb "go.temporal.io/server/api/enums/v1"
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/log/tag"
"go.temporal.io/server/common/primitives/timestamp"
Expand Down Expand Up @@ -71,6 +72,7 @@ type (
DurationPropertyFnWithNamespaceIDFilter func(namespaceID string) time.Duration
DurationPropertyFnWithShardIDFilter func(shardID int32) time.Duration
DurationPropertyFnWithTaskQueueInfoFilters func(namespace string, taskQueue string, taskType enumspb.TaskQueueType) time.Duration
DurationPropertyFnWithTaskTypeFilter func(task enumsspb.TaskType) time.Duration
FloatPropertyFn func() float64
FloatPropertyFnWithNamespaceFilter func(namespace string) float64
FloatPropertyFnWithShardIDFilter func(shardID int32) float64
Expand Down Expand Up @@ -282,6 +284,19 @@ func (c *Collection) GetDurationPropertyFilteredByShardID(key Key, defaultValue
}
}

// GetDurationPropertyFilteredByTaskType gets property with task type as filters and asserts that it's a duration
func (c *Collection) GetDurationPropertyFilteredByTaskType(key Key, defaultValue any) DurationPropertyFnWithTaskTypeFilter {
return func(taskType enumsspb.TaskType) time.Duration {
return matchAndConvert(
c,
key,
defaultValue,
taskTypePrecedence(taskType),
convertDuration,
)
}
}

// GetBoolProperty gets property and asserts that it's a bool
func (c *Collection) GetBoolProperty(key Key, defaultValue any) BoolPropertyFn {
return func() bool {
Expand Down Expand Up @@ -491,6 +506,13 @@ func shardIDPrecedence(shardID int32) []Constraints {
}
}

func taskTypePrecedence(taskType enumsspb.TaskType) []Constraints {
return []Constraints{
{TaskType: taskType.String()},
{},
}
}

func convertInt(val any) (int, error) {
if intVal, ok := val.(int); ok {
return intVal, nil
Expand Down
10 changes: 10 additions & 0 deletions common/dynamicconfig/collection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (

"github.com/stretchr/testify/suite"

enumsspb "go.temporal.io/server/api/enums/v1"
"go.temporal.io/server/common/log"
)

Expand All @@ -48,6 +49,7 @@ const (
testGetDurationPropertyFilteredByNamespaceKey = "testGetDurationPropertyFilteredByNamespaceKey"
testGetIntPropertyFilteredByTaskQueueInfoKey = "testGetIntPropertyFilteredByTaskQueueInfoKey"
testGetDurationPropertyFilteredByTaskQueueInfoKey = "testGetDurationPropertyFilteredByTaskQueueInfoKey"
testGetDurationPropertyFilteredByTaskTypeKey = "testGetDurationPropertyFilteredByTaskTypeKey"
testGetDurationPropertyStructuredDefaults = "testGetDurationPropertyStructuredDefaults"
testGetBoolPropertyFilteredByNamespaceIDKey = "testGetBoolPropertyFilteredByNamespaceIDKey"
testGetBoolPropertyFilteredByTaskQueueInfoKey = "testGetBoolPropertyFilteredByTaskQueueInfoKey"
Expand Down Expand Up @@ -159,6 +161,14 @@ func (s *collectionSuite) TestGetDurationPropertyFilteredByTaskQueueInfo() {
s.Equal(time.Minute, value(namespace, taskQueue, 0))
}

func (s *collectionSuite) TestGetDurationPropertyFilteredByTaskType() {
taskType := enumsspb.TASK_TYPE_UNSPECIFIED
value := s.cln.GetDurationPropertyFilteredByTaskType(testGetDurationPropertyFilteredByTaskTypeKey, time.Second)
s.Equal(time.Second, value(taskType))
s.client[testGetDurationPropertyFilteredByTaskTypeKey] = time.Minute
s.Equal(time.Minute, value(taskType))
}

func (s *collectionSuite) TestGetDurationPropertyStructuredDefaults() {
defaults := []ConstrainedValue{
{
Expand Down
8 changes: 4 additions & 4 deletions service/history/configs/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,8 @@ type Config struct {

// the artificial delay added to standby cluster's view of active cluster's time
StandbyClusterDelay dynamicconfig.DurationPropertyFn
StandbyTaskMissingEventsResendDelay dynamicconfig.DurationPropertyFn
StandbyTaskMissingEventsDiscardDelay dynamicconfig.DurationPropertyFn
StandbyTaskMissingEventsResendDelay dynamicconfig.DurationPropertyFnWithTaskTypeFilter
StandbyTaskMissingEventsDiscardDelay dynamicconfig.DurationPropertyFnWithTaskTypeFilter

QueuePendingTaskCriticalCount dynamicconfig.IntPropertyFn
QueueReaderStuckCriticalAttempts dynamicconfig.IntPropertyFn
Expand Down Expand Up @@ -314,8 +314,8 @@ func NewConfig(dc *dynamicconfig.Collection, numberOfShards int32, isAdvancedVis
AcquireShardInterval: dc.GetDurationProperty(dynamicconfig.AcquireShardInterval, time.Minute),
AcquireShardConcurrency: dc.GetIntProperty(dynamicconfig.AcquireShardConcurrency, 10),
StandbyClusterDelay: dc.GetDurationProperty(dynamicconfig.StandbyClusterDelay, 5*time.Minute),
StandbyTaskMissingEventsResendDelay: dc.GetDurationProperty(dynamicconfig.StandbyTaskMissingEventsResendDelay, 10*time.Minute),
StandbyTaskMissingEventsDiscardDelay: dc.GetDurationProperty(dynamicconfig.StandbyTaskMissingEventsDiscardDelay, 15*time.Minute),
StandbyTaskMissingEventsResendDelay: dc.GetDurationPropertyFilteredByTaskType(dynamicconfig.StandbyTaskMissingEventsResendDelay, 10*time.Minute),
StandbyTaskMissingEventsDiscardDelay: dc.GetDurationPropertyFilteredByTaskType(dynamicconfig.StandbyTaskMissingEventsDiscardDelay, 15*time.Minute),

QueuePendingTaskCriticalCount: dc.GetIntProperty(dynamicconfig.QueuePendingTaskCriticalCount, 9000),
QueueReaderStuckCriticalAttempts: dc.GetIntProperty(dynamicconfig.QueueReaderStuckCriticalAttempts, 3),
Expand Down
24 changes: 12 additions & 12 deletions service/history/timerQueueStandbyTaskExecutor.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,8 +156,8 @@ func (t *timerQueueStandbyTaskExecutor) executeUserTimerTimeoutTask(
getStandbyPostActionFn(
timerTask,
t.getCurrentTime,
t.config.StandbyTaskMissingEventsResendDelay(),
t.config.StandbyTaskMissingEventsDiscardDelay(),
t.config.StandbyTaskMissingEventsResendDelay(timerTask.GetType()),
t.config.StandbyTaskMissingEventsDiscardDelay(timerTask.GetType()),
t.fetchHistoryFromRemote,
standbyTimerTaskPostActionTaskDiscarded,
),
Expand Down Expand Up @@ -256,8 +256,8 @@ func (t *timerQueueStandbyTaskExecutor) executeActivityTimeoutTask(
getStandbyPostActionFn(
timerTask,
t.getCurrentTime,
t.config.StandbyTaskMissingEventsResendDelay(),
t.config.StandbyTaskMissingEventsDiscardDelay(),
t.config.StandbyTaskMissingEventsResendDelay(timerTask.GetType()),
t.config.StandbyTaskMissingEventsDiscardDelay(timerTask.GetType()),
t.fetchHistoryFromRemote,
standbyTimerTaskPostActionTaskDiscarded,
),
Expand Down Expand Up @@ -297,8 +297,8 @@ func (t *timerQueueStandbyTaskExecutor) executeActivityRetryTimerTask(
getStandbyPostActionFn(
task,
t.getCurrentTime,
t.config.StandbyTaskMissingEventsResendDelay(),
t.config.StandbyTaskMissingEventsDiscardDelay(),
t.config.StandbyTaskMissingEventsResendDelay(task.GetType()),
t.config.StandbyTaskMissingEventsDiscardDelay(task.GetType()),
t.fetchHistoryFromRemote,
t.pushActivity,
),
Expand Down Expand Up @@ -337,8 +337,8 @@ func (t *timerQueueStandbyTaskExecutor) executeWorkflowTaskTimeoutTask(
getStandbyPostActionFn(
timerTask,
t.getCurrentTime,
t.config.StandbyTaskMissingEventsResendDelay(),
t.config.StandbyTaskMissingEventsDiscardDelay(),
t.config.StandbyTaskMissingEventsResendDelay(timerTask.GetType()),
t.config.StandbyTaskMissingEventsDiscardDelay(timerTask.GetType()),
t.fetchHistoryFromRemote,
standbyTimerTaskPostActionTaskDiscarded,
),
Expand Down Expand Up @@ -376,8 +376,8 @@ func (t *timerQueueStandbyTaskExecutor) executeWorkflowBackoffTimerTask(
getStandbyPostActionFn(
timerTask,
t.getCurrentTime,
t.config.StandbyTaskMissingEventsResendDelay(),
t.config.StandbyTaskMissingEventsDiscardDelay(),
t.config.StandbyTaskMissingEventsResendDelay(timerTask.GetType()),
t.config.StandbyTaskMissingEventsDiscardDelay(timerTask.GetType()),
t.fetchHistoryFromRemote,
standbyTimerTaskPostActionTaskDiscarded,
),
Expand Down Expand Up @@ -411,8 +411,8 @@ func (t *timerQueueStandbyTaskExecutor) executeWorkflowTimeoutTask(
getStandbyPostActionFn(
timerTask,
t.getCurrentTime,
t.config.StandbyTaskMissingEventsResendDelay(),
t.config.StandbyTaskMissingEventsDiscardDelay(),
t.config.StandbyTaskMissingEventsResendDelay(timerTask.GetType()),
t.config.StandbyTaskMissingEventsDiscardDelay(timerTask.GetType()),
t.fetchHistoryFromRemote,
standbyTimerTaskPostActionTaskDiscarded,
),
Expand Down
5 changes: 2 additions & 3 deletions service/history/timerQueueStandbyTaskExecutor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,9 +117,8 @@ func (s *timerQueueStandbyTaskExecutorSuite) SetupTest() {
s.clusterName = cluster.TestAlternativeClusterName
s.now = time.Now().UTC()
s.timeSource = clock.NewEventTimeSource().Update(s.now)
s.fetchHistoryDuration = config.StandbyTaskMissingEventsResendDelay() +
(config.StandbyTaskMissingEventsDiscardDelay()-config.StandbyTaskMissingEventsResendDelay())/2
s.discardDuration = config.StandbyTaskMissingEventsDiscardDelay() * 2
s.fetchHistoryDuration = time.Minute * 12
s.discardDuration = time.Minute * 30

s.controller = gomock.NewController(s.T())
s.mockNDCHistoryResender = xdc.NewMockNDCHistoryResender(s.controller)
Expand Down
24 changes: 12 additions & 12 deletions service/history/transferQueueStandbyTaskExecutor.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,8 +158,8 @@ func (t *transferQueueStandbyTaskExecutor) processActivityTask(
getStandbyPostActionFn(
transferTask,
t.getCurrentTime,
t.config.StandbyTaskMissingEventsResendDelay(),
t.config.StandbyTaskMissingEventsDiscardDelay(),
t.config.StandbyTaskMissingEventsResendDelay(transferTask.GetType()),
t.config.StandbyTaskMissingEventsDiscardDelay(transferTask.GetType()),
t.fetchHistoryFromRemote,
t.pushActivity,
),
Expand Down Expand Up @@ -216,8 +216,8 @@ func (t *transferQueueStandbyTaskExecutor) processWorkflowTask(
getStandbyPostActionFn(
transferTask,
t.getCurrentTime,
t.config.StandbyTaskMissingEventsResendDelay(),
t.config.StandbyTaskMissingEventsDiscardDelay(),
t.config.StandbyTaskMissingEventsResendDelay(transferTask.GetType()),
t.config.StandbyTaskMissingEventsDiscardDelay(transferTask.GetType()),
t.fetchHistoryFromRemote,
t.pushWorkflowTask,
),
Expand Down Expand Up @@ -331,8 +331,8 @@ func (t *transferQueueStandbyTaskExecutor) processCloseExecution(
getStandbyPostActionFn(
transferTask,
t.getCurrentTime,
t.config.StandbyTaskMissingEventsResendDelay(),
t.config.StandbyTaskMissingEventsDiscardDelay(),
t.config.StandbyTaskMissingEventsResendDelay(transferTask.GetType()),
t.config.StandbyTaskMissingEventsDiscardDelay(transferTask.GetType()),
standbyTaskPostActionNoOp,
standbyTransferTaskPostActionTaskDiscarded,
),
Expand Down Expand Up @@ -366,8 +366,8 @@ func (t *transferQueueStandbyTaskExecutor) processCancelExecution(
getStandbyPostActionFn(
transferTask,
t.getCurrentTime,
t.config.StandbyTaskMissingEventsResendDelay(),
t.config.StandbyTaskMissingEventsDiscardDelay(),
t.config.StandbyTaskMissingEventsResendDelay(transferTask.GetType()),
t.config.StandbyTaskMissingEventsDiscardDelay(transferTask.GetType()),
t.fetchHistoryFromRemote,
standbyTransferTaskPostActionTaskDiscarded,
),
Expand Down Expand Up @@ -401,8 +401,8 @@ func (t *transferQueueStandbyTaskExecutor) processSignalExecution(
getStandbyPostActionFn(
transferTask,
t.getCurrentTime,
t.config.StandbyTaskMissingEventsResendDelay(),
t.config.StandbyTaskMissingEventsDiscardDelay(),
t.config.StandbyTaskMissingEventsResendDelay(transferTask.GetType()),
t.config.StandbyTaskMissingEventsDiscardDelay(transferTask.GetType()),
t.fetchHistoryFromRemote,
standbyTransferTaskPostActionTaskDiscarded,
),
Expand Down Expand Up @@ -483,8 +483,8 @@ func (t *transferQueueStandbyTaskExecutor) processStartChildExecution(
getStandbyPostActionFn(
transferTask,
t.getCurrentTime,
t.config.StandbyTaskMissingEventsResendDelay(),
t.config.StandbyTaskMissingEventsDiscardDelay(),
t.config.StandbyTaskMissingEventsResendDelay(transferTask.GetType()),
t.config.StandbyTaskMissingEventsDiscardDelay(transferTask.GetType()),
t.startChildExecutionResendPostAction,
standbyTransferTaskPostActionTaskDiscarded,
),
Expand Down
5 changes: 2 additions & 3 deletions service/history/transferQueueStandbyTaskExecutor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,9 +127,8 @@ func (s *transferQueueStandbyTaskExecutorSuite) SetupTest() {
s.version = s.namespaceEntry.FailoverVersion()
s.now = time.Now().UTC()
s.timeSource = clock.NewEventTimeSource().Update(s.now)
s.fetchHistoryDuration = config.StandbyTaskMissingEventsResendDelay() +
(config.StandbyTaskMissingEventsDiscardDelay()-config.StandbyTaskMissingEventsResendDelay())/2
s.discardDuration = config.StandbyTaskMissingEventsDiscardDelay() * 2
s.fetchHistoryDuration = time.Minute * 12
s.discardDuration = time.Minute * 30

s.controller = gomock.NewController(s.T())
s.mockNDCHistoryResender = xdc.NewMockNDCHistoryResender(s.controller)
Expand Down