Skip to content

Commit

Permalink
Add prev transaction ID attribute to history node table for events ch…
Browse files Browse the repository at this point in the history
…aining (#1435)

* New last_history_node_txn_id within execution info tracking the last valid history node
* New prev_txn_id within history node table tracking previous valid history node record
  • Loading branch information
wxing1292 committed Apr 6, 2021
1 parent b536952 commit 394542b
Show file tree
Hide file tree
Showing 26 changed files with 350 additions and 248 deletions.
438 changes: 241 additions & 197 deletions api/persistence/v1/executions.pb.go

Large diffs are not rendered by default.

33 changes: 26 additions & 7 deletions common/persistence/cassandra/cassandraHistoryPersistence.go
Expand Up @@ -43,10 +43,10 @@ import (
const (
// below are templates for history_node table
v2templateUpsertData = `INSERT INTO history_node (` +
`tree_id, branch_id, node_id, txn_id, data, data_encoding) ` +
`VALUES (?, ?, ?, ?, ?, ?) `
`tree_id, branch_id, node_id, prev_txn_id, txn_id, data, data_encoding) ` +
`VALUES (?, ?, ?, ?, ?, ?, ?) `

v2templateReadData = `SELECT node_id, txn_id, data, data_encoding FROM history_node ` +
v2templateReadData = `SELECT node_id, prev_txn_id, txn_id, data, data_encoding FROM history_node ` +
`WHERE tree_id = ? AND branch_id = ? AND node_id >= ? AND node_id < ? `

v2templateRangeDeleteData = `DELETE FROM history_node WHERE tree_id = ? AND branch_id = ? AND node_id >= ? `
Expand Down Expand Up @@ -108,7 +108,14 @@ func (h *cassandraHistoryV2Persistence) AppendHistoryNodes(

if !request.IsNewBranch {
query := h.session.Query(v2templateUpsertData,
branchInfo.TreeId, branchInfo.BranchId, request.NodeID, request.TransactionID, request.Events.Data, request.Events.EncodingType.String())
branchInfo.TreeId,
branchInfo.BranchId,
request.NodeID,
request.PrevTransactionID,
request.TransactionID,
request.Events.Data,
request.Events.EncodingType.String(),
)
if err := query.Exec(); err != nil {
return gocql.ConvertError("AppendHistoryNodes", err)
}
Expand All @@ -129,9 +136,20 @@ func (h *cassandraHistoryV2Persistence) AppendHistoryNodes(

batch := h.session.NewBatch(gocql.LoggedBatch)
batch.Query(v2templateInsertTree,
branchInfo.TreeId, branchInfo.BranchId, treeInfoDataBlob.Data, treeInfoDataBlob.EncodingType.String())
branchInfo.TreeId,
branchInfo.BranchId,
treeInfoDataBlob.Data,
treeInfoDataBlob.EncodingType.String(),
)
batch.Query(v2templateUpsertData,
branchInfo.TreeId, branchInfo.BranchId, request.NodeID, request.TransactionID, request.Events.Data, request.Events.EncodingType.String())
branchInfo.TreeId,
branchInfo.BranchId,
request.NodeID,
request.PrevTransactionID,
request.TransactionID,
request.Events.Data,
request.Events.EncodingType.String(),
)
if err = h.session.ExecuteBatch(batch); err != nil {
return gocql.ConvertError("AppendHistoryNodes", err)
}
Expand Down Expand Up @@ -171,8 +189,9 @@ func (h *cassandraHistoryV2Persistence) ReadHistoryBranch(
var data []byte
var encoding string
nodeID := int64(0)
prevTxnID := int64(0)
txnID := int64(0)
if !iter.Scan(&nodeID, &txnID, &data, &encoding) {
if !iter.Scan(&nodeID, &prevTxnID, &txnID, &data, &encoding) {
break
}
if txnID < lastTxnID {
Expand Down
4 changes: 4 additions & 0 deletions common/persistence/dataInterfaces.go
Expand Up @@ -531,6 +531,8 @@ type (
WorkflowID string
RunID string
BranchToken []byte
PrevTxnID int64
TxnID int64
Events []*historypb.HistoryEvent
}

Expand Down Expand Up @@ -990,6 +992,8 @@ type (
BranchToken []byte
// The batch of events to be appended. The first eventID will become the nodeID of this batch
Events []*historypb.HistoryEvent
// TransactionID for events before these events. For events chaining
PrevTransactionID int64
// requested TransactionID for this write operation. For the same eventID, the node with larger TransactionID always wins
TransactionID int64
}
Expand Down
15 changes: 8 additions & 7 deletions common/persistence/historyStore.go
Expand Up @@ -203,13 +203,14 @@ func (m *historyV2ManagerImpl) AppendHistoryNodes(
}

req := &InternalAppendHistoryNodesRequest{
IsNewBranch: request.IsNewBranch,
Info: request.Info,
BranchInfo: branch,
NodeID: nodeID,
Events: blob,
TransactionID: request.TransactionID,
ShardID: request.ShardID,
IsNewBranch: request.IsNewBranch,
Info: request.Info,
BranchInfo: branch,
NodeID: nodeID,
Events: blob,
PrevTransactionID: request.PrevTransactionID,
TransactionID: request.TransactionID,
ShardID: request.ShardID,
}

err = m.persistence.AppendHistoryNodes(req)
Expand Down
4 changes: 3 additions & 1 deletion common/persistence/persistenceInterface.go
Expand Up @@ -340,7 +340,9 @@ type (
NodeID int64
// The events to be appended
Events *commonpb.DataBlob
// Requested TransactionID for conditional update
// TransactionID for events before these events. For events chaining
PrevTransactionID int64
// requested TransactionID for this write operation. For the same eventID, the node with larger TransactionID always wins
TransactionID int64
// Used in sharded data stores to identify which shard to use
ShardID int32
Expand Down
2 changes: 1 addition & 1 deletion common/persistence/sql/sqlHistoryManager.go
Expand Up @@ -87,6 +87,7 @@ func (m *sqlHistoryV2Manager) AppendHistoryNodes(
TreeID: treeIDBytes,
BranchID: branchIDBytes,
NodeID: request.NodeID,
PrevTxnID: request.PrevTransactionID,
TxnID: request.TransactionID,
Data: request.Events.Data,
DataEncoding: request.Events.EncodingType.String(),
Expand Down Expand Up @@ -199,7 +200,6 @@ func (m *sqlHistoryV2Manager) ReadHistoryBranch(

for _, row := range rows {
eventBlob := p.NewDataBlob(row.Data, row.DataEncoding)

if row.TxnID < lastTxnID {
// assuming that business logic layer is correct and transaction ID only increase
// thus, valid event batch will come with increasing transaction ID
Expand Down
1 change: 1 addition & 0 deletions common/persistence/sql/sqlplugin/history_node.go
Expand Up @@ -38,6 +38,7 @@ type (
TreeID primitives.UUID
BranchID primitives.UUID
NodeID int64
PrevTxnID int64
TxnID int64
Data []byte
DataEncoding string
Expand Down
6 changes: 3 additions & 3 deletions common/persistence/sql/sqlplugin/mysql/events.go
Expand Up @@ -34,10 +34,10 @@ import (
const (
// below are templates for history_node table
addHistoryNodesQuery = `INSERT INTO history_node (` +
`shard_id, tree_id, branch_id, node_id, txn_id, data, data_encoding) ` +
`VALUES (:shard_id, :tree_id, :branch_id, :node_id, :txn_id, :data, :data_encoding) `
`shard_id, tree_id, branch_id, node_id, prev_txn_id, txn_id, data, data_encoding) ` +
`VALUES (:shard_id, :tree_id, :branch_id, :node_id, :prev_txn_id, :txn_id, :data, :data_encoding) `

getHistoryNodesQuery = `SELECT node_id, txn_id, data, data_encoding FROM history_node ` +
getHistoryNodesQuery = `SELECT node_id, prev_txn_id, txn_id, data, data_encoding FROM history_node ` +
`WHERE shard_id = ? AND tree_id = ? AND branch_id = ? AND node_id >= ? and node_id < ? ORDER BY shard_id, tree_id, branch_id, node_id, txn_id LIMIT ? `

deleteHistoryNodesQuery = `DELETE FROM history_node WHERE shard_id = ? AND tree_id = ? AND branch_id = ? AND node_id >= ? `
Expand Down
6 changes: 3 additions & 3 deletions common/persistence/sql/sqlplugin/postgresql/events.go
Expand Up @@ -34,10 +34,10 @@ import (
const (
// below are templates for history_node table
addHistoryNodesQuery = `INSERT INTO history_node (` +
`shard_id, tree_id, branch_id, node_id, txn_id, data, data_encoding) ` +
`VALUES (:shard_id, :tree_id, :branch_id, :node_id, :txn_id, :data, :data_encoding) `
`shard_id, tree_id, branch_id, node_id, prev_txn_id, txn_id, data, data_encoding) ` +
`VALUES (:shard_id, :tree_id, :branch_id, :node_id, :prev_txn_id, :txn_id, :data, :data_encoding) `

getHistoryNodesQuery = `SELECT node_id, txn_id, data, data_encoding FROM history_node ` +
getHistoryNodesQuery = `SELECT node_id, prev_txn_id, txn_id, data, data_encoding FROM history_node ` +
`WHERE shard_id = $1 AND tree_id = $2 AND branch_id = $3 AND node_id >= $4 and node_id < $5 ORDER BY shard_id, tree_id, branch_id, node_id, txn_id LIMIT $6 `

deleteHistoryNodesQuery = `DELETE FROM history_node WHERE shard_id = $1 AND tree_id = $2 AND branch_id = $3 AND node_id >= $4 `
Expand Down
20 changes: 13 additions & 7 deletions common/persistence/sql/sqlplugin/tests/history_node_test.go
Expand Up @@ -86,9 +86,10 @@ func (s *historyNodeSuite) TestInsert_Success() {
treeID := primitives.NewUUID()
branchID := primitives.NewUUID()
nodeID := rand.Int63()
prevTransactionID := rand.Int63()
transactionID := rand.Int63()

node := s.newRandomNodeRow(shardID, treeID, branchID, nodeID, transactionID)
node := s.newRandomNodeRow(shardID, treeID, branchID, nodeID, prevTransactionID, transactionID)
result, err := s.store.InsertIntoHistoryNode(newExecutionContext(), &node)
s.NoError(err)
rowsAffected, err := result.RowsAffected()
Expand All @@ -101,16 +102,17 @@ func (s *historyNodeSuite) TestInsert_Fail_Duplicate() {
treeID := primitives.NewUUID()
branchID := primitives.NewUUID()
nodeID := rand.Int63()
prevTransactionID := rand.Int63()
transactionID := rand.Int63()

node := s.newRandomNodeRow(shardID, treeID, branchID, nodeID, transactionID)
node := s.newRandomNodeRow(shardID, treeID, branchID, nodeID, prevTransactionID, transactionID)
result, err := s.store.InsertIntoHistoryNode(newExecutionContext(), &node)
s.NoError(err)
rowsAffected, err := result.RowsAffected()
s.NoError(err)
s.Equal(1, int(rowsAffected))

node = s.newRandomNodeRow(shardID, treeID, branchID, nodeID, transactionID)
node = s.newRandomNodeRow(shardID, treeID, branchID, nodeID, prevTransactionID, transactionID)
_, err = s.store.InsertIntoHistoryNode(newExecutionContext(), &node)
s.Error(err) // TODO persistence layer should do proper error translation
}
Expand All @@ -122,9 +124,10 @@ func (s *historyNodeSuite) TestInsertSelect_Single() {
treeID := primitives.NewUUID()
branchID := primitives.NewUUID()
nodeID := int64(1)
prevTransactionID := rand.Int63()
transactionID := rand.Int63()

node := s.newRandomNodeRow(shardID, treeID, branchID, nodeID, transactionID)
node := s.newRandomNodeRow(shardID, treeID, branchID, nodeID, prevTransactionID, transactionID)
result, err := s.store.InsertIntoHistoryNode(newExecutionContext(), &node)
s.NoError(err)
rowsAffected, err := result.RowsAffected()
Expand Down Expand Up @@ -167,7 +170,7 @@ func (s *historyNodeSuite) TestInsertSelect_Multiple() {
var nodes []sqlplugin.HistoryNodeRow
for i := 0; i < numNodeIDs; i++ {
for j := 0; j < nodePerNodeID; j++ {
node := s.newRandomNodeRow(shardID, treeID, branchID, nodeID, rand.Int63())
node := s.newRandomNodeRow(shardID, treeID, branchID, nodeID, rand.Int63(), rand.Int63())
result, err := s.store.InsertIntoHistoryNode(newExecutionContext(), &node)
s.NoError(err)
rowsAffected, err := result.RowsAffected()
Expand Down Expand Up @@ -276,9 +279,10 @@ func (s *historyNodeSuite) TestInsertDeleteSelect_Single() {
treeID := primitives.NewUUID()
branchID := primitives.NewUUID()
nodeID := int64(1)
prevTransactionID := rand.Int63()
transactionID := rand.Int63()

node := s.newRandomNodeRow(shardID, treeID, branchID, nodeID, transactionID)
node := s.newRandomNodeRow(shardID, treeID, branchID, nodeID, prevTransactionID, transactionID)
result, err := s.store.InsertIntoHistoryNode(newExecutionContext(), &node)
s.NoError(err)
rowsAffected, err := result.RowsAffected()
Expand Down Expand Up @@ -330,7 +334,7 @@ func (s *historyNodeSuite) TestInsertDeleteSelect_Multiple() {
var nodes []sqlplugin.HistoryNodeRow
for i := 0; i < numNodeIDs; i++ {
for j := 0; j < nodePerNodeID; j++ {
node := s.newRandomNodeRow(shardID, treeID, branchID, nodeID, rand.Int63())
node := s.newRandomNodeRow(shardID, treeID, branchID, nodeID, rand.Int63(), rand.Int63())
result, err := s.store.InsertIntoHistoryNode(newExecutionContext(), &node)
s.NoError(err)
rowsAffected, err := result.RowsAffected()
Expand Down Expand Up @@ -376,13 +380,15 @@ func (s *historyNodeSuite) newRandomNodeRow(
treeID primitives.UUID,
branchID primitives.UUID,
nodeID int64,
prevTransactionID int64,
transactionID int64,
) sqlplugin.HistoryNodeRow {
return sqlplugin.HistoryNodeRow{
ShardID: shardID,
TreeID: treeID,
BranchID: branchID,
NodeID: nodeID,
PrevTxnID: prevTransactionID,
TxnID: transactionID,
Data: shuffle.Bytes(testHistoryNodeData),
DataEncoding: testHistoryNodeEncoding,
Expand Down
Expand Up @@ -120,6 +120,7 @@ message WorkflowExecutionInfo {
string first_execution_run_id = 55;
ExecutionStats execution_stats = 56;
google.protobuf.Timestamp workflow_run_expiration_time = 57 [(gogoproto.stdtime) = true];
int64 last_history_node_txn_id = 58;
}

message ExecutionStats {
Expand Down
1 change: 1 addition & 0 deletions schema/cassandra/temporal/schema.cql
Expand Up @@ -56,6 +56,7 @@ CREATE TABLE history_node (
branch_id uuid,
node_id bigint, -- node_id: first eventID in a batch of events
txn_id bigint, -- for override the same node_id: bigger txn_id wins
prev_txn_id bigint, -- pointing to the previous node: event chaining
data blob, -- Batch of workflow execution history events as a blob
data_encoding text, -- Protocol used for history serialization
PRIMARY KEY ((tree_id), branch_id, node_id, txn_id )
Expand Down
1 change: 1 addition & 0 deletions schema/cassandra/temporal/versioned/v1.5/event.cql
@@ -0,0 +1 @@
ALTER TABLE history_node ADD prev_txn_id bigint;
3 changes: 2 additions & 1 deletion schema/cassandra/temporal/versioned/v1.5/manifest.json
@@ -1,8 +1,9 @@
{
"CurrVersion": "1.5",
"MinCompatibleVersion": "1.0",
"Description": "schema update for execution table version",
"Description": "schema update for execution & history node table version",
"SchemaUpdateCqlFiles": [
"event.cql",
"executions.cql"
]
}
1 change: 1 addition & 0 deletions schema/mysql/v57/temporal/schema.sql
Expand Up @@ -215,6 +215,7 @@ CREATE TABLE history_node (
node_id BIGINT NOT NULL,
txn_id BIGINT NOT NULL,
--
prev_txn_id BIGINT NOT NULL DEFAULT 0,
data MEDIUMBLOB NOT NULL,
data_encoding VARCHAR(16) NOT NULL,
PRIMARY KEY (shard_id, tree_id, branch_id, node_id, txn_id)
Expand Down
1 change: 1 addition & 0 deletions schema/mysql/v57/temporal/versioned/v1.5/event.sql
@@ -0,0 +1 @@
ALTER TABLE history_node ADD prev_txn_id BIGINT NOT NULL DEFAULT 0;
3 changes: 2 additions & 1 deletion schema/mysql/v57/temporal/versioned/v1.5/manifest.json
@@ -1,8 +1,9 @@
{
"CurrVersion": "1.5",
"MinCompatibleVersion": "1.0",
"Description": "schema update for execution table version",
"Description": "schema update for execution & history node table version",
"SchemaUpdateCqlFiles": [
"event.sql",
"executions.sql"
]
}
1 change: 1 addition & 0 deletions schema/postgresql/v96/temporal/schema.sql
Expand Up @@ -215,6 +215,7 @@ CREATE TABLE history_node (
node_id BIGINT NOT NULL,
txn_id BIGINT NOT NULL,
--
prev_txn_id BIGINT NOT NULL DEFAULT 0,
data BYTEA NOT NULL,
data_encoding VARCHAR(16) NOT NULL,
PRIMARY KEY (shard_id, tree_id, branch_id, node_id, txn_id)
Expand Down
1 change: 1 addition & 0 deletions schema/postgresql/v96/temporal/versioned/v1.5/event.sql
@@ -0,0 +1 @@
ALTER TABLE history_node ADD prev_txn_id BIGINT NOT NULL DEFAULT 0;
3 changes: 2 additions & 1 deletion schema/postgresql/v96/temporal/versioned/v1.5/manifest.json
@@ -1,8 +1,9 @@
{
"CurrVersion": "1.5",
"MinCompatibleVersion": "1.0",
"Description": "schema update for execution table version",
"Description": "schema update for execution & history node table version",
"SchemaUpdateCqlFiles": [
"event.sql",
"executions.sql"
]
}
11 changes: 10 additions & 1 deletion service/history/mutableStateBuilder.go
Expand Up @@ -3874,14 +3874,22 @@ func (e *mutableStateBuilder) prepareEventsAndReplicationTasks(
e.updatePendingEventIDs(historyMutation.ScheduleIDToStartID)

workflowEventsSeq := make([]*persistence.WorkflowEvents, len(newEventsBatches))
historyNodeTxnIDs, err := e.shard.GenerateTransferTaskIDs(len(newEventsBatches))
if err != nil {
return nil, nil, false, err
}
for index, eventBatch := range newEventsBatches {
workflowEventsSeq[index] = &persistence.WorkflowEvents{
NamespaceID: e.executionInfo.NamespaceId,
WorkflowID: e.executionInfo.WorkflowId,
RunID: e.executionState.RunId,
BranchToken: currentBranchToken,
PrevTxnID: e.executionInfo.LastHistoryNodeTxnId,
TxnID: historyNodeTxnIDs[index],
Events: eventBatch,
}
e.GetExecutionInfo().LastEventTaskId = eventBatch[len(eventBatch)-1].GetTaskId()
e.executionInfo.LastHistoryNodeTxnId = historyNodeTxnIDs[index]
}

if err := e.validateNoEventsAfterWorkflowFinish(
Expand Down Expand Up @@ -3978,10 +3986,12 @@ Loop:
for scheduleID, startID := range scheduleIDToStartID {
if activityInfo, ok := e.GetActivityInfo(scheduleID); ok {
activityInfo.StartedId = startID
e.updateActivityInfos[activityInfo.ScheduleId] = activityInfo
continue Loop
}
if childInfo, ok := e.GetChildExecutionInfo(scheduleID); ok {
childInfo.StartedId = startID
e.updateChildExecutionInfos[childInfo.InitiatedId] = childInfo
continue Loop
}
}
Expand All @@ -3996,7 +4006,6 @@ func (e *mutableStateBuilder) updateWithLastWriteEvent(
// already handled in state builder
return nil
}
e.GetExecutionInfo().LastEventTaskId = lastEvent.GetTaskId()

if e.executionInfo.VersionHistories != nil {
currentVersionHistory, err := versionhistory.GetCurrentVersionHistory(e.executionInfo.VersionHistories)
Expand Down
7 changes: 7 additions & 0 deletions service/history/nDCHistoryReplicator.go
Expand Up @@ -536,6 +536,11 @@ func (r *nDCHistoryReplicatorImpl) applyNonStartEventsToNoneCurrentBranchWithout
return err
}

transactionID, err := r.shard.GenerateTransferTaskID()
if err != nil {
return err
}

err = r.transactionMgr.backfillWorkflow(
ctx,
task.getEventTime(),
Expand All @@ -552,6 +557,8 @@ func (r *nDCHistoryReplicatorImpl) applyNonStartEventsToNoneCurrentBranchWithout
WorkflowID: task.getExecution().GetWorkflowId(),
RunID: task.getExecution().GetRunId(),
BranchToken: versionHistory.GetBranchToken(),
PrevTxnID: 0, // TODO @wxing1292 events chaining will not work for backfill case
TxnID: transactionID,
Events: task.getEvents(),
},
)
Expand Down

0 comments on commit 394542b

Please sign in to comment.