Skip to content

Commit

Permalink
Add test
Browse files Browse the repository at this point in the history
  • Loading branch information
wxing1292 committed Apr 6, 2021
1 parent 067de41 commit 1d72fe6
Show file tree
Hide file tree
Showing 8 changed files with 270 additions and 207 deletions.
438 changes: 241 additions & 197 deletions api/persistence/v1/executions.pb.go

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions common/persistence/dataInterfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -532,6 +532,7 @@ type (
RunID string
BranchToken []byte
PrevTxnID int64
TxnID int64
Events []*historypb.HistoryEvent
}

Expand Down
20 changes: 13 additions & 7 deletions common/persistence/sql/sqlplugin/tests/history_node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,10 @@ func (s *historyNodeSuite) TestInsert_Success() {
treeID := primitives.NewUUID()
branchID := primitives.NewUUID()
nodeID := rand.Int63()
prevTransactionID := rand.Int63()
transactionID := rand.Int63()

node := s.newRandomNodeRow(shardID, treeID, branchID, nodeID, transactionID)
node := s.newRandomNodeRow(shardID, treeID, branchID, nodeID, prevTransactionID, transactionID)
result, err := s.store.InsertIntoHistoryNode(newExecutionContext(), &node)
s.NoError(err)
rowsAffected, err := result.RowsAffected()
Expand All @@ -101,16 +102,17 @@ func (s *historyNodeSuite) TestInsert_Fail_Duplicate() {
treeID := primitives.NewUUID()
branchID := primitives.NewUUID()
nodeID := rand.Int63()
prevTransactionID := rand.Int63()
transactionID := rand.Int63()

node := s.newRandomNodeRow(shardID, treeID, branchID, nodeID, transactionID)
node := s.newRandomNodeRow(shardID, treeID, branchID, nodeID, prevTransactionID, transactionID)
result, err := s.store.InsertIntoHistoryNode(newExecutionContext(), &node)
s.NoError(err)
rowsAffected, err := result.RowsAffected()
s.NoError(err)
s.Equal(1, int(rowsAffected))

node = s.newRandomNodeRow(shardID, treeID, branchID, nodeID, transactionID)
node = s.newRandomNodeRow(shardID, treeID, branchID, nodeID, prevTransactionID, transactionID)
_, err = s.store.InsertIntoHistoryNode(newExecutionContext(), &node)
s.Error(err) // TODO persistence layer should do proper error translation
}
Expand All @@ -122,9 +124,10 @@ func (s *historyNodeSuite) TestInsertSelect_Single() {
treeID := primitives.NewUUID()
branchID := primitives.NewUUID()
nodeID := int64(1)
prevTransactionID := rand.Int63()
transactionID := rand.Int63()

node := s.newRandomNodeRow(shardID, treeID, branchID, nodeID, transactionID)
node := s.newRandomNodeRow(shardID, treeID, branchID, nodeID, prevTransactionID, transactionID)
result, err := s.store.InsertIntoHistoryNode(newExecutionContext(), &node)
s.NoError(err)
rowsAffected, err := result.RowsAffected()
Expand Down Expand Up @@ -167,7 +170,7 @@ func (s *historyNodeSuite) TestInsertSelect_Multiple() {
var nodes []sqlplugin.HistoryNodeRow
for i := 0; i < numNodeIDs; i++ {
for j := 0; j < nodePerNodeID; j++ {
node := s.newRandomNodeRow(shardID, treeID, branchID, nodeID, rand.Int63())
node := s.newRandomNodeRow(shardID, treeID, branchID, nodeID, rand.Int63(), rand.Int63())
result, err := s.store.InsertIntoHistoryNode(newExecutionContext(), &node)
s.NoError(err)
rowsAffected, err := result.RowsAffected()
Expand Down Expand Up @@ -276,9 +279,10 @@ func (s *historyNodeSuite) TestInsertDeleteSelect_Single() {
treeID := primitives.NewUUID()
branchID := primitives.NewUUID()
nodeID := int64(1)
prevTransactionID := rand.Int63()
transactionID := rand.Int63()

node := s.newRandomNodeRow(shardID, treeID, branchID, nodeID, transactionID)
node := s.newRandomNodeRow(shardID, treeID, branchID, nodeID, prevTransactionID, transactionID)
result, err := s.store.InsertIntoHistoryNode(newExecutionContext(), &node)
s.NoError(err)
rowsAffected, err := result.RowsAffected()
Expand Down Expand Up @@ -330,7 +334,7 @@ func (s *historyNodeSuite) TestInsertDeleteSelect_Multiple() {
var nodes []sqlplugin.HistoryNodeRow
for i := 0; i < numNodeIDs; i++ {
for j := 0; j < nodePerNodeID; j++ {
node := s.newRandomNodeRow(shardID, treeID, branchID, nodeID, rand.Int63())
node := s.newRandomNodeRow(shardID, treeID, branchID, nodeID, rand.Int63(), rand.Int63())
result, err := s.store.InsertIntoHistoryNode(newExecutionContext(), &node)
s.NoError(err)
rowsAffected, err := result.RowsAffected()
Expand Down Expand Up @@ -376,13 +380,15 @@ func (s *historyNodeSuite) newRandomNodeRow(
treeID primitives.UUID,
branchID primitives.UUID,
nodeID int64,
prevTransactionID int64,
transactionID int64,
) sqlplugin.HistoryNodeRow {
return sqlplugin.HistoryNodeRow{
ShardID: shardID,
TreeID: treeID,
BranchID: branchID,
NodeID: nodeID,
PrevTxnID: prevTransactionID,
TxnID: transactionID,
Data: shuffle.Bytes(testHistoryNodeData),
DataEncoding: testHistoryNodeEncoding,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ message WorkflowExecutionInfo {
string first_execution_run_id = 55;
ExecutionStats execution_stats = 56;
google.protobuf.Timestamp workflow_run_expiration_time = 57 [(gogoproto.stdtime) = true];
int64 last_history_node_txn_id = 58;
}

message ExecutionStats {
Expand Down
8 changes: 7 additions & 1 deletion service/history/mutableStateBuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -3874,16 +3874,22 @@ func (e *mutableStateBuilder) prepareEventsAndReplicationTasks(
e.updatePendingEventIDs(historyMutation.ScheduleIDToStartID)

workflowEventsSeq := make([]*persistence.WorkflowEvents, len(newEventsBatches))
historyNodeTxnIDs, err := e.shard.GenerateTransferTaskIDs(len(newEventsBatches))
if err != nil {
return nil, nil, false, err
}
for index, eventBatch := range newEventsBatches {
workflowEventsSeq[index] = &persistence.WorkflowEvents{
NamespaceID: e.executionInfo.NamespaceId,
WorkflowID: e.executionInfo.WorkflowId,
RunID: e.executionState.RunId,
BranchToken: currentBranchToken,
PrevTxnID: e.executionInfo.LastEventTaskId,
PrevTxnID: e.executionInfo.LastHistoryNodeTxnId,
TxnID: historyNodeTxnIDs[index],
Events: eventBatch,
}
e.GetExecutionInfo().LastEventTaskId = eventBatch[len(eventBatch)-1].GetTaskId()
e.executionInfo.LastHistoryNodeTxnId = historyNodeTxnIDs[index]
}

if err := e.validateNoEventsAfterWorkflowFinish(
Expand Down
2 changes: 2 additions & 0 deletions service/history/nDCHistoryReplicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -552,6 +552,8 @@ func (r *nDCHistoryReplicatorImpl) applyNonStartEventsToNoneCurrentBranchWithout
WorkflowID: task.getExecution().GetWorkflowId(),
RunID: task.getExecution().GetRunId(),
BranchToken: versionHistory.GetBranchToken(),
PrevTxnID: 0, // TODO @wxing1292 events chaining will not work for backfill case
TxnID: 0, // TODO @wxing1292 events chaining will not work for backfill case
Events: task.getEvents(),
},
)
Expand Down
1 change: 1 addition & 0 deletions service/history/timerQueueStandbyTaskExecutor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -729,6 +729,7 @@ func (s *timerQueueStandbyTaskExecutorSuite) TestProcessActivityTimeout_Multiple
mutableState.executionInfo.LastUpdateTime = input.UpdateWorkflowMutation.ExecutionInfo.LastUpdateTime
input.RangeID = 0
input.UpdateWorkflowMutation.ExecutionInfo.LastEventTaskId = 0
input.UpdateWorkflowMutation.ExecutionInfo.LastHistoryNodeTxnId = 0
mutableState.executionInfo.LastEventTaskId = 0
mutableState.executionInfo.WorkflowTaskOriginalScheduledTime = input.UpdateWorkflowMutation.ExecutionInfo.WorkflowTaskOriginalScheduledTime
mutableState.executionInfo.ExecutionStats = &persistencespb.ExecutionStats{}
Expand Down
6 changes: 4 additions & 2 deletions service/history/workflowExecutionContext.go
Original file line number Diff line number Diff line change
Expand Up @@ -898,6 +898,7 @@ func (c *workflowExecutionContextImpl) persistFirstWorkflowEvents(
branchToken := workflowEvents.BranchToken
events := workflowEvents.Events
prevTxnID := workflowEvents.PrevTxnID
txnID := workflowEvents.TxnID

size, err := c.appendHistoryV2EventsWithRetry(
namespaceID,
Expand All @@ -908,7 +909,7 @@ func (c *workflowExecutionContextImpl) persistFirstWorkflowEvents(
BranchToken: branchToken,
Events: events,
PrevTransactionID: prevTxnID,
TransactionID: events[len(events)-1].GetTaskId(),
TransactionID: txnID,
},
)
return size, err
Expand All @@ -930,6 +931,7 @@ func (c *workflowExecutionContextImpl) persistNonFirstWorkflowEvents(
branchToken := workflowEvents.BranchToken
events := workflowEvents.Events
prevTxnID := workflowEvents.PrevTxnID
txnID := workflowEvents.TxnID

size, err := c.appendHistoryV2EventsWithRetry(
namespaceID,
Expand All @@ -939,7 +941,7 @@ func (c *workflowExecutionContextImpl) persistNonFirstWorkflowEvents(
BranchToken: branchToken,
Events: events,
PrevTransactionID: prevTxnID,
TransactionID: events[len(events)-1].GetTaskId(),
TransactionID: txnID,
},
)
return size, err
Expand Down

0 comments on commit 1d72fe6

Please sign in to comment.