From 5767244334af49f3e900540d914ddbb1d2c5d564 Mon Sep 17 00:00:00 2001 From: wxing1292 Date: Mon, 5 Apr 2021 15:03:57 -0700 Subject: [PATCH] Begin the migration of executions table operation using atomic counter (#1420) * Add db version attribute to executions table for better CAS guarantee * 1.9.x will by default use existing next event ID for CAS, but will also support the new DB version CAS * 1.10.x will by default use DB version CAS Last event ID is not enough for CAS operation protection: e.g. activity heartbeat / buffer events. The newly introduced DB version will be increased by 1 each time execution table is updated (update to mutable state record). --- common/dynamicconfig/constants.go | 7 ++ common/log/tag/tags.go | 5 ++ common/migration/db_record_version.go | 47 ++++++++++++++ .../cassandra/cassandraPersistence.go | 41 ++++++++---- .../cassandra/cassandraPersistenceUtil.go | 65 +++++++++++++------ common/persistence/dataInterfaces.go | 19 ++++-- common/persistence/executionStore.go | 11 ++-- common/persistence/persistenceInterface.go | 10 ++- common/persistence/sql/sql_execution.go | 15 ++++- common/persistence/sql/sql_execution_util.go | 45 +++++++++---- .../sql/sqlplugin/history_execution.go | 11 +++- .../sql/sqlplugin/mysql/execution.go | 24 +++---- .../sql/sqlplugin/postgresql/execution.go | 24 +++---- .../sqlplugin/tests/history_execution_test.go | 6 +- common/rpc/interceptor/telemetry.go | 9 +-- schema/cassandra/temporal/schema.cql | 1 + .../temporal/versioned/v1.5/executions.cql | 1 + .../temporal/versioned/v1.5/manifest.json | 8 +++ schema/cassandra/version.go | 2 +- schema/mysql/v57/temporal/schema.sql | 1 + .../temporal/versioned/v1.5/executions.sql | 1 + .../v57/temporal/versioned/v1.5/manifest.json | 8 +++ schema/mysql/version.go | 2 +- schema/postgresql/v96/temporal/schema.sql | 1 + .../temporal/versioned/v1.5/executions.sql | 1 + .../v96/temporal/versioned/v1.5/manifest.json | 8 +++ schema/postgresql/version.go | 2 +- service/history/configs/config.go | 9 ++- service/history/mutableState.go | 6 +- service/history/mutableStateBuilder.go | 30 +++++++-- service/history/mutableStateBuilder_test.go | 11 ++-- service/history/mutableState_mock.go | 21 +++--- service/history/nDCConflictResolver_test.go | 10 +-- service/history/service.go | 15 +++-- service/history/workflowExecutionContext.go | 4 +- 35 files changed, 352 insertions(+), 129 deletions(-) create mode 100644 common/migration/db_record_version.go create mode 100644 schema/cassandra/temporal/versioned/v1.5/executions.cql create mode 100644 schema/cassandra/temporal/versioned/v1.5/manifest.json create mode 100644 schema/mysql/v57/temporal/versioned/v1.5/executions.sql create mode 100644 schema/mysql/v57/temporal/versioned/v1.5/manifest.json create mode 100644 schema/postgresql/v96/temporal/versioned/v1.5/executions.sql create mode 100644 schema/postgresql/v96/temporal/versioned/v1.5/manifest.json diff --git a/common/dynamicconfig/constants.go b/common/dynamicconfig/constants.go index ac1063f09fa..22f9568f0a9 100644 --- a/common/dynamicconfig/constants.go +++ b/common/dynamicconfig/constants.go @@ -63,6 +63,9 @@ var Keys = map[Key]string{ AdminMatchingNamespaceToPartitionDispatchRate: "admin.matchingNamespaceToPartitionDispatchRate", AdminMatchingNamespaceTaskqueueToPartitionDispatchRate: "admin.matchingNamespaceTaskqueueToPartitionDispatchRate", + // TODO remove this dynamic flag in 1.11.x + EnableDBRecordVersion: "system.enableDBRecordVersion", + // system settings EnableVisibilitySampling: "system.enableVisibilitySampling", AdvancedVisibilityWritingMode: "system.advancedVisibilityWritingMode", @@ -341,6 +344,10 @@ const ( // AdminMatchingNamespaceTaskqueueToPartitionDispatchRate is the max qps of a task queue partition for a given namespace & task queue AdminMatchingNamespaceTaskqueueToPartitionDispatchRate + // TODO remove this dynamic flag in 1.11.x + // EnableDBRecordVersion is key for enable db version + EnableDBRecordVersion + // EnableVisibilitySampling is key for enable visibility sampling EnableVisibilitySampling // AdvancedVisibilityWritingMode is key for how to write to advanced visibility diff --git a/common/log/tag/tags.go b/common/log/tag/tags.go index 63312697121..2e1893417ea 100644 --- a/common/log/tag/tags.go +++ b/common/log/tag/tags.go @@ -46,6 +46,11 @@ const LoggingCallAtKey = "logging-call-at" /////////////////// Common tags defined here /////////////////// +// Operation returns tag for Operation +func Operation(operation string) ZapTag { + return NewStringTag("operation", operation) +} + // Error returns tag for Error func Error(err error) ZapTag { return NewErrorTag(err) diff --git a/common/migration/db_record_version.go b/common/migration/db_record_version.go new file mode 100644 index 00000000000..5aeeef25d79 --- /dev/null +++ b/common/migration/db_record_version.go @@ -0,0 +1,47 @@ +// The MIT License +// +// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved. +// +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package migration + +import ( + "sync/atomic" +) + +var ( + enableDBRecordVersion int32 +) + +func IsDBVersionEnabled() bool { + return atomic.LoadInt32(&enableDBRecordVersion) == 1 +} + +func SetDBVersionFlag( + enabled bool, +) { + if enabled { + atomic.StoreInt32(&enableDBRecordVersion, 1) + } else { + atomic.StoreInt32(&enableDBRecordVersion, 0) + } +} diff --git a/common/persistence/cassandra/cassandraPersistence.go b/common/persistence/cassandra/cassandraPersistence.go index b7c5b4f916d..317ba5cdff7 100644 --- a/common/persistence/cassandra/cassandraPersistence.go +++ b/common/persistence/cassandra/cassandraPersistence.go @@ -159,9 +159,9 @@ const ( templateCreateWorkflowExecutionQuery = `INSERT INTO executions (` + `shard_id, namespace_id, workflow_id, run_id, type, ` + - `execution, execution_encoding, execution_state, execution_state_encoding, next_event_id, ` + + `execution, execution_encoding, execution_state, execution_state_encoding, next_event_id, db_record_version, ` + `visibility_ts, task_id, checksum, checksum_encoding) ` + - `VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) IF NOT EXISTS ` + `VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) IF NOT EXISTS ` templateCreateTransferTaskQuery = `INSERT INTO executions (` + `shard_id, type, namespace_id, workflow_id, run_id, transfer, transfer_encoding, visibility_ts, task_id) ` + @@ -192,7 +192,7 @@ const ( templateGetWorkflowExecutionQuery = `SELECT execution, execution_encoding, execution_state, execution_state_encoding, next_event_id, activity_map, activity_map_encoding, timer_map, timer_map_encoding, ` + `child_executions_map, child_executions_map_encoding, request_cancel_map, request_cancel_map_encoding, signal_map, signal_map_encoding, signal_requested, buffered_events_list, ` + - `checksum, checksum_encoding ` + + `checksum, checksum_encoding, db_record_version ` + `FROM executions ` + `WHERE shard_id = ? ` + `and type = ? ` + @@ -217,8 +217,16 @@ const ( `WHERE shard_id = ? ` + `and type = ?` - templateCheckWorkflowExecutionQuery = `UPDATE executions ` + - `SET next_event_id = ? ` + + // TODO deprecate templateUpdateWorkflowExecutionQueryDeprecated in favor of templateUpdateWorkflowExecutionQuery + // Deprecated. + templateUpdateWorkflowExecutionQueryDeprecated = `UPDATE executions ` + + `SET execution = ? ` + + `, execution_encoding = ? ` + + `, execution_state = ? ` + + `, execution_state_encoding = ? ` + + `, next_event_id = ? ` + + `, checksum = ? ` + + `, checksum_encoding = ? ` + `WHERE shard_id = ? ` + `and type = ? ` + `and namespace_id = ? ` + @@ -226,14 +234,14 @@ const ( `and run_id = ? ` + `and visibility_ts = ? ` + `and task_id = ? ` + - `IF next_event_id = ?` - + `IF next_event_id = ? ` templateUpdateWorkflowExecutionQuery = `UPDATE executions ` + `SET execution = ? ` + `, execution_encoding = ? ` + `, execution_state = ? ` + `, execution_state_encoding = ? ` + `, next_event_id = ? ` + + `, db_record_version = ? ` + `, checksum = ? ` + `, checksum_encoding = ? ` + `WHERE shard_id = ? ` + @@ -243,7 +251,7 @@ const ( `and run_id = ? ` + `and visibility_ts = ? ` + `and task_id = ? ` + - `IF next_event_id = ? ` + `IF db_record_version = ? ` templateUpdateActivityInfoQuery = `UPDATE executions ` + `SET activity_map[ ? ] = ?, activity_map_encoding = ? ` + @@ -1075,8 +1083,9 @@ func (d *cassandraPersistence) CreateWorkflowExecution( return &p.CreateWorkflowExecutionResponse{}, nil } -func (d *cassandraPersistence) GetWorkflowExecution(request *p.GetWorkflowExecutionRequest) ( - *p.InternalGetWorkflowExecutionResponse, error) { +func (d *cassandraPersistence) GetWorkflowExecution( + request *p.GetWorkflowExecutionRequest, +) (*p.InternalGetWorkflowExecutionResponse, error) { execution := request.Execution query := d.session.Query(templateGetWorkflowExecutionQuery, d.shardID, @@ -1172,7 +1181,17 @@ func (d *cassandraPersistence) GetWorkflowExecution(request *p.GetWorkflowExecut } state.Checksum = cs - return &p.InternalGetWorkflowExecutionResponse{State: state}, nil + dbVersion := int64(0) + if dbRecordVersion, ok := result["db_record_version"]; ok { + dbVersion = dbRecordVersion.(int64) + } else { + dbVersion = 0 + } + + return &p.InternalGetWorkflowExecutionResponse{ + State: state, + DBRecordVersion: dbVersion, + }, nil } func protoExecutionStateFromRow(result map[string]interface{}) (*persistencespb.WorkflowExecutionState, error) { diff --git a/common/persistence/cassandra/cassandraPersistenceUtil.go b/common/persistence/cassandra/cassandraPersistenceUtil.go index 942e17f35a3..8fbbfabf238 100644 --- a/common/persistence/cassandra/cassandraPersistenceUtil.go +++ b/common/persistence/cassandra/cassandraPersistenceUtil.go @@ -62,6 +62,7 @@ func applyWorkflowMutationBatch( workflowMutation.NextEventID, cqlNowTimestampMillis, workflowMutation.Condition, + workflowMutation.DBRecordVersion, workflowMutation.Checksum, ); err != nil { return err @@ -172,7 +173,6 @@ func applyWorkflowSnapshotBatchAsReset( namespaceID := workflowSnapshot.ExecutionInfo.NamespaceId workflowID := workflowSnapshot.ExecutionInfo.WorkflowId runID := workflowSnapshot.ExecutionState.RunId - condition := workflowSnapshot.Condition if err := updateExecution( batch, @@ -181,7 +181,8 @@ func applyWorkflowSnapshotBatchAsReset( workflowSnapshot.ExecutionState, workflowSnapshot.NextEventID, cqlNowTimestampMillis, - condition, + workflowSnapshot.Condition, + workflowSnapshot.DBRecordVersion, workflowSnapshot.Checksum, ); err != nil { return err @@ -291,6 +292,7 @@ func applyWorkflowSnapshotBatchAsNew( workflowSnapshot.ExecutionInfo, workflowSnapshot.ExecutionState, workflowSnapshot.NextEventID, + workflowSnapshot.DBRecordVersion, workflowSnapshot.Checksum, cqlNowTimestampMillis, ); err != nil { @@ -387,6 +389,7 @@ func createExecution( executionInfo *persistencespb.WorkflowExecutionInfo, executionState *persistencespb.WorkflowExecutionState, nextEventID int64, + dbRecordVersion int64, checksum *persistencespb.Checksum, cqlNowTimestampMillis int64, ) error { @@ -432,6 +435,7 @@ func createExecution( executionStateDatablob.Data, executionStateDatablob.EncodingType.String(), nextEventID, + dbRecordVersion, defaultVisibilityTimestamp, rowTypeExecutionTaskID, checksumDatablob.Data, @@ -449,6 +453,7 @@ func updateExecution( nextEventID int64, cqlNowTimestampMillis int64, condition int64, + dbRecordVersion int64, checksum *persistencespb.Checksum, ) error { @@ -481,24 +486,44 @@ func updateExecution( return err } - // TODO also need to set the start / current / last write version - batch.Query(templateUpdateWorkflowExecutionQuery, - executionDatablob.Data, - executionDatablob.EncodingType.String(), - executionStateDatablob.Data, - executionStateDatablob.EncodingType.String(), - nextEventID, - checksumDatablob.Data, - checksumDatablob.EncodingType.String(), - shardID, - rowTypeExecution, - namespaceID, - workflowID, - runID, - defaultVisibilityTimestamp, - rowTypeExecutionTaskID, - condition, - ) + if dbRecordVersion == 0 { + batch.Query(templateUpdateWorkflowExecutionQueryDeprecated, + executionDatablob.Data, + executionDatablob.EncodingType.String(), + executionStateDatablob.Data, + executionStateDatablob.EncodingType.String(), + nextEventID, + checksumDatablob.Data, + checksumDatablob.EncodingType.String(), + shardID, + rowTypeExecution, + namespaceID, + workflowID, + runID, + defaultVisibilityTimestamp, + rowTypeExecutionTaskID, + condition, + ) + } else { + batch.Query(templateUpdateWorkflowExecutionQuery, + executionDatablob.Data, + executionDatablob.EncodingType.String(), + executionStateDatablob.Data, + executionStateDatablob.EncodingType.String(), + nextEventID, + dbRecordVersion, + checksumDatablob.Data, + checksumDatablob.EncodingType.String(), + shardID, + rowTypeExecution, + namespaceID, + workflowID, + runID, + defaultVisibilityTimestamp, + rowTypeExecutionTaskID, + dbRecordVersion-1, + ) + } return nil } diff --git a/common/persistence/dataInterfaces.go b/common/persistence/dataInterfaces.go index 35903b90983..4fc5fb1c478 100644 --- a/common/persistence/dataInterfaces.go +++ b/common/persistence/dataInterfaces.go @@ -448,6 +448,7 @@ type ( // GetWorkflowExecutionResponse is the response to GetWorkflowExecutionRequest GetWorkflowExecutionResponse struct { State *persistencespb.WorkflowMutableState + DBRecordVersion int64 MutableStateStats *MutableStateStats } @@ -537,7 +538,8 @@ type ( WorkflowMutation struct { ExecutionInfo *persistencespb.WorkflowExecutionInfo ExecutionState *persistencespb.WorkflowExecutionState - NextEventID int64 + // TODO deprecate NextEventID in favor of DBRecordVersion + NextEventID int64 UpsertActivityInfos map[int64]*persistencespb.ActivityInfo DeleteActivityInfos map[int64]struct{} @@ -559,15 +561,18 @@ type ( TimerTasks []Task VisibilityTasks []Task - Condition int64 - Checksum *persistencespb.Checksum + // TODO deprecate Condition in favor of DBRecordVersion + Condition int64 + DBRecordVersion int64 + Checksum *persistencespb.Checksum } // WorkflowSnapshot is used as generic workflow execution state snapshot WorkflowSnapshot struct { ExecutionInfo *persistencespb.WorkflowExecutionInfo ExecutionState *persistencespb.WorkflowExecutionState - NextEventID int64 + // TODO deprecate NextEventID in favor of DBRecordVersion + NextEventID int64 ActivityInfos map[int64]*persistencespb.ActivityInfo TimerInfos map[string]*persistencespb.TimerInfo @@ -581,8 +586,10 @@ type ( TimerTasks []Task VisibilityTasks []Task - Condition int64 - Checksum *persistencespb.Checksum + // TODO deprecate Condition in favor of DBRecordVersion + Condition int64 + DBRecordVersion int64 + Checksum *persistencespb.Checksum } // DeleteWorkflowExecutionRequest is used to delete a workflow execution diff --git a/common/persistence/executionStore.go b/common/persistence/executionStore.go index 5addb697fef..d40b7251603 100644 --- a/common/persistence/executionStore.go +++ b/common/persistence/executionStore.go @@ -92,6 +92,7 @@ func (m *executionManagerImpl) GetWorkflowExecution( ExecutionState: response.State.ExecutionState, NextEventId: response.State.NextEventID, }, + DBRecordVersion: response.DBRecordVersion, } newResponse.State.BufferedEvents, err = m.DeserializeBufferedEvents(response.State.BufferedEvents) @@ -395,8 +396,9 @@ func (m *executionManagerImpl) SerializeWorkflowMutation( TimerTasks: input.TimerTasks, VisibilityTasks: input.VisibilityTasks, - Condition: input.Condition, - Checksum: input.Checksum, + Condition: input.Condition, + DBRecordVersion: input.DBRecordVersion, + Checksum: input.Checksum, }, nil } @@ -432,8 +434,9 @@ func (m *executionManagerImpl) SerializeWorkflowSnapshot( TimerTasks: input.TimerTasks, VisibilityTasks: input.VisibilityTasks, - Condition: input.Condition, - Checksum: input.Checksum, + Condition: input.Condition, + DBRecordVersion: input.DBRecordVersion, + Checksum: input.Checksum, }, nil } diff --git a/common/persistence/persistenceInterface.go b/common/persistence/persistenceInterface.go index 3f3c7b241a0..4123d7001ea 100644 --- a/common/persistence/persistenceInterface.go +++ b/common/persistence/persistenceInterface.go @@ -226,8 +226,9 @@ type ( ExecutionState *persistencespb.WorkflowExecutionState NextEventID int64 - BufferedEvents []*commonpb.DataBlob - Checksum *persistencespb.Checksum + BufferedEvents []*commonpb.DataBlob + Checksum *persistencespb.Checksum + DBRecordVersion int64 } // InternalUpdateWorkflowExecutionRequest is used to update a workflow execution for Persistence Interface @@ -263,6 +264,7 @@ type ( ExecutionState *persistencespb.WorkflowExecutionState NextEventID int64 LastWriteVersion int64 + DBRecordVersion int64 UpsertActivityInfos map[int64]*persistencespb.ActivityInfo DeleteActivityInfos map[int64]struct{} @@ -295,6 +297,7 @@ type ( ExecutionState *persistencespb.WorkflowExecutionState LastWriteVersion int64 NextEventID int64 + DBRecordVersion int64 ActivityInfos map[int64]*persistencespb.ActivityInfo TimerInfos map[string]*persistencespb.TimerInfo @@ -345,7 +348,8 @@ type ( // InternalGetWorkflowExecutionResponse is the response to GetworkflowExecution for Persistence Interface InternalGetWorkflowExecutionResponse struct { - State *InternalWorkflowMutableState + State *InternalWorkflowMutableState + DBRecordVersion int64 } // InternalListConcreteExecutionsResponse is the response to ListConcreteExecutions for Persistence Interface diff --git a/common/persistence/sql/sql_execution.go b/common/persistence/sql/sql_execution.go index c2954e04e06..ea261698242 100644 --- a/common/persistence/sql/sql_execution.go +++ b/common/persistence/sql/sql_execution.go @@ -227,7 +227,10 @@ func (m *sqlExecutionManager) GetWorkflowExecution( runID := primitives.MustParseUUID(request.Execution.RunId) wfID := request.Execution.WorkflowId executionsRow, err := m.db.SelectFromExecutions(ctx, sqlplugin.ExecutionsFilter{ - ShardID: m.shardID, NamespaceID: namespaceID, WorkflowID: wfID, RunID: runID, + ShardID: m.shardID, + NamespaceID: namespaceID, + WorkflowID: wfID, + RunID: runID, }) switch err { case nil: @@ -251,7 +254,10 @@ func (m *sqlExecutionManager) GetWorkflowExecution( state := &p.InternalWorkflowMutableState{ ExecutionInfo: info, ExecutionState: executionState, - NextEventID: executionsRow.NextEventID} + NextEventID: executionsRow.NextEventID, + + DBRecordVersion: executionsRow.DBRecordVersion, + } state.ActivityInfos, err = getActivityInfoMap(ctx, m.db, @@ -330,7 +336,10 @@ func (m *sqlExecutionManager) GetWorkflowExecution( return nil, serviceerror.NewInternal(fmt.Sprintf("GetWorkflowExecution: failed to get signals requested. Error: %v", err)) } - return &p.InternalGetWorkflowExecutionResponse{State: state}, nil + return &p.InternalGetWorkflowExecutionResponse{ + State: state, + DBRecordVersion: executionsRow.DBRecordVersion, + }, nil } func (m *sqlExecutionManager) UpdateWorkflowExecution( diff --git a/common/persistence/sql/sql_execution_util.go b/common/persistence/sql/sql_execution_util.go index c45917cc2ab..7c1709ddcbc 100644 --- a/common/persistence/sql/sql_execution_util.go +++ b/common/persistence/sql/sql_execution_util.go @@ -65,13 +65,14 @@ func applyWorkflowMutationTx( } // TODO Remove me if UPDATE holds the lock to the end of a transaction - if err := lockAndCheckNextEventID(ctx, + if err := lockAndCheckExecution(ctx, tx, shardID, namespaceIDBytes, workflowID, runIDBytes, workflowMutation.Condition, + workflowMutation.DBRecordVersion, ); err != nil { switch err.(type) { case *p.ConditionFailedError: @@ -87,6 +88,7 @@ func applyWorkflowMutationTx( workflowMutation.ExecutionState, workflowMutation.NextEventID, lastWriteVersion, + workflowMutation.DBRecordVersion, shardID, ); err != nil { return serviceerror.NewInternal(fmt.Sprintf("applyWorkflowMutationTx failed. Failed to update executions row. Erorr: %v", err)) @@ -223,13 +225,14 @@ func applyWorkflowSnapshotTxAsReset( } // TODO Is there a way to modify the various map tables without fear of other people adding rows after we delete, without locking the executions row? - if err := lockAndCheckNextEventID(ctx, + if err := lockAndCheckExecution(ctx, tx, shardID, namespaceIDBytes, workflowID, runIDBytes, workflowSnapshot.Condition, + workflowSnapshot.DBRecordVersion, ); err != nil { switch err.(type) { case *p.ConditionFailedError: @@ -245,6 +248,7 @@ func applyWorkflowSnapshotTxAsReset( workflowSnapshot.ExecutionState, workflowSnapshot.NextEventID, lastWriteVersion, + workflowSnapshot.DBRecordVersion, shardID, ); err != nil { return serviceerror.NewInternal(fmt.Sprintf("applyWorkflowSnapshotTxAsReset failed. Failed to update executions row. Erorr: %v", err)) @@ -433,6 +437,7 @@ func (m *sqlExecutionManager) applyWorkflowSnapshotTxAsNew( workflowSnapshot.ExecutionState, workflowSnapshot.NextEventID, lastWriteVersion, + workflowSnapshot.DBRecordVersion, shardID, ); err != nil { return err @@ -679,7 +684,7 @@ func createOrUpdateCurrentExecution( return nil } -func lockAndCheckNextEventID( +func lockAndCheckExecution( ctx context.Context, tx sqlplugin.Tx, shardID int32, @@ -687,30 +692,42 @@ func lockAndCheckNextEventID( workflowID string, runID primitives.UUID, condition int64, + dbRecordVersion int64, ) error { - nextEventID, err := lockNextEventID(ctx, tx, shardID, namespaceID, workflowID, runID) + version, nextEventID, err := lockExecution(ctx, tx, shardID, namespaceID, workflowID, runID) if err != nil { return err } + if nextEventID != condition { return &p.ConditionFailedError{ - Msg: fmt.Sprintf("lockAndCheckNextEventID failed. Next_event_id was %v when it should have been %v.", nextEventID, condition), + Msg: fmt.Sprintf("lockAndCheckExecution failed. Next_event_id was %v when it should have been %v.", nextEventID, condition), } } + + if dbRecordVersion != 0 { + dbRecordVersion -= 1 + } + if version != dbRecordVersion { + return &p.ConditionFailedError{ + Msg: fmt.Sprintf("lockAndCheckExecution failed. DBRecordVersion expected: %v, actually %v.", dbRecordVersion, version), + } + } + return nil } -func lockNextEventID( +func lockExecution( ctx context.Context, tx sqlplugin.Tx, shardID int32, namespaceID primitives.UUID, workflowID string, runID primitives.UUID, -) (int64, error) { +) (int64, int64, error) { - nextEventID, err := tx.WriteLockExecutions(ctx, sqlplugin.ExecutionsFilter{ + dbRecordVersion, nextEventID, err := tx.WriteLockExecutions(ctx, sqlplugin.ExecutionsFilter{ ShardID: shardID, NamespaceID: namespaceID, WorkflowID: workflowID, @@ -718,15 +735,15 @@ func lockNextEventID( }) if err != nil { if err == sql.ErrNoRows { - return 0, serviceerror.NewNotFound(fmt.Sprintf("lockNextEventID failed. Unable to lock executions row with (shard, namespace, workflow, run) = (%v,%v,%v,%v) which does not exist.", + return 0, 0, serviceerror.NewNotFound(fmt.Sprintf("lockNextEventID failed. Unable to lock executions row with (shard, namespace, workflow, run) = (%v,%v,%v,%v) which does not exist.", shardID, namespaceID, workflowID, runID)) } - return 0, serviceerror.NewInternal(fmt.Sprintf("lockNextEventID failed. Error: %v", err)) + return 0, 0, serviceerror.NewInternal(fmt.Sprintf("lockNextEventID failed. Error: %v", err)) } - return nextEventID, nil + return dbRecordVersion, nextEventID, nil } func createTransferTasks( @@ -1208,6 +1225,7 @@ func buildExecutionRow( executionState *persistencespb.WorkflowExecutionState, nextEventID int64, lastWriteVersion int64, + dbRecordVersion int64, shardID int32, ) (row *sqlplugin.ExecutionsRow, err error) { @@ -1242,6 +1260,7 @@ func buildExecutionRow( DataEncoding: infoBlob.EncodingType.String(), State: stateBlob.Data, StateEncoding: stateBlob.EncodingType.String(), + DBRecordVersion: dbRecordVersion, }, nil } @@ -1252,6 +1271,7 @@ func (m *sqlExecutionManager) createExecution( executionState *persistencespb.WorkflowExecutionState, nextEventID int64, lastWriteVersion int64, + dbRecordVersion int64, shardID int32, ) error { @@ -1270,6 +1290,7 @@ func (m *sqlExecutionManager) createExecution( executionState, nextEventID, lastWriteVersion, + dbRecordVersion, shardID, ) if err != nil { @@ -1307,6 +1328,7 @@ func updateExecution( executionState *persistencespb.WorkflowExecutionState, nextEventID int64, lastWriteVersion int64, + dbRecordVersion int64, shardID int32, ) error { @@ -1325,6 +1347,7 @@ func updateExecution( executionState, nextEventID, lastWriteVersion, + dbRecordVersion, shardID, ) if err != nil { diff --git a/common/persistence/sql/sqlplugin/history_execution.go b/common/persistence/sql/sqlplugin/history_execution.go index cff05e9ffb7..78f97e9cd14 100644 --- a/common/persistence/sql/sqlplugin/history_execution.go +++ b/common/persistence/sql/sqlplugin/history_execution.go @@ -47,6 +47,7 @@ type ( DataEncoding string State []byte StateEncoding string + DBRecordVersion int64 } // ExecutionsFilter contains the column names within executions table that @@ -80,14 +81,20 @@ type ( RunID primitives.UUID } + // TODO remove this block in 1.11.x + ExecutionVersion struct { + DBRecordVersion int64 + NextEventID int64 + } + // HistoryExecution is the SQL persistence interface for history executions HistoryExecution interface { InsertIntoExecutions(ctx context.Context, row *ExecutionsRow) (sql.Result, error) UpdateExecutions(ctx context.Context, row *ExecutionsRow) (sql.Result, error) SelectFromExecutions(ctx context.Context, filter ExecutionsFilter) (*ExecutionsRow, error) DeleteFromExecutions(ctx context.Context, filter ExecutionsFilter) (sql.Result, error) - ReadLockExecutions(ctx context.Context, filter ExecutionsFilter) (int64, error) - WriteLockExecutions(ctx context.Context, filter ExecutionsFilter) (int64, error) + ReadLockExecutions(ctx context.Context, filter ExecutionsFilter) (int64, int64, error) + WriteLockExecutions(ctx context.Context, filter ExecutionsFilter) (int64, int64, error) LockCurrentExecutionsJoinExecutions(ctx context.Context, filter CurrentExecutionsFilter) ([]CurrentExecutionsRow, error) diff --git a/common/persistence/sql/sqlplugin/mysql/execution.go b/common/persistence/sql/sqlplugin/mysql/execution.go index afc6e53f005..80e935b035a 100644 --- a/common/persistence/sql/sqlplugin/mysql/execution.go +++ b/common/persistence/sql/sqlplugin/mysql/execution.go @@ -32,13 +32,13 @@ import ( ) const ( - executionsColumns = `shard_id, namespace_id, workflow_id, run_id, next_event_id, last_write_version, data, data_encoding, state, state_encoding` + executionsColumns = `shard_id, namespace_id, workflow_id, run_id, next_event_id, last_write_version, data, data_encoding, state, state_encoding, db_record_version` createExecutionQuery = `INSERT INTO executions(` + executionsColumns + `) - VALUES(:shard_id, :namespace_id, :workflow_id, :run_id, :next_event_id, :last_write_version, :data, :data_encoding, :state, :state_encoding)` + VALUES(:shard_id, :namespace_id, :workflow_id, :run_id, :next_event_id, :last_write_version, :data, :data_encoding, :state, :state_encoding, :db_record_version)` updateExecutionQuery = `UPDATE executions SET - next_event_id = :next_event_id, last_write_version = :last_write_version, data = :data, data_encoding = :data_encoding, state = :state, state_encoding = :state_encoding + db_record_version = :db_record_version, next_event_id = :next_event_id, last_write_version = :last_write_version, data = :data, data_encoding = :data_encoding, state = :state, state_encoding = :state_encoding WHERE shard_id = :shard_id AND namespace_id = :namespace_id AND workflow_id = :workflow_id AND run_id = :run_id` getExecutionQuery = `SELECT ` + executionsColumns + ` FROM executions @@ -47,7 +47,7 @@ const ( deleteExecutionQuery = `DELETE FROM executions WHERE shard_id = ? AND namespace_id = ? AND workflow_id = ? AND run_id = ?` - lockExecutionQueryBase = `SELECT next_event_id FROM executions + lockExecutionQueryBase = `SELECT db_record_version, next_event_id FROM executions WHERE shard_id = ? AND namespace_id = ? AND workflow_id = ? AND run_id = ?` writeLockExecutionQuery = lockExecutionQueryBase + ` FOR UPDATE` @@ -236,34 +236,34 @@ func (mdb *db) DeleteFromExecutions( func (mdb *db) ReadLockExecutions( ctx context.Context, filter sqlplugin.ExecutionsFilter, -) (int64, error) { - var nextEventID int64 +) (int64, int64, error) { + var executionVersion sqlplugin.ExecutionVersion err := mdb.conn.GetContext(ctx, - &nextEventID, + &executionVersion, readLockExecutionQuery, filter.ShardID, filter.NamespaceID, filter.WorkflowID, filter.RunID, ) - return nextEventID, err + return executionVersion.DBRecordVersion, executionVersion.NextEventID, err } // WriteLockExecutions acquires a write lock on a single row in executions table func (mdb *db) WriteLockExecutions( ctx context.Context, filter sqlplugin.ExecutionsFilter, -) (int64, error) { - var nextEventID int64 +) (int64, int64, error) { + var executionVersion sqlplugin.ExecutionVersion err := mdb.conn.GetContext(ctx, - &nextEventID, + &executionVersion, writeLockExecutionQuery, filter.ShardID, filter.NamespaceID, filter.WorkflowID, filter.RunID, ) - return nextEventID, err + return executionVersion.DBRecordVersion, executionVersion.NextEventID, err } // InsertIntoCurrentExecutions inserts a single row into current_executions table diff --git a/common/persistence/sql/sqlplugin/postgresql/execution.go b/common/persistence/sql/sqlplugin/postgresql/execution.go index 6d5251f458a..86a7e20cb05 100644 --- a/common/persistence/sql/sqlplugin/postgresql/execution.go +++ b/common/persistence/sql/sqlplugin/postgresql/execution.go @@ -32,13 +32,13 @@ import ( ) const ( - executionsColumns = `shard_id, namespace_id, workflow_id, run_id, next_event_id, last_write_version, data, data_encoding, state, state_encoding` + executionsColumns = `shard_id, namespace_id, workflow_id, run_id, next_event_id, last_write_version, data, data_encoding, state, state_encoding, db_record_version` createExecutionQuery = `INSERT INTO executions(` + executionsColumns + `) - VALUES(:shard_id, :namespace_id, :workflow_id, :run_id, :next_event_id, :last_write_version, :data, :data_encoding, :state, :state_encoding)` + VALUES(:shard_id, :namespace_id, :workflow_id, :run_id, :next_event_id, :last_write_version, :data, :data_encoding, :state, :state_encoding, :db_record_version)` updateExecutionQuery = `UPDATE executions SET - next_event_id = :next_event_id, last_write_version = :last_write_version, data = :data, data_encoding = :data_encoding, state = :state, state_encoding = :state_encoding + db_record_version = :db_record_version, next_event_id = :next_event_id, last_write_version = :last_write_version, data = :data, data_encoding = :data_encoding, state = :state, state_encoding = :state_encoding WHERE shard_id = :shard_id AND namespace_id = :namespace_id AND workflow_id = :workflow_id AND run_id = :run_id` getExecutionQuery = `SELECT ` + executionsColumns + ` FROM executions @@ -47,7 +47,7 @@ const ( deleteExecutionQuery = `DELETE FROM executions WHERE shard_id = $1 AND namespace_id = $2 AND workflow_id = $3 AND run_id = $4` - lockExecutionQueryBase = `SELECT next_event_id FROM executions + lockExecutionQueryBase = `SELECT db_record_version, next_event_id FROM executions WHERE shard_id = $1 AND namespace_id = $2 AND workflow_id = $3 AND run_id = $4` writeLockExecutionQuery = lockExecutionQueryBase + ` FOR UPDATE` @@ -236,34 +236,34 @@ func (pdb *db) DeleteFromExecutions( func (pdb *db) ReadLockExecutions( ctx context.Context, filter sqlplugin.ExecutionsFilter, -) (int64, error) { - var nextEventID int64 +) (int64, int64, error) { + var executionVersion sqlplugin.ExecutionVersion err := pdb.conn.GetContext(ctx, - &nextEventID, + &executionVersion, readLockExecutionQuery, filter.ShardID, filter.NamespaceID, filter.WorkflowID, filter.RunID, ) - return nextEventID, err + return executionVersion.DBRecordVersion, executionVersion.NextEventID, err } // WriteLockExecutions acquires a write lock on a single row in executions table func (pdb *db) WriteLockExecutions( ctx context.Context, filter sqlplugin.ExecutionsFilter, -) (int64, error) { - var nextEventID int64 +) (int64, int64, error) { + var executionVersion sqlplugin.ExecutionVersion err := pdb.conn.GetContext(ctx, - &nextEventID, + &executionVersion, writeLockExecutionQuery, filter.ShardID, filter.NamespaceID, filter.WorkflowID, filter.RunID, ) - return nextEventID, err + return executionVersion.DBRecordVersion, executionVersion.NextEventID, err } // InsertIntoCurrentExecutions inserts a single row into current_executions table diff --git a/common/persistence/sql/sqlplugin/tests/history_execution_test.go b/common/persistence/sql/sqlplugin/tests/history_execution_test.go index 95af4c21cdb..c21e1e91c3f 100644 --- a/common/persistence/sql/sqlplugin/tests/history_execution_test.go +++ b/common/persistence/sql/sqlplugin/tests/history_execution_test.go @@ -291,8 +291,9 @@ func (s *historyExecutionSuite) TestReadLock() { WorkflowID: workflowID, RunID: runID, } - rowNextEventID, err := s.store.ReadLockExecutions(newExecutionContext(), filter) + rowDBVersion, rowNextEventID, err := s.store.ReadLockExecutions(newExecutionContext(), filter) s.NoError(err) + s.Equal(execution.DBRecordVersion, rowDBVersion) s.Equal(execution.NextEventID, rowNextEventID) } @@ -317,8 +318,9 @@ func (s *historyExecutionSuite) TestWriteLock() { WorkflowID: workflowID, RunID: runID, } - rowNextEventID, err := s.store.WriteLockExecutions(newExecutionContext(), filter) + rowDBVersion, rowNextEventID, err := s.store.WriteLockExecutions(newExecutionContext(), filter) s.NoError(err) + s.Equal(execution.DBRecordVersion, rowDBVersion) s.Equal(execution.NextEventID, rowNextEventID) } diff --git a/common/rpc/interceptor/telemetry.go b/common/rpc/interceptor/telemetry.go index 5ad2afdf2fa..d66584241bf 100644 --- a/common/rpc/interceptor/telemetry.go +++ b/common/rpc/interceptor/telemetry.go @@ -112,7 +112,7 @@ func (ti *TelemetryInterceptor) Intercept( resp, err := handler(ctx, req) if err != nil { - ti.handleError(scope, err) + ti.handleError(scope, methodName, err) return nil, err } return resp, nil @@ -120,6 +120,7 @@ func (ti *TelemetryInterceptor) Intercept( func (ti *TelemetryInterceptor) handleError( scope int, + methodName string, err error, ) { @@ -153,13 +154,13 @@ func (ti *TelemetryInterceptor) handleError( ti.metricsClient.IncCounter(scope, metrics.ServiceErrClientVersionNotSupportedCounter) case *serviceerror.DataLoss: ti.metricsClient.IncCounter(scope, metrics.ServiceFailures) - ti.logger.Error("internal service error, data loss", tag.Error(err)) + ti.logger.Error("internal service error, data loss", tag.Operation(methodName), tag.Error(err)) case *serviceerror.Internal: ti.metricsClient.IncCounter(scope, metrics.ServiceFailures) - ti.logger.Error("internal service error", tag.Error(err)) + ti.logger.Error("internal service error", tag.Operation(methodName), tag.Error(err)) default: ti.metricsClient.IncCounter(scope, metrics.ServiceFailures) - ti.logger.Error("uncategorized error", tag.Error(err)) + ti.logger.Error("uncategorized error", tag.Operation(methodName), tag.Error(err)) } } diff --git a/schema/cassandra/temporal/schema.cql b/schema/cassandra/temporal/schema.cql index a834fe1d82c..06d45818a99 100644 --- a/schema/cassandra/temporal/schema.cql +++ b/schema/cassandra/temporal/schema.cql @@ -45,6 +45,7 @@ CREATE TABLE executions ( workflow_state int, checksum blob, checksum_encoding text, + db_record_version bigint, PRIMARY KEY (shard_id, type, namespace_id, workflow_id, run_id, visibility_ts, task_id) ) WITH COMPACTION = { 'class': 'org.apache.cassandra.db.compaction.LeveledCompactionStrategy' diff --git a/schema/cassandra/temporal/versioned/v1.5/executions.cql b/schema/cassandra/temporal/versioned/v1.5/executions.cql new file mode 100644 index 00000000000..aab838ddeb8 --- /dev/null +++ b/schema/cassandra/temporal/versioned/v1.5/executions.cql @@ -0,0 +1 @@ +ALTER TABLE executions Add db_record_version bigint; diff --git a/schema/cassandra/temporal/versioned/v1.5/manifest.json b/schema/cassandra/temporal/versioned/v1.5/manifest.json new file mode 100644 index 00000000000..954b58d643e --- /dev/null +++ b/schema/cassandra/temporal/versioned/v1.5/manifest.json @@ -0,0 +1,8 @@ +{ + "CurrVersion": "1.5", + "MinCompatibleVersion": "1.0", + "Description": "schema update for execution table version", + "SchemaUpdateCqlFiles": [ + "executions.cql" + ] +} diff --git a/schema/cassandra/version.go b/schema/cassandra/version.go index fb6aae03494..9acddb61af9 100644 --- a/schema/cassandra/version.go +++ b/schema/cassandra/version.go @@ -27,7 +27,7 @@ package cassandra // NOTE: whenever there is a new data base schema update, plz update the following versions // Version is the Cassandra database release version -const Version = "1.4" +const Version = "1.5" // VisibilityVersion is the Cassandra visibility database release version const VisibilityVersion = "1.0" diff --git a/schema/mysql/v57/temporal/schema.sql b/schema/mysql/v57/temporal/schema.sql index c3b9ddbd223..0c6744c7a78 100644 --- a/schema/mysql/v57/temporal/schema.sql +++ b/schema/mysql/v57/temporal/schema.sql @@ -39,6 +39,7 @@ CREATE TABLE executions( data_encoding VARCHAR(16) NOT NULL, state MEDIUMBLOB NOT NULL, state_encoding VARCHAR(16) NOT NULL, + db_record_version BIGINT NOT NULL DEFAULT 0, PRIMARY KEY (shard_id, namespace_id, workflow_id, run_id) ); diff --git a/schema/mysql/v57/temporal/versioned/v1.5/executions.sql b/schema/mysql/v57/temporal/versioned/v1.5/executions.sql new file mode 100644 index 00000000000..dbdd9e52af7 --- /dev/null +++ b/schema/mysql/v57/temporal/versioned/v1.5/executions.sql @@ -0,0 +1 @@ +ALTER TABLE executions ADD db_record_version BIGINT NOT NULL DEFAULT 0; diff --git a/schema/mysql/v57/temporal/versioned/v1.5/manifest.json b/schema/mysql/v57/temporal/versioned/v1.5/manifest.json new file mode 100644 index 00000000000..cbefb27fb2b --- /dev/null +++ b/schema/mysql/v57/temporal/versioned/v1.5/manifest.json @@ -0,0 +1,8 @@ +{ + "CurrVersion": "1.5", + "MinCompatibleVersion": "1.0", + "Description": "schema update for execution table version", + "SchemaUpdateCqlFiles": [ + "executions.sql" + ] +} diff --git a/schema/mysql/version.go b/schema/mysql/version.go index fd2f5dd73e6..cf3deedaab2 100644 --- a/schema/mysql/version.go +++ b/schema/mysql/version.go @@ -27,7 +27,7 @@ package mysql // NOTE: whenever there is a new data base schema update, plz update the following versions // Version is the MySQL database release version -const Version = "1.4" +const Version = "1.5" // VisibilityVersion is the MySQL visibility database release version const VisibilityVersion = "1.1" diff --git a/schema/postgresql/v96/temporal/schema.sql b/schema/postgresql/v96/temporal/schema.sql index 13ddca9f125..5bfc9d47ccd 100644 --- a/schema/postgresql/v96/temporal/schema.sql +++ b/schema/postgresql/v96/temporal/schema.sql @@ -39,6 +39,7 @@ CREATE TABLE executions( data_encoding VARCHAR(16) NOT NULL, state BYTEA NOT NULL, state_encoding VARCHAR(16) NOT NULL, + db_record_version BIGINT NOT NULL DEFAULT 0, PRIMARY KEY (shard_id, namespace_id, workflow_id, run_id) ); diff --git a/schema/postgresql/v96/temporal/versioned/v1.5/executions.sql b/schema/postgresql/v96/temporal/versioned/v1.5/executions.sql new file mode 100644 index 00000000000..dbdd9e52af7 --- /dev/null +++ b/schema/postgresql/v96/temporal/versioned/v1.5/executions.sql @@ -0,0 +1 @@ +ALTER TABLE executions ADD db_record_version BIGINT NOT NULL DEFAULT 0; diff --git a/schema/postgresql/v96/temporal/versioned/v1.5/manifest.json b/schema/postgresql/v96/temporal/versioned/v1.5/manifest.json new file mode 100644 index 00000000000..cbefb27fb2b --- /dev/null +++ b/schema/postgresql/v96/temporal/versioned/v1.5/manifest.json @@ -0,0 +1,8 @@ +{ + "CurrVersion": "1.5", + "MinCompatibleVersion": "1.0", + "Description": "schema update for execution table version", + "SchemaUpdateCqlFiles": [ + "executions.sql" + ] +} diff --git a/schema/postgresql/version.go b/schema/postgresql/version.go index 19e174398f3..972858375fa 100644 --- a/schema/postgresql/version.go +++ b/schema/postgresql/version.go @@ -28,7 +28,7 @@ package postgresql // Version is the Postgres database release version // Temporal supports both MySQL and Postgres officially, so upgrade should be perform for both MySQL and Postgres -const Version = "1.4" +const Version = "1.5" // VisibilityVersion is the Postgres visibility database release version // Temporal supports both MySQL and Postgres officially, so upgrade should be perform for both MySQL and Postgres diff --git a/service/history/configs/config.go b/service/history/configs/config.go index 485cafbdb6e..48f113ad07e 100644 --- a/service/history/configs/config.go +++ b/service/history/configs/config.go @@ -39,6 +39,9 @@ import ( type Config struct { NumberOfShards int32 + // TODO remove this dynamic flag in 1.11.x + EnableDBRecordVersion dynamicconfig.BoolPropertyFn + RPS dynamicconfig.IntPropertyFn MaxIDLengthLimit dynamicconfig.IntPropertyFn PersistenceMaxQPS dynamicconfig.IntPropertyFn @@ -264,7 +267,11 @@ const ( // NewConfig returns new service config with default values func NewConfig(dc *dynamicconfig.Collection, numberOfShards int32, isAdvancedVisConfigExist bool) *Config { cfg := &Config{ - NumberOfShards: numberOfShards, + NumberOfShards: numberOfShards, + + // TODO remove this dynamic flag in 1.11.x + EnableDBRecordVersion: dc.GetBoolProperty(dynamicconfig.EnableDBRecordVersion, false), + RPS: dc.GetIntProperty(dynamicconfig.HistoryRPS, 3000), MaxIDLengthLimit: dc.GetIntProperty(dynamicconfig.MaxIDLengthLimit, 1000), PersistenceMaxQPS: dc.GetIntProperty(dynamicconfig.HistoryPersistenceMaxQPS, 9000), diff --git a/service/history/mutableState.go b/service/history/mutableState.go index c3ef05f617a..e4a9ed8d461 100644 --- a/service/history/mutableState.go +++ b/service/history/mutableState.go @@ -171,7 +171,7 @@ type ( IsWorkflowExecutionRunning() bool IsResourceDuplicated(resourceDedupKey definition.DeduplicationID) bool UpdateDuplicatedResource(resourceDedupKey definition.DeduplicationID) - Load(*persistencespb.WorkflowMutableState) error + Load(*persistencespb.WorkflowMutableState, int64) error ReplicateActivityInfo(*historyservice.SyncActivityRequest, bool) error ReplicateActivityTaskCancelRequestedEvent(*historypb.HistoryEvent) error ReplicateActivityTaskCanceledEvent(*historypb.HistoryEvent) error @@ -228,8 +228,8 @@ type ( AddTransferTasks(transferTasks ...persistence.Task) AddTimerTasks(timerTasks ...persistence.Task) AddVisibilityTasks(visibilityTasks ...persistence.Task) - SetUpdateCondition(int64) - GetUpdateCondition() int64 + SetUpdateCondition(int64, int64) + GetUpdateCondition() (int64, int64) StartTransaction(entry *cache.NamespaceCacheEntry) (bool, error) StartTransactionSkipWorkflowTaskFail(entry *cache.NamespaceCacheEntry) error diff --git a/service/history/mutableStateBuilder.go b/service/history/mutableStateBuilder.go index 173c041db78..9978ee1daad 100644 --- a/service/history/mutableStateBuilder.go +++ b/service/history/mutableStateBuilder.go @@ -57,6 +57,7 @@ import ( "go.temporal.io/server/common/log" "go.temporal.io/server/common/log/tag" "go.temporal.io/server/common/metrics" + "go.temporal.io/server/common/migration" "go.temporal.io/server/common/persistence" "go.temporal.io/server/common/persistence/versionhistory" "go.temporal.io/server/common/primitives/timestamp" @@ -144,8 +145,11 @@ type ( // indicates the workflow state in DB, can be used to calculate // whether this workflow is pointed by current workflow record stateInDB enumsspb.WorkflowExecutionState + // TODO deprecate nextEventIDInDB in favor of dbRecordVersion // indicates the next event ID in DB, for conditional update nextEventIDInDB int64 + // indicates the DB record version, for conditional update + dbRecordVersion int64 // namespace entry contains a snapshot of namespace // NOTE: do not use the failover version inside, use currentVersion above namespaceEntry *cache.NamespaceCacheEntry @@ -219,6 +223,7 @@ func newMutableStateBuilder( hasBufferedEventsInDB: false, stateInDB: enumsspb.WORKFLOW_EXECUTION_STATE_VOID, nextEventIDInDB: 0, + dbRecordVersion: 0, namespaceEntry: namespaceEntry, appliedEvents: make(map[string]struct{}), @@ -289,6 +294,7 @@ func (e *mutableStateBuilder) CloneToProto() *persistencespb.WorkflowMutableStat func (e *mutableStateBuilder) Load( state *persistencespb.WorkflowMutableState, + dbRecordVersion int64, ) error { e.pendingActivityInfoIDs = state.ActivityInfos @@ -318,6 +324,7 @@ func (e *mutableStateBuilder) Load( e.hasBufferedEventsInDB = len(e.bufferedEvents) > 0 e.stateInDB = state.ExecutionState.State e.nextEventIDInDB = state.NextEventId + e.dbRecordVersion = dbRecordVersion e.checksum = state.Checksum if len(state.Checksum.GetValue()) > 0 { @@ -3821,13 +3828,15 @@ func (e *mutableStateBuilder) AddTimerTasks( func (e *mutableStateBuilder) SetUpdateCondition( nextEventIDInDB int64, + dbRecordVersion int64, ) { e.nextEventIDInDB = nextEventIDInDB + e.dbRecordVersion = dbRecordVersion } -func (e *mutableStateBuilder) GetUpdateCondition() int64 { - return e.nextEventIDInDB +func (e *mutableStateBuilder) GetUpdateCondition() (int64, int64) { + return e.nextEventIDInDB, e.dbRecordVersion } func (e *mutableStateBuilder) GetWorkflowStateStatus() (enumsspb.WorkflowExecutionState, enumspb.WorkflowExecutionStatus) { @@ -3914,6 +3923,12 @@ func (e *mutableStateBuilder) CloseTransactionAsMutation( // impact the checksum calculation checksum := e.generateChecksum() + if e.dbRecordVersion == 0 && !migration.IsDBVersionEnabled() { + // noop, existing behavior + } else { + e.dbRecordVersion += 1 + } + workflowMutation := &persistence.WorkflowMutation{ ExecutionInfo: e.executionInfo, ExecutionState: e.executionState, @@ -3939,8 +3954,9 @@ func (e *mutableStateBuilder) CloseTransactionAsMutation( TimerTasks: e.insertTimerTasks, VisibilityTasks: e.insertVisibilityTasks, - Condition: e.nextEventIDInDB, - Checksum: checksum, + Condition: e.nextEventIDInDB, + DBRecordVersion: e.dbRecordVersion, + Checksum: checksum, } e.checksum = checksum @@ -4017,8 +4033,9 @@ func (e *mutableStateBuilder) CloseTransactionAsSnapshot( TimerTasks: e.insertTimerTasks, VisibilityTasks: e.insertVisibilityTasks, - Condition: e.nextEventIDInDB, - Checksum: checksum, + Condition: e.nextEventIDInDB, + DBRecordVersion: e.dbRecordVersion, + Checksum: checksum, } e.checksum = checksum @@ -4120,6 +4137,7 @@ func (e *mutableStateBuilder) cleanupTransaction( e.hasBufferedEventsInDB = len(e.bufferedEvents) > 0 e.stateInDB = e.executionState.State e.nextEventIDInDB = e.GetNextEventID() + // e.dbRecordVersion remains the same e.insertTransferTasks = nil e.insertReplicationTasks = nil diff --git a/service/history/mutableStateBuilder_test.go b/service/history/mutableStateBuilder_test.go index 1643ba4876e..c473c4a13d1 100644 --- a/service/history/mutableStateBuilder_test.go +++ b/service/history/mutableStateBuilder_test.go @@ -343,8 +343,9 @@ func (s *mutableStateSuite) TestReorderEvents() { ActivityInfos: activityInfos, BufferedEvents: bufferedEvents, } + dbVersion := int64(123) - s.msBuilder.Load(dbState) + _ = s.msBuilder.Load(dbState, dbVersion) s.Equal(enumspb.EVENT_TYPE_ACTIVITY_TASK_COMPLETED, s.msBuilder.bufferedEvents[0].GetEventType()) s.Equal(enumspb.EVENT_TYPE_ACTIVITY_TASK_STARTED, s.msBuilder.bufferedEvents[1].GetEventType()) @@ -407,7 +408,7 @@ func (s *mutableStateSuite) TestChecksum() { // create mutable state and verify checksum is generated on close loadErrors = loadErrorsFunc() - s.msBuilder.Load(dbState) + _ = s.msBuilder.Load(dbState, 123) s.Equal(loadErrors, loadErrorsFunc()) // no errors expected s.EqualValues(dbState.Checksum, s.msBuilder.checksum) s.msBuilder.namespaceEntry = s.newNamespaceCacheEntry() @@ -420,7 +421,7 @@ func (s *mutableStateSuite) TestChecksum() { // verify checksum is verified on Load dbState.Checksum = csum - s.msBuilder.Load(dbState) + _ = s.msBuilder.Load(dbState, 123) s.Equal(loadErrors, loadErrorsFunc()) // generate checksum again and verify its the same @@ -431,7 +432,7 @@ func (s *mutableStateSuite) TestChecksum() { // modify checksum and verify Load fails dbState.Checksum.Value[0]++ - s.msBuilder.Load(dbState) + _ = s.msBuilder.Load(dbState, 123) s.Equal(loadErrors+1, loadErrorsFunc()) s.EqualValues(dbState.Checksum, s.msBuilder.checksum) @@ -440,7 +441,7 @@ func (s *mutableStateSuite) TestChecksum() { s.mockConfig.MutableStateChecksumInvalidateBefore = func(...dynamicconfig.FilterOption) float64 { return float64((s.msBuilder.executionInfo.LastUpdateTime.UnixNano() / int64(time.Second)) + 1) } - s.msBuilder.Load(dbState) + _ = s.msBuilder.Load(dbState, 123) s.Equal(loadErrors, loadErrorsFunc()) s.Nil(s.msBuilder.checksum) diff --git a/service/history/mutableState_mock.go b/service/history/mutableState_mock.go index 3e187150b8d..fcaf0ae0c19 100644 --- a/service/history/mutableState_mock.go +++ b/service/history/mutableState_mock.go @@ -1401,11 +1401,12 @@ func (mr *MockmutableStateMockRecorder) GetStartVersion() *gomock.Call { } // GetUpdateCondition mocks base method. -func (m *MockmutableState) GetUpdateCondition() int64 { +func (m *MockmutableState) GetUpdateCondition() (int64, int64) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "GetUpdateCondition") ret0, _ := ret[0].(int64) - return ret0 + ret1, _ := ret[1].(int64) + return ret0, ret1 } // GetUpdateCondition indicates an expected call of GetUpdateCondition. @@ -1643,17 +1644,17 @@ func (mr *MockmutableStateMockRecorder) IsWorkflowExecutionRunning() *gomock.Cal } // Load mocks base method. -func (m *MockmutableState) Load(arg0 *persistence.WorkflowMutableState) error { +func (m *MockmutableState) Load(arg0 *persistence.WorkflowMutableState, arg1 int64) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Load", arg0) + ret := m.ctrl.Call(m, "Load", arg0, arg1) ret0, _ := ret[0].(error) return ret0 } // Load indicates an expected call of Load. -func (mr *MockmutableStateMockRecorder) Load(arg0 interface{}) *gomock.Call { +func (mr *MockmutableStateMockRecorder) Load(arg0, arg1 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Load", reflect.TypeOf((*MockmutableState)(nil).Load), arg0) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Load", reflect.TypeOf((*MockmutableState)(nil).Load), arg0, arg1) } // ReplicateActivityInfo mocks base method. @@ -2304,15 +2305,15 @@ func (mr *MockmutableStateMockRecorder) SetNextEventID(nextEventID interface{}) } // SetUpdateCondition mocks base method. -func (m *MockmutableState) SetUpdateCondition(arg0 int64) { +func (m *MockmutableState) SetUpdateCondition(arg0, arg1 int64) { m.ctrl.T.Helper() - m.ctrl.Call(m, "SetUpdateCondition", arg0) + m.ctrl.Call(m, "SetUpdateCondition", arg0, arg1) } // SetUpdateCondition indicates an expected call of SetUpdateCondition. -func (mr *MockmutableStateMockRecorder) SetUpdateCondition(arg0 interface{}) *gomock.Call { +func (mr *MockmutableStateMockRecorder) SetUpdateCondition(arg0, arg1 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetUpdateCondition", reflect.TypeOf((*MockmutableState)(nil).SetUpdateCondition), arg0) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetUpdateCondition", reflect.TypeOf((*MockmutableState)(nil).SetUpdateCondition), arg0, arg1) } // StartTransaction mocks base method. diff --git a/service/history/nDCConflictResolver_test.go b/service/history/nDCConflictResolver_test.go index 956610e5e83..f208c1c0a6a 100644 --- a/service/history/nDCConflictResolver_test.go +++ b/service/history/nDCConflictResolver_test.go @@ -108,6 +108,7 @@ func (s *nDCConflictResolverSuite) TearDownTest() { func (s *nDCConflictResolverSuite) TestRebuild() { ctx := context.Background() updateCondition := int64(59) + dbVersion := int64(1444) requestID := uuid.New() version := int64(12) historySize := int64(12345) @@ -128,7 +129,7 @@ func (s *nDCConflictResolverSuite) TestRebuild() { _, _, err := versionhistory.AddVersionHistory(versionHistories, versionHistory1) s.NoError(err) - s.mockMutableState.EXPECT().GetUpdateCondition().Return(updateCondition).AnyTimes() + s.mockMutableState.EXPECT().GetUpdateCondition().Return(updateCondition, dbVersion).AnyTimes() s.mockMutableState.EXPECT().GetExecutionInfo().Return(&persistencespb.WorkflowExecutionInfo{ NamespaceId: s.namespaceID, WorkflowId: s.workflowID, @@ -154,7 +155,7 @@ func (s *nDCConflictResolverSuite) TestRebuild() { ), }, ).Times(2) - mockRebuildMutableState.EXPECT().SetUpdateCondition(updateCondition) + mockRebuildMutableState.EXPECT().SetUpdateCondition(updateCondition, dbVersion) s.mockStateBuilder.EXPECT().rebuild( ctx, @@ -197,6 +198,7 @@ func (s *nDCConflictResolverSuite) TestPrepareMutableState_NoRebuild() { func (s *nDCConflictResolverSuite) TestPrepareMutableState_Rebuild() { ctx := context.Background() updateCondition := int64(59) + dbVersion := int64(1444) version := int64(12) incomingVersion := version + 1 historySize := int64(12345) @@ -224,7 +226,7 @@ func (s *nDCConflictResolverSuite) TestPrepareMutableState_Rebuild() { _, _, err := versionhistory.AddVersionHistory(versionHistories, versionHistory1) s.Nil(err) - s.mockMutableState.EXPECT().GetUpdateCondition().Return(updateCondition).AnyTimes() + s.mockMutableState.EXPECT().GetUpdateCondition().Return(updateCondition, dbVersion).AnyTimes() s.mockMutableState.EXPECT().GetExecutionInfo().Return(&persistencespb.WorkflowExecutionInfo{ NamespaceId: s.namespaceID, WorkflowId: s.workflowID, @@ -250,7 +252,7 @@ func (s *nDCConflictResolverSuite) TestPrepareMutableState_Rebuild() { ), }, ).Times(2) - mockRebuildMutableState.EXPECT().SetUpdateCondition(updateCondition) + mockRebuildMutableState.EXPECT().SetUpdateCondition(updateCondition, dbVersion) s.mockStateBuilder.EXPECT().rebuild( ctx, diff --git a/service/history/service.go b/service/history/service.go index 9140e616558..a4a6e36e281 100644 --- a/service/history/service.go +++ b/service/history/service.go @@ -31,15 +31,15 @@ import ( "google.golang.org/grpc" healthpb "google.golang.org/grpc/health/grpc_health_v1" - "go.temporal.io/server/common/config" - "go.temporal.io/server/common/masker" - "go.temporal.io/server/api/historyservice/v1" "go.temporal.io/server/common" + "go.temporal.io/server/common/config" "go.temporal.io/server/common/dynamicconfig" "go.temporal.io/server/common/log" "go.temporal.io/server/common/log/tag" + "go.temporal.io/server/common/masker" "go.temporal.io/server/common/metrics" + "go.temporal.io/server/common/migration" "go.temporal.io/server/common/persistence" persistenceClient "go.temporal.io/server/common/persistence/client" espersistence "go.temporal.io/server/common/persistence/elasticsearch" @@ -66,9 +66,11 @@ func NewService( ) (*Service, error) { logger := params.Logger - serviceConfig := configs.NewConfig(dynamicconfig.NewCollection(params.DynamicConfigClient, params.Logger), + serviceConfig := configs.NewConfig( + dynamicconfig.NewCollection(params.DynamicConfigClient, params.Logger), params.PersistenceConfig.NumHistoryShards, - params.PersistenceConfig.IsAdvancedVisibilityConfigExist()) + params.PersistenceConfig.IsAdvancedVisibilityConfigExist(), + ) params.PersistenceConfig.VisibilityConfig = &config.VisibilityConfig{ VisibilityOpenMaxQPS: serviceConfig.VisibilityOpenMaxQPS, @@ -166,6 +168,9 @@ func (s *Service) Start() { return } + // TODO remove this dynamic flag in 1.11.x + migration.SetDBVersionFlag(s.config.EnableDBRecordVersion()) + logger := s.GetLogger() logger.Info("history starting") diff --git a/service/history/workflowExecutionContext.go b/service/history/workflowExecutionContext.go index fc867e6318e..10e1f905462 100644 --- a/service/history/workflowExecutionContext.go +++ b/service/history/workflowExecutionContext.go @@ -252,7 +252,7 @@ func (c *workflowExecutionContextImpl) loadWorkflowExecutionForReplication( namespaceEntry, ) - if err := c.mutableState.Load(response.State); err != nil { + if err := c.mutableState.Load(response.State, response.DBRecordVersion); err != nil { return nil, err } @@ -334,7 +334,7 @@ func (c *workflowExecutionContextImpl) loadWorkflowExecution() (mutableState, er namespaceEntry, ) - if err := c.mutableState.Load(response.State); err != nil { + if err := c.mutableState.Load(response.State, response.DBRecordVersion); err != nil { return nil, err }