Skip to content

Commit

Permalink
Add dynamic config to disable loading task queue user data (#4477)
Browse files Browse the repository at this point in the history
* Add dynamic config to disable loading task queue user data

* Fix lint
  • Loading branch information
bergundy committed Jun 10, 2023
1 parent d44aa1e commit 06d2ce8
Show file tree
Hide file tree
Showing 8 changed files with 285 additions and 6 deletions.
5 changes: 5 additions & 0 deletions common/dynamicconfig/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,11 @@ const (
MatchingLongPollExpirationInterval = "matching.longPollExpirationInterval"
// MatchingSyncMatchWaitDuration is the wait time for sync match
MatchingSyncMatchWaitDuration = "matching.syncMatchWaitDuration"
// MatchingLoadUserData can be used to entirely disable loading user data from persistence (and the inter node RPCs
// that propagate it). When turned off, features that rely on user data (e.g. worker versioning) will essentially
// be disabled. When disabled, matching will drop tasks for versioned workflows and activities to avoid breaking
// versioning semantics. Operator intervention will be required to reschedule the dropped tasks.
MatchingLoadUserData = "matching.loadUserData"
// MatchingUpdateAckInterval is the interval for update ack
MatchingUpdateAckInterval = "matching.updateAckInterval"
// MatchingMaxTaskQueueIdleTime is the time after which an idle task queue will be unloaded
Expand Down
11 changes: 11 additions & 0 deletions service/matching/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ type (
EnableReadFromSecondaryVisibility dynamicconfig.BoolPropertyFnWithNamespaceFilter
VisibilityDisableOrderByClause dynamicconfig.BoolPropertyFnWithNamespaceFilter
VisibilityEnableManualPagination dynamicconfig.BoolPropertyFnWithNamespaceFilter

LoadUserData dynamicconfig.BoolPropertyFnWithTaskQueueInfoFilters
}

forwarderConfig struct {
Expand Down Expand Up @@ -119,6 +121,11 @@ type (
AdminNamespaceToPartitionDispatchRate func() float64
// partition qps = AdminNamespaceTaskQueueToPartitionDispatchRate(namespace, task_queue)
AdminNamespaceTaskQueueToPartitionDispatchRate func() float64

// If set to false, matching does not load user data from DB for root partitions or fetch it via RPC from the
// root. When disabled, features that rely on user data (e.g. worker versioning) will essentially be disabled.
// See the documentation for dynamicconfig.MatchingLoadUserData for the implications on versioning.
LoadUserData func() bool
}
)

Expand Down Expand Up @@ -150,6 +157,7 @@ func NewConfig(
PersistenceDynamicRateLimitingParams: dc.GetMapProperty(dynamicconfig.MatchingPersistenceDynamicRateLimitingParams, dynamicconfig.DefaultDynamicRateLimitingParams),
SyncMatchWaitDuration: dc.GetDurationPropertyFilteredByTaskQueueInfo(dynamicconfig.MatchingSyncMatchWaitDuration, 200*time.Millisecond),
TestDisableSyncMatch: dc.GetBoolProperty(dynamicconfig.TestMatchingDisableSyncMatch, false),
LoadUserData: dc.GetBoolPropertyFilteredByTaskQueueInfo(dynamicconfig.MatchingLoadUserData, true),
RPS: dc.GetIntProperty(dynamicconfig.MatchingRPS, 1200),
RangeSize: 100000,
GetTasksBatchSize: dc.GetIntPropertyFilteredByTaskQueueInfo(dynamicconfig.MatchingGetTasksBatchSize, 1000),
Expand Down Expand Up @@ -206,6 +214,9 @@ func newTaskQueueConfig(id *taskQueueID, config *Config, namespace namespace.Nam
return config.SyncMatchWaitDuration(namespace.String(), taskQueueName, taskType)
},
TestDisableSyncMatch: config.TestDisableSyncMatch,
LoadUserData: func() bool {
return config.LoadUserData(namespace.String(), taskQueueName, taskType)
},
LongPollExpirationInterval: func() time.Duration {
return config.LongPollExpirationInterval(namespace.String(), taskQueueName, taskType)
},
Expand Down
46 changes: 46 additions & 0 deletions service/matching/matchingEngine.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,13 @@ func (e *matchingEngineImpl) AddWorkflowTask(
return false, err
}

shouldDrop, err := e.shouldDropTask(origTaskQueue, addRequest.VersionDirective)
if err != nil {
return false, err
} else if shouldDrop {
return true, nil
}

// We don't need the userDataChanged channel here because:
// - if we sync match or sticky worker unavailable, we're done
// - if we spool to db, we'll re-resolve when it comes out of the db
Expand Down Expand Up @@ -380,6 +387,13 @@ func (e *matchingEngineImpl) AddActivityTask(
return false, err
}

shouldDrop, err := e.shouldDropTask(origTaskQueue, addRequest.VersionDirective)
if err != nil {
return false, err
} else if shouldDrop {
return true, nil
}

// We don't need the userDataChanged channel here because:
// - if we sync match, we're done
// - if we spool to db, we'll re-resolve when it comes out of the db
Expand Down Expand Up @@ -434,6 +448,12 @@ func (e *matchingEngineImpl) DispatchSpooledTask(
unversionedOrigTaskQueue := newTaskQueueIDWithVersionSet(origTaskQueue, "")
// Redirect and re-resolve if we're blocked in matcher and user data changes.
for {
shouldDrop, err := e.shouldDropTask(unversionedOrigTaskQueue, directive)
if err != nil {
return err
} else if shouldDrop {
return nil
}
taskQueue, userDataChanged, err := e.redirectToVersionedQueueForAdd(
ctx, unversionedOrigTaskQueue, directive, stickyInfo)
if err != nil {
Expand Down Expand Up @@ -669,6 +689,12 @@ func (e *matchingEngineImpl) QueryWorkflow(
return nil, err
}

shouldDrop, err := e.shouldDropTask(origTaskQueue, queryRequest.VersionDirective)
if err != nil {
return nil, err
} else if shouldDrop {
return nil, serviceerror.NewFailedPrecondition("Operations on versioned workflows are disabled")
}
// We don't need the userDataChanged channel here because we either do this sync (local or remote)
// or fail with a relatively short timeout.
taskQueue, _, err := e.redirectToVersionedQueueForAdd(ctx, origTaskQueue, queryRequest.VersionDirective, stickyInfo)
Expand Down Expand Up @@ -1432,6 +1458,26 @@ func (e *matchingEngineImpl) redirectToVersionedQueueForPoll(
return newTaskQueueIDWithVersionSet(taskQueue, versionSet), nil
}

// shouldDropTask reports whether a task must be dropped instead of being dispatched.
//
// A task is treated as versioned when its directive is either "use default" or a specific build ID; any other
// directive (including none) is never dropped. Versioned tasks are dropped when the LoadUserData dynamic config
// evaluates to false for the task queue. This is intentional: without user data, dispatching would risk breaking
// versioning semantics and sending tasks to the wrong workers.
//
// An error is returned only when the namespace for the task queue cannot be resolved.
func (e *matchingEngineImpl) shouldDropTask(taskQueue *taskQueueID, directive *taskqueuespb.TaskVersionDirective) (bool, error) {
	switch directive.GetValue().(type) {
	case *taskqueuespb.TaskVersionDirective_UseDefault, *taskqueuespb.TaskVersionDirective_BuildId:
		// Versioned task: continue on to the dynamic config check below.
	default:
		// Unversioned tasks do not depend on user data and are always kept.
		return false, nil
	}

	nsEntry, err := e.namespaceRegistry.GetNamespaceByID(taskQueue.namespaceID)
	if err != nil {
		return false, err
	}

	loadEnabled := e.config.LoadUserData(nsEntry.Name().String(), taskQueue.BaseNameString(), taskQueue.taskType)
	return !loadEnabled, nil
}

func (e *matchingEngineImpl) redirectToVersionedQueueForAdd(
ctx context.Context,
taskQueue *taskQueueID,
Expand Down
71 changes: 71 additions & 0 deletions service/matching/matchingEngine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,13 @@ import (
"go.temporal.io/api/workflowservice/v1"

clockspb "go.temporal.io/server/api/clock/v1"
"go.temporal.io/server/api/enums/v1"
"go.temporal.io/server/api/historyservice/v1"
"go.temporal.io/server/api/historyservicemock/v1"
"go.temporal.io/server/api/matchingservice/v1"
"go.temporal.io/server/api/matchingservicemock/v1"
persistencespb "go.temporal.io/server/api/persistence/v1"
"go.temporal.io/server/api/taskqueue/v1"
tokenspb "go.temporal.io/server/api/token/v1"
"go.temporal.io/server/common"
"go.temporal.io/server/common/clock"
Expand Down Expand Up @@ -2246,6 +2248,75 @@ func (s *matchingEngineSuite) TestUpdateUserData_FailsOnKnownVersionMismatch() {
s.ErrorAs(err, &failedPreconditionError)
}

// TestAddWorkflowTask_ForVersionedWorkflows_SilentlyDroppedWhenDisablingLoadingUserData checks that adding a workflow
// task with a "use default" version directive completes without error while user data loading is disabled.
func (s *matchingEngineSuite) TestAddWorkflowTask_ForVersionedWorkflows_SilentlyDroppedWhenDisablingLoadingUserData() {
	nsID := uuid.New()
	// Disable user data loading regardless of namespace, task queue name, or task queue type.
	s.matchingEngine.config.LoadUserData = func(string, string, enumspb.TaskQueueType) bool { return false }

	request := &matchingservice.AddWorkflowTaskRequest{
		NamespaceId: nsID,
		Execution: &commonpb.WorkflowExecution{
			WorkflowId: "test",
			RunId:      uuid.New(),
		},
		TaskQueue: &taskqueuepb.TaskQueue{
			Name: "test",
			Kind: enumspb.TASK_QUEUE_KIND_NORMAL,
		},
		ScheduledEventId: 7,
		Source:           enums.TASK_SOURCE_HISTORY,
		VersionDirective: &taskqueue.TaskVersionDirective{
			Value: &taskqueue.TaskVersionDirective_UseDefault{UseDefault: &types.Empty{}},
		},
	}

	_, addErr := s.matchingEngine.AddWorkflowTask(context.Background(), request)
	s.Require().NoError(addErr)
}

// TestAddActivityTask_ForVersionedWorkflows_SilentlyDroppedWhenDisablingLoadingUserData checks that adding an activity
// task with a "use default" version directive completes without error while user data loading is disabled.
func (s *matchingEngineSuite) TestAddActivityTask_ForVersionedWorkflows_SilentlyDroppedWhenDisablingLoadingUserData() {
	nsID := uuid.New()
	// Disable user data loading regardless of namespace, task queue name, or task queue type.
	s.matchingEngine.config.LoadUserData = func(string, string, enumspb.TaskQueueType) bool { return false }

	request := &matchingservice.AddActivityTaskRequest{
		NamespaceId: nsID,
		Execution: &commonpb.WorkflowExecution{
			WorkflowId: "test",
			RunId:      uuid.New(),
		},
		TaskQueue: &taskqueuepb.TaskQueue{
			Name: "test",
			Kind: enumspb.TASK_QUEUE_KIND_NORMAL,
		},
		ScheduledEventId: 7,
		Source:           enums.TASK_SOURCE_HISTORY,
		VersionDirective: &taskqueue.TaskVersionDirective{
			Value: &taskqueue.TaskVersionDirective_UseDefault{UseDefault: &types.Empty{}},
		},
	}

	_, addErr := s.matchingEngine.AddActivityTask(context.Background(), request)
	s.Require().NoError(addErr)
}

// TestDispatchSpooledTask_ForVersionedWorkflows_SilentlyDroppedWhenDisablingLoadingUserData checks that dispatching a
// spooled task with a "use default" version directive returns no error while user data loading is disabled.
func (s *matchingEngineSuite) TestDispatchSpooledTask_ForVersionedWorkflows_SilentlyDroppedWhenDisablingLoadingUserData() {
	nsID := namespace.ID(uuid.New())
	queueID, err := newTaskQueueID(nsID, "foo", enumspb.TASK_QUEUE_TYPE_ACTIVITY)
	s.Require().NoError(err)
	// Disable user data loading regardless of namespace, task queue name, or task queue type.
	s.matchingEngine.config.LoadUserData = func(string, string, enumspb.TaskQueueType) bool { return false }

	// A spooled task whose only populated field is the version directive, with a no-op completion callback.
	spooled := &internalTask{
		event: &genericTaskInfo{
			&persistencespb.AllocatedTaskInfo{
				Data: &persistencespb.TaskInfo{
					VersionDirective: &taskqueue.TaskVersionDirective{
						Value: &taskqueue.TaskVersionDirective_UseDefault{UseDefault: &types.Empty{}},
					},
				},
			},
			func(*persistencespb.AllocatedTaskInfo, error) {},
		},
	}

	err = s.matchingEngine.DispatchSpooledTask(context.Background(), spooled, queueID, stickyInfo{})
	s.Require().NoError(err)
}

func (s *matchingEngineSuite) setupRecordActivityTaskStartedMock(tlName string) {
activityTypeName := "activity1"
activityID := "activityId1"
Expand Down
10 changes: 9 additions & 1 deletion service/matching/taskQueueManager.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,8 @@ func (c *taskQueueManagerImpl) Start() {
c.taskReader.Start()
if c.shouldFetchUserData() {
c.goroGroup.Go(c.fetchUserDataLoop)
} else {
c.userDataInitialFetch.Set(struct{}{}, nil)
}
c.logger.Info("", tag.LifeCycleStarted)
c.taggedMetricsHandler.Counter(metrics.TaskQueueStartedCounter.GetMetricName()).Record(1)
Expand Down Expand Up @@ -358,7 +360,7 @@ func (c *taskQueueManagerImpl) managesSpecificVersionSet() bool {
// shouldFetchUserData reports whether this task queue manager should fetch user data; Start uses the result to
// decide whether to run fetchUserDataLoop or to mark the initial user data fetch as already complete.
func (c *taskQueueManagerImpl) shouldFetchUserData() bool {
	// 1. If loading user data is disabled via dynamic config, we should not fetch at all.
	// 2. If the db stores it, then we definitely should not be fetching.
	// 3. Additionally, we should not fetch for "versioned" tqms.
	return c.config.LoadUserData() && !c.db.DbStoresUserData() && !c.managesSpecificVersionSet()
}

func (c *taskQueueManagerImpl) WaitUntilInitialized(ctx context.Context) error {
Expand Down Expand Up @@ -491,11 +493,17 @@ func (c *taskQueueManagerImpl) GetUserData(ctx context.Context) (*persistencespb
if c.managesSpecificVersionSet() {
return nil, nil, errNoUserDataOnVersionedTQM
}
if !c.config.LoadUserData() {
return nil, nil, nil
}
return c.db.GetUserData(ctx)
}

// UpdateUserData updates user data for this task queue and replicates across clusters if necessary.
func (c *taskQueueManagerImpl) UpdateUserData(ctx context.Context, options UserDataUpdateOptions, updateFn UserDataUpdateFunc) error {
if !c.config.LoadUserData() {
return serviceerror.NewFailedPrecondition("Task queue user data operations are disabled")
}
newData, shouldReplicate, err := c.db.UpdateUserData(ctx, updateFn, options.KnownVersion, options.TaskQueueLimitPerBuildId)
if err != nil {
return err
Expand Down
16 changes: 16 additions & 0 deletions service/matching/taskQueueManager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -858,3 +858,19 @@ func TestUpdateOnNonRootFails(t *testing.T) {
require.Error(t, err)
require.ErrorIs(t, err, errUserDataNoMutateNonRoot)
}

// TestDisableLoadUserData_NonRootDoesNotRequestUserDataFromRoot starts a task queue manager for partition 1 with
// LoadUserData turned off and expects it to initialize without ever calling GetTaskQueueUserData on the matching
// client mock.
func TestDisableLoadUserData_NonRootDoesNotRequestUserDataFromRoot(t *testing.T) {
	ctrl := gomock.NewController(t)
	defer ctrl.Finish()

	partitionID, err := newTaskQueueIDWithPartition(defaultNamespaceId, defaultRootTqID, enumspb.TASK_QUEUE_TYPE_WORKFLOW, 1)
	require.NoError(t, err)

	opts := defaultTqmTestOpts(ctrl)
	opts.tqId = partitionID
	tqm := mustCreateTestTaskQueueManagerWithConfig(t, ctrl, opts)

	// With loading disabled, the manager must never ask for user data over RPC.
	opts.matchingClientMock.EXPECT().GetTaskQueueUserData(gomock.Any(), gomock.Any()).Times(0)
	tqm.config.LoadUserData = func() bool { return false }

	tqm.Start()
	require.NoError(t, tqm.WaitUntilInitialized(context.Background()))
}
3 changes: 0 additions & 3 deletions service/matching/taskWriter.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,9 +219,6 @@ func (w *taskWriter) appendTasks(
func (w *taskWriter) taskWriterLoop(ctx context.Context) error {
err := w.initReadWriteState(ctx)
w.tlMgr.initializedError.Set(struct{}{}, err)
if !w.tlMgr.shouldFetchUserData() {
w.tlMgr.userDataInitialFetch.Set(struct{}{}, err)
}
if err != nil {
// We can't recover from here without starting over, so unload the whole task queue
w.tlMgr.unloadFromEngine()
Expand Down
Loading

0 comments on commit 06d2ce8

Please sign in to comment.