Skip to content

Commit

Permalink
Disable eager activities for incompatible versioned activities (#5030)
Browse files Browse the repository at this point in the history
Only enable eager activity start if dynamic config enables it and
workflow doesn't use versioning.
If a workflow _uses_ versioning, only allow eager activities which
intend to use a compatible version since a
worker is obviously compatible with itself and we are okay dispatching
an eager task knowning that there may be a
newer "default" compatible version.
Note that if `UseCompatibleVersion` is false, it implies that the
activity should run on the "default" version
for the task queue.

---------

Co-authored-by: David Reiss <dnr@dnr.im>
  • Loading branch information
2 people authored and rodrigozhou committed Oct 30, 2023
1 parent cd14e15 commit 19ba0ce
Show file tree
Hide file tree
Showing 2 changed files with 84 additions and 4 deletions.
13 changes: 9 additions & 4 deletions service/history/workflow_task_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -376,11 +376,16 @@ func (handler *workflowTaskHandlerImpl) handleCommandScheduleActivity(

enums.SetDefaultTaskQueueKind(&attr.GetTaskQueue().Kind)

eagerStartActivity := false
namespace := handler.mutableState.GetNamespaceEntry().Name().String()
if attr.RequestEagerExecution && handler.config.EnableActivityEagerExecution(namespace) {
eagerStartActivity = true
}

// Enable eager activity start if dynamic config enables it and either 1. workflow doesn't use versioning,
// or 2. workflow uses versioning and activity intends to use a compatible version (since a
// worker is obviously compatible with itself and we are okay dispatching an eager task knowing that there may be a
// newer "default" compatible version).
// Note that if `UseCompatibleVersion` is false, it implies that the activity should run on the "default" version
// for the task queue.
eagerStartActivity := attr.RequestEagerExecution && handler.config.EnableActivityEagerExecution(namespace) &&
(!handler.mutableState.GetWorkerVersionStamp().GetUseVersioning() || attr.UseCompatibleVersion)

_, _, err := handler.mutableState.AddActivityTaskScheduledEvent(
handler.workflowTaskCompletedID,
Expand Down
75 changes: 75 additions & 0 deletions tests/versioning_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"

commandpb "go.temporal.io/api/command/v1"
commonpb "go.temporal.io/api/common/v1"
enumspb "go.temporal.io/api/enums/v1"
"go.temporal.io/api/serviceerror"
Expand Down Expand Up @@ -802,6 +803,80 @@ func (s *versioningIntegSuite) dispatchActivityCompatible() {
s.Equal("v1.1", out)
}

func (s *versioningIntegSuite) TestDispatchActivityEager() {
dc := s.testCluster.host.dcClient
dc.OverrideValue(dynamicconfig.EnableActivityEagerExecution, true)
defer dc.RemoveOverride(dynamicconfig.EnableActivityEagerExecution)

tq := s.randomizeStr(s.T().Name())
v1 := s.prefixed("v1")

ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

_, err := s.sdkClient.ExecuteWorkflow(ctx, sdkclient.StartWorkflowOptions{TaskQueue: tq}, "wf")
s.Require().NoError(err)

pollResponse, err := s.sdkClient.WorkflowService().PollWorkflowTaskQueue(ctx, &workflowservice.PollWorkflowTaskQueueRequest{
Namespace: s.namespace,
TaskQueue: &taskqueuepb.TaskQueue{Name: tq},
Identity: "test",
WorkerVersionCapabilities: &commonpb.WorkerVersionCapabilities{
BuildId: v1,
},
})
s.Require().NoError(err)
startToCloseTimeout := time.Minute

completionResponse, err := s.sdkClient.WorkflowService().RespondWorkflowTaskCompleted(ctx, &workflowservice.RespondWorkflowTaskCompletedRequest{
Identity: "test",
WorkerVersionStamp: &commonpb.WorkerVersionStamp{
BuildId: v1,
UseVersioning: true,
},
TaskToken: pollResponse.TaskToken,
Commands: []*commandpb.Command{
{
CommandType: enumspb.COMMAND_TYPE_SCHEDULE_ACTIVITY_TASK,
Attributes: &commandpb.Command_ScheduleActivityTaskCommandAttributes{
ScheduleActivityTaskCommandAttributes: &commandpb.ScheduleActivityTaskCommandAttributes{
ActivityId: "compatible",
TaskQueue: &taskqueuepb.TaskQueue{
Name: tq,
},
StartToCloseTimeout: &startToCloseTimeout,
ActivityType: &commonpb.ActivityType{
Name: "ignore",
},
RequestEagerExecution: true,
UseCompatibleVersion: true,
},
},
},
{
CommandType: enumspb.COMMAND_TYPE_SCHEDULE_ACTIVITY_TASK,
Attributes: &commandpb.Command_ScheduleActivityTaskCommandAttributes{
ScheduleActivityTaskCommandAttributes: &commandpb.ScheduleActivityTaskCommandAttributes{
ActivityId: "latest",
TaskQueue: &taskqueuepb.TaskQueue{
Name: tq,
},
StartToCloseTimeout: &startToCloseTimeout,
ActivityType: &commonpb.ActivityType{
Name: "ignore",
},
RequestEagerExecution: true,
UseCompatibleVersion: false,
},
},
},
},
})
s.Require().NoError(err)
s.Require().Equal(1, len(completionResponse.ActivityTasks))
s.Require().Equal("compatible", completionResponse.ActivityTasks[0].ActivityId)
}

func (s *versioningIntegSuite) TestDispatchActivityCrossTQFails() {
dc := s.testCluster.host.dcClient
defer dc.RemoveOverride(dynamicconfig.MatchingNumTaskqueueReadPartitions)
Expand Down

0 comments on commit 19ba0ce

Please sign in to comment.