Skip to content

Commit

Permalink
Cancel activity when eager execution and request cancellation are in …
Browse files Browse the repository at this point in the history
…the same WFT (#3029)
  • Loading branch information
yycptt authored and alexshtin committed Jul 8, 2022
1 parent 6513af8 commit fb208d5
Show file tree
Hide file tree
Showing 5 changed files with 188 additions and 58 deletions.
4 changes: 2 additions & 2 deletions common/dynamicconfig/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,8 @@ const (
EnableParentClosePolicyWorker = "system.enableParentClosePolicyWorker"
// EnableStickyQuery indicates if sticky query should be enabled per namespace
EnableStickyQuery = "system.enableStickyQuery"
// EnableActivityLocalDispatch indicates if acitivty local dispatch is enabled per namespace
EnableActivityLocalDispatch = "system.enableActivityLocalDispatch"
// EnableActivityEagerExecution indicates if acitivty eager execution is enabled per namespace
EnableActivityEagerExecution = "system.enableActivityEagerExecution"
// NamespaceCacheRefreshInterval is the key for namespace cache refresh interval dynamic config
NamespaceCacheRefreshInterval = "system.namespaceCacheRefreshInterval"

Expand Down
4 changes: 2 additions & 2 deletions service/history/configs/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ type Config struct {
ESProcessorAckTimeout dynamicconfig.DurationPropertyFn

EnableCrossNamespaceCommands dynamicconfig.BoolPropertyFn
EnableActivityLocalDispatch dynamicconfig.BoolPropertyFnWithNamespaceFilter
EnableActivityEagerExecution dynamicconfig.BoolPropertyFnWithNamespaceFilter
NamespaceCacheRefreshInterval dynamicconfig.DurationPropertyFn
}

Expand Down Expand Up @@ -477,7 +477,7 @@ func NewConfig(dc *dynamicconfig.Collection, numberOfShards int32, isAdvancedVis
ESProcessorAckTimeout: dc.GetDurationProperty(dynamicconfig.WorkerESProcessorAckTimeout, 1*time.Minute),

EnableCrossNamespaceCommands: dc.GetBoolProperty(dynamicconfig.EnableCrossNamespaceCommands, true),
EnableActivityLocalDispatch: dc.GetBoolPropertyFnWithNamespaceFilter(dynamicconfig.EnableActivityLocalDispatch, false),
EnableActivityEagerExecution: dc.GetBoolPropertyFnWithNamespaceFilter(dynamicconfig.EnableActivityEagerExecution, false),
NamespaceCacheRefreshInterval: dc.GetDurationProperty(dynamicconfig.NamespaceCacheRefreshInterval, 10*time.Second),
}

Expand Down
84 changes: 83 additions & 1 deletion service/history/historyEngine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1657,7 +1657,7 @@ func (s *engineSuite) TestRespondWorkflowTaskCompletedSingleActivityScheduledWor
s.Equal(5*time.Second, timestamp.DurationValue(activity1Attributes.HeartbeatTimeout))
}

func (s *engineSuite) TestRespondWorkflowTaskCompleted_ActivityLocalDispatch() {
func (s *engineSuite) TestRespondWorkflowTaskCompleted_ActivityEagerExecution_NotCancelled() {
namespaceID := tests.NamespaceID
we := commonpb.WorkflowExecution{
WorkflowId: tests.WorkflowID,
Expand Down Expand Up @@ -1764,6 +1764,88 @@ func (s *engineSuite) TestRespondWorkflowTaskCompleted_ActivityLocalDispatch() {
s.Equal(tests.LocalNamespaceEntry.Name().String(), activityTask.WorkflowNamespace)
}

func (s *engineSuite) TestRespondWorkflowTaskCompleted_ActivityEagerExecution_Cancelled() {
namespaceID := tests.NamespaceID
we := commonpb.WorkflowExecution{
WorkflowId: tests.WorkflowID,
RunId: tests.RunID,
}
tl := "testTaskQueue"
tt := &tokenspb.Task{
Attempt: 1,
NamespaceId: namespaceID.String(),
WorkflowId: tests.WorkflowID,
RunId: we.GetRunId(),
ScheduledEventId: 2,
}
taskToken, _ := tt.Marshal()
identity := "testIdentity"
input := payloads.EncodeString("input")

msBuilder := workflow.TestLocalMutableState(s.mockHistoryEngine.shard, s.eventsCache,
tests.LocalNamespaceEntry, log.NewTestLogger(), we.GetRunId())
addWorkflowExecutionStartedEvent(msBuilder, we, "wType", tl, payloads.EncodeString("input"), 100*time.Second, 90*time.Second, 200*time.Second, identity)
wt := addWorkflowTaskScheduledEvent(msBuilder)
addWorkflowTaskStartedEvent(msBuilder, wt.ScheduledEventID, tl, identity)

scheduleToCloseTimeout := timestamp.DurationPtr(90 * time.Second)
scheduleToStartTimeout := timestamp.DurationPtr(10 * time.Second)
startToCloseTimeout := timestamp.DurationPtr(50 * time.Second)
heartbeatTimeout := timestamp.DurationPtr(5 * time.Second)
commands := []*commandpb.Command{
{
CommandType: enumspb.COMMAND_TYPE_SCHEDULE_ACTIVITY_TASK,
Attributes: &commandpb.Command_ScheduleActivityTaskCommandAttributes{ScheduleActivityTaskCommandAttributes: &commandpb.ScheduleActivityTaskCommandAttributes{
ActivityId: "activity1",
ActivityType: &commonpb.ActivityType{Name: "activity_type1"},
TaskQueue: &taskqueuepb.TaskQueue{Name: tl},
Input: input,
ScheduleToCloseTimeout: scheduleToCloseTimeout,
ScheduleToStartTimeout: scheduleToStartTimeout,
StartToCloseTimeout: startToCloseTimeout,
HeartbeatTimeout: heartbeatTimeout,
RequestEagerExecution: true,
}},
},
{
CommandType: enumspb.COMMAND_TYPE_REQUEST_CANCEL_ACTIVITY_TASK,
Attributes: &commandpb.Command_RequestCancelActivityTaskCommandAttributes{RequestCancelActivityTaskCommandAttributes: &commandpb.RequestCancelActivityTaskCommandAttributes{
ScheduledEventId: 5,
}},
},
}

ms := workflow.TestCloneToProto(msBuilder)
gwmsResponse := &persistence.GetWorkflowExecutionResponse{State: ms}

s.mockExecutionMgr.EXPECT().GetWorkflowExecution(gomock.Any(), gomock.Any()).Return(gwmsResponse, nil)
s.mockExecutionMgr.EXPECT().UpdateWorkflowExecution(gomock.Any(), gomock.Any()).Return(tests.UpdateWorkflowExecutionResponse, nil)

resp, err := s.mockHistoryEngine.RespondWorkflowTaskCompleted(context.Background(), &historyservice.RespondWorkflowTaskCompletedRequest{
NamespaceId: tests.NamespaceID.String(),
CompleteRequest: &workflowservice.RespondWorkflowTaskCompletedRequest{
TaskToken: taskToken,
Commands: commands,
Identity: identity,
ReturnNewWorkflowTask: true,
},
})
s.NoError(err)
executionBuilder := s.getBuilder(tests.NamespaceID, we)
s.Equal(int64(10), executionBuilder.GetNextEventID()) // activity scheduled, request cancel, cancelled, workflow task scheduled, started
s.Equal(int64(3), executionBuilder.GetExecutionInfo().LastWorkflowTaskStartedEventId)
s.Equal(enumsspb.WORKFLOW_EXECUTION_STATE_RUNNING, executionBuilder.GetExecutionState().State)
s.True(executionBuilder.HasPendingWorkflowTask())

_, ok := executionBuilder.GetActivityByActivityID("activity1")
s.False(ok)

s.Len(resp.ActivityTasks, 0)
s.NotNil(resp.StartedResponse)
s.Equal(int64(10), resp.StartedResponse.NextEventId)
s.Equal(int64(3), resp.StartedResponse.PreviousStartedEventId)
}

func (s *engineSuite) TestRespondWorkflowTaskCompleted_WorkflowTaskHeartbeatTimeout() {
namespaceID := tests.NamespaceID
we := commonpb.WorkflowExecution{
Expand Down
2 changes: 1 addition & 1 deletion service/history/tests/vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ func NewDynamicConfig() *configs.Config {
config := configs.NewConfig(dc, 1, false, "")
// reduce the duration of long poll to increase test speed
config.LongPollExpirationInterval = dc.GetDurationPropertyFilteredByNamespace(dynamicconfig.HistoryLongPollExpirationInterval, 10*time.Second)
config.EnableActivityLocalDispatch = dynamicconfig.GetBoolPropertyFnFilteredByNamespace(true)
config.EnableActivityEagerExecution = dynamicconfig.GetBoolPropertyFnFilteredByNamespace(true)
config.NamespaceCacheRefreshInterval = dynamicconfig.GetDurationPropertyFn(time.Second)
return config
}
152 changes: 100 additions & 52 deletions service/history/workflowTaskHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,15 @@ type (
workflowTaskResponseMutation func(
resp *historyservice.RespondWorkflowTaskCompletedResponse,
) error

commandPostAction func(
ctx context.Context,
) (workflowTaskResponseMutation, error)

handleCommandResponse struct {
workflowTaskResponseMutation workflowTaskResponseMutation
commandPostAction commandPostAction
}
)

func newWorkflowTaskHandler(
Expand Down Expand Up @@ -146,19 +155,36 @@ func (handler *workflowTaskHandlerImpl) handleCommands(
}

var mutations []workflowTaskResponseMutation
var postActions []commandPostAction
for _, command := range commands {
mutation, err := handler.handleCommand(ctx, command)
response, err := handler.handleCommand(ctx, command)
if err != nil || handler.stopProcessing {
return nil, err
}
if response != nil {
if response.workflowTaskResponseMutation != nil {
mutations = append(mutations, response.workflowTaskResponseMutation)
}
if response.commandPostAction != nil {
postActions = append(postActions, response.commandPostAction)
}
}
}

for _, postAction := range postActions {
mutation, err := postAction(ctx)
if err != nil || handler.stopProcessing {
return nil, err
}
if mutation != nil {
mutations = append(mutations, mutation)
}
}

return mutations, nil
}

func (handler *workflowTaskHandlerImpl) handleCommand(ctx context.Context, command *commandpb.Command) (workflowTaskResponseMutation, error) {
func (handler *workflowTaskHandlerImpl) handleCommand(ctx context.Context, command *commandpb.Command) (*handleCommandResponse, error) {
switch command.GetCommandType() {
case enumspb.COMMAND_TYPE_SCHEDULE_ACTIVITY_TASK:
return handler.handleCommandScheduleActivity(ctx, command.GetScheduleActivityTaskCommandAttributes())
Expand Down Expand Up @@ -207,7 +233,7 @@ func (handler *workflowTaskHandlerImpl) handleCommand(ctx context.Context, comma
func (handler *workflowTaskHandlerImpl) handleCommandScheduleActivity(
_ context.Context,
attr *commandpb.ScheduleActivityTaskCommandAttributes,
) (workflowTaskResponseMutation, error) {
) (*handleCommandResponse, error) {

handler.metricsClient.IncCounter(
metrics.HistoryRespondWorkflowTaskCompletedScope,
Expand Down Expand Up @@ -241,16 +267,16 @@ func (handler *workflowTaskHandlerImpl) handleCommandScheduleActivity(

enums.SetDefaultTaskQueueKind(&attr.GetTaskQueue().Kind)

localDispatchActivity := false
eagerStartActivity := false
namespace := handler.mutableState.GetNamespaceEntry().Name().String()
if attr.RequestEagerExecution && handler.config.EnableActivityLocalDispatch(namespace) {
localDispatchActivity = true
if attr.RequestEagerExecution && handler.config.EnableActivityEagerExecution(namespace) {
eagerStartActivity = true
}

event, ai, err := handler.mutableState.AddActivityTaskScheduledEvent(
_, _, err = handler.mutableState.AddActivityTaskScheduledEvent(
handler.workflowTaskCompletedID,
attr,
localDispatchActivity,
eagerStartActivity,
)
if err != nil {
if _, ok := err.(*serviceerror.InvalidArgument); ok {
Expand All @@ -259,61 +285,83 @@ func (handler *workflowTaskHandlerImpl) handleCommandScheduleActivity(
return nil, err
}

if !localDispatchActivity {
if !eagerStartActivity {
return &handleCommandResponse{}, nil
}

return &handleCommandResponse{
commandPostAction: func(ctx context.Context) (workflowTaskResponseMutation, error) {
return handler.handlePostCommandEagerExecuteActivity(ctx, attr)
},
}, nil
}

func (handler *workflowTaskHandlerImpl) handlePostCommandEagerExecuteActivity(
_ context.Context,
attr *commandpb.ScheduleActivityTaskCommandAttributes,
) (workflowTaskResponseMutation, error) {
ai, ok := handler.mutableState.GetActivityByActivityID(attr.ActivityId)
if !ok {
// activity cancelled in the same worflow task
return nil, nil
}

if _, err := handler.mutableState.AddActivityTaskStartedEvent(
ai,
event.GetEventId(),
ai.GetScheduledEventId(),
uuid.New(),
handler.identity,
); err != nil {
return nil, err
}
return func(resp *historyservice.RespondWorkflowTaskCompletedResponse) error {
runID := handler.mutableState.GetExecutionState().RunId
attr := event.GetActivityTaskScheduledEventAttributes()

shardClock, err := handler.shard.NewVectorClock()
if err != nil {
return err
}
taskToken := &tokenspb.Task{
NamespaceId: namespaceID.String(),
WorkflowId: executionInfo.WorkflowId,
RunId: runID,
ScheduledEventId: event.EventId,
Attempt: ai.Attempt,
ActivityId: attr.ActivityId,
ActivityType: attr.ActivityType.GetName(),
Clock: shardClock,
}
serializedToken, err := handler.tokenSerializer.Serialize(taskToken)
if err != nil {
return err
}
activityTask := &workflowservice.PollActivityTaskQueueResponse{
ActivityId: attr.ActivityId,
ActivityType: attr.ActivityType,
Header: attr.Header,
Input: attr.Input,
WorkflowExecution: &commonpb.WorkflowExecution{
WorkflowId: executionInfo.WorkflowId,
RunId: runID,
},
CurrentAttemptScheduledTime: ai.ScheduledTime,
ScheduledTime: event.EventTime,
ScheduleToCloseTimeout: attr.ScheduleToCloseTimeout,
StartedTime: ai.StartedTime,
StartToCloseTimeout: attr.StartToCloseTimeout,
HeartbeatTimeout: attr.HeartbeatTimeout,
TaskToken: serializedToken,
Attempt: ai.Attempt,
HeartbeatDetails: ai.LastHeartbeatDetails,
WorkflowType: handler.mutableState.GetWorkflowType(),
WorkflowNamespace: namespace,
}
executionInfo := handler.mutableState.GetExecutionInfo()
namespaceID := namespace.ID(executionInfo.NamespaceId)
runID := handler.mutableState.GetExecutionState().RunId

shardClock, err := handler.shard.NewVectorClock()
if err != nil {
return nil, err
}

taskToken := &tokenspb.Task{
NamespaceId: namespaceID.String(),
WorkflowId: executionInfo.WorkflowId,
RunId: runID,
ScheduledEventId: ai.GetScheduledEventId(),
Attempt: ai.Attempt,
ActivityId: attr.ActivityId,
ActivityType: attr.ActivityType.GetName(),
Clock: shardClock,
}
serializedToken, err := handler.tokenSerializer.Serialize(taskToken)
if err != nil {
return nil, err
}

activityTask := &workflowservice.PollActivityTaskQueueResponse{
ActivityId: attr.ActivityId,
ActivityType: attr.ActivityType,
Header: attr.Header,
Input: attr.Input,
WorkflowExecution: &commonpb.WorkflowExecution{
WorkflowId: executionInfo.WorkflowId,
RunId: runID,
},
CurrentAttemptScheduledTime: ai.ScheduledTime,
ScheduledTime: ai.ScheduledTime,
ScheduleToCloseTimeout: attr.ScheduleToCloseTimeout,
StartedTime: ai.StartedTime,
StartToCloseTimeout: attr.StartToCloseTimeout,
HeartbeatTimeout: attr.HeartbeatTimeout,
TaskToken: serializedToken,
Attempt: ai.Attempt,
HeartbeatDetails: ai.LastHeartbeatDetails,
WorkflowType: handler.mutableState.GetWorkflowType(),
WorkflowNamespace: handler.mutableState.GetNamespaceEntry().Name().String(),
}

return func(resp *historyservice.RespondWorkflowTaskCompletedResponse) error {
resp.ActivityTasks = append(resp.ActivityTasks, activityTask)
return nil
}, nil
Expand Down

0 comments on commit fb208d5

Please sign in to comment.