From fd744a87a6ecd5179b49b666301442fefdf456d1 Mon Sep 17 00:00:00 2001 From: wxing1292 Date: Tue, 13 Apr 2021 16:14:40 -0700 Subject: [PATCH] Unify ReadHistoryBranch logic (#1441) * New InternalHistoryNode struct containing history node info * Modify SQL persistence layer to use key set pagination, instead of existing hacky solution * ReadHistoryBranch in addition will allow returning history node metadata only * Move history node filtering logic from persistence layer to upper history store layer --- .../cassandra/cassandraHistoryPersistence.go | 123 ++++++++------- common/persistence/historyStore.go | 99 +++++++++--- common/persistence/persistenceInterface.go | 44 ++---- common/persistence/sql/sqlHistoryManager.go | 142 +++++++++--------- common/persistence/sql/sql_token.go | 60 ++++++++ .../persistence/sql/sqlplugin/history_node.go | 14 +- .../persistence/sql/sqlplugin/mysql/events.go | 18 ++- .../sql/sqlplugin/postgresql/events.go | 18 ++- .../sql/sqlplugin/tests/history_node_test.go | 14 +- .../mutablestate/history_builder_test.go | 3 - tools/cli/adminCommands.go | 4 +- tools/cli/adminDBScanCommand.go | 4 +- 12 files changed, 332 insertions(+), 211 deletions(-) create mode 100644 common/persistence/sql/sql_token.go diff --git a/common/persistence/cassandra/cassandraHistoryPersistence.go b/common/persistence/cassandra/cassandraHistoryPersistence.go index 4daf5fafedf..5e6a83e3167 100644 --- a/common/persistence/cassandra/cassandraHistoryPersistence.go +++ b/common/persistence/cassandra/cassandraHistoryPersistence.go @@ -28,7 +28,6 @@ import ( "fmt" "sort" - commonpb "go.temporal.io/api/common/v1" "go.temporal.io/api/serviceerror" persistencespb "go.temporal.io/server/api/persistence/v1" @@ -42,14 +41,17 @@ import ( const ( // below are templates for history_node table - v2templateUpsertData = `INSERT INTO history_node (` + + v2templateUpsertHistoryNode = `INSERT INTO history_node (` + `tree_id, branch_id, node_id, prev_txn_id, txn_id, data, data_encoding) ` + `VALUES (?, ?, ?, ?, ?, ?, ?) ` - v2templateReadData = `SELECT node_id, prev_txn_id, txn_id, data, data_encoding FROM history_node ` + + v2templateReadHistoryNode = `SELECT node_id, prev_txn_id, txn_id, data, data_encoding FROM history_node ` + `WHERE tree_id = ? AND branch_id = ? AND node_id >= ? AND node_id < ? ` - v2templateRangeDeleteData = `DELETE FROM history_node WHERE tree_id = ? AND branch_id = ? AND node_id >= ? ` + v2templateReadHistoryNodeMetadata = `SELECT node_id, prev_txn_id, txn_id FROM history_node ` + + `WHERE tree_id = ? AND branch_id = ? AND node_id >= ? AND node_id < ? ` + + v2templateRangeDeleteHistoryNode = `DELETE FROM history_node WHERE tree_id = ? AND branch_id = ? AND node_id >= ? 
` // below are templates for history_tree table v2templateInsertTree = `INSERT INTO history_tree (` + @@ -75,7 +77,9 @@ func NewHistoryV2PersistenceFromSession( logger log.Logger, ) p.HistoryStore { - return &cassandraHistoryV2Persistence{cassandraStore: cassandraStore{session: session, logger: logger}} + return &cassandraHistoryV2Persistence{ + cassandraStore: cassandraStore{session: session, logger: logger}, + } } // newHistoryPersistence is used to create an instance of HistoryManager implementation @@ -99,22 +103,23 @@ func (h *cassandraHistoryV2Persistence) AppendHistoryNodes( branchInfo := request.BranchInfo beginNodeID := p.GetBeginNodeID(branchInfo) + node := request.Node - if request.NodeID < beginNodeID { + if node.NodeID < beginNodeID { return &p.InvalidPersistenceRequestError{ Msg: fmt.Sprintf("cannot append to ancestors' nodes"), } } if !request.IsNewBranch { - query := h.session.Query(v2templateUpsertData, + query := h.session.Query(v2templateUpsertHistoryNode, branchInfo.TreeId, branchInfo.BranchId, - request.NodeID, - request.PrevTransactionID, - request.TransactionID, - request.Events.Data, - request.Events.EncodingType.String(), + node.NodeID, + node.PrevTransactionID, + node.TransactionID, + node.Events.Data, + node.Events.EncodingType.String(), ) if err := query.Exec(); err != nil { return gocql.ConvertError("AppendHistoryNodes", err) @@ -141,14 +146,14 @@ func (h *cassandraHistoryV2Persistence) AppendHistoryNodes( treeInfoDataBlob.Data, treeInfoDataBlob.EncodingType.String(), ) - batch.Query(v2templateUpsertData, + batch.Query(v2templateUpsertHistoryNode, branchInfo.TreeId, branchInfo.BranchId, - request.NodeID, - request.PrevTransactionID, - request.TransactionID, - request.Events.Data, - request.Events.EncodingType.String(), + node.NodeID, + node.PrevTransactionID, + node.TransactionID, + node.Events.Data, + node.Events.EncodingType.String(), ) if err = h.session.ExecuteBatch(batch); err != nil { return gocql.ConvertError("AppendHistoryNodes", err) @@ -172,51 +177,22 @@ func (h *cassandraHistoryV2Persistence) ReadHistoryBranch( return nil, serviceerror.NewInternal(fmt.Sprintf("ReadHistoryBranch - Gocql BranchId UUID cast failed. 
Error: %v", err)) } - lastNodeID := request.LastNodeID - lastTxnID := request.LastTransactionID - - query := h.session.Query(v2templateReadData, treeID, branchID, request.MinNodeID, request.MaxNodeID) + var queryString string + if request.MetadataOnly { + queryString = v2templateReadHistoryNodeMetadata + } else { + queryString = v2templateReadHistoryNode + } + query := h.session.Query(queryString, treeID, branchID, request.MinNodeID, request.MaxNodeID) iter := query.PageSize(request.PageSize).PageState(request.NextPageToken).Iter() pagingToken := iter.PageState() - history := make([]*commonpb.DataBlob, 0, request.PageSize) - - for { - var data []byte - var encoding string - nodeID := int64(0) - prevTxnID := int64(0) - txnID := int64(0) - if !iter.Scan(&nodeID, &prevTxnID, &txnID, &data, &encoding) { - break - } - if txnID < lastTxnID { - // assuming that business logic layer is correct and transaction ID only increase - // thus, valid event batch will come with increasing transaction ID - - // event batches with smaller node ID - // -> should not be possible since records are already sorted - // event batches with same node ID - // -> batch with higher transaction ID is valid - // event batches with larger node ID - // -> batch with lower transaction ID is invalid (happens before) - // -> batch with higher transaction ID is valid - continue - } - - switch { - case nodeID < lastNodeID: - return nil, serviceerror.NewInternal(fmt.Sprintf("corrupted data, nodeID cannot decrease")) - case nodeID == lastNodeID: - return nil, serviceerror.NewInternal(fmt.Sprintf("corrupted data, same nodeID must have smaller txnID")) - default: // row.NodeID > lastNodeID: - // NOTE: when row.nodeID > lastNodeID, we expect the one with largest txnID comes first - lastTxnID = txnID - lastNodeID = nodeID - eventBlob := p.NewDataBlob(data, encoding) - history = append(history, eventBlob) - } + nodes := make([]p.InternalHistoryNode, 0, request.PageSize) + message := make(map[string]interface{}) + for iter.MapScan(message) { + nodes = append(nodes, convertHistoryNode(message)) + message = make(map[string]interface{}) } if err := iter.Close(); err != nil { @@ -224,10 +200,8 @@ func (h *cassandraHistoryV2Persistence) ReadHistoryBranch( } return &p.InternalReadHistoryBranchResponse{ - History: history, - NextPageToken: pagingToken, - LastNodeID: lastNodeID, - LastTransactionID: lastTxnID, + Nodes: nodes, + NextPageToken: pagingToken, }, nil } @@ -406,7 +380,7 @@ func (h *cassandraHistoryV2Persistence) deleteBranchRangeNodes( beginNodeID int64, ) { - batch.Query(v2templateRangeDeleteData, + batch.Query(v2templateRangeDeleteHistoryNode, treeID, branchID, beginNodeID) @@ -522,3 +496,24 @@ func (h *cassandraHistoryV2Persistence) sortAncestors( } } } + +func convertHistoryNode( + message map[string]interface{}, +) p.InternalHistoryNode { + nodeID := message["node_id"].(int64) + prevTxnID := message["prev_txn_id"].(int64) + txnID := message["txn_id"].(int64) + + var data []byte + var dataEncoding string + if _, ok := message["data"]; ok { + data = message["data"].([]byte) + dataEncoding = message["data_encoding"].(string) + } + return p.InternalHistoryNode{ + NodeID: nodeID, + PrevTransactionID: prevTxnID, + TransactionID: txnID, + Events: p.NewDataBlob(data, dataEncoding), + } +} diff --git a/common/persistence/historyStore.go b/common/persistence/historyStore.go index 94fff0a308e..6b52266e56f 100644 --- a/common/persistence/historyStore.go +++ b/common/persistence/historyStore.go @@ -203,14 +203,16 @@ func (m 
*historyV2ManagerImpl) AppendHistoryNodes( } req := &InternalAppendHistoryNodesRequest{ - IsNewBranch: request.IsNewBranch, - Info: request.Info, - BranchInfo: branch, - NodeID: nodeID, - Events: blob, - PrevTransactionID: request.PrevTransactionID, - TransactionID: request.TransactionID, - ShardID: request.ShardID, + IsNewBranch: request.IsNewBranch, + Info: request.Info, + BranchInfo: branch, + Node: InternalHistoryNode{ + NodeID: nodeID, + Events: blob, + PrevTransactionID: request.PrevTransactionID, + TransactionID: request.TransactionID, + }, + ShardID: request.ShardID, } err = m.persistence.AppendHistoryNodes(req) @@ -347,34 +349,45 @@ func (m *historyV2ManagerImpl) readRawHistoryBranch( } req := &InternalReadHistoryBranchRequest{ - TreeID: treeID, - BranchID: allBRs[token.CurrentRangeIndex].GetBranchId(), - MinNodeID: minNodeID, - MaxNodeID: maxNodeID, - NextPageToken: token.StoreToken, - LastNodeID: token.LastNodeID, - LastTransactionID: token.LastTransactionID, - ShardID: request.ShardID, - PageSize: request.PageSize, + TreeID: treeID, + BranchID: allBRs[token.CurrentRangeIndex].GetBranchId(), + MinNodeID: minNodeID, + MaxNodeID: maxNodeID, + NextPageToken: token.StoreToken, + ShardID: request.ShardID, + PageSize: request.PageSize, + MetadataOnly: false, } resp, err := m.persistence.ReadHistoryBranch(req) if err != nil { return nil, nil, 0, nil, err } - if len(resp.History) == 0 && len(request.NextPageToken) == 0 { + if len(resp.Nodes) == 0 && len(request.NextPageToken) == 0 { return nil, nil, 0, nil, serviceerror.NewNotFound("Workflow execution history not found.") } - - dataBlobs := resp.History - dataSize := 0 - for _, dataBlob := range resp.History { - dataSize += len(dataBlob.Data) + nodes, err := m.filterHistoryNodes( + token.LastNodeID, + token.LastTransactionID, + resp.Nodes, + ) + if err != nil { + return nil, nil, 0, nil, err } + var dataBlobs []*commonpb.DataBlob + dataSize := 0 token.StoreToken = resp.NextPageToken - token.LastNodeID = resp.LastNodeID - token.LastTransactionID = resp.LastTransactionID + if len(nodes) > 0 { + dataBlobs = make([]*commonpb.DataBlob, len(nodes)) + for index, node := range nodes { + dataBlobs[index] = node.Events + dataSize += len(node.Events.Data) + } + lastNode := nodes[len(nodes)-1] + token.LastNodeID = lastNode.NodeID + token.LastTransactionID = lastNode.TransactionID + } // NOTE: in this method, we need to make sure eventVersion is NOT // decreasing(otherwise we skip the events), eventID should be contiguous(otherwise return error) @@ -464,6 +477,42 @@ func (m *historyV2ManagerImpl) readHistoryBranch( return historyEvents, historyEventBatches, nextPageToken, dataSize, lastFirstEventID, nil } +func (m *historyV2ManagerImpl) filterHistoryNodes( + lastNodeID int64, + lastTransactionID int64, + nodes []InternalHistoryNode, +) ([]InternalHistoryNode, error) { + var result []InternalHistoryNode + for _, node := range nodes { + // assuming that business logic layer is correct and transaction ID only increase + // thus, valid event batch will come with increasing transaction ID + + // event batches with smaller node ID + // -> should not be possible since records are already sorted + // event batches with same node ID + // -> batch with higher transaction ID is valid + // event batches with larger node ID + // -> batch with lower transaction ID is invalid (happens before) + // -> batch with higher transaction ID is valid + if node.TransactionID < lastTransactionID { + continue + } + + switch { + case node.NodeID < lastNodeID: + return nil, 
serviceerror.NewInternal(fmt.Sprintf("corrupted data, nodeID cannot decrease")) + case node.NodeID == lastNodeID: + return nil, serviceerror.NewInternal(fmt.Sprintf("corrupted data, same nodeID must have smaller txnID")) + default: // row.NodeID > lastNodeID: + // NOTE: when row.nodeID > lastNodeID, we expect the one with largest txnID comes first + lastTransactionID = node.TransactionID + lastNodeID = node.NodeID + result = append(result, node) + } + } + return result, nil +} + func (m *historyV2ManagerImpl) deserializeToken( token []byte, defaultLastEventID int64, diff --git a/common/persistence/persistenceInterface.go b/common/persistence/persistenceInterface.go index 9450d4762d8..f7b3efc56ad 100644 --- a/common/persistence/persistenceInterface.go +++ b/common/persistence/persistenceInterface.go @@ -316,16 +316,16 @@ type ( Checksum *persistencespb.Checksum } - // InternalAppendHistoryEventsRequest is used to append new events to workflow execution history for Persistence Interface - InternalAppendHistoryEventsRequest struct { - NamespaceID string - Execution commonpb.WorkflowExecution - FirstEventID int64 - EventBatchVersion int64 - RangeID int64 - TransactionID int64 - Events *commonpb.DataBlob - Overwrite bool + // InternalHistoryNode represent a history node metadata + InternalHistoryNode struct { + // The first eventID becomes the nodeID to be appended + NodeID int64 + // requested TransactionID for this write operation. For the same eventID, the node with larger TransactionID always wins + TransactionID int64 + // TransactionID for events before these events. For events chaining + PrevTransactionID int64 + // The events to be appended + Events *commonpb.DataBlob } // InternalAppendHistoryNodesRequest is used to append a batch of history nodes @@ -336,14 +336,8 @@ type ( Info string // The branch to be appended BranchInfo *persistencespb.HistoryBranch - // The first eventID becomes the nodeID to be appended - NodeID int64 - // The events to be appended - Events *commonpb.DataBlob - // TransactionID for events before these events. For events chaining - PrevTransactionID int64 - // requested TransactionID for this write operation. 
For the same eventID, the node with larger TransactionID always wins - TransactionID int64 + // The history node + Node InternalHistoryNode // Used in sharded data stores to identify which shard to use ShardID int32 } @@ -402,12 +396,10 @@ type ( PageSize int // Pagination token NextPageToken []byte - // LastNodeID is the last known node ID attached to a history node - LastNodeID int64 - // LastTransactionID is the last known transaction ID attached to a history node - LastTransactionID int64 // Used in sharded data stores to identify which shard to use ShardID int32 + // whether to only return metadata, excluding node content + MetadataOnly bool } // InternalCompleteForkBranchRequest is used to update some tree/branch meta data for forking @@ -422,14 +414,10 @@ type ( // InternalReadHistoryBranchResponse is the response to ReadHistoryBranchRequest InternalReadHistoryBranchResponse struct { - // History events - History []*commonpb.DataBlob + // History nodes + Nodes []InternalHistoryNode // Pagination token NextPageToken []byte - // LastNodeID is the last known node ID attached to a history node - LastNodeID int64 - // LastTransactionID is the last known transaction ID attached to a history node - LastTransactionID int64 } // VisibilityWorkflowExecutionInfo is visibility info for internal response diff --git a/common/persistence/sql/sqlHistoryManager.go b/common/persistence/sql/sqlHistoryManager.go index 9215f47ffd3..3498c96298b 100644 --- a/common/persistence/sql/sqlHistoryManager.go +++ b/common/persistence/sql/sqlHistoryManager.go @@ -27,8 +27,8 @@ package sql import ( "database/sql" "fmt" + "math" - commonpb "go.temporal.io/api/common/v1" "go.temporal.io/api/serviceerror" persistencespb "go.temporal.io/server/api/persistence/v1" @@ -41,9 +41,16 @@ import ( "go.temporal.io/server/common/primitives/timestamp" ) -type sqlHistoryV2Manager struct { - sqlStore -} +type ( + sqlHistoryV2Manager struct { + sqlStore + } +) + +const ( + // NOTE: transaction ID is *= -1 in DB + MinTxnID = math.MaxInt64 +) // newHistoryV2Persistence creates an instance of HistoryManager func newHistoryV2Persistence( @@ -67,6 +74,7 @@ func (m *sqlHistoryV2Manager) AppendHistoryNodes( defer cancel() branchInfo := request.BranchInfo beginNodeID := p.GetBeginNodeID(branchInfo) + node := request.Node branchIDBytes, err := primitives.ParseUUID(branchInfo.GetBranchId()) if err != nil { @@ -77,7 +85,7 @@ func (m *sqlHistoryV2Manager) AppendHistoryNodes( return err } - if request.NodeID < beginNodeID { + if node.NodeID < beginNodeID { return &p.InvalidPersistenceRequestError{ Msg: fmt.Sprintf("cannot append to ancestors' nodes"), } @@ -86,11 +94,11 @@ func (m *sqlHistoryV2Manager) AppendHistoryNodes( nodeRow := &sqlplugin.HistoryNodeRow{ TreeID: treeIDBytes, BranchID: branchIDBytes, - NodeID: request.NodeID, - PrevTxnID: request.PrevTransactionID, - TxnID: request.TransactionID, - Data: request.Events.Data, - DataEncoding: request.Events.EncodingType.String(), + NodeID: node.NodeID, + PrevTxnID: node.PrevTransactionID, + TxnID: node.TransactionID, + Data: node.Events.Data, + DataEncoding: node.Events.EncodingType.String(), ShardID: request.ShardID, } @@ -167,85 +175,69 @@ func (m *sqlHistoryV2Manager) ReadHistoryBranch( return nil, err } - minNodeID := request.MinNodeID - maxNodeID := request.MaxNodeID - - lastNodeID := request.LastNodeID - lastTxnID := request.LastTransactionID - - if len(request.NextPageToken) > 0 { - var lastNodeID int64 - var err error - // TODO the inner pagination token can be replaced by a dummy 
token - // since lastNodeID & lastTxnID are both provided - if lastNodeID, err = deserializePageToken(request.NextPageToken); err != nil { - return nil, serviceerror.NewInternal(fmt.Sprintf("invalid next page token %v", request.NextPageToken)) + var token historyNodePaginationToken + if len(request.NextPageToken) == 0 { + token = newHistoryNodePaginationToken(request.MinNodeID, MinTxnID) + } else if len(request.NextPageToken) == 8 { + // TODO @wxing1292 remove this block in 1.10.x + // this else if block exists to handle forward / backwards compatibility + lastNodeID, err := deserializePageToken(request.NextPageToken) + if err != nil { + return nil, err + } + token = newHistoryNodePaginationToken(lastNodeID+1, MinTxnID) + } else { + token, err = deserializeHistoryNodePaginationToken(request.NextPageToken) + if err != nil { + return nil, err } - minNodeID = lastNodeID + 1 } rows, err := m.db.SelectFromHistoryNode(ctx, sqlplugin.HistoryNodeSelectFilter{ - ShardID: request.ShardID, - TreeID: treeIDBytes, - BranchID: branchIDBytes, - MinNodeID: minNodeID, - MaxNodeID: maxNodeID, - PageSize: request.PageSize, + ShardID: request.ShardID, + TreeID: treeIDBytes, + BranchID: branchIDBytes, + MinNodeID: token.LastNodeID, + MinTxnID: token.LastTxnID, + MaxNodeID: request.MaxNodeID, + PageSize: request.PageSize, + MetadataOnly: request.MetadataOnly, }) - if err == sql.ErrNoRows || (err == nil && len(rows) == 0) { - return &p.InternalReadHistoryBranchResponse{}, nil + switch err { + case nil: + // noop + case sql.ErrNoRows: + // noop + default: + return nil, err } - history := make([]*commonpb.DataBlob, 0, request.PageSize) - + nodes := make([]p.InternalHistoryNode, 0, len(rows)) for _, row := range rows { - eventBlob := p.NewDataBlob(row.Data, row.DataEncoding) - if row.TxnID < lastTxnID { - // assuming that business logic layer is correct and transaction ID only increase - // thus, valid event batch will come with increasing transaction ID - - // event batches with smaller node ID - // -> should not be possible since records are already sorted - // event batches with same node ID - // -> batch with higher transaction ID is valid - // event batches with larger node ID - // -> batch with lower transaction ID is invalid (happens before) - // -> batch with higher transaction ID is valid - if row.NodeID < lastNodeID { - return nil, serviceerror.NewInternal(fmt.Sprintf("corrupted data, nodeID cannot decrease")) - } else if row.NodeID > lastNodeID { - // update lastNodeID so that our pagination can make progress in the corner case that - // the page are all rows with smaller txnID - // because next page we always have minNodeID = lastNodeID+1 - lastNodeID = row.NodeID - } - continue - } - - switch { - case row.NodeID < lastNodeID: - return nil, serviceerror.NewInternal(fmt.Sprintf("corrupted data, nodeID cannot decrease")) - case row.NodeID == lastNodeID: - return nil, serviceerror.NewInternal(fmt.Sprintf("corrupted data, same nodeID must have smaller txnID")) - default: // row.NodeID > lastNodeID: - // NOTE: when row.nodeID > lastNodeID, we expect the one with largest txnID comes first - lastTxnID = row.TxnID - lastNodeID = row.NodeID - history = append(history, eventBlob) - eventBlob = &commonpb.DataBlob{} - } + nodes = append(nodes, p.InternalHistoryNode{ + NodeID: row.NodeID, + PrevTransactionID: row.PrevTxnID, + TransactionID: row.TxnID, + Events: p.NewDataBlob(row.Data, row.DataEncoding), + }) } var pagingToken []byte - if len(rows) >= request.PageSize { - pagingToken = serializePageToken(lastNodeID) 
+ if len(rows) < request.PageSize { + pagingToken = nil + } else { + lastRow := rows[len(rows)-1] + pagingToken, err = serializeHistoryNodePaginationToken( + newHistoryNodePaginationToken(lastRow.NodeID, lastRow.TxnID), + ) + if err != nil { + return nil, err + } } return &p.InternalReadHistoryBranchResponse{ - History: history, - NextPageToken: pagingToken, - LastNodeID: lastNodeID, - LastTransactionID: lastTxnID, + Nodes: nodes, + NextPageToken: pagingToken, }, nil } diff --git a/common/persistence/sql/sql_token.go b/common/persistence/sql/sql_token.go new file mode 100644 index 00000000000..d52d7162b42 --- /dev/null +++ b/common/persistence/sql/sql_token.go @@ -0,0 +1,60 @@ +// The MIT License +// +// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved. +// +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. 
+ +package sql + +import ( + "encoding/json" +) + +type ( + historyNodePaginationToken struct { + LastNodeID int64 + LastTxnID int64 + } +) + +func newHistoryNodePaginationToken( + nodeID int64, + transactionID int64, +) historyNodePaginationToken { + return historyNodePaginationToken{ + LastNodeID: nodeID, + LastTxnID: transactionID, + } +} + +func serializeHistoryNodePaginationToken( + token historyNodePaginationToken, +) ([]byte, error) { + return json.Marshal(token) +} + +func deserializeHistoryNodePaginationToken( + bytes []byte, +) (historyNodePaginationToken, error) { + var token historyNodePaginationToken + err := json.Unmarshal(bytes, &token) + return token, err +} diff --git a/common/persistence/sql/sqlplugin/history_node.go b/common/persistence/sql/sqlplugin/history_node.go index e085298a71d..86935dadffc 100644 --- a/common/persistence/sql/sqlplugin/history_node.go +++ b/common/persistence/sql/sqlplugin/history_node.go @@ -47,12 +47,14 @@ type ( // HistoryNodeSelectFilter contains the column names within history_node table that // can be used to filter results through a WHERE clause HistoryNodeSelectFilter struct { - ShardID int32 - TreeID primitives.UUID - BranchID primitives.UUID - MinNodeID int64 - MaxNodeID int64 - PageSize int + ShardID int32 + TreeID primitives.UUID + BranchID primitives.UUID + MinNodeID int64 + MinTxnID int64 + MaxNodeID int64 + PageSize int + MetadataOnly bool } // HistoryNodeDeleteFilter contains the column names within history_node table that diff --git a/common/persistence/sql/sqlplugin/mysql/events.go b/common/persistence/sql/sqlplugin/mysql/events.go index da91cfa4bb0..773b816d001 100644 --- a/common/persistence/sql/sqlplugin/mysql/events.go +++ b/common/persistence/sql/sqlplugin/mysql/events.go @@ -38,7 +38,12 @@ const ( `VALUES (:shard_id, :tree_id, :branch_id, :node_id, :prev_txn_id, :txn_id, :data, :data_encoding) ` getHistoryNodesQuery = `SELECT node_id, prev_txn_id, txn_id, data, data_encoding FROM history_node ` + - `WHERE shard_id = ? AND tree_id = ? AND branch_id = ? AND node_id >= ? and node_id < ? ORDER BY shard_id, tree_id, branch_id, node_id, txn_id LIMIT ? ` + `WHERE shard_id = ? AND tree_id = ? AND branch_id = ? AND ((node_id = ? AND txn_id > ?) OR node_id > ?) AND node_id < ? ` + + `ORDER BY shard_id, tree_id, branch_id, node_id, txn_id LIMIT ? ` + + getHistoryNodeMetadataQuery = `SELECT node_id, prev_txn_id, txn_id FROM history_node ` + + `WHERE shard_id = ? AND tree_id = ? AND branch_id = ? AND ((node_id = ? AND txn_id > ?) OR node_id > ?) AND node_id < ? ` + + `ORDER BY shard_id, tree_id, branch_id, node_id, txn_id LIMIT ? ` deleteHistoryNodesQuery = `DELETE FROM history_node WHERE shard_id = ? AND tree_id = ? AND branch_id = ? AND node_id >= ? 
` @@ -73,14 +78,23 @@ func (mdb *db) SelectFromHistoryNode( ctx context.Context, filter sqlplugin.HistoryNodeSelectFilter, ) ([]sqlplugin.HistoryNodeRow, error) { + var query string + if filter.MetadataOnly { + query = getHistoryNodeMetadataQuery + } else { + query = getHistoryNodesQuery + } + var rows []sqlplugin.HistoryNodeRow if err := mdb.conn.SelectContext(ctx, &rows, - getHistoryNodesQuery, + query, filter.ShardID, filter.TreeID, filter.BranchID, filter.MinNodeID, + -filter.MinTxnID, // NOTE: transaction ID is *= -1 when stored + filter.MinNodeID, filter.MaxNodeID, filter.PageSize, ); err != nil { diff --git a/common/persistence/sql/sqlplugin/postgresql/events.go b/common/persistence/sql/sqlplugin/postgresql/events.go index 1172144d565..e80df9f9e78 100644 --- a/common/persistence/sql/sqlplugin/postgresql/events.go +++ b/common/persistence/sql/sqlplugin/postgresql/events.go @@ -38,7 +38,12 @@ const ( `VALUES (:shard_id, :tree_id, :branch_id, :node_id, :prev_txn_id, :txn_id, :data, :data_encoding) ` getHistoryNodesQuery = `SELECT node_id, prev_txn_id, txn_id, data, data_encoding FROM history_node ` + - `WHERE shard_id = $1 AND tree_id = $2 AND branch_id = $3 AND node_id >= $4 and node_id < $5 ORDER BY shard_id, tree_id, branch_id, node_id, txn_id LIMIT $6 ` + `WHERE shard_id = $1 AND tree_id = $2 AND branch_id = $3 AND ((node_id = $4 AND txn_id > $5) OR node_id > $6) AND node_id < $7 ` + + `ORDER BY shard_id, tree_id, branch_id, node_id, txn_id LIMIT $8 ` + + getHistoryNodeMetadataQuery = `SELECT node_id, prev_txn_id, txn_id FROM history_node ` + + `WHERE shard_id = $1 AND tree_id = $2 AND branch_id = $3 AND ((node_id = $4 AND txn_id > $5) OR node_id > $6) AND node_id < $7 ` + + `ORDER BY shard_id, tree_id, branch_id, node_id, txn_id LIMIT $8 ` deleteHistoryNodesQuery = `DELETE FROM history_node WHERE shard_id = $1 AND tree_id = $2 AND branch_id = $3 AND node_id >= $4 ` @@ -74,14 +79,23 @@ func (pdb *db) SelectFromHistoryNode( ctx context.Context, filter sqlplugin.HistoryNodeSelectFilter, ) ([]sqlplugin.HistoryNodeRow, error) { + var query string + if filter.MetadataOnly { + query = getHistoryNodeMetadataQuery + } else { + query = getHistoryNodesQuery + } + var rows []sqlplugin.HistoryNodeRow err := pdb.conn.SelectContext(ctx, &rows, - getHistoryNodesQuery, + query, filter.ShardID, filter.TreeID, filter.BranchID, filter.MinNodeID, + -filter.MinTxnID, // NOTE: transaction ID is *= -1 when stored + filter.MinNodeID, filter.MaxNodeID, filter.PageSize, ) diff --git a/common/persistence/sql/sqlplugin/tests/history_node_test.go b/common/persistence/sql/sqlplugin/tests/history_node_test.go index bc97d5b367f..403f1f53cd3 100644 --- a/common/persistence/sql/sqlplugin/tests/history_node_test.go +++ b/common/persistence/sql/sqlplugin/tests/history_node_test.go @@ -33,6 +33,7 @@ import ( "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" + "go.temporal.io/server/common/persistence/sql" "go.temporal.io/server/common/persistence/sql/sqlplugin" "go.temporal.io/server/common/primitives" "go.temporal.io/server/common/shuffle" @@ -139,6 +140,7 @@ func (s *historyNodeSuite) TestInsertSelect_Single() { TreeID: treeID, BranchID: branchID, MinNodeID: nodeID, + MinTxnID: sql.MinTxnID, MaxNodeID: math.MaxInt64, PageSize: pageSize, } @@ -156,8 +158,8 @@ func (s *historyNodeSuite) TestInsertSelect_Single() { func (s *historyNodeSuite) TestInsertSelect_Multiple() { numNodeIDs := 100 - nodePerNodeID := 2 - pageSize := 100 + nodePerNodeID := 2 + rand.Intn(8) + pageSize := 10 + rand.Intn(10) 
shardID := rand.Int31() treeID := primitives.NewUUID() @@ -186,6 +188,7 @@ func (s *historyNodeSuite) TestInsertSelect_Multiple() { TreeID: treeID, BranchID: branchID, MinNodeID: minNodeID, + MinTxnID: sql.MinTxnID, MaxNodeID: maxNodeID, PageSize: pageSize, } @@ -196,7 +199,9 @@ func (s *historyNodeSuite) TestInsertSelect_Multiple() { rows = append(rows, rowsPerPage...) if len(rowsPerPage) > 0 { - selectFilter.MinNodeID = rowsPerPage[len(rowsPerPage)-1].NodeID + 1 + lastNode := rowsPerPage[len(rowsPerPage)-1] + selectFilter.MinNodeID = lastNode.NodeID + selectFilter.MinTxnID = lastNode.TxnID } else { break } @@ -259,6 +264,7 @@ func (s *historyNodeSuite) TestDeleteSelect() { TreeID: treeID, BranchID: branchID, MinNodeID: nodeID, + MinTxnID: sql.MinTxnID, MaxNodeID: math.MaxInt64, PageSize: pageSize, } @@ -306,6 +312,7 @@ func (s *historyNodeSuite) TestInsertDeleteSelect_Single() { TreeID: treeID, BranchID: branchID, MinNodeID: nodeID, + MinTxnID: sql.MinTxnID, MaxNodeID: math.MaxInt64, PageSize: pageSize, } @@ -362,6 +369,7 @@ func (s *historyNodeSuite) TestInsertDeleteSelect_Multiple() { TreeID: treeID, BranchID: branchID, MinNodeID: nodeID, + MinTxnID: sql.MinTxnID, MaxNodeID: math.MaxInt64, PageSize: pageSize, } diff --git a/service/history/mutablestate/history_builder_test.go b/service/history/mutablestate/history_builder_test.go index 2ad2b1b3ef3..13434d1c6c6 100644 --- a/service/history/mutablestate/history_builder_test.go +++ b/service/history/mutablestate/history_builder_test.go @@ -149,9 +149,6 @@ func (s *historyBuilderSuite) TearDownTest() { } -// TODO @wxing1292 port & rewrite the tests -// TODO add validation of task ID - /* workflow */ func (s *historyBuilderSuite) TestWorkflowExecutionStarted() { attempt := rand.Int31() diff --git a/tools/cli/adminCommands.go b/tools/cli/adminCommands.go index 45ae10d65b1..a73bd47d5a6 100644 --- a/tools/cli/adminCommands.go +++ b/tools/cli/adminCommands.go @@ -80,7 +80,9 @@ func AdminShowWorkflow(c *cli.Context) { ErrorAndExit("ReadHistoryBranch err", err) } - history = resp.History + for _, node := range resp.Nodes { + history = append(history, node.Events) + } } else { ErrorAndExit("need to specify TreeId/BranchId/ShardId", nil) } diff --git a/tools/cli/adminDBScanCommand.go b/tools/cli/adminDBScanCommand.go index c8a9cdec259..95fc67a0de1 100644 --- a/tools/cli/adminDBScanCommand.go +++ b/tools/cli/adminDBScanCommand.go @@ -511,7 +511,7 @@ func fetchAndVerifyHistoryExists( Details: err.Error(), }) return VerificationResultCheckFailure, nil, nil - } else if history == nil || len(history.History) == 0 { + } else if history == nil || len(history.Nodes) == 0 { corruptedExecutionWriter.Add(&CorruptedExecution{ ShardID: shardID, NamespaceID: executionInfo.NamespaceId, @@ -542,7 +542,7 @@ func verifyFirstHistoryEvent( payloadSerializer persistence.PayloadSerializer, history *persistence.InternalReadHistoryBranchResponse, ) VerificationResult { - firstBatch, err := payloadSerializer.DeserializeEvents(history.History[0]) + firstBatch, err := payloadSerializer.DeserializeEvents(history.Nodes[0].Events) if err != nil || len(firstBatch) == 0 { checkFailureWriter.Add(&ExecutionCheckFailure{ ShardID: shardID,
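For readers of this diff, a minimal standalone sketch of the keyset-pagination resume predicate that the new MySQL/PostgreSQL queries encode, `((node_id = ? AND txn_id > ?) OR node_id > ?)`. The `row` and `resumeAfter` names are illustrative, not part of the patch; as the patch notes, txn_id is stored negated (*= -1), so the comparisons below operate on stored values:

package main

import "fmt"

// row mirrors the (node_id, txn_id) ordering columns of history_node.
// txn_id is the stored (negated) value, so ascending txn_id order yields
// the largest original transaction ID first within a node.
type row struct{ nodeID, txnID int64 }

// resumeAfter reports whether r sorts strictly after the last row of the
// previous page: (node_id = last AND txn_id > last) OR node_id > last.
func resumeAfter(r, last row) bool {
	return (r.nodeID == last.nodeID && r.txnID > last.txnID) || r.nodeID > last.nodeID
}

func main() {
	last := row{nodeID: 5, txnID: -9} // -9 is the stored form of original txn 9
	for _, r := range []row{{5, -9}, {5, -8}, {6, -3}} {
		fmt.Println(r, resumeAfter(r, last))
	}
	// {5 -9} false — the row the previous page ended on is excluded
	// {5 -8} true  — same node, later stored txn
	// {6 -3} true  — any later node
}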
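Similarly, a hedged sketch of the dual-format token handling in sqlHistoryManager.ReadHistoryBranch: an empty token starts from MinNodeID, an 8-byte token is the legacy node-ID-only token (kept for rolling-upgrade compatibility until 1.10.x), and anything else is the new JSON token from sql_token.go. The big-endian int64 decoding of the legacy token is an assumption, since serializePageToken's encoding is not shown in this diff, and decodeToken is an illustrative name:

package main

import (
	"encoding/binary"
	"encoding/json"
	"fmt"
	"math"
)

type historyNodePaginationToken struct {
	LastNodeID int64
	LastTxnID  int64
}

// minTxnID mirrors sql.MinTxnID: transaction IDs are stored negated, so
// math.MaxInt64 acts as the smallest possible transaction ID to resume from.
const minTxnID = math.MaxInt64

// decodeToken mirrors the compatibility branching in ReadHistoryBranch.
// ASSUMPTION: the legacy 8-byte token is a big-endian int64 node ID.
func decodeToken(b []byte, minNodeID int64) (historyNodePaginationToken, error) {
	switch {
	case len(b) == 0:
		return historyNodePaginationToken{LastNodeID: minNodeID, LastTxnID: minTxnID}, nil
	case len(b) == 8:
		lastNodeID := int64(binary.BigEndian.Uint64(b))
		return historyNodePaginationToken{LastNodeID: lastNodeID + 1, LastTxnID: minTxnID}, nil
	default:
		var token historyNodePaginationToken
		err := json.Unmarshal(b, &token)
		return token, err
	}
}

func main() {
	token, _ := json.Marshal(historyNodePaginationToken{LastNodeID: 42, LastTxnID: -7})
	decoded, err := decodeToken(token, 1)
	fmt.Println(decoded, err) // {42 -7} <nil>
}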
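Finally, a small driver for the filtering invariant that this patch moves from the persistence layer up into historyStore.go's filterHistoryNodes: stale event batches (same or smaller node ID with an older transaction ID, e.g. written by a failed-over worker) are dropped, while a decreasing or repeated node ID among surviving batches signals corruption. The `node` and `filter` names are illustrative; the logic is lifted from the diff above:

package main

import (
	"errors"
	"fmt"
)

// node carries the original (non-negated) transaction ID, as seen at the
// history store layer.
type node struct{ nodeID, txnID int64 }

func filter(lastNodeID, lastTxnID int64, nodes []node) ([]node, error) {
	var result []node
	for _, n := range nodes {
		if n.txnID < lastTxnID {
			continue // superseded batch: a newer txn already wrote this node
		}
		switch {
		case n.nodeID < lastNodeID:
			return nil, errors.New("corrupted data, nodeID cannot decrease")
		case n.nodeID == lastNodeID:
			return nil, errors.New("corrupted data, same nodeID must have smaller txnID")
		default: // n.nodeID > lastNodeID: the largest txnID arrives first
			lastTxnID = n.txnID
			lastNodeID = n.nodeID
			result = append(result, n)
		}
	}
	return result, nil
}

func main() {
	// Node 3 was written twice: txn 10 (the winner) sorts before stale txn 4.
	nodes := []node{{3, 10}, {3, 4}, {5, 12}}
	fmt.Println(filter(0, 0, nodes)) // [{3 10} {5 12}] <nil>
}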