Skip to content

Commit

Permalink
add tests
Browse files Browse the repository at this point in the history
  • Loading branch information
yycptt committed May 14, 2024
1 parent 788be6c commit 50bd1f7
Show file tree
Hide file tree
Showing 3 changed files with 160 additions and 2 deletions.
4 changes: 2 additions & 2 deletions common/persistence/versionhistory/version_history.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ func GetVersionHistoryEventVersion(v *historyspb.VersionHistory, eventID int64)
return 0, err
}
if eventID < common.FirstEventID || eventID > lastItem.GetEventId() {
return 0, serviceerror.NewInternal(fmt.Sprintf("input event ID is not in range.: %v", eventID))
return 0, serviceerror.NewInternal(fmt.Sprintf("input event ID is not in range, eventID: %v", eventID))
}

// items are sorted by eventID & version
Expand All @@ -239,7 +239,7 @@ func GetVersionHistoryEventVersion(v *historyspb.VersionHistory, eventID int64)
return currentItem.GetVersion(), nil
}
}
return 0, serviceerror.NewInternal("input event ID is not in range.")
return 0, serviceerror.NewInternal(fmt.Sprintf("input event ID is not in range, eventID: %v", eventID))
}

// IsEmptyVersionHistory indicate whether version history is empty
Expand Down
1 change: 1 addition & 0 deletions service/history/tests/vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,5 +193,6 @@ func NewDynamicConfig() *configs.Config {
config.ReplicationEnableUpdateWithNewTaskMerge = dynamicconfig.GetBoolPropertyFn(true)
config.ShardOwnershipAssertionEnabled = dynamicconfig.GetBoolPropertyFn(true)
config.EnableWorkflowExecutionTimeoutTimer = dynamicconfig.GetBoolPropertyFn(true)
config.EnableMutableStateTransitionHistory = dynamicconfig.GetBoolPropertyFn(true)
return config
}
157 changes: 157 additions & 0 deletions service/history/workflow/mutable_state_impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import (
taskqueuepb "go.temporal.io/api/taskqueue/v1"
updatepb "go.temporal.io/api/update/v1"
workflowpb "go.temporal.io/api/workflow/v1"
"go.temporal.io/api/workflowservice/v1"
"google.golang.org/protobuf/types/known/durationpb"
"google.golang.org/protobuf/types/known/timestamppb"

Expand All @@ -62,6 +63,7 @@ import (
"go.temporal.io/server/common/persistence/versionhistory"
"go.temporal.io/server/common/primitives/timestamp"
"go.temporal.io/server/common/searchattribute"
"go.temporal.io/server/common/testing/protorequire"
"go.temporal.io/server/common/tqid"
"go.temporal.io/server/common/worker_versioning"
"go.temporal.io/server/service/history/configs"
Expand Down Expand Up @@ -817,6 +819,12 @@ func (s *mutableStateSuite) buildWorkflowMutableState() *persistencespb.Workflow
},
},
},
TransitionHistory: []*persistencespb.VersionedTransition{
{
NamespaceFailoverVersion: failoverVersion,
MaxTransitionCount: 1024,
},
},
}

state := &persistencespb.WorkflowExecutionState{
Expand Down Expand Up @@ -1406,6 +1414,155 @@ func (s *mutableStateSuite) TestRolloverAutoResetPointsWithExpiringTime() {
s.Equal(expected, newPoints.Points)
}

func (s *mutableStateSuite) TestCloseTransactionUpdateTransition() {
namespaceEntry := tests.GlobalNamespaceEntry

completWorkflowTaskFn := func(ms MutableState) {
workflowTaskInfo := ms.GetStartedWorkflowTask()
_, err := ms.AddWorkflowTaskCompletedEvent(
workflowTaskInfo,
&workflowservice.RespondWorkflowTaskCompletedRequest{},
WorkflowTaskCompletionLimits{
MaxResetPoints: 10,
MaxSearchAttributeValueSize: 1024,
},
)
s.NoError(err)
}

testCases := []struct {
name string
dbStateMutationFn func(dbState *persistencespb.WorkflowMutableState)
txFunc func(ms MutableState) (*persistencespb.WorkflowExecutionInfo, error)
versionedTransitionUpdated bool
}{
{
name: "CloseTranstionAsPassive",
dbStateMutationFn: func(dbState *persistencespb.WorkflowMutableState) {
dbState.BufferedEvents = nil
},
txFunc: func(ms MutableState) (*persistencespb.WorkflowExecutionInfo, error) {
completWorkflowTaskFn(ms)

mutation, _, err := ms.CloseTransactionAsMutation(TransactionPolicyPassive)
if err != nil {
return nil, err
}
return mutation.ExecutionInfo, err
},
versionedTransitionUpdated: false,
},
{
name: "CloseTransactionAsMutation_HistoryEvents",
dbStateMutationFn: func(dbState *persistencespb.WorkflowMutableState) {
dbState.BufferedEvents = nil
},
txFunc: func(ms MutableState) (*persistencespb.WorkflowExecutionInfo, error) {
completWorkflowTaskFn(ms)

mutation, _, err := ms.CloseTransactionAsMutation(TransactionPolicyActive)
if err != nil {
return nil, err
}
return mutation.ExecutionInfo, err
},
versionedTransitionUpdated: true,
},
{
name: "CloseTransactionAsMutation_SyncActivity",
dbStateMutationFn: func(dbState *persistencespb.WorkflowMutableState) {
dbState.BufferedEvents = nil
},
txFunc: func(ms MutableState) (*persistencespb.WorkflowExecutionInfo, error) {
for _, ai := range ms.GetPendingActivityInfos() {
ms.UpdateActivityProgress(ai, &workflowservice.RecordActivityTaskHeartbeatRequest{})
break
}

mutation, _, err := ms.CloseTransactionAsMutation(TransactionPolicyActive)
if err != nil {
return nil, err
}
return mutation.ExecutionInfo, err
},
versionedTransitionUpdated: true,
},
{
name: "CloseTransactionAsMutation_DirtyStateMachine",
dbStateMutationFn: func(dbState *persistencespb.WorkflowMutableState) {
dbState.BufferedEvents = nil
},
txFunc: func(ms MutableState) (*persistencespb.WorkflowExecutionInfo, error) {
root := ms.HSM()
err := hsm.MachineTransition(root, func(*MutableStateImpl) (hsm.TransitionOutput, error) {
return hsm.TransitionOutput{}, nil
})
s.NoError(err)
s.True(root.Dirty())

mutation, _, err := ms.CloseTransactionAsMutation(TransactionPolicyActive)
if err != nil {
return nil, err
}
return mutation.ExecutionInfo, err
},
versionedTransitionUpdated: true,
},
{
name: "CloseTransactionAsSnapshot",
dbStateMutationFn: func(dbState *persistencespb.WorkflowMutableState) {
dbState.BufferedEvents = nil
},
txFunc: func(ms MutableState) (*persistencespb.WorkflowExecutionInfo, error) {
completWorkflowTaskFn(ms)

mutation, _, err := ms.CloseTransactionAsSnapshot(TransactionPolicyActive)
if err != nil {
return nil, err
}
return mutation.ExecutionInfo, err
},
versionedTransitionUpdated: true,
},
// TODO: add a test for flushing buffered events using last event version after current version change is landed.
}

for _, tc := range testCases {
s.T().Run(tc.name, func(t *testing.T) {

dbState := s.buildWorkflowMutableState()
if tc.dbStateMutationFn != nil {
tc.dbStateMutationFn(dbState)
}

var err error
s.mutableState, err = NewMutableStateFromDB(s.mockShard, s.mockEventsCache, s.logger, namespaceEntry, dbState, 123)
s.NoError(err)
err = s.mutableState.UpdateCurrentVersion(namespaceEntry.FailoverVersion(), false)
s.NoError(err)
// TODO: remove following line after CurrentVersion change is landed.
s.mutableState.currentTransactionNamespaceFailoverVersion = namespaceEntry.FailoverVersion()

s.mockShard.Resource.ClusterMetadata.EXPECT().ClusterNameForFailoverVersion(
namespaceEntry.IsGlobalNamespace(),
namespaceEntry.FailoverVersion(),
).Return(cluster.TestCurrentClusterName).AnyTimes()
s.mockShard.Resource.ClusterMetadata.EXPECT().GetCurrentClusterName().Return(cluster.TestCurrentClusterName).AnyTimes()

expectedTransitionHistory := s.mutableState.executionInfo.TransitionHistory
if tc.versionedTransitionUpdated {
expectedTransitionHistory = UpdatedTransitionHistory(expectedTransitionHistory, namespaceEntry.FailoverVersion())
}

execInfo, err := tc.txFunc(s.mutableState)
s.Nil(err)

protorequire.ProtoSliceEqual(t, expectedTransitionHistory, execInfo.TransitionHistory)
})
}

}

func (s *mutableStateSuite) getBuildIdsFromMutableState() []string {
payload, found := s.mutableState.executionInfo.SearchAttributes[searchattribute.BuildIds]
if !found {
Expand Down

0 comments on commit 50bd1f7

Please sign in to comment.