From 19ba0cec909fd27175101408849ad8b21078942f Mon Sep 17 00:00:00 2001 From: Roey Berman Date: Tue, 24 Oct 2023 13:56:49 -0700 Subject: [PATCH] Disable eager activities for incompatible versioned activities (#5030) Only enable eager activity start if dynamic config enables it and workflow doesn't use versioning. If a workflow _uses_ versioning, only allow eager activities which intend to use a compatible version since a worker is obviously compatible with itself and we are okay dispatching an eager task knowning that there may be a newer "default" compatible version. Note that if `UseCompatibleVersion` is false, it implies that the activity should run on the "default" version for the task queue. --------- Co-authored-by: David Reiss --- service/history/workflow_task_handler.go | 13 ++-- tests/versioning_test.go | 75 ++++++++++++++++++++++++ 2 files changed, 84 insertions(+), 4 deletions(-) diff --git a/service/history/workflow_task_handler.go b/service/history/workflow_task_handler.go index 6350a548bae..25573213309 100644 --- a/service/history/workflow_task_handler.go +++ b/service/history/workflow_task_handler.go @@ -376,11 +376,16 @@ func (handler *workflowTaskHandlerImpl) handleCommandScheduleActivity( enums.SetDefaultTaskQueueKind(&attr.GetTaskQueue().Kind) - eagerStartActivity := false namespace := handler.mutableState.GetNamespaceEntry().Name().String() - if attr.RequestEagerExecution && handler.config.EnableActivityEagerExecution(namespace) { - eagerStartActivity = true - } + + // Enable eager activity start if dynamic config enables it and either 1. workflow doesn't use versioning, + // or 2. workflow uses versioning and activity intends to use a compatible version (since a + // worker is obviously compatible with itself and we are okay dispatching an eager task knowing that there may be a + // newer "default" compatible version). + // Note that if `UseCompatibleVersion` is false, it implies that the activity should run on the "default" version + // for the task queue. + eagerStartActivity := attr.RequestEagerExecution && handler.config.EnableActivityEagerExecution(namespace) && + (!handler.mutableState.GetWorkerVersionStamp().GetUseVersioning() || attr.UseCompatibleVersion) _, _, err := handler.mutableState.AddActivityTaskScheduledEvent( handler.workflowTaskCompletedID, diff --git a/tests/versioning_test.go b/tests/versioning_test.go index d61bf89f813..3fa5de7c0e5 100644 --- a/tests/versioning_test.go +++ b/tests/versioning_test.go @@ -38,6 +38,7 @@ import ( "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" + commandpb "go.temporal.io/api/command/v1" commonpb "go.temporal.io/api/common/v1" enumspb "go.temporal.io/api/enums/v1" "go.temporal.io/api/serviceerror" @@ -802,6 +803,80 @@ func (s *versioningIntegSuite) dispatchActivityCompatible() { s.Equal("v1.1", out) } +func (s *versioningIntegSuite) TestDispatchActivityEager() { + dc := s.testCluster.host.dcClient + dc.OverrideValue(dynamicconfig.EnableActivityEagerExecution, true) + defer dc.RemoveOverride(dynamicconfig.EnableActivityEagerExecution) + + tq := s.randomizeStr(s.T().Name()) + v1 := s.prefixed("v1") + + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + _, err := s.sdkClient.ExecuteWorkflow(ctx, sdkclient.StartWorkflowOptions{TaskQueue: tq}, "wf") + s.Require().NoError(err) + + pollResponse, err := s.sdkClient.WorkflowService().PollWorkflowTaskQueue(ctx, &workflowservice.PollWorkflowTaskQueueRequest{ + Namespace: s.namespace, + TaskQueue: &taskqueuepb.TaskQueue{Name: tq}, + Identity: "test", + WorkerVersionCapabilities: &commonpb.WorkerVersionCapabilities{ + BuildId: v1, + }, + }) + s.Require().NoError(err) + startToCloseTimeout := time.Minute + + completionResponse, err := s.sdkClient.WorkflowService().RespondWorkflowTaskCompleted(ctx, &workflowservice.RespondWorkflowTaskCompletedRequest{ + Identity: "test", + WorkerVersionStamp: &commonpb.WorkerVersionStamp{ + BuildId: v1, + UseVersioning: true, + }, + TaskToken: pollResponse.TaskToken, + Commands: []*commandpb.Command{ + { + CommandType: enumspb.COMMAND_TYPE_SCHEDULE_ACTIVITY_TASK, + Attributes: &commandpb.Command_ScheduleActivityTaskCommandAttributes{ + ScheduleActivityTaskCommandAttributes: &commandpb.ScheduleActivityTaskCommandAttributes{ + ActivityId: "compatible", + TaskQueue: &taskqueuepb.TaskQueue{ + Name: tq, + }, + StartToCloseTimeout: &startToCloseTimeout, + ActivityType: &commonpb.ActivityType{ + Name: "ignore", + }, + RequestEagerExecution: true, + UseCompatibleVersion: true, + }, + }, + }, + { + CommandType: enumspb.COMMAND_TYPE_SCHEDULE_ACTIVITY_TASK, + Attributes: &commandpb.Command_ScheduleActivityTaskCommandAttributes{ + ScheduleActivityTaskCommandAttributes: &commandpb.ScheduleActivityTaskCommandAttributes{ + ActivityId: "latest", + TaskQueue: &taskqueuepb.TaskQueue{ + Name: tq, + }, + StartToCloseTimeout: &startToCloseTimeout, + ActivityType: &commonpb.ActivityType{ + Name: "ignore", + }, + RequestEagerExecution: true, + UseCompatibleVersion: false, + }, + }, + }, + }, + }) + s.Require().NoError(err) + s.Require().Equal(1, len(completionResponse.ActivityTasks)) + s.Require().Equal("compatible", completionResponse.ActivityTasks[0].ActivityId) +} + func (s *versioningIntegSuite) TestDispatchActivityCrossTQFails() { dc := s.testCluster.host.dcClient defer dc.RemoveOverride(dynamicconfig.MatchingNumTaskqueueReadPartitions)