Skip to content

Commit

Permalink
Activity local dispatch (#2618)
Browse files Browse the repository at this point in the history
  • Loading branch information
yycptt committed Mar 22, 2022
1 parent af9130d commit 7fb1bbe
Show file tree
Hide file tree
Showing 16 changed files with 597 additions and 357 deletions.
599 changes: 340 additions & 259 deletions api/historyservice/v1/request_response.pb.go

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions common/dynamicconfig/constants.go
Expand Up @@ -109,6 +109,8 @@ const (
EnableParentClosePolicyWorker = "system.enableParentClosePolicyWorker"
// EnableStickyQuery indicates if sticky query should be enabled per namespace
EnableStickyQuery = "system.enableStickyQuery"
// EnableActivityLocalDispatch indicates if acitivty local dispatch is enabled per namespace
EnableActivityLocalDispatch = "system.enableActivityLocalDispatch"

// key for size limit

Expand Down
6 changes: 3 additions & 3 deletions go.mod
Expand Up @@ -39,7 +39,7 @@ require (
go.opentelemetry.io/otel/sdk v1.4.0
go.opentelemetry.io/otel/sdk/export/metric v0.27.0
go.opentelemetry.io/otel/sdk/metric v0.27.0
go.temporal.io/api v1.7.1-0.20220321175358-f623ba3ce7d4
go.temporal.io/api v1.7.1-0.20220322230426-36cd7fadac67
go.temporal.io/sdk v1.14.0
go.temporal.io/version v0.3.0
go.uber.org/atomic v1.9.0
Expand Down Expand Up @@ -105,7 +105,7 @@ require (
golang.org/x/text v0.3.7 // indirect
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/genproto v0.0.0-20220317150908-0efb43f6373e // indirect
google.golang.org/protobuf v1.27.1 // indirect
google.golang.org/genproto v0.0.0-20220322021311-435b647f9ef2 // indirect
google.golang.org/protobuf v1.28.0 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
)
11 changes: 6 additions & 5 deletions go.sum
Expand Up @@ -439,8 +439,8 @@ go.opentelemetry.io/otel/trace v1.4.0 h1:4OOUrPZdVFQkbzl/JSdvGCWIdw5ONXXxzHlaLlW
go.opentelemetry.io/otel/trace v1.4.0/go.mod h1:uc3eRsqDfWs9R7b92xbQbU42/eTNz4N+gLP8qJCi4aE=
go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI=
go.temporal.io/api v1.7.1-0.20220223032354-6e6fe738916a/go.mod h1:OnUq5eS+Nyx+irKb3Ws5YB7yjGFf5XmI3WcVRU9COEo=
go.temporal.io/api v1.7.1-0.20220321175358-f623ba3ce7d4 h1:+jDzci7ob+fSBqmSLfN66f3EG88wyZBcOfiDunkhYUc=
go.temporal.io/api v1.7.1-0.20220321175358-f623ba3ce7d4/go.mod h1:tECxXXsJgzLShh6b8w1zCTrxkQBrrf1UsleZEK8pQlw=
go.temporal.io/api v1.7.1-0.20220322230426-36cd7fadac67 h1:FgGiyUQ0iR2B4iktL7mFaDpq6FRR6rqx5/hQeMZrp5I=
go.temporal.io/api v1.7.1-0.20220322230426-36cd7fadac67/go.mod h1:WTzSbbNciKViPgr9W7g2QHw/7m8dORozCaZjiA4kdfw=
go.temporal.io/sdk v1.14.0 h1:7tJO72gK4xmsZ8W3Xp1rwKYdkwQ/mgnKN5LmROyZTac=
go.temporal.io/sdk v1.14.0/go.mod h1:7rvvSS6oCXp19JSFQtSOhLxCX3wpEQSJZJlyCGleo9M=
go.temporal.io/version v0.3.0 h1:dMrei9l9NyHt8nG6EB8vAwDLLTwx2SvRyucCSumAiig=
Expand Down Expand Up @@ -846,8 +846,8 @@ google.golang.org/genproto v0.0.0-20220114231437-d2e6a121cae0/go.mod h1:5CzLGKJ6
google.golang.org/genproto v0.0.0-20220201184016-50beb8ab5c44/go.mod h1:5CzLGKJ67TSI2B9POpiiyGha0AjJvZIUgRMt1dSmuhc=
google.golang.org/genproto v0.0.0-20220204002441-d6cc3cc0770e/go.mod h1:5CzLGKJ67TSI2B9POpiiyGha0AjJvZIUgRMt1dSmuhc=
google.golang.org/genproto v0.0.0-20220222213610-43724f9ea8cf/go.mod h1:kGP+zUP2Ddo0ayMi4YuN7C3WZyJvGLZRh8Z5wnAqvEI=
google.golang.org/genproto v0.0.0-20220317150908-0efb43f6373e h1:fNKDNuUyC4WH+inqDMpfXDdfvwfYILbsX+oskGZ8hxg=
google.golang.org/genproto v0.0.0-20220317150908-0efb43f6373e/go.mod h1:hAL49I2IFola2sVEjAn7MEwsja0xp51I0tlGAf9hz4E=
google.golang.org/genproto v0.0.0-20220322021311-435b647f9ef2 h1:3n0D2NdPGm0g0wrVJzXJWW5CBOoqgGBkDX9cRMJHZAY=
google.golang.org/genproto v0.0.0-20220322021311-435b647f9ef2/go.mod h1:hAL49I2IFola2sVEjAn7MEwsja0xp51I0tlGAf9hz4E=
google.golang.org/grpc v1.12.0/go.mod h1:yo6s7OP7yaDglbqo1J04qKzAhqBH6lvTonzMVmEdcZw=
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38=
Expand Down Expand Up @@ -893,8 +893,9 @@ google.golang.org/protobuf v1.24.0/go.mod h1:r/3tXBNzIEhYS9I1OUVjXDlt8tc493IdKGj
google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c=
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.27.1 h1:SnqbnDw1V7RiZcXPx5MEeqPv2s79L9i7BJUlG/+RurQ=
google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.28.0 h1:w43yiav+6bVFTBQFZX0r7ipe9JQ1QsbMgHwbBziscLw=
google.golang.org/protobuf v1.28.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
Expand Down
2 changes: 1 addition & 1 deletion proto/api
Expand Up @@ -188,6 +188,7 @@ message RespondWorkflowTaskCompletedRequest {

message RespondWorkflowTaskCompletedResponse {
RecordWorkflowTaskStartedResponse started_response = 1;
repeated temporal.api.workflowservice.v1.PollActivityTaskQueueResponse activity_tasks = 2;
}

message RespondWorkflowTaskFailedRequest {
Expand Down
9 changes: 7 additions & 2 deletions service/frontend/workflowHandler.go
Expand Up @@ -982,7 +982,9 @@ func (wh *WorkflowHandler) RespondWorkflowTaskCompleted(
return nil, err
}

completedResp := &workflowservice.RespondWorkflowTaskCompletedResponse{}
completedResp := &workflowservice.RespondWorkflowTaskCompletedResponse{
ActivityTasks: histResp.ActivityTasks,
}
if request.GetReturnNewWorkflowTask() && histResp != nil && histResp.StartedResponse != nil {
taskToken := &tokenspb.Task{
NamespaceId: taskToken.GetNamespaceId(),
Expand All @@ -991,7 +993,10 @@ func (wh *WorkflowHandler) RespondWorkflowTaskCompleted(
ScheduleId: histResp.StartedResponse.GetScheduledEventId(),
ScheduleAttempt: histResp.StartedResponse.GetAttempt(),
}
token, _ := wh.tokenSerializer.Serialize(taskToken)
token, err := wh.tokenSerializer.Serialize(taskToken)
if err != nil {
return nil, err
}
workflowExecution := &commonpb.WorkflowExecution{
WorkflowId: taskToken.GetWorkflowId(),
RunId: taskToken.GetRunId(),
Expand Down
10 changes: 2 additions & 8 deletions service/history/configs/config.go
Expand Up @@ -251,6 +251,7 @@ type Config struct {
ESProcessorAckTimeout dynamicconfig.DurationPropertyFn

EnableCrossNamespaceCommands dynamicconfig.BoolPropertyFn
EnableActivityLocalDispatch dynamicconfig.BoolPropertyFnWithNamespaceFilter
}

const (
Expand Down Expand Up @@ -436,6 +437,7 @@ func NewConfig(dc *dynamicconfig.Collection, numberOfShards int32, isAdvancedVis
ESProcessorAckTimeout: dc.GetDurationProperty(dynamicconfig.WorkerESProcessorAckTimeout, 1*time.Minute),

EnableCrossNamespaceCommands: dc.GetBoolProperty(dynamicconfig.EnableCrossNamespaceCommands, true),
EnableActivityLocalDispatch: dc.GetBoolPropertyFnWithNamespaceFilter(dynamicconfig.EnableActivityLocalDispatch, false),
}

return cfg
Expand All @@ -445,11 +447,3 @@ func NewConfig(dc *dynamicconfig.Collection, numberOfShards int32, isAdvancedVis
func (config *Config) GetShardID(namespaceID namespace.ID, workflowID string) int32 {
return common.WorkflowIDToHistoryShard(namespaceID.String(), workflowID, config.NumberOfShards)
}

func NewDynamicConfig() *Config {
dc := dynamicconfig.NewNoopCollection()
config := NewConfig(dc, 1, false, "")
// reduce the duration of long poll to increase test speed
config.LongPollExpirationInterval = dc.GetDurationPropertyFilteredByNamespace(dynamicconfig.HistoryLongPollExpirationInterval, 10*time.Second)
return config
}
109 changes: 107 additions & 2 deletions service/history/historyEngine_test.go
Expand Up @@ -1639,6 +1639,111 @@ func (s *engineSuite) TestRespondWorkflowTaskCompletedSingleActivityScheduledWor
s.Equal(5*time.Second, timestamp.DurationValue(activity1Attributes.HeartbeatTimeout))
}

func (s *engineSuite) TestRespondWorkflowTaskCompleted_ActivityLocalDispatch() {
we := commonpb.WorkflowExecution{
WorkflowId: tests.WorkflowID,
RunId: tests.RunID,
}
tl := "testTaskQueue"
tt := &tokenspb.Task{
ScheduleAttempt: 1,
WorkflowId: tests.WorkflowID,
RunId: we.GetRunId(),
ScheduleId: 2,
}
taskToken, _ := tt.Marshal()
identity := "testIdentity"
input := payloads.EncodeString("input")

msBuilder := workflow.TestLocalMutableState(s.mockHistoryEngine.shard, s.eventsCache,
tests.LocalNamespaceEntry, log.NewTestLogger(), we.GetRunId())
addWorkflowExecutionStartedEvent(msBuilder, we, "wType", tl, payloads.EncodeString("input"), 100*time.Second, 90*time.Second, 200*time.Second, identity)
di := addWorkflowTaskScheduledEvent(msBuilder)
addWorkflowTaskStartedEvent(msBuilder, di.ScheduleID, tl, identity)

scheduleToCloseTimeout := timestamp.DurationPtr(90 * time.Second)
scheduleToStartTimeout := timestamp.DurationPtr(10 * time.Second)
startToCloseTimeout := timestamp.DurationPtr(50 * time.Second)
heartbeatTimeout := timestamp.DurationPtr(5 * time.Second)
commands := []*commandpb.Command{
{
CommandType: enumspb.COMMAND_TYPE_SCHEDULE_ACTIVITY_TASK,
Attributes: &commandpb.Command_ScheduleActivityTaskCommandAttributes{ScheduleActivityTaskCommandAttributes: &commandpb.ScheduleActivityTaskCommandAttributes{
ActivityId: "activity1",
ActivityType: &commonpb.ActivityType{Name: "activity_type1"},
TaskQueue: &taskqueuepb.TaskQueue{Name: tl},
Input: input,
ScheduleToCloseTimeout: scheduleToCloseTimeout,
ScheduleToStartTimeout: scheduleToStartTimeout,
StartToCloseTimeout: startToCloseTimeout,
HeartbeatTimeout: heartbeatTimeout,
RequestStart: false,
}},
},
{
CommandType: enumspb.COMMAND_TYPE_SCHEDULE_ACTIVITY_TASK,
Attributes: &commandpb.Command_ScheduleActivityTaskCommandAttributes{ScheduleActivityTaskCommandAttributes: &commandpb.ScheduleActivityTaskCommandAttributes{
ActivityId: "activity2",
ActivityType: &commonpb.ActivityType{Name: "activity_type2"},
TaskQueue: &taskqueuepb.TaskQueue{Name: tl},
Input: input,
ScheduleToCloseTimeout: scheduleToCloseTimeout,
ScheduleToStartTimeout: scheduleToStartTimeout,
StartToCloseTimeout: startToCloseTimeout,
HeartbeatTimeout: heartbeatTimeout,
RequestStart: true,
}},
},
}

ms := workflow.TestCloneToProto(msBuilder)
gwmsResponse := &persistence.GetWorkflowExecutionResponse{State: ms}

s.mockExecutionMgr.EXPECT().GetWorkflowExecution(gomock.Any(), gomock.Any()).Return(gwmsResponse, nil)
s.mockExecutionMgr.EXPECT().UpdateWorkflowExecution(gomock.Any(), gomock.Any()).Return(tests.UpdateWorkflowExecutionResponse, nil)

resp, err := s.mockHistoryEngine.RespondWorkflowTaskCompleted(context.Background(), &historyservice.RespondWorkflowTaskCompletedRequest{
NamespaceId: tests.NamespaceID.String(),
CompleteRequest: &workflowservice.RespondWorkflowTaskCompletedRequest{
TaskToken: taskToken,
Commands: commands,
Identity: identity,
},
})
s.NoError(err)
executionBuilder := s.getBuilder(tests.NamespaceID, we)
s.Equal(int64(7), executionBuilder.GetNextEventID())
s.Equal(int64(3), executionBuilder.GetExecutionInfo().LastWorkflowTaskStartId)
s.Equal(enumsspb.WORKFLOW_EXECUTION_STATE_RUNNING, executionBuilder.GetExecutionState().State)
s.False(executionBuilder.HasPendingWorkflowTask())

ai1, ok := executionBuilder.GetActivityByActivityID("activity1")
s.True(ok)
s.Equal(common.EmptyEventID, ai1.StartedId)

ai2, ok := executionBuilder.GetActivityByActivityID("activity2")
s.True(ok)
s.Equal(common.TransientEventID, ai2.StartedId)
s.NotZero(ai2.StartedTime)

scheduledEvent := s.getActivityScheduledEvent(executionBuilder, ai2.ScheduleId)

s.Len(resp.ActivityTasks, 1)
activityTask := resp.ActivityTasks[0]
s.Equal("activity2", activityTask.ActivityId)
s.Equal("activity_type2", activityTask.ActivityType.GetName())
s.Equal(input, activityTask.Input)
s.Equal(we, *activityTask.WorkflowExecution)
s.Equal(scheduledEvent.EventTime, activityTask.CurrentAttemptScheduledTime)
s.Equal(scheduledEvent.EventTime, activityTask.ScheduledTime)
s.Equal(*scheduleToCloseTimeout, *activityTask.ScheduleToCloseTimeout)
s.Equal(startToCloseTimeout, activityTask.StartToCloseTimeout)
s.Equal(heartbeatTimeout, activityTask.HeartbeatTimeout)
s.Equal(int32(1), activityTask.Attempt)
s.Nil(activityTask.HeartbeatDetails)
s.Equal(tests.LocalNamespaceEntry.Name().String(), activityTask.WorkflowNamespace)
}

func (s *engineSuite) TestRespondWorkflowTaskCompleted_WorkflowTaskHeartbeatTimeout() {

we := commonpb.WorkflowExecution{
Expand Down Expand Up @@ -5038,7 +5143,7 @@ func addActivityTaskScheduledEvent(
ScheduleToStartTimeout: &scheduleToStartTimeout,
StartToCloseTimeout: &startToCloseTimeout,
HeartbeatTimeout: &heartbeatTimeout,
})
}, false)

return event, ai
}
Expand Down Expand Up @@ -5066,7 +5171,7 @@ func addActivityTaskScheduledEventWithRetry(
StartToCloseTimeout: &startToCloseTimeout,
HeartbeatTimeout: &heartbeatTimeout,
RetryPolicy: retryPolicy,
})
}, false)

return event, ai
}
Expand Down
1 change: 1 addition & 0 deletions service/history/tests/vars.go
Expand Up @@ -144,5 +144,6 @@ func NewDynamicConfig() *configs.Config {
config := configs.NewConfig(dc, 1, false, "")
// reduce the duration of long poll to increase test speed
config.LongPollExpirationInterval = dc.GetDurationPropertyFilteredByNamespace(dynamicconfig.HistoryLongPollExpirationInterval, 10*time.Second)
config.EnableActivityLocalDispatch = dynamicconfig.GetBoolPropertyFnFilteredByNamespace(true)
return config
}
2 changes: 1 addition & 1 deletion service/history/workflow/mutable_state.go
Expand Up @@ -88,7 +88,7 @@ type (
AddActivityTaskCanceledEvent(int64, int64, int64, *commonpb.Payloads, string) (*historypb.HistoryEvent, error)
AddActivityTaskCompletedEvent(int64, int64, *workflowservice.RespondActivityTaskCompletedRequest) (*historypb.HistoryEvent, error)
AddActivityTaskFailedEvent(int64, int64, *failurepb.Failure, enumspb.RetryState, string) (*historypb.HistoryEvent, error)
AddActivityTaskScheduledEvent(int64, *commandpb.ScheduleActivityTaskCommandAttributes) (*historypb.HistoryEvent, *persistencespb.ActivityInfo, error)
AddActivityTaskScheduledEvent(int64, *commandpb.ScheduleActivityTaskCommandAttributes, bool) (*historypb.HistoryEvent, *persistencespb.ActivityInfo, error)
AddActivityTaskStartedEvent(*persistencespb.ActivityInfo, int64, string, string) (*historypb.HistoryEvent, error)
AddActivityTaskTimedOutEvent(int64, int64, *failurepb.Failure, enumspb.RetryState) (*historypb.HistoryEvent, error)
AddChildWorkflowExecutionCanceledEvent(int64, *commonpb.WorkflowExecution, *historypb.WorkflowExecutionCanceledEventAttributes) (*historypb.HistoryEvent, error)
Expand Down
14 changes: 9 additions & 5 deletions service/history/workflow/mutable_state_impl.go
Expand Up @@ -1816,6 +1816,7 @@ func (e *MutableStateImpl) ReplicateWorkflowTaskFailedEvent() error {
func (e *MutableStateImpl) AddActivityTaskScheduledEvent(
workflowTaskCompletedEventID int64,
command *commandpb.ScheduleActivityTaskCommandAttributes,
bypassTaskGeneration bool,
) (*historypb.HistoryEvent, *persistencespb.ActivityInfo, error) {

opTag := tag.WorkflowActionActivityTaskScheduled
Expand All @@ -1834,12 +1835,15 @@ func (e *MutableStateImpl) AddActivityTaskScheduledEvent(
event := e.hBuilder.AddActivityTaskScheduledEvent(workflowTaskCompletedEventID, command)
ai, err := e.ReplicateActivityTaskScheduledEvent(workflowTaskCompletedEventID, event)
// TODO merge active & passive task generation
if err := e.taskGenerator.GenerateActivityTasks(
timestamp.TimeValue(event.GetEventTime()),
event,
); err != nil {
return nil, nil, err
if !bypassTaskGeneration {
if err := e.taskGenerator.GenerateActivityTasks(
timestamp.TimeValue(event.GetEventTime()),
event,
); err != nil {
return nil, nil, err
}
}

return event, ai, err
}

Expand Down
8 changes: 4 additions & 4 deletions service/history/workflow/mutable_state_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 7fb1bbe

Please sign in to comment.