Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Unify ReadHistoryBranch logic #1441

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
123 changes: 59 additions & 64 deletions common/persistence/cassandra/cassandraHistoryPersistence.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"fmt"
"sort"

commonpb "go.temporal.io/api/common/v1"
"go.temporal.io/api/serviceerror"

persistencespb "go.temporal.io/server/api/persistence/v1"
Expand All @@ -42,14 +41,17 @@ import (

const (
// below are templates for history_node table
v2templateUpsertData = `INSERT INTO history_node (` +
v2templateUpsertHistoryNode = `INSERT INTO history_node (` +
`tree_id, branch_id, node_id, prev_txn_id, txn_id, data, data_encoding) ` +
`VALUES (?, ?, ?, ?, ?, ?, ?) `

v2templateReadData = `SELECT node_id, prev_txn_id, txn_id, data, data_encoding FROM history_node ` +
v2templateReadHistoryNode = `SELECT node_id, prev_txn_id, txn_id, data, data_encoding FROM history_node ` +
`WHERE tree_id = ? AND branch_id = ? AND node_id >= ? AND node_id < ? `

v2templateRangeDeleteData = `DELETE FROM history_node WHERE tree_id = ? AND branch_id = ? AND node_id >= ? `
v2templateReadHistoryNodeMetadata = `SELECT node_id, prev_txn_id, txn_id FROM history_node ` +
`WHERE tree_id = ? AND branch_id = ? AND node_id >= ? AND node_id < ? `

v2templateRangeDeleteHistoryNode = `DELETE FROM history_node WHERE tree_id = ? AND branch_id = ? AND node_id >= ? `

// below are templates for history_tree table
v2templateInsertTree = `INSERT INTO history_tree (` +
Expand All @@ -75,7 +77,9 @@ func NewHistoryV2PersistenceFromSession(
logger log.Logger,
) p.HistoryStore {

return &cassandraHistoryV2Persistence{cassandraStore: cassandraStore{session: session, logger: logger}}
return &cassandraHistoryV2Persistence{
cassandraStore: cassandraStore{session: session, logger: logger},
}
}

// newHistoryPersistence is used to create an instance of HistoryManager implementation
Expand All @@ -99,22 +103,23 @@ func (h *cassandraHistoryV2Persistence) AppendHistoryNodes(

branchInfo := request.BranchInfo
beginNodeID := p.GetBeginNodeID(branchInfo)
node := request.Node

if request.NodeID < beginNodeID {
if node.NodeID < beginNodeID {
return &p.InvalidPersistenceRequestError{
Msg: fmt.Sprintf("cannot append to ancestors' nodes"),
}
}

if !request.IsNewBranch {
query := h.session.Query(v2templateUpsertData,
query := h.session.Query(v2templateUpsertHistoryNode,
branchInfo.TreeId,
branchInfo.BranchId,
request.NodeID,
request.PrevTransactionID,
request.TransactionID,
request.Events.Data,
request.Events.EncodingType.String(),
node.NodeID,
node.PrevTransactionID,
node.TransactionID,
node.Events.Data,
node.Events.EncodingType.String(),
)
if err := query.Exec(); err != nil {
return gocql.ConvertError("AppendHistoryNodes", err)
Expand All @@ -141,14 +146,14 @@ func (h *cassandraHistoryV2Persistence) AppendHistoryNodes(
treeInfoDataBlob.Data,
treeInfoDataBlob.EncodingType.String(),
)
batch.Query(v2templateUpsertData,
batch.Query(v2templateUpsertHistoryNode,
branchInfo.TreeId,
branchInfo.BranchId,
request.NodeID,
request.PrevTransactionID,
request.TransactionID,
request.Events.Data,
request.Events.EncodingType.String(),
node.NodeID,
node.PrevTransactionID,
node.TransactionID,
node.Events.Data,
node.Events.EncodingType.String(),
)
if err = h.session.ExecuteBatch(batch); err != nil {
return gocql.ConvertError("AppendHistoryNodes", err)
Expand All @@ -172,62 +177,31 @@ func (h *cassandraHistoryV2Persistence) ReadHistoryBranch(
return nil, serviceerror.NewInternal(fmt.Sprintf("ReadHistoryBranch - Gocql BranchId UUID cast failed. Error: %v", err))
}

lastNodeID := request.LastNodeID
lastTxnID := request.LastTransactionID

query := h.session.Query(v2templateReadData, treeID, branchID, request.MinNodeID, request.MaxNodeID)
var queryString string
if request.MetadataOnly {
queryString = v2templateReadHistoryNodeMetadata
} else {
queryString = v2templateReadHistoryNode
}
query := h.session.Query(queryString, treeID, branchID, request.MinNodeID, request.MaxNodeID)

iter := query.PageSize(request.PageSize).PageState(request.NextPageToken).Iter()
pagingToken := iter.PageState()

history := make([]*commonpb.DataBlob, 0, request.PageSize)

for {
var data []byte
var encoding string
nodeID := int64(0)
prevTxnID := int64(0)
txnID := int64(0)
if !iter.Scan(&nodeID, &prevTxnID, &txnID, &data, &encoding) {
break
}
if txnID < lastTxnID {
// assuming that business logic layer is correct and transaction ID only increase
// thus, valid event batch will come with increasing transaction ID

// event batches with smaller node ID
// -> should not be possible since records are already sorted
// event batches with same node ID
// -> batch with higher transaction ID is valid
// event batches with larger node ID
// -> batch with lower transaction ID is invalid (happens before)
// -> batch with higher transaction ID is valid
continue
}

switch {
case nodeID < lastNodeID:
return nil, serviceerror.NewInternal(fmt.Sprintf("corrupted data, nodeID cannot decrease"))
case nodeID == lastNodeID:
return nil, serviceerror.NewInternal(fmt.Sprintf("corrupted data, same nodeID must have smaller txnID"))
default: // row.NodeID > lastNodeID:
// NOTE: when row.nodeID > lastNodeID, we expect the one with largest txnID comes first
lastTxnID = txnID
lastNodeID = nodeID
eventBlob := p.NewDataBlob(data, encoding)
history = append(history, eventBlob)
}
nodes := make([]p.InternalHistoryNode, 0, request.PageSize)
message := make(map[string]interface{})
for iter.MapScan(message) {
nodes = append(nodes, convertHistoryNode(message))
message = make(map[string]interface{})
}

if err := iter.Close(); err != nil {
return nil, serviceerror.NewInternal(fmt.Sprintf("ReadHistoryBranch. Close operation failed. Error: %v", err))
}

return &p.InternalReadHistoryBranchResponse{
History: history,
NextPageToken: pagingToken,
LastNodeID: lastNodeID,
LastTransactionID: lastTxnID,
Nodes: nodes,
NextPageToken: pagingToken,
}, nil
}

Expand Down Expand Up @@ -406,7 +380,7 @@ func (h *cassandraHistoryV2Persistence) deleteBranchRangeNodes(
beginNodeID int64,
) {

batch.Query(v2templateRangeDeleteData,
batch.Query(v2templateRangeDeleteHistoryNode,
treeID,
branchID,
beginNodeID)
Expand Down Expand Up @@ -522,3 +496,24 @@ func (h *cassandraHistoryV2Persistence) sortAncestors(
}
}
}

func convertHistoryNode(
message map[string]interface{},
) p.InternalHistoryNode {
nodeID := message["node_id"].(int64)
prevTxnID := message["prev_txn_id"].(int64)
txnID := message["txn_id"].(int64)

var data []byte
var dataEncoding string
if _, ok := message["data"]; ok {
data = message["data"].([]byte)
dataEncoding = message["data_encoding"].(string)
}
return p.InternalHistoryNode{
NodeID: nodeID,
PrevTransactionID: prevTxnID,
TransactionID: txnID,
Events: p.NewDataBlob(data, dataEncoding),
}
}
99 changes: 74 additions & 25 deletions common/persistence/historyStore.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,14 +203,16 @@ func (m *historyV2ManagerImpl) AppendHistoryNodes(
}

req := &InternalAppendHistoryNodesRequest{
IsNewBranch: request.IsNewBranch,
Info: request.Info,
BranchInfo: branch,
NodeID: nodeID,
Events: blob,
PrevTransactionID: request.PrevTransactionID,
TransactionID: request.TransactionID,
ShardID: request.ShardID,
IsNewBranch: request.IsNewBranch,
Info: request.Info,
BranchInfo: branch,
Node: InternalHistoryNode{
NodeID: nodeID,
Events: blob,
PrevTransactionID: request.PrevTransactionID,
TransactionID: request.TransactionID,
},
ShardID: request.ShardID,
}

err = m.persistence.AppendHistoryNodes(req)
Expand Down Expand Up @@ -347,34 +349,45 @@ func (m *historyV2ManagerImpl) readRawHistoryBranch(
}

req := &InternalReadHistoryBranchRequest{
TreeID: treeID,
BranchID: allBRs[token.CurrentRangeIndex].GetBranchId(),
MinNodeID: minNodeID,
MaxNodeID: maxNodeID,
NextPageToken: token.StoreToken,
LastNodeID: token.LastNodeID,
LastTransactionID: token.LastTransactionID,
ShardID: request.ShardID,
PageSize: request.PageSize,
TreeID: treeID,
BranchID: allBRs[token.CurrentRangeIndex].GetBranchId(),
MinNodeID: minNodeID,
MaxNodeID: maxNodeID,
NextPageToken: token.StoreToken,
ShardID: request.ShardID,
PageSize: request.PageSize,
MetadataOnly: false,
}

resp, err := m.persistence.ReadHistoryBranch(req)
if err != nil {
return nil, nil, 0, nil, err
}
if len(resp.History) == 0 && len(request.NextPageToken) == 0 {
if len(resp.Nodes) == 0 && len(request.NextPageToken) == 0 {
return nil, nil, 0, nil, serviceerror.NewNotFound("Workflow execution history not found.")
}

dataBlobs := resp.History
dataSize := 0
for _, dataBlob := range resp.History {
dataSize += len(dataBlob.Data)
nodes, err := m.filterHistoryNodes(
token.LastNodeID,
token.LastTransactionID,
resp.Nodes,
)
if err != nil {
return nil, nil, 0, nil, err
}

var dataBlobs []*commonpb.DataBlob
dataSize := 0
token.StoreToken = resp.NextPageToken
token.LastNodeID = resp.LastNodeID
token.LastTransactionID = resp.LastTransactionID
if len(nodes) > 0 {
dataBlobs = make([]*commonpb.DataBlob, len(nodes))
for index, node := range nodes {
dataBlobs[index] = node.Events
dataSize += len(node.Events.Data)
}
lastNode := nodes[len(nodes)-1]
token.LastNodeID = lastNode.NodeID
token.LastTransactionID = lastNode.TransactionID
}

// NOTE: in this method, we need to make sure eventVersion is NOT
// decreasing(otherwise we skip the events), eventID should be contiguous(otherwise return error)
Expand Down Expand Up @@ -464,6 +477,42 @@ func (m *historyV2ManagerImpl) readHistoryBranch(
return historyEvents, historyEventBatches, nextPageToken, dataSize, lastFirstEventID, nil
}

func (m *historyV2ManagerImpl) filterHistoryNodes(
lastNodeID int64,
lastTransactionID int64,
nodes []InternalHistoryNode,
) ([]InternalHistoryNode, error) {
var result []InternalHistoryNode
for _, node := range nodes {
// assuming that business logic layer is correct and transaction ID only increase
// thus, valid event batch will come with increasing transaction ID

// event batches with smaller node ID
// -> should not be possible since records are already sorted
// event batches with same node ID
// -> batch with higher transaction ID is valid
// event batches with larger node ID
// -> batch with lower transaction ID is invalid (happens before)
// -> batch with higher transaction ID is valid
if node.TransactionID < lastTransactionID {
continue
}

switch {
case node.NodeID < lastNodeID:
return nil, serviceerror.NewInternal(fmt.Sprintf("corrupted data, nodeID cannot decrease"))
case node.NodeID == lastNodeID:
return nil, serviceerror.NewInternal(fmt.Sprintf("corrupted data, same nodeID must have smaller txnID"))
default: // row.NodeID > lastNodeID:
// NOTE: when row.nodeID > lastNodeID, we expect the one with largest txnID comes first
lastTransactionID = node.TransactionID
lastNodeID = node.NodeID
result = append(result, node)
}
}
return result, nil
}

func (m *historyV2ManagerImpl) deserializeToken(
token []byte,
defaultLastEventID int64,
Expand Down