Skip to content

Commit

Permalink
Send default tasks to unversioned queue when user data disabled (#4610)
Browse files Browse the repository at this point in the history
  • Loading branch information
dnr committed Jul 12, 2023
1 parent d4cd737 commit 74b51d8
Show file tree
Hide file tree
Showing 5 changed files with 96 additions and 28 deletions.
14 changes: 8 additions & 6 deletions service/frontend/workflow_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -878,9 +878,10 @@ func (wh *WorkflowHandler) PollWorkflowTaskQueue(ctx context.Context, request *w
return &workflowservice.PollWorkflowTaskQueueResponse{}, nil
}

// These errors are expected based on certain client behavior. We should not log them, it'd be too noisy.
var newerBuild *serviceerror.NewerBuildExists
if errors.As(err, &newerBuild) {
// These errors are expected from some versioning situations. We should not log them, it'd be too noisy.
var newerBuild *serviceerror.NewerBuildExists // expected when versioned poller is superceded
var failedPrecond *serviceerror.FailedPrecondition // expected when user data is disabled
if errors.As(err, &newerBuild) || errors.As(err, &failedPrecond) {
return nil, err
}

Expand Down Expand Up @@ -1115,9 +1116,10 @@ func (wh *WorkflowHandler) PollActivityTaskQueue(ctx context.Context, request *w
return &workflowservice.PollActivityTaskQueueResponse{}, nil
}

// These errors are expected based on certain client behavior. We should not log them, it'd be too noisy.
var newerBuild *serviceerror.NewerBuildExists
if errors.As(err, &newerBuild) {
// These errors are expected from some versioning situations. We should not log them, it'd be too noisy.
var newerBuild *serviceerror.NewerBuildExists // expected when versioned poller is superceded
var failedPrecond *serviceerror.FailedPrecondition // expected when user data is disabled
if errors.As(err, &newerBuild) || errors.As(err, &failedPrecond) {
return nil, err
}

Expand Down
16 changes: 6 additions & 10 deletions service/matching/matchingEngine.go
Original file line number Diff line number Diff line change
Expand Up @@ -458,12 +458,7 @@ func (e *matchingEngineImpl) DispatchSpooledTask(
taskQueue, userDataChanged, err := e.redirectToVersionedQueueForAdd(
ctx, unversionedOrigTaskQueue, directive, stickyInfo)
if err != nil {
// Return error for tasks with compatiblity constraints when user data is disabled so they can be retried later.
// "default" directive tasks become unversioned.
if !errors.Is(err, errUserDataDisabled) || directive.GetBuildId() != "" {
return err
}
err = nil
return err
}
sticky := stickyInfo.kind == enumspb.TASK_QUEUE_KIND_STICKY
tqm, err := e.getTaskQueueManager(ctx, taskQueue, stickyInfo, !sticky)
Expand Down Expand Up @@ -957,9 +952,6 @@ func (e *matchingEngineImpl) GetWorkerBuildIdCompatibility(
}
userData, _, err := tqMgr.GetUserData(ctx)
if err != nil {
if _, ok := err.(*serviceerror.NotFound); ok {
return &matchingservice.GetWorkerBuildIdCompatibilityResponse{}, nil
}
return nil, err
}
return &matchingservice.GetWorkerBuildIdCompatibilityResponse{
Expand Down Expand Up @@ -1503,7 +1495,11 @@ func (e *matchingEngineImpl) redirectToVersionedQueueForAdd(
// Have to look up versioning data.
userData, userDataChanged, err := baseTqm.GetUserData(ctx)
if err != nil {
return taskQueue, userDataChanged, err
if errors.Is(err, errUserDataDisabled) && buildId == "" {
// When user data disabled, send "default" tasks to unversioned queue.
return taskQueue, userDataChanged, nil
}
return nil, nil, err
}
data := userData.GetData().GetVersioningData()

Expand Down
25 changes: 25 additions & 0 deletions service/matching/matchingEngine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -547,6 +547,31 @@ func (s *matchingEngineSuite) TestPollActivityTaskQueues_NamespaceHandover() {
s.Equal(common.ErrNamespaceHandover.Error(), err.Error())
}

func (s *matchingEngineSuite) TestPollWorkflowTask_UserDataDisabled() {
s.matchingEngine.config.LoadUserData = dynamicconfig.GetBoolPropertyFnFilteredByTaskQueueInfo(false)
taskQueue := s.T().Name()

resp, err := s.matchingEngine.PollWorkflowTaskQueue(context.Background(), &matchingservice.PollWorkflowTaskQueueRequest{
NamespaceId: "asdf",
PollRequest: &workflowservice.PollWorkflowTaskQueueRequest{
Namespace: "asdf",
TaskQueue: &taskqueuepb.TaskQueue{
Name: taskQueue,
Kind: enumspb.TASK_QUEUE_KIND_NORMAL,
},
Identity: "identity",
WorkerVersionCapabilities: &commonpb.WorkerVersionCapabilities{
BuildId: "some_build_id",
UseVersioning: true,
},
},
}, metrics.NoopMetricsHandler)
s.Error(err)
s.Nil(resp)
var failedPrecondition *serviceerror.FailedPrecondition
s.ErrorAs(err, &failedPrecondition)
}

func (s *matchingEngineSuite) TestAddActivityTasks() {
s.AddTasksTest(enumspb.TASK_QUEUE_TYPE_ACTIVITY, false)
}
Expand Down
4 changes: 2 additions & 2 deletions service/matching/taskQueueManager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func TestDeliverBufferTasks_NoPollers(t *testing.T) {
tlm.taskReader.gorogrp.Wait()
}

func TestDeliverBufferTasks_RetriesVersionedTaskWhenUserInfoDisabled(t *testing.T) {
func TestDeliverBufferTasks_DisableUserData_SendsVersionedToUnversioned(t *testing.T) {
t.Parallel()

controller := gomock.NewController(t)
Expand Down Expand Up @@ -154,7 +154,7 @@ func TestDeliverBufferTasks_RetriesVersionedTaskWhenUserInfoDisabled(t *testing.
tlm.taskReader.gorogrp.Wait()
}

func TestDeliverBufferTasks_RetriesUseDefaultTaskWhenUserInfoDisabled(t *testing.T) {
func TestDeliverBufferTasks_DisableUserData_SendsDefaultToUnversioned(t *testing.T) {
t.Parallel()

controller := gomock.NewController(t)
Expand Down
65 changes: 55 additions & 10 deletions tests/versioning_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -397,7 +397,7 @@ func (s *versioningIntegSuite) dispatchNewWorkflowStartWorkerFirst() {
s.Equal("done!", out)
}

func (s *versioningIntegSuite) TestDisableLoadUserDataDefaultTasksBecomeUnversioned() {
func (s *versioningIntegSuite) TestDisableUserData_DefaultTasksBecomeUnversioned() {
dc := s.testCluster.host.dcClient
dc.OverrideValue(dynamicconfig.MatchingNumTaskqueueReadPartitions, 1)
defer dc.RemoveOverride(dynamicconfig.MatchingNumTaskqueueReadPartitions)
Expand Down Expand Up @@ -1471,7 +1471,7 @@ func (s *versioningIntegSuite) dispatchCron() {
s.GreaterOrEqual(runs2.Load(), int32(3))
}

func (s *versioningIntegSuite) TestDisableLoadUserData() {
func (s *versioningIntegSuite) TestDisableUserData() {
tq := s.T().Name()
v1 := s.prefixed("v1")
v2 := s.prefixed("v2")
Expand All @@ -1482,6 +1482,9 @@ func (s *versioningIntegSuite) TestDisableLoadUserData() {
// First insert some data (we'll try to read it below)
s.addNewDefaultBuildId(ctx, tq, v1)

// unload so that we reload and pick up LoadUserData dynamic config
s.unloadTaskQueue(ctx, tq)

dc := s.testCluster.host.dcClient
defer dc.RemoveOverride(dynamicconfig.MatchingLoadUserData)
dc.OverrideValue(dynamicconfig.MatchingLoadUserData, false)
Expand Down Expand Up @@ -1509,7 +1512,34 @@ func (s *versioningIntegSuite) TestDisableLoadUserData() {
s.Require().ErrorAs(err, &failedPreconditionError)
}

func (s *versioningIntegSuite) TestWorkflowGetsStuckWhenDisablingLoadingUserData() {
func (s *versioningIntegSuite) TestDisableUserData_UnversionedWorkflowRuns() {
tq := s.T().Name()
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

dc := s.testCluster.host.dcClient
defer dc.RemoveOverride(dynamicconfig.MatchingLoadUserData)
dc.OverrideValue(dynamicconfig.MatchingLoadUserData, false)

wf := func(ctx workflow.Context) (string, error) {
return "ok", nil
}
wrk := worker.New(s.sdkClient, tq, worker.Options{})
wrk.RegisterWorkflowWithOptions(wf, workflow.RegisterOptions{Name: "wf"})
s.NoError(wrk.Start())
defer wrk.Stop()

run, err := s.sdkClient.ExecuteWorkflow(ctx, sdkclient.StartWorkflowOptions{
TaskQueue: tq,
WorkflowExecutionTimeout: 5 * time.Second,
}, "wf")
s.NoError(err)
var out string
s.NoError(run.Get(ctx, &out))
s.Equal("ok", out)
}

func (s *versioningIntegSuite) TestDisableUserData_WorkflowGetsStuck() {
tq := s.T().Name()
v1 := s.prefixed("v1")
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
Expand All @@ -1528,7 +1558,7 @@ func (s *versioningIntegSuite) TestWorkflowGetsStuckWhenDisablingLoadingUserData
return nil
}
wrk := worker.New(s.sdkClient, tq, worker.Options{
BuildID: "v1",
BuildID: v1,
UseBuildIDForVersioning: true,
MaxConcurrentWorkflowTaskPollers: numPollers,
})
Expand All @@ -1538,16 +1568,31 @@ func (s *versioningIntegSuite) TestWorkflowGetsStuckWhenDisablingLoadingUserData

run, err := s.sdkClient.ExecuteWorkflow(ctx, sdkclient.StartWorkflowOptions{
TaskQueue: tq,
WorkflowExecutionTimeout: 5 * time.Second,
WorkflowExecutionTimeout: 10 * time.Second,
}, "wf")
s.Require().NoError(err)
err = run.Get(ctx, nil)
var timeoutError *temporal.TimeoutError
s.Require().ErrorAs(err, &timeoutError)

// should not run on versioned worker
time.Sleep(2 * time.Second)
s.Require().Equal(int32(0), runs.Load())

wrk.Stop()

// start unversioned worker and let task run there
wrk2 := worker.New(s.sdkClient, tq, worker.Options{
MaxConcurrentWorkflowTaskPollers: numPollers,
})
wrk2.RegisterWorkflowWithOptions(wf, workflow.RegisterOptions{Name: "wf"})
s.NoError(wrk2.Start())
defer wrk2.Stop()

// now workflow can complete
err = run.Get(ctx, nil)
s.NoError(err)
s.Require().Equal(int32(1), runs.Load())
}

func (s *versioningIntegSuite) TestWorkflowQueryTimesOutWhenDisablingLoadingUserData() {
func (s *versioningIntegSuite) TestDisableUserData_QueryTimesOut() {
tq := s.T().Name()
v1 := s.prefixed("v1")
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
Expand All @@ -1562,7 +1607,7 @@ func (s *versioningIntegSuite) TestWorkflowQueryTimesOutWhenDisablingLoadingUser
})
}
wrk := worker.New(s.sdkClient, tq, worker.Options{
BuildID: "v1",
BuildID: v1,
UseBuildIDForVersioning: true,
MaxConcurrentWorkflowTaskPollers: numPollers,
})
Expand Down

0 comments on commit 74b51d8

Please sign in to comment.