Skip to content

Commit

Permalink
Fix missing properties in continueasnew when workflow fail (#2883)
Browse files Browse the repository at this point in the history
  • Loading branch information
vancexu committed Dec 3, 2019
1 parent 34a89aa commit a725304
Show file tree
Hide file tree
Showing 6 changed files with 145 additions and 34 deletions.
12 changes: 6 additions & 6 deletions host/continueasnew_test.go
Expand Up @@ -33,9 +33,9 @@ import (
)

func (s *integrationSuite) TestContinueAsNewWorkflow() {
id := "interation-continue-as-new-workflow-test"
wt := "interation-continue-as-new-workflow-test-type"
tl := "interation-continue-as-new-workflow-test-tasklist"
id := "integration-continue-as-new-workflow-test"
wt := "integration-continue-as-new-workflow-test-type"
tl := "integration-continue-as-new-workflow-test-tasklist"
identity := "worker1"

workflowType := &workflow.WorkflowType{}
Expand Down Expand Up @@ -139,9 +139,9 @@ func (s *integrationSuite) TestContinueAsNewWorkflow() {
}

func (s *integrationSuite) TestContinueAsNewWorkflow_Timeout() {
id := "interation-continue-as-new-workflow-timeout-test"
wt := "interation-continue-as-new-workflow-timeout-test-type"
tl := "interation-continue-as-new-workflow-timeout-test-tasklist"
id := "integration-continue-as-new-workflow-timeout-test"
wt := "integration-continue-as-new-workflow-timeout-test-type"
tl := "integration-continue-as-new-workflow-timeout-test-tasklist"
identity := "worker1"

workflowType := &workflow.WorkflowType{}
Expand Down
16 changes: 8 additions & 8 deletions host/decision_test.go
Expand Up @@ -32,7 +32,7 @@ import (

func (s *integrationSuite) TestDecisionHeartbeatingWithEmptyResult() {
id := uuid.New()
wt := "interation-workflow-decision-heartbeating-local-activities"
wt := "integration-workflow-decision-heartbeating-local-activities"
tl := id
identity := "worker1"

Expand Down Expand Up @@ -139,7 +139,7 @@ func (s *integrationSuite) TestDecisionHeartbeatingWithEmptyResult() {

func (s *integrationSuite) TestDecisionHeartbeatingWithLocalActivitiesResult() {
id := uuid.New()
wt := "interation-workflow-decision-heartbeating-local-activities"
wt := "integration-workflow-decision-heartbeating-local-activities"
tl := id
identity := "worker1"

Expand Down Expand Up @@ -283,7 +283,7 @@ func (s *integrationSuite) TestDecisionHeartbeatingWithLocalActivitiesResult() {

func (s *integrationSuite) TestWorkflowTerminationSignalBeforeRegularDecisionStarted() {
id := uuid.New()
wt := "interation-workflow-transient-decision-test-type"
wt := "integration-workflow-transient-decision-test-type"
tl := id
identity := "worker1"

Expand Down Expand Up @@ -358,7 +358,7 @@ func (s *integrationSuite) TestWorkflowTerminationSignalBeforeRegularDecisionSta

func (s *integrationSuite) TestWorkflowTerminationSignalAfterRegularDecisionStarted() {
id := uuid.New()
wt := "interation-workflow-transient-decision-test-type"
wt := "integration-workflow-transient-decision-test-type"
tl := id
identity := "worker1"

Expand Down Expand Up @@ -433,7 +433,7 @@ func (s *integrationSuite) TestWorkflowTerminationSignalAfterRegularDecisionStar

func (s *integrationSuite) TestWorkflowTerminationSignalAfterRegularDecisionStartedAndFailDecision() {
id := uuid.New()
wt := "interation-workflow-transient-decision-test-type"
wt := "integration-workflow-transient-decision-test-type"
tl := id
identity := "worker1"

Expand Down Expand Up @@ -520,7 +520,7 @@ func (s *integrationSuite) TestWorkflowTerminationSignalAfterRegularDecisionStar

func (s *integrationSuite) TestWorkflowTerminationSignalBeforeTransientDecisionStarted() {
id := uuid.New()
wt := "interation-workflow-transient-decision-test-type"
wt := "integration-workflow-transient-decision-test-type"
tl := id
identity := "worker1"

Expand Down Expand Up @@ -625,7 +625,7 @@ func (s *integrationSuite) TestWorkflowTerminationSignalBeforeTransientDecisionS

func (s *integrationSuite) TestWorkflowTerminationSignalAfterTransientDecisionStarted() {
id := uuid.New()
wt := "interation-workflow-transient-decision-test-type"
wt := "integration-workflow-transient-decision-test-type"
tl := id
identity := "worker1"

Expand Down Expand Up @@ -727,7 +727,7 @@ func (s *integrationSuite) TestWorkflowTerminationSignalAfterTransientDecisionSt

func (s *integrationSuite) TestWorkflowTerminationSignalAfterTransientDecisionStartedAndFailDecision() {
id := uuid.New()
wt := "interation-workflow-transient-decision-test-type"
wt := "integration-workflow-transient-decision-test-type"
tl := id
identity := "worker1"

Expand Down
18 changes: 9 additions & 9 deletions host/gethistory_test.go
Expand Up @@ -35,9 +35,9 @@ import (
)

func (s *integrationSuite) TestGetWorkflowExecutionHistory_All() {
workflowID := "interation-get-workflow-history-events-long-poll-test-all"
workflowTypeName := "interation-get-workflow-history-events-long-poll-test-all-type"
tasklistName := "interation-get-workflow-history-events-long-poll-test-all-tasklist"
workflowID := "integration-get-workflow-history-events-long-poll-test-all"
workflowTypeName := "integration-get-workflow-history-events-long-poll-test-all-type"
tasklistName := "integration-get-workflow-history-events-long-poll-test-all-tasklist"
identity := "worker1"
activityName := "activity_type1"

Expand Down Expand Up @@ -203,9 +203,9 @@ func (s *integrationSuite) TestGetWorkflowExecutionHistory_All() {
}

func (s *integrationSuite) TestGetWorkflowExecutionHistory_Close() {
workflowID := "interation-get-workflow-history-events-long-poll-test-close"
workflowTypeName := "interation-get-workflow-history-events-long-poll-test-close-type"
tasklistName := "interation-get-workflow-history-events-long-poll-test-close-tasklist"
workflowID := "integration-get-workflow-history-events-long-poll-test-close"
workflowTypeName := "integration-get-workflow-history-events-long-poll-test-close-type"
tasklistName := "integration-get-workflow-history-events-long-poll-test-close-tasklist"
identity := "worker1"
activityName := "activity_type1"

Expand Down Expand Up @@ -362,9 +362,9 @@ func (s *integrationSuite) TestGetWorkflowExecutionHistory_Close() {
}

func (s *integrationSuite) TestGetWorkflowExecutionRawHistory_All() {
workflowID := "interation-get-workflow-history-raw-events-all"
workflowTypeName := "interation-get-workflow-history-raw-events-all-type"
tasklistName := "interation-get-workflow-history-raw-events-all-tasklist"
workflowID := "integration-get-workflow-history-raw-events-all"
workflowTypeName := "integration-get-workflow-history-raw-events-all-type"
tasklistName := "integration-get-workflow-history-raw-events-all-tasklist"
identity := "worker1"
activityName := "activity_type1"

Expand Down
121 changes: 115 additions & 6 deletions host/integration_test.go
Expand Up @@ -278,9 +278,9 @@ StartNewExecutionLoop:
}

func (s *integrationSuite) TestSequentialWorkflow() {
id := "interation-sequential-workflow-test"
wt := "interation-sequential-workflow-test-type"
tl := "interation-sequential-workflow-test-tasklist"
id := "integration-sequential-workflow-test"
wt := "integration-sequential-workflow-test-type"
tl := "integration-sequential-workflow-test-tasklist"
identity := "worker1"
activityName := "activity_type1"

Expand Down Expand Up @@ -388,9 +388,9 @@ func (s *integrationSuite) TestSequentialWorkflow() {
}

func (s *integrationSuite) TestCompleteDecisionTaskAndCreateNewOne() {
id := "interation-complete-decision-create-new-test"
wt := "interation-complete-decision-create-new-test-type"
tl := "interation-complete-decision-create-new-test-tasklist"
id := "integration-complete-decision-create-new-test"
wt := "integration-complete-decision-create-new-test-type"
tl := "integration-complete-decision-create-new-test-tasklist"
identity := "worker1"

workflowType := &workflow.WorkflowType{}
Expand Down Expand Up @@ -859,6 +859,9 @@ func (s *integrationSuite) TestCronWorkflow() {
memo := &workflow.Memo{
Fields: map[string][]byte{"memoKey": []byte("memoVal")},
}
searchAttr := &workflow.SearchAttributes{
IndexedFields: map[string][]byte{"CustomKeywordField": []byte("1")},
}

request := &workflow.StartWorkflowExecutionRequest{
RequestId: common.StringPtr(uuid.New()),
Expand All @@ -872,6 +875,7 @@ func (s *integrationSuite) TestCronWorkflow() {
Identity: common.StringPtr(identity),
CronSchedule: common.StringPtr(cronSchedule), //minimum interval by standard spec is 1m (* * * * *), use non-standard descriptor for short interval for test
Memo: memo,
SearchAttributes: searchAttr,
}

startWorkflowTS := time.Now()
Expand Down Expand Up @@ -969,6 +973,7 @@ func (s *integrationSuite) TestCronWorkflow() {
s.Equal("cron-test-error", attributes.GetFailureReason())
s.Equal(0, len(attributes.GetLastCompletionResult()))
s.Equal(memo, attributes.Memo)
s.Equal(searchAttr, attributes.SearchAttributes)

events = s.getHistory(s.domainName, executions[1])
lastEvent = events[len(events)-1]
Expand All @@ -978,6 +983,7 @@ func (s *integrationSuite) TestCronWorkflow() {
s.Equal("", attributes.GetFailureReason())
s.Equal("cron-test-result", string(attributes.GetLastCompletionResult()))
s.Equal(memo, attributes.Memo)
s.Equal(searchAttr, attributes.SearchAttributes)

events = s.getHistory(s.domainName, executions[2])
lastEvent = events[len(events)-1]
Expand All @@ -987,6 +993,7 @@ func (s *integrationSuite) TestCronWorkflow() {
s.Equal("cron-test-error", attributes.GetFailureReason())
s.Equal("cron-test-result", string(attributes.GetLastCompletionResult()))
s.Equal(memo, attributes.Memo)
s.Equal(searchAttr, attributes.SearchAttributes)

startFilter.LatestTime = common.Int64Ptr(time.Now().UnixNano())
var closedExecutions []*workflow.WorkflowExecutionInfo
Expand Down Expand Up @@ -1037,6 +1044,108 @@ func (s *integrationSuite) TestCronWorkflow() {
}
}

func (s *integrationSuite) TestCronWorkflowTimeout() {
id := "integration-wf-cron-timeout-test"
wt := "integration-wf-cron-timeout-type"
tl := "integration-wf-cron-timeout-tasklist"
identity := "worker1"
cronSchedule := "@every 3s"

workflowType := &workflow.WorkflowType{}
workflowType.Name = common.StringPtr(wt)

taskList := &workflow.TaskList{}
taskList.Name = common.StringPtr(tl)

memo := &workflow.Memo{
Fields: map[string][]byte{"memoKey": []byte("memoVal")},
}
searchAttr := &workflow.SearchAttributes{
IndexedFields: map[string][]byte{"CustomKeywordField": []byte("1")},
}

request := &workflow.StartWorkflowExecutionRequest{
RequestId: common.StringPtr(uuid.New()),
Domain: common.StringPtr(s.domainName),
WorkflowId: common.StringPtr(id),
WorkflowType: workflowType,
TaskList: taskList,
Input: nil,
ExecutionStartToCloseTimeoutSeconds: common.Int32Ptr(1), // set workflow timeout to 1s
TaskStartToCloseTimeoutSeconds: common.Int32Ptr(1),
Identity: common.StringPtr(identity),
CronSchedule: common.StringPtr(cronSchedule), //minimum interval by standard spec is 1m (* * * * *), use non-standard descriptor for short interval for test
Memo: memo,
SearchAttributes: searchAttr,
}

we, err0 := s.engine.StartWorkflowExecution(createContext(), request)
s.Nil(err0)

s.Logger.Info("StartWorkflowExecution", tag.WorkflowRunID(*we.RunId))

var executions []*workflow.WorkflowExecution
dtHandler := func(execution *workflow.WorkflowExecution, wt *workflow.WorkflowType,
previousStartedEventID, startedEventID int64, history *workflow.History) ([]byte, []*workflow.Decision, error) {

executions = append(executions, execution)
return nil, []*workflow.Decision{
{
DecisionType: common.DecisionTypePtr(workflow.DecisionTypeStartTimer),

StartTimerDecisionAttributes: &workflow.StartTimerDecisionAttributes{
TimerId: common.StringPtr("timer-id"),
StartToFireTimeoutSeconds: common.Int64Ptr(5),
},
}}, nil
}

poller := &TaskPoller{
Engine: s.engine,
Domain: s.domainName,
TaskList: taskList,
Identity: identity,
DecisionHandler: dtHandler,
Logger: s.Logger,
T: s.T(),
}

_, err := poller.PollAndProcessDecisionTask(false, false)
s.True(err == nil, err)

time.Sleep(1 * time.Second) // wait for workflow timeout

// check when workflow timeout, continueAsNew event contains expected fields
events := s.getHistory(s.domainName, executions[0])
lastEvent := events[len(events)-1]
s.Equal(workflow.EventTypeWorkflowExecutionContinuedAsNew, lastEvent.GetEventType())
attributes := lastEvent.WorkflowExecutionContinuedAsNewEventAttributes
s.Equal(workflow.ContinueAsNewInitiatorCronSchedule, attributes.GetInitiator())
s.Equal("cadenceInternal:Timeout START_TO_CLOSE", attributes.GetFailureReason())
s.Equal(memo, attributes.Memo)
s.Equal(searchAttr, attributes.SearchAttributes)

_, err = poller.PollAndProcessDecisionTask(false, false)
s.True(err == nil, err)

// check new run contains expected fields
events = s.getHistory(s.domainName, executions[1])
firstEvent := events[0]
s.Equal(workflow.EventTypeWorkflowExecutionStarted, firstEvent.GetEventType())
startAttributes := firstEvent.WorkflowExecutionStartedEventAttributes
s.Equal(memo, startAttributes.Memo)
s.Equal(searchAttr, startAttributes.SearchAttributes)

// terminate cron
terminateErr := s.engine.TerminateWorkflowExecution(createContext(), &workflow.TerminateWorkflowExecutionRequest{
Domain: common.StringPtr(s.domainName),
WorkflowExecution: &workflow.WorkflowExecution{
WorkflowId: common.StringPtr(id),
},
})
s.NoError(terminateErr)
}

func (s *integrationSuite) TestSequential_UserTimers() {
id := "integration-sequential-user-timers-test"
wt := "integration-sequential-user-timers-test-type"
Expand Down
6 changes: 3 additions & 3 deletions host/signalworkflow_test.go
Expand Up @@ -225,9 +225,9 @@ func (s *integrationSuite) TestSignalWorkflow() {
}

func (s *integrationSuite) TestSignalWorkflow_DuplicateRequest() {
id := "interation-signal-workflow-test-duplicate"
wt := "interation-signal-workflow-test-duplicate-type"
tl := "interation-signal-workflow-test-duplicate-tasklist"
id := "integration-signal-workflow-test-duplicate"
wt := "integration-signal-workflow-test-duplicate-type"
tl := "integration-signal-workflow-test-duplicate-tasklist"
identity := "worker1"
activityName := "activity_type1"

Expand Down
6 changes: 4 additions & 2 deletions service/history/timerQueueActiveProcessor.go
Expand Up @@ -749,15 +749,17 @@ func (t *timerQueueActiveProcessorImpl) processWorkflowTimeout(
continueAsnewAttributes := &workflow.ContinueAsNewWorkflowExecutionDecisionAttributes{
WorkflowType: startAttributes.WorkflowType,
TaskList: startAttributes.TaskList,
RetryPolicy: startAttributes.RetryPolicy,
Input: startAttributes.Input,
Header: startAttributes.Header,
ExecutionStartToCloseTimeoutSeconds: startAttributes.ExecutionStartToCloseTimeoutSeconds,
TaskStartToCloseTimeoutSeconds: startAttributes.TaskStartToCloseTimeoutSeconds,
BackoffStartIntervalInSeconds: common.Int32Ptr(int32(backoffInterval.Seconds())),
RetryPolicy: startAttributes.RetryPolicy,
Initiator: continueAsNewInitiator.Ptr(),
FailureReason: common.StringPtr(timeoutReason),
CronSchedule: common.StringPtr(msBuilder.GetExecutionInfo().CronSchedule),
Header: startAttributes.Header,
Memo: startAttributes.Memo,
SearchAttributes: startAttributes.SearchAttributes,
}
newMutableState, err := retryWorkflow(
msBuilder,
Expand Down

0 comments on commit a725304

Please sign in to comment.