Skip to content

Commit

Permalink
Do not delete first event batch if creation of mutable state fails (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
wxing1292 authored and Wenquan Xing committed Aug 27, 2019
1 parent b64b881 commit 4f00346
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 95 deletions.
51 changes: 9 additions & 42 deletions service/history/historyEngine.go
Expand Up @@ -464,15 +464,13 @@ func (e *historyEngineImpl) StartWorkflowExecution(
if err != nil {
if t, ok := err.(*persistence.WorkflowExecutionAlreadyStartedError); ok {
if t.StartRequestID == *request.RequestId {
e.deleteEvents(domainID, execution, eventStoreVersion, msBuilder.GetCurrentBranch())
return &workflow.StartWorkflowExecutionResponse{
RunId: common.StringPtr(t.RunID),
}, nil
// delete history is expected here because duplicate start request will create history with different rid
}

if msBuilder.GetCurrentVersion() < t.LastWriteVersion {
e.deleteEvents(domainID, execution, eventStoreVersion, msBuilder.GetCurrentBranch())
return nil, ce.NewDomainNotActiveError(
*request.Domain,
clusterMetadata.GetCurrentClusterName(),
Expand All @@ -484,9 +482,15 @@ func (e *historyEngineImpl) StartWorkflowExecution(
createMode = persistence.CreateWorkflowModeWorkflowIDReuse
prevRunID = t.RunID
prevLastWriteVersion = t.LastWriteVersion
err = e.applyWorkflowIDReusePolicyHelper(t.StartRequestID, prevRunID, t.State, t.CloseStatus, domainID, execution, startRequest.StartRequest.GetWorkflowIdReusePolicy())
if err != nil {
e.deleteEvents(domainID, execution, eventStoreVersion, msBuilder.GetCurrentBranch())
if err = e.applyWorkflowIDReusePolicyHelper(
t.StartRequestID,
prevRunID,
t.State,
t.CloseStatus,
domainID,
execution,
startRequest.StartRequest.GetWorkflowIdReusePolicy(),
); err != nil {
return nil, err
}
err = context.createWorkflowExecution(
Expand Down Expand Up @@ -1580,7 +1584,6 @@ func (e *historyEngineImpl) SignalWithStartWorkflowExecution(
)

if t, ok := err.(*persistence.WorkflowExecutionAlreadyStartedError); ok {
e.deleteEvents(domainID, execution, eventStoreVersion, msBuilder.GetCurrentBranch())
if t.StartRequestID == *request.RequestId {
return &workflow.StartWorkflowExecutionResponse{
RunId: common.StringPtr(t.RunID),
Expand Down Expand Up @@ -2019,42 +2022,6 @@ func createDeleteHistoryEventTimerTask(
}
}

func (e *historyEngineImpl) deleteEvents(
domainID string,
execution workflow.WorkflowExecution,
eventStoreVersion int32,
branchToken []byte,
) {

var err error
defer func() {
// This is the only code path that deleting history could cause history corruption(missing first batch).
// We will use this warn log to verify this issue: https://github.com/uber/cadence/issues/2441
e.logger.Warn("encounter WorkflowExecutionAlreadyStartedError, deleting duplicated history",
tag.ShardID(e.shard.GetShardID()),
tag.WorkflowDomainID(domainID),
tag.WorkflowID(execution.GetWorkflowId()),
tag.WorkflowRunID(execution.GetRunId()),
tag.Number(int64(eventStoreVersion)),
tag.Error(err))
}()
// We created the history events but failed to create workflow execution, so cleanup the history which could cause
// us to leak history events which are never cleaned up. Cleaning up the events is absolutely safe here as they
// are always created for a unique run_id which is not visible beyond this call yet.
// TODO: Handle error on deletion of execution history
if eventStoreVersion == persistence.EventStoreVersionV2 {
err = e.historyV2Mgr.DeleteHistoryBranch(&persistence.DeleteHistoryBranchRequest{
BranchToken: branchToken,
ShardID: common.IntPtr(e.shard.GetShardID()),
})
} else {
err = e.historyMgr.DeleteWorkflowExecutionHistory(&persistence.DeleteWorkflowExecutionHistoryRequest{
DomainID: domainID,
Execution: execution,
})
}
}

func (e *historyEngineImpl) failDecision(
context workflowExecutionContext,
scheduleID int64,
Expand Down
53 changes: 0 additions & 53 deletions service/history/historyEngine2_test.go
Expand Up @@ -1189,7 +1189,6 @@ func (s *engine2Suite) TestStartWorkflowExecution_StillRunning_Dedup() {
CloseStatus: p.WorkflowCloseStatusNone,
LastWriteVersion: lastWriteVersion,
}).Once()
s.mockHistoryV2Mgr.On("DeleteHistoryBranch", mock.Anything).Return(nil).Once()
s.mockMetadataMgr.On("GetDomain", mock.Anything).Return(
&p.GetDomainResponse{
Info: &p.DomainInfo{ID: domainID},
Expand Down Expand Up @@ -1240,7 +1239,6 @@ func (s *engine2Suite) TestStartWorkflowExecution_StillRunning_NonDeDup() {
CloseStatus: p.WorkflowCloseStatusNone,
LastWriteVersion: lastWriteVersion,
}).Once()
s.mockHistoryV2Mgr.On("DeleteHistoryBranch", mock.Anything).Return(nil).Once()
s.mockMetadataMgr.On("GetDomain", mock.Anything).Return(
&p.GetDomainResponse{
Info: &p.DomainInfo{ID: domainID},
Expand Down Expand Up @@ -1275,51 +1273,6 @@ func (s *engine2Suite) TestStartWorkflowExecution_StillRunning_NonDeDup() {
s.Nil(resp)
}

func (s *engine2Suite) TestStartWorkflowExecution_TimeoutError() {
domainID := validDomainID
workflowID := "workflowID"
workflowType := "workflowType"
taskList := "testTaskList"
identity := "testIdentity"

s.mockHistoryV2Mgr.On("AppendHistoryNodes", mock.Anything).Return(&p.AppendHistoryNodesResponse{Size: 0}, nil).Once()
s.mockExecutionMgr.On("CreateWorkflowExecution", mock.Anything).Return(nil, &p.TimeoutError{
Msg: "random message",
}).Once()
// NOTE: should not delete history for timeout because timeout is unknown instead of failure
//s.mockHistoryV2Mgr.On("DeleteHistoryBranch", mock.Anything).Return(nil).Once()
s.mockMetadataMgr.On("GetDomain", mock.Anything).Return(
&p.GetDomainResponse{
Info: &p.DomainInfo{ID: domainID},
Config: &p.DomainConfig{Retention: 1},
ReplicationConfig: &p.DomainReplicationConfig{
ActiveClusterName: cluster.TestCurrentClusterName,
Clusters: []*p.ClusterReplicationConfig{
&p.ClusterReplicationConfig{ClusterName: cluster.TestCurrentClusterName},
},
},
TableVersion: p.DomainTableVersionV1,
},
nil,
)

resp, err := s.historyEngine.StartWorkflowExecution(context.Background(), &h.StartWorkflowExecutionRequest{
DomainUUID: common.StringPtr(domainID),
StartRequest: &workflow.StartWorkflowExecutionRequest{
Domain: common.StringPtr(domainID),
WorkflowId: common.StringPtr(workflowID),
WorkflowType: &workflow.WorkflowType{Name: common.StringPtr(workflowType)},
TaskList: &workflow.TaskList{Name: common.StringPtr(taskList)},
ExecutionStartToCloseTimeoutSeconds: common.Int32Ptr(1),
TaskStartToCloseTimeoutSeconds: common.Int32Ptr(2),
Identity: common.StringPtr(identity),
RequestId: common.StringPtr("newRequestID"),
},
})
s.True(p.IsTimeoutError(err))
s.Nil(resp)
}

func (s *engine2Suite) TestStartWorkflowExecution_NotRunning_PrevSuccess() {
domainID := validDomainID
workflowID := "workflowID"
Expand Down Expand Up @@ -1376,8 +1329,6 @@ func (s *engine2Suite) TestStartWorkflowExecution_NotRunning_PrevSuccess() {
request.PreviousLastWriteVersion == lastWriteVersion
}),
).Return(&p.CreateWorkflowExecutionResponse{}, nil).Once()
} else {
s.mockHistoryV2Mgr.On("DeleteHistoryBranch", mock.Anything).Return(nil).Once()
}

resp, err := s.historyEngine.StartWorkflowExecution(context.Background(), &h.StartWorkflowExecutionRequest{
Expand Down Expand Up @@ -1473,8 +1424,6 @@ func (s *engine2Suite) TestStartWorkflowExecution_NotRunning_PrevFail() {
request.PreviousLastWriteVersion == lastWriteVersion
}),
).Return(&p.CreateWorkflowExecutionResponse{}, nil).Once()
} else {
s.mockHistoryV2Mgr.On("DeleteHistoryBranch", mock.Anything).Return(nil).Once()
}

resp, err := s.historyEngine.StartWorkflowExecution(context.Background(), &h.StartWorkflowExecutionRequest{
Expand Down Expand Up @@ -1776,7 +1725,6 @@ func (s *engine2Suite) TestSignalWithStartWorkflowExecution_Start_DuplicateReque
s.mockExecutionMgr.On("GetWorkflowExecution", mock.Anything).Return(gwmsResponse, nil).Once()
s.mockHistoryV2Mgr.On("AppendHistoryNodes", mock.Anything).Return(&p.AppendHistoryNodesResponse{Size: 0}, nil).Once()
s.mockExecutionMgr.On("CreateWorkflowExecution", mock.Anything).Return(nil, workflowAlreadyStartedErr).Once()
s.mockHistoryV2Mgr.On("DeleteHistoryBranch", mock.Anything).Return(nil).Once()
s.mockMetadataMgr.On("GetDomain", mock.Anything).Return(
&p.GetDomainResponse{
Info: &p.DomainInfo{ID: domainID},
Expand Down Expand Up @@ -1843,7 +1791,6 @@ func (s *engine2Suite) TestSignalWithStartWorkflowExecution_Start_WorkflowAlread
s.mockExecutionMgr.On("GetWorkflowExecution", mock.Anything).Return(gwmsResponse, nil).Once()
s.mockHistoryV2Mgr.On("AppendHistoryNodes", mock.Anything).Return(&p.AppendHistoryNodesResponse{Size: 0}, nil).Once()
s.mockExecutionMgr.On("CreateWorkflowExecution", mock.Anything).Return(nil, workflowAlreadyStartedErr).Once()
s.mockHistoryV2Mgr.On("DeleteHistoryBranch", mock.Anything).Return(nil).Once()
s.mockMetadataMgr.On("GetDomain", mock.Anything).Return(
&p.GetDomainResponse{
Info: &p.DomainInfo{ID: domainID},
Expand Down

0 comments on commit 4f00346

Please sign in to comment.