Skip to content

Commit

Permalink
Persist workflow request ids into Cassandra
Browse files Browse the repository at this point in the history
  • Loading branch information
Shaddoll committed Mar 29, 2024
1 parent 831ebf5 commit 3bdcd47
Show file tree
Hide file tree
Showing 20 changed files with 731 additions and 150 deletions.
11 changes: 11 additions & 0 deletions common/persistence/data_manager_interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -733,6 +733,13 @@ type (
Events []*types.HistoryEvent
}

// WorkflowRequest is used as requestID and it's corresponding failover version container
WorkflowRequest struct {
RequestID string
Version int64
IsReplicationRequest bool
}

// WorkflowMutation is used as generic workflow execution state mutation
WorkflowMutation struct {
ExecutionInfo *WorkflowExecutionInfo
Expand All @@ -759,6 +766,8 @@ type (
ReplicationTasks []Task
TimerTasks []Task

WorkflowRequest *WorkflowRequest

Condition int64
Checksum checksum.Checksum
}
Expand All @@ -781,6 +790,8 @@ type (
ReplicationTasks []Task
TimerTasks []Task

WorkflowRequest *WorkflowRequest

Condition int64
Checksum checksum.Checksum
}
Expand Down
4 changes: 4 additions & 0 deletions common/persistence/data_store_interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -468,6 +468,8 @@ type (
TimerTasks []Task
ReplicationTasks []Task

WorkflowRequest *WorkflowRequest

Condition int64

Checksum checksum.Checksum
Expand All @@ -493,6 +495,8 @@ type (
TimerTasks []Task
ReplicationTasks []Task

WorkflowRequest *WorkflowRequest

Condition int64

Checksum checksum.Checksum
Expand Down
8 changes: 8 additions & 0 deletions common/persistence/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,16 @@ type (
CloseStatus int
LastWriteVersion int64
}

DuplicateRequestError struct {
RunID string
}
)

func (e *DuplicateRequestError) Error() string {
return "Request has already been applied"

Check warning on line 83 in common/persistence/errors.go

View check run for this annotation

Codecov / codecov/patch

common/persistence/errors.go#L82-L83

Added lines #L82 - L83 were not covered by tests
}

func (e *InvalidPersistenceRequestError) Error() string {
return e.Msg
}
Expand Down
4 changes: 4 additions & 0 deletions common/persistence/execution_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -675,6 +675,8 @@ func (m *executionManagerImpl) SerializeWorkflowMutation(
ReplicationTasks: input.ReplicationTasks,
TimerTasks: input.TimerTasks,

WorkflowRequest: input.WorkflowRequest,

Condition: input.Condition,
Checksum: input.Checksum,
ChecksumData: checksumData,
Expand Down Expand Up @@ -739,6 +741,8 @@ func (m *executionManagerImpl) SerializeWorkflowSnapshot(
ReplicationTasks: input.ReplicationTasks,
TimerTasks: input.TimerTasks,

WorkflowRequest: input.WorkflowRequest,

Condition: input.Condition,
Checksum: input.Checksum,
ChecksumData: checksumData,
Expand Down
13 changes: 11 additions & 2 deletions common/persistence/nosql/nosql_execution_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,13 +94,16 @@ func (d *nosqlExecutionStore) CreateWorkflowExecution(
return nil, err
}

workflowRequest := d.prepareCreateWorkflowRequest(&newWorkflow)

shardCondition := &nosqlplugin.ShardCondition{
ShardID: d.shardID,
RangeID: request.RangeID,
}

err = d.db.InsertWorkflowExecutionWithTasks(
ctx,
workflowRequest,
currentWorkflowWriteReq, workflowExecutionWriteReq,
transferTasks, crossClusterTasks, replicationTasks, timerTasks,
shardCondition,
Expand Down Expand Up @@ -133,6 +136,10 @@ func (d *nosqlExecutionStore) CreateWorkflowExecution(
CloseStatus: conditionFailureErr.WorkflowExecutionAlreadyExists.CloseStatus,
LastWriteVersion: conditionFailureErr.WorkflowExecutionAlreadyExists.LastWriteVersion,
}
case conditionFailureErr.DuplicateRequest != nil:
return nil, &persistence.DuplicateRequestError{
RunID: conditionFailureErr.DuplicateRequest.RunID,
}
default:
// If ever runs into this branch, there is bug in the code either in here, or in the implementation of nosql plugin
err := fmt.Errorf("unsupported conditionFailureReason error")
Expand Down Expand Up @@ -284,6 +291,7 @@ func (d *nosqlExecutionStore) UpdateWorkflowExecution(
if err != nil {
return err
}
workflowRequest := d.prepareUpdateWorkflowRequest(&updateWorkflow)

// 2. new
if newWorkflow != nil {
Expand All @@ -308,7 +316,7 @@ func (d *nosqlExecutionStore) UpdateWorkflowExecution(
}

err = d.db.UpdateWorkflowExecutionWithTasks(
ctx, currentWorkflowWriteReq,
ctx, workflowRequest, currentWorkflowWriteReq,
mutateExecution, insertExecution, nil, // no workflow to reset here
nosqlTransferTasks, nosqlCrossClusterTasks, nosqlReplicationTasks, nosqlTimerTasks,
shardCondition)
Expand Down Expand Up @@ -424,6 +432,7 @@ func (d *nosqlExecutionStore) ConflictResolveWorkflowExecution(
if err != nil {
return err
}
workflowRequest := d.prepareCreateWorkflowRequest(&resetWorkflow)

Check warning on line 435 in common/persistence/nosql/nosql_execution_store.go

View check run for this annotation

Codecov / codecov/patch

common/persistence/nosql/nosql_execution_store.go#L435

Added line #L435 was not covered by tests

// 3. new
if newWorkflow != nil {
Expand All @@ -448,7 +457,7 @@ func (d *nosqlExecutionStore) ConflictResolveWorkflowExecution(
}

err = d.db.UpdateWorkflowExecutionWithTasks(
ctx, currentWorkflowWriteReq,
ctx, workflowRequest, currentWorkflowWriteReq,

Check warning on line 460 in common/persistence/nosql/nosql_execution_store.go

View check run for this annotation

Codecov / codecov/patch

common/persistence/nosql/nosql_execution_store.go#L460

Added line #L460 was not covered by tests
mutateExecution, insertExecution, resetExecution,
nosqlTransferTasks, nosqlCrossClusterTasks, nosqlReplicationTasks, nosqlTimerTasks,
shardCondition)
Expand Down
49 changes: 41 additions & 8 deletions common/persistence/nosql/nosql_execution_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func TestNosqlExecutionStore(t *testing.T) {
setupMock: func(ctrl *gomock.Controller) *nosqlExecutionStore {
mockDB := nosqlplugin.NewMockDB(ctrl)
mockDB.EXPECT().
InsertWorkflowExecutionWithTasks(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
InsertWorkflowExecutionWithTasks(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
Return(nil)
return newTestNosqlExecutionStore(mockDB, log.NewNoop())
},
Expand All @@ -65,7 +65,7 @@ func TestNosqlExecutionStore(t *testing.T) {
setupMock: func(ctrl *gomock.Controller) *nosqlExecutionStore {
mockDB := nosqlplugin.NewMockDB(ctrl)
mockDB.EXPECT().
InsertWorkflowExecutionWithTasks(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
InsertWorkflowExecutionWithTasks(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
Return(&persistence.WorkflowExecutionAlreadyStartedError{}).Times(1)
mockDB.EXPECT().IsNotFoundError(gomock.Any()).Return(false).AnyTimes()
mockDB.EXPECT().IsTimeoutError(gomock.Any()).Return(false).AnyTimes()
Expand All @@ -84,7 +84,7 @@ func TestNosqlExecutionStore(t *testing.T) {
setupMock: func(ctrl *gomock.Controller) *nosqlExecutionStore {
mockDB := nosqlplugin.NewMockDB(ctrl)
mockDB.EXPECT().
InsertWorkflowExecutionWithTasks(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
InsertWorkflowExecutionWithTasks(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
Return(&persistence.ShardOwnershipLostError{ShardID: shardID, Msg: "shard ownership lost"}).Times(1)
mockDB.EXPECT().IsNotFoundError(gomock.Any()).Return(false).AnyTimes()
mockDB.EXPECT().IsTimeoutError(gomock.Any()).Return(false).AnyTimes()
Expand All @@ -103,7 +103,7 @@ func TestNosqlExecutionStore(t *testing.T) {
setupMock: func(ctrl *gomock.Controller) *nosqlExecutionStore {
mockDB := nosqlplugin.NewMockDB(ctrl)
mockDB.EXPECT().
InsertWorkflowExecutionWithTasks(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
InsertWorkflowExecutionWithTasks(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
Return(&persistence.CurrentWorkflowConditionFailedError{Msg: "current workflow condition failed"}).Times(1)
mockDB.EXPECT().IsNotFoundError(gomock.Any()).Return(false).AnyTimes()
mockDB.EXPECT().IsTimeoutError(gomock.Any()).Return(false).AnyTimes()
Expand All @@ -122,7 +122,7 @@ func TestNosqlExecutionStore(t *testing.T) {
setupMock: func(ctrl *gomock.Controller) *nosqlExecutionStore {
mockDB := nosqlplugin.NewMockDB(ctrl)
mockDB.EXPECT().
InsertWorkflowExecutionWithTasks(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
InsertWorkflowExecutionWithTasks(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
Return(&types.InternalServiceError{Message: "generic internal service error"}).Times(1)
mockDB.EXPECT().IsNotFoundError(gomock.Any()).Return(false).AnyTimes()
mockDB.EXPECT().IsTimeoutError(gomock.Any()).Return(false).AnyTimes()
Expand All @@ -136,6 +136,29 @@ func TestNosqlExecutionStore(t *testing.T) {
},
expectedError: &types.InternalServiceError{},
},
{
name: "CreateWorkflowExecution failure - duplicate request error",
setupMock: func(ctrl *gomock.Controller) *nosqlExecutionStore {
mockDB := nosqlplugin.NewMockDB(ctrl)
mockDB.EXPECT().
InsertWorkflowExecutionWithTasks(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
Return(&nosqlplugin.WorkflowOperationConditionFailure{
DuplicateRequest: &nosqlplugin.DuplicateRequest{
RunID: "abc",
},
}).Times(1)
mockDB.EXPECT().IsNotFoundError(gomock.Any()).Return(false).AnyTimes()
mockDB.EXPECT().IsTimeoutError(gomock.Any()).Return(false).AnyTimes()
mockDB.EXPECT().IsThrottlingError(gomock.Any()).Return(false).AnyTimes()
mockDB.EXPECT().IsDBUnavailableError(gomock.Any()).Return(false).AnyTimes()
return newTestNosqlExecutionStore(mockDB, log.NewNoop())
},
testFunc: func(store *nosqlExecutionStore) error {
_, err := store.CreateWorkflowExecution(ctx, newCreateWorkflowExecutionRequest())
return err
},
expectedError: &persistence.DuplicateRequestError{},
},
{
name: "GetWorkflowExecution success",
setupMock: func(ctrl *gomock.Controller) *nosqlExecutionStore {
Expand Down Expand Up @@ -172,7 +195,7 @@ func TestNosqlExecutionStore(t *testing.T) {
setupMock: func(ctrl *gomock.Controller) *nosqlExecutionStore {
mockDB := nosqlplugin.NewMockDB(ctrl)
mockDB.EXPECT().
UpdateWorkflowExecutionWithTasks(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Nil(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
UpdateWorkflowExecutionWithTasks(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Nil(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
Return(nil).Times(1)
return newTestNosqlExecutionStore(mockDB, log.NewNoop())
},
Expand Down Expand Up @@ -204,7 +227,7 @@ func TestNosqlExecutionStore(t *testing.T) {
UnknownConditionFailureDetails: common.StringPtr("condition not met"),
}
mockDB.EXPECT().
UpdateWorkflowExecutionWithTasks(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Nil(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
UpdateWorkflowExecutionWithTasks(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Nil(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
Return(conditionFailure).Times(1)
return newTestNosqlExecutionStore(mockDB, log.NewNoop())
},
Expand All @@ -218,7 +241,7 @@ func TestNosqlExecutionStore(t *testing.T) {
setupMock: func(ctrl *gomock.Controller) *nosqlExecutionStore {
mockDB := nosqlplugin.NewMockDB(ctrl)
mockDB.EXPECT().
UpdateWorkflowExecutionWithTasks(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Nil(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
UpdateWorkflowExecutionWithTasks(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Nil(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
Return(errors.New("database is unavailable")).Times(1)
mockDB.EXPECT().IsNotFoundError(gomock.Any()).Return(true).AnyTimes()
return newTestNosqlExecutionStore(mockDB, log.NewNoop())
Expand Down Expand Up @@ -615,6 +638,11 @@ func newUpdateWorkflowExecutionRequest() *persistence.InternalUpdateWorkflowExec
State: persistence.WorkflowStateCreated,
CloseStatus: persistence.WorkflowCloseStatusNone,
},
WorkflowRequest: &persistence.WorkflowRequest{
RequestID: constants.TestRequestID,
Version: 1,
IsReplicationRequest: false,
},
},
}
}
Expand All @@ -627,6 +655,11 @@ func getNewWorkflowSnapshot() persistence.InternalWorkflowSnapshot {
WorkflowID: constants.TestWorkflowID,
RunID: constants.TestRunID,
},
WorkflowRequest: &persistence.WorkflowRequest{
RequestID: constants.TestRequestID,
Version: 1,
IsReplicationRequest: false,
},
}
}
func newTestNosqlExecutionStore(db nosqlplugin.DB, logger log.Logger) *nosqlExecutionStore {
Expand Down
44 changes: 44 additions & 0 deletions common/persistence/nosql/nosql_execution_store_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,50 @@ func (d *nosqlExecutionStore) prepareCreateWorkflowExecutionRequestWithMaps(newW
return executionRequest, nil
}

func (d *nosqlExecutionStore) prepareCreateWorkflowRequest(newWorkflow *persistence.InternalWorkflowSnapshot) *nosqlplugin.WorkflowRequest {
executionInfo := newWorkflow.ExecutionInfo
if newWorkflow.WorkflowRequest != nil {
writeMode := nosqlplugin.WorkflowRequestWriteModeInsert
if newWorkflow.WorkflowRequest.IsReplicationRequest {
writeMode = nosqlplugin.WorkflowRequestWriteModeUpsert

Check warning on line 82 in common/persistence/nosql/nosql_execution_store_util.go

View check run for this annotation

Codecov / codecov/patch

common/persistence/nosql/nosql_execution_store_util.go#L82

Added line #L82 was not covered by tests
}
return &nosqlplugin.WorkflowRequest{
Row: &nosqlplugin.WorkflowRequestRow{
ShardID: d.shardID,
DomainID: executionInfo.DomainID,
WorkflowID: executionInfo.WorkflowID,
RequestID: newWorkflow.WorkflowRequest.RequestID,
Version: newWorkflow.WorkflowRequest.Version,
RunID: executionInfo.RunID,
},
WriteMode: writeMode,
}
}
return nil

Check warning on line 96 in common/persistence/nosql/nosql_execution_store_util.go

View check run for this annotation

Codecov / codecov/patch

common/persistence/nosql/nosql_execution_store_util.go#L96

Added line #L96 was not covered by tests
}

func (d *nosqlExecutionStore) prepareUpdateWorkflowRequest(updateWorkflow *persistence.InternalWorkflowMutation) *nosqlplugin.WorkflowRequest {
executionInfo := updateWorkflow.ExecutionInfo
if updateWorkflow.WorkflowRequest != nil {
writeMode := nosqlplugin.WorkflowRequestWriteModeInsert
if updateWorkflow.WorkflowRequest.IsReplicationRequest {
writeMode = nosqlplugin.WorkflowRequestWriteModeUpsert

Check warning on line 104 in common/persistence/nosql/nosql_execution_store_util.go

View check run for this annotation

Codecov / codecov/patch

common/persistence/nosql/nosql_execution_store_util.go#L104

Added line #L104 was not covered by tests
}
return &nosqlplugin.WorkflowRequest{
Row: &nosqlplugin.WorkflowRequestRow{
ShardID: d.shardID,
DomainID: executionInfo.DomainID,
WorkflowID: executionInfo.WorkflowID,
RequestID: updateWorkflow.WorkflowRequest.RequestID,
Version: updateWorkflow.WorkflowRequest.Version,
RunID: executionInfo.RunID,
},
WriteMode: writeMode,
}
}
return nil

Check warning on line 118 in common/persistence/nosql/nosql_execution_store_util.go

View check run for this annotation

Codecov / codecov/patch

common/persistence/nosql/nosql_execution_store_util.go#L118

Added line #L118 was not covered by tests
}

func (d *nosqlExecutionStore) prepareResetWorkflowExecutionRequestWithMapsAndEventBuffer(resetWorkflow *persistence.InternalWorkflowSnapshot) (*nosqlplugin.WorkflowExecutionRequest, error) {
executionInfo := resetWorkflow.ExecutionInfo
lastWriteVersion := resetWorkflow.LastWriteVersion
Expand Down
3 changes: 3 additions & 0 deletions common/persistence/nosql/nosqlplugin/cassandra/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ const (
rowTypeReplicationTask
rowTypeDLQ
rowTypeCrossClusterTask
rowTypeWorkflowRequest
)

// Guidelines for creating new special UUID constants
Expand Down Expand Up @@ -103,4 +104,6 @@ const (
rowTypeExecutionTaskID = int64(-10)
rowTypeShardTaskID = int64(-11)
emptyInitiatedID = int64(-7)
emptyRequestVersion = int64(-1000)
workflowRequestTTL = 10800
)
18 changes: 14 additions & 4 deletions common/persistence/nosql/nosqlplugin/cassandra/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ var _ nosqlplugin.WorkflowCRUD = (*cdb)(nil)

func (db *cdb) InsertWorkflowExecutionWithTasks(
ctx context.Context,
workflowRequest *nosqlplugin.WorkflowRequest,
currentWorkflowRequest *nosqlplugin.CurrentWorkflowWriteRequest,
execution *nosqlplugin.WorkflowExecutionRequest,
transferTasks []*nosqlplugin.TransferTask,
Expand All @@ -52,7 +53,11 @@ func (db *cdb) InsertWorkflowExecutionWithTasks(

batch := db.session.NewBatch(gocql.LoggedBatch).WithContext(ctx)

err := createOrUpdateCurrentWorkflow(batch, shardID, domainID, workflowID, currentWorkflowRequest)
err := insertOrUpsertWorkflowRequestRow(batch, workflowRequest)
if err != nil {
return err
}
err = createOrUpdateCurrentWorkflow(batch, shardID, domainID, workflowID, currentWorkflowRequest)
if err != nil {
return err
}
Expand All @@ -68,7 +73,7 @@ func (db *cdb) InsertWorkflowExecutionWithTasks(
createTimerTasks(batch, shardID, domainID, workflowID, timerTasks)
assertShardRangeID(batch, shardID, shardCondition.RangeID)

return executeCreateWorkflowBatchTransaction(db.session, batch, currentWorkflowRequest, execution, shardCondition)
return executeCreateWorkflowBatchTransaction(ctx, db.session, batch, currentWorkflowRequest, execution, shardCondition, workflowRequest)
}

func (db *cdb) SelectCurrentWorkflow(
Expand Down Expand Up @@ -110,6 +115,7 @@ func (db *cdb) SelectCurrentWorkflow(

func (db *cdb) UpdateWorkflowExecutionWithTasks(
ctx context.Context,
workflowRequest *nosqlplugin.WorkflowRequest,
currentWorkflowRequest *nosqlplugin.CurrentWorkflowWriteRequest,
mutatedExecution *nosqlplugin.WorkflowExecutionRequest,
insertedExecution *nosqlplugin.WorkflowExecutionRequest,
Expand Down Expand Up @@ -137,7 +143,11 @@ func (db *cdb) UpdateWorkflowExecutionWithTasks(

batch := db.session.NewBatch(gocql.LoggedBatch).WithContext(ctx)

err := createOrUpdateCurrentWorkflow(batch, shardID, domainID, workflowID, currentWorkflowRequest)
err := insertOrUpsertWorkflowRequestRow(batch, workflowRequest)
if err != nil {
return err
}
err = createOrUpdateCurrentWorkflow(batch, shardID, domainID, workflowID, currentWorkflowRequest)
if err != nil {
return err
}
Expand Down Expand Up @@ -169,7 +179,7 @@ func (db *cdb) UpdateWorkflowExecutionWithTasks(
createTimerTasks(batch, shardID, domainID, workflowID, timerTasks)
assertShardRangeID(batch, shardID, shardCondition.RangeID)

return executeUpdateWorkflowBatchTransaction(db.session, batch, currentWorkflowRequest, previousNextEventIDCondition, shardCondition)
return executeUpdateWorkflowBatchTransaction(ctx, db.session, batch, currentWorkflowRequest, previousNextEventIDCondition, shardCondition, workflowRequest)
}

func (db *cdb) SelectWorkflowExecution(ctx context.Context, shardID int, domainID, workflowID, runID string) (*nosqlplugin.WorkflowExecution, error) {
Expand Down

0 comments on commit 3bdcd47

Please sign in to comment.