From 4f00346f7cbcbab221375b9218739164218f016e Mon Sep 17 00:00:00 2001 From: wxing1292 Date: Mon, 26 Aug 2019 17:22:00 -0700 Subject: [PATCH] Do not delete first event batch if creation of mutable state fails (#2453) --- service/history/historyEngine.go | 51 +++++-------------------- service/history/historyEngine2_test.go | 53 -------------------------- 2 files changed, 9 insertions(+), 95 deletions(-) diff --git a/service/history/historyEngine.go b/service/history/historyEngine.go index bb13970137..38ae0fe77d 100644 --- a/service/history/historyEngine.go +++ b/service/history/historyEngine.go @@ -464,7 +464,6 @@ func (e *historyEngineImpl) StartWorkflowExecution( if err != nil { if t, ok := err.(*persistence.WorkflowExecutionAlreadyStartedError); ok { if t.StartRequestID == *request.RequestId { - e.deleteEvents(domainID, execution, eventStoreVersion, msBuilder.GetCurrentBranch()) return &workflow.StartWorkflowExecutionResponse{ RunId: common.StringPtr(t.RunID), }, nil @@ -472,7 +471,6 @@ func (e *historyEngineImpl) StartWorkflowExecution( } if msBuilder.GetCurrentVersion() < t.LastWriteVersion { - e.deleteEvents(domainID, execution, eventStoreVersion, msBuilder.GetCurrentBranch()) return nil, ce.NewDomainNotActiveError( *request.Domain, clusterMetadata.GetCurrentClusterName(), @@ -484,9 +482,15 @@ func (e *historyEngineImpl) StartWorkflowExecution( createMode = persistence.CreateWorkflowModeWorkflowIDReuse prevRunID = t.RunID prevLastWriteVersion = t.LastWriteVersion - err = e.applyWorkflowIDReusePolicyHelper(t.StartRequestID, prevRunID, t.State, t.CloseStatus, domainID, execution, startRequest.StartRequest.GetWorkflowIdReusePolicy()) - if err != nil { - e.deleteEvents(domainID, execution, eventStoreVersion, msBuilder.GetCurrentBranch()) + if err = e.applyWorkflowIDReusePolicyHelper( + t.StartRequestID, + prevRunID, + t.State, + t.CloseStatus, + domainID, + execution, + startRequest.StartRequest.GetWorkflowIdReusePolicy(), + ); err != nil { return nil, err } err = context.createWorkflowExecution( @@ -1580,7 +1584,6 @@ func (e *historyEngineImpl) SignalWithStartWorkflowExecution( ) if t, ok := err.(*persistence.WorkflowExecutionAlreadyStartedError); ok { - e.deleteEvents(domainID, execution, eventStoreVersion, msBuilder.GetCurrentBranch()) if t.StartRequestID == *request.RequestId { return &workflow.StartWorkflowExecutionResponse{ RunId: common.StringPtr(t.RunID), @@ -2019,42 +2022,6 @@ func createDeleteHistoryEventTimerTask( } } -func (e *historyEngineImpl) deleteEvents( - domainID string, - execution workflow.WorkflowExecution, - eventStoreVersion int32, - branchToken []byte, -) { - - var err error - defer func() { - // This is the only code path that deleting history could cause history corruption(missing first batch). - // We will use this warn log to verify this issue: https://github.com/uber/cadence/issues/2441 - e.logger.Warn("encounter WorkflowExecutionAlreadyStartedError, deleting duplicated history", - tag.ShardID(e.shard.GetShardID()), - tag.WorkflowDomainID(domainID), - tag.WorkflowID(execution.GetWorkflowId()), - tag.WorkflowRunID(execution.GetRunId()), - tag.Number(int64(eventStoreVersion)), - tag.Error(err)) - }() - // We created the history events but failed to create workflow execution, so cleanup the history which could cause - // us to leak history events which are never cleaned up. Cleaning up the events is absolutely safe here as they - // are always created for a unique run_id which is not visible beyond this call yet. - // TODO: Handle error on deletion of execution history - if eventStoreVersion == persistence.EventStoreVersionV2 { - err = e.historyV2Mgr.DeleteHistoryBranch(&persistence.DeleteHistoryBranchRequest{ - BranchToken: branchToken, - ShardID: common.IntPtr(e.shard.GetShardID()), - }) - } else { - err = e.historyMgr.DeleteWorkflowExecutionHistory(&persistence.DeleteWorkflowExecutionHistoryRequest{ - DomainID: domainID, - Execution: execution, - }) - } -} - func (e *historyEngineImpl) failDecision( context workflowExecutionContext, scheduleID int64, diff --git a/service/history/historyEngine2_test.go b/service/history/historyEngine2_test.go index 7d4bbd6802..dbb500a0d3 100644 --- a/service/history/historyEngine2_test.go +++ b/service/history/historyEngine2_test.go @@ -1189,7 +1189,6 @@ func (s *engine2Suite) TestStartWorkflowExecution_StillRunning_Dedup() { CloseStatus: p.WorkflowCloseStatusNone, LastWriteVersion: lastWriteVersion, }).Once() - s.mockHistoryV2Mgr.On("DeleteHistoryBranch", mock.Anything).Return(nil).Once() s.mockMetadataMgr.On("GetDomain", mock.Anything).Return( &p.GetDomainResponse{ Info: &p.DomainInfo{ID: domainID}, @@ -1240,7 +1239,6 @@ func (s *engine2Suite) TestStartWorkflowExecution_StillRunning_NonDeDup() { CloseStatus: p.WorkflowCloseStatusNone, LastWriteVersion: lastWriteVersion, }).Once() - s.mockHistoryV2Mgr.On("DeleteHistoryBranch", mock.Anything).Return(nil).Once() s.mockMetadataMgr.On("GetDomain", mock.Anything).Return( &p.GetDomainResponse{ Info: &p.DomainInfo{ID: domainID}, @@ -1275,51 +1273,6 @@ func (s *engine2Suite) TestStartWorkflowExecution_StillRunning_NonDeDup() { s.Nil(resp) } -func (s *engine2Suite) TestStartWorkflowExecution_TimeoutError() { - domainID := validDomainID - workflowID := "workflowID" - workflowType := "workflowType" - taskList := "testTaskList" - identity := "testIdentity" - - s.mockHistoryV2Mgr.On("AppendHistoryNodes", mock.Anything).Return(&p.AppendHistoryNodesResponse{Size: 0}, nil).Once() - s.mockExecutionMgr.On("CreateWorkflowExecution", mock.Anything).Return(nil, &p.TimeoutError{ - Msg: "random message", - }).Once() - // NOTE: should not delete history for timeout because timeout is unknown instead of failure - //s.mockHistoryV2Mgr.On("DeleteHistoryBranch", mock.Anything).Return(nil).Once() - s.mockMetadataMgr.On("GetDomain", mock.Anything).Return( - &p.GetDomainResponse{ - Info: &p.DomainInfo{ID: domainID}, - Config: &p.DomainConfig{Retention: 1}, - ReplicationConfig: &p.DomainReplicationConfig{ - ActiveClusterName: cluster.TestCurrentClusterName, - Clusters: []*p.ClusterReplicationConfig{ - &p.ClusterReplicationConfig{ClusterName: cluster.TestCurrentClusterName}, - }, - }, - TableVersion: p.DomainTableVersionV1, - }, - nil, - ) - - resp, err := s.historyEngine.StartWorkflowExecution(context.Background(), &h.StartWorkflowExecutionRequest{ - DomainUUID: common.StringPtr(domainID), - StartRequest: &workflow.StartWorkflowExecutionRequest{ - Domain: common.StringPtr(domainID), - WorkflowId: common.StringPtr(workflowID), - WorkflowType: &workflow.WorkflowType{Name: common.StringPtr(workflowType)}, - TaskList: &workflow.TaskList{Name: common.StringPtr(taskList)}, - ExecutionStartToCloseTimeoutSeconds: common.Int32Ptr(1), - TaskStartToCloseTimeoutSeconds: common.Int32Ptr(2), - Identity: common.StringPtr(identity), - RequestId: common.StringPtr("newRequestID"), - }, - }) - s.True(p.IsTimeoutError(err)) - s.Nil(resp) -} - func (s *engine2Suite) TestStartWorkflowExecution_NotRunning_PrevSuccess() { domainID := validDomainID workflowID := "workflowID" @@ -1376,8 +1329,6 @@ func (s *engine2Suite) TestStartWorkflowExecution_NotRunning_PrevSuccess() { request.PreviousLastWriteVersion == lastWriteVersion }), ).Return(&p.CreateWorkflowExecutionResponse{}, nil).Once() - } else { - s.mockHistoryV2Mgr.On("DeleteHistoryBranch", mock.Anything).Return(nil).Once() } resp, err := s.historyEngine.StartWorkflowExecution(context.Background(), &h.StartWorkflowExecutionRequest{ @@ -1473,8 +1424,6 @@ func (s *engine2Suite) TestStartWorkflowExecution_NotRunning_PrevFail() { request.PreviousLastWriteVersion == lastWriteVersion }), ).Return(&p.CreateWorkflowExecutionResponse{}, nil).Once() - } else { - s.mockHistoryV2Mgr.On("DeleteHistoryBranch", mock.Anything).Return(nil).Once() } resp, err := s.historyEngine.StartWorkflowExecution(context.Background(), &h.StartWorkflowExecutionRequest{ @@ -1776,7 +1725,6 @@ func (s *engine2Suite) TestSignalWithStartWorkflowExecution_Start_DuplicateReque s.mockExecutionMgr.On("GetWorkflowExecution", mock.Anything).Return(gwmsResponse, nil).Once() s.mockHistoryV2Mgr.On("AppendHistoryNodes", mock.Anything).Return(&p.AppendHistoryNodesResponse{Size: 0}, nil).Once() s.mockExecutionMgr.On("CreateWorkflowExecution", mock.Anything).Return(nil, workflowAlreadyStartedErr).Once() - s.mockHistoryV2Mgr.On("DeleteHistoryBranch", mock.Anything).Return(nil).Once() s.mockMetadataMgr.On("GetDomain", mock.Anything).Return( &p.GetDomainResponse{ Info: &p.DomainInfo{ID: domainID}, @@ -1843,7 +1791,6 @@ func (s *engine2Suite) TestSignalWithStartWorkflowExecution_Start_WorkflowAlread s.mockExecutionMgr.On("GetWorkflowExecution", mock.Anything).Return(gwmsResponse, nil).Once() s.mockHistoryV2Mgr.On("AppendHistoryNodes", mock.Anything).Return(&p.AppendHistoryNodesResponse{Size: 0}, nil).Once() s.mockExecutionMgr.On("CreateWorkflowExecution", mock.Anything).Return(nil, workflowAlreadyStartedErr).Once() - s.mockHistoryV2Mgr.On("DeleteHistoryBranch", mock.Anything).Return(nil).Once() s.mockMetadataMgr.On("GetDomain", mock.Anything).Return( &p.GetDomainResponse{ Info: &p.DomainInfo{ID: domainID},