Skip to content

Commit

Permalink
Chain events
Browse files Browse the repository at this point in the history
  • Loading branch information
wxing1292 committed Apr 6, 2021
1 parent 32c3d9f commit cdbc969
Show file tree
Hide file tree
Showing 12 changed files with 63 additions and 41 deletions.
33 changes: 26 additions & 7 deletions common/persistence/cassandra/cassandraHistoryPersistence.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,10 @@ import (
const (
// below are templates for history_node table
v2templateUpsertData = `INSERT INTO history_node (` +
`tree_id, branch_id, node_id, txn_id, data, data_encoding) ` +
`VALUES (?, ?, ?, ?, ?, ?) `
`tree_id, branch_id, node_id, prev_txn_id, txn_id, data, data_encoding) ` +
`VALUES (?, ?, ?, ?, ?, ?, ?) `

v2templateReadData = `SELECT node_id, txn_id, data, data_encoding FROM history_node ` +
v2templateReadData = `SELECT node_id, prev_txn_id, txn_id, data, data_encoding FROM history_node ` +
`WHERE tree_id = ? AND branch_id = ? AND node_id >= ? AND node_id < ? `

v2templateRangeDeleteData = `DELETE FROM history_node WHERE tree_id = ? AND branch_id = ? AND node_id >= ? `
Expand Down Expand Up @@ -108,7 +108,14 @@ func (h *cassandraHistoryV2Persistence) AppendHistoryNodes(

if !request.IsNewBranch {
query := h.session.Query(v2templateUpsertData,
branchInfo.TreeId, branchInfo.BranchId, request.NodeID, request.TransactionID, request.Events.Data, request.Events.EncodingType.String())
branchInfo.TreeId,
branchInfo.BranchId,
request.NodeID,
request.PrevTransactionID,
request.TransactionID,
request.Events.Data,
request.Events.EncodingType.String(),
)
if err := query.Exec(); err != nil {
return gocql.ConvertError("AppendHistoryNodes", err)
}
Expand All @@ -129,9 +136,20 @@ func (h *cassandraHistoryV2Persistence) AppendHistoryNodes(

batch := h.session.NewBatch(gocql.LoggedBatch)
batch.Query(v2templateInsertTree,
branchInfo.TreeId, branchInfo.BranchId, treeInfoDataBlob.Data, treeInfoDataBlob.EncodingType.String())
branchInfo.TreeId,
branchInfo.BranchId,
treeInfoDataBlob.Data,
treeInfoDataBlob.EncodingType.String(),
)
batch.Query(v2templateUpsertData,
branchInfo.TreeId, branchInfo.BranchId, request.NodeID, request.TransactionID, request.Events.Data, request.Events.EncodingType.String())
branchInfo.TreeId,
branchInfo.BranchId,
request.NodeID,
request.PrevTransactionID,
request.TransactionID,
request.Events.Data,
request.Events.EncodingType.String(),
)
if err = h.session.ExecuteBatch(batch); err != nil {
return gocql.ConvertError("AppendHistoryNodes", err)
}
Expand Down Expand Up @@ -171,8 +189,9 @@ func (h *cassandraHistoryV2Persistence) ReadHistoryBranch(
var data []byte
var encoding string
nodeID := int64(0)
prevTxnID := int64(0)
txnID := int64(0)
if !iter.Scan(&nodeID, &txnID, &data, &encoding) {
if !iter.Scan(&nodeID, &prevTxnID, &txnID, &data, &encoding) {
break
}
if txnID < lastTxnID {
Expand Down
3 changes: 3 additions & 0 deletions common/persistence/dataInterfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -531,6 +531,7 @@ type (
WorkflowID string
RunID string
BranchToken []byte
PrevTxnID int64
Events []*historypb.HistoryEvent
}

Expand Down Expand Up @@ -990,6 +991,8 @@ type (
BranchToken []byte
// The batch of events to be appended. The first eventID will become the nodeID of this batch
Events []*historypb.HistoryEvent
// TransactionID for events before these events. For events chaining
PrevTransactionID int64
// requested TransactionID for this write operation. For the same eventID, the node with larger TransactionID always wins
TransactionID int64
}
Expand Down
15 changes: 8 additions & 7 deletions common/persistence/historyStore.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,13 +203,14 @@ func (m *historyV2ManagerImpl) AppendHistoryNodes(
}

req := &InternalAppendHistoryNodesRequest{
IsNewBranch: request.IsNewBranch,
Info: request.Info,
BranchInfo: branch,
NodeID: nodeID,
Events: blob,
TransactionID: request.TransactionID,
ShardID: request.ShardID,
IsNewBranch: request.IsNewBranch,
Info: request.Info,
BranchInfo: branch,
NodeID: nodeID,
Events: blob,
PrevTransactionID: request.PrevTransactionID,
TransactionID: request.TransactionID,
ShardID: request.ShardID,
}

err = m.persistence.AppendHistoryNodes(req)
Expand Down
4 changes: 3 additions & 1 deletion common/persistence/persistenceInterface.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,9 @@ type (
NodeID int64
// The events to be appended
Events *commonpb.DataBlob
// Requested TransactionID for conditional update
// TransactionID for events before these events. For events chaining
PrevTransactionID int64
// requested TransactionID for this write operation. For the same eventID, the node with larger TransactionID always wins
TransactionID int64
// Used in sharded data stores to identify which shard to use
ShardID int32
Expand Down
2 changes: 1 addition & 1 deletion common/persistence/sql/sqlHistoryManager.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ func (m *sqlHistoryV2Manager) AppendHistoryNodes(
TreeID: treeIDBytes,
BranchID: branchIDBytes,
NodeID: request.NodeID,
PrevTxnID: request.PrevTransactionID,
TxnID: request.TransactionID,
Data: request.Events.Data,
DataEncoding: request.Events.EncodingType.String(),
Expand Down Expand Up @@ -199,7 +200,6 @@ func (m *sqlHistoryV2Manager) ReadHistoryBranch(

for _, row := range rows {
eventBlob := p.NewDataBlob(row.Data, row.DataEncoding)

if row.TxnID < lastTxnID {
// assuming that business logic layer is correct and transaction ID only increase
// thus, valid event batch will come with increasing transaction ID
Expand Down
1 change: 1 addition & 0 deletions common/persistence/sql/sqlplugin/history_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ type (
TreeID primitives.UUID
BranchID primitives.UUID
NodeID int64
PrevTxnID int64
TxnID int64
Data []byte
DataEncoding string
Expand Down
6 changes: 3 additions & 3 deletions common/persistence/sql/sqlplugin/mysql/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,10 @@ import (
const (
// below are templates for history_node table
addHistoryNodesQuery = `INSERT INTO history_node (` +
`shard_id, tree_id, branch_id, node_id, txn_id, data, data_encoding) ` +
`VALUES (:shard_id, :tree_id, :branch_id, :node_id, :txn_id, :data, :data_encoding) `
`shard_id, tree_id, branch_id, node_id, prev_txn_id, txn_id, data, data_encoding) ` +
`VALUES (:shard_id, :tree_id, :branch_id, :node_id, :prev_txn_id, :txn_id, :data, :data_encoding) `

getHistoryNodesQuery = `SELECT node_id, txn_id, data, data_encoding FROM history_node ` +
getHistoryNodesQuery = `SELECT node_id, prev_txn_id, txn_id, data, data_encoding FROM history_node ` +
`WHERE shard_id = ? AND tree_id = ? AND branch_id = ? AND node_id >= ? and node_id < ? ORDER BY shard_id, tree_id, branch_id, node_id, txn_id LIMIT ? `

deleteHistoryNodesQuery = `DELETE FROM history_node WHERE shard_id = ? AND tree_id = ? AND branch_id = ? AND node_id >= ? `
Expand Down
6 changes: 3 additions & 3 deletions common/persistence/sql/sqlplugin/postgresql/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,10 @@ import (
const (
// below are templates for history_node table
addHistoryNodesQuery = `INSERT INTO history_node (` +
`shard_id, tree_id, branch_id, node_id, txn_id, data, data_encoding) ` +
`VALUES (:shard_id, :tree_id, :branch_id, :node_id, :txn_id, :data, :data_encoding) `
`shard_id, tree_id, branch_id, node_id, prev_txn_id, txn_id, data, data_encoding) ` +
`VALUES (:shard_id, :tree_id, :branch_id, :node_id, :prev_txn_id, :txn_id, :data, :data_encoding) `

getHistoryNodesQuery = `SELECT node_id, txn_id, data, data_encoding FROM history_node ` +
getHistoryNodesQuery = `SELECT node_id, prev_txn_id, txn_id, data, data_encoding FROM history_node ` +
`WHERE shard_id = $1 AND tree_id = $2 AND branch_id = $3 AND node_id >= $4 and node_id < $5 ORDER BY shard_id, tree_id, branch_id, node_id, txn_id LIMIT $6 `

deleteHistoryNodesQuery = `DELETE FROM history_node WHERE shard_id = $1 AND tree_id = $2 AND branch_id = $3 AND node_id >= $4 `
Expand Down
3 changes: 2 additions & 1 deletion service/history/mutableStateBuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -3880,8 +3880,10 @@ func (e *mutableStateBuilder) prepareEventsAndReplicationTasks(
WorkflowID: e.executionInfo.WorkflowId,
RunID: e.executionState.RunId,
BranchToken: currentBranchToken,
PrevTxnID: e.executionInfo.LastEventTaskId,
Events: eventBatch,
}
e.GetExecutionInfo().LastEventTaskId = eventBatch[len(eventBatch)-1].GetTaskId()
}

if err := e.validateNoEventsAfterWorkflowFinish(
Expand Down Expand Up @@ -3996,7 +3998,6 @@ func (e *mutableStateBuilder) updateWithLastWriteEvent(
// already handled in state builder
return nil
}
e.GetExecutionInfo().LastEventTaskId = lastEvent.GetTaskId()

if e.executionInfo.VersionHistories != nil {
currentVersionHistory, err := versionhistory.GetCurrentVersionHistory(e.executionInfo.VersionHistories)
Expand Down
8 changes: 0 additions & 8 deletions service/history/shard/context_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -736,15 +736,7 @@ func (s *ContextImpl) AppendHistoryEvents(
execution commonpb.WorkflowExecution,
) (int, error) {

// NOTE: do not use generateNextTransferTaskIDLocked since
// generateNextTransferTaskIDLocked is not guarded by lock
transactionID, err := s.GenerateTransferTaskID()
if err != nil {
return 0, err
}

request.ShardID = s.shardID
request.TransactionID = transactionID

size := 0
defer func() {
Expand Down
1 change: 0 additions & 1 deletion service/history/stateBuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,6 @@ func (b *stateBuilderImpl) applyEvents(
return nil, err
}
}
b.mutableState.GetExecutionInfo().LastEventTaskId = event.GetTaskId()

switch event.GetEventType() {
case enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED:
Expand Down
22 changes: 13 additions & 9 deletions service/history/workflowExecutionContext.go
Original file line number Diff line number Diff line change
Expand Up @@ -897,16 +897,18 @@ func (c *workflowExecutionContextImpl) persistFirstWorkflowEvents(
}
branchToken := workflowEvents.BranchToken
events := workflowEvents.Events
prevTxnID := workflowEvents.PrevTxnID

size, err := c.appendHistoryV2EventsWithRetry(
namespaceID,
execution,
&persistence.AppendHistoryNodesRequest{
IsNewBranch: true,
Info: persistence.BuildHistoryGarbageCleanupInfo(namespaceID, workflowID, runID),
BranchToken: branchToken,
Events: events,
// TransactionID is set by shard context
IsNewBranch: true,
Info: persistence.BuildHistoryGarbageCleanupInfo(namespaceID, workflowID, runID),
BranchToken: branchToken,
Events: events,
PrevTransactionID: prevTxnID,
TransactionID: events[len(events)-1].GetTaskId(),
},
)
return size, err
Expand All @@ -927,15 +929,17 @@ func (c *workflowExecutionContextImpl) persistNonFirstWorkflowEvents(
}
branchToken := workflowEvents.BranchToken
events := workflowEvents.Events
prevTxnID := workflowEvents.PrevTxnID

size, err := c.appendHistoryV2EventsWithRetry(
namespaceID,
execution,
&persistence.AppendHistoryNodesRequest{
IsNewBranch: false,
BranchToken: branchToken,
Events: events,
// TransactionID is set by shard context
IsNewBranch: false,
BranchToken: branchToken,
Events: events,
PrevTransactionID: prevTxnID,
TransactionID: events[len(events)-1].GetTaskId(),
},
)
return size, err
Expand Down

0 comments on commit cdbc969

Please sign in to comment.