Skip to content

Commit

Permalink
Replicate workflow state (#2852)
Browse files Browse the repository at this point in the history
* Replicate workflow state
  • Loading branch information
yux0 committed May 20, 2022
1 parent 1eed3a7 commit 150516a
Show file tree
Hide file tree
Showing 25 changed files with 1,306 additions and 448 deletions.
1,025 changes: 655 additions & 370 deletions api/historyservice/v1/request_response.pb.go

Large diffs are not rendered by default.

6 changes: 3 additions & 3 deletions common/persistence/cassandra/mutable_state_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -388,10 +388,10 @@ func (d *MutableStateStore) CreateWorkflowExecution(
var requestCurrentRunID string

switch request.Mode {
case p.CreateWorkflowModeZombie:
case p.CreateWorkflowModeBypassCurrent:
// noop

case p.CreateWorkflowModeWorkflowIDReuse:
case p.CreateWorkflowModeUpdateCurrent:
batch.Query(templateUpdateCurrentWorkflowExecutionForNewQuery,
runID,
newWorkflow.ExecutionStateBlob.Data,
Expand Down Expand Up @@ -1001,7 +1001,7 @@ func (d *MutableStateStore) SetWorkflowExecution(
conflictRecord := make(map[string]interface{})
applied, conflictIter, err := d.Session.MapExecuteBatchCAS(batch, conflictRecord)
if err != nil {
return gocql.ConvertError("ConflictResolveWorkflowExecution", err)
return gocql.ConvertError("SetWorkflowExecution", err)
}
defer func() {
_ = conflictIter.Close()
Expand Down
8 changes: 4 additions & 4 deletions common/persistence/dataInterfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,12 +64,12 @@ const (
// CreateWorkflowModeBrandNew fail if current record exists
// Only applicable for CreateWorkflowExecution
CreateWorkflowModeBrandNew CreateWorkflowMode = iota
// CreateWorkflowModeWorkflowIDReuse update current record only if workflow is closed
// CreateWorkflowModeUpdateCurrent update current record only if workflow is closed
// Only applicable for CreateWorkflowExecution
CreateWorkflowModeWorkflowIDReuse
// CreateWorkflowModeZombie do not update current record since workflow is in zombie state
CreateWorkflowModeUpdateCurrent
// CreateWorkflowModeBypassCurrent do not update current record since workflow is in zombie state
// applicable for CreateWorkflowExecution, UpdateWorkflowExecution
CreateWorkflowModeZombie
CreateWorkflowModeBypassCurrent
)

// UpdateWorkflowMode update mode
Expand Down
7 changes: 3 additions & 4 deletions common/persistence/operationModeValidator.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func ValidateCreateWorkflowModeState(

switch mode {
case CreateWorkflowModeBrandNew,
CreateWorkflowModeWorkflowIDReuse:
CreateWorkflowModeUpdateCurrent:
if workflowState == enumsspb.WORKFLOW_EXECUTION_STATE_ZOMBIE {
return newInvalidCreateWorkflowMode(
mode,
Expand All @@ -59,10 +59,9 @@ func ValidateCreateWorkflowModeState(
}
return nil

case CreateWorkflowModeZombie:
case CreateWorkflowModeBypassCurrent:
if workflowState == enumsspb.WORKFLOW_EXECUTION_STATE_CREATED ||
workflowState == enumsspb.WORKFLOW_EXECUTION_STATE_RUNNING ||
workflowState == enumsspb.WORKFLOW_EXECUTION_STATE_COMPLETED {
workflowState == enumsspb.WORKFLOW_EXECUTION_STATE_RUNNING {
return newInvalidCreateWorkflowMode(
mode,
workflowState,
Expand Down
6 changes: 3 additions & 3 deletions common/persistence/operationModeValidator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func (s *validateOperationWorkflowModeStateSuite) TestCreateMode_UpdateCurrent()

creatModes := []CreateWorkflowMode{
CreateWorkflowModeBrandNew,
CreateWorkflowModeWorkflowIDReuse,
CreateWorkflowModeUpdateCurrent,
}

for state, expectError := range stateToError {
Expand All @@ -92,13 +92,13 @@ func (s *validateOperationWorkflowModeStateSuite) TestCreateMode_BypassCurrent()
stateToError := map[enumsspb.WorkflowExecutionState]bool{
enumsspb.WORKFLOW_EXECUTION_STATE_CREATED: true,
enumsspb.WORKFLOW_EXECUTION_STATE_RUNNING: true,
enumsspb.WORKFLOW_EXECUTION_STATE_COMPLETED: true,
enumsspb.WORKFLOW_EXECUTION_STATE_COMPLETED: false,
enumsspb.WORKFLOW_EXECUTION_STATE_ZOMBIE: false,
}

for state, expectError := range stateToError {
testSnapshot := s.newTestWorkflowSnapshot(state)
err := ValidateCreateWorkflowModeState(CreateWorkflowModeZombie, testSnapshot)
err := ValidateCreateWorkflowModeState(CreateWorkflowModeBypassCurrent, testSnapshot)
if !expectError {
s.NoError(err, err)
} else {
Expand Down
4 changes: 2 additions & 2 deletions common/persistence/sql/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ func (m *sqlExecutionStore) createWorkflowExecutionTx(
// current run ID is already request ID
}

case p.CreateWorkflowModeWorkflowIDReuse:
case p.CreateWorkflowModeUpdateCurrent:
if currentRow == nil {
return nil, extractCurrentWorkflowConflictError(currentRow, "")
}
Expand Down Expand Up @@ -184,7 +184,7 @@ func (m *sqlExecutionStore) createWorkflowExecutionTx(
)
}

case p.CreateWorkflowModeZombie:
case p.CreateWorkflowModeBypassCurrent:
if err := assertRunIDMismatch(
primitives.MustParseUUID(newWorkflow.ExecutionState.RunId),
currentRow,
Expand Down
4 changes: 2 additions & 2 deletions common/persistence/sql/execution_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -603,7 +603,7 @@ func createOrUpdateCurrentExecution(
}

switch createMode {
case p.CreateWorkflowModeWorkflowIDReuse:
case p.CreateWorkflowModeUpdateCurrent:
if err := updateCurrentExecution(ctx,
tx,
shardID,
Expand All @@ -621,7 +621,7 @@ func createOrUpdateCurrentExecution(
if _, err := tx.InsertIntoCurrentExecutions(ctx, &row); err != nil {
return serviceerror.NewUnavailable(fmt.Sprintf("createOrUpdateCurrentExecution failed. Failed to insert into current_executions table. Error: %v", err))
}
case p.CreateWorkflowModeZombie:
case p.CreateWorkflowModeBypassCurrent:
// noop
default:
return fmt.Errorf("createOrUpdateCurrentExecution failed. Unknown workflow creation mode: %v", createMode)
Expand Down
117 changes: 99 additions & 18 deletions common/persistence/tests/execution_mutable_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ func (s *ExecutionMutableStateSuite) TestCreate_Reuse() {
_, err := s.ExecutionManager.CreateWorkflowExecution(s.Ctx, &p.CreateWorkflowExecutionRequest{
ShardID: s.ShardID,
RangeID: s.RangeID,
Mode: p.CreateWorkflowModeWorkflowIDReuse,
Mode: p.CreateWorkflowModeUpdateCurrent,

PreviousRunID: prevSnapshot.ExecutionState.RunId,
PreviousLastWriteVersion: prevLastWriteVersion,
Expand All @@ -222,7 +222,7 @@ func (s *ExecutionMutableStateSuite) TestCreate_Reuse_CurrentConflict() {
_, err := s.ExecutionManager.CreateWorkflowExecution(s.Ctx, &p.CreateWorkflowExecutionRequest{
ShardID: s.ShardID,
RangeID: s.RangeID,
Mode: p.CreateWorkflowModeWorkflowIDReuse,
Mode: p.CreateWorkflowModeUpdateCurrent,

PreviousRunID: uuid.New().String(),
PreviousLastWriteVersion: rand.Int63(),
Expand Down Expand Up @@ -267,7 +267,7 @@ func (s *ExecutionMutableStateSuite) TestCreate_Zombie() {
_, err := s.ExecutionManager.CreateWorkflowExecution(s.Ctx, &p.CreateWorkflowExecutionRequest{
ShardID: s.ShardID,
RangeID: s.RangeID,
Mode: p.CreateWorkflowModeZombie,
Mode: p.CreateWorkflowModeBypassCurrent,

PreviousRunID: "",
PreviousLastWriteVersion: 0,
Expand All @@ -292,7 +292,7 @@ func (s *ExecutionMutableStateSuite) TestCreate_Conflict() {
_, err := s.ExecutionManager.CreateWorkflowExecution(s.Ctx, &p.CreateWorkflowExecutionRequest{
ShardID: s.ShardID,
RangeID: s.RangeID,
Mode: p.CreateWorkflowModeWorkflowIDReuse,
Mode: p.CreateWorkflowModeUpdateCurrent,

PreviousRunID: newSnapshot.ExecutionState.RunId,
PreviousLastWriteVersion: lastWriteVersion,
Expand All @@ -303,6 +303,87 @@ func (s *ExecutionMutableStateSuite) TestCreate_Conflict() {
s.IsType(&p.WorkflowConditionFailedError{}, err)
}

func (s *ExecutionMutableStateSuite) TestCreate_ClosedWorkflow_BrandNew() {
newSnapshot := s.CreateWorkflow(
rand.Int63(),
enumsspb.WORKFLOW_EXECUTION_STATE_COMPLETED,
enumspb.WORKFLOW_EXECUTION_STATUS_TERMINATED,
rand.Int63(),
)

s.AssertEqualWithDB(newSnapshot)
}

func (s *ExecutionMutableStateSuite) TestCreate_ClosedWorkflow_Bypass() {
prevLastWriteVersion := rand.Int63()
_ = s.CreateWorkflow(
prevLastWriteVersion,
enumsspb.WORKFLOW_EXECUTION_STATE_CREATED,
enumspb.WORKFLOW_EXECUTION_STATUS_RUNNING,
rand.Int63(),
)

newSnapshot := RandomSnapshot(
s.NamespaceID,
s.WorkflowID,
uuid.New().String(),
rand.Int63(),
enumsspb.WORKFLOW_EXECUTION_STATE_COMPLETED,
enumspb.WORKFLOW_EXECUTION_STATUS_TERMINATED,
rand.Int63(),
)

_, err := s.ExecutionManager.CreateWorkflowExecution(s.Ctx, &p.CreateWorkflowExecutionRequest{
ShardID: s.ShardID,
RangeID: s.RangeID,
Mode: p.CreateWorkflowModeBypassCurrent,

PreviousRunID: "",
PreviousLastWriteVersion: 0,

NewWorkflowSnapshot: *newSnapshot,
NewWorkflowEvents: nil,
})
s.NoError(err)

s.AssertEqualWithDB(newSnapshot)
}

func (s *ExecutionMutableStateSuite) TestCreate_ClosedWorkflow_UpdateCurrent() {
prevLastWriteVersion := rand.Int63()
prevSnapshot := s.CreateWorkflow(
prevLastWriteVersion,
enumsspb.WORKFLOW_EXECUTION_STATE_COMPLETED,
enumspb.WORKFLOW_EXECUTION_STATUS_COMPLETED,
rand.Int63(),
)

newSnapshot := RandomSnapshot(
s.NamespaceID,
s.WorkflowID,
uuid.New().String(),
rand.Int63(),
enumsspb.WORKFLOW_EXECUTION_STATE_COMPLETED,
enumspb.WORKFLOW_EXECUTION_STATUS_COMPLETED,
rand.Int63(),
)

_, err := s.ExecutionManager.CreateWorkflowExecution(s.Ctx, &p.CreateWorkflowExecutionRequest{
ShardID: s.ShardID,
RangeID: s.RangeID,
Mode: p.CreateWorkflowModeUpdateCurrent,

PreviousRunID: prevSnapshot.ExecutionState.RunId,
PreviousLastWriteVersion: prevLastWriteVersion,

NewWorkflowSnapshot: *newSnapshot,
NewWorkflowEvents: nil,
})
s.NoError(err)

s.AssertEqualWithDB(newSnapshot)
}

func (s *ExecutionMutableStateSuite) TestUpdate_NotZombie() {
currentSnapshot := s.CreateWorkflow(
rand.Int63(),
Expand Down Expand Up @@ -469,7 +550,7 @@ func (s *ExecutionMutableStateSuite) TestUpdate_Zombie() {
_, err := s.ExecutionManager.CreateWorkflowExecution(s.Ctx, &p.CreateWorkflowExecutionRequest{
ShardID: s.ShardID,
RangeID: s.RangeID,
Mode: p.CreateWorkflowModeZombie,
Mode: p.CreateWorkflowModeBypassCurrent,

PreviousRunID: "",
PreviousLastWriteVersion: 0,
Expand Down Expand Up @@ -557,7 +638,7 @@ func (s *ExecutionMutableStateSuite) TestUpdate_Zombie_Conflict() {
_, err := s.ExecutionManager.CreateWorkflowExecution(s.Ctx, &p.CreateWorkflowExecutionRequest{
ShardID: s.ShardID,
RangeID: s.RangeID,
Mode: p.CreateWorkflowModeZombie,
Mode: p.CreateWorkflowModeBypassCurrent,

PreviousRunID: "",
PreviousLastWriteVersion: 0,
Expand Down Expand Up @@ -612,7 +693,7 @@ func (s *ExecutionMutableStateSuite) TestUpdate_Zombie_WithNew() {
_, err := s.ExecutionManager.CreateWorkflowExecution(s.Ctx, &p.CreateWorkflowExecutionRequest{
ShardID: s.ShardID,
RangeID: s.RangeID,
Mode: p.CreateWorkflowModeZombie,
Mode: p.CreateWorkflowModeBypassCurrent,

PreviousRunID: "",
PreviousLastWriteVersion: 0,
Expand Down Expand Up @@ -678,7 +759,7 @@ func (s *ExecutionMutableStateSuite) TestConflictResolve_SuppressCurrent() {
_, err := s.ExecutionManager.CreateWorkflowExecution(s.Ctx, &p.CreateWorkflowExecutionRequest{
ShardID: s.ShardID,
RangeID: s.RangeID,
Mode: p.CreateWorkflowModeZombie,
Mode: p.CreateWorkflowModeBypassCurrent,

PreviousRunID: "",
PreviousLastWriteVersion: 0,
Expand Down Expand Up @@ -747,7 +828,7 @@ func (s *ExecutionMutableStateSuite) TestConflictResolve_SuppressCurrent_Current
_, err := s.ExecutionManager.CreateWorkflowExecution(s.Ctx, &p.CreateWorkflowExecutionRequest{
ShardID: s.ShardID,
RangeID: s.RangeID,
Mode: p.CreateWorkflowModeZombie,
Mode: p.CreateWorkflowModeBypassCurrent,

PreviousRunID: "",
PreviousLastWriteVersion: 0,
Expand Down Expand Up @@ -816,7 +897,7 @@ func (s *ExecutionMutableStateSuite) TestConflictResolve_SuppressCurrent_Conflic
_, err := s.ExecutionManager.CreateWorkflowExecution(s.Ctx, &p.CreateWorkflowExecutionRequest{
ShardID: s.ShardID,
RangeID: s.RangeID,
Mode: p.CreateWorkflowModeZombie,
Mode: p.CreateWorkflowModeBypassCurrent,

PreviousRunID: "",
PreviousLastWriteVersion: 0,
Expand Down Expand Up @@ -885,7 +966,7 @@ func (s *ExecutionMutableStateSuite) TestConflictResolve_SuppressCurrent_Conflic
_, err := s.ExecutionManager.CreateWorkflowExecution(s.Ctx, &p.CreateWorkflowExecutionRequest{
ShardID: s.ShardID,
RangeID: s.RangeID,
Mode: p.CreateWorkflowModeZombie,
Mode: p.CreateWorkflowModeBypassCurrent,

PreviousRunID: "",
PreviousLastWriteVersion: 0,
Expand Down Expand Up @@ -954,7 +1035,7 @@ func (s *ExecutionMutableStateSuite) TestConflictResolve_SuppressCurrent_WithNew
_, err := s.ExecutionManager.CreateWorkflowExecution(s.Ctx, &p.CreateWorkflowExecutionRequest{
ShardID: s.ShardID,
RangeID: s.RangeID,
Mode: p.CreateWorkflowModeZombie,
Mode: p.CreateWorkflowModeBypassCurrent,

PreviousRunID: "",
PreviousLastWriteVersion: 0,
Expand Down Expand Up @@ -1068,7 +1149,7 @@ func (s *ExecutionMutableStateSuite) TestConflictResolve_ResetCurrent_CurrentCon
_, err := s.ExecutionManager.CreateWorkflowExecution(s.Ctx, &p.CreateWorkflowExecutionRequest{
ShardID: s.ShardID,
RangeID: s.RangeID,
Mode: p.CreateWorkflowModeZombie,
Mode: p.CreateWorkflowModeBypassCurrent,

PreviousRunID: "",
PreviousLastWriteVersion: 0,
Expand Down Expand Up @@ -1208,7 +1289,7 @@ func (s *ExecutionMutableStateSuite) TestConflictResolve_Zombie() {
_, err := s.ExecutionManager.CreateWorkflowExecution(s.Ctx, &p.CreateWorkflowExecutionRequest{
ShardID: s.ShardID,
RangeID: s.RangeID,
Mode: p.CreateWorkflowModeZombie,
Mode: p.CreateWorkflowModeBypassCurrent,

PreviousRunID: "",
PreviousLastWriteVersion: 0,
Expand Down Expand Up @@ -1302,7 +1383,7 @@ func (s *ExecutionMutableStateSuite) TestConflictResolve_Zombie_Conflict() {
_, err := s.ExecutionManager.CreateWorkflowExecution(s.Ctx, &p.CreateWorkflowExecutionRequest{
ShardID: s.ShardID,
RangeID: s.RangeID,
Mode: p.CreateWorkflowModeZombie,
Mode: p.CreateWorkflowModeBypassCurrent,

PreviousRunID: "",
PreviousLastWriteVersion: 0,
Expand Down Expand Up @@ -1360,7 +1441,7 @@ func (s *ExecutionMutableStateSuite) TestConflictResolve_Zombie_WithNew() {
_, err := s.ExecutionManager.CreateWorkflowExecution(s.Ctx, &p.CreateWorkflowExecutionRequest{
ShardID: s.ShardID,
RangeID: s.RangeID,
Mode: p.CreateWorkflowModeZombie,
Mode: p.CreateWorkflowModeBypassCurrent,

PreviousRunID: "",
PreviousLastWriteVersion: 0,
Expand Down Expand Up @@ -1525,7 +1606,7 @@ func (s *ExecutionMutableStateSuite) TestDeleteCurrent_NotCurrent() {
_, err := s.ExecutionManager.CreateWorkflowExecution(s.Ctx, &p.CreateWorkflowExecutionRequest{
ShardID: s.ShardID,
RangeID: s.RangeID,
Mode: p.CreateWorkflowModeZombie,
Mode: p.CreateWorkflowModeBypassCurrent,

PreviousRunID: "",
PreviousLastWriteVersion: 0,
Expand Down Expand Up @@ -1567,7 +1648,7 @@ func (s *ExecutionMutableStateSuite) TestDelete_Exists() {
_, err := s.ExecutionManager.CreateWorkflowExecution(s.Ctx, &p.CreateWorkflowExecutionRequest{
ShardID: s.ShardID,
RangeID: s.RangeID,
Mode: p.CreateWorkflowModeZombie,
Mode: p.CreateWorkflowModeBypassCurrent,

PreviousRunID: "",
PreviousLastWriteVersion: 0,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,11 @@ message ReplicateEventsV2Request {
temporal.api.common.v1.DataBlob new_run_events = 5;
}

message ReplicateWorkflowStateRequest {
temporal.server.api.persistence.v1.WorkflowMutableState workflow_state = 1;
string remote_cluster = 2;
}

message ReplicateEventsV2Response {
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ func startAndSignalWithoutCurrentWorkflow(
prevRunID := ""
prevLastWriteVersion := int64(0)
if casPredicate != nil {
createMode = persistence.CreateWorkflowModeWorkflowIDReuse
createMode = persistence.CreateWorkflowModeUpdateCurrent
prevRunID = casPredicate.RunID
prevLastWriteVersion = casPredicate.LastWriteVersion
if err := api.NewWorkflowVersionCheck(shard, casPredicate.LastWriteVersion, newWorkflowContext.GetMutableState()); err != nil {
Expand Down

0 comments on commit 150516a

Please sign in to comment.