Skip to content

Commit

Permalink
Begin the migration of executions table operation using atomic counter (
Browse files Browse the repository at this point in the history
#1420)

* Add db version attribute to executions table for better CAS guarantee
* 1.9.x will by default use existing next event ID for CAS, but will also support the new DB version CAS
* 1.10.x will by default use DB version CAS

The last (next) event ID alone is not enough to protect CAS operations: some updates, e.g. activity heartbeats and buffered events, modify the execution record without advancing the event ID.
The newly introduced DB version is increased by 1 each time the executions table is updated (i.e. on every update to the mutable state record).
  • Loading branch information
wxing1292 committed Apr 5, 2021
1 parent ea173e0 commit 5767244
Show file tree
Hide file tree
Showing 35 changed files with 352 additions and 129 deletions.
7 changes: 7 additions & 0 deletions common/dynamicconfig/constants.go
Expand Up @@ -63,6 +63,9 @@ var Keys = map[Key]string{
AdminMatchingNamespaceToPartitionDispatchRate: "admin.matchingNamespaceToPartitionDispatchRate",
AdminMatchingNamespaceTaskqueueToPartitionDispatchRate: "admin.matchingNamespaceTaskqueueToPartitionDispatchRate",

// TODO remove this dynamic flag in 1.11.x
EnableDBRecordVersion: "system.enableDBRecordVersion",

// system settings
EnableVisibilitySampling: "system.enableVisibilitySampling",
AdvancedVisibilityWritingMode: "system.advancedVisibilityWritingMode",
Expand Down Expand Up @@ -341,6 +344,10 @@ const (
// AdminMatchingNamespaceTaskqueueToPartitionDispatchRate is the max qps of a task queue partition for a given namespace & task queue
AdminMatchingNamespaceTaskqueueToPartitionDispatchRate

// TODO remove this dynamic flag in 1.11.x
// EnableDBRecordVersion is key for enable db version
EnableDBRecordVersion

// EnableVisibilitySampling is key for enable visibility sampling
EnableVisibilitySampling
// AdvancedVisibilityWritingMode is key for how to write to advanced visibility
Expand Down
5 changes: 5 additions & 0 deletions common/log/tag/tags.go
Expand Up @@ -46,6 +46,11 @@ const LoggingCallAtKey = "logging-call-at"

/////////////////// Common tags defined here ///////////////////

// Operation returns a string tag labelled "operation" whose value is the
// given operation name.
func Operation(operation string) ZapTag {
	const key = "operation"
	return NewStringTag(key, operation)
}

// Error returns tag for Error
func Error(err error) ZapTag {
return NewErrorTag(err)
Expand Down
47 changes: 47 additions & 0 deletions common/migration/db_record_version.go
@@ -0,0 +1,47 @@
// The MIT License
//
// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved.
//
// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package migration

import (
"sync/atomic"
)

// enableDBRecordVersion stores the flag as an int32 (1 = enabled, 0 = disabled)
// so that it can be read and written through sync/atomic.
var enableDBRecordVersion int32

// IsDBVersionEnabled reports whether the DB record version flag is currently
// set. The flag is read atomically.
func IsDBVersionEnabled() bool {
	current := atomic.LoadInt32(&enableDBRecordVersion)
	return current == 1
}

// SetDBVersionFlag atomically turns the DB record version flag on (enabled ==
// true) or off (enabled == false).
func SetDBVersionFlag(enabled bool) {
	// Translate the bool into its int32 encoding first so there is a single
	// atomic store regardless of the requested state.
	var next int32
	if enabled {
		next = 1
	}
	atomic.StoreInt32(&enableDBRecordVersion, next)
}
41 changes: 30 additions & 11 deletions common/persistence/cassandra/cassandraPersistence.go
Expand Up @@ -159,9 +159,9 @@ const (

templateCreateWorkflowExecutionQuery = `INSERT INTO executions (` +
`shard_id, namespace_id, workflow_id, run_id, type, ` +
`execution, execution_encoding, execution_state, execution_state_encoding, next_event_id, ` +
`execution, execution_encoding, execution_state, execution_state_encoding, next_event_id, db_record_version, ` +
`visibility_ts, task_id, checksum, checksum_encoding) ` +
`VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) IF NOT EXISTS `
`VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) IF NOT EXISTS `

templateCreateTransferTaskQuery = `INSERT INTO executions (` +
`shard_id, type, namespace_id, workflow_id, run_id, transfer, transfer_encoding, visibility_ts, task_id) ` +
Expand Down Expand Up @@ -192,7 +192,7 @@ const (

templateGetWorkflowExecutionQuery = `SELECT execution, execution_encoding, execution_state, execution_state_encoding, next_event_id, activity_map, activity_map_encoding, timer_map, timer_map_encoding, ` +
`child_executions_map, child_executions_map_encoding, request_cancel_map, request_cancel_map_encoding, signal_map, signal_map_encoding, signal_requested, buffered_events_list, ` +
`checksum, checksum_encoding ` +
`checksum, checksum_encoding, db_record_version ` +
`FROM executions ` +
`WHERE shard_id = ? ` +
`and type = ? ` +
Expand All @@ -217,23 +217,31 @@ const (
`WHERE shard_id = ? ` +
`and type = ?`

templateCheckWorkflowExecutionQuery = `UPDATE executions ` +
`SET next_event_id = ? ` +
// TODO deprecate templateUpdateWorkflowExecutionQueryDeprecated in favor of templateUpdateWorkflowExecutionQuery
// Deprecated: use templateUpdateWorkflowExecutionQuery instead.
templateUpdateWorkflowExecutionQueryDeprecated = `UPDATE executions ` +
`SET execution = ? ` +
`, execution_encoding = ? ` +
`, execution_state = ? ` +
`, execution_state_encoding = ? ` +
`, next_event_id = ? ` +
`, checksum = ? ` +
`, checksum_encoding = ? ` +
`WHERE shard_id = ? ` +
`and type = ? ` +
`and namespace_id = ? ` +
`and workflow_id = ? ` +
`and run_id = ? ` +
`and visibility_ts = ? ` +
`and task_id = ? ` +
`IF next_event_id = ?`

`IF next_event_id = ? `
templateUpdateWorkflowExecutionQuery = `UPDATE executions ` +
`SET execution = ? ` +
`, execution_encoding = ? ` +
`, execution_state = ? ` +
`, execution_state_encoding = ? ` +
`, next_event_id = ? ` +
`, db_record_version = ? ` +
`, checksum = ? ` +
`, checksum_encoding = ? ` +
`WHERE shard_id = ? ` +
Expand All @@ -243,7 +251,7 @@ const (
`and run_id = ? ` +
`and visibility_ts = ? ` +
`and task_id = ? ` +
`IF next_event_id = ? `
`IF db_record_version = ? `

templateUpdateActivityInfoQuery = `UPDATE executions ` +
`SET activity_map[ ? ] = ?, activity_map_encoding = ? ` +
Expand Down Expand Up @@ -1075,8 +1083,9 @@ func (d *cassandraPersistence) CreateWorkflowExecution(
return &p.CreateWorkflowExecutionResponse{}, nil
}

func (d *cassandraPersistence) GetWorkflowExecution(request *p.GetWorkflowExecutionRequest) (
*p.InternalGetWorkflowExecutionResponse, error) {
func (d *cassandraPersistence) GetWorkflowExecution(
request *p.GetWorkflowExecutionRequest,
) (*p.InternalGetWorkflowExecutionResponse, error) {
execution := request.Execution
query := d.session.Query(templateGetWorkflowExecutionQuery,
d.shardID,
Expand Down Expand Up @@ -1172,7 +1181,17 @@ func (d *cassandraPersistence) GetWorkflowExecution(request *p.GetWorkflowExecut
}
state.Checksum = cs

return &p.InternalGetWorkflowExecutionResponse{State: state}, nil
dbVersion := int64(0)
if dbRecordVersion, ok := result["db_record_version"]; ok {
dbVersion = dbRecordVersion.(int64)
} else {
dbVersion = 0
}

return &p.InternalGetWorkflowExecutionResponse{
State: state,
DBRecordVersion: dbVersion,
}, nil
}

func protoExecutionStateFromRow(result map[string]interface{}) (*persistencespb.WorkflowExecutionState, error) {
Expand Down
65 changes: 45 additions & 20 deletions common/persistence/cassandra/cassandraPersistenceUtil.go
Expand Up @@ -62,6 +62,7 @@ func applyWorkflowMutationBatch(
workflowMutation.NextEventID,
cqlNowTimestampMillis,
workflowMutation.Condition,
workflowMutation.DBRecordVersion,
workflowMutation.Checksum,
); err != nil {
return err
Expand Down Expand Up @@ -172,7 +173,6 @@ func applyWorkflowSnapshotBatchAsReset(
namespaceID := workflowSnapshot.ExecutionInfo.NamespaceId
workflowID := workflowSnapshot.ExecutionInfo.WorkflowId
runID := workflowSnapshot.ExecutionState.RunId
condition := workflowSnapshot.Condition

if err := updateExecution(
batch,
Expand All @@ -181,7 +181,8 @@ func applyWorkflowSnapshotBatchAsReset(
workflowSnapshot.ExecutionState,
workflowSnapshot.NextEventID,
cqlNowTimestampMillis,
condition,
workflowSnapshot.Condition,
workflowSnapshot.DBRecordVersion,
workflowSnapshot.Checksum,
); err != nil {
return err
Expand Down Expand Up @@ -291,6 +292,7 @@ func applyWorkflowSnapshotBatchAsNew(
workflowSnapshot.ExecutionInfo,
workflowSnapshot.ExecutionState,
workflowSnapshot.NextEventID,
workflowSnapshot.DBRecordVersion,
workflowSnapshot.Checksum,
cqlNowTimestampMillis,
); err != nil {
Expand Down Expand Up @@ -387,6 +389,7 @@ func createExecution(
executionInfo *persistencespb.WorkflowExecutionInfo,
executionState *persistencespb.WorkflowExecutionState,
nextEventID int64,
dbRecordVersion int64,
checksum *persistencespb.Checksum,
cqlNowTimestampMillis int64,
) error {
Expand Down Expand Up @@ -432,6 +435,7 @@ func createExecution(
executionStateDatablob.Data,
executionStateDatablob.EncodingType.String(),
nextEventID,
dbRecordVersion,
defaultVisibilityTimestamp,
rowTypeExecutionTaskID,
checksumDatablob.Data,
Expand All @@ -449,6 +453,7 @@ func updateExecution(
nextEventID int64,
cqlNowTimestampMillis int64,
condition int64,
dbRecordVersion int64,
checksum *persistencespb.Checksum,
) error {

Expand Down Expand Up @@ -481,24 +486,44 @@ func updateExecution(
return err
}

// TODO also need to set the start / current / last write version
batch.Query(templateUpdateWorkflowExecutionQuery,
executionDatablob.Data,
executionDatablob.EncodingType.String(),
executionStateDatablob.Data,
executionStateDatablob.EncodingType.String(),
nextEventID,
checksumDatablob.Data,
checksumDatablob.EncodingType.String(),
shardID,
rowTypeExecution,
namespaceID,
workflowID,
runID,
defaultVisibilityTimestamp,
rowTypeExecutionTaskID,
condition,
)
if dbRecordVersion == 0 {
batch.Query(templateUpdateWorkflowExecutionQueryDeprecated,
executionDatablob.Data,
executionDatablob.EncodingType.String(),
executionStateDatablob.Data,
executionStateDatablob.EncodingType.String(),
nextEventID,
checksumDatablob.Data,
checksumDatablob.EncodingType.String(),
shardID,
rowTypeExecution,
namespaceID,
workflowID,
runID,
defaultVisibilityTimestamp,
rowTypeExecutionTaskID,
condition,
)
} else {
batch.Query(templateUpdateWorkflowExecutionQuery,
executionDatablob.Data,
executionDatablob.EncodingType.String(),
executionStateDatablob.Data,
executionStateDatablob.EncodingType.String(),
nextEventID,
dbRecordVersion,
checksumDatablob.Data,
checksumDatablob.EncodingType.String(),
shardID,
rowTypeExecution,
namespaceID,
workflowID,
runID,
defaultVisibilityTimestamp,
rowTypeExecutionTaskID,
dbRecordVersion-1,
)
}

return nil
}
Expand Down
19 changes: 13 additions & 6 deletions common/persistence/dataInterfaces.go
Expand Up @@ -448,6 +448,7 @@ type (
// GetWorkflowExecutionResponse is the response to GetWorkflowExecutionRequest
GetWorkflowExecutionResponse struct {
State *persistencespb.WorkflowMutableState
DBRecordVersion int64
MutableStateStats *MutableStateStats
}

Expand Down Expand Up @@ -537,7 +538,8 @@ type (
WorkflowMutation struct {
ExecutionInfo *persistencespb.WorkflowExecutionInfo
ExecutionState *persistencespb.WorkflowExecutionState
NextEventID int64
// TODO deprecate NextEventID in favor of DBRecordVersion
NextEventID int64

UpsertActivityInfos map[int64]*persistencespb.ActivityInfo
DeleteActivityInfos map[int64]struct{}
Expand All @@ -559,15 +561,18 @@ type (
TimerTasks []Task
VisibilityTasks []Task

Condition int64
Checksum *persistencespb.Checksum
// TODO deprecate Condition in favor of DBRecordVersion
Condition int64
DBRecordVersion int64
Checksum *persistencespb.Checksum
}

// WorkflowSnapshot is used as generic workflow execution state snapshot
WorkflowSnapshot struct {
ExecutionInfo *persistencespb.WorkflowExecutionInfo
ExecutionState *persistencespb.WorkflowExecutionState
NextEventID int64
// TODO deprecate NextEventID in favor of DBRecordVersion
NextEventID int64

ActivityInfos map[int64]*persistencespb.ActivityInfo
TimerInfos map[string]*persistencespb.TimerInfo
Expand All @@ -581,8 +586,10 @@ type (
TimerTasks []Task
VisibilityTasks []Task

Condition int64
Checksum *persistencespb.Checksum
// TODO deprecate Condition in favor of DBRecordVersion
Condition int64
DBRecordVersion int64
Checksum *persistencespb.Checksum
}

// DeleteWorkflowExecutionRequest is used to delete a workflow execution
Expand Down
11 changes: 7 additions & 4 deletions common/persistence/executionStore.go
Expand Up @@ -92,6 +92,7 @@ func (m *executionManagerImpl) GetWorkflowExecution(
ExecutionState: response.State.ExecutionState,
NextEventId: response.State.NextEventID,
},
DBRecordVersion: response.DBRecordVersion,
}

newResponse.State.BufferedEvents, err = m.DeserializeBufferedEvents(response.State.BufferedEvents)
Expand Down Expand Up @@ -395,8 +396,9 @@ func (m *executionManagerImpl) SerializeWorkflowMutation(
TimerTasks: input.TimerTasks,
VisibilityTasks: input.VisibilityTasks,

Condition: input.Condition,
Checksum: input.Checksum,
Condition: input.Condition,
DBRecordVersion: input.DBRecordVersion,
Checksum: input.Checksum,
}, nil
}

Expand Down Expand Up @@ -432,8 +434,9 @@ func (m *executionManagerImpl) SerializeWorkflowSnapshot(
TimerTasks: input.TimerTasks,
VisibilityTasks: input.VisibilityTasks,

Condition: input.Condition,
Checksum: input.Checksum,
Condition: input.Condition,
DBRecordVersion: input.DBRecordVersion,
Checksum: input.Checksum,
}, nil
}

Expand Down

0 comments on commit 5767244

Please sign in to comment.