Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Persist workflow request ids into Cassandra #5826

Merged
merged 2 commits into from
Apr 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
44 changes: 43 additions & 1 deletion common/persistence/data_manager_interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,32 @@ const (
TaskTypeWorkflowBackoffTimer
)

// WorkflowRequestType is the type of workflow request
type WorkflowRequestType int

// Types of workflow requests
const (
WorkflowRequestTypeStart WorkflowRequestType = iota
WorkflowRequestTypeSignal
WorkflowRequestTypeCancel
WorkflowRequestTypeReset
)

// CreateWorkflowRequestMode is the mode of create workflow request
type CreateWorkflowRequestMode int

// Modes of create workflow request
const (
// Fail if data with the same domain_id, workflow_id, request_id exists
// It is used for transactions started by external API requests
// to allow us detecting duplicate requests
CreateWorkflowRequestModeNew CreateWorkflowRequestMode = iota
// Upsert the data without checking duplication
// It is used for transactions started by replication stack to achieve
// eventual consistency
CreateWorkflowRequestModeReplicated
)

// UnknownNumRowsAffected is returned when the number of rows that an API affected cannot be determined
const UnknownNumRowsAffected = -1

Expand Down Expand Up @@ -608,7 +634,8 @@ type (

NewWorkflowSnapshot WorkflowSnapshot

DomainName string
WorkflowRequestMode CreateWorkflowRequestMode
DomainName string
}

// CreateWorkflowExecutionResponse is the response to CreateWorkflowExecutionRequest
Expand Down Expand Up @@ -699,6 +726,8 @@ type (

NewWorkflowSnapshot *WorkflowSnapshot

WorkflowRequestMode CreateWorkflowRequestMode

Encoding common.EncodingType // optional binary encoding type

DomainName string
Expand All @@ -719,6 +748,8 @@ type (
// current workflow
CurrentWorkflowMutation *WorkflowMutation

WorkflowRequestMode CreateWorkflowRequestMode

Encoding common.EncodingType // optional binary encoding type

DomainName string
Expand All @@ -733,6 +764,13 @@ type (
Events []*types.HistoryEvent
}

// WorkflowRequest is used as requestID and it's corresponding failover version container
WorkflowRequest struct {
RequestID string
Version int64
RequestType WorkflowRequestType
}

// WorkflowMutation is used as generic workflow execution state mutation
WorkflowMutation struct {
ExecutionInfo *WorkflowExecutionInfo
Expand All @@ -759,6 +797,8 @@ type (
ReplicationTasks []Task
TimerTasks []Task

WorkflowRequests []*WorkflowRequest

Condition int64
Checksum checksum.Checksum
}
Expand All @@ -781,6 +821,8 @@ type (
ReplicationTasks []Task
TimerTasks []Task

WorkflowRequests []*WorkflowRequest

Condition int64
Checksum checksum.Checksum
}
Expand Down
10 changes: 10 additions & 0 deletions common/persistence/data_store_interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,8 @@ type (
PreviousLastWriteVersion int64

NewWorkflowSnapshot InternalWorkflowSnapshot

WorkflowRequestMode CreateWorkflowRequestMode
}

// InternalGetReplicationTasksResponse is the response to GetReplicationTask
Expand Down Expand Up @@ -424,6 +426,8 @@ type (
UpdateWorkflowMutation InternalWorkflowMutation

NewWorkflowSnapshot *InternalWorkflowSnapshot

WorkflowRequestMode CreateWorkflowRequestMode
}

// InternalConflictResolveWorkflowExecutionRequest is used to reset workflow execution state for Persistence Interface
Expand All @@ -440,6 +444,8 @@ type (

// current workflow
CurrentWorkflowMutation *InternalWorkflowMutation

WorkflowRequestMode CreateWorkflowRequestMode
}

// InternalWorkflowMutation is used as generic workflow execution state mutation for Persistence Interface
Expand Down Expand Up @@ -469,6 +475,8 @@ type (
TimerTasks []Task
ReplicationTasks []Task

WorkflowRequests []*WorkflowRequest

Condition int64

Checksum checksum.Checksum
Expand All @@ -494,6 +502,8 @@ type (
TimerTasks []Task
ReplicationTasks []Task

WorkflowRequests []*WorkflowRequest

Condition int64

Checksum checksum.Checksum
Expand Down
10 changes: 10 additions & 0 deletions common/persistence/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@

package persistence

import "fmt"

type (
// TimeoutError is returned when a write operation fails due to a timeout
TimeoutError struct {
Expand Down Expand Up @@ -73,8 +75,16 @@
CloseStatus int
LastWriteVersion int64
}

DuplicateRequestError struct {
RunID string
}
)

func (e *DuplicateRequestError) Error() string {
return fmt.Sprintf("Request has already been applied to runID: %s", e.RunID)

Check warning on line 85 in common/persistence/errors.go

View check run for this annotation

Codecov / codecov/patch

common/persistence/errors.go#L84-L85

Added lines #L84 - L85 were not covered by tests
}

func (e *InvalidPersistenceRequestError) Error() string {
return e.Msg
}
Expand Down
10 changes: 10 additions & 0 deletions common/persistence/execution_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,8 @@

UpdateWorkflowMutation: *serializedWorkflowMutation,
NewWorkflowSnapshot: serializedNewWorkflowSnapshot,

WorkflowRequestMode: request.WorkflowRequestMode,
}
msuss := m.statsComputer.computeMutableStateUpdateStats(newRequest)
err = m.persistence.UpdateWorkflowExecution(ctx, newRequest)
Expand Down Expand Up @@ -563,6 +565,8 @@
NewWorkflowSnapshot: serializedNewWorkflowMutation,

CurrentWorkflowMutation: serializedCurrentWorkflowMutation,

WorkflowRequestMode: request.WorkflowRequestMode,

Check warning on line 569 in common/persistence/execution_manager.go

View check run for this annotation

Codecov / codecov/patch

common/persistence/execution_manager.go#L569

Added line #L569 was not covered by tests
}
msuss := m.statsComputer.computeMutableStateConflictResolveStats(newRequest)
err = m.persistence.ConflictResolveWorkflowExecution(ctx, newRequest)
Expand Down Expand Up @@ -593,6 +597,8 @@
PreviousLastWriteVersion: request.PreviousLastWriteVersion,

NewWorkflowSnapshot: *serializedNewWorkflowSnapshot,

WorkflowRequestMode: request.WorkflowRequestMode,

Check warning on line 601 in common/persistence/execution_manager.go

View check run for this annotation

Codecov / codecov/patch

common/persistence/execution_manager.go#L601

Added line #L601 was not covered by tests
}

msuss := m.statsComputer.computeMutableStateCreateStats(newRequest)
Expand Down Expand Up @@ -675,6 +681,8 @@
ReplicationTasks: input.ReplicationTasks,
TimerTasks: input.TimerTasks,

WorkflowRequests: input.WorkflowRequests,

Condition: input.Condition,
Checksum: input.Checksum,
ChecksumData: checksumData,
Expand Down Expand Up @@ -739,6 +747,8 @@
ReplicationTasks: input.ReplicationTasks,
TimerTasks: input.TimerTasks,

WorkflowRequests: input.WorkflowRequests,

Condition: input.Condition,
Checksum: input.Checksum,
ChecksumData: checksumData,
Expand Down
48 changes: 44 additions & 4 deletions common/persistence/nosql/nosql_execution_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,11 @@
return nil, err
}

workflowRequestWriteMode, err := getWorkflowRequestWriteMode(request.WorkflowRequestMode)
if err != nil {
return nil, err

Check warning on line 80 in common/persistence/nosql/nosql_execution_store.go

View check run for this annotation

Codecov / codecov/patch

common/persistence/nosql/nosql_execution_store.go#L80

Added line #L80 was not covered by tests
}

currentWorkflowWriteReq, err := d.prepareCurrentWorkflowRequestForCreateWorkflowTxn(domainID, workflowID, runID, executionInfo, lastWriteVersion, request)
if err != nil {
return nil, err
Expand All @@ -94,13 +99,21 @@
return nil, err
}

workflowRequests := d.prepareWorkflowRequestRows(domainID, workflowID, runID, newWorkflow.WorkflowRequests, nil)

shardCondition := &nosqlplugin.ShardCondition{
ShardID: d.shardID,
RangeID: request.RangeID,
}

workflowRequestsWriteRequest := &nosqlplugin.WorkflowRequestsWriteRequest{
Rows: workflowRequests,
WriteMode: workflowRequestWriteMode,
}

err = d.db.InsertWorkflowExecutionWithTasks(
ctx,
workflowRequestsWriteRequest,
currentWorkflowWriteReq, workflowExecutionWriteReq,
transferTasks, crossClusterTasks, replicationTasks, timerTasks,
shardCondition,
Expand Down Expand Up @@ -133,6 +146,10 @@
CloseStatus: conditionFailureErr.WorkflowExecutionAlreadyExists.CloseStatus,
LastWriteVersion: conditionFailureErr.WorkflowExecutionAlreadyExists.LastWriteVersion,
}
case conditionFailureErr.DuplicateRequest != nil:
return nil, &persistence.DuplicateRequestError{
RunID: conditionFailureErr.DuplicateRequest.RunID,
}
default:
// If ever runs into this branch, there is bug in the code either in here, or in the implementation of nosql plugin
err := fmt.Errorf("unsupported conditionFailureReason error")
Expand Down Expand Up @@ -187,6 +204,10 @@
return err
}

workflowRequestWriteMode, err := getWorkflowRequestWriteMode(request.WorkflowRequestMode)
if err != nil {
return err

Check warning on line 209 in common/persistence/nosql/nosql_execution_store.go

View check run for this annotation

Codecov / codecov/patch

common/persistence/nosql/nosql_execution_store.go#L209

Added line #L209 was not covered by tests
}
var currentWorkflowWriteReq *nosqlplugin.CurrentWorkflowWriteRequest

switch request.Mode {
Expand Down Expand Up @@ -269,7 +290,7 @@
var nosqlCrossClusterTasks []*nosqlplugin.CrossClusterTask
var nosqlReplicationTasks []*nosqlplugin.ReplicationTask
var nosqlTimerTasks []*nosqlplugin.TimerTask
var err error
var workflowRequests []*nosqlplugin.WorkflowRequestRow

// 1. current
mutateExecution, err = d.prepareUpdateWorkflowExecutionRequestWithMapsAndEventBuffer(&updateWorkflow)
Expand All @@ -284,6 +305,7 @@
if err != nil {
return err
}
workflowRequests = d.prepareWorkflowRequestRows(domainID, workflowID, runID, updateWorkflow.WorkflowRequests, workflowRequests)

// 2. new
if newWorkflow != nil {
Expand All @@ -300,15 +322,21 @@
if err != nil {
return err
}
workflowRequests = d.prepareWorkflowRequestRows(domainID, workflowID, newWorkflow.ExecutionInfo.RunID, newWorkflow.WorkflowRequests, workflowRequests)

Check warning on line 325 in common/persistence/nosql/nosql_execution_store.go

View check run for this annotation

Codecov / codecov/patch

common/persistence/nosql/nosql_execution_store.go#L325

Added line #L325 was not covered by tests
}

shardCondition := &nosqlplugin.ShardCondition{
ShardID: d.shardID,
RangeID: request.RangeID,
}

workflowRequestsWriteRequest := &nosqlplugin.WorkflowRequestsWriteRequest{
Rows: workflowRequests,
WriteMode: workflowRequestWriteMode,
}

err = d.db.UpdateWorkflowExecutionWithTasks(
ctx, currentWorkflowWriteReq,
ctx, workflowRequestsWriteRequest, currentWorkflowWriteReq,
mutateExecution, insertExecution, nil, // no workflow to reset here
nosqlTransferTasks, nosqlCrossClusterTasks, nosqlReplicationTasks, nosqlTimerTasks,
shardCondition)
Expand Down Expand Up @@ -336,6 +364,10 @@
return err
}

workflowRequestWriteMode, err := getWorkflowRequestWriteMode(request.WorkflowRequestMode)
if err != nil {
return err

Check warning on line 369 in common/persistence/nosql/nosql_execution_store.go

View check run for this annotation

Codecov / codecov/patch

common/persistence/nosql/nosql_execution_store.go#L369

Added line #L369 was not covered by tests
}
var currentWorkflowWriteReq *nosqlplugin.CurrentWorkflowWriteRequest
var prevRunID string

Expand Down Expand Up @@ -393,7 +425,7 @@
var nosqlCrossClusterTasks []*nosqlplugin.CrossClusterTask
var nosqlReplicationTasks []*nosqlplugin.ReplicationTask
var nosqlTimerTasks []*nosqlplugin.TimerTask
var err error
var workflowRequests []*nosqlplugin.WorkflowRequestRow

// 1. current
if currentWorkflow != nil {
Expand All @@ -409,6 +441,7 @@
if err != nil {
return err
}
workflowRequests = d.prepareWorkflowRequestRows(domainID, workflowID, currentWorkflow.ExecutionInfo.RunID, currentWorkflow.WorkflowRequests, workflowRequests)
}

// 2. reset
Expand All @@ -424,6 +457,7 @@
if err != nil {
return err
}
workflowRequests = d.prepareWorkflowRequestRows(domainID, workflowID, resetWorkflow.ExecutionInfo.RunID, resetWorkflow.WorkflowRequests, workflowRequests)

// 3. new
if newWorkflow != nil {
Expand All @@ -440,15 +474,21 @@
if err != nil {
return err
}
workflowRequests = d.prepareWorkflowRequestRows(domainID, workflowID, newWorkflow.ExecutionInfo.RunID, newWorkflow.WorkflowRequests, workflowRequests)

Check warning on line 477 in common/persistence/nosql/nosql_execution_store.go

View check run for this annotation

Codecov / codecov/patch

common/persistence/nosql/nosql_execution_store.go#L477

Added line #L477 was not covered by tests
}

shardCondition := &nosqlplugin.ShardCondition{
ShardID: d.shardID,
RangeID: request.RangeID,
}

workflowRequestsWriteRequest := &nosqlplugin.WorkflowRequestsWriteRequest{
Rows: workflowRequests,
WriteMode: workflowRequestWriteMode,
}

err = d.db.UpdateWorkflowExecutionWithTasks(
ctx, currentWorkflowWriteReq,
ctx, workflowRequestsWriteRequest, currentWorkflowWriteReq,
mutateExecution, insertExecution, resetExecution,
nosqlTransferTasks, nosqlCrossClusterTasks, nosqlReplicationTasks, nosqlTimerTasks,
shardCondition)
Expand Down