diff --git a/common/persistence/workflowStateCloseStatusValidator.go b/common/persistence/workflowStateCloseStatusValidator.go index 3323acaf79..eec26352d3 100644 --- a/common/persistence/workflowStateCloseStatusValidator.go +++ b/common/persistence/workflowStateCloseStatusValidator.go @@ -65,6 +65,7 @@ func ValidateUpdateWorkflowStateCloseStatus(state int, closeStatus int) error { return nil } +// ToThriftWorkflowExecutionCloseStatus converts the persistence representation of a close status to its thrift representation. func ToThriftWorkflowExecutionCloseStatus( closeStatus int, ) workflow.WorkflowExecutionCloseStatus { diff --git a/service/history/historyEngine.go b/service/history/historyEngine.go index 6db7b1d016..12e64dee3e 100644 --- a/service/history/historyEngine.go +++ b/service/history/historyEngine.go @@ -459,7 +459,6 @@ func (e *historyEngineImpl) StartWorkflowExecution( if err != nil { if t, ok := err.(*persistence.WorkflowExecutionAlreadyStartedError); ok { if t.StartRequestID == *request.RequestId { - e.deleteEvents(domainID, execution, eventStoreVersion, msBuilder.GetCurrentBranch()) return &workflow.StartWorkflowExecutionResponse{ RunId: common.StringPtr(t.RunID), }, nil @@ -467,7 +466,6 @@ func (e *historyEngineImpl) StartWorkflowExecution( } if msBuilder.GetCurrentVersion() < t.LastWriteVersion { - e.deleteEvents(domainID, execution, eventStoreVersion, msBuilder.GetCurrentBranch()) return nil, ce.NewDomainNotActiveError( *request.Domain, clusterMetadata.GetCurrentClusterName(), @@ -488,7 +486,6 @@ func (e *historyEngineImpl) StartWorkflowExecution( execution, startRequest.StartRequest.GetWorkflowIdReusePolicy(), ); err != nil { - e.deleteEvents(domainID, execution, eventStoreVersion, msBuilder.GetCurrentBranch()) return nil, err } err = context.createWorkflowExecution( @@ -1560,7 +1557,6 @@ func (e *historyEngineImpl) SignalWithStartWorkflowExecution( ) if t, ok := err.(*persistence.WorkflowExecutionAlreadyStartedError); ok { - e.deleteEvents(domainID, 
execution, eventStoreVersion, msBuilder.GetCurrentBranch()) if t.StartRequestID == *request.RequestId { return &workflow.StartWorkflowExecutionResponse{ RunId: common.StringPtr(t.RunID), @@ -1965,42 +1961,6 @@ func createDeleteHistoryEventTimerTask( } } -func (e *historyEngineImpl) deleteEvents( - domainID string, - execution workflow.WorkflowExecution, - eventStoreVersion int32, - branchToken []byte, -) { - - var err error - defer func() { - // This is the only code path that deleting history could cause history corruption(missing first batch). - // We will use this warn log to verify this issue: https://github.com/uber/cadence/issues/2441 - e.logger.Warn("encounter WorkflowExecutionAlreadyStartedError, deleting duplicated history", - tag.ShardID(e.shard.GetShardID()), - tag.WorkflowDomainID(domainID), - tag.WorkflowID(execution.GetWorkflowId()), - tag.WorkflowRunID(execution.GetRunId()), - tag.Number(int64(eventStoreVersion)), - tag.Error(err)) - }() - // We created the history events but failed to create workflow execution, so cleanup the history which could cause - // us to leak history events which are never cleaned up. Cleaning up the events is absolutely safe here as they - // are always created for a unique run_id which is not visible beyond this call yet. 
- // TODO: Handle error on deletion of execution history - if eventStoreVersion == persistence.EventStoreVersionV2 { - err = e.historyV2Mgr.DeleteHistoryBranch(&persistence.DeleteHistoryBranchRequest{ - BranchToken: branchToken, - ShardID: common.IntPtr(e.shard.GetShardID()), - }) - } else { - err = e.historyMgr.DeleteWorkflowExecutionHistory(&persistence.DeleteWorkflowExecutionHistoryRequest{ - DomainID: domainID, - Execution: execution, - }) - } -} - func (e *historyEngineImpl) failDecision( context workflowExecutionContext, scheduleID int64, diff --git a/service/history/historyEngine2_test.go b/service/history/historyEngine2_test.go index ab0a68b976..f5ea7acaf4 100644 --- a/service/history/historyEngine2_test.go +++ b/service/history/historyEngine2_test.go @@ -1193,7 +1193,6 @@ func (s *engine2Suite) TestStartWorkflowExecution_StillRunning_Dedup() { CloseStatus: p.WorkflowCloseStatusNone, LastWriteVersion: lastWriteVersion, }).Once() - s.mockHistoryV2Mgr.On("DeleteHistoryBranch", mock.Anything).Return(nil).Once() s.mockMetadataMgr.On("GetDomain", mock.Anything).Return( &p.GetDomainResponse{ Info: &p.DomainInfo{ID: domainID}, @@ -1244,7 +1243,6 @@ func (s *engine2Suite) TestStartWorkflowExecution_StillRunning_NonDeDup() { CloseStatus: p.WorkflowCloseStatusNone, LastWriteVersion: lastWriteVersion, }).Once() - s.mockHistoryV2Mgr.On("DeleteHistoryBranch", mock.Anything).Return(nil).Once() s.mockMetadataMgr.On("GetDomain", mock.Anything).Return( &p.GetDomainResponse{ Info: &p.DomainInfo{ID: domainID}, @@ -1279,51 +1277,6 @@ func (s *engine2Suite) TestStartWorkflowExecution_StillRunning_NonDeDup() { s.Nil(resp) } -func (s *engine2Suite) TestStartWorkflowExecution_TimeoutError() { - domainID := validDomainID - workflowID := "workflowID" - workflowType := "workflowType" - taskList := "testTaskList" - identity := "testIdentity" - - s.mockHistoryV2Mgr.On("AppendHistoryNodes", mock.Anything).Return(&p.AppendHistoryNodesResponse{Size: 0}, nil).Once() - 
s.mockExecutionMgr.On("CreateWorkflowExecution", mock.Anything).Return(nil, &p.TimeoutError{ - Msg: "random message", - }).Once() - // NOTE: should not delete history for timeout because timeout is unknown instead of failure - //s.mockHistoryV2Mgr.On("DeleteHistoryBranch", mock.Anything).Return(nil).Once() - s.mockMetadataMgr.On("GetDomain", mock.Anything).Return( - &p.GetDomainResponse{ - Info: &p.DomainInfo{ID: domainID}, - Config: &p.DomainConfig{Retention: 1}, - ReplicationConfig: &p.DomainReplicationConfig{ - ActiveClusterName: cluster.TestCurrentClusterName, - Clusters: []*p.ClusterReplicationConfig{ - &p.ClusterReplicationConfig{ClusterName: cluster.TestCurrentClusterName}, - }, - }, - TableVersion: p.DomainTableVersionV1, - }, - nil, - ) - - resp, err := s.historyEngine.StartWorkflowExecution(context.Background(), &h.StartWorkflowExecutionRequest{ - DomainUUID: common.StringPtr(domainID), - StartRequest: &workflow.StartWorkflowExecutionRequest{ - Domain: common.StringPtr(domainID), - WorkflowId: common.StringPtr(workflowID), - WorkflowType: &workflow.WorkflowType{Name: common.StringPtr(workflowType)}, - TaskList: &workflow.TaskList{Name: common.StringPtr(taskList)}, - ExecutionStartToCloseTimeoutSeconds: common.Int32Ptr(1), - TaskStartToCloseTimeoutSeconds: common.Int32Ptr(2), - Identity: common.StringPtr(identity), - RequestId: common.StringPtr("newRequestID"), - }, - }) - s.True(p.IsTimeoutError(err)) - s.Nil(resp) -} - func (s *engine2Suite) TestStartWorkflowExecution_NotRunning_PrevSuccess() { domainID := validDomainID workflowID := "workflowID" @@ -1380,8 +1333,6 @@ func (s *engine2Suite) TestStartWorkflowExecution_NotRunning_PrevSuccess() { request.PreviousLastWriteVersion == lastWriteVersion }), ).Return(&p.CreateWorkflowExecutionResponse{}, nil).Once() - } else { - s.mockHistoryV2Mgr.On("DeleteHistoryBranch", mock.Anything).Return(nil).Once() } resp, err := s.historyEngine.StartWorkflowExecution(context.Background(), 
&h.StartWorkflowExecutionRequest{ @@ -1477,8 +1428,6 @@ func (s *engine2Suite) TestStartWorkflowExecution_NotRunning_PrevFail() { request.PreviousLastWriteVersion == lastWriteVersion }), ).Return(&p.CreateWorkflowExecutionResponse{}, nil).Once() - } else { - s.mockHistoryV2Mgr.On("DeleteHistoryBranch", mock.Anything).Return(nil).Once() } resp, err := s.historyEngine.StartWorkflowExecution(context.Background(), &h.StartWorkflowExecutionRequest{ @@ -1780,7 +1729,6 @@ func (s *engine2Suite) TestSignalWithStartWorkflowExecution_Start_DuplicateReque s.mockExecutionMgr.On("GetWorkflowExecution", mock.Anything).Return(gwmsResponse, nil).Once() s.mockHistoryV2Mgr.On("AppendHistoryNodes", mock.Anything).Return(&p.AppendHistoryNodesResponse{Size: 0}, nil).Once() s.mockExecutionMgr.On("CreateWorkflowExecution", mock.Anything).Return(nil, workflowAlreadyStartedErr).Once() - s.mockHistoryV2Mgr.On("DeleteHistoryBranch", mock.Anything).Return(nil).Once() s.mockMetadataMgr.On("GetDomain", mock.Anything).Return( &p.GetDomainResponse{ Info: &p.DomainInfo{ID: domainID}, @@ -1847,7 +1795,6 @@ func (s *engine2Suite) TestSignalWithStartWorkflowExecution_Start_WorkflowAlread s.mockExecutionMgr.On("GetWorkflowExecution", mock.Anything).Return(gwmsResponse, nil).Once() s.mockHistoryV2Mgr.On("AppendHistoryNodes", mock.Anything).Return(&p.AppendHistoryNodesResponse{Size: 0}, nil).Once() s.mockExecutionMgr.On("CreateWorkflowExecution", mock.Anything).Return(nil, workflowAlreadyStartedErr).Once() - s.mockHistoryV2Mgr.On("DeleteHistoryBranch", mock.Anything).Return(nil).Once() s.mockMetadataMgr.On("GetDomain", mock.Anything).Return( &p.GetDomainResponse{ Info: &p.DomainInfo{ID: domainID},