Skip to content

Commit

Permalink
Save last RetryStatus for retryable failures (#432)
Browse files Browse the repository at this point in the history
  • Loading branch information
alexshtin committed Jun 5, 2020
1 parent ba7072e commit 4e0c4e0
Show file tree
Hide file tree
Showing 23 changed files with 582 additions and 519 deletions.
3 changes: 2 additions & 1 deletion common/backoff/cron.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,9 @@ import (
"time"

"github.com/robfig/cron"
"github.com/temporalio/temporal/common/convert"
"go.temporal.io/temporal-proto/serviceerror"

"github.com/temporalio/temporal/common/convert"
)

// NoBackoff is used to represent backoff when no cron backoff is needed
Expand Down
7 changes: 6 additions & 1 deletion common/testing/history_event_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/pborman/uuid"
commonpb "go.temporal.io/temporal-proto/common"
eventpb "go.temporal.io/temporal-proto/event"
failurepb "go.temporal.io/temporal-proto/failure"
tasklistpb "go.temporal.io/temporal-proto/tasklist"

"github.com/temporalio/temporal/common/failure"
Expand Down Expand Up @@ -463,7 +464,11 @@ func InitializeHistoryEventGenerator(
historyEvent.Attributes = &eventpb.HistoryEvent_ActivityTaskTimedOutEventAttributes{ActivityTaskTimedOutEventAttributes: &eventpb.ActivityTaskTimedOutEventAttributes{
ScheduledEventId: lastEvent.GetActivityTaskStartedEventAttributes().ScheduledEventId,
StartedEventId: lastEvent.EventId,
TimeoutType: commonpb.TimeoutType_ScheduleToClose,
Failure: &failurepb.Failure{
FailureInfo: &failurepb.Failure_TimeoutFailureInfo{TimeoutFailureInfo: &failurepb.TimeoutFailureInfo{
TimeoutType: commonpb.TimeoutType_ScheduleToClose,
}},
},
}}
return historyEvent
})
Expand Down
5 changes: 2 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,8 @@ require (
github.com/urfave/cli v1.22.4
github.com/valyala/fastjson v1.5.1
github.com/xwb1989/sqlparser v0.0.0-20180606152119-120387863bf2
go.temporal.io/temporal v0.23.5
go.temporal.io/temporal-proto v0.23.3
go.temporal.io/temporal v0.23.6
go.temporal.io/temporal-proto v0.23.4
go.uber.org/atomic v1.6.0
go.uber.org/multierr v1.5.0
go.uber.org/zap v1.15.0
Expand All @@ -75,7 +75,6 @@ require (
golang.org/x/time v0.0.0-20200416051211-89c76fbcd5d1
golang.org/x/tools v0.0.0-20200604183345-4d5ea46c79fe // indirect
google.golang.org/api v0.26.0
google.golang.org/genproto v0.0.0-20200604104852-0b0486081ffb // indirect
google.golang.org/grpc v1.29.1
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/validator.v2 v2.0.0-20191107172027-c3144fdedc21
Expand Down
11 changes: 4 additions & 7 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -337,10 +337,10 @@ go.opencensus.io v0.22.0/go.mod h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8=
go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
go.opencensus.io v0.22.3 h1:8sGtKOrtQqkN1bp2AtX+misvLIlOmsEsNd+9NIcPEm8=
go.opencensus.io v0.22.3/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
go.temporal.io/temporal v0.23.5 h1:bTLnjcv/wD6fB4QrAvB3mOR+J8FrNLWSHs6cmtANnPo=
go.temporal.io/temporal v0.23.5/go.mod h1:cqXeyF9fmSbXTP5j1TOrr5WvSG+H61aDybR0NcwVASU=
go.temporal.io/temporal-proto v0.23.3 h1:BAtO0Msx8B9d7JS/uT2xor66V/lcM2tDettb5M+aFN8=
go.temporal.io/temporal-proto v0.23.3/go.mod h1:MH1w9QQvM360POj9mhZdJqOPtFYm1S/ylwrTnYiFr6I=
go.temporal.io/temporal v0.23.6 h1:dxN8NJnn5tBu+9DN7q6ocLaL/mxvOezevdsn5a9HgkU=
go.temporal.io/temporal v0.23.6/go.mod h1:KmBabmbN6XzCAQFHd+kjF8OIgj8SQB2BtqOenFiOdsY=
go.temporal.io/temporal-proto v0.23.4 h1:PnQmmTWpQJXapzcBKQ8FgVDeK/B7Z3G08YjmomurJDs=
go.temporal.io/temporal-proto v0.23.4/go.mod h1:B5DnsWAGFpqd9M6bg2OUVzC3Y5/7UVDM0wEkZm7rYN0=
go.uber.org/atomic v1.5.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
go.uber.org/atomic v1.6.0 h1:Ezj3JGmsOnG1MoRWQkPBsKLe9DwWD9QeXzTRzzldNVk=
go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
Expand Down Expand Up @@ -534,7 +534,6 @@ google.golang.org/api v0.18.0/go.mod h1:BwFmGc8tA3vsd7r/7kR8DY7iEEGSU04BFxCo5jP/
google.golang.org/api v0.19.0/go.mod h1:BwFmGc8tA3vsd7r/7kR8DY7iEEGSU04BFxCo5jP/sfE=
google.golang.org/api v0.20.0/go.mod h1:BwFmGc8tA3vsd7r/7kR8DY7iEEGSU04BFxCo5jP/sfE=
google.golang.org/api v0.22.0/go.mod h1:BwFmGc8tA3vsd7r/7kR8DY7iEEGSU04BFxCo5jP/sfE=
google.golang.org/api v0.24.0 h1:cG03eaksBzhfSIk7JRGctfp3lanklcOM/mTGvow7BbQ=
google.golang.org/api v0.24.0/go.mod h1:lIXQywCXRcnZPGlsd8NbLnOjtAoL6em04bJ9+z0MncE=
google.golang.org/api v0.26.0 h1:VJZ8h6E8ip82FRpQl848c5vAadxlTXrUh8RzQzSRm08=
google.golang.org/api v0.26.0/go.mod h1:lIXQywCXRcnZPGlsd8NbLnOjtAoL6em04bJ9+z0MncE=
Expand Down Expand Up @@ -571,7 +570,6 @@ google.golang.org/genproto v0.0.0-20200430143042-b979b6f78d84/go.mod h1:55QSHmfG
google.golang.org/genproto v0.0.0-20200511104702-f5ebc3bea380/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c=
google.golang.org/genproto v0.0.0-20200515170657-fc4c6c6a6587/go.mod h1:YsZOwe1myG/8QRHRsmBRE1LrgQY60beZKjly0O1fX9U=
google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo=
google.golang.org/genproto v0.0.0-20200603110839-e855014d5736 h1:+IE3xTD+6Eb7QWG5JFp+dQr/XjKpjmrNkh4pdjTdHEs=
google.golang.org/genproto v0.0.0-20200603110839-e855014d5736/go.mod h1:jDfRM7FcilCzHH/e9qn6dsT145K34l5v+OpcnNgKAAA=
google.golang.org/genproto v0.0.0-20200604104852-0b0486081ffb h1:ek2py5bOqzR7MR/6obzk0rXUgYCLmjyLnaO9ssT+l6w=
google.golang.org/genproto v0.0.0-20200604104852-0b0486081ffb/go.mod h1:jDfRM7FcilCzHH/e9qn6dsT145K34l5v+OpcnNgKAAA=
Expand All @@ -593,7 +591,6 @@ google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQ
google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE=
google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo=
google.golang.org/protobuf v1.22.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
google.golang.org/protobuf v1.23.0 h1:4MY060fB1DLGMB/7MBTLnwQUY6+F09GEiz6SsrNqyzM=
google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
google.golang.org/protobuf v1.24.0 h1:UhZDfRO8JRQru4/+LlLE0BRKGF8L+PICnvYZmx/fEGA=
Expand Down
6 changes: 3 additions & 3 deletions host/activity_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -764,7 +764,7 @@ func (s *integrationSuite) TestActivityTimeouts() {
}}, nil
}

switch timeoutEvent.GetTimeoutType() {
switch timeoutEvent.GetFailure().GetTimeoutFailureInfo().GetTimeoutType() {
case commonpb.TimeoutType_ScheduleToStart:
if scheduledEvent.GetActivityTaskScheduledEventAttributes().GetActivityId() == "A" {
activityATimedout = true
Expand Down Expand Up @@ -975,12 +975,12 @@ func (s *integrationSuite) TestActivityHeartbeatTimeouts() {
break ProcessLoop
}

switch timeoutEvent.GetTimeoutType() {
switch timeoutEvent.GetFailure().GetTimeoutFailureInfo().GetTimeoutType() {
case commonpb.TimeoutType_Heartbeat:
activitiesTimedout++
scheduleID := timeoutEvent.GetScheduledEventId()
var details string
err := payloads.Decode(timeoutEvent.GetLastHeartbeatDetails(), &details)
err := payloads.Decode(timeoutEvent.GetFailure().GetTimeoutFailureInfo().GetLastHeartbeatDetails(), &details)
s.NoError(err)
lastHeartbeat, _ := strconv.Atoi(details)
lastHeartbeatMap[scheduleID] = lastHeartbeat
Expand Down
13 changes: 11 additions & 2 deletions host/ndc/nDC_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"testing"
"time"

failurepb "go.temporal.io/temporal-proto/failure"
replicationpb "go.temporal.io/temporal-proto/replication"

eventgenpb "github.com/temporalio/temporal/.gen/proto/event"
Expand Down Expand Up @@ -590,7 +591,11 @@ func (s *nDCIntegrationTestSuite) TestHandcraftedMultipleBranches() {
Attributes: &eventpb.HistoryEvent_ActivityTaskTimedOutEventAttributes{ActivityTaskTimedOutEventAttributes: &eventpb.ActivityTaskTimedOutEventAttributes{
ScheduledEventId: 6,
StartedEventId: 7,
TimeoutType: commonpb.TimeoutType_StartToClose,
Failure: &failurepb.Failure{
FailureInfo: &failurepb.Failure_TimeoutFailureInfo{TimeoutFailureInfo: &failurepb.TimeoutFailureInfo{
TimeoutType: commonpb.TimeoutType_StartToClose,
}},
},
}},
},
{
Expand Down Expand Up @@ -1383,7 +1388,11 @@ func (s *nDCIntegrationTestSuite) TestAdminGetWorkflowExecutionRawHistoryV2() {
Attributes: &eventpb.HistoryEvent_ActivityTaskTimedOutEventAttributes{ActivityTaskTimedOutEventAttributes: &eventpb.ActivityTaskTimedOutEventAttributes{
ScheduledEventId: 6,
StartedEventId: 7,
TimeoutType: commonpb.TimeoutType_StartToClose,
Failure: &failurepb.Failure{
FailureInfo: &failurepb.Failure_TimeoutFailureInfo{TimeoutFailureInfo: &failurepb.TimeoutFailureInfo{
TimeoutType: commonpb.TimeoutType_StartToClose,
}},
},
}},
},
{
Expand Down
2 changes: 1 addition & 1 deletion proto/temporal-proto
4 changes: 2 additions & 2 deletions service/history/decisionChecker.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ func (c *workflowSizeChecker) failWorkflowIfPayloadSizeExceedsLimit(
Failure: failure.NewServerFailure(message, true),
}

if _, err := c.mutableState.AddFailWorkflowEvent(c.completedID, attributes); err != nil {
if _, err := c.mutableState.AddFailWorkflowEvent(c.completedID, commonpb.RetryStatus_NonRetryableFailure, attributes); err != nil {
return false, err
}

Expand All @@ -172,7 +172,7 @@ func (c *workflowSizeChecker) failWorkflowSizeExceedsLimit() (bool, error) {
Failure: failure.NewServerFailure(common.FailureReasonSizeExceedsLimit, false),
}

if _, err := c.mutableState.AddFailWorkflowEvent(c.completedID, attributes); err != nil {
if _, err := c.mutableState.AddFailWorkflowEvent(c.completedID, commonpb.RetryStatus_NonRetryableFailure, attributes); err != nil {
return false, err
}
return true, nil
Expand Down
6 changes: 3 additions & 3 deletions service/history/decisionTaskHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -441,7 +441,7 @@ func (handler *decisionTaskHandlerImpl) handleDecisionFailWorkflow(
}

// below will check whether to continue as new based on retry backoff or cron schedule
backoffInterval := handler.mutableState.GetRetryBackoffDuration(attr.GetFailure())
backoffInterval, retryStatus := handler.mutableState.GetRetryBackoffDuration(attr.GetFailure())
continueAsNewInitiator := commonpb.ContinueAsNewInitiator_Retry
// first check the backoff retry
if backoffInterval == backoff.NoBackoff {
Expand All @@ -456,7 +456,7 @@ func (handler *decisionTaskHandlerImpl) handleDecisionFailWorkflow(
// second check the backoff / cron schedule
if backoffInterval == backoff.NoBackoff {
// no retry or cron
if _, err := handler.mutableState.AddFailWorkflowEvent(handler.decisionTaskCompletedID, attr); err != nil {
if _, err := handler.mutableState.AddFailWorkflowEvent(handler.decisionTaskCompletedID, retryStatus, attr); err != nil {
return err
}
return nil
Expand Down Expand Up @@ -670,7 +670,7 @@ func (handler *decisionTaskHandlerImpl) handleDecisionContinueAsNewWorkflow(
// TODO(maxim): is decisionTaskCompletedID the correct id?
// TODO(maxim): should we introduce new TimeoutTypes (Workflow, Run) for workflows?
handler.stopProcessing = true
_, err := handler.mutableState.AddTimeoutWorkflowEvent(handler.decisionTaskCompletedID)
_, err := handler.mutableState.AddTimeoutWorkflowEvent(handler.decisionTaskCompletedID, commonpb.RetryStatus_Timeout)
return err
}
handler.logger.Debug("!!!! Continued as new without timeout",
Expand Down
44 changes: 23 additions & 21 deletions service/history/historyBuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,20 +160,19 @@ func (b *historyBuilder) AddActivityTaskCompletedEvent(scheduleEventID, startedE
}

func (b *historyBuilder) AddActivityTaskFailedEvent(scheduleEventID, startedEventID int64,
request *workflowservice.RespondActivityTaskFailedRequest) *eventpb.HistoryEvent {
event := b.newActivityTaskFailedEvent(scheduleEventID, startedEventID, request)
failure *failurepb.Failure, retryStatus commonpb.RetryStatus, identity string) *eventpb.HistoryEvent {
event := b.newActivityTaskFailedEvent(scheduleEventID, startedEventID, failure, retryStatus, identity)

return b.addEventToHistory(event)
}

func (b *historyBuilder) AddActivityTaskTimedOutEvent(
scheduleEventID,
startedEventID int64,
timeoutType commonpb.TimeoutType,
lastHeartBeatDetails *commonpb.Payloads,
lastFailure *failurepb.Failure,
timeoutFailure *failurepb.Failure,
retryStatus commonpb.RetryStatus,
) *eventpb.HistoryEvent {
event := b.newActivityTaskTimedOutEvent(scheduleEventID, startedEventID, timeoutType, lastHeartBeatDetails, lastFailure)
event := b.newActivityTaskTimedOutEvent(scheduleEventID, startedEventID, timeoutFailure, retryStatus)

return b.addEventToHistory(event)
}
Expand All @@ -185,15 +184,15 @@ func (b *historyBuilder) AddCompletedWorkflowEvent(decisionCompletedEventID int6
return b.addEventToHistory(event)
}

func (b *historyBuilder) AddFailWorkflowEvent(decisionCompletedEventID int64,
func (b *historyBuilder) AddFailWorkflowEvent(decisionCompletedEventID int64, retryStatus commonpb.RetryStatus,
attributes *decisionpb.FailWorkflowExecutionDecisionAttributes) *eventpb.HistoryEvent {
event := b.newFailWorkflowExecutionEvent(decisionCompletedEventID, attributes)
event := b.newFailWorkflowExecutionEvent(decisionCompletedEventID, retryStatus, attributes)

return b.addEventToHistory(event)
}

func (b *historyBuilder) AddTimeoutWorkflowEvent() *eventpb.HistoryEvent {
event := b.newTimeoutWorkflowExecutionEvent()
func (b *historyBuilder) AddTimeoutWorkflowEvent(retryStatus commonpb.RetryStatus) *eventpb.HistoryEvent {
event := b.newTimeoutWorkflowExecutionEvent(retryStatus)

return b.addEventToHistory(event)
}
Expand Down Expand Up @@ -623,31 +622,30 @@ func (b *historyBuilder) newActivityTaskCompletedEvent(scheduleEventID, startedE

func (b *historyBuilder) newActivityTaskTimedOutEvent(
scheduleEventID, startedEventID int64,
timeoutType commonpb.TimeoutType,
lastHeartBeatDetails *commonpb.Payloads,
lastFailure *failurepb.Failure,
timeoutFailure *failurepb.Failure,
retryStatus commonpb.RetryStatus,
) *eventpb.HistoryEvent {
historyEvent := b.msBuilder.CreateNewHistoryEvent(eventpb.EventType_ActivityTaskTimedOut)
attributes := &eventpb.ActivityTaskTimedOutEventAttributes{}
attributes.ScheduledEventId = scheduleEventID
attributes.StartedEventId = startedEventID
attributes.TimeoutType = timeoutType
attributes.LastHeartbeatDetails = lastHeartBeatDetails
attributes.LastFailure = lastFailure
attributes.Failure = timeoutFailure
attributes.RetryStatus = retryStatus

historyEvent.Attributes = &eventpb.HistoryEvent_ActivityTaskTimedOutEventAttributes{ActivityTaskTimedOutEventAttributes: attributes}

return historyEvent
}

func (b *historyBuilder) newActivityTaskFailedEvent(scheduleEventID, startedEventID int64,
request *workflowservice.RespondActivityTaskFailedRequest) *eventpb.HistoryEvent {
failure *failurepb.Failure, retryStatus commonpb.RetryStatus, identity string) *eventpb.HistoryEvent {
historyEvent := b.msBuilder.CreateNewHistoryEvent(eventpb.EventType_ActivityTaskFailed)
attributes := &eventpb.ActivityTaskFailedEventAttributes{}
attributes.Failure = request.GetFailure()
attributes.Failure = failure
attributes.RetryStatus = retryStatus
attributes.ScheduledEventId = scheduleEventID
attributes.StartedEventId = startedEventID
attributes.Identity = request.Identity
attributes.Identity = identity
historyEvent.Attributes = &eventpb.HistoryEvent_ActivityTaskFailedEventAttributes{ActivityTaskFailedEventAttributes: attributes}

return historyEvent
Expand All @@ -664,21 +662,23 @@ func (b *historyBuilder) newCompleteWorkflowExecutionEvent(decisionTaskCompleted
return historyEvent
}

func (b *historyBuilder) newFailWorkflowExecutionEvent(decisionTaskCompletedEventID int64,
func (b *historyBuilder) newFailWorkflowExecutionEvent(decisionTaskCompletedEventID int64, retryStatus commonpb.RetryStatus,
request *decisionpb.FailWorkflowExecutionDecisionAttributes) *eventpb.HistoryEvent {
historyEvent := b.msBuilder.CreateNewHistoryEvent(eventpb.EventType_WorkflowExecutionFailed)
attributes := &eventpb.WorkflowExecutionFailedEventAttributes{}
attributes.Failure = request.GetFailure()
attributes.RetryStatus = retryStatus
attributes.DecisionTaskCompletedEventId = decisionTaskCompletedEventID
historyEvent.Attributes = &eventpb.HistoryEvent_WorkflowExecutionFailedEventAttributes{WorkflowExecutionFailedEventAttributes: attributes}

return historyEvent
}

func (b *historyBuilder) newTimeoutWorkflowExecutionEvent() *eventpb.HistoryEvent {
func (b *historyBuilder) newTimeoutWorkflowExecutionEvent(retryStatus commonpb.RetryStatus) *eventpb.HistoryEvent {
historyEvent := b.msBuilder.CreateNewHistoryEvent(eventpb.EventType_WorkflowExecutionTimedOut)
attributes := &eventpb.WorkflowExecutionTimedOutEventAttributes{}
attributes.TimeoutType = commonpb.TimeoutType_StartToClose
attributes.RetryStatus = retryStatus
historyEvent.Attributes = &eventpb.HistoryEvent_WorkflowExecutionTimedOutEventAttributes{WorkflowExecutionTimedOutEventAttributes: attributes}

return historyEvent
Expand Down Expand Up @@ -970,6 +970,7 @@ func (b *historyBuilder) newChildWorkflowExecutionFailedEvent(namespace string,
attributes.InitiatedEventId = initiatedID
attributes.StartedEventId = startedID
attributes.Failure = failedAttributes.GetFailure()
attributes.RetryStatus = failedAttributes.GetRetryStatus()
historyEvent.Attributes = &eventpb.HistoryEvent_ChildWorkflowExecutionFailedEventAttributes{ChildWorkflowExecutionFailedEventAttributes: attributes}

return historyEvent
Expand Down Expand Up @@ -1017,6 +1018,7 @@ func (b *historyBuilder) newChildWorkflowExecutionTimedOutEvent(namespace string
attributes.WorkflowType = workflowType
attributes.InitiatedEventId = initiatedID
attributes.StartedEventId = startedID
attributes.RetryStatus = timedOutAttributes.GetRetryStatus()
historyEvent.Attributes = &eventpb.HistoryEvent_ChildWorkflowExecutionTimedOutEventAttributes{ChildWorkflowExecutionTimedOutEventAttributes: attributes}

return historyEvent
Expand Down
Loading

0 comments on commit 4e0c4e0

Please sign in to comment.