Unify ReadHistoryBranch logic (#1441)
* New InternalHistoryNode struct containing history node info
* Modify SQL persistence layer to use keyset pagination instead of the existing hacky solution (see the sketch below)
* ReadHistoryBranch will additionally allow returning history node metadata only
* Move history node filtering logic from persistence layer to upper history store layer
wxing1292 committed Apr 13, 2021
1 parent bd7df35 commit fd744a8
Showing 12 changed files with 332 additions and 211 deletions.
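The second bullet's keyset pagination pages by the ordering columns themselves rather than by an offset or a hand-rolled token, so each page resumes with an index seek. A minimal Go sketch of the query shape, under stated assumptions: the column names mirror the history_node schema below, but the helper, its signature, and the exact SQL are illustrative, since the SQL plugin code is not part of this excerpt.

```go
package sqlhistory

import "database/sql"

// readHistoryNodesPage resumes strictly after the (lastNodeID, lastTxnID)
// position of the previous page instead of using OFFSET. Rows come back
// ordered by node_id ascending and, within one node, txn_id descending,
// matching the "largest txnID comes first" expectation in the store layer.
// Hypothetical sketch, not the actual Temporal SQL persistence API.
func readHistoryNodesPage(
	db *sql.DB,
	treeID []byte,
	branchID []byte,
	lastNodeID int64,
	lastTxnID int64,
	pageSize int,
) (*sql.Rows, error) {
	const query = `SELECT node_id, prev_txn_id, txn_id, data, data_encoding
FROM history_node
WHERE tree_id = ? AND branch_id = ?
  AND (node_id > ? OR (node_id = ? AND txn_id < ?))
ORDER BY node_id ASC, txn_id DESC
LIMIT ?`
	return db.Query(query, treeID, branchID, lastNodeID, lastNodeID, lastTxnID, pageSize)
}
```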
123 changes: 59 additions & 64 deletions common/persistence/cassandra/cassandraHistoryPersistence.go
@@ -28,7 +28,6 @@ import (
 	"fmt"
 	"sort"
 
-	commonpb "go.temporal.io/api/common/v1"
 	"go.temporal.io/api/serviceerror"
 
 	persistencespb "go.temporal.io/server/api/persistence/v1"
@@ -42,14 +41,17 @@ import (
 
 const (
 	// below are templates for history_node table
-	v2templateUpsertData = `INSERT INTO history_node (` +
+	v2templateUpsertHistoryNode = `INSERT INTO history_node (` +
 		`tree_id, branch_id, node_id, prev_txn_id, txn_id, data, data_encoding) ` +
 		`VALUES (?, ?, ?, ?, ?, ?, ?) `
 
-	v2templateReadData = `SELECT node_id, prev_txn_id, txn_id, data, data_encoding FROM history_node ` +
+	v2templateReadHistoryNode = `SELECT node_id, prev_txn_id, txn_id, data, data_encoding FROM history_node ` +
 		`WHERE tree_id = ? AND branch_id = ? AND node_id >= ? AND node_id < ? `
 
-	v2templateRangeDeleteData = `DELETE FROM history_node WHERE tree_id = ? AND branch_id = ? AND node_id >= ? `
+	v2templateReadHistoryNodeMetadata = `SELECT node_id, prev_txn_id, txn_id FROM history_node ` +
+		`WHERE tree_id = ? AND branch_id = ? AND node_id >= ? AND node_id < ? `
+
+	v2templateRangeDeleteHistoryNode = `DELETE FROM history_node WHERE tree_id = ? AND branch_id = ? AND node_id >= ? `
 
 	// below are templates for history_tree table
 	v2templateInsertTree = `INSERT INTO history_tree (` +
@@ -75,7 +77,9 @@ func NewHistoryV2PersistenceFromSession(
 	logger log.Logger,
 ) p.HistoryStore {
 
-	return &cassandraHistoryV2Persistence{cassandraStore: cassandraStore{session: session, logger: logger}}
+	return &cassandraHistoryV2Persistence{
+		cassandraStore: cassandraStore{session: session, logger: logger},
+	}
 }
 
 // newHistoryPersistence is used to create an instance of HistoryManager implementation
@@ -99,22 +103,23 @@ func (h *cassandraHistoryV2Persistence) AppendHistoryNodes(
 
 	branchInfo := request.BranchInfo
 	beginNodeID := p.GetBeginNodeID(branchInfo)
+	node := request.Node
 
-	if request.NodeID < beginNodeID {
+	if node.NodeID < beginNodeID {
 		return &p.InvalidPersistenceRequestError{
 			Msg: fmt.Sprintf("cannot append to ancestors' nodes"),
 		}
 	}
 
 	if !request.IsNewBranch {
-		query := h.session.Query(v2templateUpsertData,
+		query := h.session.Query(v2templateUpsertHistoryNode,
 			branchInfo.TreeId,
 			branchInfo.BranchId,
-			request.NodeID,
-			request.PrevTransactionID,
-			request.TransactionID,
-			request.Events.Data,
-			request.Events.EncodingType.String(),
+			node.NodeID,
+			node.PrevTransactionID,
+			node.TransactionID,
+			node.Events.Data,
+			node.Events.EncodingType.String(),
 		)
 		if err := query.Exec(); err != nil {
 			return gocql.ConvertError("AppendHistoryNodes", err)
@@ -141,14 +146,14 @@ func (h *cassandraHistoryV2Persistence) AppendHistoryNodes(
 		treeInfoDataBlob.Data,
 		treeInfoDataBlob.EncodingType.String(),
 	)
-	batch.Query(v2templateUpsertData,
+	batch.Query(v2templateUpsertHistoryNode,
 		branchInfo.TreeId,
 		branchInfo.BranchId,
-		request.NodeID,
-		request.PrevTransactionID,
-		request.TransactionID,
-		request.Events.Data,
-		request.Events.EncodingType.String(),
+		node.NodeID,
+		node.PrevTransactionID,
+		node.TransactionID,
+		node.Events.Data,
+		node.Events.EncodingType.String(),
 	)
 	if err = h.session.ExecuteBatch(batch); err != nil {
 		return gocql.ConvertError("AppendHistoryNodes", err)
@@ -172,62 +177,31 @@ func (h *cassandraHistoryV2Persistence) ReadHistoryBranch(
 		return nil, serviceerror.NewInternal(fmt.Sprintf("ReadHistoryBranch - Gocql BranchId UUID cast failed. Error: %v", err))
 	}
 
-	lastNodeID := request.LastNodeID
-	lastTxnID := request.LastTransactionID
-
-	query := h.session.Query(v2templateReadData, treeID, branchID, request.MinNodeID, request.MaxNodeID)
+	var queryString string
+	if request.MetadataOnly {
+		queryString = v2templateReadHistoryNodeMetadata
+	} else {
+		queryString = v2templateReadHistoryNode
+	}
+	query := h.session.Query(queryString, treeID, branchID, request.MinNodeID, request.MaxNodeID)
 
 	iter := query.PageSize(request.PageSize).PageState(request.NextPageToken).Iter()
 	pagingToken := iter.PageState()
 
-	history := make([]*commonpb.DataBlob, 0, request.PageSize)
-
-	for {
-		var data []byte
-		var encoding string
-		nodeID := int64(0)
-		prevTxnID := int64(0)
-		txnID := int64(0)
-		if !iter.Scan(&nodeID, &prevTxnID, &txnID, &data, &encoding) {
-			break
-		}
-		if txnID < lastTxnID {
-			// assuming that business logic layer is correct and transaction ID only increase
-			// thus, valid event batch will come with increasing transaction ID
-
-			// event batches with smaller node ID
-			// -> should not be possible since records are already sorted
-			// event batches with same node ID
-			// -> batch with higher transaction ID is valid
-			// event batches with larger node ID
-			// -> batch with lower transaction ID is invalid (happens before)
-			// -> batch with higher transaction ID is valid
-			continue
-		}
-
-		switch {
-		case nodeID < lastNodeID:
-			return nil, serviceerror.NewInternal(fmt.Sprintf("corrupted data, nodeID cannot decrease"))
-		case nodeID == lastNodeID:
-			return nil, serviceerror.NewInternal(fmt.Sprintf("corrupted data, same nodeID must have smaller txnID"))
-		default: // row.NodeID > lastNodeID:
-			// NOTE: when row.nodeID > lastNodeID, we expect the one with largest txnID comes first
-			lastTxnID = txnID
-			lastNodeID = nodeID
-			eventBlob := p.NewDataBlob(data, encoding)
-			history = append(history, eventBlob)
-		}
+	nodes := make([]p.InternalHistoryNode, 0, request.PageSize)
+	message := make(map[string]interface{})
+	for iter.MapScan(message) {
+		nodes = append(nodes, convertHistoryNode(message))
+		message = make(map[string]interface{})
 	}
 
 	if err := iter.Close(); err != nil {
 		return nil, serviceerror.NewInternal(fmt.Sprintf("ReadHistoryBranch. Close operation failed. Error: %v", err))
 	}
 
 	return &p.InternalReadHistoryBranchResponse{
-		History:           history,
-		NextPageToken:     pagingToken,
-		LastNodeID:        lastNodeID,
-		LastTransactionID: lastTxnID,
+		Nodes:         nodes,
+		NextPageToken: pagingToken,
 	}, nil
 }
@@ -406,7 +380,7 @@ func (h *cassandraHistoryV2Persistence) deleteBranchRangeNodes(
 	beginNodeID int64,
 ) {
 
-	batch.Query(v2templateRangeDeleteData,
+	batch.Query(v2templateRangeDeleteHistoryNode,
 		treeID,
 		branchID,
 		beginNodeID)
@@ -522,3 +496,24 @@ func (h *cassandraHistoryV2Persistence) sortAncestors(
 		}
 	}
 }
+
+func convertHistoryNode(
+	message map[string]interface{},
+) p.InternalHistoryNode {
+	nodeID := message["node_id"].(int64)
+	prevTxnID := message["prev_txn_id"].(int64)
+	txnID := message["txn_id"].(int64)
+
+	var data []byte
+	var dataEncoding string
+	if _, ok := message["data"]; ok {
+		data = message["data"].([]byte)
+		dataEncoding = message["data_encoding"].(string)
+	}
+	return p.InternalHistoryNode{
+		NodeID:            nodeID,
+		PrevTransactionID: prevTxnID,
+		TransactionID:     txnID,
+		Events:            p.NewDataBlob(data, dataEncoding),
+	}
+}
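With the MetadataOnly flag from the hunk above, a caller can walk a branch's node metadata without pulling event blobs. A sketch of such a caller as it might look inside the persistence package; the helper itself, the int32 shard ID, and the math.MaxInt64 upper bound are assumptions for illustration, not part of the commit.

```go
package persistence

import "math"

// readBranchMetadata pages through one branch collecting node metadata only.
// Hypothetical helper: the request/response fields follow the diff above,
// the surrounding wiring is assumed.
func readBranchMetadata(
	store HistoryStore,
	shardID int32,
	treeID string,
	branchID string,
) ([]InternalHistoryNode, error) {
	var all []InternalHistoryNode
	var token []byte
	for {
		resp, err := store.ReadHistoryBranch(&InternalReadHistoryBranchRequest{
			TreeID:        treeID,
			BranchID:      branchID,
			MinNodeID:     1,             // history node IDs start at 1
			MaxNodeID:     math.MaxInt64, // exclusive upper bound: read to the end
			PageSize:      100,
			NextPageToken: token,
			ShardID:       shardID,
			MetadataOnly:  true, // store selects node_id, prev_txn_id, txn_id only
		})
		if err != nil {
			return nil, err
		}
		all = append(all, resp.Nodes...)
		if len(resp.NextPageToken) == 0 {
			return all, nil // an empty page token means the branch is exhausted
		}
		token = resp.NextPageToken
	}
}
```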
99 changes: 74 additions & 25 deletions common/persistence/historyStore.go
@@ -203,14 +203,16 @@ func (m *historyV2ManagerImpl) AppendHistoryNodes(
 	}
 
 	req := &InternalAppendHistoryNodesRequest{
-		IsNewBranch:       request.IsNewBranch,
-		Info:              request.Info,
-		BranchInfo:        branch,
-		NodeID:            nodeID,
-		Events:            blob,
-		PrevTransactionID: request.PrevTransactionID,
-		TransactionID:     request.TransactionID,
-		ShardID:           request.ShardID,
+		IsNewBranch: request.IsNewBranch,
+		Info:        request.Info,
+		BranchInfo:  branch,
+		Node: InternalHistoryNode{
+			NodeID:            nodeID,
+			Events:            blob,
+			PrevTransactionID: request.PrevTransactionID,
+			TransactionID:     request.TransactionID,
+		},
+		ShardID: request.ShardID,
 	}
 
 	err = m.persistence.AppendHistoryNodes(req)
@@ -347,34 +349,45 @@ func (m *historyV2ManagerImpl) readRawHistoryBranch(
 	}
 
 	req := &InternalReadHistoryBranchRequest{
-		TreeID:            treeID,
-		BranchID:          allBRs[token.CurrentRangeIndex].GetBranchId(),
-		MinNodeID:         minNodeID,
-		MaxNodeID:         maxNodeID,
-		NextPageToken:     token.StoreToken,
-		LastNodeID:        token.LastNodeID,
-		LastTransactionID: token.LastTransactionID,
-		ShardID:           request.ShardID,
-		PageSize:          request.PageSize,
+		TreeID:        treeID,
+		BranchID:      allBRs[token.CurrentRangeIndex].GetBranchId(),
+		MinNodeID:     minNodeID,
+		MaxNodeID:     maxNodeID,
+		NextPageToken: token.StoreToken,
+		ShardID:       request.ShardID,
+		PageSize:      request.PageSize,
+		MetadataOnly:  false,
 	}
 
 	resp, err := m.persistence.ReadHistoryBranch(req)
 	if err != nil {
 		return nil, nil, 0, nil, err
 	}
-	if len(resp.History) == 0 && len(request.NextPageToken) == 0 {
+	if len(resp.Nodes) == 0 && len(request.NextPageToken) == 0 {
 		return nil, nil, 0, nil, serviceerror.NewNotFound("Workflow execution history not found.")
 	}
 
-	dataBlobs := resp.History
-	dataSize := 0
-	for _, dataBlob := range resp.History {
-		dataSize += len(dataBlob.Data)
-	}
+	nodes, err := m.filterHistoryNodes(
+		token.LastNodeID,
+		token.LastTransactionID,
+		resp.Nodes,
+	)
+	if err != nil {
+		return nil, nil, 0, nil, err
+	}
 
+	var dataBlobs []*commonpb.DataBlob
+	dataSize := 0
 	token.StoreToken = resp.NextPageToken
-	token.LastNodeID = resp.LastNodeID
-	token.LastTransactionID = resp.LastTransactionID
+	if len(nodes) > 0 {
+		dataBlobs = make([]*commonpb.DataBlob, len(nodes))
+		for index, node := range nodes {
+			dataBlobs[index] = node.Events
+			dataSize += len(node.Events.Data)
+		}
+		lastNode := nodes[len(nodes)-1]
+		token.LastNodeID = lastNode.NodeID
+		token.LastTransactionID = lastNode.TransactionID
+	}
 
 	// NOTE: in this method, we need to make sure eventVersion is NOT
 	// decreasing(otherwise we skip the events), eventID should be contiguous(otherwise return error)
@@ -464,6 +477,42 @@ func (m *historyV2ManagerImpl) readHistoryBranch(
 	return historyEvents, historyEventBatches, nextPageToken, dataSize, lastFirstEventID, nil
 }
 
+func (m *historyV2ManagerImpl) filterHistoryNodes(
+	lastNodeID int64,
+	lastTransactionID int64,
+	nodes []InternalHistoryNode,
+) ([]InternalHistoryNode, error) {
+	var result []InternalHistoryNode
+	for _, node := range nodes {
+		// assuming that the business logic layer is correct and transaction IDs only increase,
+		// a valid event batch will come with an increasing transaction ID
+
+		// event batches with smaller node ID
+		// -> should not be possible since records are already sorted
+		// event batches with same node ID
+		// -> batch with higher transaction ID is valid
+		// event batches with larger node ID
+		// -> batch with lower transaction ID is invalid (happens before)
+		// -> batch with higher transaction ID is valid
+		if node.TransactionID < lastTransactionID {
+			continue
+		}
+
+		switch {
+		case node.NodeID < lastNodeID:
+			return nil, serviceerror.NewInternal(fmt.Sprintf("corrupted data, nodeID cannot decrease"))
+		case node.NodeID == lastNodeID:
+			return nil, serviceerror.NewInternal(fmt.Sprintf("corrupted data, same nodeID must have smaller txnID"))
+		default: // node.NodeID > lastNodeID
+			// NOTE: when node.NodeID > lastNodeID, we expect the batch with the largest txnID to come first
+			lastTransactionID = node.TransactionID
+			lastNodeID = node.NodeID
+			result = append(result, node)
+		}
+	}
+	return result, nil
+}
+
 func (m *historyV2ManagerImpl) deserializeToken(
 	token []byte,
 	defaultLastEventID int64,
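To make the filtering contract concrete: rows arrive sorted by node ID ascending and, within one node ID, transaction ID descending, so the first row seen for each node ID wins, and any batch on an abandoned branch (larger node ID but smaller transaction ID than the last accepted batch) is dropped. The following test is illustrative and not part of the commit; it assumes a zero-value historyV2ManagerImpl suffices since the method reads no receiver state.

```go
package persistence

import "testing"

func TestFilterHistoryNodes_Sketch(t *testing.T) {
	m := &historyV2ManagerImpl{} // zero value: filterHistoryNodes touches no receiver fields
	nodes := []InternalHistoryNode{
		{NodeID: 1, TransactionID: 9},  // kept: first row for node 1 carries the highest txn ID
		{NodeID: 1, TransactionID: 4},  // dropped: 4 < 9, stale batch for the same node
		{NodeID: 4, TransactionID: 12}, // kept: larger node ID and larger txn ID
		{NodeID: 4, TransactionID: 7},  // dropped: 7 < 12
		{NodeID: 7, TransactionID: 15}, // kept
		{NodeID: 7, TransactionID: 10}, // dropped: 10 < 15, happens before the accepted batch
	}
	got, err := m.filterHistoryNodes(0, 0, nodes)
	if err != nil {
		t.Fatal(err)
	}
	wantNodeIDs := []int64{1, 4, 7}
	if len(got) != len(wantNodeIDs) {
		t.Fatalf("expected %d nodes, got %d", len(wantNodeIDs), len(got))
	}
	for i, node := range got {
		if node.NodeID != wantNodeIDs[i] {
			t.Errorf("node %d: want NodeID %d, got %d", i, wantNodeIDs[i], node.NodeID)
		}
	}
}
```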